diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index c7824c8b..01e05fd1 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7158,6 +7158,18 @@ "type": "guint", "writable": true }, + "estimator": { + "blurb": "How to calculate the delay estimate that will be compared against the dynamic delay threshold.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "kalman (0)", + "mutable": "ready", + "readable": true, + "type": "GstRtpGCCBwEEstimator", + "writable": true + }, "max-bitrate": { "blurb": "Maximum bitrate to use (in bit/sec) when computing it through the bandwidth estimation algorithm", "conditionally-available": false, @@ -8210,6 +8222,21 @@ } } }, + "GstRtpGCCBwEEstimator": { + "kind": "enum", + "values": [ + { + "desc": "Use Kalman filter", + "name": "kalman", + "value": "0" + }, + { + "desc": "Use linear regression slope", + "name": "linear-regression", + "value": "1" + } + ] + }, "GstRtpMpeg4GenericPayAggregateMode": { "kind": "enum", "values": [ diff --git a/net/rtp/src/gcc/imp.rs b/net/rtp/src/gcc/imp.rs index a12ff76c..46241929 100644 --- a/net/rtp/src/gcc/imp.rs +++ b/net/rtp/src/gcc/imp.rs @@ -48,20 +48,9 @@ static CAT: Lazy = Lazy::new(|| { // Table1. Time limit in milliseconds between packet bursts which identifies a group const BURST_TIME: Duration = Duration::milliseconds(5); -// Table1. Coefficient used for the measured noise variance -// [0.1,0.001] -const CHI: f64 = 0.01; -const ONE_MINUS_CHI: f64 = 1. - CHI; - -// Table1. State noise covariance matrix -const Q: f64 = 0.001; - // Table1. Initial value for the adaptive threshold const INITIAL_DEL_VAR_TH: Duration = Duration::microseconds(12500); -// Table1. Initial value of the system error covariance -const INITIAL_ERROR_COVARIANCE: f64 = 0.1; - // Table1. Time required to trigger an overuse signal const OVERUSE_TIME_TH: Duration = Duration::milliseconds(10); @@ -261,10 +250,53 @@ enum NetworkUsage { Under, } +/// Simple abstraction over an estimator that allows different estimator +/// implementations, and allows them to be changed at runtime. +trait EstimatorImpl: Send { + /// Update the estimator. + fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup); + + /// Get the estimate that will be compared against the dynamic delay + /// threshold of GCC. Note that this value will be multiplied by a dynamic + /// factor before being compared against the threshold. + fn estimate(&self) -> Duration; + + /// Get the most recent measurement used as input to the estimator. + /// Typically this will be the most recent inter-group delay variation. + fn measure(&self) -> Duration; +} + +mod kalman_estimator; +use kalman_estimator::KalmanEstimator; + +mod linear_regression_estimator; +use linear_regression_estimator::LinearRegressionEstimator; + +/// An enum will all known estimators. The active estimator can be changed at +/// runtime through the "estimator" property. +#[derive(Debug, Default, Copy, Clone, glib::Enum)] +#[repr(i32)] +#[enum_type(name = "GstRtpGCCBwEEstimator")] +pub enum Estimator { + #[default] + #[enum_value(name = "Use Kalman filter")] + Kalman = 0, + #[enum_value(name = "Use linear regression slope")] + LinearRegression = 1, +} + +impl Estimator { + fn to_impl(self) -> Box { + match self { + Estimator::Kalman => Box::::default(), + Estimator::LinearRegression => Box::::default(), + } + } +} + struct Detector { group: PacketGroup, // Packet group that is being filled prev_group: Option, // Group that is ready to be used once "group" is filled - measure: Duration, // Delay variation measure last_received_packets: BTreeMap, // Order by seqnums, front is the newest, back is the oldest @@ -273,11 +305,8 @@ struct Detector { // Moving average of the packet loss loss_average: f64, - // Kalman filter fields - gain: f64, - measurement_uncertainty: f64, // var_v_hat(i-1) - estimate_error: f64, // e(i-1) - estimate: Duration, // m_hat(i-1) + // Estimator fields + estimator_impl: Box, // Threshold fields threshold: Duration, @@ -308,8 +337,8 @@ impl Debug for Detector { "Network Usage: {:?}. Effective bitrate: {}ps - Measure: {} Estimate: {} threshold {} - overuse_estimate {}", self.usage, human_kbits(self.effective_bitrate()), - self.measure, - self.estimate, + self.estimator_impl.measure(), + self.estimator_impl.estimate(), self.threshold, self.last_overuse_estimate, ) @@ -317,11 +346,10 @@ impl Debug for Detector { } impl Detector { - fn new() -> Self { + fn new(estimator: Estimator) -> Self { Detector { group: Default::default(), prev_group: Default::default(), - measure: Duration::ZERO, /* Smallish value to hold PACKETS_RECEIVED_WINDOW packets */ last_received_packets: BTreeMap::new(), @@ -329,10 +357,7 @@ impl Detector { last_loss_update: None, loss_average: 0., - gain: 0., - measurement_uncertainty: 0., - estimate_error: INITIAL_ERROR_COVARIANCE, - estimate: Duration::ZERO, + estimator_impl: estimator.to_impl(), threshold: INITIAL_DEL_VAR_TH, last_threshold_update: None, @@ -503,7 +528,7 @@ impl Detector { ); if let Some(prev_group) = mem::replace(&mut self.prev_group, Some(group.clone())) { // 5.3 Arrival-time filter - self.kalman_estimate(&prev_group, &group); + self.estimator_impl.update(&prev_group, &group); // 5.4 Over-use detector self.overuse_filter(); } @@ -534,32 +559,6 @@ impl Detector { self.last_loss_update = Some(now); } - fn kalman_estimate(&mut self, prev_group: &PacketGroup, group: &PacketGroup) { - self.measure = group.inter_delay_variation(prev_group); - - let z = self.measure - self.estimate; - let zms = z.whole_microseconds() as f64 / 1000.0; - - // This doesn't exactly follows the spec as we should compute and - // use f_max here, no implementation we have found actually uses it. - let alpha = ONE_MINUS_CHI.powf(30.0 / (1000. * 5. * 1_000_000.)); - let root = self.measurement_uncertainty.sqrt(); - let root3 = 3. * root; - - if zms > root3 { - self.measurement_uncertainty = - (alpha * self.measurement_uncertainty + (1. - alpha) * root3.powf(2.)).max(1.); - } else { - self.measurement_uncertainty = - (alpha * self.measurement_uncertainty + (1. - alpha) * zms.powf(2.)).max(1.); - } - - let estimate_uncertainty = self.estimate_error + Q; - self.gain = estimate_uncertainty / (estimate_uncertainty + self.measurement_uncertainty); - self.estimate += Duration::nanoseconds((self.gain * zms * 1_000_000.) as i64); - self.estimate_error = (1. - self.gain) * estimate_uncertainty; - } - fn compare_threshold(&mut self) -> (NetworkUsage, Duration) { // FIXME: It is unclear where that factor is coming from but all // implementations we found have it (libwebrtc, pion, jitsi...), and the @@ -568,11 +567,12 @@ impl Detector { self.num_deltas += 1; if self.num_deltas < 2 { - return (NetworkUsage::Normal, self.estimate); + return (NetworkUsage::Normal, self.estimator_impl.estimate()); } let amplified_estimate = Duration::nanoseconds( - self.estimate.whole_nanoseconds() as i64 * i64::min(self.num_deltas, MAX_DELTAS), + self.estimator_impl.estimate().whole_nanoseconds() as i64 + * i64::min(self.num_deltas, MAX_DELTAS), ); let usage = if amplified_estimate > self.threshold { NetworkUsage::Over @@ -648,8 +648,8 @@ impl Detector { CAT, "{:?} - measure: {} - estimate: {} - amp_est: {} - th: {} - inc_dur: {} - inc_cnt: {}", th_usage, - self.measure, - self.estimate, + self.estimator_impl.measure(), + self.estimator_impl.estimate(), amplified_estimate, self.threshold, self.increasing_duration, @@ -720,6 +720,7 @@ struct State { min_bitrate: Bitrate, max_bitrate: Bitrate, + estimator: Estimator, detector: Detector, clock_entry: Option, @@ -735,6 +736,7 @@ struct State { impl Default for State { fn default() -> Self { + let estimator = Estimator::default(); Self { target_bitrate_on_delay: DEFAULT_ESTIMATED_BITRATE, target_bitrate_on_loss: DEFAULT_ESTIMATED_BITRATE, @@ -745,7 +747,8 @@ impl Default for State { last_decrease_on_delay: Instant::now(), min_bitrate: DEFAULT_MIN_BITRATE, max_bitrate: DEFAULT_MAX_BITRATE, - detector: Detector::new(), + estimator, + detector: Detector::new(estimator), buffers: Default::default(), estimated_bitrate: DEFAULT_ESTIMATED_BITRATE, last_control_op: BandwidthEstimationOp::Increase("Initial increase".into()), @@ -1300,6 +1303,11 @@ impl ObjectImpl for BandwidthEstimator { .default_value(DEFAULT_MAX_BITRATE) .mutable_ready() .build(), + glib::ParamSpecEnum::builder_with_default("estimator", Estimator::default()) + .nick("Estimator") + .blurb("How to calculate the delay estimate that will be compared against the dynamic delay threshold.") + .mutable_ready() + .build(), ] }); @@ -1323,6 +1331,11 @@ impl ObjectImpl for BandwidthEstimator { state.target_bitrate_on_loss = bitrate; state.estimated_bitrate = bitrate; } + "estimator" => { + let mut state = self.state.lock().unwrap(); + state.estimator = value.get().unwrap(); + state.detector.estimator_impl = state.estimator.to_impl() + } _ => unimplemented!(), } } @@ -1341,6 +1354,10 @@ impl ObjectImpl for BandwidthEstimator { let state = self.state.lock().unwrap(); state.estimated_bitrate.to_value() } + "estimator" => { + let state = self.state.lock().unwrap(); + state.estimator.to_value() + } _ => unimplemented!(), } } diff --git a/net/rtp/src/gcc/imp/kalman_estimator.rs b/net/rtp/src/gcc/imp/kalman_estimator.rs new file mode 100644 index 00000000..127cda51 --- /dev/null +++ b/net/rtp/src/gcc/imp/kalman_estimator.rs @@ -0,0 +1,74 @@ +//! This is the estimator that follows the algorithm described in +//! https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02. + +use super::Duration; +use super::EstimatorImpl; +use super::PacketGroup; + +// Table1. Coefficient used for the measured noise variance +// [0.1,0.001] +const CHI: f64 = 0.01; +const ONE_MINUS_CHI: f64 = 1. - CHI; + +// Table1. State noise covariance matrix +const Q: f64 = 0.001; + +// Table1. Initial value of the system error covariance +const INITIAL_ERROR_COVARIANCE: f64 = 0.1; + +#[derive(Debug, PartialEq, Clone)] +pub struct KalmanEstimator { + measure: Duration, // Delay variation measure + gain: f64, + measurement_uncertainty: f64, // var_v_hat(i-1) + estimate_error: f64, // e(i-1) + estimate: Duration, // m_hat(i-1) +} + +impl Default for KalmanEstimator { + fn default() -> Self { + Self { + measure: Duration::ZERO, + gain: 0., + measurement_uncertainty: 0., + estimate_error: INITIAL_ERROR_COVARIANCE, + estimate: Duration::ZERO, + } + } +} + +impl EstimatorImpl for KalmanEstimator { + fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup) { + self.measure = group.inter_delay_variation(prev_group); + + let z = self.measure - self.estimate; + let zms = z.whole_microseconds() as f64 / 1000.0; + + // This doesn't exactly follows the spec as we should compute and + // use f_max here, no implementation we have found actually uses it. + let alpha = ONE_MINUS_CHI.powf(30.0 / (1000. * 5. * 1_000_000.)); + let root = self.measurement_uncertainty.sqrt(); + let root3 = 3. * root; + + if zms > root3 { + self.measurement_uncertainty = + (alpha * self.measurement_uncertainty + (1. - alpha) * root3.powf(2.)).max(1.); + } else { + self.measurement_uncertainty = + (alpha * self.measurement_uncertainty + (1. - alpha) * zms.powf(2.)).max(1.); + } + + let estimate_uncertainty = self.estimate_error + Q; + self.gain = estimate_uncertainty / (estimate_uncertainty + self.measurement_uncertainty); + self.estimate += Duration::nanoseconds((self.gain * zms * 1_000_000.) as i64); + self.estimate_error = (1. - self.gain) * estimate_uncertainty; + } + + fn estimate(&self) -> Duration { + self.estimate + } + + fn measure(&self) -> Duration { + self.measure + } +} diff --git a/net/rtp/src/gcc/imp/linear_regression_estimator.rs b/net/rtp/src/gcc/imp/linear_regression_estimator.rs new file mode 100644 index 00000000..2404d8e4 --- /dev/null +++ b/net/rtp/src/gcc/imp/linear_regression_estimator.rs @@ -0,0 +1,193 @@ +//! This algorithm is not described in +//! https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02. Instead, this +//! algorithm can be found in the GCC implementation in libwrtc, which is used +//! by e.g. Chromium. +//! +//! To explore the algorithm in libwebrtc, use the +//! `delay_detector_for_packet->Update(...)` call in +//! +//! as a starting point. That call invokes `TrendlineEstimator::Update()` in +//! +//! which uses the same core algorithm as we do here. + +use super::Duration; +use super::EstimatorImpl; +use super::PacketGroup; + +mod linear_regression; +use linear_regression::Samples; + +const DEFAULT_SAMPLES_MAX_LEN: usize = 20; + +// Must be between 0.0 and 1.0. +const SMOOTHING_FACTOR: f64 = 0.9; + +// Since our estimate is a slope we need to amplify our estimate somewhat to +// match the dynamic threshold our estimate is compared against. +const GAIN: f64 = 4.; + +#[derive(Debug, PartialEq, Clone)] +pub struct LinearRegressionEstimator { + /// Our (pre-GAIN'ed) output value.. + estimate: Duration, + + /// The last inter-group delay variation measurement. + measure: Duration, + + /// The samples used for calculating the slope of `accumulated_delay` with + /// simple linear regression. + samples: linear_regression::Samples, + + /// The sum over time of inter-group delay variation measurements. The + /// measurements will jump up a down a bit, but on average they will sum to + /// 0, since otherwise the receiver and sender will drift away from each + /// other. After network problems, this can drift away from 0. But once + /// things stabilize, it is expected to maintain a constant value again over + /// time. If it remains constant, the network conditions are good. If it + /// begins to increase, it indicates over-use of the network. If it begins + /// to decrease, it indicates that network problems are clearing up. Since + /// we are interested in changes to this value over time, we are interested + /// in the slope of this value over time. Its absolute value (height over + /// the y axis) over time does not matter to us. + accumulated_delay: Duration, + + /// To make `accumulated_delay` less sensitive to measurement noise and + /// natural network variation, we apply a low-pass filter to + /// `accumulated_delay`, and this is the filtered value. It is actually the + /// slope of this field that we use, and not the slope of + /// `accumulated_delay`. + smoothed_delay: Duration, +} + +impl Default for LinearRegressionEstimator { + fn default() -> Self { + Self::with_samples_max_len(DEFAULT_SAMPLES_MAX_LEN) + } +} + +impl LinearRegressionEstimator { + fn with_samples_max_len(samples_max_len: usize) -> Self { + Self { + accumulated_delay: Duration::ZERO, + smoothed_delay: Duration::ZERO, + samples: Samples::with_max_len(samples_max_len), + measure: Duration::ZERO, + estimate: Duration::ZERO, + } + } +} + +impl EstimatorImpl for LinearRegressionEstimator { + fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup) { + self.measure = group.inter_delay_variation(prev_group); + self.accumulated_delay += self.measure; + self.smoothed_delay = SMOOTHING_FACTOR * self.smoothed_delay + + (1f64 - SMOOTHING_FACTOR) * self.accumulated_delay; + + gst::log!( + super::CAT, + "accumulated_delay: {} - smoothed_delay: {} - samples len: {}", + self.accumulated_delay, + self.smoothed_delay, + self.samples.len(), + ); + + // This never panics because the `inter_delay_variation()` call above + // already did this. + let arrival_time = group.arrival.unwrap(); + + // As long as both x and y use the same unit, the exact unit does not + // matter. Go with seconds since f64 versions of seconds exists. + self.samples.push(linear_regression::Sample { + x: arrival_time.as_seconds_f64(), + y: self.smoothed_delay.as_seconds_f64(), + }); + + // To avoid big movements in slope in the beginning, wait until we have + // enough samples. It won't take long. + if self.samples.full() { + if let Some(slope) = self.samples.slope() { + // The slope is dimensionless, but pretend it is milliseconds. That + // makes the algorithm work. + self.estimate = Duration::nanoseconds((slope * 1_000_000f64) as i64) * GAIN; + } + } + } + + fn estimate(&self) -> Duration { + self.estimate + } + + fn measure(&self) -> Duration { + self.measure + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_zero_accumulation() { + let max_len = DEFAULT_SAMPLES_MAX_LEN; + let mut estimator = LinearRegressionEstimator::with_samples_max_len(max_len); + + let (prev_group, group) = with_inter_group_delay(Duration::ZERO); + + for times_updated in 1..max_len * 5 { + estimator.update(&prev_group, &group); + + // Since inter_group_delay is 0 we expect the accumulated and + // smoothed diff to remain zero + assert_eq!(estimator.accumulated_delay, Duration::ZERO); + assert_eq!(estimator.smoothed_delay, Duration::ZERO); + + // The linear regression sample size shall increase until we reach + // the max len. + assert_eq!(estimator.samples.len(), times_updated.min(max_len)) + } + } + + #[test] + fn test_small_accumulation() { + let mut estimator = LinearRegressionEstimator::default(); + + let inter_group_delay = Duration::milliseconds(10); + let (prev_group, group) = with_inter_group_delay(inter_group_delay); + + for times_updated in 1..10i32 { + estimator.update(&prev_group, &group); + + // We expect the accumulated delay to increase proportionally with + // the number of times we called update(). + assert_eq!( + estimator.accumulated_delay, + inter_group_delay * times_updated, + ); + // Don't bother checking smoothed_delay. It's a bit awkward to + // predict it. + } + } + + // Helper to create fake PacketGroups with the desired `inter_group_delay` + // for tests . The underlying absolute values are arbitrary since they don't + // matter for our tests. + fn with_inter_group_delay(inter_group_delay: Duration) -> (PacketGroup, PacketGroup) { + let inter_departure_delay = Duration::milliseconds(100); // Exact value does not matter + let inter_arrival_delay = inter_departure_delay + inter_group_delay; + + let prev_group = PacketGroup { + packets: vec![], // Unused + departure: Duration::milliseconds(1000), // Exact value does not matter + arrival: Some(Duration::milliseconds(1050)), // Exact value does not matter + }; + + let group = PacketGroup { + packets: vec![], // Unused + departure: prev_group.departure + inter_departure_delay, + arrival: Some(prev_group.arrival.unwrap() + inter_arrival_delay), + }; + + (prev_group, group) + } +} diff --git a/net/rtp/src/gcc/imp/linear_regression_estimator/linear_regression.rs b/net/rtp/src/gcc/imp/linear_regression_estimator/linear_regression.rs new file mode 100644 index 00000000..548ae0d9 --- /dev/null +++ b/net/rtp/src/gcc/imp/linear_regression_estimator/linear_regression.rs @@ -0,0 +1,126 @@ +use std::collections::VecDeque; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub struct Sample { + pub x: f64, + pub y: f64, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct Samples { + samples: VecDeque, + max_len: usize, +} + +impl Samples { + pub fn with_max_len(max_len: usize) -> Self { + Self { + samples: VecDeque::with_capacity(max_len), + max_len, + } + } + + pub fn len(&self) -> usize { + self.samples.len() + } + + pub fn full(&self) -> bool { + self.samples.len() == self.max_len + } + + // https://en.wikipedia.org/wiki/Simple_linear_regression + pub fn slope(&self) -> Option { + // Calculate x and y mean. + let len = self.samples.len() as f64; + if len < 2. { + return None; + } + let mut x_mean = 0.; + let mut y_mean = 0.; + for entry in &self.samples { + x_mean += entry.x; + y_mean += entry.y; + } + x_mean /= len; + y_mean /= len; + + // Calculate slope. + let mut num = 0.; + let mut denum = 0.; + for entry in &self.samples { + let delta_x = entry.x - x_mean; + let delta_y = entry.y - y_mean; + num += delta_x * delta_y; + denum += delta_x * delta_x; + } + if denum != 0. { + Some(num / denum) + } else { + None + } + } + + pub fn push(&mut self, sample: Sample) { + if self.samples.len() == self.max_len { + self.samples.pop_back(); + } + self.samples.push_front(sample); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_slope() { + let test_cases: Vec<(Vec, Option)> = vec![ + (vec![], None), + (vec![Sample { x: 0., y: 0. }], None), + (vec![Sample { x: 0., y: 0. }, Sample { x: 0., y: 0. }], None), + ( + vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: 0. }], + Some(0.), + ), + ( + vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: 1. }], + Some(1.), + ), + ( + vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: -1. }], + Some(-1.), + ), + ( + vec![ + Sample { x: 0., y: 0. }, + Sample { x: 1., y: 2. }, + Sample { x: 2., y: 4. }, + ], + Some(2.), + ), + ]; + + for test_case in &test_cases { + let input = &test_case.0; + let expected_slope = test_case.1; + + let mut samples = Samples::with_max_len(100); + for sample in input { + samples.push(*sample); + } + + let actual_slope = samples.slope(); + + let msg_if_fail = + format!("input={input:?} actual={actual_slope:?} expected={expected_slope:?}"); + + if let Some(slope) = actual_slope { + const EPSILON: f64 = 0.000001; + let expected = expected_slope.unwrap_or_else(|| panic!("{msg_if_fail}")); + assert!((slope - expected).abs() < EPSILON, "{}", msg_if_fail); + } else { + assert_eq!(expected_slope, None) + } + } + } +} diff --git a/net/rtp/src/gcc/mod.rs b/net/rtp/src/gcc/mod.rs index 16d0fbb3..727e6005 100644 --- a/net/rtp/src/gcc/mod.rs +++ b/net/rtp/src/gcc/mod.rs @@ -9,6 +9,11 @@ glib::wrapper! { } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + imp::Estimator::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + gst::Element::register( Some(plugin), "rtpgccbwe",