diff --git a/Cargo.toml b/Cargo.toml index 44eea60f..279f43e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "generic/file", "generic/sodium", "generic/threadshare", + "generic/inter", "mux/flavors", "mux/fmp4", @@ -64,6 +65,7 @@ default-members = [ "audio/lewton", "generic/threadshare", + "generic/inter", "mux/fmp4", "mux/mp4", diff --git a/ci/utils.py b/ci/utils.py index ce56e1da..c43a2843 100644 --- a/ci/utils.py +++ b/ci/utils.py @@ -22,6 +22,7 @@ RS_PREFIXED = [ 'png', 'tracers', 'rtp', + 'inter', ] OVERRIDE = { diff --git a/dependencies.py b/dependencies.py index 9955a5df..0bf3c2e3 100755 --- a/dependencies.py +++ b/dependencies.py @@ -36,6 +36,7 @@ RENAMES = { 'rswebrtc': 'webrtc', 'rspng': 'png', 'rsvideofx': 'videofx', + 'rsinter': 'inter', 'textahead': 'ahead', 'textwrap': 'wrap', } diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 364a2deb..a05532aa 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5369,6 +5369,94 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "rsinter": { + "description": "GStreamer Inter Plugin", + "elements": { + "intersink": { + "author": "Mathieu Duponchelle ", + "description": "Inter Sink", + "hierarchy": [ + "GstInterSink", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy" + ], + "klass": "Generic/Sink", + "pad-templates": { + "sink": { + "caps": "ANY", + "direction": "sink", + "presence": "always" + } + }, + "properties": { + "producer-name": { + "blurb": "Producer Name to use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + }, + "intersrc": { + "author": "Mathieu Duponchelle ", + "description": "Inter Src", + "hierarchy": [ + "GstInterSrc", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy" + ], + "klass": "Generic/Src", + "pad-templates": { + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "producer-name": { + "blurb": "Producer Name to consume from", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstrsinter", + "license": "MPL-2.0", + "other-types": {}, + "package": "gst-plugin-inter", + "source": "gst-plugin-inter", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "rsonvif": { "description": "GStreamer Rust ONVIF Plugin", "elements": { diff --git a/generic/inter/Cargo.toml b/generic/inter/Cargo.toml new file mode 100644 index 00000000..09657542 --- /dev/null +++ b/generic/inter/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "gst-plugin-inter" +version = "0.11.0-alpha.1" +authors = ["Mathieu Duponchelle "] +license = "MPL-2.0" +description = "GStreamer Inter Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2021" +rust-version = "1.66" + +[dependencies] +anyhow = "1" +gst = { package = "gstreamer", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst_utils = { package = "gstreamer-utils", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst_app = { package = "gstreamer-app", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +once_cell = "1.0" + +[dev-dependencies] +pretty_assertions = "1" +gst-check = { package = "gstreamer-check", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +futures = "0.3" +tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] } +tokio-stream = "0.1.11" +serial_test = "2" + +[lib] +name = "gstrsinter" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0" + +[[example]] +name = "plug-and-play" diff --git a/generic/inter/build.rs b/generic/inter/build.rs new file mode 100644 index 00000000..76b2a7c2 --- /dev/null +++ b/generic/inter/build.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: MPL-2.0 + +fn main() { + gst_plugin_version_helper::info() +} diff --git a/generic/inter/examples/basic.rs b/generic/inter/examples/basic.rs new file mode 100644 index 00000000..1afbe8a4 --- /dev/null +++ b/generic/inter/examples/basic.rs @@ -0,0 +1,74 @@ +use anyhow::Error; +use futures::prelude::*; +use futures::stream::select_all; +use gst::prelude::*; + +fn toplevel(obj: &gst::Object) -> gst::Object { + if let Some(parent) = obj.parent() { + toplevel(&parent) + } else { + obj.clone() + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + gst::init()?; + + let src_pipeline = gst::parse_launch("videotestsrc is-live=true ! intersink")?; + let sink_pipeline = gst::parse_launch("intersrc ! videoconvert ! autovideosink")?; + + let mut stream = select_all([ + src_pipeline.bus().unwrap().stream(), + sink_pipeline.bus().unwrap().stream(), + ]); + + let base_time = gst::SystemClock::obtain().time().unwrap(); + + src_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?; + src_pipeline.set_start_time(gst::ClockTime::NONE); + src_pipeline.set_base_time(base_time); + + sink_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?; + sink_pipeline.set_start_time(gst::ClockTime::NONE); + sink_pipeline.set_base_time(base_time); + + src_pipeline.set_state(gst::State::Playing)?; + sink_pipeline.set_state(gst::State::Playing)?; + + while let Some(msg) = stream.next().await { + use gst::MessageView; + + match msg.view() { + MessageView::Latency(..) => { + if let Some(o) = msg.src() { + if let Ok(pipeline) = toplevel(o).downcast::() { + eprintln!("Recalculating latency {:?}", pipeline); + let _ = pipeline.recalculate_latency(); + } + } + } + MessageView::Eos(..) => { + eprintln!("Unexpected EOS"); + break; + } + MessageView::Error(err) => { + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + src_pipeline.set_state(gst::State::Null)?; + sink_pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/generic/inter/examples/plug-and-play.rs b/generic/inter/examples/plug-and-play.rs new file mode 100644 index 00000000..f797e0b4 --- /dev/null +++ b/generic/inter/examples/plug-and-play.rs @@ -0,0 +1,325 @@ +use anyhow::Error; +use futures::prelude::*; +use gst::prelude::*; +use std::collections::HashMap; +use std::io::prelude::*; +use tokio::task; + +struct Producer { + pipeline: gst::Pipeline, + sink: gst::Element, + overlay: gst::Element, +} + +struct Consumer { + pipeline: gst::Pipeline, + src: gst::Element, +} + +fn create_sink_pipeline(producer_name: &str) -> Result { + let pipeline = gst::Pipeline::builder() + .name(format!("producer-{producer_name}")) + .build(); + + let videotestsrc = gst::ElementFactory::make("videotestsrc") + .property_from_str("pattern", "ball") + .property("is-live", true) + .build()?; + let capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst::Caps::builder("video/x-raw") + .field("framerate", gst::Fraction::new(50, 1)) + .build(), + ) + .build()?; + let queue = gst::ElementFactory::make("queue").build()?; + let overlay = gst::ElementFactory::make("textoverlay") + .property("font-desc", "Sans 30") + .property("text", format!("Producer: {producer_name}")) + .property_from_str("valignment", "top") + .build()?; + let timeoverlay = gst::ElementFactory::make("timeoverlay") + .property("font-desc", "Sans 30") + .property_from_str("valignment", "center") + .property_from_str("halignment", "center") + .build()?; + let sink = gst::ElementFactory::make("intersink") + .property("producer-name", producer_name) + .build()?; + + pipeline.add_many([ + &videotestsrc, + &capsfilter, + &queue, + &overlay, + &timeoverlay, + &sink, + ])?; + gst::Element::link_many([ + &videotestsrc, + &capsfilter, + &queue, + &overlay, + &timeoverlay, + &sink, + ])?; + + Ok(Producer { + pipeline, + sink, + overlay, + }) +} + +fn create_src_pipeline(producer_name: &str, consumer_name: &str) -> Result { + let pipeline = gst::Pipeline::builder() + .name(format!("consumer-{consumer_name}")) + .build(); + + let src = gst::ElementFactory::make("intersrc") + .property("producer-name", producer_name) + .build()?; + let queue = gst::ElementFactory::make("queue").build()?; + let vconv = gst::ElementFactory::make("videoconvert").build()?; + let overlay = gst::ElementFactory::make("textoverlay") + .property("font-desc", "Sans 30") + .property("text", format!("Consumer: {consumer_name}")) + .property_from_str("valignment", "bottom") + .build()?; + let vconv2 = gst::ElementFactory::make("videoconvert").build()?; + let sink = gst::ElementFactory::make("autovideosink").build()?; + + pipeline.add_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?; + gst::Element::link_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?; + + Ok(Consumer { pipeline, src }) +} + +fn prompt_on() { + print!("$ "); + let _ = std::io::stdout().flush(); +} + +fn monitor_pipeline(pipeline: &gst::Pipeline, base_time: gst::ClockTime) -> Result<(), Error> { + pipeline.set_clock(Some(&gst::SystemClock::obtain()))?; + pipeline.set_start_time(gst::ClockTime::NONE); + pipeline.set_base_time(base_time); + + pipeline.set_state(gst::State::Playing)?; + + let mut bus_stream = pipeline.bus().expect("Pipeline should have a bus").stream(); + + let pipeline_clone = pipeline.downgrade(); + task::spawn(async move { + while let Some(msg) = bus_stream.next().await { + use gst::MessageView; + + if let Some(pipeline) = pipeline_clone.upgrade() { + match msg.view() { + MessageView::Latency(..) => { + let _ = pipeline.recalculate_latency(); + } + MessageView::Eos(..) => { + println!( + "EOS from {}", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()) + ); + prompt_on(); + break; + } + MessageView::Error(err) => { + let _ = pipeline.set_state(gst::State::Null); + println!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + prompt_on(); + break; + } + MessageView::StateChanged(sc) => { + if msg.src() == Some(pipeline.upcast_ref()) { + gst::debug_bin_to_dot_file( + pipeline.upcast_ref::(), + gst::DebugGraphDetails::all(), + format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()), + ); + } + } + _ => (), + } + } else { + break; + } + } + }); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + gst::init()?; + + println!("h for help"); + + let base_time = gst::SystemClock::obtain().time().unwrap(); + + let mut producers: HashMap = HashMap::new(); + let mut consumers: HashMap = HashMap::new(); + + let mut stdin = std::io::stdin().lock(); + loop { + let mut buf = String::new(); + + prompt_on(); + + match stdin.read_line(&mut buf)? { + 0 => { + eprintln!("EOF!"); + break; + } + _ => { + let command: Vec<_> = buf.split_whitespace().collect(); + + match command.first() { + Some(&"ap") => { + if command.len() != 2 { + println!("ap : Add a producer"); + } else { + let producer_name = command.get(1).unwrap().to_string(); + + if producers.contains_key(&producer_name) { + println!("Producer with name {producer_name} already exists!"); + continue; + } + + let producer = create_sink_pipeline(&producer_name)?; + monitor_pipeline(&producer.pipeline, base_time)?; + + println!("Added producer with name {producer_name}"); + + producers.insert(producer_name, producer); + } + } + Some(&"ac") => { + if command.len() != 3 { + println!("ac : Add a consumer"); + } else { + let consumer_name = command.get(1).unwrap().to_string(); + let producer_name = command.get(2).unwrap().to_string(); + + if consumers.contains_key(&consumer_name) { + println!("Consumer with name {consumer_name} already exists!"); + continue; + } + + let consumer = create_src_pipeline(&producer_name, &consumer_name)?; + monitor_pipeline(&consumer.pipeline, base_time)?; + + println!("Added consumer with name {consumer_name} and producer name {producer_name}"); + + consumers.insert(consumer_name, consumer); + } + } + Some(&"rp") => { + if command.len() != 2 { + println!("rp : Remove a producer"); + } else { + let producer_name = command.get(1).unwrap().to_string(); + if let Some(producer) = producers.remove(&producer_name) { + let _ = producer.pipeline.set_state(gst::State::Null); + println!("Removed producer with name {producer_name}"); + } else { + println!("No producer with name {producer_name}"); + } + } + } + Some(&"rc") => { + if command.len() != 2 { + println!("rc : Remove a consumer"); + } else { + let consumer_name = command.get(1).unwrap().to_string(); + if let Some(consumer) = consumers.remove(&consumer_name) { + let _ = consumer.pipeline.set_state(gst::State::Null); + println!("Removed consumer with name {consumer_name}"); + } else { + println!("No consumer with name {consumer_name}"); + } + } + } + Some(&"cnp") => { + if command.len() != 3 { + println!("cnp : Change the name of a producer"); + } else { + let old_producer_name = command.get(1).unwrap().to_string(); + let producer_name = command.get(2).unwrap().to_string(); + + if producers.contains_key(&producer_name) { + println!("Producer with name {producer_name} already exists!"); + continue; + } + + if let Some(producer) = producers.remove(&old_producer_name) { + producer.sink.set_property("producer-name", &producer_name); + producer + .overlay + .set_property("text", format!("Producer: {producer_name}")); + println!( + "Changed producer name {old_producer_name} -> {producer_name}" + ); + producers.insert(producer_name, producer); + } else { + println!("No producer with name {old_producer_name}"); + } + } + } + Some(&"cpn") => { + if command.len() != 3 { + println!("cpn : Change the producer name for a consumer"); + } else { + let consumer_name = command.get(1).unwrap().to_string(); + let producer_name = command.get(2).unwrap().to_string(); + + if let Some(consumer) = consumers.get_mut(&consumer_name) { + consumer.src.set_property("producer-name", &producer_name); + println!("Changed producer name for consumer {consumer_name} to {producer_name}"); + } else { + println!("No consumer with name {consumer_name}"); + } + } + } + Some(&"h") => { + println!("h: show this help"); + println!("ap : Add a producer"); + println!("ac : Add a consumer"); + println!("rp : Remove a producer"); + println!("rc : Remove a consumer"); + println!("cnp : Change the name of a producer"); + println!("cpn : Change the producer name for a consumer"); + } + _ => { + println!("Unknown command"); + } + } + } + } + buf.clear(); + } + + for (_, producer) in producers { + let _ = producer.pipeline.set_state(gst::State::Null); + } + + for (_, consumer) in consumers { + let _ = consumer.pipeline.set_state(gst::State::Null); + } + + Ok(()) +} diff --git a/generic/inter/src/lib.rs b/generic/inter/src/lib.rs new file mode 100644 index 00000000..93ed1c11 --- /dev/null +++ b/generic/inter/src/lib.rs @@ -0,0 +1,44 @@ +// Copyright (C) 2023 Mathieu Duponchelle +// +// Take a look at the license at the top of the repository in the LICENSE file. +#![allow(unused_doc_comments)] + +//! GStreamer elements for connecting pipelines in the same process + +mod sink; +mod src; +mod streamproducer; +/** + * plugin-rsinter: + * @title: Rust inter elements + * @short_description: A set of elements for transferring data between pipelines + * + * This plugin exposes two elements, `intersink` and `intersrc`, that can be + * used to transfer data from one pipeline to multiple others in the same + * process. + * + * The elements are implemented using the `StreamProducer` API from + * gstreamer-utils. + * + * Since: plugins-rs-0.11.0 + */ +use gst::glib; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + sink::register(plugin)?; + src::register(plugin)?; + + Ok(()) +} + +gst::plugin_define!( + rsinter, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MPL-2.0", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/generic/inter/src/sink/imp.rs b/generic/inter/src/sink/imp.rs new file mode 100644 index 00000000..25e2fb3a --- /dev/null +++ b/generic/inter/src/sink/imp.rs @@ -0,0 +1,217 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::streamproducer::InterStreamProducer; +use anyhow::Error; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +const DEFAULT_PRODUCER_NAME: &str = "default"; + +#[derive(Debug)] +struct Settings { + producer_name: String, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + producer_name: DEFAULT_PRODUCER_NAME.to_string(), + } + } +} + +struct State { + appsink: gst_app::AppSink, + sinkpad: gst::GhostPad, +} + +/* Locking order is field order */ +pub struct InterSink { + settings: Mutex, + state: Mutex, +} + +impl InterSink { + fn prepare(&self) -> Result<(), Error> { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + + InterStreamProducer::acquire(&settings.producer_name, &state.appsink)?; + + Ok(()) + } + + fn unprepare(&self) { + let settings = self.settings.lock().unwrap(); + InterStreamProducer::release(&settings.producer_name); + } +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "intersink", + gst::DebugColorFlags::empty(), + Some("Inter Sink"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for InterSink { + const NAME: &'static str = "GstInterSink"; + type Type = super::InterSink; + type ParentType = gst::Bin; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::GhostPad::from_template(&templ); + + Self { + settings: Mutex::new(Default::default()), + state: Mutex::new(State { + appsink: gst_app::AppSink::builder().name("appsink").build(), + sinkpad: sinkpad.upcast(), + }), + } + } +} + +impl ObjectImpl for InterSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecString::builder("producer-name") + .nick("Producer Name") + .blurb("Producer Name to use") + .doc_show_default() + .mutable_playing() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "producer-name" => { + let mut settings = self.settings.lock().unwrap(); + let old_producer_name = settings.producer_name.clone(); + settings.producer_name = value + .get::() + .unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string()); + + if let Some(appsink) = InterStreamProducer::release(&old_producer_name) { + if let Err(err) = + InterStreamProducer::acquire(&settings.producer_name, &appsink) + { + drop(settings); + gst::error!(CAT, imp: self, "{err}"); + self.post_error_message(gst::error_msg!( + gst::StreamError::Failed, + ["{err}"] + )) + } else { + drop(settings); + // This is required because StreamProducer obtains the latency + // it needs to forward from Latency events, and we need to let the + // application know it should recalculate latency to get the event + // to travel upstream again + self.post_message(gst::message::Latency::new()); + } + } + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "producer-name" => { + let settings = self.settings.lock().unwrap(); + settings.producer_name.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self) { + self.parent_constructed(); + let obj = self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SINK); + + let state = self.state.lock().unwrap(); + obj.add(&state.appsink).unwrap(); + obj.add_pad(&state.sinkpad).unwrap(); + state + .sinkpad + .set_target(Some(&state.appsink.static_pad("sink").unwrap())) + .unwrap(); + } +} + +impl GstObjectImpl for InterSink {} + +impl ElementImpl for InterSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Inter Sink", + "Generic/Sink", + "Inter Sink", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + + if transition == gst::StateChange::ReadyToPaused { + if let Err(err) = self.prepare() { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["Failed to prepare: {}", err] + ); + return Err(gst::StateChangeError); + } + } + + let ret = self.parent_change_state(transition)?; + + if transition == gst::StateChange::PausedToReady { + self.unprepare(); + } + + Ok(ret) + } +} + +impl BinImpl for InterSink {} diff --git a/generic/inter/src/sink/mod.rs b/generic/inter/src/sink/mod.rs new file mode 100644 index 00000000..e71f7cf3 --- /dev/null +++ b/generic/inter/src/sink/mod.rs @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: MPL-2.0 + +use glib::prelude::*; +use gst::glib; + +mod imp; + +/** + * SECTION:element-intersink + * + * #intersink is an element that can be used to produce data for + * multiple #intersrc elements to consume. + * + * You can access the underlying appsink element through the static name + * "appsink". + * + * #intersink should not reside in the same pipeline as the #intersrc + * that consumes from it, here is an example of how to use those elements + * in separate pipelines: + * + * {{ generic/inter/examples/basic.rs }} + */ + +glib::wrapper! { + pub struct InterSink(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "intersink", + gst::Rank::None, + InterSink::static_type(), + ) +} diff --git a/generic/inter/src/src/imp.rs b/generic/inter/src/src/imp.rs new file mode 100644 index 00000000..211b9cba --- /dev/null +++ b/generic/inter/src/src/imp.rs @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::streamproducer::InterStreamProducer; +use anyhow::Error; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +const DEFAULT_PRODUCER_NAME: &str = "default"; + +#[derive(Debug)] +struct Settings { + producer_name: String, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + producer_name: DEFAULT_PRODUCER_NAME.to_string(), + } + } +} + +struct State { + srcpad: gst::GhostPad, + appsrc: gst_app::AppSrc, +} + +/* Locking order is field order */ +pub struct InterSrc { + settings: Mutex, + state: Mutex, +} + +impl InterSrc { + fn prepare(&self) -> Result<(), Error> { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + + InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc); + + Ok(()) + } + + fn unprepare(&self) { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + + InterStreamProducer::unsubscribe(&settings.producer_name, &state.appsrc); + } +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new("intersrc", gst::DebugColorFlags::empty(), Some("Inter Src")) +}); + +#[glib::object_subclass] +impl ObjectSubclass for InterSrc { + const NAME: &'static str = "GstInterSrc"; + + type Type = super::InterSrc; + type ParentType = gst::Bin; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::GhostPad::from_template(&templ); + + Self { + settings: Mutex::new(Default::default()), + state: Mutex::new(State { + srcpad: srcpad.upcast(), + appsrc: gst_app::AppSrc::builder().name("appsrc").build(), + }), + } + } +} + +impl ObjectImpl for InterSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecString::builder("producer-name") + .nick("Producer Name") + .blurb("Producer Name to consume from") + .doc_show_default() + .mutable_playing() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "producer-name" => { + let mut settings = self.settings.lock().unwrap(); + let old_producer_name = settings.producer_name.clone(); + settings.producer_name = value + .get::() + .unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string()); + + let state = self.state.lock().unwrap(); + + if InterStreamProducer::unsubscribe(&old_producer_name, &state.appsrc) { + InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc); + } + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "producer-name" => { + let settings = self.settings.lock().unwrap(); + settings.producer_name.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self) { + self.parent_constructed(); + let obj = self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + + let state = self.state.lock().unwrap(); + gst_utils::StreamProducer::configure_consumer(&state.appsrc); + obj.add(&state.appsrc).unwrap(); + obj.add_pad(&state.srcpad).unwrap(); + state + .srcpad + .set_target(Some(&state.appsrc.static_pad("src").unwrap())) + .unwrap(); + } +} + +impl GstObjectImpl for InterSrc {} + +impl ElementImpl for InterSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Inter Src", + "Generic/Src", + "Inter Src", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + + if transition == gst::StateChange::ReadyToPaused { + if let Err(err) = self.prepare() { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["Failed to prepare: {}", err] + ); + return Err(gst::StateChangeError); + } + } + + let ret = self.parent_change_state(transition)?; + + if transition == gst::StateChange::PausedToReady { + self.unprepare(); + } + + Ok(ret) + } +} + +impl BinImpl for InterSrc {} diff --git a/generic/inter/src/src/mod.rs b/generic/inter/src/src/mod.rs new file mode 100644 index 00000000..4d3f2beb --- /dev/null +++ b/generic/inter/src/src/mod.rs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: MPL-2.0 + +use glib::prelude::*; +use gst::glib; + +mod imp; + +/** + * SECTION:element-intersrc + * + * #intersrc is an element that can be used to consume data from an #intersink. + * + * You can access the underlying appsrc element through the static name + * "appsrc". + * + * #intersrc should not reside in the same pipeline as the #intersink + * that it consumes from, here is an example of how to use those elements + * in separate pipelines: + * + * {{ generic/inter/examples/basic.rs }} + */ + +glib::wrapper! { + pub struct InterSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "intersrc", + gst::Rank::None, + InterSrc::static_type(), + ) +} diff --git a/generic/inter/src/streamproducer/mod.rs b/generic/inter/src/streamproducer/mod.rs new file mode 100644 index 00000000..fc83e1a0 --- /dev/null +++ b/generic/inter/src/streamproducer/mod.rs @@ -0,0 +1,159 @@ +use gst::prelude::*; +use std::collections::{HashMap, HashSet}; +use std::sync::Mutex; + +use anyhow::{anyhow, Error}; +use once_cell::sync::Lazy; + +pub enum InterStreamProducer { + Pending { + consumers: HashSet, + }, + Active { + producer: gst_utils::StreamProducer, + links: HashMap, + }, +} + +static PRODUCERS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +fn toplevel(obj: &gst::Object) -> gst::Object { + if let Some(parent) = obj.parent() { + toplevel(&parent) + } else { + obj.clone() + } +} + +fn ensure_different_toplevel(producer: &gst_app::AppSink, consumer: &gst_app::AppSrc) { + let top_a = toplevel(producer.upcast_ref()); + let top_b = toplevel(consumer.upcast_ref()); + + if top_a == top_b { + gst::glib::g_critical!( + "gstrsinter", + "Intersink with appsink {} should not share the same toplevel bin \ + as intersrc with appsrc {}, this results in loops in latency calculation", + producer.name(), + consumer.name() + ); + } +} + +impl InterStreamProducer { + pub fn acquire( + name: &str, + appsink: &gst_app::AppSink, + ) -> Result { + let mut producers = PRODUCERS.lock().unwrap(); + + if let Some(producer) = producers.remove(name) { + match producer { + InterStreamProducer::Pending { consumers } => { + let producer = gst_utils::StreamProducer::from(appsink); + let mut links = HashMap::new(); + + for consumer in consumers { + ensure_different_toplevel(appsink, &consumer); + + let link = producer + .add_consumer(&consumer) + .expect("consumer should not have already been added"); + links.insert(consumer, link); + } + + producers.insert( + name.to_string(), + InterStreamProducer::Active { + producer: producer.clone(), + links, + }, + ); + + Ok(producer) + } + InterStreamProducer::Active { .. } => { + producers.insert(name.to_string(), producer); + + Err(anyhow!( + "An active producer already exists with name {}", + name + )) + } + } + } else { + let producer = gst_utils::StreamProducer::from(appsink); + + producers.insert( + name.to_string(), + InterStreamProducer::Active { + producer: producer.clone(), + links: HashMap::new(), + }, + ); + + Ok(producer) + } + } + + pub fn release(name: &str) -> Option { + let mut producers = PRODUCERS.lock().unwrap(); + + if let Some(producer) = producers.remove(name) { + match producer { + InterStreamProducer::Pending { .. } => None, + InterStreamProducer::Active { links, producer } => { + producers.insert( + name.to_string(), + InterStreamProducer::Pending { + consumers: links.into_keys().collect(), + }, + ); + + Some(producer.appsink().clone()) + } + } + } else { + None + } + } + + pub fn subscribe(name: &str, consumer: &gst_app::AppSrc) { + let mut producers = PRODUCERS.lock().unwrap(); + + if let Some(producer) = producers.get_mut(name) { + match producer { + InterStreamProducer::Pending { consumers } => { + consumers.insert(consumer.clone()); + } + InterStreamProducer::Active { producer, links } => { + ensure_different_toplevel(producer.appsink(), consumer); + + let link = producer + .add_consumer(consumer) + .expect("consumer should not already have been added"); + links.insert(consumer.clone(), link); + } + } + } else { + let producer = InterStreamProducer::Pending { + consumers: [consumer.clone()].into(), + }; + producers.insert(name.to_string(), producer); + } + } + + pub fn unsubscribe(name: &str, consumer: &gst_app::AppSrc) -> bool { + let mut producers = PRODUCERS.lock().unwrap(); + + if let Some(producer) = producers.get_mut(name) { + match producer { + InterStreamProducer::Pending { consumers } => consumers.remove(consumer), + InterStreamProducer::Active { links, .. } => links.remove(consumer).is_some(), + } + } else { + false + } + } +} diff --git a/generic/inter/tests/inter.rs b/generic/inter/tests/inter.rs new file mode 100644 index 00000000..bc7e746f --- /dev/null +++ b/generic/inter/tests/inter.rs @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; +use serial_test::serial; + +use pretty_assertions::assert_eq; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstrsinter::plugin_register_static().unwrap(); + }); +} + +fn start_consumer(producer_name: &str) -> gst_check::Harness { + let mut hc = gst_check::Harness::new("intersrc"); + + hc.element() + .unwrap() + .set_property("producer-name", producer_name); + hc.play(); + + hc +} + +fn start_producer(producer_name: &str) -> (gst::Pad, gst::Element) { + let element = gst::ElementFactory::make("intersink").build().unwrap(); + + element.set_property("producer-name", producer_name); + element.set_state(gst::State::Playing).unwrap(); + + let sinkpad = element.static_pad("sink").unwrap(); + let srcpad = gst::Pad::new(gst::PadDirection::Src); + srcpad.set_active(true).unwrap(); + srcpad.link(&sinkpad).unwrap(); + + srcpad.push_event(gst::event::StreamStart::builder("foo").build()); + srcpad + .push_event(gst::event::Caps::builder(&gst::Caps::builder("video/x-raw").build()).build()); + srcpad.push_event( + gst::event::Segment::builder(&gst::FormattedSegment::::new()).build(), + ); + + (srcpad, element) +} + +fn push_one(srcpad: &gst::Pad, pts: gst::ClockTime) { + let mut inbuf = gst::Buffer::with_size(1).unwrap(); + + { + let buf = inbuf.get_mut().unwrap(); + buf.set_pts(pts); + } + + srcpad.push(inbuf).unwrap(); +} + +#[test] +#[serial] +fn test_forward_one_buffer() { + init(); + + let mut hc = start_consumer("p1"); + let (srcpad, element) = start_producer("p1"); + + push_one(&srcpad, gst::ClockTime::from_nseconds(1)); + + let outbuf = hc.pull().unwrap(); + + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1))); + + element.set_state(gst::State::Null).unwrap(); +} + +#[test] +#[serial] +fn test_change_name_of_producer() { + init(); + + let mut hc1 = start_consumer("p1"); + let mut hc2 = start_consumer("p2"); + let (srcpad, element) = start_producer("p1"); + + /* Once this returns, the buffer should have been dispatched only to hc1 */ + push_one(&srcpad, gst::ClockTime::from_nseconds(1)); + let outbuf = hc1.pull().unwrap(); + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1))); + + element.set_property("producer-name", "p2"); + + /* This should only get dispatched to hc2, and it should be its first buffer */ + push_one(&srcpad, gst::ClockTime::from_nseconds(2)); + let outbuf = hc2.pull().unwrap(); + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2))); + + element.set_property("producer-name", "p1"); + + /* Back to hc1, which should not see the buffer we pushed in the previous step */ + push_one(&srcpad, gst::ClockTime::from_nseconds(3)); + let outbuf = hc1.pull().unwrap(); + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(3))); + + element.set_state(gst::State::Null).unwrap(); +} + +#[test] +#[serial] +fn test_change_producer_name() { + init(); + + let mut hc = start_consumer("p1"); + let (srcpad1, element1) = start_producer("p1"); + let (srcpad2, element2) = start_producer("p2"); + + /* This buffer should be dispatched to no consumer */ + push_one(&srcpad2, gst::ClockTime::from_nseconds(1)); + + /* This one should be dispatched to hc, and it should be its first buffer */ + push_one(&srcpad1, gst::ClockTime::from_nseconds(2)); + let outbuf = hc.pull().unwrap(); + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2))); + + hc.element().unwrap().set_property("producer-name", "p2"); + + /* This buffer should be dispatched to no consumer */ + push_one(&srcpad1, gst::ClockTime::from_nseconds(3)); + + /* This one should be dispatched to hc, and it should be its next buffer */ + push_one(&srcpad2, gst::ClockTime::from_nseconds(4)); + let outbuf = hc.pull().unwrap(); + assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(4))); + + element1.set_state(gst::State::Null).unwrap(); + element2.set_state(gst::State::Null).unwrap(); +} diff --git a/meson.build b/meson.build index 69a7fdfb..f029cac5 100644 --- a/meson.build +++ b/meson.build @@ -107,6 +107,7 @@ plugins = { 'ts-standalone', ], }, + 'inter': {'library': 'libgstrsinter'}, 'mp4': {'library': 'libgstmp4'}, 'fmp4': { diff --git a/meson_options.txt b/meson_options.txt index 033e586c..b79107a8 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -14,6 +14,7 @@ option('sodium-source', type: 'combo', choices: ['system', 'built-in'], value: 'built-in', description: 'Whether to use libsodium from the system or the built-in version from the sodiumoxide crate') option('threadshare', type: 'feature', value: 'auto', description: 'Build threadshare plugin') +option('inter', type: 'feature', value: 'auto', description: 'Build inter plugin') # mux option('flavors', type: 'feature', value: 'auto', description: 'Build flavors plugin')