mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 13:31:00 +00:00
sink: Move the homegrown congestion controller in it own file
This commit is contained in:
parent
c399b1f0c6
commit
b8767fa18f
3 changed files with 423 additions and 406 deletions
420
plugins/src/webrtcsink/homegrown_cc.rs
Normal file
420
plugins/src/webrtcsink/homegrown_cc.rs
Normal file
|
@ -0,0 +1,420 @@
|
|||
use gst::{
|
||||
glib::{self, value::FromValue},
|
||||
prelude::*,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use super::imp::VideoEncoder;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"webrtcsink-homegrowncc",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("WebRTC sink"),
|
||||
)
|
||||
});
|
||||
|
||||
#[derive(Debug)]
|
||||
enum IncreaseType {
|
||||
/// Increase bitrate by value
|
||||
Additive(f64),
|
||||
/// Increase bitrate by factor
|
||||
Multiplicative(f64),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum ControllerType {
|
||||
// Running the "delay-based controller"
|
||||
Delay,
|
||||
// Running the "loss based controller"
|
||||
Loss,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum CongestionControlOp {
|
||||
/// Don't update target bitrate
|
||||
Hold,
|
||||
/// Decrease target bitrate
|
||||
Decrease {
|
||||
factor: f64,
|
||||
#[allow(dead_code)]
|
||||
reason: String, // for Debug
|
||||
},
|
||||
/// Increase target bitrate, either additively or multiplicatively
|
||||
Increase(IncreaseType),
|
||||
}
|
||||
|
||||
fn lookup_twcc_stats(stats: &gst::StructureRef) -> Option<gst::Structure> {
|
||||
for (_, field_value) in stats {
|
||||
if let Ok(s) = field_value.get::<gst::Structure>() {
|
||||
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
|
||||
if (type_ == gst_webrtc::WebRTCStatsType::Transport
|
||||
|| type_ == gst_webrtc::WebRTCStatsType::CandidatePair)
|
||||
&& s.has_field("gst-twcc-stats")
|
||||
{
|
||||
return Some(s.get::<gst::Structure>("gst-twcc-stats").unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub struct CongestionController {
|
||||
/// Note: The target bitrate applied is the min of
|
||||
/// target_bitrate_on_delay and target_bitrate_on_loss
|
||||
///
|
||||
/// Bitrate target based on delay factor for all video streams.
|
||||
/// Hasn't been tested with multiple video streams, but
|
||||
/// current design is simply to divide bitrate equally.
|
||||
pub target_bitrate_on_delay: i32,
|
||||
|
||||
/// Bitrate target based on loss for all video streams.
|
||||
pub target_bitrate_on_loss: i32,
|
||||
|
||||
/// Exponential moving average, updated when bitrate is
|
||||
/// decreased, discarded when increased again past last
|
||||
/// congestion window. Smoothing factor hardcoded.
|
||||
bitrate_ema: Option<f64>,
|
||||
/// Exponentially weighted moving variance, recursively
|
||||
/// updated along with bitrate_ema. sqrt'd to obtain standard
|
||||
/// deviation, used to determine whether to increase bitrate
|
||||
/// additively or multiplicatively
|
||||
bitrate_emvar: f64,
|
||||
/// Used in additive mode to track last control time, influences
|
||||
/// calculation of added value according to gcc section 5.5
|
||||
last_update_time: Option<std::time::Instant>,
|
||||
/// For logging purposes
|
||||
peer_id: String,
|
||||
|
||||
min_bitrate: u32,
|
||||
max_bitrate: u32,
|
||||
}
|
||||
|
||||
impl CongestionController {
|
||||
pub fn new(peer_id: &str, min_bitrate: u32, max_bitrate: u32) -> Self {
|
||||
Self {
|
||||
target_bitrate_on_delay: 0,
|
||||
target_bitrate_on_loss: 0,
|
||||
bitrate_ema: None,
|
||||
bitrate_emvar: 0.,
|
||||
last_update_time: None,
|
||||
peer_id: peer_id.to_string(),
|
||||
min_bitrate,
|
||||
max_bitrate,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_delay(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
twcc_stats: &gst::StructureRef,
|
||||
rtt: f64,
|
||||
) -> CongestionControlOp {
|
||||
let target_bitrate = f64::min(
|
||||
self.target_bitrate_on_delay as f64,
|
||||
self.target_bitrate_on_loss as f64,
|
||||
);
|
||||
// Unwrap, all those fields must be there or there's been an API
|
||||
// break, which qualifies as programming error
|
||||
let bitrate_sent = twcc_stats.get::<u32>("bitrate-sent").unwrap();
|
||||
let bitrate_recv = twcc_stats.get::<u32>("bitrate-recv").unwrap();
|
||||
let delta_of_delta = twcc_stats.get::<i64>("avg-delta-of-delta").unwrap();
|
||||
|
||||
let sent_minus_received = bitrate_sent.saturating_sub(bitrate_recv);
|
||||
|
||||
let delay_factor = sent_minus_received as f64 / target_bitrate;
|
||||
let last_update_time = self.last_update_time.replace(std::time::Instant::now());
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: considering stats {}",
|
||||
self.peer_id,
|
||||
twcc_stats
|
||||
);
|
||||
|
||||
if delay_factor > 0.1 {
|
||||
let (factor, reason) = if delay_factor < 0.64 {
|
||||
(0.96, format!("low delay factor {}", delay_factor))
|
||||
} else {
|
||||
(
|
||||
delay_factor.sqrt().sqrt().clamp(0.8, 0.96),
|
||||
format!("High delay factor {}", delay_factor),
|
||||
)
|
||||
};
|
||||
|
||||
CongestionControlOp::Decrease { factor, reason }
|
||||
} else if delta_of_delta > 1_000_000 {
|
||||
CongestionControlOp::Decrease {
|
||||
factor: 0.97,
|
||||
reason: format!("High delta: {}", delta_of_delta),
|
||||
}
|
||||
} else {
|
||||
CongestionControlOp::Increase(if let Some(ema) = self.bitrate_ema {
|
||||
let bitrate_stdev = self.bitrate_emvar.sqrt();
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: Old bitrate: {}, ema: {}, stddev: {}",
|
||||
self.peer_id,
|
||||
target_bitrate,
|
||||
ema,
|
||||
bitrate_stdev,
|
||||
);
|
||||
|
||||
// gcc section 5.5 advises 3 standard deviations, but experiments
|
||||
// have shown this to be too low, probably related to the rest of
|
||||
// homegrown algorithm not implementing gcc, revisit when implementing
|
||||
// the rest of the RFC
|
||||
if target_bitrate < ema - 7. * bitrate_stdev {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: below last congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
/* Multiplicative increase */
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
} else if target_bitrate > ema + 7. * bitrate_stdev {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: above last congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
/* We have gone past our last estimated max bandwidth
|
||||
* network situation may have changed, go back to
|
||||
* multiplicative increase
|
||||
*/
|
||||
self.bitrate_ema.take();
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
} else {
|
||||
let rtt_ms = rtt * 1000.;
|
||||
let response_time_ms = 100. + rtt_ms;
|
||||
let time_since_last_update_ms = match last_update_time {
|
||||
None => 0.,
|
||||
Some(instant) => {
|
||||
(self.last_update_time.unwrap() - instant).as_millis() as f64
|
||||
}
|
||||
};
|
||||
// gcc section 5.5 advises 0.95 as the smoothing factor, but that
|
||||
// seems intuitively much too low, granting disproportionate importance
|
||||
// to the last measurement. 0.5 seems plenty enough, I don't have maths
|
||||
// to back that up though :)
|
||||
let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0);
|
||||
let bits_per_frame = target_bitrate / 30.;
|
||||
let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.));
|
||||
let avg_packet_size_bits = bits_per_frame / packets_per_frame;
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: still in last congestion window",
|
||||
self.peer_id,
|
||||
);
|
||||
|
||||
/* Additive increase */
|
||||
IncreaseType::Additive(f64::max(1000., alpha * avg_packet_size_bits))
|
||||
}
|
||||
} else {
|
||||
/* Multiplicative increase */
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: outside congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn clamp_bitrate(&mut self, bitrate: i32, n_encoders: i32, controller_type: ControllerType) {
|
||||
match controller_type {
|
||||
ControllerType::Loss => {
|
||||
self.target_bitrate_on_loss = bitrate.clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
)
|
||||
}
|
||||
|
||||
ControllerType::Delay => {
|
||||
self.target_bitrate_on_delay = bitrate.clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_remote_inbound_stats(&self, stats: &gst::StructureRef) -> Vec<gst::Structure> {
|
||||
let mut inbound_rtp_stats: Vec<gst::Structure> = Default::default();
|
||||
for (_, field_value) in stats {
|
||||
if let Ok(s) = field_value.get::<gst::Structure>() {
|
||||
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
|
||||
if type_ == gst_webrtc::WebRTCStatsType::RemoteInboundRtp {
|
||||
inbound_rtp_stats.push(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inbound_rtp_stats
|
||||
}
|
||||
|
||||
fn lookup_rtt(&self, stats: &gst::StructureRef) -> f64 {
|
||||
let inbound_rtp_stats = self.get_remote_inbound_stats(stats);
|
||||
let mut rtt = 0.;
|
||||
let mut n_rtts = 0u64;
|
||||
for inbound_stat in &inbound_rtp_stats {
|
||||
if let Err(err) = (|| -> Result<(), gst::structure::GetError<<<f64 as FromValue>::Checker as glib::value::ValueTypeChecker>::Error>> {
|
||||
rtt += inbound_stat.get::<f64>("round-trip-time")?;
|
||||
n_rtts += 1;
|
||||
|
||||
Ok(())
|
||||
})() {
|
||||
gst::debug!(CAT, "{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
rtt /= f64::max(1., n_rtts as f64);
|
||||
|
||||
gst::log!(CAT, "Round trip time: {}", rtt);
|
||||
|
||||
rtt
|
||||
}
|
||||
|
||||
pub fn loss_control(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
stats: &gst::StructureRef,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
) {
|
||||
let loss_percentage = stats.get::<f64>("packet-loss-pct").unwrap();
|
||||
|
||||
self.apply_control_op(
|
||||
element,
|
||||
encoders,
|
||||
if loss_percentage > 10. {
|
||||
CongestionControlOp::Decrease {
|
||||
factor: ((100. - (0.5 * loss_percentage)) / 100.).clamp(0.7, 0.98),
|
||||
reason: format!("High loss: {}", loss_percentage),
|
||||
}
|
||||
} else if loss_percentage > 2. {
|
||||
CongestionControlOp::Hold
|
||||
} else {
|
||||
CongestionControlOp::Increase(IncreaseType::Multiplicative(1.05))
|
||||
},
|
||||
ControllerType::Loss,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn delay_control(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
stats: &gst::StructureRef,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
) {
|
||||
if let Some(twcc_stats) = lookup_twcc_stats(stats) {
|
||||
let op = self.update_delay(element, &twcc_stats, self.lookup_rtt(stats));
|
||||
self.apply_control_op(element, encoders, op, ControllerType::Delay);
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_control_op(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
control_op: CongestionControlOp,
|
||||
controller_type: ControllerType,
|
||||
) {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: applying congestion control operation {:?}",
|
||||
self.peer_id,
|
||||
control_op
|
||||
);
|
||||
|
||||
let n_encoders = encoders.len() as i32;
|
||||
let prev_bitrate = i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss);
|
||||
match &control_op {
|
||||
CongestionControlOp::Hold => {}
|
||||
CongestionControlOp::Increase(IncreaseType::Additive(value)) => {
|
||||
self.clamp_bitrate(
|
||||
self.target_bitrate_on_delay + *value as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
}
|
||||
CongestionControlOp::Increase(IncreaseType::Multiplicative(factor)) => {
|
||||
self.clamp_bitrate(
|
||||
(self.target_bitrate_on_delay as f64 * factor) as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
}
|
||||
CongestionControlOp::Decrease { factor, .. } => {
|
||||
self.clamp_bitrate(
|
||||
(self.target_bitrate_on_delay as f64 * factor) as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
|
||||
if let ControllerType::Delay = controller_type {
|
||||
// Smoothing factor
|
||||
let alpha = 0.75;
|
||||
if let Some(ema) = self.bitrate_ema {
|
||||
let sigma: f64 = (self.target_bitrate_on_delay as f64) - ema;
|
||||
self.bitrate_ema = Some(ema + (alpha * sigma));
|
||||
self.bitrate_emvar =
|
||||
(1. - alpha) * (self.bitrate_emvar + alpha * sigma.powi(2));
|
||||
} else {
|
||||
self.bitrate_ema = Some(self.target_bitrate_on_delay as f64);
|
||||
self.bitrate_emvar = 0.;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let target_bitrate =
|
||||
i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss).clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
) / n_encoders;
|
||||
|
||||
if target_bitrate != prev_bitrate {
|
||||
gst::info!(
|
||||
CAT,
|
||||
"{:?} {} => {} | on delay {} - on loss {} | min {} - max {}",
|
||||
control_op,
|
||||
human_bytes::human_bytes(prev_bitrate),
|
||||
human_bytes::human_bytes(target_bitrate),
|
||||
human_bytes::human_bytes(self.target_bitrate_on_delay),
|
||||
human_bytes::human_bytes(self.target_bitrate_on_loss),
|
||||
human_bytes::human_bytes(self.min_bitrate),
|
||||
human_bytes::human_bytes(self.max_bitrate),
|
||||
);
|
||||
}
|
||||
|
||||
let fec_ratio = {
|
||||
if target_bitrate <= 2000000 || self.max_bitrate <= 2000000 {
|
||||
0f64
|
||||
} else {
|
||||
(target_bitrate as f64 - 2000000f64) / (self.max_bitrate as f64 - 2000000f64)
|
||||
}
|
||||
};
|
||||
|
||||
let fec_percentage = (fec_ratio * 50f64) as u32;
|
||||
|
||||
for encoder in encoders.iter_mut() {
|
||||
encoder.set_bitrate(element, target_bitrate);
|
||||
encoder
|
||||
.transceiver
|
||||
.set_property("fec-percentage", fec_percentage);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
use anyhow::Context;
|
||||
use gst::glib;
|
||||
use gst::glib::value::FromValue;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
use gst_rtp::prelude::*;
|
||||
|
@ -17,6 +16,7 @@ use std::collections::HashMap;
|
|||
use std::ops::Mul;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use super::homegrown_cc::CongestionController;
|
||||
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
|
||||
use crate::signaller::Signaller;
|
||||
use std::collections::BTreeMap;
|
||||
|
@ -127,69 +127,9 @@ pub struct VideoEncoder {
|
|||
video_info: gst_video::VideoInfo,
|
||||
peer_id: String,
|
||||
mitigation_mode: WebRTCSinkMitigationMode,
|
||||
transceiver: gst_webrtc::WebRTCRTPTransceiver,
|
||||
pub transceiver: gst_webrtc::WebRTCRTPTransceiver,
|
||||
}
|
||||
|
||||
struct CongestionController {
|
||||
/// Note: The target bitrate applied is the min of
|
||||
/// target_bitrate_on_delay and target_bitrate_on_loss
|
||||
///
|
||||
/// Bitrate target based on delay factor for all video streams.
|
||||
/// Hasn't been tested with multiple video streams, but
|
||||
/// current design is simply to divide bitrate equally.
|
||||
target_bitrate_on_delay: i32,
|
||||
|
||||
/// Bitrate target based on loss for all video streams.
|
||||
target_bitrate_on_loss: i32,
|
||||
|
||||
/// Exponential moving average, updated when bitrate is
|
||||
/// decreased, discarded when increased again past last
|
||||
/// congestion window. Smoothing factor hardcoded.
|
||||
bitrate_ema: Option<f64>,
|
||||
/// Exponentially weighted moving variance, recursively
|
||||
/// updated along with bitrate_ema. sqrt'd to obtain standard
|
||||
/// deviation, used to determine whether to increase bitrate
|
||||
/// additively or multiplicatively
|
||||
bitrate_emvar: f64,
|
||||
/// Used in additive mode to track last control time, influences
|
||||
/// calculation of added value according to gcc section 5.5
|
||||
last_update_time: Option<std::time::Instant>,
|
||||
/// For logging purposes
|
||||
peer_id: String,
|
||||
|
||||
min_bitrate: u32,
|
||||
max_bitrate: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum IncreaseType {
|
||||
/// Increase bitrate by value
|
||||
Additive(f64),
|
||||
/// Increase bitrate by factor
|
||||
Multiplicative(f64),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum CongestionControlOp {
|
||||
/// Don't update target bitrate
|
||||
Hold,
|
||||
/// Decrease target bitrate
|
||||
Decrease {
|
||||
factor: f64,
|
||||
#[allow(dead_code)]
|
||||
reason: String, // for Debug
|
||||
},
|
||||
/// Increase target bitrate, either additively or multiplicatively
|
||||
Increase(IncreaseType),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum ControllerType {
|
||||
// Running the "delay-based controller"
|
||||
Delay,
|
||||
// Running the "loss based controller"
|
||||
Loss,
|
||||
}
|
||||
struct Consumer {
|
||||
pipeline: gst::Pipeline,
|
||||
webrtcbin: gst::Element,
|
||||
|
@ -583,23 +523,6 @@ fn setup_encoding(
|
|||
Ok((enc, conv_filter, pay))
|
||||
}
|
||||
|
||||
fn lookup_twcc_stats(stats: &gst::StructureRef) -> Option<gst::Structure> {
|
||||
for (_, field_value) in stats {
|
||||
if let Ok(s) = field_value.get::<gst::Structure>() {
|
||||
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
|
||||
if (type_ == gst_webrtc::WebRTCStatsType::Transport
|
||||
|| type_ == gst_webrtc::WebRTCStatsType::CandidatePair)
|
||||
&& s.has_field("gst-twcc-stats")
|
||||
{
|
||||
return Some(s.get::<gst::Structure>("gst-twcc-stats").unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
impl VideoEncoder {
|
||||
fn new(
|
||||
element: gst::Element,
|
||||
|
@ -733,333 +656,6 @@ impl VideoEncoder {
|
|||
}
|
||||
}
|
||||
|
||||
impl CongestionController {
|
||||
fn new(peer_id: &str, min_bitrate: u32, max_bitrate: u32) -> Self {
|
||||
Self {
|
||||
target_bitrate_on_delay: 0,
|
||||
target_bitrate_on_loss: 0,
|
||||
bitrate_ema: None,
|
||||
bitrate_emvar: 0.,
|
||||
last_update_time: None,
|
||||
peer_id: peer_id.to_string(),
|
||||
min_bitrate,
|
||||
max_bitrate,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_delay(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
twcc_stats: &gst::StructureRef,
|
||||
rtt: f64,
|
||||
) -> CongestionControlOp {
|
||||
let target_bitrate = f64::min(
|
||||
self.target_bitrate_on_delay as f64,
|
||||
self.target_bitrate_on_loss as f64,
|
||||
);
|
||||
// Unwrap, all those fields must be there or there's been an API
|
||||
// break, which qualifies as programming error
|
||||
let bitrate_sent = twcc_stats.get::<u32>("bitrate-sent").unwrap();
|
||||
let bitrate_recv = twcc_stats.get::<u32>("bitrate-recv").unwrap();
|
||||
let delta_of_delta = twcc_stats.get::<i64>("avg-delta-of-delta").unwrap();
|
||||
|
||||
let sent_minus_received = bitrate_sent.saturating_sub(bitrate_recv);
|
||||
|
||||
let delay_factor = sent_minus_received as f64 / target_bitrate;
|
||||
let last_update_time = self.last_update_time.replace(std::time::Instant::now());
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: considering stats {}",
|
||||
self.peer_id,
|
||||
twcc_stats
|
||||
);
|
||||
|
||||
if delay_factor > 0.1 {
|
||||
let (factor, reason) = if delay_factor < 0.64 {
|
||||
(0.96, format!("low delay factor {}", delay_factor))
|
||||
} else {
|
||||
(
|
||||
delay_factor.sqrt().sqrt().clamp(0.8, 0.96),
|
||||
format!("High delay factor {}", delay_factor),
|
||||
)
|
||||
};
|
||||
|
||||
CongestionControlOp::Decrease { factor, reason }
|
||||
} else if delta_of_delta > 1_000_000 {
|
||||
CongestionControlOp::Decrease {
|
||||
factor: 0.97,
|
||||
reason: format!("High delta: {}", delta_of_delta),
|
||||
}
|
||||
} else {
|
||||
CongestionControlOp::Increase(if let Some(ema) = self.bitrate_ema {
|
||||
let bitrate_stdev = self.bitrate_emvar.sqrt();
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: Old bitrate: {}, ema: {}, stddev: {}",
|
||||
self.peer_id,
|
||||
target_bitrate,
|
||||
ema,
|
||||
bitrate_stdev,
|
||||
);
|
||||
|
||||
// gcc section 5.5 advises 3 standard deviations, but experiments
|
||||
// have shown this to be too low, probably related to the rest of
|
||||
// homegrown algorithm not implementing gcc, revisit when implementing
|
||||
// the rest of the RFC
|
||||
if target_bitrate < ema - 7. * bitrate_stdev {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: below last congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
/* Multiplicative increase */
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
} else if target_bitrate > ema + 7. * bitrate_stdev {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: above last congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
/* We have gone past our last estimated max bandwidth
|
||||
* network situation may have changed, go back to
|
||||
* multiplicative increase
|
||||
*/
|
||||
self.bitrate_ema.take();
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
} else {
|
||||
let rtt_ms = rtt * 1000.;
|
||||
let response_time_ms = 100. + rtt_ms;
|
||||
let time_since_last_update_ms = match last_update_time {
|
||||
None => 0.,
|
||||
Some(instant) => {
|
||||
(self.last_update_time.unwrap() - instant).as_millis() as f64
|
||||
}
|
||||
};
|
||||
// gcc section 5.5 advises 0.95 as the smoothing factor, but that
|
||||
// seems intuitively much too low, granting disproportionate importance
|
||||
// to the last measurement. 0.5 seems plenty enough, I don't have maths
|
||||
// to back that up though :)
|
||||
let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0);
|
||||
let bits_per_frame = target_bitrate / 30.;
|
||||
let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.));
|
||||
let avg_packet_size_bits = bits_per_frame / packets_per_frame;
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: still in last congestion window",
|
||||
self.peer_id,
|
||||
);
|
||||
|
||||
/* Additive increase */
|
||||
IncreaseType::Additive(f64::max(1000., alpha * avg_packet_size_bits))
|
||||
}
|
||||
} else {
|
||||
/* Multiplicative increase */
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: outside congestion window",
|
||||
self.peer_id
|
||||
);
|
||||
IncreaseType::Multiplicative(1.03)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn clamp_bitrate(&mut self, bitrate: i32, n_encoders: i32, controller_type: ControllerType) {
|
||||
match controller_type {
|
||||
ControllerType::Loss => {
|
||||
self.target_bitrate_on_loss = bitrate.clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
)
|
||||
}
|
||||
|
||||
ControllerType::Delay => {
|
||||
self.target_bitrate_on_delay = bitrate.clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_remote_inbound_stats(&self, stats: &gst::StructureRef) -> Vec<gst::Structure> {
|
||||
let mut inbound_rtp_stats: Vec<gst::Structure> = Default::default();
|
||||
for (_, field_value) in stats {
|
||||
if let Ok(s) = field_value.get::<gst::Structure>() {
|
||||
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
|
||||
if type_ == gst_webrtc::WebRTCStatsType::RemoteInboundRtp {
|
||||
inbound_rtp_stats.push(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inbound_rtp_stats
|
||||
}
|
||||
|
||||
fn lookup_rtt(&self, stats: &gst::StructureRef) -> f64 {
|
||||
let inbound_rtp_stats = self.get_remote_inbound_stats(stats);
|
||||
let mut rtt = 0.;
|
||||
let mut n_rtts = 0u64;
|
||||
for inbound_stat in &inbound_rtp_stats {
|
||||
if let Err(err) = (|| -> Result<(), gst::structure::GetError<<<f64 as FromValue>::Checker as glib::value::ValueTypeChecker>::Error>> {
|
||||
rtt += inbound_stat.get::<f64>("round-trip-time")?;
|
||||
n_rtts += 1;
|
||||
|
||||
Ok(())
|
||||
})() {
|
||||
gst::debug!(CAT, "{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
rtt /= f64::max(1., n_rtts as f64);
|
||||
|
||||
gst::log!(CAT, "Round trip time: {}", rtt);
|
||||
|
||||
rtt
|
||||
}
|
||||
|
||||
fn loss_control(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
stats: &gst::StructureRef,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
) {
|
||||
let loss_percentage = stats.get::<f64>("packet-loss-pct").unwrap();
|
||||
|
||||
self.apply_control_op(
|
||||
element,
|
||||
encoders,
|
||||
if loss_percentage > 10. {
|
||||
CongestionControlOp::Decrease {
|
||||
factor: ((100. - (0.5 * loss_percentage)) / 100.).clamp(0.7, 0.98),
|
||||
reason: format!("High loss: {}", loss_percentage),
|
||||
}
|
||||
} else if loss_percentage > 2. {
|
||||
CongestionControlOp::Hold
|
||||
} else {
|
||||
CongestionControlOp::Increase(IncreaseType::Multiplicative(1.05))
|
||||
},
|
||||
ControllerType::Loss,
|
||||
);
|
||||
}
|
||||
|
||||
fn delay_control(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
stats: &gst::StructureRef,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
) {
|
||||
if let Some(twcc_stats) = lookup_twcc_stats(stats) {
|
||||
let op = self.update_delay(element, &twcc_stats, self.lookup_rtt(stats));
|
||||
self.apply_control_op(element, encoders, op, ControllerType::Delay);
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_control_op(
|
||||
&mut self,
|
||||
element: &super::WebRTCSink,
|
||||
encoders: &mut Vec<VideoEncoder>,
|
||||
control_op: CongestionControlOp,
|
||||
controller_type: ControllerType,
|
||||
) {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"consumer {}: applying congestion control operation {:?}",
|
||||
self.peer_id,
|
||||
control_op
|
||||
);
|
||||
|
||||
let n_encoders = encoders.len() as i32;
|
||||
let prev_bitrate = i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss);
|
||||
match &control_op {
|
||||
CongestionControlOp::Hold => {}
|
||||
CongestionControlOp::Increase(IncreaseType::Additive(value)) => {
|
||||
self.clamp_bitrate(
|
||||
self.target_bitrate_on_delay + *value as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
}
|
||||
CongestionControlOp::Increase(IncreaseType::Multiplicative(factor)) => {
|
||||
self.clamp_bitrate(
|
||||
(self.target_bitrate_on_delay as f64 * factor) as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
}
|
||||
CongestionControlOp::Decrease { factor, .. } => {
|
||||
self.clamp_bitrate(
|
||||
(self.target_bitrate_on_delay as f64 * factor) as i32,
|
||||
n_encoders,
|
||||
controller_type,
|
||||
);
|
||||
|
||||
if let ControllerType::Delay = controller_type {
|
||||
// Smoothing factor
|
||||
let alpha = 0.75;
|
||||
if let Some(ema) = self.bitrate_ema {
|
||||
let sigma: f64 = (self.target_bitrate_on_delay as f64) - ema;
|
||||
self.bitrate_ema = Some(ema + (alpha * sigma));
|
||||
self.bitrate_emvar =
|
||||
(1. - alpha) * (self.bitrate_emvar + alpha * sigma.powi(2));
|
||||
} else {
|
||||
self.bitrate_ema = Some(self.target_bitrate_on_delay as f64);
|
||||
self.bitrate_emvar = 0.;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let target_bitrate =
|
||||
i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss).clamp(
|
||||
self.min_bitrate as i32 * n_encoders,
|
||||
self.max_bitrate as i32 * n_encoders,
|
||||
) / n_encoders;
|
||||
|
||||
if target_bitrate != prev_bitrate {
|
||||
gst::info!(
|
||||
CAT,
|
||||
"{:?} {} => {} | on delay {} - on loss {} | min {} - max {}",
|
||||
control_op,
|
||||
human_bytes::human_bytes(prev_bitrate),
|
||||
human_bytes::human_bytes(target_bitrate),
|
||||
human_bytes::human_bytes(self.target_bitrate_on_delay),
|
||||
human_bytes::human_bytes(self.target_bitrate_on_loss),
|
||||
human_bytes::human_bytes(self.min_bitrate),
|
||||
human_bytes::human_bytes(self.max_bitrate),
|
||||
);
|
||||
}
|
||||
|
||||
let fec_ratio = {
|
||||
if target_bitrate <= 2000000 || self.max_bitrate <= 2000000 {
|
||||
0f64
|
||||
} else {
|
||||
(target_bitrate as f64 - 2000000f64) / (self.max_bitrate as f64 - 2000000f64)
|
||||
}
|
||||
};
|
||||
|
||||
let fec_percentage = (fec_ratio * 50f64) as u32;
|
||||
|
||||
for encoder in encoders.iter_mut() {
|
||||
encoder.set_bitrate(element, target_bitrate);
|
||||
encoder
|
||||
.transceiver
|
||||
.set_property("fec-percentage", fec_percentage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn finalize_consumer(
|
||||
&mut self,
|
||||
|
|
|
@ -3,6 +3,7 @@ use gst::prelude::*;
|
|||
use gst::subclass::prelude::*;
|
||||
use std::error::Error;
|
||||
|
||||
mod homegrown_cc;
|
||||
mod imp;
|
||||
|
||||
glib::wrapper! {
|
||||
|
|
Loading…
Reference in a new issue