Implement support for FEC / retransmission

This commit is contained in:
Mathieu Duponchelle 2021-12-10 00:06:46 +01:00
parent 7bd7c4e960
commit 1826111278
7 changed files with 274 additions and 31 deletions

97
Cargo.lock generated
View file

@ -179,6 +179,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
@ -277,6 +288,36 @@ dependencies = [
"winapi",
]
[[package]]
name = "clap"
version = "3.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c068998524e6d40ea78c8d2a4b00398f0a8b818c2d484bcb3cbeb2cff2c105ae"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_derive"
version = "3.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0152ba3ee01fa5a9133d4e15a1d9659c75d2270365768dd5a880cc7e68871874"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
@ -783,6 +824,12 @@ dependencies = [
"system-deps",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
@ -829,6 +876,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -1024,6 +1081,15 @@ dependencies = [
"paste",
]
[[package]]
name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
dependencies = [
"memchr",
]
[[package]]
name = "parking"
version = "2.0.0"
@ -1373,6 +1439,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.81"
@ -1411,6 +1483,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
[[package]]
name = "thiserror"
version = "1.0.30"
@ -1743,6 +1830,7 @@ dependencies = [
"anyhow",
"async-std",
"async-tungstenite",
"clap",
"fastrand",
"futures",
"gst-plugin-version-helper",
@ -1788,6 +1876,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View file

@ -176,6 +176,13 @@ gst-launch-1.0 webrtcsink congestion-control=disabled
[simple tool]: https://github.com/tylertreat/comcast
## Monitoring tool
An example server / client application for monitoring per-consumer stats
can be found [here].
[here]: plugins/examples/README.md
## License
All code in this repository is licensed under the [MIT license].

View file

@ -31,6 +31,7 @@ tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.2", features = ["registry", "env-filter"] }
tracing-log = "0.1"
uuid = { version = "0.8", features = ["v4"] }
clap = { version = "3.0.0-rc.1", features = ["derive"] }
[lib]
name = "webrtcsink"

View file

@ -6,13 +6,31 @@ use anyhow::Error;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use clap::Parser;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib::Type;
use gst::prelude::*;
use tracing::info;
use tracing::{debug, info, trace};
use tracing_subscriber::prelude::*;
#[derive(Parser, Debug)]
#[clap(about, version, author)]
/// Program arguments
struct Args {
/// URI of file to serve. Must hold at least one audio and video stream
uri: String,
/// Disable Forward Error Correction
#[clap(long)]
disable_fec: bool,
/// Disable retransmission
#[clap(long)]
disable_retransmission: bool,
/// Disable congestion control
#[clap(long)]
disable_congestion_control: bool,
}
fn serialize_value(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
Type::STRING => Some(val.get::<String>().unwrap().into()),
@ -68,7 +86,7 @@ struct State {
listeners: Vec<Listener>,
}
async fn run() -> Result<(), Error> {
async fn run(args: Args) -> Result<(), Error> {
tracing_log::LogTracer::init().expect("Failed to set logger");
let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTCSINK_STATS_LOG")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
@ -93,8 +111,24 @@ async fn run() -> Result<(), Error> {
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
let pipeline =
gst::parse_launch("webrtcsink name=ws videotestsrc ! queue ! ws. audiotestsrc ! ws.")?;
info!("Disable FEC: {}", args.disable_fec);
let pipeline_str = format!(
"webrtcsink name=ws do-retransmission={} do-fec={} congestion-control={} \
uridecodebin name=d uri={} \
d. ! video/x-raw ! queue ! ws.video_0 \
d. ! audio/x-raw ! queue ! ws.audio_0",
!args.disable_retransmission,
!args.disable_fec,
if args.disable_congestion_control {
"disabled"
} else {
"homegrown"
},
args.uri
);
let pipeline = gst::parse_launch(&pipeline_str)?;
let ws = pipeline
.downcast_ref::<gst::Bin>()
.unwrap()
@ -110,7 +144,7 @@ async fn run() -> Result<(), Error> {
if let Some(ws) = ws_clone.upgrade() {
let stats = ws.property::<gst::Structure>("stats");
let stats = serialize_value(&stats.to_value()).unwrap();
info!("Stats: {}", serde_json::to_string_pretty(&stats).unwrap());
debug!("Stats: {}", serde_json::to_string_pretty(&stats).unwrap());
let msg = WsMessage::Text(serde_json::to_string(&stats).unwrap());
let listeners = state_clone.lock().unwrap().listeners.clone();
@ -163,7 +197,7 @@ async fn accept_connection(state: Arc<Mutex<State>>, stream: TcpStream) {
task::spawn(async move {
while let Some(msg) = receiver.next().await {
info!("Sending to one listener!");
trace!("Sending to one listener!");
if ws_stream.send(msg).await.is_err() {
info!("Listener errored out");
receiver.close();
@ -175,5 +209,7 @@ async fn accept_connection(state: Arc<Mutex<State>>, stream: TcpStream) {
fn main() -> Result<(), Error> {
gst::init()?;
task::block_on(run())
let args = Args::parse();
task::block_on(run(args))
}

View file

@ -16,9 +16,19 @@
let timeout: ReturnType<typeof setTimeout> | undefined = undefined
const updateConsumerStats = (consumer: ConsumerType, stats: Object) => {
let target_bitrate = 0
let fec_percentage = 0
let keyframe_requests = 0
let retransmission_requests = 0
let bitrate_sent = 0
let bitrate_recv = 0
let packet_loss = 0
let delta_of_delta = 0
if (stats["consumer-stats"]["video-encoders"].length > 0) {
let venc = stats["consumer-stats"]["video-encoders"][0]
consumer.stats["target_bitrate"] = venc["bitrate"]
target_bitrate = venc["bitrate"]
fec_percentage = venc["fec-percentage"]
consumer.video_codec = venc["codec-name"]
let mitigation_mode = MitigationMode.None
@ -41,22 +51,32 @@
}
consumer.mitigation_mode = mitigation_mode
} else {
consumer.stats["target_bitrate"] = 0
}
for (let svalue of Object.values(stats)) {
if (svalue["type"] == "transport") {
let twcc_stats = svalue["gst-twcc-stats"]
if (twcc_stats !== undefined) {
consumer.stats["bitrate_sent"] = twcc_stats["bitrate-sent"]
consumer.stats["bitrate_recv"] = twcc_stats["bitrate-recv"]
consumer.stats["packet_loss"] = twcc_stats["packet-loss-pct"]
consumer.stats["delta_of_delta"] = twcc_stats["avg-delta-of-delta"]
bitrate_sent = twcc_stats["bitrate-sent"]
bitrate_recv = twcc_stats["bitrate-recv"]
packet_loss = twcc_stats["packet-loss-pct"]
delta_of_delta = twcc_stats["avg-delta-of-delta"]
}
} else if (svalue["type"] == "outbound-rtp") {
keyframe_requests += svalue["pli-count"]
retransmission_requests += svalue["nack-count"]
}
}
consumer.stats["target_bitrate"] = target_bitrate
consumer.stats["fec_percentage"] = fec_percentage
consumer.stats["bitrate_sent"] = bitrate_sent
consumer.stats["bitrate_recv"] = bitrate_recv
consumer.stats["packet_loss"] = packet_loss
consumer.stats["delta_of_delta"] = delta_of_delta
consumer.stats["keyframe_requests"] = keyframe_requests
consumer.stats["retransmission_requests"] = retransmission_requests
}
const fetchStats = () => {
@ -93,10 +113,13 @@
mitigation_mode: MitigationMode.None,
stats: new Map([
["target_bitrate", 0],
["fec_percentage", 0],
["bitrate_sent", 0],
["bitrate_recv", 0],
["packet_loss", 0],
["delta_of_delta", 0]
["delta_of_delta", 0],
["keyframe_requests", 0],
["retransmission_requests", 0],
]),
}
consumers.set(key, consumer)

View file

@ -25,10 +25,11 @@
let traces = []
let layout = {
legend: {traceorder: 'reversed'},
height: 800,
}
let ctr = 1;
let domain_step = 1.0 / consumer.stats.size
let domain_margin = domain_step / consumer.stats.size
let domain_margin = 0.05
for (let key of consumer.stats.keys()) {
let trace = {
@ -112,7 +113,7 @@
<style lang="scss">
.modal {
&-body {
width: 700px;
width: 1000px;
padding: 20px 15px 10px;
gap: 15px 0;
.id {

View file

@ -39,6 +39,8 @@ const DEFAULT_MIN_BITRATE: u32 = 1000;
const DEFAULT_MAX_BITRATE: u32 = 8192000;
const DEFAULT_CONGESTION_CONTROL: WebRTCSinkCongestionControl =
WebRTCSinkCongestionControl::Homegrown;
const DEFAULT_DO_FEC: bool = true;
const DEFAULT_DO_RETRANSMISSION: bool = true;
/// User configuration
struct Settings {
@ -49,6 +51,8 @@ struct Settings {
cc_heuristic: WebRTCSinkCongestionControl,
min_bitrate: u32,
max_bitrate: u32,
do_fec: bool,
do_retransmission: bool,
}
/// Represents a codec we can offer
@ -104,6 +108,7 @@ struct VideoEncoder {
full_width: i32,
peer_id: String,
mitigation_mode: WebRTCSinkMitigationMode,
transceiver: gst_webrtc::WebRTCRTPTransceiver,
}
struct CongestionController {
@ -212,6 +217,8 @@ impl Default for Settings {
turn_server: None,
min_bitrate: DEFAULT_MIN_BITRATE,
max_bitrate: DEFAULT_MAX_BITRATE,
do_fec: DEFAULT_DO_FEC,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
}
}
}
@ -320,13 +327,14 @@ fn setup_encoding(
enc.set_property("target-bitrate", 2560000i32);
enc.set_property("cpu-used", -16i32);
enc.set_property("keyframe-max-dist", 2000i32);
enc.set_property_from_str("keyframe-mode", "disabled");
enc.set_property_from_str("end-usage", "cbr");
enc.set_property("buffer-initial-size", 100i32);
enc.set_property("buffer-optimal-size", 120i32);
enc.set_property("buffer-size", 300i32);
enc.set_property("buffer-size", 150i32);
enc.set_property("resize-allowed", true);
enc.set_property("max-intra-bitrate", 250i32);
enc.set_property_from_str("error-resilient", "default");
pay.set_property_from_str("picture-id-mode", "15-bit");
}
"x264enc" => {
@ -418,6 +426,7 @@ impl VideoEncoder {
in_caps: &gst::Caps,
peer_id: &str,
codec_name: &str,
transceiver: gst_webrtc::WebRTCRTPTransceiver,
) -> Self {
let s = in_caps.structure(0).unwrap();
@ -436,6 +445,7 @@ impl VideoEncoder {
full_width,
peer_id: peer_id.to_string(),
mitigation_mode: WebRTCSinkMitigationMode::NONE,
transceiver,
}
}
@ -504,6 +514,10 @@ impl VideoEncoder {
.field("bitrate", self.bitrate())
.field("mitigation-mode", self.mitigation_mode)
.field("codec-name", self.codec_name.as_str())
.field(
"fec-percentage",
self.transceiver.property::<u32>("fec-percentage"),
)
.build()
}
}
@ -741,8 +755,23 @@ impl CongestionController {
}
}
let target_bitrate = self.target_bitrate / n_encoders;
let fec_ratio = {
if target_bitrate <= 2000000 || self.max_bitrate <= 2000000 {
0f64
} else {
(target_bitrate as f64 - 2000000f64) / (self.max_bitrate as f64 - 2000000f64)
}
};
let fec_percentage = (fec_ratio * 50f64) as u32;
for encoder in encoders.iter_mut() {
encoder.set_bitrate(element, self.target_bitrate / n_encoders);
encoder.set_bitrate(element, target_bitrate);
encoder
.transceiver
.set_property("fec-percentage", fec_percentage);
}
}
}
@ -752,10 +781,7 @@ impl State {
fn finalize_consumer(&mut self, element: &super::WebRTCSink, consumer: Consumer, signal: bool) {
consumer.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"removing-peer-{}-",
consumer.peer_id,
),
format!("removing-peer-{}-", consumer.peer_id,),
);
for webrtc_pad in consumer.webrtc_pads.values() {
@ -842,7 +868,12 @@ impl Consumer {
}
/// Request a sink pad on our webrtcbin, and set its transceiver's codec_preferences
fn request_webrtcbin_pad(&mut self, element: &super::WebRTCSink, stream: &InputStream) {
fn request_webrtcbin_pad(
&mut self,
element: &super::WebRTCSink,
settings: &Settings,
stream: &InputStream,
) {
let ssrc = self.generate_ssrc();
let media_idx = self.webrtc_pads.len() as i32;
@ -875,6 +906,14 @@ impl Consumer {
transceiver.set_property("codec-preferences", &payloader_caps);
if stream.sink_pad.name().starts_with("video_") {
if settings.do_fec {
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
transceiver.set_property("do-nack", settings.do_retransmission);
}
self.webrtc_pads.insert(
ssrc,
WebRTCPad {
@ -971,16 +1010,17 @@ impl Consumer {
&webrtc_pad.in_caps,
&self.peer_id,
codec.caps.structure(0).unwrap().name(),
transceiver,
);
if let Some(congestion_controller) = self.congestion_controller.as_mut() {
congestion_controller.target_bitrate += enc.bitrate();
enc.transceiver.set_property("fec-percentage", 0u32);
} else {
/* If congestion control is disabled, we simply use the highest
* known "safe" value for the bitrate.
*
*/
* known "safe" value for the bitrate. */
enc.set_bitrate(element, self.max_bitrate as i32);
enc.transceiver.set_property("fec-percentage", 50u32);
}
self.encoders.push(enc);
@ -1492,7 +1532,7 @@ impl WebRTCSink {
state
.streams
.iter()
.for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &stream));
.for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &settings, &stream));
let clock = element.clock();
@ -1625,6 +1665,11 @@ impl WebRTCSink {
}
}
consumer.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("webrtcsink-peer-{}-remote-description-set", peer_id,),
);
let element_clone = element.downgrade();
let webrtcbin = consumer.webrtcbin.downgrade();
let peer_id_clone = peer_id.clone();
@ -2073,6 +2118,20 @@ impl ObjectImpl for WebRTCSink {
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
glib::ParamSpecBoolean::new(
"do-fec",
"Do Forward Error Correction",
"Whether the element should negotiate and send FEC data",
DEFAULT_DO_FEC,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY
),
glib::ParamSpecBoolean::new(
"do-retransmission",
"Do retransmission",
"Whether the element should offer to honor retransmission requests",
DEFAULT_DO_RETRANSMISSION,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY
),
]
});
@ -2126,14 +2185,15 @@ impl ObjectImpl for WebRTCSink {
match new_heuristic {
WebRTCSinkCongestionControl::Disabled => {
consumer.congestion_controller.take();
},
}
WebRTCSinkCongestionControl::Homegrown => {
let _ = consumer.congestion_controller.insert(
CongestionController::new(
peer_id,
settings.min_bitrate,
settings.max_bitrate,
));
),
);
}
}
}
@ -2147,6 +2207,14 @@ impl ObjectImpl for WebRTCSink {
let mut settings = self.settings.lock().unwrap();
settings.max_bitrate = value.get::<u32>().expect("type checked upstream");
}
"do-fec" => {
let mut settings = self.settings.lock().unwrap();
settings.do_fec = value.get::<bool>().expect("type checked upstream");
}
"do-retransmission" => {
let mut settings = self.settings.lock().unwrap();
settings.do_retransmission = value.get::<bool>().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -2181,6 +2249,14 @@ impl ObjectImpl for WebRTCSink {
let settings = self.settings.lock().unwrap();
settings.max_bitrate.to_value()
}
"do-fec" => {
let settings = self.settings.lock().unwrap();
settings.do_fec.to_value()
}
"do-retransmission" => {
let settings = self.settings.lock().unwrap();
settings.do_retransmission.to_value()
}
"stats" => self.gather_stats().to_value(),
_ => unimplemented!(),
}
@ -2269,6 +2345,8 @@ impl ElementImpl for WebRTCSink {
_name: Option<String>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
gst_error!(CAT, "pad being requested");
if element.current_state() > gst::State::Ready {
gst_error!(CAT, "element pads can only be requested before starting");
return None;