From f3bba21faa6baf995b099f32f1fc7e6935d8abc2 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Fri, 11 Mar 2022 10:03:03 -0300 Subject: [PATCH] Introduce StreamProducer Introduce a new `gstreamer-utils` crate where we implement a `StreamProducer` structure that allows "producing" pipeline (Producers) pushing their data to 0 or several "consuming" pipelines. The Producer needs to push their data to an `appsink` which the `StreamProducer` controls and the "consumer" pipelines need to have an AppSrc which the same StreamProducer controls. It allows similare behavior as a `tee` element but with a simpler to handle addition and removal of "consumers" as well as a total decoupling of the various consumer, both between each others and the producer pipeline. This has simply been extracted from [webrtcsink](https://github.com/centricular/webrtcsink/blob/main/plugins/src/webrtcsink/utils.rs) --- Cargo.toml | 1 + gstreamer-utils/CHANGELOG.md | 1 + gstreamer-utils/COPYRIGHT | 1 + gstreamer-utils/Cargo.toml | 33 ++ gstreamer-utils/LICENSE-APACHE | 1 + gstreamer-utils/LICENSE-MIT | 1 + gstreamer-utils/src/lib.rs | 3 + gstreamer-utils/src/streamproducer.rs | 512 ++++++++++++++++++++++++++ 8 files changed, 553 insertions(+) create mode 120000 gstreamer-utils/CHANGELOG.md create mode 120000 gstreamer-utils/COPYRIGHT create mode 100644 gstreamer-utils/Cargo.toml create mode 120000 gstreamer-utils/LICENSE-APACHE create mode 120000 gstreamer-utils/LICENSE-MIT create mode 100644 gstreamer-utils/src/lib.rs create mode 100644 gstreamer-utils/src/streamproducer.rs diff --git a/Cargo.toml b/Cargo.toml index 8f47e109a..582dd5c9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "gstreamer-video", "gstreamer-webrtc", "gstreamer-allocators", + "gstreamer-utils", "examples", "tutorials", ] diff --git a/gstreamer-utils/CHANGELOG.md b/gstreamer-utils/CHANGELOG.md new file mode 120000 index 000000000..670919a56 --- /dev/null +++ b/gstreamer-utils/CHANGELOG.md @@ -0,0 +1 @@ +../gstreamer/CHANGELOG.md \ No newline at end of file diff --git a/gstreamer-utils/COPYRIGHT b/gstreamer-utils/COPYRIGHT new file mode 120000 index 000000000..dc5f40a22 --- /dev/null +++ b/gstreamer-utils/COPYRIGHT @@ -0,0 +1 @@ +../COPYRIGHT \ No newline at end of file diff --git a/gstreamer-utils/Cargo.toml b/gstreamer-utils/Cargo.toml new file mode 100644 index 000000000..abd6f0d76 --- /dev/null +++ b/gstreamer-utils/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "gstreamer-utils" +version = "0.19.0" +authors = ["Mathieu Duponchelle ", "Thibault Saunier "] +categories = ["multimedia"] +description = "Exposes an object to build several Gst pipeline with one producer and several consumer" +repository = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" +license = "MIT/Apache-2.0" +readme = "README.md" +homepage = "https://gstreamer.freedesktop.org" +documentation = "https://gstreamer.pages.freedesktop.org/gstreamer-rs/stable/latest/docs/gstreamer_utils/" +keywords = ["gstreamer", "multimedia", "audio", "video", "gnome"] +edition = "2021" +rust-version = "1.57" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +gst = { package = "gstreamer", path = "../gstreamer" } +gst_app = { package = "gstreamer-app", path = "../gstreamer-app" } +gst_video = { package = "gstreamer-video", path = "../gstreamer-video" } +once_cell = "1" + +[dev-dependencies] +futures = { version = "0.3", features = ["executor"] } +gst_app = { package = "gstreamer-app", path = "../gstreamer-app" } + +[features] +default = [] +v1_16 = [] +v1_18 = ['v1_16'] +v1_20 = ['v1_18'] +v1_22 = ['v1_20'] diff --git a/gstreamer-utils/LICENSE-APACHE b/gstreamer-utils/LICENSE-APACHE new file mode 120000 index 000000000..965b606f3 --- /dev/null +++ b/gstreamer-utils/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/gstreamer-utils/LICENSE-MIT b/gstreamer-utils/LICENSE-MIT new file mode 120000 index 000000000..76219eb72 --- /dev/null +++ b/gstreamer-utils/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/gstreamer-utils/src/lib.rs b/gstreamer-utils/src/lib.rs new file mode 100644 index 000000000..fe94a8ebd --- /dev/null +++ b/gstreamer-utils/src/lib.rs @@ -0,0 +1,3 @@ +mod streamproducer; + +pub use crate::streamproducer::StreamProducer; diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs new file mode 100644 index 000000000..2eed66239 --- /dev/null +++ b/gstreamer-utils/src/streamproducer.rs @@ -0,0 +1,512 @@ +use std::collections::HashMap; +use std::mem; +use std::sync::{atomic, Arc, Mutex}; + +use once_cell::sync::Lazy; + +use gst::{glib, prelude::*}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "utilsrs-stream-producer", + gst::DebugColorFlags::empty(), + Some("gst_app Stream Producer interface"), + ) +}); + +/// The interface for transporting media data from one node +/// to another. +/// +/// A producer is essentially a GStreamer `appsink` whose output +/// is sent to a set of consumers, who are essentially `appsrc` wrappers +#[derive(Debug, Clone)] +pub struct StreamProducer { + /// The appsink to dispatch data for + appsink: gst_app::AppSink, + /// The consumers to dispatch data to + consumers: Arc>, +} + +impl PartialEq for StreamProducer { + fn eq(&self, other: &Self) -> bool { + self.appsink.eq(&other.appsink) + } +} + +impl Eq for StreamProducer {} + +impl StreamProducer { + /// Add an appsrc to dispatch data to + pub fn add_consumer(&self, consumer: &gst_app::AppSrc) { + let mut consumers = self.consumers.lock().unwrap(); + if consumers + .consumers + .get(&(consumer.as_ptr() as usize)) + .is_some() + { + gst::error!(CAT, obj: &self.appsink, "Consumer already added"); + return; + } + + gst::debug!(CAT, obj: &self.appsink, "Adding consumer"); + + consumer.set_property("max-buffers", 0u64); + consumer.set_property("max-bytes", 0u64); + consumer.set_property("max-time", gst::ClockTime::from_mseconds(500)); + consumer.set_property("format", gst::Format::Time); + consumer.set_property_from_str("leaky-type", "downstream"); + consumer + .try_set_property("handle-segment-change", true) + .ok(); + consumer.set_automatic_eos(false); + + // Forward force-keyunit events upstream to the appsink + let srcpad = consumer.static_pad("src").unwrap(); + let appsink = &self.appsink; + let fku_probe_id = srcpad + .add_probe( + gst::PadProbeType::EVENT_UPSTREAM, + glib::clone!(@weak appsink, @weak consumer => @default-panic, move |_pad, info| { + if let Some(gst::PadProbeData::Event(ref ev)) = info.data { + if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() { + gst::debug!(CAT, obj: &appsink, "Requesting keyframe"); + let _ = appsink.send_event(ev.clone()); + } + } + + gst::PadProbeReturn::Ok + }), + ) + .unwrap(); + + consumers.consumers.insert( + consumer.as_ptr() as usize, + StreamConsumer::new(consumer, fku_probe_id), + ); + } + + /// Remove a consumer appsrc by id + pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) { + let name = consumer.name(); + if self + .consumers + .lock() + .unwrap() + .consumers + .remove(&(consumer.as_ptr() as usize)) + .is_some() + { + gst::debug!(CAT, obj: &self.appsink, "Removed consumer {}", name); + } else { + gst::debug!(CAT, obj: &self.appsink, "Consumer {} not found", name); + } + } + + /// Stop discarding data samples and start forwarding them to the consumers. + /// + /// This is useful for example for prerolling live sources. + pub fn forward(&self) { + self.consumers.lock().unwrap().discard = false; + } + + /// Get the GStreamer `appsink` wrapped by this producer + pub fn appsink(&self) -> &gst_app::AppSink { + &self.appsink + } +} + +impl<'a> From<&'a gst_app::AppSink> for StreamProducer { + fn from(appsink: &'a gst_app::AppSink) -> Self { + let consumers = Arc::new(Mutex::new(StreamConsumers { + current_latency: None, + latency_updated: false, + consumers: HashMap::new(), + discard: true, + })); + + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(glib::clone!(@strong consumers => move |appsink| { + let mut consumers = consumers.lock().unwrap(); + + let sample = match appsink.pull_sample() { + Ok(sample) => sample, + Err(_err) => { + gst::debug!(CAT, obj: appsink, "Failed to pull sample"); + return Err(gst::FlowError::Flushing); + } + }; + + if consumers.discard { + return Ok(gst::FlowSuccess::Ok); + } + + let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() { + let flags = buf.flags(); + + (flags.contains(gst::BufferFlags::DISCONT), + !flags.contains(gst::BufferFlags::DELTA_UNIT)) + } else { + (false, true) + }; + + gst::trace!(CAT, obj: appsink, "processing sample"); + + let latency = consumers.current_latency; + let latency_updated = mem::replace(&mut consumers.latency_updated, false); + let mut requested_keyframe = false; + + let current_consumers = consumers + .consumers + .values() + .filter_map(|consumer| { + if let Some(latency) = latency { + if consumer.forwarded_latency + .compare_exchange( + false, + true, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) + .is_ok() + || latency_updated + { + consumer.appsrc.set_latency(latency, gst::ClockTime::NONE); + } + } + + if is_discont { + // Whenever we have a discontinuity, we need a new keyframe + consumer.needs_keyframe.store( + true, + atomic::Ordering::SeqCst, + ); + } + + if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) + { + // If we need a keyframe (and this one isn't) request a keyframe upstream + if !requested_keyframe { + gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer"); + + appsink.send_event( + gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(), + ); + requested_keyframe = true; + } + + gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe", + consumer.appsrc.name()); + None + } else { + consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst); + + Some(consumer.appsrc.clone()) + } + }) + .collect::>(); + drop(consumers); + + for consumer in current_consumers { + if let Err(err) = consumer.push_sample(&sample) { + gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err); + } + } + + Ok(gst::FlowSuccess::Ok) + })) + .eos(glib::clone!(@strong consumers => move |_| { + let current_consumers = consumers + .lock() + .unwrap() + .consumers + .values() + .map(|c| c.appsrc.clone()) + .collect::>(); + + for consumer in current_consumers { + let _ = consumer.end_of_stream(); + } + })) + .build(), + ); + + let sinkpad = appsink.static_pad("sink").unwrap(); + sinkpad.add_probe( + gst::PadProbeType::EVENT_UPSTREAM, + glib::clone!(@strong consumers => move |_pad, info| { + if let Some(gst::PadProbeData::Event(ref ev)) = info.data { + if let gst::EventView::Latency(ev) = ev.view() { + let latency = ev.latency(); + let mut consumers = consumers.lock().unwrap(); + consumers.current_latency = Some(latency); + consumers.latency_updated = true; + } + } + gst::PadProbeReturn::Ok + }), + ); + + StreamProducer { + appsink: appsink.clone(), + consumers, + } + } +} + +/// Wrapper around a HashMap of consumers, exists for thread safety +/// and also protects some of the producer state +#[derive(Debug)] +struct StreamConsumers { + /// The currently-observed latency + current_latency: Option, + /// Whether the consumers' appsrc latency needs updating + latency_updated: bool, + /// The consumers, AppSrc pointer value -> consumer + consumers: HashMap, + /// Whether appsrc samples should be forwarded to consumers yet + discard: bool, +} + +/// Wrapper around a consumer's `appsrc` +#[derive(Debug)] +struct StreamConsumer { + /// The GStreamer `appsrc` of the consumer + appsrc: gst_app::AppSrc, + /// The id of a pad probe that intercepts force-key-unit events + fku_probe_id: Option, + /// Whether an initial latency was forwarded to the `appsrc` + forwarded_latency: atomic::AtomicBool, + /// Whether a first buffer has made it through, used to determine + /// whether a new key unit should be requested. Only useful for encoded + /// streams. + needs_keyframe: atomic::AtomicBool, +} + +impl StreamConsumer { + /// Create a new consumer + fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { + appsrc.set_callbacks( + gst_app::AppSrcCallbacks::builder() + .enough_data(move |appsrc| { + gst::debug!( + CAT, + obj: appsrc, + "consumer is not consuming fast enough, old samples are getting dropped", + ); + }) + .build(), + ); + + StreamConsumer { + appsrc: appsrc.clone(), + fku_probe_id: Some(fku_probe_id), + forwarded_latency: atomic::AtomicBool::new(false), + needs_keyframe: atomic::AtomicBool::new(true), + } + } +} + +impl Drop for StreamConsumer { + fn drop(&mut self) { + if let Some(fku_probe_id) = self.fku_probe_id.take() { + let srcpad = self.appsrc.static_pad("src").unwrap(); + srcpad.remove_probe(fku_probe_id); + } + } +} + +impl PartialEq for StreamConsumer { + fn eq(&self, other: &Self) -> bool { + self.appsrc.eq(&other.appsrc) + } +} + +impl Eq for StreamConsumer {} + +impl std::hash::Hash for StreamConsumer { + fn hash(&self, state: &mut H) { + std::hash::Hash::hash(&self.appsrc, state); + } +} + +impl std::borrow::Borrow for StreamConsumer { + fn borrow(&self) -> &gst_app::AppSrc { + &self.appsrc + } +} + +#[cfg(test)] +mod tests { + use std::{ + str::FromStr, + sync::{Arc, Mutex}, + }; + + use futures::channel::mpsc; + use futures::{channel::mpsc::Receiver, SinkExt, StreamExt}; + use gst::prelude::*; + + use crate::StreamProducer; + + fn create_producer() -> ( + gst::Pipeline, + gst_app::AppSrc, + gst_app::AppSink, + StreamProducer, + ) { + let producer_pipe = + gst::parse_launch("appsrc name=producer_src ! appsink name=producer_sink") + .unwrap() + .downcast::() + .unwrap(); + let producer_sink = producer_pipe + .by_name("producer_sink") + .unwrap() + .downcast::() + .unwrap(); + + ( + producer_pipe.clone(), + producer_pipe + .by_name("producer_src") + .unwrap() + .downcast::() + .unwrap(), + producer_sink.clone(), + StreamProducer::from(&producer_sink), + ) + } + + struct Consumer { + pipeline: gst::Pipeline, + src: gst_app::AppSrc, + sink: gst_app::AppSink, + receiver: Mutex>, + connected: Mutex, + } + + impl Consumer { + fn new(id: &str) -> Self { + let pipeline = gst::parse_launch(&format!("appsrc name={} ! appsink name=sink", id)) + .unwrap() + .downcast::() + .unwrap(); + + let (sender, receiver) = mpsc::channel::(1000); + let sender = Arc::new(Mutex::new(sender)); + let sink = pipeline + .by_name("sink") + .unwrap() + .downcast::() + .unwrap(); + + sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + // Add a handler to the "new-sample" signal. + .new_sample(move |appsink| { + // Pull the sample in question out of the appsink's buffer. + let sender_clone = sender.clone(); + futures::executor::block_on( + sender_clone + .lock() + .unwrap() + .send(appsink.pull_sample().unwrap()), + ) + .unwrap(); + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + Self { + pipeline: pipeline.clone(), + src: pipeline + .by_name(id) + .unwrap() + .downcast::() + .unwrap(), + sink, + receiver: Mutex::new(receiver), + connected: Mutex::new(false), + } + } + + fn connect(&self, producer: &StreamProducer) { + { + let mut connected = self.connected.lock().unwrap(); + *connected = true; + } + + producer.add_consumer(&self.src); + } + + fn disconnect(&self, producer: &StreamProducer) { + { + let mut connected = self.connected.lock().unwrap(); + *connected = false; + } + + producer.remove_consumer(&self.src); + } + } + + #[test] + fn simple() { + gst::init().unwrap(); + + let (producer_pipe, producer_src, _producer_sink, producer) = create_producer(); + producer.forward(); + producer_pipe + .set_state(gst::State::Playing) + .expect("Couldn't set producer pipeline state"); + + let mut consumers: Vec = Vec::new(); + let consumer = Consumer::new("consumer1"); + consumer.connect(&producer); + consumer + .pipeline + .set_state(gst::State::Playing) + .expect("Couldn't set producer pipeline state"); + consumers.push(consumer); + + let consumer = Consumer::new("consumer2"); + consumer.connect(&producer); + consumer + .pipeline + .set_state(gst::State::Playing) + .expect("Couldn't set producer pipeline state"); + consumers.push(consumer); + + for i in 0..10 { + let caps = gst::Caps::from_str(&format!("test,n={}", i)).unwrap(); + producer_src.set_caps(Some(&caps)); + producer_src.push_buffer(gst::Buffer::new()).unwrap(); + + for consumer in &consumers { + if *consumer.connected.lock().unwrap() { + let sample = + futures::executor::block_on(consumer.receiver.lock().unwrap().next()) + .expect("Received an empty buffer?"); + sample.buffer().expect("No buffer on the sample?"); + assert_eq!(sample.caps(), Some(caps.as_ref())); + } else { + debug_assert!( + consumer + .sink + .try_pull_sample(gst::ClockTime::from_nseconds(0)) + .is_none(), + "Disconnected consumer got a new sample?!" + ); + } + } + + if i == 5 { + consumers.get(0).unwrap().disconnect(&producer); + } + } + } +}