博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
webrtc中BBR算法源码分析
阅读量:3558 次
发布时间:2019-05-20

本文共 19904 字,大约阅读时间需要 66 分钟。

从哪里开始讲起呢?还是从收到transport feedback报文开始讲起吧。如下

// Entry point for a transport feedback report.
//
// Updates the RTT statistics and the loss-rate filter, feeds the acked and
// lost packets into the BBR model, runs the mode-specific handlers
// (PROBE_BW, STARTUP/DRAIN, PROBE_RTT), then recalculates the pacing rate,
// congestion window and recovery window. Returns the rate update created for
// |msg.feedback_time|.
NetworkControlUpdate BbrNetworkController::OnTransportPacketsFeedback(
    TransportPacketsFeedback msg) {
  Timestamp feedback_recv_time = msg.feedback_time;

  // Calling back() on an empty container is undefined behaviour, so the most
  // recent packet is only read when the report carries any feedback at all.
  const auto packets_with_feedback = msg.PacketsWithFeedback();
  absl::optional<SentPacket> last_sent_packet;
  if (!packets_with_feedback.empty()) {
    last_sent_packet = packets_with_feedback.back().sent_packet;
  }
  if (!last_sent_packet.has_value()) {
    RTC_LOG(LS_WARNING) << "Last ack packet not in history, no RTT update";
  } else {
    Timestamp send_time = last_sent_packet->send_time;
    TimeDelta send_delta = feedback_recv_time - send_time;
    rtt_stats_.UpdateRtt(send_delta, TimeDelta::Zero(), feedback_recv_time);
  }

  const DataSize total_data_acked_before = sampler_->total_data_acked();

  bool is_round_start = false;
  bool min_rtt_expired = false;

  std::vector<PacketResult> lost_packets = msg.LostWithSendInfo();
  DiscardLostPackets(lost_packets);

  std::vector<PacketResult> acked_packets = msg.ReceivedWithSendInfo();

  // The loss filter counts every packet covered by this report as "sent".
  int packets_sent =
      static_cast<int>(lost_packets.size() + acked_packets.size());
  int packets_lost = static_cast<int>(lost_packets.size());
  loss_rate_.UpdateWithLossStatus(msg.feedback_time.ms(), packets_sent,
                                  packets_lost);

  // Input the new data into the BBR model of the connection.
  if (!acked_packets.empty()) {
    int64_t last_acked_packet =
        acked_packets.rbegin()->sent_packet->sequence_number;

    is_round_start = UpdateRoundTripCounter(last_acked_packet);
    min_rtt_expired =
        UpdateBandwidthAndMinRtt(msg.feedback_time, acked_packets);
    UpdateRecoveryState(last_acked_packet, !lost_packets.empty(),
                        is_round_start);

    const DataSize data_acked =
        sampler_->total_data_acked() - total_data_acked_before;

    UpdateAckAggregationBytes(msg.feedback_time, data_acked);
    if (max_aggregation_bytes_multiplier_ > 0) {
      if (msg.data_in_flight <=
          1.25 * GetTargetCongestionWindow(pacing_gain_)) {
        bytes_acked_since_queue_drained_ = DataSize::Zero();
      } else {
        bytes_acked_since_queue_drained_ += data_acked;
      }
    }
  }

  // Handle logic specific to PROBE_BW mode.
  if (mode_ == PROBE_BW) {
    UpdateGainCyclePhase(msg.feedback_time, msg.prior_in_flight,
                         !lost_packets.empty());
  }

  // Handle logic specific to STARTUP and DRAIN modes.
  if (is_round_start && !is_at_full_bandwidth_) {
    CheckIfFullBandwidthReached();
  }
  MaybeExitStartupOrDrain(msg);

  // Handle logic specific to PROBE_RTT.
  MaybeEnterOrExitProbeRtt(msg, is_round_start, min_rtt_expired);

  // Calculate number of packets acked and lost.
  DataSize data_acked = sampler_->total_data_acked() - total_data_acked_before;
  DataSize data_lost = DataSize::Zero();
  for (const PacketResult& packet : lost_packets) {
    data_lost += packet.sent_packet->size;
  }

  // After the model is updated, recalculate the pacing rate and congestion
  // window.
  CalculatePacingRate();
  CalculateCongestionWindow(data_acked);
  CalculateRecoveryWindow(data_acked, data_lost, msg.data_in_flight);

  // Cleanup internal state.
  if (!acked_packets.empty()) {
    sampler_->RemoveObsoletePackets(
        acked_packets.back().sent_packet->sequence_number);
  }
  return CreateRateUpdate(msg.feedback_time);
}

min rtt更新

// Updates the RTT based on a new sample.void RttStats::UpdateRtt(TimeDelta send_delta,                         TimeDelta ack_delay,                         Timestamp now) {  if (send_delta.IsInfinite() || send_delta <= TimeDelta::Zero()) {    RTC_LOG(LS_WARNING) << "Ignoring measured send_delta, because it's is "                        << "either infinite, zero, or negative.  send_delta = "                        << ToString(send_delta);    return;  }  // Update min_rtt_ first. min_rtt_ does not use an rtt_sample corrected for  // ack_delay but the raw observed send_delta, since poor clock granularity at  // the client may cause a high ack_delay to result in underestimation of the  // min_rtt_.  if (min_rtt_.IsZero() || min_rtt_ > send_delta) {    min_rtt_ = send_delta;  }  // Correct for ack_delay if information received from the peer results in a  // positive RTT sample. Otherwise, we use the send_delta as a reasonable  // measure for smoothed_rtt.  TimeDelta rtt_sample = send_delta;  previous_srtt_ = smoothed_rtt_;  if (rtt_sample > ack_delay) {    rtt_sample = rtt_sample - ack_delay;  }  latest_rtt_ = rtt_sample;  // First time call.  if (smoothed_rtt_.IsZero()) {    smoothed_rtt_ = rtt_sample;    mean_deviation_ = rtt_sample / 2;  } else {    mean_deviation_ = kOneMinusBeta * mean_deviation_ +                      kBeta * (smoothed_rtt_ - rtt_sample).Abs();    smoothed_rtt_ = kOneMinusAlpha * smoothed_rtt_ + kAlpha * rtt_sample;    RTC_LOG(LS_VERBOSE) << " smoothed_rtt(us):" << smoothed_rtt_.us()                        << " mean_deviation(us):" << mean_deviation_.us();  }}

统计丢包率

void LossRateFilter::UpdateWithLossStatus(int64_t feedback_time,                                          int packets_sent,                                          int packets_lost) {  lost_packets_since_last_loss_update_ += packets_lost;  expected_packets_since_last_loss_update_ += packets_sent;  //时间过1000ms并且发送的包20个以上统计一次丢包率  if (feedback_time >= next_loss_update_ms_ &&      expected_packets_since_last_loss_update_ >= kLimitNumPackets) {    int64_t lost = lost_packets_since_last_loss_update_;    int64_t expected = expected_packets_since_last_loss_update_;    loss_rate_estimate_ = static_cast
(lost) / expected; next_loss_update_ms_ = feedback_time + kUpdateIntervalMs; lost_packets_since_last_loss_update_ = 0; expected_packets_since_last_loss_update_ = 0; }}

更新周期

//上次发出的包被确认,就认为新周期开始bool BbrNetworkController::UpdateRoundTripCounter(int64_t last_acked_packet) {  if (last_acked_packet > current_round_trip_end_) {    round_trip_count_++;    current_round_trip_end_ = last_sent_packet_;    return true;  }  return false;}

计算采样rtt和带宽

// Produces one RTT/bandwidth sample for an acknowledged packet.
// The bandwidth is the smaller of the send rate and the ack rate. Returns a
// default-constructed BandwidthSample when no sample can be made.
BandwidthSample BandwidthSampler::OnPacketAcknowledgedInner(
    Timestamp ack_time,
    int64_t packet_number,
    const ConnectionStateOnSentPacket& sent_packet) {
  // Record the state of the most recently acknowledged packet.
  total_data_acked_ += sent_packet.size;
  total_data_sent_at_last_acked_packet_ = sent_packet.total_data_sent;
  last_acked_packet_sent_time_ = sent_packet.sent_time;
  last_acked_packet_ack_time_ = ack_time;

  // The app-limited phase ends once a packet sent after that phase is
  // acknowledged.
  if (is_app_limited_ && packet_number > end_of_app_limited_phase_) {
    is_app_limited_ = false;
  }

  // If nothing had been acknowledged when this packet was sent there is no
  // reference point, hence no bandwidth sample.
  if (!sent_packet.last_acked_packet_sent_time ||
      !sent_packet.last_acked_packet_ack_time) {
    return BandwidthSample();
  }

  // An infinite send rate tells the min() below to ignore the send rate and
  // rely on the ack rate alone.
  DataRate send_rate = DataRate::Infinity();
  if (sent_packet.sent_time > *sent_packet.last_acked_packet_sent_time) {
    const DataSize sent_since_reference =
        sent_packet.total_data_sent -
        sent_packet.total_data_sent_at_last_acked_packet;
    const TimeDelta send_interval =
        sent_packet.sent_time - *sent_packet.last_acked_packet_sent_time;
    send_rate = sent_since_reference / send_interval;
  }

  // The ack interval must be strictly positive; otherwise the slope
  // computation would divide by zero or underflow.
  if (ack_time <= *sent_packet.last_acked_packet_ack_time) {
    RTC_LOG(LS_WARNING)
        << "Time of the previously acked packet is larger than the time "
           "of the current packet.";
    return BandwidthSample();
  }
  const DataSize acked_since_reference =
      total_data_acked_ - sent_packet.total_data_acked_at_the_last_acked_packet;
  const TimeDelta ack_interval =
      ack_time - *sent_packet.last_acked_packet_ack_time;
  const DataRate ack_rate = acked_since_reference / ack_interval;

  BandwidthSample result;
  result.bandwidth = std::min(send_rate, ack_rate);
  // Note: delayed acknowledgement time is not accounted for, so this RTT can
  // be artificially high, especially on low bandwidth connections.
  result.rtt = ack_time - sent_packet.sent_time;
  // The sample inherits the app-limited flag recorded when the packet was
  // sent.
  result.is_app_limited = sent_packet.is_app_limited;
  return result;
}

更新最大带宽。estimates_[0]、estimates_[1]、estimates_[2] 三个带宽估计按从大到小依次排列。这里的 new_time 不是时间,而是 round(轮次)的概念;窗口长度默认是 10 个 round,可以粗略理解为收到 10 次 transportCC 报文。

// Updates best estimates with |sample|, and expires and updates best  // estimates as necessary.  void Update(T new_sample, TimeT new_time) {    // Reset all estimates if they have not yet been initialized, if new sample    // is a new best, or if the newest recorded estimate is too old.    if (estimates_[0].sample == zero_value_ ||        Compare()(new_sample, estimates_[0].sample) ||        new_time - estimates_[2].time > window_length_) {      Reset(new_sample, new_time);      return;    }    if (Compare()(new_sample, estimates_[1].sample)) {      estimates_[1] = Sample(new_sample, new_time);      estimates_[2] = estimates_[1];    } else if (Compare()(new_sample, estimates_[2].sample)) {      estimates_[2] = Sample(new_sample, new_time);    }    // Expire and update estimates as necessary.    if (new_time - estimates_[0].time > window_length_) {      // The best estimate hasn't been updated for an entire window, so promote      // second and third best estimates.      estimates_[0] = estimates_[1];      estimates_[1] = estimates_[2];      estimates_[2] = Sample(new_sample, new_time);      // Need to iterate one more time. Check if the new best estimate is      // outside the window as well, since it may also have been recorded a      // long time ago. Don't need to iterate once more since we cover that      // case at the beginning of the method.      if (new_time - estimates_[0].time > window_length_) {        estimates_[0] = estimates_[1];        estimates_[1] = estimates_[2];      }      return;    }    if (estimates_[1].sample == estimates_[0].sample &&        new_time - estimates_[1].time > window_length_ >> 2) {      // A quarter of the window has passed without a better sample, so the      // second-best estimate is taken from the second quarter of the window.      
estimates_[2] = estimates_[1] = Sample(new_sample, new_time);      return;    }    if (estimates_[2].sample == estimates_[1].sample &&        new_time - estimates_[2].time > window_length_ >> 1) {      // We've passed a half of the window without a better estimate, so take      // a third-best estimate from the second half of the window.      estimates_[2] = Sample(new_sample, new_time);    }  }
//这个主要来控制发送到网络上未被确认的数据量void BbrNetworkController::UpdateRecoveryState(int64_t last_acked_packet,                                               bool has_losses,                                               bool is_round_start) {  // Exit recovery when there are no losses for a round.  if (has_losses) {    end_recovery_at_ = last_sent_packet_;  }  switch (recovery_state_) {    case NOT_IN_RECOVERY:      // Enter conservation on the first loss.      if (has_losses) {        recovery_state_ = CONSERVATION;        if (mode_ == STARTUP) {          recovery_state_ = config_.initial_conservation_in_startup;        }        // This will cause the |recovery_window_| to be set to the correct        // value in CalculateRecoveryWindow().        recovery_window_ = DataSize::Zero();        // Since the conservation phase is meant to be lasting for a whole        // round, extend the current round as if it were started right now.        current_round_trip_end_ = last_sent_packet_;      }      break;    case CONSERVATION:    case MEDIUM_GROWTH:      if (is_round_start) {        recovery_state_ = GROWTH;      }      RTC_FALLTHROUGH();    case GROWTH:      // Exit recovery if appropriate.      if (!has_losses &&          (!end_recovery_at_ || last_acked_packet > *end_recovery_at_)) {        recovery_state_ = NOT_IN_RECOVERY;      }      break;  }}

计算一下比预期的多发了多少数据

// Tracks how many bytes were acknowledged beyond what the max bandwidth
// estimate predicts for the current aggregation epoch, and records the excess
// in max_ack_height_.
void BbrNetworkController::UpdateAckAggregationBytes(
    Timestamp ack_time,
    DataSize newly_acked_bytes) {
  if (!aggregation_epoch_start_time_) {
    RTC_LOG(LS_ERROR)
        << "Received feedback before information about sent packets.";
    RTC_DCHECK(aggregation_epoch_start_time_.has_value());
    return;
  }

  // Bytes that should have been delivered since the epoch started, assuming
  // the max bandwidth estimate is correct.
  const TimeDelta epoch_duration = ack_time - *aggregation_epoch_start_time_;
  const DataSize expected_bytes_acked =
      max_bandwidth_.GetBest() * epoch_duration;

  // Once the ack arrival rate is at or below the max bandwidth, the epoch is
  // over: start measuring a new one from this ack.
  const bool epoch_finished = aggregation_epoch_bytes_ <= expected_bytes_acked;
  if (epoch_finished) {
    aggregation_epoch_bytes_ = newly_acked_bytes;
    aggregation_epoch_start_time_ = ack_time;
    return;
  }

  // Compute how many extra bytes were delivered vs max bandwidth. The bytes
  // acknowledged just now are included to account for stretch acks.
  aggregation_epoch_bytes_ += newly_acked_bytes;
  const DataSize extra_bytes = aggregation_epoch_bytes_ - expected_bytes_acked;
  max_ack_height_.Update(extra_bytes, round_trip_count_);
}

计算拥塞窗口

//其实核心就是最小rtt和当前估计带宽的乘积DataSize BbrNetworkController::GetTargetCongestionWindow(double gain) const {  DataSize bdp = GetMinRtt() * BandwidthEstimate();  DataSize congestion_window = gain * bdp;  // BDP estimate will be zero if no bandwidth samples are available yet.  if (congestion_window.IsZero()) {    congestion_window = gain * initial_congestion_window_;  }  return std::max(congestion_window, min_congestion_window_);}

probe_bw阶段更新

//看注释void BbrNetworkController::UpdateGainCyclePhase(Timestamp now,                                                DataSize prior_in_flight,                                                bool has_losses) {  // In most cases, the cycle is advanced after an RTT passes.  bool should_advance_gain_cycling = now - last_cycle_start_ > GetMinRtt();  // If the pacing gain is above 1.0, the connection is trying to probe the  // bandwidth by increasing the number of bytes in flight to at least  // pacing_gain * BDP.  Make sure that it actually reaches the target, as long  // as there are no losses suggesting that the buffers are not able to hold  // that much.  if (pacing_gain_ > 1.0 && !has_losses &&      prior_in_flight < GetTargetCongestionWindow(pacing_gain_)) {    should_advance_gain_cycling = false;  }  // If pacing gain is below 1.0, the connection is trying to drain the extra  // queue which could have been incurred by probing prior to it.  If the number  // of bytes in flight falls down to the estimated BDP value earlier, conclude  // that the queue has been successfully drained and exit this cycle early.  if (pacing_gain_ < 1.0 && prior_in_flight <= GetTargetCongestionWindow(1)) {    should_advance_gain_cycling = true;  }  if (should_advance_gain_cycling) {    cycle_current_offset_ = (cycle_current_offset_ + 1) % kGainCycleLength;    last_cycle_start_ = now;    // Stay in low gain mode until the target BDP is hit.    // Low gain mode will be exited immediately when the target BDP is achieved.    if (config_.fully_drain_queue && pacing_gain_ < 1 &&        GetPacingGain(cycle_current_offset_) == 1 &&        prior_in_flight > GetTargetCongestionWindow(1)) {      return;    }    pacing_gain_ = GetPacingGain(cycle_current_offset_);  }}

//三次带宽不增长就认为到达最大带宽

void BbrNetworkController::CheckIfFullBandwidthReached() {  if (last_sample_is_app_limited_) {    return;  }  DataRate target = bandwidth_at_last_round_ * kStartupGrowthTarget;  if (BandwidthEstimate() >= target) {    bandwidth_at_last_round_ = BandwidthEstimate();    rounds_without_bandwidth_gain_ = 0;    return;  }  rounds_without_bandwidth_gain_++;  if ((rounds_without_bandwidth_gain_ >= config_.num_startup_rtts) ||      (config_.exit_startup_on_loss && InRecovery())) {    is_at_full_bandwidth_ = true;  }}

//判断是否退出startup或drain阶段

void BbrNetworkController::MaybeExitStartupOrDrain(    const TransportPacketsFeedback& msg) {  TimeDelta exit_threshold = config_.exit_startup_rtt_threshold;  TimeDelta rtt_delta = last_rtt_ - min_rtt_;  //如果在startup阶段到达最大带宽,就转为drain阶段  if (mode_ == STARTUP &&      (is_at_full_bandwidth_ || rtt_delta > exit_threshold)) {    if (rtt_delta > exit_threshold)      RTC_LOG(LS_INFO) << "Exiting startup due to rtt increase from: "                       << ToString(min_rtt_) << " to:" << ToString(last_rtt_)                       << " > " << ToString(min_rtt_ + exit_threshold);    mode_ = DRAIN;    pacing_gain_ = kDrainGain;    congestion_window_gain_ = kHighGain;  }  //当flight数据量小于拥塞控制窗口时进入probeBW阶段  if (mode_ == DRAIN && msg.data_in_flight <= GetTargetCongestionWindow(1)) {    EnterProbeBandwidthMode(msg.feedback_time);  }}

probe_rtt阶段的进入与退出

void BbrNetworkController::MaybeEnterOrExitProbeRtt(    const TransportPacketsFeedback& msg,    bool is_round_start,    bool min_rtt_expired) {  if (min_rtt_expired && !exiting_quiescence_ && mode_ != PROBE_RTT) {    mode_ = PROBE_RTT;    pacing_gain_ = 1;    // Do not decide on the time to exit PROBE_RTT until the |bytes_in_flight|    // is at the target small value.    exit_probe_rtt_at_.reset();  }  if (mode_ == PROBE_RTT) {    if (config_.max_renew_bandwith_rtt_ms) {      sampler_->OnAppLimited();    }    if (!exit_probe_rtt_at_) {      // If the window has reached the appropriate size, schedule exiting      // PROBE_RTT.  The CWND during PROBE_RTT is kMinimumCongestionWindow, but      // we allow an extra packet since QUIC checks CWND before sending a      // packet.      if (msg.data_in_flight < ProbeRttCongestionWindow() + kMaxPacketSize) {        exit_probe_rtt_at_ = msg.feedback_time + TimeDelta::ms(kProbeRttTimeMs);        probe_rtt_round_passed_ = false;      }    } else {      if (is_round_start) {        probe_rtt_round_passed_ = true;      }      if (msg.feedback_time >= *exit_probe_rtt_at_ && probe_rtt_round_passed_) {        min_rtt_timestamp_ = msg.feedback_time;        if (!is_at_full_bandwidth_) {          EnterStartupMode();        } else {          EnterProbeBandwidthMode(msg.feedback_time);        }      }    }  }  exiting_quiescence_ = false;}
//计算发送速度void BbrNetworkController::CalculatePacingRate() {  if (BandwidthEstimate().IsZero()) {    return;  }  DataRate target_rate = pacing_gain_ * BandwidthEstimate();  if (config_.rate_based_recovery && InRecovery()) {    pacing_rate_ = pacing_gain_ * max_bandwidth_.GetThirdBest();  }  if (is_at_full_bandwidth_) {    pacing_rate_ = target_rate;    RTC_LOG(LS_INFO)        << "is at full bandwidth pacing_rate_:"<< pacing_rate_.kbps() <<" kbps"        << "pacing_gain:"<
<< ", estimate bw:" << BandwidthEstimate().kbps() << " kbps"; return; } // Pace at the rate of initial_window / RTT as soon as RTT measurements are // available. if (pacing_rate_.IsZero() && !rtt_stats_.min_rtt().IsZero()) { pacing_rate_ = initial_congestion_window_ / rtt_stats_.min_rtt(); if (max_key_frame_send_rate_kbps_) { pacing_rate_ = DataRate::kbps(max_key_frame_send_rate_kbps_); RTC_LOG(LS_INFO) << "use key frame send rate:" << max_key_frame_send_rate_kbps_; } RTC_LOG(LS_INFO) << "initial pacing_rate_:"<< pacing_rate_.kbps() <<" kbps"; return; } // Slow the pacing rate in STARTUP once loss has ever been detected. const bool has_ever_detected_loss = end_recovery_at_.has_value(); if (config_.slower_startup && has_ever_detected_loss) { pacing_rate_ = kStartupAfterLossGain * BandwidthEstimate(); RTC_LOG(LS_INFO) << "is at slower startup pacing_rate_:"<< pacing_rate_.kbps() <<" kbps"; return; } // Do not decrease the pacing rate during the startup. pacing_rate_ = std::max(pacing_rate_, target_rate); RTC_LOG(LS_INFO) << "is max pacing_rate_:"<< pacing_rate_.kbps() <<" kbps";}

 

转载地址:http://zsprj.baihongyu.com/

你可能感兴趣的文章
科研日记7.31
查看>>
zemax仿真二向色镜
查看>>
stm32单片机编程时extern的用法
查看>>
UART4和5的问题
查看>>
Spring框架中在并发访问时的线程安全性
查看>>
网站部署
查看>>
什么情况下会发生栈内存溢出。
查看>>
何为去中心化
查看>>
缓存一致性:写策略
查看>>
Cache一致性:MESI
查看>>
缓存一致性:写未命中
查看>>
为什么用中间位作为组索引
查看>>
缓存:局部性
查看>>
mysql原理:b+树索引
查看>>
mysql原理:最左原则
查看>>
mysql原理:join标到底是什么,为什么有军规不建议超过三个
查看>>
redis缓存穿透
查看>>
redis缓存雪崩
查看>>
mysql的事务隔离
查看>>
mvc架构
查看>>