From 8c6ff2405289f25f9d54a31252507e340ac6a321 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 4 Nov 2021 18:26:50 +0100 Subject: [PATCH] webrtcsink: Initial congestion control implementation Naive heuristic lifted from an earlier proof of concept, augmented with logic from https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-5.5 A property is exposed to disable congestion control for testing purposes, it can be extended in the future to allow selecting a different congestion control scheme. + Update the documentation --- Cargo.lock | 70 +++- README.md | 85 ++++- plugins/Cargo.toml | 5 +- plugins/src/webrtcsink/imp.rs | 655 +++++++++++++++++++++++++++++++--- plugins/src/webrtcsink/mod.rs | 10 + 5 files changed, 764 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7dc1866d5..247a008ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,23 @@ dependencies = [ "url", ] +[[package]] +name = "async-process" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b21b63ab5a0db0369deb913540af2892750e42d949faacc7a61495ac418a1692" +dependencies = [ + "async-io", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "libc", + "once_cell", + "signal-hook", + "winapi", +] + [[package]] name = "async-std" version = "1.10.0" @@ -108,6 +125,7 @@ dependencies = [ "async-global-executor", "async-io", "async-lock", + "async-process", "crossbeam-utils", "futures-channel", "futures-core", @@ -516,7 +534,7 @@ dependencies = [ [[package]] name = "glib" version = "0.15.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047" +source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a" dependencies = [ "bitflags", "futures-channel", @@ -534,7 +552,7 @@ dependencies = [ [[package]] name = "glib-macros" version = "0.15.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047" +source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a" dependencies = [ "anyhow", "heck", @@ -548,7 +566,7 @@ dependencies = [ [[package]] name = "glib-sys" version = "0.15.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047" +source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a" dependencies = [ "libc", "system-deps", @@ -570,7 +588,7 @@ dependencies = [ [[package]] name = "gobject-sys" version = "0.15.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047" +source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a" dependencies = [ "glib-sys", "libc", @@ -663,6 +681,30 @@ dependencies = [ "system-deps", ] +[[package]] +name = "gstreamer-rtp" +version = "0.18.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#b43d785d837738a9071ad0690232e245e487dd53" +dependencies = [ + "bitflags", + "glib", + "gstreamer", + "gstreamer-rtp-sys", + "once_cell", +] + +[[package]] +name = "gstreamer-rtp-sys" +version = "0.18.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#b43d785d837738a9071ad0690232e245e487dd53" +dependencies = [ + "glib-sys", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + [[package]] name = "gstreamer-sdp" version = "0.18.0" @@ -1312,6 +1354,25 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.5" @@ -1630,6 +1691,7 @@ dependencies = [ "gst-plugin-version-helper", "gstreamer", "gstreamer-app", + "gstreamer-rtp", "gstreamer-sdp", "gstreamer-video", "gstreamer-webrtc", diff --git a/README.md b/README.md index b338d7ee7..82a83cd3c 100644 --- a/README.md +++ b/README.md @@ -32,12 +32,17 @@ useful alternative. While this is not on the roadmap at the moment, nothing in the design prevents implementing this optimization. +* Congestion control: the element levarages transport-wide congestion control + feedback messages in order to adapt the bitrate of individual consumers' video + encoders to the available bandwidth. + * Configuration: the level of user control over the element is at the moment quite narrow, as the only interface exposed is control over proposed codecs, as well - as their order of priority. Consult `gst-inspect=1.0` for more information. + as their order of priority, and disabling congestion control. Consult `gst-inspect=1.0` + for more information. More features are on the roadmap, focusing on mechanisms for mitigating packet -loss and congestion. +loss. It is important to note that full control over the individual elements used by `webrtcsink` is *not* on the roadmap, as it will act as a black box in that respect, @@ -52,13 +57,7 @@ expose interfaces to guide and tune the heuristics it employs. ### Prerequisites -The element has only been tested for now against GStreamer master, with an -extra Merge Request pending review: - - - -The MR should hopefully make it in in time for GStreamer's 1.20 release, -in the meantime the patches must be applied locally. +The element has only been tested for now against GStreamer master. For testing, it is recommended to simply build GStreamer locally and run in the uninstalled devenv. @@ -98,7 +97,7 @@ python3 -m http.server In the third, run: -``` +``` shell export GST_PLUGIN_PATH=$PWD/target/debug:$GST_PLUGIN_PATH gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws. ``` @@ -106,13 +105,77 @@ gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws. When the pipeline above is running succesfully, open a browser and point it to the python server: -``` +``` shell xdg-open http://127.0.0.1:8000 ``` You should see an identifier listed in the left-hand panel, click on it. You should see a test video stream, and hear a test tone. +## Configuration + +The element itself can be configured through its properties, see +`gst-inspect-1.0 webrtcsink` for more information about that, in addition the +default signaller also exposes properties for configuring it, in +particular setting the signalling server address, those properties +can be accessed through the `gst::ChildProxy` interface, for example +with gst-launch: + +``` shell +gst-launch-1.0 webrtcsink signaller::address="ws://127.0.0.1:8443" .. +``` + +The signaller object can not be inspected, refer to [the source code] +for the list of properties. + +[the source code]: plugins/src/signaller/imp.rs + +## Testing congestion control + +For the purpose of testing congestion in a reproducible manner, a +[simple tool] has been used, I only used it on Linux but it is documented +as usable on MacOS too. I had to run the client browser on a separate +machine on my local network for congestion to actually be applied, I didn't +look into why that was necessary. + +My testing procedure was: + +* identify the server machine network interface (eg with `ifconfig` on Linux) + +* identify the client machine IP address (eg with `ifconfig` on Linux) + +* start the various services as explained in the Usage section (use + `GST_DEBUG=webrtcsink:7` to get detailed logs about congestion control) + +* start playback in the client browser + +* Run a `comcast` command on the server machine, for instance: + + ``` shell + /home/meh/go/bin/comcast --device=$SERVER_INTERFACE --target-bw 3000 --target-addr=$CLIENT_IP --target-port=1:65535 --target-proto=udp + ``` + +* Observe the bitrate sharply decreasing, playback should slow down briefly + then catch back up + +* Remove the bandwidth limitation, and observe the bitrate eventually increasing + back to a maximum: + + ``` shell + /home/meh/go/bin/comcast --device=$SERVER_INTERFACE --stop + ``` + +For comparison, the congestion control property can be set to disabled on +webrtcsink, then the above procedure applied again, the expected result is +for playback to simply crawl down to a halt until the bandwidth limitation +is lifted: + +``` shell +gst-launch-1.0 webrtcsink congestion-control=disabled +``` + +[simple tool]: https://github.com/tylertreat/comcast + ## License All code in this repository is licensed under the [MIT license]. diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 5b28425ea..415bc13f4 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -14,11 +14,12 @@ gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gst gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-sdp = { package = "gstreamer-sdp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } once_cell = "1.0" smallvec = "1" anyhow = "1" futures = "0.3" -async-std = "1" +async-std = { version = "1", features = ["unstable"] } async-tungstenite = { version = "0.10", features = ["async-std-runtime", "async-native-tls"] } serde = "1" serde_derive = "1" @@ -48,4 +49,4 @@ install_subdir = "gstreamer-1.0" versioning = false [package.metadata.capi.pkg_config] -requires_private = "gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0" +requires_private = "gstreamer-rtp >= 1.20, gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/plugins/src/webrtcsink/imp.rs b/plugins/src/webrtcsink/imp.rs index a1100b16b..f526af381 100644 --- a/plugins/src/webrtcsink/imp.rs +++ b/plugins/src/webrtcsink/imp.rs @@ -3,6 +3,7 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning}; +use gst_rtp::prelude::*; use async_std::task; use futures::prelude::*; @@ -10,9 +11,11 @@ use futures::prelude::*; use anyhow::{anyhow, Error}; use once_cell::sync::Lazy; use std::collections::HashMap; +use std::ops::Mul; use std::sync::Mutex; use super::utils::{make_element, StreamProducer}; +use super::WebRTCSinkCongestionControl; use crate::signaller::Signaller; use std::collections::BTreeMap; @@ -24,10 +27,17 @@ static CAT: Lazy = Lazy::new(|| { ) }); +const RTP_TWCC_URI: &str = + "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; + +const DEFAULT_CONGESTION_CONTROL: WebRTCSinkCongestionControl = + WebRTCSinkCongestionControl::Homegrown; + /// User configuration struct Settings { video_caps: gst::Caps, audio_caps: gst::Caps, + cc_heuristic: WebRTCSinkCongestionControl, } /// Represents a codec we can offer @@ -57,6 +67,8 @@ struct InputStream { #[derive(Clone)] struct WebRTCPad { pad: gst::Pad, + /// The (fixed) caps of the corresponding input stream + in_caps: gst::Caps, /// Our offer caps caps: gst::Caps, /// The m= line index in the SDP @@ -68,11 +80,66 @@ struct WebRTCPad { payload: Option, } +/// Wrapper around GStreamer encoder element, keeps track of factory +/// name in order to provide a unified set / get bitrate API, also +/// tracks a raw capsfilter used to resize / decimate the input video +/// stream according to the bitrate, thresholds hardcoded for now +struct VideoEncoder { + factory_name: String, + element: gst::Element, + filter: gst::Element, + halved_framerate: gst::Fraction, + full_width: i32, + peer_id: String, +} + +struct CongestionController { + /// Overall bitrate target for all video streams. + /// Hasn't been tested with multiple video streams, but + /// current design is simply to divide bitrate equally. + bitrate_ema: Option, + /// Exponential moving average, updated when bitrate is + /// decreased, discarded when increased again past last + /// congestion window. Smoothing factor hardcoded. + target_bitrate: i32, + /// Exponentially weighted moving variance, recursively + /// updated along with bitrate_ema. sqrt'd to obtain standard + /// deviation, used to determine whether to increase bitrate + /// additively or multiplicatively + bitrate_emvar: f64, + /// Used in additive mode to track last control time, influences + /// calculation of added value according to gcc section 5.5 + last_update_time: Option, + /// For logging purposes + peer_id: String, +} + +#[derive(Debug)] +enum IncreaseType { + /// Increase bitrate by value + Additive(f64), + /// Increase bitrate by factor + Multiplicative(f64), +} + +#[derive(Debug)] +enum CongestionControlOp { + /// Don't update target bitrate + Hold, + /// Decrease target bitrate + Decrease(f64), + /// Increase target bitrate, either additively or multiplicatively + Increase(IncreaseType), +} + struct Consumer { pipeline: gst::Pipeline, webrtcbin: gst::Element, webrtc_pads: HashMap, peer_id: String, + encoders: Vec, + /// None if congestion control was disabled + congestion_controller: Option, } #[derive(PartialEq)] @@ -120,6 +187,7 @@ impl Default for Settings { .iter() .map(|s| gst::Structure::new_empty(s)) .collect::(), + cc_heuristic: WebRTCSinkCongestionControl::Homegrown, } } } @@ -151,9 +219,14 @@ fn setup_encoding( src: &gst::Element, codec: &Codec, ssrc: Option, -) -> Result { + twcc: bool, +) -> Result<(gst::Element, gst::Element, gst::Element), Error> { let conv = match codec.is_video { - true => make_element("videoconvert", None)?, + true => gst::parse_bin_from_description( + "videoconvert ! videoscale ! videorate drop-only=true", + true, + )? + .upcast(), false => gst::parse_bin_from_description("audioresample ! audioconvert", true)?.upcast(), }; @@ -206,11 +279,66 @@ fn setup_encoding( let conv_caps = if codec.encoder.name() == "nvh264enc" { gst::Caps::builder("video/x-raw") .field("format", &gst::List::new(&[&"NV12", &"YV12", &"I420"])) + .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) + .build() + } else if codec.is_video { + gst::Caps::builder("video/x-raw") + .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .build() } else { - gst::Caps::new_any() + gst::Caps::builder("audio/x-raw").build() }; + match codec.encoder.name().as_str() { + "vp8enc" | "vp9enc" => { + enc.set_property("deadline", 1i64).unwrap(); + enc.set_property("threads", 12i32).unwrap(); + enc.set_property("target-bitrate", 2560000i32).unwrap(); + enc.set_property("cpu-used", -16i32).unwrap(); + enc.set_property("keyframe-max-dist", 2000i32).unwrap(); + enc.set_property_from_str("end-usage", "cbr").unwrap(); + enc.set_property("buffer-initial-size", 100i32).unwrap(); + enc.set_property("buffer-optimal-size", 120i32).unwrap(); + enc.set_property("buffer-size", 300i32).unwrap(); + enc.set_property("resize-allowed", true).unwrap(); + enc.set_property("max-intra-bitrate", 250i32).unwrap(); + + pay.set_property_from_str("picture-id-mode", "15-bit") + .unwrap(); + } + "x264enc" => { + enc.set_property("bitrate", 25608u32).unwrap(); + enc.set_property_from_str("tune", "zerolatency").unwrap(); + enc.set_property_from_str("speed-preset", "ultrafast") + .unwrap(); + enc.set_property("threads", 12u32).unwrap(); + enc.set_property("key-int-max", 2560u32).unwrap(); + enc.set_property("b-adapt", false).unwrap(); + enc.set_property("vbv-buf-capacity", 120u32).unwrap(); + } + "nvh264enc" => { + enc.set_property("bitrate", 2048u32).unwrap(); + enc.set_property("gop-size", 2560i32).unwrap(); + enc.set_property_from_str("rc-mode", "cbr").unwrap(); + enc.set_property("vbv-buffer-size", 120u32).unwrap(); + enc.set_property("zerolatency", true).unwrap(); + } + _ => (), + } + + /* We only enforce TWCC in the offer caps, once a remote description + * has been set it will get automatically negotiated. This is necessary + * because the implementor in Firefox had apparently not understood the + * concept of *transport-wide* congestion control, and firefox doesn't + * provide feedback for audio packets. + */ + if twcc { + let twcc_extension = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap(); + twcc_extension.set_id(1); + pay.emit_by_name("add-extension", &[&twcc_extension]) + .unwrap(); + } + conv_filter.set_property("caps", conv_caps).unwrap(); let parse_caps = if codec_name == "video/x-h264" { @@ -230,31 +358,380 @@ fn setup_encoding( gst::Element::link_many(&[&parse_filter, &pay]).with_context(|| "Linking encoding elements")?; - Ok(pay) + Ok((enc, conv_filter, pay)) } -impl State { - fn remove_consumer(&mut self, element: &super::WebRTCSink, peer_id: &str, signal: bool) { - if let Some(consumer) = self.consumers.remove(peer_id) { - for webrtc_pad in consumer.webrtc_pads.values() { - if let Some(producer) = self - .streams - .get(&webrtc_pad.stream_name) - .and_then(|stream| stream.producer.as_ref()) +fn lookup_remote_inbound_rtp_stats(stats: &gst::StructureRef) -> Option { + for (_, field_value) in stats { + if let Ok(s) = field_value.get::() { + if let Ok(type_) = s.get::("type") { + if type_ == gst_webrtc::WebRTCStatsType::RemoteInboundRtp { + return Some(s); + } + } + } + } + + None +} + +fn lookup_transport_stats(stats: &gst::StructureRef) -> Option { + for (_, field_value) in stats { + if let Ok(s) = field_value.get::() { + if let Ok(type_) = s.get::("type") { + if type_ == gst_webrtc::WebRTCStatsType::Transport && s.has_field("gst-twcc-stats") { - consumer.disconnect_input_stream(producer); + return Some(s); + } + } + } + } + + None +} + +impl VideoEncoder { + fn new( + element: gst::Element, + filter: gst::Element, + in_caps: &gst::Caps, + peer_id: &str, + ) -> Self { + let s = in_caps.structure(0).unwrap(); + + let halved_framerate = s + .get::("framerate") + .unwrap() + .mul(gst::Fraction::new(1, 2)); + let full_width = s.get::("width").unwrap(); + + Self { + factory_name: element.factory().unwrap().name().into(), + element, + filter, + halved_framerate, + full_width, + peer_id: peer_id.to_string(), + } + } + + fn bitrate(&self) -> i32 { + match self.factory_name.as_str() { + "vp8enc" | "vp9enc" => self + .element + .property("target-bitrate") + .unwrap() + .get::() + .unwrap(), + "x264enc" | "nvh264enc" => { + (self + .element + .property("bitrate") + .unwrap() + .get::() + .unwrap() + * 1000) as i32 + } + _ => unreachable!(), + } + } + + fn set_bitrate(&self, element: &super::WebRTCSink, bitrate: i32) { + match self.factory_name.as_str() { + "vp8enc" | "vp9enc" => self + .element + .set_property("target-bitrate", bitrate) + .unwrap(), + "x264enc" | "nvh264enc" => self + .element + .set_property("bitrate", (bitrate / 1000) as u32) + .unwrap(), + _ => unreachable!(), + } + + let mut s = self + .filter + .property("caps") + .unwrap() + .get::() + .unwrap() + .structure(0) + .unwrap() + .to_owned(); + + // Hardcoded thresholds, may be tuned further in the future, and + // adapted according to the codec in use + if bitrate < 500000 { + s.set("width", 360i32.min(self.full_width)); + s.set("framerate", self.halved_framerate); + } else if bitrate < 1000000 { + s.set("width", 360i32.min(self.full_width)); + s.remove_field("framerate"); + } else if bitrate < 2000000 { + s.set("width", 720i32.min(self.full_width)); + s.remove_field("framerate"); + } else { + s.remove_field("width"); + s.remove_field("framerate"); + } + + let caps = gst::Caps::builder_full().structure(s).build(); + + gst_log!( + CAT, + obj: element, + "consumer {}: setting bitrate {} and caps {} on encoder {:?}", + self.peer_id, + bitrate, + caps, + self.element + ); + + self.filter.set_property("caps", caps).unwrap(); + } +} + +impl CongestionController { + fn new(peer_id: &str) -> Self { + Self { + target_bitrate: 0, + bitrate_ema: None, + bitrate_emvar: 0., + last_update_time: None, + peer_id: peer_id.to_string(), + } + } + + fn update( + &mut self, + element: &super::WebRTCSink, + twcc_stats: &gst::StructureRef, + rtt: f64, + ) -> CongestionControlOp { + let target_bitrate = self.target_bitrate as f64; + // Unwrap, all those fields must be there or there's been an API + // break, which qualifies as programming error + let bitrate_sent = twcc_stats.get::("bitrate-sent").unwrap(); + let bitrate_recv = twcc_stats.get::("bitrate-recv").unwrap(); + let delta_of_delta = twcc_stats.get::("avg-delta-of-delta").unwrap(); + let loss_percentage = twcc_stats.get::("packet-loss-pct").unwrap(); + + let sent_minus_received = bitrate_sent.saturating_sub(bitrate_recv); + + let delay_factor = sent_minus_received as f64 / target_bitrate; + let last_update_time = self.last_update_time.replace(std::time::Instant::now()); + + gst_trace!( + CAT, + obj: element, + "consumer {}: considering stats {}", + self.peer_id, + twcc_stats + ); + + if delay_factor > 0.01 { + CongestionControlOp::Decrease(if delay_factor < 0.64 { + gst_trace!( + CAT, + obj: element, + "consumer {}: low delay factor", + self.peer_id + ); + 0.96 + } else { + gst_trace!( + CAT, + obj: element, + "consumer {}: High delay factor", + self.peer_id + ); + delay_factor.sqrt().sqrt().clamp(0.8, 0.96) + }) + } else if delta_of_delta > 1000000 || loss_percentage > 2.0 { + CongestionControlOp::Decrease(if loss_percentage > 0. && loss_percentage < 2.0 { + gst_trace!(CAT, obj: element, "consumer {}: low loss", self.peer_id); + 0.97 + } else { + gst_log!(CAT, obj: element, "consumer: {}: high loss", self.peer_id); + ((100. - loss_percentage) / 100.).clamp(0.7, 0.98) + }) + } else if loss_percentage > 0.01 { + gst_trace!(CAT, obj: element, "consumer {}: tiny loss", self.peer_id); + CongestionControlOp::Hold + } else { + gst_trace!( + CAT, + obj: element, + "consumer {}: no detected congestion", + self.peer_id + ); + CongestionControlOp::Increase(if let Some(ema) = self.bitrate_ema { + let bitrate_stdev = self.bitrate_emvar.sqrt(); + + gst_trace!( + CAT, + obj: element, + "consumer {}: Old bitrate: {}, ema: {}, stddev: {}", + self.peer_id, + target_bitrate, + ema, + bitrate_stdev, + ); + + // gcc section 5.5 advises 3 standard deviations, but experiments + // have shown this to be too low, probably related to the rest of + // homegrown algorithm not implementing gcc, revisit when implementing + // the rest of the RFC + if target_bitrate < ema - 7. * bitrate_stdev { + gst_trace!( + CAT, + obj: element, + "consumer {}: below last congestion window", + self.peer_id + ); + /* Multiplicative increase */ + IncreaseType::Multiplicative(1.03) + } else if target_bitrate > ema + 7. * bitrate_stdev { + gst_trace!( + CAT, + obj: element, + "consumer {}: above last congestion window", + self.peer_id + ); + /* We have gone past our last estimated max bandwidth + * network situation may have changed, go back to + * multiplicative increase + */ + self.bitrate_ema.take(); + IncreaseType::Multiplicative(1.03) + } else { + let rtt_ms = rtt * 1000.; + let response_time_ms = 100. + rtt_ms; + let time_since_last_update_ms = match last_update_time { + None => 0., + Some(instant) => { + (self.last_update_time.unwrap() - instant).as_millis() as f64 + } + }; + // gcc section 5.5 advises 0.95 as the smoothing factor, but that + // seems intuitively much too low, granting disproportionate importance + // to the last measurement. 0.5 seems plenty enough, I don't have maths + // to back that up though :) + let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0); + let bits_per_frame = target_bitrate / 30.; + let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.)); + let avg_packet_size_bits = bits_per_frame / packets_per_frame; + + gst_trace!( + CAT, + obj: element, + "consumer {}: still in last congestion window", + self.peer_id, + ); + + /* Additive increase */ + IncreaseType::Additive(f64::max(1000., alpha * avg_packet_size_bits)) + } + } else { + /* Multiplicative increase */ + gst_trace!( + CAT, + obj: element, + "consumer {}: outside congestion window", + self.peer_id + ); + IncreaseType::Multiplicative(1.03) + }) + } + } + + fn clamp_bitrate(&mut self, bitrate: i32, n_encoders: i32) { + self.target_bitrate = bitrate.clamp(1000 * n_encoders, 8192000 * n_encoders); + } + + fn control( + &mut self, + element: &super::WebRTCSink, + stats: &gst::StructureRef, + encoders: &Vec, + ) { + let n_encoders = encoders.len() as i32; + + let rtt = lookup_remote_inbound_rtp_stats(stats) + .and_then(|s| s.get::("round-trip-time").ok()) + .unwrap_or(0.); + + if let Some(twcc_stats) = lookup_transport_stats(stats).and_then(|transport_stats| { + transport_stats.get::("gst-twcc-stats").ok() + }) { + let control_op = self.update(element, &twcc_stats, rtt); + + gst_trace!( + CAT, + obj: element, + "consumer {}: applying congestion control operation {:?}", + self.peer_id, + control_op + ); + + match control_op { + CongestionControlOp::Hold => (), + CongestionControlOp::Increase(IncreaseType::Additive(value)) => { + self.clamp_bitrate(self.target_bitrate + value as i32, n_encoders); + } + CongestionControlOp::Increase(IncreaseType::Multiplicative(factor)) => { + self.clamp_bitrate((self.target_bitrate as f64 * factor) as i32, n_encoders); + } + CongestionControlOp::Decrease(factor) => { + self.clamp_bitrate((self.target_bitrate as f64 * factor) as i32, n_encoders); + + // Smoothing factor + let alpha = 0.75; + if let Some(ema) = self.bitrate_ema { + let sigma: f64 = (self.target_bitrate as f64) - ema; + self.bitrate_ema = Some(ema + (alpha * sigma)); + self.bitrate_emvar = + (1. - alpha) * (self.bitrate_emvar + alpha * sigma.powi(2)); + } else { + self.bitrate_ema = Some(self.target_bitrate as f64); + self.bitrate_emvar = 0.; + } } } - consumer.pipeline.call_async(|pipeline| { - let _ = pipeline.set_state(gst::State::Null); - }); - - if signal { - self.signaller.consumer_removed(element, peer_id); + for encoder in encoders { + encoder.set_bitrate(element, self.target_bitrate / n_encoders); } } } +} + +impl State { + fn finalize_consumer(&mut self, element: &super::WebRTCSink, consumer: Consumer, signal: bool) { + for webrtc_pad in consumer.webrtc_pads.values() { + if let Some(producer) = self + .streams + .get(&webrtc_pad.stream_name) + .and_then(|stream| stream.producer.as_ref()) + { + consumer.disconnect_input_stream(producer); + } + } + + consumer.pipeline.call_async(|pipeline| { + let _ = pipeline.set_state(gst::State::Null); + }); + + if signal { + self.signaller.consumer_removed(element, &consumer.peer_id); + } + } + + fn remove_consumer(&mut self, element: &super::WebRTCSink, peer_id: &str, signal: bool) { + if let Some(consumer) = self.consumers.remove(peer_id) { + self.finalize_consumer(element, consumer, signal); + } + } fn maybe_start_signaller(&mut self, element: &super::WebRTCSink) { if self.signaller_state == SignallerState::Stopped @@ -296,16 +773,11 @@ impl Consumer { } /// Request a sink pad on our webrtcbin, and set its transceiver's codec_preferences - fn request_webrtcbin_pad( - &mut self, - element: &super::WebRTCSink, - caps: &gst::Caps, - stream_name: &str, - ) { + fn request_webrtcbin_pad(&mut self, element: &super::WebRTCSink, stream: &InputStream) { let ssrc = self.generate_ssrc(); let media_idx = self.webrtc_pads.len() as i32; - let mut payloader_caps = caps.to_owned(); + let mut payloader_caps = stream.out_caps.as_ref().unwrap().to_owned(); { let payloader_caps_mut = payloader_caps.make_mut(); @@ -346,10 +818,11 @@ impl Consumer { ssrc, WebRTCPad { pad, + in_caps: stream.in_caps.as_ref().unwrap().clone(), caps: payloader_caps, media_idx: media_idx as u32, ssrc, - stream_name: stream_name.to_string(), + stream_name: stream.sink_pad.name().to_string(), payload: None, }, ); @@ -358,7 +831,7 @@ impl Consumer { /// Called when we have received an answer, connects an InputStream /// to a given WebRTCPad fn connect_input_stream( - &self, + &mut self, element: &super::WebRTCSink, producer: &StreamProducer, webrtc_pad: &WebRTCPad, @@ -381,7 +854,33 @@ impl Consumer { let appsrc = make_element("appsrc", None)?; self.pipeline.add(&appsrc).unwrap(); - let pay = setup_encoding(&self.pipeline, &appsrc, codec, Some(webrtc_pad.ssrc))?; + let (enc, filter, pay) = + setup_encoding(&self.pipeline, &appsrc, codec, Some(webrtc_pad.ssrc), false)?; + + if codec.is_video { + let enc = VideoEncoder::new( + enc.clone(), + filter.clone(), + &webrtc_pad.in_caps, + &self.peer_id, + ); + + if let Some(congestion_controller) = self.congestion_controller.as_mut() { + congestion_controller.target_bitrate += enc.bitrate(); + } else { + /* If congestion control is disabled, we simply use the highest + * known "safe" value for the bitrate. + * + * I have found higher values to cause packet loss *somewhere* in + * my local network, possibly related to chrome's pretty low UDP + * buffer sizes, this probably should be exposed as a property + * eventually. + */ + enc.set_bitrate(element, 8192000i32); + } + + self.encoders.push(enc); + } let appsrc = appsrc.downcast::().unwrap(); @@ -723,6 +1222,7 @@ impl WebRTCSink { /// Called by the signaller to add a new consumer pub fn add_consumer(&self, element: &super::WebRTCSink, peer_id: &str) -> Result<(), Error> { + let cc_heuristic = self.settings.lock().unwrap().cc_heuristic; let mut state = self.state.lock().unwrap(); if state.consumers.contains_key(peer_id) { @@ -882,15 +1382,17 @@ impl WebRTCSink { webrtcbin, webrtc_pads: HashMap::new(), peer_id: peer_id.to_string(), + congestion_controller: match cc_heuristic { + WebRTCSinkCongestionControl::Disabled => None, + WebRTCSinkCongestionControl::Homegrown => Some(CongestionController::new(peer_id)), + }, + encoders: Vec::new(), }; - state.streams.iter().for_each(|(_, stream)| { - consumer.request_webrtcbin_pad( - element, - stream.out_caps.as_ref().unwrap(), - stream.sink_pad.name().as_str(), - ) - }); + state + .streams + .iter() + .for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &stream)); let clock = element.clock(); @@ -950,12 +1452,27 @@ impl WebRTCSink { Ok(()) } + fn process_webrtcbin_stats( + &self, + element: &super::WebRTCSink, + peer_id: &str, + stats: &gst::StructureRef, + ) { + let mut state = self.state.lock().unwrap(); + + if let Some(consumer) = state.consumers.get_mut(peer_id) { + if let Some(congestion_controller) = consumer.congestion_controller.as_mut() { + congestion_controller.control(element, stats, &consumer.encoders); + } + } + } + fn on_remote_description_set(&self, element: &super::WebRTCSink, peer_id: String) { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let mut remove = false; - if let Some(consumer) = state.consumers.get(&peer_id) { - for webrtc_pad in consumer.webrtc_pads.values() { + if let Some(mut consumer) = state.consumers.remove(&peer_id) { + for webrtc_pad in consumer.webrtc_pads.clone().values() { if let Some(producer) = state .streams .get(&webrtc_pad.stream_name) @@ -986,12 +1503,43 @@ impl WebRTCSink { break; } } - } - drop(state); + let element_clone = element.downgrade(); + let webrtcbin = consumer.webrtcbin.downgrade(); + let peer_id_clone = peer_id.clone(); - if remove { - let _ = self.remove_consumer(element, &peer_id, true); + task::spawn(async move { + let mut interval = + async_std::stream::interval(std::time::Duration::from_millis(100)); + + while let Some(_) = interval.next().await { + let element_clone = element_clone.clone(); + let peer_id_clone = peer_id_clone.clone(); + if let Some(webrtcbin) = webrtcbin.upgrade() { + let promise = gst::Promise::with_change_func(move |reply| { + if let Some(element) = element_clone.upgrade() { + let this = Self::from_instance(&element); + + if let Ok(Some(stats)) = reply { + this.process_webrtcbin_stats(&element, &peer_id_clone, stats); + } + } + }); + + webrtcbin + .emit_by_name("get-stats", &[&None::, &promise]) + .unwrap(); + } else { + break; + } + } + }); + + if remove { + state.finalize_consumer(element, consumer, true); + } else { + state.consumers.insert(consumer.peer_id.clone(), consumer); + } } } @@ -1110,7 +1658,7 @@ impl WebRTCSink { src.link(&capsfilter) .with_context(|| format!("Running discovery pipeline for caps {}", caps))?; - let pay = setup_encoding(&pipe.0, &capsfilter, codec, None)?; + let (_, _, pay) = setup_encoding(&pipe.0, &capsfilter, codec, None, true)?; let sink = make_element("fakesink", None)?; @@ -1144,6 +1692,7 @@ impl WebRTCSink { "seqnum-offset", "ssrc", "sprop-parameter-sets", + "a-framerate", ]); s.set("payload", codec.payload); return Ok(s); @@ -1349,6 +1898,14 @@ impl ObjectImpl for WebRTCSink { gst::Caps::static_type(), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpec::new_enum( + "congestion-control", + "Congestion control", + "Defines how congestion is controlled, if at all", + WebRTCSinkCongestionControl::static_type(), + DEFAULT_CONGESTION_CONTROL as i32, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -1377,6 +1934,12 @@ impl ObjectImpl for WebRTCSink { .expect("type checked upstream") .unwrap_or_else(|| gst::Caps::new_empty()); } + "congestion-control" => { + let mut settings = self.settings.lock().unwrap(); + settings.cc_heuristic = value + .get::() + .expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -1391,6 +1954,10 @@ impl ObjectImpl for WebRTCSink { let settings = self.settings.lock().unwrap(); settings.audio_caps.to_value() } + "congestion-control" => { + let settings = self.settings.lock().unwrap(); + settings.cc_heuristic.to_value() + } _ => unimplemented!(), } } diff --git a/plugins/src/webrtcsink/mod.rs b/plugins/src/webrtcsink/mod.rs index 711e3c5d3..7609422d5 100644 --- a/plugins/src/webrtcsink/mod.rs +++ b/plugins/src/webrtcsink/mod.rs @@ -110,6 +110,16 @@ impl WebRTCSink { } } +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::GEnum)] +#[repr(u32)] +#[genum(type_name = "GstWebRTCSinkCongestionControl")] +pub enum WebRTCSinkCongestionControl { + #[genum(name = "Disabled: no congestion control is applied", nick = "disabled")] + Disabled, + #[genum(name = "Homegrown: simple sender-side heuristic", nick = "homegrown")] + Homegrown, +} + pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin),