本文共 19904 字,大约阅读时间需要 66 分钟。
从哪里开始讲起呢?还是从收到 transport feedback 报文开始讲起吧。处理该报文的入口函数如下:
// Handles one transport-wide feedback report and returns the rate update
// produced by CreateRateUpdate().
//
// Steps, in order:
//   1. Take an RTT sample from the last packet in the report (if its send
//      record is still available).
//   2. Split the report into lost / acked packets and update the loss filter.
//   3. If anything was acked, feed it into the BBR model: round counter,
//      bandwidth and min-RTT, recovery state, ack aggregation.
//   4. Run the per-mode logic (PROBE_BW, STARTUP/DRAIN, PROBE_RTT).
//   5. Recompute pacing rate, congestion window and recovery window.
//   6. Drop sampler state for packets that are now obsolete.
NetworkControlUpdate BbrNetworkController::OnTransportPacketsFeedback(
    TransportPacketsFeedback msg) {
  const auto& packets_with_feedback = msg.PacketsWithFeedback();
  if (packets_with_feedback.empty()) {
    // back() on an empty container is undefined behaviour. A report without
    // any packet feedback carries nothing to learn from, so it is ignored.
    // NOTE(review): assumes a default-constructed NetworkControlUpdate means
    // "no change" -- confirm against the caller.
    return NetworkControlUpdate();
  }

  const Timestamp feedback_recv_time = msg.feedback_time;
  // |sent_packet| is optional; when it is empty the packet is no longer in
  // the send history and no RTT sample can be derived from it.
  const auto& last_sent_packet = packets_with_feedback.back().sent_packet;
  if (!last_sent_packet.has_value()) {
    RTC_LOG(LS_WARNING) << "Last ack packet not in history, no RTT update";
  } else {
    const Timestamp send_time = last_sent_packet->send_time;
    const TimeDelta send_delta = feedback_recv_time - send_time;
    rtt_stats_.UpdateRtt(send_delta, TimeDelta::Zero(), feedback_recv_time);
  }

  // Snapshot taken before the sampler is updated so that the amount of data
  // acked by this report can be computed as a difference later on.
  const DataSize total_data_acked_before = sampler_->total_data_acked();

  bool is_round_start = false;
  bool min_rtt_expired = false;

  std::vector<PacketResult> lost_packets = msg.LostWithSendInfo();
  DiscardLostPackets(lost_packets);

  std::vector<PacketResult> acked_packets = msg.ReceivedWithSendInfo();

  // Every packet in the report counts as "sent"; the lost ones count as lost.
  const int packets_sent =
      static_cast<int>(lost_packets.size() + acked_packets.size());
  const int packets_lost = static_cast<int>(lost_packets.size());
  loss_rate_.UpdateWithLossStatus(msg.feedback_time.ms(), packets_sent,
                                  packets_lost);

  // Input the new data into the BBR model of the connection.
  if (!acked_packets.empty()) {
    const int64_t last_acked_packet =
        acked_packets.rbegin()->sent_packet->sequence_number;

    is_round_start = UpdateRoundTripCounter(last_acked_packet);
    min_rtt_expired =
        UpdateBandwidthAndMinRtt(msg.feedback_time, acked_packets);
    UpdateRecoveryState(last_acked_packet, !lost_packets.empty(),
                        is_round_start);

    const DataSize data_acked =
        sampler_->total_data_acked() - total_data_acked_before;

    UpdateAckAggregationBytes(msg.feedback_time, data_acked);
    if (max_aggregation_bytes_multiplier_ > 0) {
      // While the amount in flight stays within 1.25x the target window the
      // counter is reset; otherwise the newly acked bytes are accumulated.
      if (msg.data_in_flight <=
          1.25 * GetTargetCongestionWindow(pacing_gain_)) {
        bytes_acked_since_queue_drained_ = DataSize::Zero();
      } else {
        bytes_acked_since_queue_drained_ += data_acked;
      }
    }
  }

  // Handle logic specific to PROBE_BW mode.
  if (mode_ == PROBE_BW) {
    UpdateGainCyclePhase(msg.feedback_time, msg.prior_in_flight,
                         !lost_packets.empty());
  }

  // Handle logic specific to STARTUP and DRAIN modes.
  if (is_round_start && !is_at_full_bandwidth_) {
    CheckIfFullBandwidthReached();
  }
  MaybeExitStartupOrDrain(msg);

  // Handle logic specific to PROBE_RTT.
  MaybeEnterOrExitProbeRtt(msg, is_round_start, min_rtt_expired);

  // Calculate the amount of data acked and lost by this report.
  const DataSize data_acked =
      sampler_->total_data_acked() - total_data_acked_before;
  DataSize data_lost = DataSize::Zero();
  for (const PacketResult& packet : lost_packets) {
    data_lost += packet.sent_packet->size;
  }

  // After the model is updated, recalculate the pacing rate and congestion
  // window.
  CalculatePacingRate();
  CalculateCongestionWindow(data_acked);
  CalculateRecoveryWindow(data_acked, data_lost, msg.data_in_flight);

  // Cleanup internal state.
  if (!acked_packets.empty()) {
    sampler_->RemoveObsoletePackets(
        acked_packets.back().sent_packet->sequence_number);
  }

  return CreateRateUpdate(msg.feedback_time);
}
min rtt更新
// Updates the RTT based on a new sample.void RttStats::UpdateRtt(TimeDelta send_delta, TimeDelta ack_delay, Timestamp now) { if (send_delta.IsInfinite() || send_delta <= TimeDelta::Zero()) { RTC_LOG(LS_WARNING) << "Ignoring measured send_delta, because it's is " << "either infinite, zero, or negative. send_delta = " << ToString(send_delta); return; } // Update min_rtt_ first. min_rtt_ does not use an rtt_sample corrected for // ack_delay but the raw observed send_delta, since poor clock granularity at // the client may cause a high ack_delay to result in underestimation of the // min_rtt_. if (min_rtt_.IsZero() || min_rtt_ > send_delta) { min_rtt_ = send_delta; } // Correct for ack_delay if information received from the peer results in a // positive RTT sample. Otherwise, we use the send_delta as a reasonable // measure for smoothed_rtt. TimeDelta rtt_sample = send_delta; previous_srtt_ = smoothed_rtt_; if (rtt_sample > ack_delay) { rtt_sample = rtt_sample - ack_delay; } latest_rtt_ = rtt_sample; // First time call. if (smoothed_rtt_.IsZero()) { smoothed_rtt_ = rtt_sample; mean_deviation_ = rtt_sample / 2; } else { mean_deviation_ = kOneMinusBeta * mean_deviation_ + kBeta * (smoothed_rtt_ - rtt_sample).Abs(); smoothed_rtt_ = kOneMinusAlpha * smoothed_rtt_ + kAlpha * rtt_sample; RTC_LOG(LS_VERBOSE) << " smoothed_rtt(us):" << smoothed_rtt_.us() << " mean_deviation(us):" << mean_deviation_.us(); }}
统计丢包率
void LossRateFilter::UpdateWithLossStatus(int64_t feedback_time, int packets_sent, int packets_lost) { lost_packets_since_last_loss_update_ += packets_lost; expected_packets_since_last_loss_update_ += packets_sent; //时间过1000ms并且发送的包20个以上统计一次丢包率 if (feedback_time >= next_loss_update_ms_ && expected_packets_since_last_loss_update_ >= kLimitNumPackets) { int64_t lost = lost_packets_since_last_loss_update_; int64_t expected = expected_packets_since_last_loss_update_; loss_rate_estimate_ = static_cast(lost) / expected; next_loss_update_ms_ = feedback_time + kUpdateIntervalMs; lost_packets_since_last_loss_update_ = 0; expected_packets_since_last_loss_update_ = 0; }}
更新周期
//上次发出的包被确认,就认为新周期开始bool BbrNetworkController::UpdateRoundTripCounter(int64_t last_acked_packet) { if (last_acked_packet > current_round_trip_end_) { round_trip_count_++; current_round_trip_end_ = last_sent_packet_; return true; } return false;}
计算采样rtt和带宽
// Produces one RTT / bandwidth sample for a newly acknowledged packet.
//
// The bandwidth of the sample is the smaller of the send rate and the ack
// rate measured between this packet and the packet that was the most recently
// acked one at the time this packet was sent.
//
// Always updates the sampler's running totals and "last acked packet" fields.
// Returns a default-constructed BandwidthSample when no rate can be computed
// (no earlier ack to measure against, or non-increasing ack times).
// |packet_number| is only used to detect the end of the app-limited phase.
BandwidthSample BandwidthSampler::OnPacketAcknowledgedInner(
    Timestamp ack_time,
    int64_t packet_number,
    const ConnectionStateOnSentPacket& sent_packet) {
  // Bookkeeping that happens regardless of whether a sample is produced.
  total_data_acked_ += sent_packet.size;
  total_data_sent_at_last_acked_packet_ = sent_packet.total_data_sent;
  last_acked_packet_sent_time_ = sent_packet.sent_time;
  last_acked_packet_ack_time_ = ack_time;

  // Leave the app-limited phase once a packet sent after that phase ended is
  // acknowledged.
  if (is_app_limited_ && packet_number > end_of_app_limited_phase_) {
    is_app_limited_ = false;
  }

  // If nothing had been acknowledged yet when this packet was sent, there is
  // no reference point and therefore no bandwidth sample.
  const bool has_reference_point =
      sent_packet.last_acked_packet_sent_time &&
      sent_packet.last_acked_packet_ack_time;
  if (!has_reference_point) {
    return BandwidthSample();
  }

  const Timestamp reference_sent_time =
      *sent_packet.last_acked_packet_sent_time;
  const Timestamp reference_ack_time = *sent_packet.last_acked_packet_ack_time;

  // An infinite send rate means "ignore the send rate, use only the ack
  // rate"; it stays infinite unless the send times are strictly increasing.
  DataRate send_rate = DataRate::Infinity();
  if (sent_packet.sent_time > reference_sent_time) {
    const DataSize bytes_sent_between =
        sent_packet.total_data_sent -
        sent_packet.total_data_sent_at_last_acked_packet;
    const TimeDelta send_interval = sent_packet.sent_time - reference_sent_time;
    send_rate = bytes_sent_between / send_interval;
  }

  // The ack interval below must be strictly positive, otherwise the rate
  // computation would divide by zero or underflow.
  if (ack_time <= reference_ack_time) {
    RTC_LOG(LS_WARNING)
        << "Time of the previously acked packet is larger than the time "
           "of the current packet.";
    return BandwidthSample();
  }

  const DataSize bytes_acked_between =
      total_data_acked_ - sent_packet.total_data_acked_at_the_last_acked_packet;
  const TimeDelta ack_interval = ack_time - reference_ack_time;
  const DataRate ack_rate = bytes_acked_between / ack_interval;

  BandwidthSample result;
  result.bandwidth = std::min(send_rate, ack_rate);
  // Delayed acknowledgement time is not subtracted here, so this RTT can be
  // artificially high, especially on low-bandwidth connections.
  result.rtt = ack_time - sent_packet.sent_time;
  // The sample is app-limited if the packet was sent during an app-limited
  // phase.
  result.is_app_limited = sent_packet.is_app_limited;
  return result;
}
更新最大带宽。三个估计值 estimates_[0]、estimates_[1]、estimates_[2] 按带宽从大到小依次排列。这里的 new_time 不是时间,而是 round(轮次)的概念;窗口长度默认是 10 个 round,可以理解为收到 10 次 transportCC 报文。
// Updates best estimates with |sample|, and expires and updates best // estimates as necessary. void Update(T new_sample, TimeT new_time) { // Reset all estimates if they have not yet been initialized, if new sample // is a new best, or if the newest recorded estimate is too old. if (estimates_[0].sample == zero_value_ || Compare()(new_sample, estimates_[0].sample) || new_time - estimates_[2].time > window_length_) { Reset(new_sample, new_time); return; } if (Compare()(new_sample, estimates_[1].sample)) { estimates_[1] = Sample(new_sample, new_time); estimates_[2] = estimates_[1]; } else if (Compare()(new_sample, estimates_[2].sample)) { estimates_[2] = Sample(new_sample, new_time); } // Expire and update estimates as necessary. if (new_time - estimates_[0].time > window_length_) { // The best estimate hasn't been updated for an entire window, so promote // second and third best estimates. estimates_[0] = estimates_[1]; estimates_[1] = estimates_[2]; estimates_[2] = Sample(new_sample, new_time); // Need to iterate one more time. Check if the new best estimate is // outside the window as well, since it may also have been recorded a // long time ago. Don't need to iterate once more since we cover that // case at the beginning of the method. if (new_time - estimates_[0].time > window_length_) { estimates_[0] = estimates_[1]; estimates_[1] = estimates_[2]; } return; } if (estimates_[1].sample == estimates_[0].sample && new_time - estimates_[1].time > window_length_ >> 2) { // A quarter of the window has passed without a better sample, so the // second-best estimate is taken from the second quarter of the window. estimates_[2] = estimates_[1] = Sample(new_sample, new_time); return; } if (estimates_[2].sample == estimates_[1].sample && new_time - estimates_[2].time > window_length_ >> 1) { // We've passed a half of the window without a better estimate, so take // a third-best estimate from the second half of the window. estimates_[2] = Sample(new_sample, new_time); } }
//这个主要来控制发送到网络上未被确认的数据量void BbrNetworkController::UpdateRecoveryState(int64_t last_acked_packet, bool has_losses, bool is_round_start) { // Exit recovery when there are no losses for a round. if (has_losses) { end_recovery_at_ = last_sent_packet_; } switch (recovery_state_) { case NOT_IN_RECOVERY: // Enter conservation on the first loss. if (has_losses) { recovery_state_ = CONSERVATION; if (mode_ == STARTUP) { recovery_state_ = config_.initial_conservation_in_startup; } // This will cause the |recovery_window_| to be set to the correct // value in CalculateRecoveryWindow(). recovery_window_ = DataSize::Zero(); // Since the conservation phase is meant to be lasting for a whole // round, extend the current round as if it were started right now. current_round_trip_end_ = last_sent_packet_; } break; case CONSERVATION: case MEDIUM_GROWTH: if (is_round_start) { recovery_state_ = GROWTH; } RTC_FALLTHROUGH(); case GROWTH: // Exit recovery if appropriate. if (!has_losses && (!end_recovery_at_ || last_acked_packet > *end_recovery_at_)) { recovery_state_ = NOT_IN_RECOVERY; } break; }}
计算一下实际被确认(ack)的数据量,比按最大带宽估算的预期值多出了多少
// Tracks how many more bytes were acknowledged in the current aggregation
// epoch than the maximum bandwidth estimate would predict, and records that
// excess in max_ack_height_.
//
// |ack_time| is the time of the feedback; |newly_acked_bytes| is the amount
// of data acknowledged by it.
void BbrNetworkController::UpdateAckAggregationBytes(
    Timestamp ack_time,
    DataSize newly_acked_bytes) {
  // Without an epoch start there is nothing to measure against.
  if (!aggregation_epoch_start_time_) {
    RTC_LOG(LS_ERROR)
        << "Received feedback before information about sent packets.";
    RTC_DCHECK(aggregation_epoch_start_time_.has_value());
    return;
  }

  // Bytes that should have been delivered since the epoch started, assuming
  // the max bandwidth estimate is correct.
  const TimeDelta epoch_duration = ack_time - *aggregation_epoch_start_time_;
  const DataSize expected_bytes_acked =
      max_bandwidth_.GetBest() * epoch_duration;

  // Acks are arriving no faster than the max bandwidth: start a new epoch
  // that begins with this ack.
  const bool within_expected_rate =
      aggregation_epoch_bytes_ <= expected_bytes_acked;
  if (within_expected_rate) {
    aggregation_epoch_bytes_ = newly_acked_bytes;
    aggregation_epoch_start_time_ = ack_time;
    return;
  }

  // Otherwise count the newest bytes as well (to account for stretch acks)
  // and record how far above expectation the epoch is.
  aggregation_epoch_bytes_ += newly_acked_bytes;
  const DataSize excess_bytes = aggregation_epoch_bytes_ - expected_bytes_acked;
  max_ack_height_.Update(excess_bytes, round_trip_count_);
}
计算拥塞窗口
//其实核心就是最小rtt和当前估计带宽的乘积DataSize BbrNetworkController::GetTargetCongestionWindow(double gain) const { DataSize bdp = GetMinRtt() * BandwidthEstimate(); DataSize congestion_window = gain * bdp; // BDP estimate will be zero if no bandwidth samples are available yet. if (congestion_window.IsZero()) { congestion_window = gain * initial_congestion_window_; } return std::max(congestion_window, min_congestion_window_);}
probe_bw阶段更新
//看注释void BbrNetworkController::UpdateGainCyclePhase(Timestamp now, DataSize prior_in_flight, bool has_losses) { // In most cases, the cycle is advanced after an RTT passes. bool should_advance_gain_cycling = now - last_cycle_start_ > GetMinRtt(); // If the pacing gain is above 1.0, the connection is trying to probe the // bandwidth by increasing the number of bytes in flight to at least // pacing_gain * BDP. Make sure that it actually reaches the target, as long // as there are no losses suggesting that the buffers are not able to hold // that much. if (pacing_gain_ > 1.0 && !has_losses && prior_in_flight < GetTargetCongestionWindow(pacing_gain_)) { should_advance_gain_cycling = false; } // If pacing gain is below 1.0, the connection is trying to drain the extra // queue which could have been incurred by probing prior to it. If the number // of bytes in flight falls down to the estimated BDP value earlier, conclude // that the queue has been successfully drained and exit this cycle early. if (pacing_gain_ < 1.0 && prior_in_flight <= GetTargetCongestionWindow(1)) { should_advance_gain_cycling = true; } if (should_advance_gain_cycling) { cycle_current_offset_ = (cycle_current_offset_ + 1) % kGainCycleLength; last_cycle_start_ = now; // Stay in low gain mode until the target BDP is hit. // Low gain mode will be exited immediately when the target BDP is achieved. if (config_.fully_drain_queue && pacing_gain_ < 1 && GetPacingGain(cycle_current_offset_) == 1 && prior_in_flight > GetTargetCongestionWindow(1)) { return; } pacing_gain_ = GetPacingGain(cycle_current_offset_); }}
//三次带宽不增长就认为到达最大带宽
void BbrNetworkController::CheckIfFullBandwidthReached() { if (last_sample_is_app_limited_) { return; } DataRate target = bandwidth_at_last_round_ * kStartupGrowthTarget; if (BandwidthEstimate() >= target) { bandwidth_at_last_round_ = BandwidthEstimate(); rounds_without_bandwidth_gain_ = 0; return; } rounds_without_bandwidth_gain_++; if ((rounds_without_bandwidth_gain_ >= config_.num_startup_rtts) || (config_.exit_startup_on_loss && InRecovery())) { is_at_full_bandwidth_ = true; }}
//
void BbrNetworkController::MaybeExitStartupOrDrain( const TransportPacketsFeedback& msg) { TimeDelta exit_threshold = config_.exit_startup_rtt_threshold; TimeDelta rtt_delta = last_rtt_ - min_rtt_; //如果在startup阶段到达最大带宽,就转为drain阶段 if (mode_ == STARTUP && (is_at_full_bandwidth_ || rtt_delta > exit_threshold)) { if (rtt_delta > exit_threshold) RTC_LOG(LS_INFO) << "Exiting startup due to rtt increase from: " << ToString(min_rtt_) << " to:" << ToString(last_rtt_) << " > " << ToString(min_rtt_ + exit_threshold); mode_ = DRAIN; pacing_gain_ = kDrainGain; congestion_window_gain_ = kHighGain; } //当flight数据量小于拥塞控制窗口时进入probeBW阶段 if (mode_ == DRAIN && msg.data_in_flight <= GetTargetCongestionWindow(1)) { EnterProbeBandwidthMode(msg.feedback_time); }}
probe_rtt阶段的进入与退出
void BbrNetworkController::MaybeEnterOrExitProbeRtt( const TransportPacketsFeedback& msg, bool is_round_start, bool min_rtt_expired) { if (min_rtt_expired && !exiting_quiescence_ && mode_ != PROBE_RTT) { mode_ = PROBE_RTT; pacing_gain_ = 1; // Do not decide on the time to exit PROBE_RTT until the |bytes_in_flight| // is at the target small value. exit_probe_rtt_at_.reset(); } if (mode_ == PROBE_RTT) { if (config_.max_renew_bandwith_rtt_ms) { sampler_->OnAppLimited(); } if (!exit_probe_rtt_at_) { // If the window has reached the appropriate size, schedule exiting // PROBE_RTT. The CWND during PROBE_RTT is kMinimumCongestionWindow, but // we allow an extra packet since QUIC checks CWND before sending a // packet. if (msg.data_in_flight < ProbeRttCongestionWindow() + kMaxPacketSize) { exit_probe_rtt_at_ = msg.feedback_time + TimeDelta::ms(kProbeRttTimeMs); probe_rtt_round_passed_ = false; } } else { if (is_round_start) { probe_rtt_round_passed_ = true; } if (msg.feedback_time >= *exit_probe_rtt_at_ && probe_rtt_round_passed_) { min_rtt_timestamp_ = msg.feedback_time; if (!is_at_full_bandwidth_) { EnterStartupMode(); } else { EnterProbeBandwidthMode(msg.feedback_time); } } } } exiting_quiescence_ = false;}
//计算发送速度void BbrNetworkController::CalculatePacingRate() { if (BandwidthEstimate().IsZero()) { return; } DataRate target_rate = pacing_gain_ * BandwidthEstimate(); if (config_.rate_based_recovery && InRecovery()) { pacing_rate_ = pacing_gain_ * max_bandwidth_.GetThirdBest(); } if (is_at_full_bandwidth_) { pacing_rate_ = target_rate; RTC_LOG(LS_INFO) << "is at full bandwidth pacing_rate_:"<< pacing_rate_.kbps() <<" kbps" << "pacing_gain:"<<< ", estimate bw:" << BandwidthEstimate().kbps() << " kbps"; return; } // Pace at the rate of initial_window / RTT as soon as RTT measurements are // available. if (pacing_rate_.IsZero() && !rtt_stats_.min_rtt().IsZero()) { pacing_rate_ = initial_congestion_window_ / rtt_stats_.min_rtt(); if (max_key_frame_send_rate_kbps_) { pacing_rate_ = DataRate::kbps(max_key_frame_send_rate_kbps_); RTC_LOG(LS_INFO) << "use key frame send rate:" << max_key_frame_send_rate_kbps_; } RTC_LOG(LS_INFO) << "initial pacing_rate_:"<< pacing_rate_.kbps() <<" kbps"; return; } // Slow the pacing rate in STARTUP once loss has ever been detected. const bool has_ever_detected_loss = end_recovery_at_.has_value(); if (config_.slower_startup && has_ever_detected_loss) { pacing_rate_ = kStartupAfterLossGain * BandwidthEstimate(); RTC_LOG(LS_INFO) << "is at slower startup pacing_rate_:"<< pacing_rate_.kbps() <<" kbps"; return; } // Do not decrease the pacing rate during the startup. pacing_rate_ = std::max(pacing_rate_, target_rate); RTC_LOG(LS_INFO) << "is max pacing_rate_:"<< pacing_rate_.kbps() <<" kbps";}
转载地址:http://zsprj.baihongyu.com/