generic: expose inter plugin

This new plugin exposes two elements, intersink and intersrc. These act
as wormholes for data in the same process and can be used to forward
data from one pipeline to another.

The implementation makes use of gstreamer-utils' StreamProducer, and
supports dynamically adding and removing consumers, before and after
producers, and changing producer names while PLAYING, both on the sink
and the src.

This initial implementation comes with a small demo, and a few tests.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1257>
This commit is contained in:
Mathieu Duponchelle 2023-06-21 19:55:27 +02:00 committed by GStreamer Marge Bot
parent 6523a07a9f
commit e905299eba
17 changed files with 1381 additions and 0 deletions

View file

@ -14,6 +14,7 @@ members = [
"generic/file",
"generic/sodium",
"generic/threadshare",
"generic/inter",
"mux/flavors",
"mux/fmp4",
@ -64,6 +65,7 @@ default-members = [
"audio/lewton",
"generic/threadshare",
"generic/inter",
"mux/fmp4",
"mux/mp4",

View file

@ -22,6 +22,7 @@ RS_PREFIXED = [
'png',
'tracers',
'rtp',
'inter',
]
OVERRIDE = {

View file

@ -36,6 +36,7 @@ RENAMES = {
'rswebrtc': 'webrtc',
'rspng': 'png',
'rsvideofx': 'videofx',
'rsinter': 'inter',
'textahead': 'ahead',
'textwrap': 'wrap',
}

View file

@ -5369,6 +5369,94 @@
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"rsinter": {
"description": "GStreamer Inter Plugin",
"elements": {
"intersink": {
"author": "Mathieu Duponchelle <mathieu@centricular.com>",
"description": "Inter Sink",
"hierarchy": [
"GstInterSink",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Generic/Sink",
"pad-templates": {
"sink": {
"caps": "ANY",
"direction": "sink",
"presence": "always"
}
},
"properties": {
"producer-name": {
"blurb": "Producer Name to use",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "playing",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
},
"intersrc": {
"author": "Mathieu Duponchelle <mathieu@centricular.com>",
"description": "Inter Src",
"hierarchy": [
"GstInterSrc",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Generic/Src",
"pad-templates": {
"src": {
"caps": "ANY",
"direction": "src",
"presence": "always"
}
},
"properties": {
"producer-name": {
"blurb": "Producer Name to consume from",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "playing",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
}
},
"filename": "gstrsinter",
"license": "MPL-2.0",
"other-types": {},
"package": "gst-plugin-inter",
"source": "gst-plugin-inter",
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"rsonvif": {
"description": "GStreamer Rust ONVIF Plugin",
"elements": {

53
generic/inter/Cargo.toml Normal file
View file

@ -0,0 +1,53 @@
[package]
name = "gst-plugin-inter"
version = "0.11.0-alpha.1"
authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
license = "MPL-2.0"
description = "GStreamer Inter Plugin"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
edition = "2021"
rust-version = "1.66"
[dependencies]
anyhow = "1"
gst = { package = "gstreamer", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst_utils = { package = "gstreamer-utils", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst_app = { package = "gstreamer-app", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1.0"
[dev-dependencies]
pretty_assertions = "1"
gst-check = { package = "gstreamer-check", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
futures = "0.3"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1.11"
serial_test = "2"
[lib]
name = "gstrsinter"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
[features]
static = []
capi = []
doc = ["gst/v1_18"]
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
[[example]]
name = "plug-and-play"

5
generic/inter/build.rs Normal file
View file

@ -0,0 +1,5 @@
// SPDX-License-Identifier: MPL-2.0
fn main() {
gst_plugin_version_helper::info()
}

View file

@ -0,0 +1,74 @@
use anyhow::Error;
use futures::prelude::*;
use futures::stream::select_all;
use gst::prelude::*;
fn toplevel(obj: &gst::Object) -> gst::Object {
if let Some(parent) = obj.parent() {
toplevel(&parent)
} else {
obj.clone()
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
gst::init()?;
let src_pipeline = gst::parse_launch("videotestsrc is-live=true ! intersink")?;
let sink_pipeline = gst::parse_launch("intersrc ! videoconvert ! autovideosink")?;
let mut stream = select_all([
src_pipeline.bus().unwrap().stream(),
sink_pipeline.bus().unwrap().stream(),
]);
let base_time = gst::SystemClock::obtain().time().unwrap();
src_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
src_pipeline.set_start_time(gst::ClockTime::NONE);
src_pipeline.set_base_time(base_time);
sink_pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
sink_pipeline.set_start_time(gst::ClockTime::NONE);
sink_pipeline.set_base_time(base_time);
src_pipeline.set_state(gst::State::Playing)?;
sink_pipeline.set_state(gst::State::Playing)?;
while let Some(msg) = stream.next().await {
use gst::MessageView;
match msg.view() {
MessageView::Latency(..) => {
if let Some(o) = msg.src() {
if let Ok(pipeline) = toplevel(o).downcast::<gst::Pipeline>() {
eprintln!("Recalculating latency {:?}", pipeline);
let _ = pipeline.recalculate_latency();
}
}
}
MessageView::Eos(..) => {
eprintln!("Unexpected EOS");
break;
}
MessageView::Error(err) => {
eprintln!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
}
}
src_pipeline.set_state(gst::State::Null)?;
sink_pipeline.set_state(gst::State::Null)?;
Ok(())
}

View file

@ -0,0 +1,325 @@
use anyhow::Error;
use futures::prelude::*;
use gst::prelude::*;
use std::collections::HashMap;
use std::io::prelude::*;
use tokio::task;
struct Producer {
pipeline: gst::Pipeline,
sink: gst::Element,
overlay: gst::Element,
}
struct Consumer {
pipeline: gst::Pipeline,
src: gst::Element,
}
fn create_sink_pipeline(producer_name: &str) -> Result<Producer, Error> {
let pipeline = gst::Pipeline::builder()
.name(format!("producer-{producer_name}"))
.build();
let videotestsrc = gst::ElementFactory::make("videotestsrc")
.property_from_str("pattern", "ball")
.property("is-live", true)
.build()?;
let capsfilter = gst::ElementFactory::make("capsfilter")
.property(
"caps",
gst::Caps::builder("video/x-raw")
.field("framerate", gst::Fraction::new(50, 1))
.build(),
)
.build()?;
let queue = gst::ElementFactory::make("queue").build()?;
let overlay = gst::ElementFactory::make("textoverlay")
.property("font-desc", "Sans 30")
.property("text", format!("Producer: {producer_name}"))
.property_from_str("valignment", "top")
.build()?;
let timeoverlay = gst::ElementFactory::make("timeoverlay")
.property("font-desc", "Sans 30")
.property_from_str("valignment", "center")
.property_from_str("halignment", "center")
.build()?;
let sink = gst::ElementFactory::make("intersink")
.property("producer-name", producer_name)
.build()?;
pipeline.add_many([
&videotestsrc,
&capsfilter,
&queue,
&overlay,
&timeoverlay,
&sink,
])?;
gst::Element::link_many([
&videotestsrc,
&capsfilter,
&queue,
&overlay,
&timeoverlay,
&sink,
])?;
Ok(Producer {
pipeline,
sink,
overlay,
})
}
fn create_src_pipeline(producer_name: &str, consumer_name: &str) -> Result<Consumer, Error> {
let pipeline = gst::Pipeline::builder()
.name(format!("consumer-{consumer_name}"))
.build();
let src = gst::ElementFactory::make("intersrc")
.property("producer-name", producer_name)
.build()?;
let queue = gst::ElementFactory::make("queue").build()?;
let vconv = gst::ElementFactory::make("videoconvert").build()?;
let overlay = gst::ElementFactory::make("textoverlay")
.property("font-desc", "Sans 30")
.property("text", format!("Consumer: {consumer_name}"))
.property_from_str("valignment", "bottom")
.build()?;
let vconv2 = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("autovideosink").build()?;
pipeline.add_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?;
gst::Element::link_many([&src, &queue, &vconv, &overlay, &vconv2, &sink])?;
Ok(Consumer { pipeline, src })
}
fn prompt_on() {
print!("$ ");
let _ = std::io::stdout().flush();
}
fn monitor_pipeline(pipeline: &gst::Pipeline, base_time: gst::ClockTime) -> Result<(), Error> {
pipeline.set_clock(Some(&gst::SystemClock::obtain()))?;
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(base_time);
pipeline.set_state(gst::State::Playing)?;
let mut bus_stream = pipeline.bus().expect("Pipeline should have a bus").stream();
let pipeline_clone = pipeline.downgrade();
task::spawn(async move {
while let Some(msg) = bus_stream.next().await {
use gst::MessageView;
if let Some(pipeline) = pipeline_clone.upgrade() {
match msg.view() {
MessageView::Latency(..) => {
let _ = pipeline.recalculate_latency();
}
MessageView::Eos(..) => {
println!(
"EOS from {}",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into())
);
prompt_on();
break;
}
MessageView::Error(err) => {
let _ = pipeline.set_state(gst::State::Null);
println!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
prompt_on();
break;
}
MessageView::StateChanged(sc) => {
if msg.src() == Some(pipeline.upcast_ref()) {
gst::debug_bin_to_dot_file(
pipeline.upcast_ref::<gst::Bin>(),
gst::DebugGraphDetails::all(),
format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()),
);
}
}
_ => (),
}
} else {
break;
}
}
});
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
gst::init()?;
println!("h for help");
let base_time = gst::SystemClock::obtain().time().unwrap();
let mut producers: HashMap<String, Producer> = HashMap::new();
let mut consumers: HashMap<String, Consumer> = HashMap::new();
let mut stdin = std::io::stdin().lock();
loop {
let mut buf = String::new();
prompt_on();
match stdin.read_line(&mut buf)? {
0 => {
eprintln!("EOF!");
break;
}
_ => {
let command: Vec<_> = buf.split_whitespace().collect();
match command.first() {
Some(&"ap") => {
if command.len() != 2 {
println!("ap <producer_name>: Add a producer");
} else {
let producer_name = command.get(1).unwrap().to_string();
if producers.contains_key(&producer_name) {
println!("Producer with name {producer_name} already exists!");
continue;
}
let producer = create_sink_pipeline(&producer_name)?;
monitor_pipeline(&producer.pipeline, base_time)?;
println!("Added producer with name {producer_name}");
producers.insert(producer_name, producer);
}
}
Some(&"ac") => {
if command.len() != 3 {
println!("ac <consumer_name> <producer_name>: Add a consumer");
} else {
let consumer_name = command.get(1).unwrap().to_string();
let producer_name = command.get(2).unwrap().to_string();
if consumers.contains_key(&consumer_name) {
println!("Consumer with name {consumer_name} already exists!");
continue;
}
let consumer = create_src_pipeline(&producer_name, &consumer_name)?;
monitor_pipeline(&consumer.pipeline, base_time)?;
println!("Added consumer with name {consumer_name} and producer name {producer_name}");
consumers.insert(consumer_name, consumer);
}
}
Some(&"rp") => {
if command.len() != 2 {
println!("rp <producer_name>: Remove a producer");
} else {
let producer_name = command.get(1).unwrap().to_string();
if let Some(producer) = producers.remove(&producer_name) {
let _ = producer.pipeline.set_state(gst::State::Null);
println!("Removed producer with name {producer_name}");
} else {
println!("No producer with name {producer_name}");
}
}
}
Some(&"rc") => {
if command.len() != 2 {
println!("rc <consumer_name>: Remove a consumer");
} else {
let consumer_name = command.get(1).unwrap().to_string();
if let Some(consumer) = consumers.remove(&consumer_name) {
let _ = consumer.pipeline.set_state(gst::State::Null);
println!("Removed consumer with name {consumer_name}");
} else {
println!("No consumer with name {consumer_name}");
}
}
}
Some(&"cnp") => {
if command.len() != 3 {
println!("cnp <old_producer_name> <new_producer_name>: Change the name of a producer");
} else {
let old_producer_name = command.get(1).unwrap().to_string();
let producer_name = command.get(2).unwrap().to_string();
if producers.contains_key(&producer_name) {
println!("Producer with name {producer_name} already exists!");
continue;
}
if let Some(producer) = producers.remove(&old_producer_name) {
producer.sink.set_property("producer-name", &producer_name);
producer
.overlay
.set_property("text", format!("Producer: {producer_name}"));
println!(
"Changed producer name {old_producer_name} -> {producer_name}"
);
producers.insert(producer_name, producer);
} else {
println!("No producer with name {old_producer_name}");
}
}
}
Some(&"cpn") => {
if command.len() != 3 {
println!("cpn <consumer_name> <new_producer_name>: Change the producer name for a consumer");
} else {
let consumer_name = command.get(1).unwrap().to_string();
let producer_name = command.get(2).unwrap().to_string();
if let Some(consumer) = consumers.get_mut(&consumer_name) {
consumer.src.set_property("producer-name", &producer_name);
println!("Changed producer name for consumer {consumer_name} to {producer_name}");
} else {
println!("No consumer with name {consumer_name}");
}
}
}
Some(&"h") => {
println!("h: show this help");
println!("ap <producer_name>: Add a producer");
println!("ac <consumer_name> <producer_name>: Add a consumer");
println!("rp <producer_name>: Remove a producer");
println!("rc <consumer_name>: Remove a consumer");
println!("cnp <old_producer_name> <new_producer_name>: Change the name of a producer");
println!("cpn <consumer_name> <new_producer_name>: Change the producer name for a consumer");
}
_ => {
println!("Unknown command");
}
}
}
}
buf.clear();
}
for (_, producer) in producers {
let _ = producer.pipeline.set_state(gst::State::Null);
}
for (_, consumer) in consumers {
let _ = consumer.pipeline.set_state(gst::State::Null);
}
Ok(())
}

44
generic/inter/src/lib.rs Normal file
View file

@ -0,0 +1,44 @@
// Copyright (C) 2023 Mathieu Duponchelle <mathieu@centricular.com>
//
// Take a look at the license at the top of the repository in the LICENSE file.
#![allow(unused_doc_comments)]
//! GStreamer elements for connecting pipelines in the same process
mod sink;
mod src;
mod streamproducer;
/**
* plugin-rsinter:
* @title: Rust inter elements
* @short_description: A set of elements for transferring data between pipelines
*
* This plugin exposes two elements, `intersink` and `intersrc`, that can be
* used to transfer data from one pipeline to multiple others in the same
* process.
*
* The elements are implemented using the `StreamProducer` API from
* gstreamer-utils.
*
* Since: plugins-rs-0.11.0
*/
use gst::glib;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
sink::register(plugin)?;
src::register(plugin)?;
Ok(())
}
gst::plugin_define!(
rsinter,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MPL-2.0",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

View file

@ -0,0 +1,217 @@
// SPDX-License-Identifier: MPL-2.0
use crate::streamproducer::InterStreamProducer;
use anyhow::Error;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::sync::Mutex;
use once_cell::sync::Lazy;
const DEFAULT_PRODUCER_NAME: &str = "default";
#[derive(Debug)]
struct Settings {
producer_name: String,
}
impl Default for Settings {
fn default() -> Self {
Settings {
producer_name: DEFAULT_PRODUCER_NAME.to_string(),
}
}
}
struct State {
appsink: gst_app::AppSink,
sinkpad: gst::GhostPad,
}
/* Locking order is field order */
pub struct InterSink {
settings: Mutex<Settings>,
state: Mutex<State>,
}
impl InterSink {
fn prepare(&self) -> Result<(), Error> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
InterStreamProducer::acquire(&settings.producer_name, &state.appsink)?;
Ok(())
}
fn unprepare(&self) {
let settings = self.settings.lock().unwrap();
InterStreamProducer::release(&settings.producer_name);
}
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"intersink",
gst::DebugColorFlags::empty(),
Some("Inter Sink"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for InterSink {
const NAME: &'static str = "GstInterSink";
type Type = super::InterSink;
type ParentType = gst::Bin;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::GhostPad::from_template(&templ);
Self {
settings: Mutex::new(Default::default()),
state: Mutex::new(State {
appsink: gst_app::AppSink::builder().name("appsink").build(),
sinkpad: sinkpad.upcast(),
}),
}
}
}
impl ObjectImpl for InterSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecString::builder("producer-name")
.nick("Producer Name")
.blurb("Producer Name to use")
.doc_show_default()
.mutable_playing()
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"producer-name" => {
let mut settings = self.settings.lock().unwrap();
let old_producer_name = settings.producer_name.clone();
settings.producer_name = value
.get::<String>()
.unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string());
if let Some(appsink) = InterStreamProducer::release(&old_producer_name) {
if let Err(err) =
InterStreamProducer::acquire(&settings.producer_name, &appsink)
{
drop(settings);
gst::error!(CAT, imp: self, "{err}");
self.post_error_message(gst::error_msg!(
gst::StreamError::Failed,
["{err}"]
))
} else {
drop(settings);
// This is required because StreamProducer obtains the latency
// it needs to forward from Latency events, and we need to let the
// application know it should recalculate latency to get the event
// to travel upstream again
self.post_message(gst::message::Latency::new());
}
}
}
_ => unimplemented!(),
};
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"producer-name" => {
let settings = self.settings.lock().unwrap();
settings.producer_name.to_value()
}
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SINK);
let state = self.state.lock().unwrap();
obj.add(&state.appsink).unwrap();
obj.add_pad(&state.sinkpad).unwrap();
state
.sinkpad
.set_target(Some(&state.appsink.static_pad("sink").unwrap()))
.unwrap();
}
}
impl GstObjectImpl for InterSink {}
impl ElementImpl for InterSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Inter Sink",
"Generic/Sink",
"Inter Sink",
"Mathieu Duponchelle <mathieu@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
if transition == gst::StateChange::ReadyToPaused {
if let Err(err) = self.prepare() {
gst::element_error!(
self.obj(),
gst::StreamError::Failed,
["Failed to prepare: {}", err]
);
return Err(gst::StateChangeError);
}
}
let ret = self.parent_change_state(transition)?;
if transition == gst::StateChange::PausedToReady {
self.unprepare();
}
Ok(ret)
}
}
impl BinImpl for InterSink {}

View file

@ -0,0 +1,35 @@
// SPDX-License-Identifier: MPL-2.0
use glib::prelude::*;
use gst::glib;
mod imp;
/**
* SECTION:element-intersink
*
* #intersink is an element that can be used to produce data for
* multiple #intersrc elements to consume.
*
* You can access the underlying appsink element through the static name
* "appsink".
*
* #intersink should not reside in the same pipeline as the #intersrc
* that consumes from it, here is an example of how to use those elements
* in separate pipelines:
*
* {{ generic/inter/examples/basic.rs }}
*/
glib::wrapper! {
pub struct InterSink(ObjectSubclass<imp::InterSink>) @extends gst::Bin, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"intersink",
gst::Rank::None,
InterSink::static_type(),
)
}

View file

@ -0,0 +1,203 @@
// SPDX-License-Identifier: MPL-2.0
use crate::streamproducer::InterStreamProducer;
use anyhow::Error;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::sync::Mutex;
use once_cell::sync::Lazy;
const DEFAULT_PRODUCER_NAME: &str = "default";
#[derive(Debug)]
struct Settings {
producer_name: String,
}
impl Default for Settings {
fn default() -> Self {
Settings {
producer_name: DEFAULT_PRODUCER_NAME.to_string(),
}
}
}
struct State {
srcpad: gst::GhostPad,
appsrc: gst_app::AppSrc,
}
/* Locking order is field order */
pub struct InterSrc {
settings: Mutex<Settings>,
state: Mutex<State>,
}
impl InterSrc {
fn prepare(&self) -> Result<(), Error> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc);
Ok(())
}
fn unprepare(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
InterStreamProducer::unsubscribe(&settings.producer_name, &state.appsrc);
}
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("intersrc", gst::DebugColorFlags::empty(), Some("Inter Src"))
});
#[glib::object_subclass]
impl ObjectSubclass for InterSrc {
const NAME: &'static str = "GstInterSrc";
type Type = super::InterSrc;
type ParentType = gst::Bin;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::GhostPad::from_template(&templ);
Self {
settings: Mutex::new(Default::default()),
state: Mutex::new(State {
srcpad: srcpad.upcast(),
appsrc: gst_app::AppSrc::builder().name("appsrc").build(),
}),
}
}
}
impl ObjectImpl for InterSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecString::builder("producer-name")
.nick("Producer Name")
.blurb("Producer Name to consume from")
.doc_show_default()
.mutable_playing()
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"producer-name" => {
let mut settings = self.settings.lock().unwrap();
let old_producer_name = settings.producer_name.clone();
settings.producer_name = value
.get::<String>()
.unwrap_or_else(|_| DEFAULT_PRODUCER_NAME.to_string());
let state = self.state.lock().unwrap();
if InterStreamProducer::unsubscribe(&old_producer_name, &state.appsrc) {
InterStreamProducer::subscribe(&settings.producer_name, &state.appsrc);
}
}
_ => unimplemented!(),
};
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"producer-name" => {
let settings = self.settings.lock().unwrap();
settings.producer_name.to_value()
}
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
let state = self.state.lock().unwrap();
gst_utils::StreamProducer::configure_consumer(&state.appsrc);
obj.add(&state.appsrc).unwrap();
obj.add_pad(&state.srcpad).unwrap();
state
.srcpad
.set_target(Some(&state.appsrc.static_pad("src").unwrap()))
.unwrap();
}
}
impl GstObjectImpl for InterSrc {}
impl ElementImpl for InterSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Inter Src",
"Generic/Src",
"Inter Src",
"Mathieu Duponchelle <mathieu@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
if transition == gst::StateChange::ReadyToPaused {
if let Err(err) = self.prepare() {
gst::element_error!(
self.obj(),
gst::StreamError::Failed,
["Failed to prepare: {}", err]
);
return Err(gst::StateChangeError);
}
}
let ret = self.parent_change_state(transition)?;
if transition == gst::StateChange::PausedToReady {
self.unprepare();
}
Ok(ret)
}
}
impl BinImpl for InterSrc {}

View file

@ -0,0 +1,34 @@
// SPDX-License-Identifier: MPL-2.0
use glib::prelude::*;
use gst::glib;
mod imp;
/**
* SECTION:element-intersrc
*
* #intersrc is an element that can be used to consume data from an #intersink.
*
* You can access the underlying appsrc element through the static name
* "appsrc".
*
* #intersrc should not reside in the same pipeline as the #intersink
* that it consumes from, here is an example of how to use those elements
* in separate pipelines:
*
* {{ generic/inter/examples/basic.rs }}
*/
glib::wrapper! {
pub struct InterSrc(ObjectSubclass<imp::InterSrc>) @extends gst::Bin, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"intersrc",
gst::Rank::None,
InterSrc::static_type(),
)
}

View file

@ -0,0 +1,159 @@
use gst::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use anyhow::{anyhow, Error};
use once_cell::sync::Lazy;
pub enum InterStreamProducer {
Pending {
consumers: HashSet<gst_app::AppSrc>,
},
Active {
producer: gst_utils::StreamProducer,
links: HashMap<gst_app::AppSrc, gst_utils::ConsumptionLink>,
},
}
static PRODUCERS: Lazy<Mutex<HashMap<String, InterStreamProducer>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
fn toplevel(obj: &gst::Object) -> gst::Object {
if let Some(parent) = obj.parent() {
toplevel(&parent)
} else {
obj.clone()
}
}
fn ensure_different_toplevel(producer: &gst_app::AppSink, consumer: &gst_app::AppSrc) {
let top_a = toplevel(producer.upcast_ref());
let top_b = toplevel(consumer.upcast_ref());
if top_a == top_b {
gst::glib::g_critical!(
"gstrsinter",
"Intersink with appsink {} should not share the same toplevel bin \
as intersrc with appsrc {}, this results in loops in latency calculation",
producer.name(),
consumer.name()
);
}
}
impl InterStreamProducer {
pub fn acquire(
name: &str,
appsink: &gst_app::AppSink,
) -> Result<gst_utils::StreamProducer, Error> {
let mut producers = PRODUCERS.lock().unwrap();
if let Some(producer) = producers.remove(name) {
match producer {
InterStreamProducer::Pending { consumers } => {
let producer = gst_utils::StreamProducer::from(appsink);
let mut links = HashMap::new();
for consumer in consumers {
ensure_different_toplevel(appsink, &consumer);
let link = producer
.add_consumer(&consumer)
.expect("consumer should not have already been added");
links.insert(consumer, link);
}
producers.insert(
name.to_string(),
InterStreamProducer::Active {
producer: producer.clone(),
links,
},
);
Ok(producer)
}
InterStreamProducer::Active { .. } => {
producers.insert(name.to_string(), producer);
Err(anyhow!(
"An active producer already exists with name {}",
name
))
}
}
} else {
let producer = gst_utils::StreamProducer::from(appsink);
producers.insert(
name.to_string(),
InterStreamProducer::Active {
producer: producer.clone(),
links: HashMap::new(),
},
);
Ok(producer)
}
}
pub fn release(name: &str) -> Option<gst_app::AppSink> {
let mut producers = PRODUCERS.lock().unwrap();
if let Some(producer) = producers.remove(name) {
match producer {
InterStreamProducer::Pending { .. } => None,
InterStreamProducer::Active { links, producer } => {
producers.insert(
name.to_string(),
InterStreamProducer::Pending {
consumers: links.into_keys().collect(),
},
);
Some(producer.appsink().clone())
}
}
} else {
None
}
}
pub fn subscribe(name: &str, consumer: &gst_app::AppSrc) {
let mut producers = PRODUCERS.lock().unwrap();
if let Some(producer) = producers.get_mut(name) {
match producer {
InterStreamProducer::Pending { consumers } => {
consumers.insert(consumer.clone());
}
InterStreamProducer::Active { producer, links } => {
ensure_different_toplevel(producer.appsink(), consumer);
let link = producer
.add_consumer(consumer)
.expect("consumer should not already have been added");
links.insert(consumer.clone(), link);
}
}
} else {
let producer = InterStreamProducer::Pending {
consumers: [consumer.clone()].into(),
};
producers.insert(name.to_string(), producer);
}
}
pub fn unsubscribe(name: &str, consumer: &gst_app::AppSrc) -> bool {
let mut producers = PRODUCERS.lock().unwrap();
if let Some(producer) = producers.get_mut(name) {
match producer {
InterStreamProducer::Pending { consumers } => consumers.remove(consumer),
InterStreamProducer::Active { links, .. } => links.remove(consumer).is_some(),
}
} else {
false
}
}
}

View file

@ -0,0 +1,138 @@
// SPDX-License-Identifier: MPL-2.0
use gst::prelude::*;
use serial_test::serial;
use pretty_assertions::assert_eq;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstrsinter::plugin_register_static().unwrap();
});
}
fn start_consumer(producer_name: &str) -> gst_check::Harness {
let mut hc = gst_check::Harness::new("intersrc");
hc.element()
.unwrap()
.set_property("producer-name", producer_name);
hc.play();
hc
}
fn start_producer(producer_name: &str) -> (gst::Pad, gst::Element) {
let element = gst::ElementFactory::make("intersink").build().unwrap();
element.set_property("producer-name", producer_name);
element.set_state(gst::State::Playing).unwrap();
let sinkpad = element.static_pad("sink").unwrap();
let srcpad = gst::Pad::new(gst::PadDirection::Src);
srcpad.set_active(true).unwrap();
srcpad.link(&sinkpad).unwrap();
srcpad.push_event(gst::event::StreamStart::builder("foo").build());
srcpad
.push_event(gst::event::Caps::builder(&gst::Caps::builder("video/x-raw").build()).build());
srcpad.push_event(
gst::event::Segment::builder(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
(srcpad, element)
}
fn push_one(srcpad: &gst::Pad, pts: gst::ClockTime) {
let mut inbuf = gst::Buffer::with_size(1).unwrap();
{
let buf = inbuf.get_mut().unwrap();
buf.set_pts(pts);
}
srcpad.push(inbuf).unwrap();
}
#[test]
#[serial]
fn test_forward_one_buffer() {
init();
let mut hc = start_consumer("p1");
let (srcpad, element) = start_producer("p1");
push_one(&srcpad, gst::ClockTime::from_nseconds(1));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
element.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_change_name_of_producer() {
init();
let mut hc1 = start_consumer("p1");
let mut hc2 = start_consumer("p2");
let (srcpad, element) = start_producer("p1");
/* Once this returns, the buffer should have been dispatched only to hc1 */
push_one(&srcpad, gst::ClockTime::from_nseconds(1));
let outbuf = hc1.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
element.set_property("producer-name", "p2");
/* This should only get dispatched to hc2, and it should be its first buffer */
push_one(&srcpad, gst::ClockTime::from_nseconds(2));
let outbuf = hc2.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
element.set_property("producer-name", "p1");
/* Back to hc1, which should not see the buffer we pushed in the previous step */
push_one(&srcpad, gst::ClockTime::from_nseconds(3));
let outbuf = hc1.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(3)));
element.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_change_producer_name() {
init();
let mut hc = start_consumer("p1");
let (srcpad1, element1) = start_producer("p1");
let (srcpad2, element2) = start_producer("p2");
/* This buffer should be dispatched to no consumer */
push_one(&srcpad2, gst::ClockTime::from_nseconds(1));
/* This one should be dispatched to hc, and it should be its first buffer */
push_one(&srcpad1, gst::ClockTime::from_nseconds(2));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
hc.element().unwrap().set_property("producer-name", "p2");
/* This buffer should be dispatched to no consumer */
push_one(&srcpad1, gst::ClockTime::from_nseconds(3));
/* This one should be dispatched to hc, and it should be its next buffer */
push_one(&srcpad2, gst::ClockTime::from_nseconds(4));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(4)));
element1.set_state(gst::State::Null).unwrap();
element2.set_state(gst::State::Null).unwrap();
}

View file

@ -107,6 +107,7 @@ plugins = {
'ts-standalone',
],
},
'inter': {'library': 'libgstrsinter'},
'mp4': {'library': 'libgstmp4'},
'fmp4': {

View file

@ -14,6 +14,7 @@ option('sodium-source', type: 'combo',
choices: ['system', 'built-in'], value: 'built-in',
description: 'Whether to use libsodium from the system or the built-in version from the sodiumoxide crate')
option('threadshare', type: 'feature', value: 'auto', description: 'Build threadshare plugin')
option('inter', type: 'feature', value: 'auto', description: 'Build inter plugin')
# mux
option('flavors', type: 'feature', value: 'auto', description: 'Build flavors plugin')