From f6f079f3a8361d64dbfc0485a97412b19b8de151 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Wed, 11 May 2022 10:41:11 -0400 Subject: [PATCH] Use StreamConsumer from gstreamer itself Merged in https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1022 --- Cargo.lock | 150 ++++++--------- plugins/Cargo.toml | 1 + plugins/src/webrtcsink/imp.rs | 52 +++--- plugins/src/webrtcsink/mod.rs | 1 - plugins/src/webrtcsink/utils.rs | 313 -------------------------------- 5 files changed, 86 insertions(+), 431 deletions(-) delete mode 100644 plugins/src/webrtcsink/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 339d1a76..0c2e03fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,9 +313,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.17" +version = "3.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47582c09be7c8b32c0ab3a6181825ababb713fde6fff20fc573a3870dd45c6a0" +checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b" dependencies = [ "atty", "bitflags", @@ -330,9 +330,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.1.7" +version = "3.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1" +checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c" dependencies = [ "heck", "proc-macro-error", @@ -598,7 +598,7 @@ dependencies = [ [[package]] name = "glib" version = "0.16.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4" +source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6" dependencies = [ "bitflags", "futures-channel", @@ -618,7 +618,7 @@ dependencies = [ [[package]] name = "glib-macros" version = "0.16.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4" +source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6" dependencies = [ "anyhow", "heck", @@ -632,7 +632,7 @@ dependencies = [ [[package]] name = "glib-sys" version = "0.16.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4" +source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6" dependencies = [ "libc", "system-deps", @@ -653,7 +653,7 @@ dependencies = [ [[package]] name = "gobject-sys" version = "0.16.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4" +source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6" dependencies = [ "glib-sys", "libc", @@ -672,7 +672,7 @@ dependencies = [ [[package]] name = "gstreamer" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "bitflags", "cfg-if", @@ -697,7 +697,7 @@ dependencies = [ [[package]] name = "gstreamer-app" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "bitflags", "futures-core", @@ -713,7 +713,7 @@ dependencies = [ [[package]] name = "gstreamer-app-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -725,7 +725,7 @@ dependencies = [ [[package]] name = "gstreamer-base" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "bitflags", "cfg-if", @@ -738,7 +738,7 @@ dependencies = [ [[package]] name = "gstreamer-base-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gobject-sys", @@ -750,7 +750,7 @@ dependencies = [ [[package]] name = "gstreamer-rtp" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "bitflags", "glib", @@ -763,7 +763,7 @@ dependencies = [ [[package]] name = "gstreamer-rtp-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -775,7 +775,7 @@ dependencies = [ [[package]] name = "gstreamer-sdp" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib", "gstreamer", @@ -785,7 +785,7 @@ dependencies = [ [[package]] name = "gstreamer-sdp-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gstreamer-sys", @@ -796,7 +796,7 @@ dependencies = [ [[package]] name = "gstreamer-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gobject-sys", @@ -804,10 +804,22 @@ dependencies = [ "system-deps", ] +[[package]] +name = "gstreamer-utils" +version = "0.19.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" +dependencies = [ + "gstreamer", + "gstreamer-app", + "gstreamer-video", + "once_cell", + "thiserror", +] + [[package]] name = "gstreamer-video" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "bitflags", "cfg-if", @@ -824,7 +836,7 @@ dependencies = [ [[package]] name = "gstreamer-video-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gobject-sys", @@ -837,7 +849,7 @@ dependencies = [ [[package]] name = "gstreamer-webrtc" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib", "gstreamer", @@ -849,7 +861,7 @@ dependencies = [ [[package]] name = "gstreamer-webrtc-sys" version = "0.19.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537" dependencies = [ "glib-sys", "gstreamer-sdp-sys", @@ -933,9 +945,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "js-sys" @@ -963,9 +975,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "log" @@ -1125,9 +1137,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.0.0" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" +checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" [[package]] name = "parking" @@ -1226,11 +1238,11 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" +checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -1316,9 +1328,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" [[package]] name = "schannel" @@ -1415,9 +1427,9 @@ dependencies = [ [[package]] name = "signal-hook" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" dependencies = [ "libc", "signal-hook-registry", @@ -1446,12 +1458,12 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" -version = "0.4.5" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca642ba17f8b2995138b1d7711829c92e98c0a25ea019de790f4f09279c4e296" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" dependencies = [ "libc", - "windows-sys", + "winapi", ] [[package]] @@ -1462,13 +1474,13 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.92" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52" +checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -1672,6 +1684,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +[[package]] +name = "unicode-ident" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" + [[package]] name = "unicode-normalization" version = "0.1.19" @@ -1681,12 +1699,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-xid" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" - [[package]] name = "url" version = "2.2.2" @@ -1852,6 +1864,7 @@ dependencies = [ "gstreamer-app", "gstreamer-rtp", "gstreamer-sdp", + "gstreamer-utils", "gstreamer-video", "gstreamer-webrtc", "human_bytes", @@ -1936,46 +1949,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-sys" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" - -[[package]] -name = "windows_i686_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" - -[[package]] -name = "windows_i686_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 0b4c318b..e57d16d3 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -15,6 +15,7 @@ gst-video = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", packa gst-webrtc = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-webrtc", features = ["v1_20"] } gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-sdp", features = ["v1_20"] } gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] } +gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" } once_cell = "1.0" smallvec = "1" anyhow = "1" diff --git a/plugins/src/webrtcsink/imp.rs b/plugins/src/webrtcsink/imp.rs index e0421917..61168262 100644 --- a/plugins/src/webrtcsink/imp.rs +++ b/plugins/src/webrtcsink/imp.rs @@ -4,6 +4,7 @@ use gst::glib::value::FromValue; use gst::prelude::*; use gst::subclass::prelude::*; use gst_rtp::prelude::*; +use gst_utils::StreamProducer; use gst_video::prelude::*; use gst_video::subclass::prelude::*; use gst_webrtc::WebRTCDataChannel; @@ -17,7 +18,6 @@ use std::collections::HashMap; use std::ops::Mul; use std::sync::Mutex; -use super::utils::{make_element, StreamProducer}; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::signaller::Signaller; use std::collections::BTreeMap; @@ -202,6 +202,7 @@ struct Consumer { max_bitrate: u32, + links: HashMap, stats_sigid: Option, } @@ -240,6 +241,13 @@ fn create_navigation_event>(sink: &N, msg: &str) { } } +/// Wrapper around `gst::ElementFactory::make` with a better error +/// message +pub fn make_element(element: &str, name: Option<&str>) -> Result { + gst::ElementFactory::make(element, name) + .with_context(|| format!("Failed to make element {}", element)) +} + /// Simple utility for tearing down a pipeline cleanly struct PipelineWrapper(gst::Pipeline); @@ -965,7 +973,7 @@ impl State { fn finalize_consumer( &mut self, element: &super::WebRTCSink, - consumer: &Consumer, + consumer: &mut Consumer, signal: bool, ) { consumer.pipeline.debug_to_dot_file_with_ts( @@ -973,14 +981,8 @@ impl State { format!("removing-peer-{}-", consumer.peer_id,), ); - for webrtc_pad in consumer.webrtc_pads.values() { - if let Some(producer) = self - .streams - .get(&webrtc_pad.stream_name) - .and_then(|stream| stream.producer.as_ref()) - { - consumer.disconnect_input_stream(producer); - } + for ssrc in consumer.webrtc_pads.keys() { + consumer.links.remove(ssrc); } consumer.pipeline.call_async(|pipeline| { @@ -998,8 +1000,8 @@ impl State { peer_id: &str, signal: bool, ) -> Option { - if let Some(consumer) = self.consumers.remove(peer_id) { - self.finalize_consumer(element, &consumer, signal); + if let Some(mut consumer) = self.consumers.remove(peer_id) { + self.finalize_consumer(element, &mut consumer, signal); Some(consumer) } else { None @@ -1052,6 +1054,7 @@ impl Consumer { stats: gst::Structure::new_empty("application/x-webrtc-stats"), webrtc_pads: HashMap::new(), encoders: Vec::new(), + links: HashMap::new(), stats_sigid: None, } } @@ -1168,7 +1171,7 @@ impl Consumer { .get(&payload) .ok_or_else(|| anyhow!("No codec for payload {}", payload))?; - let appsrc = make_element("appsrc", None)?; + let appsrc = make_element("appsrc", Some(&webrtc_pad.stream_name))?; self.pipeline.add(&appsrc).unwrap(); let pay_filter = make_element("capsfilter", None)?; @@ -1258,11 +1261,7 @@ impl Consumer { } let appsrc = appsrc.downcast::().unwrap(); - - appsrc.set_format(gst::Format::Time); - appsrc.set_is_live(true); - appsrc.set_handle_segment_change(true); - + gst_utils::StreamProducer::configure_consumer(&appsrc); self.pipeline .sync_children_states() .with_context(|| format!("Connecting input stream for {}", self.peer_id))?; @@ -1275,14 +1274,13 @@ impl Consumer { .link(&webrtc_pad.pad) .with_context(|| format!("Connecting input stream for {}", self.peer_id))?; - producer.add_consumer(&appsrc, &self.peer_id); - - Ok(()) - } - - /// Called when tearing down the consumer - fn disconnect_input_stream(&self, producer: &StreamProducer) { - producer.remove_consumer(&self.peer_id); + match producer.add_consumer(&appsrc) { + Ok(link) => { + self.links.insert(webrtc_pad.ssrc, link); + Ok(()) + } + Err(err) => Err(anyhow!("Could not link producer: {:?}", err)), + } } } @@ -2038,7 +2036,7 @@ impl WebRTCSink { }); if remove { - state.finalize_consumer(element, &consumer, true); + state.finalize_consumer(element, &mut consumer, true); } else { state.consumers.insert(consumer.peer_id.clone(), consumer); } diff --git a/plugins/src/webrtcsink/mod.rs b/plugins/src/webrtcsink/mod.rs index c700c1b8..d933342d 100644 --- a/plugins/src/webrtcsink/mod.rs +++ b/plugins/src/webrtcsink/mod.rs @@ -4,7 +4,6 @@ use gst::subclass::prelude::ObjectSubclassExt; use std::error::Error; mod imp; -mod utils; glib::wrapper! { pub struct WebRTCSink(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; diff --git a/plugins/src/webrtcsink/utils.rs b/plugins/src/webrtcsink/utils.rs deleted file mode 100644 index ca911179..00000000 --- a/plugins/src/webrtcsink/utils.rs +++ /dev/null @@ -1,313 +0,0 @@ -use std::collections::HashMap; -use std::mem; -use std::sync::{atomic, Arc, Mutex}; - -use anyhow::{Context, Error}; -use once_cell::sync::Lazy; - -use gst::prelude::*; - -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "app-stream-producer", - gst::DebugColorFlags::empty(), - Some("gst_app Stream Producer interface"), - ) -}); - -/// Wrapper around `gst::ElementFactory::make` with a better error -/// message -pub fn make_element(element: &str, name: Option<&str>) -> Result { - gst::ElementFactory::make(element, name) - .with_context(|| format!("Failed to make element {}", element)) -} - -/// The interface for transporting media data from one node -/// to another. -/// -/// A producer is essentially a GStreamer `appsink` whose output -/// is sent to a set of consumers, who are essentially `appsrc` wrappers -#[derive(Debug, Clone)] -pub struct StreamProducer { - /// The appsink to dispatch data for - appsink: gst_app::AppSink, - /// The consumers to dispatch data to - consumers: Arc>, -} - -impl PartialEq for StreamProducer { - fn eq(&self, other: &Self) -> bool { - self.appsink.eq(&other.appsink) - } -} - -impl Eq for StreamProducer {} - -impl StreamProducer { - /// Add an appsrc to dispatch data to - pub fn add_consumer(&self, consumer: &gst_app::AppSrc, consumer_id: &str) { - let mut consumers = self.consumers.lock().unwrap(); - if consumers.consumers.get(consumer_id).is_some() { - gst::error!(CAT, "Consumer already added"); - return; - } - - gst::debug!(CAT, "Adding consumer"); - - consumer.set_property("max-buffers", 0u64); - consumer.set_property("max-bytes", 0u64); - consumer.set_property("max-time", 500 * gst::ClockTime::MSECOND); - consumer.set_property_from_str("leaky-type", "downstream"); - - // Forward force-keyunit events upstream to the appsink - let srcpad = consumer.static_pad("src").unwrap(); - let appsink_clone = self.appsink.clone(); - let fku_probe_id = srcpad - .add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() { - gst::debug!(CAT, "Requesting keyframe"); - let _ = appsink_clone.send_event(ev.clone()); - } - } - - gst::PadProbeReturn::Ok - }) - .unwrap(); - - consumers.consumers.insert( - consumer_id.to_string(), - StreamConsumer::new(consumer, fku_probe_id, consumer_id), - ); - } - - /// Remove a consumer appsrc by id - pub fn remove_consumer(&self, consumer_id: &str) { - if let Some(consumer) = self.consumers.lock().unwrap().consumers.remove(consumer_id) { - gst::debug!(CAT, "Removed consumer {}", consumer.appsrc.name()); - } else { - gst::debug!(CAT, "Consumer {} not found", consumer_id); - } - } - - /// Stop discarding data samples and start forwarding them to the consumers. - /// - /// This is useful for example for prerolling live sources. - pub fn forward(&self) { - self.consumers.lock().unwrap().discard = false; - } - - /// Get the GStreamer `appsink` wrapped by this producer - pub fn appsink(&self) -> &gst_app::AppSink { - &self.appsink - } -} - -impl<'a> From<&'a gst_app::AppSink> for StreamProducer { - fn from(appsink: &'a gst_app::AppSink) -> Self { - let consumers = Arc::new(Mutex::new(StreamConsumers { - current_latency: None, - latency_updated: false, - consumers: HashMap::new(), - discard: true, - })); - - let consumers_clone = consumers.clone(); - let consumers_clone2 = consumers.clone(); - appsink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample(move |appsink| { - let mut consumers = consumers_clone.lock().unwrap(); - - let sample = match appsink.pull_sample() { - Ok(sample) => sample, - Err(_err) => { - gst::debug!(CAT, "Failed to pull sample"); - return Err(gst::FlowError::Flushing); - } - }; - - if consumers.discard { - return Ok(gst::FlowSuccess::Ok); - } - - gst::trace!(CAT, "processing sample"); - - let latency = consumers.current_latency; - let latency_updated = mem::replace(&mut consumers.latency_updated, false); - let mut requested_keyframe = false; - - let current_consumers = consumers - .consumers - .values() - .map(|c| { - if let Some(latency) = latency { - if c.forwarded_latency - .compare_exchange( - false, - true, - atomic::Ordering::SeqCst, - atomic::Ordering::SeqCst, - ) - .is_ok() - || latency_updated - { - c.appsrc.set_latency(latency, gst::ClockTime::NONE); - } - } - - if c.first_buffer - .compare_exchange( - true, - false, - atomic::Ordering::SeqCst, - atomic::Ordering::SeqCst, - ) - .is_ok() - && !requested_keyframe - { - gst::debug!(CAT, "Requesting keyframe for first buffer"); - appsink.send_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); - requested_keyframe = true; - } - - c.appsrc.clone() - }) - .collect::>(); - drop(consumers); - - for consumer in current_consumers { - if let Err(err) = consumer.push_sample(&sample) { - gst::warning!(CAT, "Failed to push sample: {}", err); - } - } - - Ok(gst::FlowSuccess::Ok) - }) - .eos(move |_| { - let current_consumers = consumers_clone2 - .lock() - .unwrap() - .consumers - .values() - .map(|c| c.appsrc.clone()) - .collect::>(); - - for consumer in current_consumers { - let _ = consumer.end_of_stream(); - } - }) - .build(), - ); - - let consumers_clone = consumers.clone(); - let sinkpad = appsink.static_pad("sink").unwrap(); - sinkpad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - use gst::EventView; - - if let EventView::Latency(ev) = ev.view() { - if pad.parent().is_some() { - let latency = ev.latency(); - let mut consumers = consumers_clone.lock().unwrap(); - consumers.current_latency = Some(latency); - consumers.latency_updated = true; - } - } - } - gst::PadProbeReturn::Ok - }); - - StreamProducer { - appsink: appsink.clone(), - consumers, - } - } -} - -/// Wrapper around a HashMap of consumers, exists for thread safety -/// and also protects some of the producer state -#[derive(Debug)] -struct StreamConsumers { - /// The currently-observed latency - current_latency: Option, - /// Whether the consumers' appsrc latency needs updating - latency_updated: bool, - /// The consumers, link id -> consumer - consumers: HashMap, - /// Whether appsrc samples should be forwarded to consumers yet - discard: bool, -} - -/// Wrapper around a consumer's `appsrc` -#[derive(Debug)] -struct StreamConsumer { - /// The GStreamer `appsrc` of the consumer - appsrc: gst_app::AppSrc, - /// The id of a pad probe that intercepts force-key-unit events - fku_probe_id: Option, - /// Whether an initial latency was forwarded to the `appsrc` - forwarded_latency: atomic::AtomicBool, - /// Whether a first buffer has made it through, used to determine - /// whether a new key unit should be requested. Only useful for encoded - /// streams. - first_buffer: atomic::AtomicBool, -} - -impl StreamConsumer { - /// Create a new consumer - fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId, consumer_id: &str) -> Self { - let consumer_id = consumer_id.to_string(); - appsrc.set_callbacks( - gst_app::AppSrcCallbacks::builder() - .enough_data(move |_appsrc| { - gst::debug!( - CAT, - "consumer {} is not consuming fast enough, old samples are getting dropped", - consumer_id - ); - }) - .build(), - ); - - StreamConsumer { - appsrc: appsrc.clone(), - fku_probe_id: Some(fku_probe_id), - forwarded_latency: atomic::AtomicBool::new(false), - first_buffer: atomic::AtomicBool::new(true), - } - } -} - -impl Drop for StreamConsumer { - fn drop(&mut self) { - if let Some(fku_probe_id) = self.fku_probe_id.take() { - let srcpad = self.appsrc.static_pad("src").unwrap(); - srcpad.remove_probe(fku_probe_id); - } - } -} - -impl PartialEq for StreamConsumer { - fn eq(&self, other: &Self) -> bool { - self.appsrc.eq(&other.appsrc) - } -} - -impl Eq for StreamConsumer {} - -impl std::hash::Hash for StreamConsumer { - fn hash(&self, state: &mut H) { - std::hash::Hash::hash(&self.appsrc, state); - } -} - -impl std::borrow::Borrow for StreamConsumer { - fn borrow(&self) -> &gst_app::AppSrc { - &self.appsrc - } -}