rtpgccbwe: Support linear regression based delay estimation

In our tests, the slope (found with linear regression) on a
history of the (smoothed) accumulated inter-group delays
gives a more stable congestion control. In particular,
low-end devices become less sensitive to spikes in
inter-group delay measurements.

This flavour of delay based bandwidth estimation with Google
Congestion Control is also what Chromium is using.

To make it easy to experiment with the new estimator, as
well as add support for new ones in the future, also add
infrastructure for making delay estimator flavour selectable
at runtime.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1566>
This commit is contained in:
Martin Nordholts 2024-05-02 14:13:10 +02:00 committed by Thibault Saunier
parent 71e9c2bb04
commit 9a7f37e2b7
6 changed files with 499 additions and 57 deletions

View file

@ -7158,6 +7158,18 @@
"type": "guint",
"writable": true
},
"estimator": {
"blurb": "How to calculate the delay estimate that will be compared against the dynamic delay threshold.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "kalman (0)",
"mutable": "ready",
"readable": true,
"type": "GstRtpGCCBwEEstimator",
"writable": true
},
"max-bitrate": {
"blurb": "Maximum bitrate to use (in bit/sec) when computing it through the bandwidth estimation algorithm",
"conditionally-available": false,
@ -8210,6 +8222,21 @@
}
}
},
"GstRtpGCCBwEEstimator": {
"kind": "enum",
"values": [
{
"desc": "Use Kalman filter",
"name": "kalman",
"value": "0"
},
{
"desc": "Use linear regression slope",
"name": "linear-regression",
"value": "1"
}
]
},
"GstRtpMpeg4GenericPayAggregateMode": {
"kind": "enum",
"values": [

View file

@ -48,20 +48,9 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
// Table1. Time limit in milliseconds between packet bursts which identifies a group
const BURST_TIME: Duration = Duration::milliseconds(5);
// Table1. Coefficient used for the measured noise variance
// [0.1,0.001]
const CHI: f64 = 0.01;
const ONE_MINUS_CHI: f64 = 1. - CHI;
// Table1. State noise covariance matrix
const Q: f64 = 0.001;
// Table1. Initial value for the adaptive threshold
const INITIAL_DEL_VAR_TH: Duration = Duration::microseconds(12500);
// Table1. Initial value of the system error covariance
const INITIAL_ERROR_COVARIANCE: f64 = 0.1;
// Table1. Time required to trigger an overuse signal
const OVERUSE_TIME_TH: Duration = Duration::milliseconds(10);
@ -261,10 +250,53 @@ enum NetworkUsage {
Under,
}
/// Simple abstraction over an estimator that allows different estimator
/// implementations, and allows them to be changed at runtime.
trait EstimatorImpl: Send {
    /// Update the estimator.
    ///
    /// Called once per completed packet group, together with the previous
    /// group, so the implementation can derive the inter-group delay
    /// variation.
    fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup);
    /// Get the estimate that will be compared against the dynamic delay
    /// threshold of GCC. Note that this value will be multiplied by a dynamic
    /// factor before being compared against the threshold.
    fn estimate(&self) -> Duration;
    /// Get the most recent measurement used as input to the estimator.
    /// Typically this will be the most recent inter-group delay variation.
    fn measure(&self) -> Duration;
}
mod kalman_estimator;
use kalman_estimator::KalmanEstimator;
mod linear_regression_estimator;
use linear_regression_estimator::LinearRegressionEstimator;
/// An enum with all known estimators. The active estimator can be changed at
/// runtime through the "estimator" property.
#[derive(Debug, Default, Copy, Clone, glib::Enum)]
#[repr(i32)]
#[enum_type(name = "GstRtpGCCBwEEstimator")]
pub enum Estimator {
    /// Kalman-filter estimator from the GCC draft (the default).
    #[default]
    #[enum_value(name = "Use Kalman filter")]
    Kalman = 0,
    /// Linear-regression slope estimator, the flavour used by libwebrtc.
    #[enum_value(name = "Use linear regression slope")]
    LinearRegression = 1,
}
impl Estimator {
fn to_impl(self) -> Box<dyn EstimatorImpl> {
match self {
Estimator::Kalman => Box::<KalmanEstimator>::default(),
Estimator::LinearRegression => Box::<LinearRegressionEstimator>::default(),
}
}
}
struct Detector {
group: PacketGroup, // Packet group that is being filled
prev_group: Option<PacketGroup>, // Group that is ready to be used once "group" is filled
measure: Duration, // Delay variation measure
last_received_packets: BTreeMap<u64, Packet>, // Order by seqnums, front is the newest, back is the oldest
@ -273,11 +305,8 @@ struct Detector {
// Moving average of the packet loss
loss_average: f64,
// Kalman filter fields
gain: f64,
measurement_uncertainty: f64, // var_v_hat(i-1)
estimate_error: f64, // e(i-1)
estimate: Duration, // m_hat(i-1)
// Estimator fields
estimator_impl: Box<dyn EstimatorImpl>,
// Threshold fields
threshold: Duration,
@ -308,8 +337,8 @@ impl Debug for Detector {
"Network Usage: {:?}. Effective bitrate: {}ps - Measure: {} Estimate: {} threshold {} - overuse_estimate {}",
self.usage,
human_kbits(self.effective_bitrate()),
self.measure,
self.estimate,
self.estimator_impl.measure(),
self.estimator_impl.estimate(),
self.threshold,
self.last_overuse_estimate,
)
@ -317,11 +346,10 @@ impl Debug for Detector {
}
impl Detector {
fn new() -> Self {
fn new(estimator: Estimator) -> Self {
Detector {
group: Default::default(),
prev_group: Default::default(),
measure: Duration::ZERO,
/* Smallish value to hold PACKETS_RECEIVED_WINDOW packets */
last_received_packets: BTreeMap::new(),
@ -329,10 +357,7 @@ impl Detector {
last_loss_update: None,
loss_average: 0.,
gain: 0.,
measurement_uncertainty: 0.,
estimate_error: INITIAL_ERROR_COVARIANCE,
estimate: Duration::ZERO,
estimator_impl: estimator.to_impl(),
threshold: INITIAL_DEL_VAR_TH,
last_threshold_update: None,
@ -503,7 +528,7 @@ impl Detector {
);
if let Some(prev_group) = mem::replace(&mut self.prev_group, Some(group.clone())) {
// 5.3 Arrival-time filter
self.kalman_estimate(&prev_group, &group);
self.estimator_impl.update(&prev_group, &group);
// 5.4 Over-use detector
self.overuse_filter();
}
@ -534,32 +559,6 @@ impl Detector {
self.last_loss_update = Some(now);
}
fn kalman_estimate(&mut self, prev_group: &PacketGroup, group: &PacketGroup) {
self.measure = group.inter_delay_variation(prev_group);
let z = self.measure - self.estimate;
let zms = z.whole_microseconds() as f64 / 1000.0;
// This doesn't exactly follows the spec as we should compute and
// use f_max here, no implementation we have found actually uses it.
let alpha = ONE_MINUS_CHI.powf(30.0 / (1000. * 5. * 1_000_000.));
let root = self.measurement_uncertainty.sqrt();
let root3 = 3. * root;
if zms > root3 {
self.measurement_uncertainty =
(alpha * self.measurement_uncertainty + (1. - alpha) * root3.powf(2.)).max(1.);
} else {
self.measurement_uncertainty =
(alpha * self.measurement_uncertainty + (1. - alpha) * zms.powf(2.)).max(1.);
}
let estimate_uncertainty = self.estimate_error + Q;
self.gain = estimate_uncertainty / (estimate_uncertainty + self.measurement_uncertainty);
self.estimate += Duration::nanoseconds((self.gain * zms * 1_000_000.) as i64);
self.estimate_error = (1. - self.gain) * estimate_uncertainty;
}
fn compare_threshold(&mut self) -> (NetworkUsage, Duration) {
// FIXME: It is unclear where that factor is coming from but all
// implementations we found have it (libwebrtc, pion, jitsi...), and the
@ -568,11 +567,12 @@ impl Detector {
self.num_deltas += 1;
if self.num_deltas < 2 {
return (NetworkUsage::Normal, self.estimate);
return (NetworkUsage::Normal, self.estimator_impl.estimate());
}
let amplified_estimate = Duration::nanoseconds(
self.estimate.whole_nanoseconds() as i64 * i64::min(self.num_deltas, MAX_DELTAS),
self.estimator_impl.estimate().whole_nanoseconds() as i64
* i64::min(self.num_deltas, MAX_DELTAS),
);
let usage = if amplified_estimate > self.threshold {
NetworkUsage::Over
@ -648,8 +648,8 @@ impl Detector {
CAT,
"{:?} - measure: {} - estimate: {} - amp_est: {} - th: {} - inc_dur: {} - inc_cnt: {}",
th_usage,
self.measure,
self.estimate,
self.estimator_impl.measure(),
self.estimator_impl.estimate(),
amplified_estimate,
self.threshold,
self.increasing_duration,
@ -720,6 +720,7 @@ struct State {
min_bitrate: Bitrate,
max_bitrate: Bitrate,
estimator: Estimator,
detector: Detector,
clock_entry: Option<gst::SingleShotClockId>,
@ -735,6 +736,7 @@ struct State {
impl Default for State {
fn default() -> Self {
let estimator = Estimator::default();
Self {
target_bitrate_on_delay: DEFAULT_ESTIMATED_BITRATE,
target_bitrate_on_loss: DEFAULT_ESTIMATED_BITRATE,
@ -745,7 +747,8 @@ impl Default for State {
last_decrease_on_delay: Instant::now(),
min_bitrate: DEFAULT_MIN_BITRATE,
max_bitrate: DEFAULT_MAX_BITRATE,
detector: Detector::new(),
estimator,
detector: Detector::new(estimator),
buffers: Default::default(),
estimated_bitrate: DEFAULT_ESTIMATED_BITRATE,
last_control_op: BandwidthEstimationOp::Increase("Initial increase".into()),
@ -1300,6 +1303,11 @@ impl ObjectImpl for BandwidthEstimator {
.default_value(DEFAULT_MAX_BITRATE)
.mutable_ready()
.build(),
glib::ParamSpecEnum::builder_with_default("estimator", Estimator::default())
.nick("Estimator")
.blurb("How to calculate the delay estimate that will be compared against the dynamic delay threshold.")
.mutable_ready()
.build(),
]
});
@ -1323,6 +1331,11 @@ impl ObjectImpl for BandwidthEstimator {
state.target_bitrate_on_loss = bitrate;
state.estimated_bitrate = bitrate;
}
"estimator" => {
let mut state = self.state.lock().unwrap();
state.estimator = value.get().unwrap();
state.detector.estimator_impl = state.estimator.to_impl()
}
_ => unimplemented!(),
}
}
@ -1341,6 +1354,10 @@ impl ObjectImpl for BandwidthEstimator {
let state = self.state.lock().unwrap();
state.estimated_bitrate.to_value()
}
"estimator" => {
let state = self.state.lock().unwrap();
state.estimator.to_value()
}
_ => unimplemented!(),
}
}

View file

@ -0,0 +1,74 @@
//! This is the estimator that follows the algorithm described in
//! https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02.
use super::Duration;
use super::EstimatorImpl;
use super::PacketGroup;
// Table1. Coefficient used for the measured noise variance
// [0.1,0.001]
const CHI: f64 = 0.01;
const ONE_MINUS_CHI: f64 = 1. - CHI;
// Table1. State noise covariance matrix
const Q: f64 = 0.001;
// Table1. Initial value of the system error covariance
const INITIAL_ERROR_COVARIANCE: f64 = 0.1;
/// State of the Kalman-filter based delay estimator. Field naming follows
/// the GCC draft (draft-ietf-rmcat-gcc-02).
#[derive(Debug, PartialEq, Clone)]
pub struct KalmanEstimator {
    measure: Duration, // Delay variation measure
    gain: f64,         // Kalman gain
    measurement_uncertainty: f64, // var_v_hat(i-1)
    estimate_error: f64,          // e(i-1)
    estimate: Duration,           // m_hat(i-1)
}
impl Default for KalmanEstimator {
fn default() -> Self {
Self {
measure: Duration::ZERO,
gain: 0.,
measurement_uncertainty: 0.,
estimate_error: INITIAL_ERROR_COVARIANCE,
estimate: Duration::ZERO,
}
}
}
impl EstimatorImpl for KalmanEstimator {
    fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup) {
        self.measure = group.inter_delay_variation(prev_group);
        // z is the innovation: how far the new measurement lies from the
        // current estimate. zms is z expressed in milliseconds.
        let z = self.measure - self.estimate;
        let zms = z.whole_microseconds() as f64 / 1000.0;
        // This doesn't exactly follow the spec as we should compute and
        // use f_max here, no implementation we have found actually uses it.
        let alpha = ONE_MINUS_CHI.powf(30.0 / (1000. * 5. * 1_000_000.));
        let root = self.measurement_uncertainty.sqrt();
        let root3 = 3. * root;
        // Exponentially filter the measurement noise variance. Innovations
        // beyond 3 standard deviations are clamped to root3 so a single
        // outlier does not blow up the variance. The .max(1.) keeps the
        // variance from collapsing to zero.
        if zms > root3 {
            self.measurement_uncertainty =
                (alpha * self.measurement_uncertainty + (1. - alpha) * root3.powf(2.)).max(1.);
        } else {
            self.measurement_uncertainty =
                (alpha * self.measurement_uncertainty + (1. - alpha) * zms.powf(2.)).max(1.);
        }
        // Standard Kalman update step: the gain decides how much of the
        // innovation is folded into the new estimate, then the error
        // covariance is shrunk accordingly.
        let estimate_uncertainty = self.estimate_error + Q;
        self.gain = estimate_uncertainty / (estimate_uncertainty + self.measurement_uncertainty);
        self.estimate += Duration::nanoseconds((self.gain * zms * 1_000_000.) as i64);
        self.estimate_error = (1. - self.gain) * estimate_uncertainty;
    }

    /// Current smoothed delay-variation estimate, m_hat(i).
    fn estimate(&self) -> Duration {
        self.estimate
    }

    /// Most recent raw inter-group delay variation fed to the filter.
    fn measure(&self) -> Duration {
        self.measure
    }
}

View file

@ -0,0 +1,193 @@
//! This algorithm is not described in
//! https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02. Instead, this
//! algorithm can be found in the GCC implementation in libwebrtc, which is used
//! by e.g. Chromium.
//!
//! To explore the algorithm in libwebrtc, use the
//! `delay_detector_for_packet->Update(...)` call in
//! <https://webrtc.googlesource.com/src/+/refs/heads/main/modules/congestion_controller/goog_cc/delay_based_bwe.cc>
//! as a starting point. That call invokes `TrendlineEstimator::Update()` in
//! <https://webrtc.googlesource.com/src/+/refs/heads/main/modules/congestion_controller/goog_cc/trendline_estimator.cc>
//! which uses the same core algorithm as we do here.
use super::Duration;
use super::EstimatorImpl;
use super::PacketGroup;
mod linear_regression;
use linear_regression::Samples;
const DEFAULT_SAMPLES_MAX_LEN: usize = 20;
// Must be between 0.0 and 1.0.
const SMOOTHING_FACTOR: f64 = 0.9;
// Since our estimate is a slope we need to amplify our estimate somewhat to
// match the dynamic threshold our estimate is compared against.
const GAIN: f64 = 4.;
#[derive(Debug, PartialEq, Clone)]
pub struct LinearRegressionEstimator {
    /// Our (pre-GAIN'ed) output value.
    estimate: Duration,
    /// The last inter-group delay variation measurement.
    measure: Duration,
    /// The samples used for calculating the slope of `accumulated_delay` with
    /// simple linear regression.
    samples: linear_regression::Samples,
    /// The sum over time of inter-group delay variation measurements. The
    /// measurements will jump up and down a bit, but on average they will sum
    /// to 0, since otherwise the receiver and sender will drift away from each
    /// other. After network problems, this can drift away from 0. But once
    /// things stabilize, it is expected to maintain a constant value again over
    /// time. If it remains constant, the network conditions are good. If it
    /// begins to increase, it indicates over-use of the network. If it begins
    /// to decrease, it indicates that network problems are clearing up. Since
    /// we are interested in changes to this value over time, we are interested
    /// in the slope of this value over time. Its absolute value (height over
    /// the y axis) over time does not matter to us.
    accumulated_delay: Duration,
    /// To make `accumulated_delay` less sensitive to measurement noise and
    /// natural network variation, we apply a low-pass filter to
    /// `accumulated_delay`, and this is the filtered value. It is actually the
    /// slope of this field that we use, and not the slope of
    /// `accumulated_delay`.
    smoothed_delay: Duration,
}
impl Default for LinearRegressionEstimator {
fn default() -> Self {
Self::with_samples_max_len(DEFAULT_SAMPLES_MAX_LEN)
}
}
impl LinearRegressionEstimator {
    /// Create an estimator whose regression window holds at most
    /// `samples_max_len` samples. All delay state starts at zero.
    fn with_samples_max_len(samples_max_len: usize) -> Self {
        Self {
            estimate: Duration::ZERO,
            measure: Duration::ZERO,
            samples: Samples::with_max_len(samples_max_len),
            accumulated_delay: Duration::ZERO,
            smoothed_delay: Duration::ZERO,
        }
    }
}
impl EstimatorImpl for LinearRegressionEstimator {
    fn update(&mut self, prev_group: &PacketGroup, group: &PacketGroup) {
        self.measure = group.inter_delay_variation(prev_group);
        // Integrate the measurement; see the `accumulated_delay` field docs
        // for why we regress on the accumulated value rather than on the
        // individual measurements.
        self.accumulated_delay += self.measure;
        // Exponential moving average (low-pass filter) to suppress
        // measurement noise before the slope calculation.
        self.smoothed_delay = SMOOTHING_FACTOR * self.smoothed_delay
            + (1f64 - SMOOTHING_FACTOR) * self.accumulated_delay;
        gst::log!(
            super::CAT,
            "accumulated_delay: {} - smoothed_delay: {} - samples len: {}",
            self.accumulated_delay,
            self.smoothed_delay,
            self.samples.len(),
        );
        // This never panics because the `inter_delay_variation()` call above
        // already did this.
        let arrival_time = group.arrival.unwrap();
        // As long as both x and y use the same unit, the exact unit does not
        // matter. Go with seconds since f64 versions of seconds exists.
        self.samples.push(linear_regression::Sample {
            x: arrival_time.as_seconds_f64(),
            y: self.smoothed_delay.as_seconds_f64(),
        });
        // To avoid big movements in slope in the beginning, wait until we have
        // enough samples. It won't take long.
        if self.samples.full() {
            if let Some(slope) = self.samples.slope() {
                // The slope is dimensionless, but pretend it is milliseconds. That
                // makes the algorithm work.
                self.estimate = Duration::nanoseconds((slope * 1_000_000f64) as i64) * GAIN;
            }
        }
    }

    /// The GAIN-amplified slope of the smoothed accumulated delay.
    fn estimate(&self) -> Duration {
        self.estimate
    }

    /// Most recent raw inter-group delay variation measurement.
    fn measure(&self) -> Duration {
        self.measure
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // With a constant zero inter-group delay, the estimator must stay at
    // rest while its sample window fills up to (and stays at) max_len.
    #[test]
    fn test_zero_accumulation() {
        let max_len = DEFAULT_SAMPLES_MAX_LEN;
        let mut estimator = LinearRegressionEstimator::with_samples_max_len(max_len);
        let (prev_group, group) = with_inter_group_delay(Duration::ZERO);
        for times_updated in 1..max_len * 5 {
            estimator.update(&prev_group, &group);
            // Since inter_group_delay is 0 we expect the accumulated and
            // smoothed diff to remain zero
            assert_eq!(estimator.accumulated_delay, Duration::ZERO);
            assert_eq!(estimator.smoothed_delay, Duration::ZERO);
            // The linear regression sample size shall increase until we reach
            // the max len.
            assert_eq!(estimator.samples.len(), times_updated.min(max_len))
        }
    }

    // A constant positive inter-group delay must accumulate linearly with
    // the number of update() calls.
    #[test]
    fn test_small_accumulation() {
        let mut estimator = LinearRegressionEstimator::default();
        let inter_group_delay = Duration::milliseconds(10);
        let (prev_group, group) = with_inter_group_delay(inter_group_delay);
        for times_updated in 1..10i32 {
            estimator.update(&prev_group, &group);
            // We expect the accumulated delay to increase proportionally with
            // the number of times we called update().
            assert_eq!(
                estimator.accumulated_delay,
                inter_group_delay * times_updated,
            );
            // Don't bother checking smoothed_delay. It's a bit awkward to
            // predict it.
        }
    }

    // Helper to create fake PacketGroups with the desired `inter_group_delay`
    // for tests. The underlying absolute values are arbitrary since they don't
    // matter for our tests.
    fn with_inter_group_delay(inter_group_delay: Duration) -> (PacketGroup, PacketGroup) {
        let inter_departure_delay = Duration::milliseconds(100); // Exact value does not matter
        let inter_arrival_delay = inter_departure_delay + inter_group_delay;
        let prev_group = PacketGroup {
            packets: vec![], // Unused
            departure: Duration::milliseconds(1000), // Exact value does not matter
            arrival: Some(Duration::milliseconds(1050)), // Exact value does not matter
        };
        let group = PacketGroup {
            packets: vec![], // Unused
            departure: prev_group.departure + inter_departure_delay,
            arrival: Some(prev_group.arrival.unwrap() + inter_arrival_delay),
        };
        (prev_group, group)
    }
}

View file

@ -0,0 +1,126 @@
use std::collections::VecDeque;
/// A single (x, y) observation fed to the regression.
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Sample {
    pub x: f64,
    pub y: f64,
}

/// A bounded sliding window of samples over which the simple linear
/// regression slope can be computed. Newest samples sit at the front.
#[derive(Debug, PartialEq, Clone)]
pub struct Samples {
    samples: VecDeque<Sample>,
    max_len: usize,
}

impl Samples {
    /// Create an empty window that holds at most `max_len` samples.
    pub fn with_max_len(max_len: usize) -> Self {
        Self {
            max_len,
            samples: VecDeque::with_capacity(max_len),
        }
    }

    /// Number of samples currently in the window.
    pub fn len(&self) -> usize {
        self.samples.len()
    }

    /// Whether the window has reached its maximum size.
    pub fn full(&self) -> bool {
        self.len() == self.max_len
    }

    /// Least-squares slope of y over x.
    /// https://en.wikipedia.org/wiki/Simple_linear_regression
    ///
    /// Returns `None` with fewer than two samples, or when all x values
    /// coincide (vertical line, slope undefined).
    pub fn slope(&self) -> Option<f64> {
        let len = self.samples.len() as f64;
        if len < 2. {
            return None;
        }
        // Means of x and y over the window.
        let x_mean = self.samples.iter().map(|s| s.x).sum::<f64>() / len;
        let y_mean = self.samples.iter().map(|s| s.y).sum::<f64>() / len;
        // Slope = sum((x - x̄)(y - ȳ)) / sum((x - x̄)²).
        let (num, denum) = self.samples.iter().fold((0., 0.), |(num, denum), s| {
            let delta_x = s.x - x_mean;
            let delta_y = s.y - y_mean;
            (num + delta_x * delta_y, denum + delta_x * delta_x)
        });
        if denum == 0. {
            None
        } else {
            Some(num / denum)
        }
    }

    /// Append a sample, discarding the oldest one once the window is full.
    pub fn push(&mut self, sample: Sample) {
        if self.full() {
            self.samples.pop_back();
        }
        self.samples.push_front(sample);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Table-driven test of Samples::slope(): each case is a list of input
    // samples and the expected slope. `None` is expected whenever fewer than
    // two samples exist or all x values coincide.
    #[test]
    fn test_slope() {
        let test_cases: Vec<(Vec<Sample>, Option<f64>)> = vec![
            (vec![], None),
            (vec![Sample { x: 0., y: 0. }], None),
            (vec![Sample { x: 0., y: 0. }, Sample { x: 0., y: 0. }], None),
            (
                vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: 0. }],
                Some(0.),
            ),
            (
                vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: 1. }],
                Some(1.),
            ),
            (
                vec![Sample { x: 0., y: 0. }, Sample { x: 1., y: -1. }],
                Some(-1.),
            ),
            (
                vec![
                    Sample { x: 0., y: 0. },
                    Sample { x: 1., y: 2. },
                    Sample { x: 2., y: 4. },
                ],
                Some(2.),
            ),
        ];
        for test_case in &test_cases {
            let input = &test_case.0;
            let expected_slope = test_case.1;
            // Max len is well above all case sizes so no eviction occurs here.
            let mut samples = Samples::with_max_len(100);
            for sample in input {
                samples.push(*sample);
            }
            let actual_slope = samples.slope();
            let msg_if_fail =
                format!("input={input:?} actual={actual_slope:?} expected={expected_slope:?}");
            if let Some(slope) = actual_slope {
                // Compare floats within an epsilon rather than exactly.
                const EPSILON: f64 = 0.000001;
                let expected = expected_slope.unwrap_or_else(|| panic!("{msg_if_fail}"));
                assert!((slope - expected).abs() < EPSILON, "{}", msg_if_fail);
            } else {
                assert_eq!(expected_slope, None)
            }
        }
    }
}

View file

@ -9,6 +9,11 @@ glib::wrapper! {
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
imp::Estimator::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register(
Some(plugin),
"rtpgccbwe",