threadshare: add inter elements

These threadshare-based elements provide a means to connect an upstream pipeline
to multiple downstream pipelines while taking advantage of reduced nb of threads
& context switches.

Differences with the `ts-proxy` elements:

* Link one to many pipelines instead of one to one.
* No back pressure: items which can't be handled by a downstream pipeline are
  lost, wherease they are kept in a pending queue and block the stream for
  `ts-proxysink`.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2293>
This commit is contained in:
François Laignel 2025-06-20 17:10:52 +02:00
parent b48ab031a0
commit 6ae07945ac
13 changed files with 2289 additions and 2 deletions

12
Cargo.lock generated
View file

@ -189,6 +189,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-lock"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [
"event-listener",
"event-listener-strategy",
"pin-project-lite",
]
[[package]]
name = "async-recursion"
version = "1.1.1"
@ -3357,6 +3368,7 @@ dependencies = [
name = "gst-plugin-threadshare"
version = "0.14.0-alpha.1"
dependencies = [
"async-lock",
"async-task",
"bitflags 2.9.1",
"cc",

View file

@ -16165,6 +16165,142 @@
},
"rank": "none"
},
"ts-intersink": {
"author": "François Laignel <francois@centricular.com>",
"description": "Thread-sharing inter-pipelines sink",
"hierarchy": [
"GstTsInterSink",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Sink/Generic",
"pad-templates": {
"sink": {
"caps": "ANY",
"direction": "sink",
"presence": "always"
}
},
"properties": {
"inter-context": {
"blurb": "Context name of the inter elements to share with",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
},
"ts-intersrc": {
"author": "François Laignel <francois@centricular.com>",
"description": "Thread-sharing inter-pipelines source",
"hierarchy": [
"GstTsInterSrc",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Source/Generic",
"pad-templates": {
"src": {
"caps": "ANY",
"direction": "src",
"presence": "always"
}
},
"properties": {
"context": {
"blurb": "Context name to share threads with",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"context-wait": {
"blurb": "Throttle poll loop to run at most once every this many ms",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "0",
"max": "1000",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"inter-context": {
"blurb": "Context name of the inter elements to share with",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"max-size-buffers": {
"blurb": "Maximum number of buffers to queue (0=unlimited)",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "200",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"max-size-bytes": {
"blurb": "Maximum number of bytes to queue (0=unlimited)",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "1048576",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"max-size-time": {
"blurb": "Maximum number of nanoseconds to queue (0=unlimited)",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "1000000000",
"max": "18446744073709551614",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
}
},
"rank": "none"
},
"ts-jitterbuffer": {
"author": "Mathieu Duponchelle <mathieu@centricular.com>",
"description": "Simple jitterbuffer",

View file

@ -10,6 +10,7 @@ rust-version.workspace = true
[dependencies]
async-task = "4.3.0"
async-lock = "3.4.0"
cfg-if = "1"
concurrent-queue = "2.2.0"
futures = "0.3.28"
@ -62,6 +63,14 @@ path = "examples/tcpclientsrc_benchmark_sender.rs"
name = "ts-standalone"
path = "examples/standalone/main.rs"
[[example]]
name = "ts-inter-simple"
path = "examples/inter/simple.rs"
[[test]]
name = "ts-inter"
path = "tests/inter.rs"
[build-dependencies]
gst-plugin-version-helper.workspace = true
cc = "1.0.38"

View file

@ -0,0 +1,106 @@
use futures::prelude::*;
use gst::prelude::*;
fn main() {
let g_ctx = gst::glib::MainContext::default();
gst::init().unwrap();
let pipe_up = gst::parse::launch(
"
audiotestsrc is-live=true num-buffers=2000 volume=0.02
! opusenc
! ts-intersink inter-context=my-inter-ctx
",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// A downstream pipeline which will receive the Opus encoded audio stream
// and render it locally.
let pipe_down = gst::parse::launch(
"
ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
! opusdec
! audioconvert
! audioresample
! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
! autoaudiosink
",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// Both pipelines must agree on the timing information or we'll get glitches
// or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
// as pipe_down, but since that will be set asynchronously to the audio clock, it
// is simpler and likely accurate enough to use the system clock for both
// pipelines. If no element in either pipeline will provide a clock, this
// is not needed.
let clock = gst::SystemClock::obtain();
pipe_up.set_clock(Some(&clock)).unwrap();
pipe_down.set_clock(Some(&clock)).unwrap();
// This is not really needed in this case since the pipelines are created and
// started at the same time. However, an application that dynamically
// generates pipelines must ensure that all the pipelines that will be
// connected together share the same base time.
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
g_ctx.block_on(async {
use gst::MessageView::*;
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
loop {
futures::select! {
msg_up = bus_up_stream.next() => {
let Some(msg) = msg_up else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up.recalculate_latency();
}
Error(err) => {
eprintln!("Error with downstream pipeline {err:?}");
break;
}
_ => (),
}
}
msg_down = bus_down_stream.next() => {
let Some(msg) = msg_down else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Eos(_) => {
println!("Got EoS");
break;
}
Error(err) => {
eprintln!("Error with downstream pipeline {err:?}");
break;
}
_ => (),
}
}
};
}
});
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
// This is needed by some tracers to write their log file
unsafe {
gst::deinit();
}
}

View file

@ -35,7 +35,7 @@ static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
)
});
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DataQueueItem {
Buffer(gst::Buffer),
BufferList(gst::BufferList),

View file

@ -0,0 +1,118 @@
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/// Module for the threadsharing `ts-intersink` & `ts-intersrc` elements
///
/// These threadshare-based elements provide a means to connect an upstream pipeline to
/// multiple downstream pipelines while taking advantage of reduced nunmber of threads &
/// context switches.
///
/// Differences with the `ts-proxy` elements:
///
/// * Link one to many pipelines instead of one to one.
/// * No back pressure: items which can't be handled by a downstream pipeline are
/// lost, wherease they are kept in a pending queue and block the stream for
/// `ts-proxysink`.
use gst::glib;
use gst::prelude::*;
use slab::Slab;
use async_lock::{
futures::{Read as AsyncLockRead, Write as AsyncLockWrite},
Mutex, RwLock,
};
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;
use crate::dataqueue::DataQueue;
use crate::runtime::executor::block_on_or_add_sub_task;
mod sink;
mod src;
static INTER_CONTEXTS: LazyLock<Mutex<HashMap<String, InterContextWeak>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
const DEFAULT_INTER_CONTEXT: &str = "";
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug)]
struct InterContextInner {
name: String,
dataqueues: Slab<DataQueue>,
sources: Slab<src::InterSrc>,
sinkpad: Option<gst::Pad>,
}
impl InterContextInner {
fn new(name: &str) -> InterContextInner {
InterContextInner {
name: name.into(),
dataqueues: Slab::new(),
sources: Slab::new(),
sinkpad: None,
}
}
}
impl Drop for InterContextInner {
fn drop(&mut self) {
let name = self.name.clone();
block_on_or_add_sub_task(async move {
let mut inter_ctxs = INTER_CONTEXTS.lock().await;
inter_ctxs.remove(&name);
});
}
}
#[derive(Debug, Clone)]
struct InterContext(Arc<RwLock<InterContextInner>>);
impl InterContext {
fn new(name: &str) -> InterContext {
InterContext(Arc::new(RwLock::new(InterContextInner::new(name))))
}
fn downgrade(&self) -> InterContextWeak {
InterContextWeak(Arc::downgrade(&self.0))
}
fn read(&self) -> AsyncLockRead<'_, InterContextInner> {
self.0.read()
}
fn write(&self) -> AsyncLockWrite<'_, InterContextInner> {
self.0.write()
}
}
#[derive(Debug, Clone)]
struct InterContextWeak(Weak<RwLock<InterContextInner>>);
impl InterContextWeak {
fn upgrade(&self) -> Option<InterContext> {
self.0.upgrade().map(InterContext)
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ts-intersink",
gst::Rank::NONE,
sink::InterSink::static_type(),
)?;
gst::Element::register(
Some(plugin),
"ts-intersrc",
gst::Rank::NONE,
src::InterSrc::static_type(),
)
}

View file

@ -0,0 +1,436 @@
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-ts-intersink
* @see_also: ts-intersrc, ts-proxysink, ts-proxysrc, intersink, intersrc
*
* Thread-sharing sink for inter-pipelines communication.
*
* `ts-intersink` is an element that proxies events travelling downstream, non-serialized
* queries and buffers (including metas) to other pipelines that contains a matching
* `ts-intersrc` element. The purpose is to allow one to many decoupled pipelines
* to function as though they were one without having to manually shuttle buffers,
* events, queries, etc.
*
* This element doesn't implement back-pressure like `ts-proxysink` does as we don't
* want one lagging downstream `ts-proxysrc` to block the others.
*
* The `ts-intersink` & `ts-intersrc` elements take advantage of the `threadshare`
* runtime, reducing the number of threads & context switches which would be
* necessary with other forms of inter-pipelines elements.
*
* ## Usage
*
* See document for `ts-intersrc`.
*
* Since: plugins-rs-0.14.0
*/
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::sync::{LazyLock, Mutex};
use crate::runtime::executor::{block_on, block_on_or_add_sub_task};
use crate::runtime::prelude::*;
use crate::runtime::PadSink;
use crate::dataqueue::DataQueueItem;
use crate::inter::{InterContext, InterContextWeak, DEFAULT_INTER_CONTEXT, INTER_CONTEXTS};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"ts-intersink",
gst::DebugColorFlags::empty(),
Some("Thread-sharing inter sink"),
)
});
#[derive(Debug, Clone)]
struct Settings {
inter_context: String,
}
impl Default for Settings {
fn default() -> Self {
Settings {
inter_context: DEFAULT_INTER_CONTEXT.into(),
}
}
}
#[derive(Debug)]
struct InterContextSink {
shared: InterContext,
}
impl InterContextSink {
async fn add(name: String, sinkpad: gst::Pad) -> Option<Self> {
let mut inter_ctxs = INTER_CONTEXTS.lock().await;
let shared = if let Some(shared) = inter_ctxs.get(&name).and_then(InterContextWeak::upgrade)
{
{
let mut shared = shared.write().await;
if shared.sinkpad.is_some() {
gst::error!(CAT, "Attempt to set the InterContext sink more than once");
return None;
}
shared.sinkpad = Some(sinkpad);
}
shared
} else {
let shared = InterContext::new(&name);
shared.write().await.sinkpad = Some(sinkpad);
inter_ctxs.insert(name, shared.downgrade());
shared
};
Some(InterContextSink { shared })
}
}
impl Drop for InterContextSink {
fn drop(&mut self) {
let shared = self.shared.clone();
block_on_or_add_sub_task(async move {
let _ = shared.write().await.sinkpad.take();
});
}
}
#[derive(Clone, Debug)]
struct InterSinkPadHandler;
impl PadSinkHandler for InterSinkPadHandler {
type ElementImpl = InterSink;
async fn sink_chain(
self,
pad: gst::Pad,
elem: super::InterSink,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj = pad, "Handling {buffer:?}");
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
async fn sink_chain_list(
self,
pad: gst::Pad,
elem: super::InterSink,
list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj = pad, "Handling {list:?}");
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
fn sink_event(self, _pad: &gst::Pad, imp: &InterSink, event: gst::Event) -> bool {
let elem = imp.obj().clone();
if event.is_downstream() {
block_on_or_add_sub_task(async move {
let imp = elem.imp();
gst::debug!(
CAT,
imp = imp,
"Handling non-serialized downstream {event:?}"
);
let shared_ctx = imp.shared_ctx();
let shared_ctx = shared_ctx.read().await;
if shared_ctx.sources.is_empty() {
gst::info!(CAT, imp = imp, "No sources to forward {event:?} to",);
} else {
gst::log!(
CAT,
imp = imp,
"Forwarding non-serialized downstream {event:?}"
);
for (_, source) in shared_ctx.sources.iter() {
if !source.send_event(event.clone()) {
gst::warning!(
CAT,
imp = imp,
"Failed to forward {event:?} to {}",
source.name()
);
}
}
}
});
true
} else {
gst::debug!(
CAT,
obj = elem,
"Handling non-serialized upstream {event:?}"
);
imp.sinkpad.gst_pad().push_event(event)
}
}
async fn sink_event_serialized(
self,
pad: gst::Pad,
elem: super::InterSink,
event: gst::Event,
) -> bool {
gst::log!(CAT, obj = pad, "Handling serialized {event:?}");
let imp = elem.imp();
use gst::EventView;
match event.view() {
EventView::Eos(..) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
EventView::FlushStop(..) => imp.start(),
_ => (),
}
gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
}
#[derive(Debug)]
pub struct InterSink {
sinkpad: PadSink,
sink_ctx: Mutex<Option<InterContextSink>>,
upstream_latency: Mutex<Option<gst::ClockTime>>,
settings: Mutex<Settings>,
}
impl InterSink {
fn shared_ctx(&self) -> InterContext {
let local_ctx = self.sink_ctx.lock().unwrap();
local_ctx.as_ref().expect("set in prepare").shared.clone()
}
async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> {
let shared_ctx = self.shared_ctx();
let shared_ctx = shared_ctx.read().await;
for (_, dq) in shared_ctx.dataqueues.iter() {
if dq.push(item.clone()).is_err() {
gst::debug!(CAT, imp = self, "Failed to enqueue item: {item:?}");
}
}
Ok(gst::FlowSuccess::Ok)
}
pub fn latency(&self) -> Option<gst::ClockTime> {
*self.upstream_latency.lock().unwrap()
}
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Preparing");
let obj = self.obj().clone();
let sinkpad = self.sinkpad.gst_pad().clone();
let ctx_name = self.settings.lock().unwrap().inter_context.clone();
block_on(async move {
let sink_ctx = InterContextSink::add(ctx_name, sinkpad).await;
if sink_ctx.is_some() {
let imp = obj.imp();
*imp.sink_ctx.lock().unwrap() = sink_ctx;
gst::debug!(CAT, imp = imp, "Prepared");
Ok(())
} else {
Err(gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to add the Sink to InterContext"]
))
}
})
}
fn unprepare(&self) {
gst::debug!(CAT, imp = self, "Unpreparing");
*self.sink_ctx.lock().unwrap() = None;
gst::debug!(CAT, imp = self, "Unprepared");
}
fn start(&self) {
gst::debug!(CAT, imp = self, "Started");
}
fn stop(&self) {
gst::debug!(CAT, imp = self, "Stopped");
*self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE;
gst::debug!(CAT, imp = self, "Stopped");
}
}
#[glib::object_subclass]
impl ObjectSubclass for InterSink {
const NAME: &'static str = "GstTsInterSink";
type Type = super::InterSink;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
Self {
sinkpad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap()),
InterSinkPadHandler,
),
sink_ctx: Mutex::new(None),
upstream_latency: Mutex::new(gst::ClockTime::NONE),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for InterSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![glib::ParamSpecString::builder("inter-context")
.nick("Inter Context")
.blurb("Context name of the inter elements to share with")
.default_value(Some(DEFAULT_INTER_CONTEXT))
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"inter-context" => {
settings.inter_context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_INTER_CONTEXT.into());
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"inter-context" => settings.inter_context.to_value(),
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(self.sinkpad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SINK);
}
}
impl GstObjectImpl for InterSink {}
impl ElementImpl for InterSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Thread-sharing inter sink",
"Sink/Generic",
"Thread-sharing inter-pipelines sink",
"François Laignel <francois@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn send_event(&self, event: gst::Event) -> bool {
gst::log!(CAT, imp = self, "Got {event:?}");
if let gst::EventView::Latency(lat_evt) = event.view() {
let latency = lat_evt.latency();
*self.upstream_latency.lock().unwrap() = Some(latency);
let obj = self.obj().clone();
let shared_ctx = self.shared_ctx();
let _ = block_on_or_add_sub_task(async move {
let shared_ctx = shared_ctx.read().await;
if shared_ctx.sources.is_empty() {
gst::info!(CAT, obj = obj, "No sources to set upstream latency");
} else {
gst::log!(CAT, obj = obj, "Setting upstream latency {latency}");
for (_, src) in shared_ctx.sources.iter() {
src.imp().set_upstream_latency(latency);
}
}
});
}
self.sinkpad.gst_pad().push_event(event)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp = self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
self.prepare().map_err(|err| {
self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToReady => {
self.stop();
}
gst::StateChange::ReadyToNull => {
self.unprepare();
}
_ => (),
}
let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::ReadyToPaused {
self.start();
}
Ok(success)
}
}

View file

@ -0,0 +1,27 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use gst::glib;
mod imp;
glib::wrapper! {
pub struct InterSink(ObjectSubclass<imp::InterSink>) @extends gst::Element, gst::Object;
}

View file

@ -0,0 +1,941 @@
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-ts-intersrc
* @see_also: ts-intersink, ts-proxysink, ts-proxysrc, intersink, intersrc
*
* Thread-sharing source for inter-pipelines communication.
*
* `ts-intersrc` is an element that proxies events travelling downstream, non-serialized
* queries and buffers (including metas) from another pipeline that contains a matching
* `ts-intersink` element. The purpose is to allow one to many decoupled pipelines
* to function as though they were one without having to manually shuttle buffers,
* events, queries, etc.
*
* This doesn't support dynamically changing `ts-intersink` for now.
*
* The `ts-intersink` & `ts-intersrc` elements take advantage of the `threadshare`
* runtime, reducing the number of threads & context switches which would be
* necessary with other forms of inter-pipelines elements.
*
* ## Usage
*
* |[<!-- language="rust" -->
* use futures::prelude::*;
* use gst::prelude::*;
*
* let g_ctx = gst::glib::MainContext::default();
* gst::init().unwrap();
*
* // An upstream pipeline producing a 1 second Opus encoded audio stream
* let pipe_up = gst::parse::launch(
* "
* audiotestsrc is-live=true num-buffers=50 volume=0.02
* ! opusenc
* ! ts-intersink inter-context=my-inter-ctx
* ",
* )
* .unwrap()
* .downcast::<gst::Pipeline>()
* .unwrap();
*
* // A downstream pipeline which will receive the Opus encoded audio stream
* // and render it locally.
* let pipe_down = gst::parse::launch(
* "
* ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
* ! opusdec
* ! audioconvert
* ! audioresample
* ! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
* ! autoaudiosink
* ",
* )
* .unwrap()
* .downcast::<gst::Pipeline>()
* .unwrap();
*
* // Both pipelines must agree on the timing information or we'll get glitches
* // or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
* // as pipe_down, but since that will be set asynchronously to the audio clock, it
* // is simpler and likely accurate enough to use the system clock for both
* // pipelines. If no element in either pipeline will provide a clock, this
* // is not needed.
* let clock = gst::SystemClock::obtain();
* pipe_up.set_clock(Some(&clock)).unwrap();
* pipe_down.set_clock(Some(&clock)).unwrap();
*
* // This is not really needed in this case since the pipelines are created and
* // started at the same time. However, an application that dynamically
* // generates pipelines must ensure that all the pipelines that will be
* // connected together share the same base time.
* pipe_up.set_base_time(gst::ClockTime::ZERO);
* pipe_up.set_start_time(gst::ClockTime::NONE);
* pipe_down.set_base_time(gst::ClockTime::ZERO);
* pipe_down.set_start_time(gst::ClockTime::NONE);
*
* pipe_up.set_state(gst::State::Playing).unwrap();
* pipe_down.set_state(gst::State::Playing).unwrap();
*
* g_ctx.block_on(async {
* use gst::MessageView::*;
*
* let mut bus_up_stream = pipe_up.bus().unwrap().stream();
* let mut bus_down_stream = pipe_down.bus().unwrap().stream();
*
* loop {
* futures::select! {
* msg = bus_up_stream.next() => {
* let Some(msg) = msg else { continue };
* match msg.view() {
* Latency(_) => {
* let _ = pipe_down.recalculate_latency();
* }
* Error(err) => {
* eprintln!("Error with downstream pipeline {err:?}");
* break;
* }
* _ => (),
* }
* }
* msg = bus_down_stream.next() => {
* let Some(msg) = msg else { continue };
* match msg.view() {
* Latency(_) => {
* let _ = pipe_down.recalculate_latency();
* }
* Eos(_) => {
* println!("Got EoS");
* break;
* }
* Error(err) => {
* eprintln!("Error with downstream pipeline {err:?}");
* break;
* }
* _ => (),
* }
* }
* };
* }
* });
*
* pipe_up.set_state(gst::State::Null).unwrap();
* pipe_down.set_state(gst::State::Null).unwrap();
* ]|
*
* Since: plugins-rs-0.14.0
*/
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::ops::ControlFlow;
use std::sync::{LazyLock, Mutex};
use std::time::Duration;
use crate::runtime::executor::{block_on, block_on_or_add_sub_task};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
use crate::inter::{
InterContext, InterContextWeak, DEFAULT_CONTEXT, DEFAULT_CONTEXT_WAIT, DEFAULT_INTER_CONTEXT,
INTER_CONTEXTS,
};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"ts-intersrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing inter source"),
)
});
/// Initial capacity for the `ts-intersrc` related `Slab`s in the shared `InterContext`.
// TODO Could had a property for users to allocate large `Slab` and avoid incremental re-allocations.
// Let's see how it behaves in the wild like this first.
const DEFAULT_INTER_SRC_CAPACITY: u32 = 16;
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
#[derive(Debug, Clone)]
struct Settings {
max_size_buffers: u32,
max_size_bytes: u32,
max_size_time: gst::ClockTime,
context: String,
context_wait: Duration,
inter_context: String,
inter_src_capacity: usize,
}
impl Default for Settings {
fn default() -> Self {
Settings {
max_size_buffers: DEFAULT_MAX_SIZE_BUFFERS,
max_size_bytes: DEFAULT_MAX_SIZE_BYTES,
max_size_time: DEFAULT_MAX_SIZE_TIME,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
inter_context: DEFAULT_INTER_CONTEXT.into(),
inter_src_capacity: DEFAULT_INTER_SRC_CAPACITY as usize,
}
}
}
#[derive(Debug)]
struct InterContextSrc {
shared: InterContext,
dataqueue_key: usize,
src_key: usize,
}
impl InterContextSrc {
async fn add(
name: String,
capacity: usize,
dataqueue: DataQueue,
src: super::InterSrc,
) -> Self {
let mut inter_ctxs = INTER_CONTEXTS.lock().await;
let shared = if let Some(shared) = inter_ctxs.get(&name).and_then(InterContextWeak::upgrade)
{
shared
} else {
let shared = InterContext::new(&name);
{
let mut shared = shared.write().await;
shared.dataqueues.reserve(capacity);
shared.sources.reserve(capacity);
}
inter_ctxs.insert(name, shared.downgrade());
shared
};
let (dataqueue_key, srcpad_key) = {
let mut shared = shared.write().await;
(
shared.dataqueues.insert(dataqueue),
shared.sources.insert(src),
)
};
InterContextSrc {
shared,
dataqueue_key,
src_key: srcpad_key,
}
}
}
impl Drop for InterContextSrc {
fn drop(&mut self) {
let shared = self.shared.clone();
let dataqueue_key = self.dataqueue_key;
let src_key = self.src_key;
block_on_or_add_sub_task(async move {
let mut shared = shared.write().await;
let _ = shared.dataqueues.remove(dataqueue_key);
let _ = shared.sources.remove(src_key);
});
}
}
#[derive(Clone, Debug)]
struct InterSrcPadHandler;
impl PadSrcHandler for InterSrcPadHandler {
type ElementImpl = InterSrc;
fn src_event(self, pad: &gst::Pad, imp: &InterSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj = pad, "Handling {event:?}");
use gst::EventView::*;
match event.view() {
FlushStart(..) => imp.flush_start().is_ok(),
FlushStop(..) => imp.flush_stop().is_ok(),
_ => true,
}
}
fn src_query(self, pad: &gst::Pad, imp: &InterSrc, query: &mut gst::QueryRef) -> bool {
gst::debug!(CAT, obj = pad, "Handling {query:?}");
use gst::QueryViewMut;
let ret = match query.view_mut() {
QueryViewMut::Latency(q) => {
let (_, q_min, q_max) = q.result();
let Some(upstream_latency) = *imp.upstream_latency.lock().unwrap() else {
gst::debug!(
CAT,
obj = pad,
"Upstream latency not available yet, can't handle {query:?}"
);
return false;
};
let max_time = imp.settings.lock().unwrap().max_size_time;
let max = if max_time > gst::ClockTime::ZERO {
// TODO also use max-size-buffers & CAPS when applicable
Some(q_max.unwrap_or(gst::ClockTime::ZERO) + max_time)
} else {
q_max
};
q.set(true, q_min + upstream_latency, max);
true
}
QueryViewMut::Scheduling(q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes([gst::PadMode::Push]);
true
}
QueryViewMut::Caps(q) => {
let caps = if let Some(ref caps) = pad.current_caps() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
} else {
q.filter()
.map(|f| f.to_owned())
.unwrap_or_else(gst::Caps::new_any)
};
q.set_result(&caps);
true
}
_ => false,
};
if ret {
gst::log!(CAT, obj = pad, "Handled {query:?}");
} else {
gst::log!(CAT, obj = pad, "Didn't handle {query:?}");
}
ret
}
}
#[derive(Debug)]
struct InterSrcTask {
elem: super::InterSrc,
dataqueue: DataQueue,
got_first_item: bool,
}
impl InterSrcTask {
fn new(elem: super::InterSrc, dataqueue: DataQueue) -> Self {
InterSrcTask {
elem,
dataqueue,
got_first_item: false,
}
}
/// Tries to get the upstream latency.
///
/// This is needed when a `ts-intersrc` joins the `inter-context` after
/// the matching `ts-intersink` has notified the upstream latency.
async fn maybe_get_upstream_latency(&self) -> Result<(), gst::FlowError> {
let imp = self.elem.imp();
if imp.upstream_latency.lock().unwrap().is_some() {
return Ok(());
}
gst::log!(CAT, imp = imp, "Getting upstream latency");
let shared_ctx = imp.shared_ctx();
let shared_ctx = shared_ctx.read().await;
let Some(ref sinkpad) = shared_ctx.sinkpad else {
gst::info!(
CAT,
imp = imp,
"sinkpad is gone before we could get latency"
);
return Err(gst::FlowError::Error);
};
let sinkpad_parent = sinkpad.parent().expect("sinkpad should have a parent");
let intersink = sinkpad_parent
.downcast_ref::<crate::inter::sink::InterSink>()
.expect("sinkpad parent should be a ts-intersink");
if let Some(latency) = intersink.imp().latency() {
imp.set_upstream_latency_priv(latency);
} else {
gst::log!(CAT, imp = imp, "Upstream latency is still unknown");
}
Ok(())
}
async fn push_item(&mut self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let imp = self.elem.imp();
if !self.got_first_item {
let shared_ctx = imp.shared_ctx();
let shared_ctx = shared_ctx.read().await;
let Some(ref sinkpad) = shared_ctx.sinkpad else {
gst::info!(
CAT,
imp = imp,
"sinkpad is gone before we could handle first item"
);
return Err(gst::FlowError::Error);
};
let mut evts = Vec::new();
sinkpad.sticky_events_foreach(|evt| {
evts.push(evt.copy());
ControlFlow::Continue(gst::EventForeachAction::Keep)
});
for evt in evts {
if imp.srcpad.push_event(evt.copy()).await {
gst::log!(CAT, imp = imp, "Pushed sticky event {evt:?}");
} else {
gst::error!(CAT, imp = imp, "Failed to push sticky event {evt:?}");
return Err(gst::FlowError::Error);
}
}
self.got_first_item = true;
}
use DataQueueItem::*;
match item {
Buffer(buffer) => {
self.maybe_get_upstream_latency().await?;
gst::log!(CAT, obj = self.elem, "Forwarding {buffer:?}");
imp.srcpad.push(buffer).await.map(drop)
}
BufferList(list) => {
self.maybe_get_upstream_latency().await?;
gst::log!(CAT, obj = self.elem, "Forwarding {list:?}");
imp.srcpad.push_list(list).await.map(drop)
}
Event(event) => {
gst::log!(CAT, obj = self.elem, "Forwarding {event:?}");
let is_eos = event.type_() == gst::EventType::Eos;
imp.srcpad.push_event(event).await;
if is_eos {
return Err(gst::FlowError::Eos);
}
Ok(())
}
}
}
}
impl TaskImpl for InterSrcTask {
type Item = DataQueueItem;
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Starting task");
self.dataqueue.start();
gst::log!(CAT, obj = self.elem, "Task started");
Ok(())
}
async fn try_next(&mut self) -> Result<DataQueueItem, gst::FlowError> {
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
async fn handle_item(&mut self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let res = self.push_item(item).await;
match res {
Ok(()) => {
gst::log!(CAT, obj = self.elem, "Successfully pushed item");
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj = self.elem, "Flushing");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj = self.elem, "EOS");
}
Err(err) => {
gst::error!(CAT, obj = self.elem, "Got error {err}");
gst::element_error!(
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {err}"]
);
}
}
res
}
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Stopping task");
self.dataqueue.stop();
self.dataqueue.clear();
self.got_first_item = false;
gst::log!(CAT, obj = self.elem, "Task stopped");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Starting task flush");
self.dataqueue.clear();
self.got_first_item = false;
gst::log!(CAT, obj = self.elem, "Task flush started");
Ok(())
}
}
#[derive(Debug)]
pub struct InterSrc {
srcpad: PadSrc,
task: Task,
src_ctx: Mutex<Option<InterContextSrc>>,
ts_ctx: Mutex<Option<Context>>,
dataqueue: Mutex<Option<DataQueue>>,
upstream_latency: Mutex<Option<gst::ClockTime>>,
settings: Mutex<Settings>,
}
impl InterSrc {
fn shared_ctx(&self) -> InterContext {
let local_ctx = self.src_ctx.lock().unwrap();
local_ctx.as_ref().expect("set in prepare").shared.clone()
}
// Sets the upstream latency without blocking the caller.
pub fn set_upstream_latency(&self, up_latency: gst::ClockTime) {
if let Some(ref ts_ctx) = *self.ts_ctx.lock().unwrap() {
let obj = self.obj().clone();
gst::log!(CAT, imp = self, "Setting upstream latency async");
ts_ctx.spawn(async move {
obj.imp().set_upstream_latency_priv(up_latency);
});
} else {
gst::debug!(CAT, imp = self, "Not ready to handle upstream latency");
}
}
// Sets the upstream latency blocking the caller until it's handled.
fn set_upstream_latency_priv(&self, up_latency: gst::ClockTime) {
let new_latency = up_latency
+ gst::ClockTime::from_mseconds(
self.settings.lock().unwrap().context_wait.as_millis() as u64
);
{
let mut upstream_latency = self.upstream_latency.lock().unwrap();
if let Some(upstream_latency) = *upstream_latency {
if upstream_latency == new_latency {
return;
}
}
*upstream_latency = Some(new_latency);
}
gst::debug!(
CAT,
imp = self,
"Got new upstream latency {up_latency} => will report {new_latency}"
);
self.post_message(gst::message::Latency::builder().src(&*self.obj()).build());
}
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Preparing");
let settings = self.settings.lock().unwrap().clone();
let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {err}"]
)
})?;
let dataqueue = DataQueue::new(
&self.obj().clone().upcast(),
self.srcpad.gst_pad(),
if settings.max_size_buffers == 0 {
None
} else {
Some(settings.max_size_buffers)
},
if settings.max_size_bytes == 0 {
None
} else {
Some(settings.max_size_bytes)
},
if settings.max_size_time.is_zero() {
None
} else {
Some(settings.max_size_time)
},
);
let obj = self.obj().clone();
let (ctx_name, inter_src_capacity) = {
let settings = self.settings.lock().unwrap();
(settings.inter_context.clone(), settings.inter_src_capacity)
};
block_on(async move {
let imp = obj.imp();
let src_ctx =
InterContextSrc::add(ctx_name, inter_src_capacity, dataqueue.clone(), obj.clone())
.await;
*imp.src_ctx.lock().unwrap() = Some(src_ctx);
*imp.ts_ctx.lock().unwrap() = Some(ts_ctx.clone());
*imp.dataqueue.lock().unwrap() = Some(dataqueue.clone());
if imp
.task
.prepare(InterSrcTask::new(obj.clone(), dataqueue), ts_ctx)
.await
.is_ok()
{
gst::debug!(CAT, imp = imp, "Prepared");
Ok(())
} else {
Err(gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to start Task"]
))
}
})
}
fn unprepare(&self) {
gst::debug!(CAT, imp = self, "Unpreparing");
self.task.unprepare().block_on().unwrap();
*self.dataqueue.lock().unwrap() = None;
*self.src_ctx.lock().unwrap() = None;
*self.ts_ctx.lock().unwrap() = None;
gst::debug!(CAT, imp = self, "Unprepared");
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Stopping");
self.task.stop().await_maybe_on_context()?;
*self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE;
gst::debug!(CAT, imp = self, "Stopped");
Ok(())
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Starting");
self.task.start().await_maybe_on_context()?;
gst::debug!(CAT, imp = self, "Started");
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().await_maybe_on_context()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
fn flush_start(&self) -> Result<(), gst::FlowError> {
gst::debug!(CAT, imp = self, "Flushing");
let res = self.task.flush_start().await_maybe_on_context();
if let Err(err) = res {
gst::error!(CAT, imp = self, "FlushStart failed {err:?}");
gst::element_imp_error!(
self,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStart failed {err:?}"]
);
return Err(gst::FlowError::Error);
}
Ok(())
}
fn flush_stop(&self) -> Result<(), gst::FlowError> {
gst::debug!(CAT, imp = self, "Stopping flush");
let res = self.task.flush_stop().await_maybe_on_context();
if let Err(err) = res {
gst::error!(CAT, imp = self, "FlushStop failed {err:?}");
gst::element_imp_error!(
self,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {err:?}"]
);
return Err(gst::FlowError::Error);
}
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for InterSrc {
const NAME: &'static str = "GstTsInterSrc";
type Type = super::InterSrc;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
Self {
srcpad: PadSrc::new(
gst::Pad::from_template(&klass.pad_template("src").unwrap()),
InterSrcPadHandler,
),
task: Task::default(),
src_ctx: Mutex::new(None),
ts_ctx: Mutex::new(None),
dataqueue: Mutex::new(None),
upstream_latency: Mutex::new(gst::ClockTime::NONE),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for InterSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecString::builder("context")
.nick("Context")
.blurb("Context name to share threads with")
.default_value(Some(DEFAULT_CONTEXT))
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
glib::ParamSpecUInt::builder("context-wait")
.nick("Context Wait")
.blurb("Throttle poll loop to run at most once every this many ms")
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
glib::ParamSpecString::builder("inter-context")
.nick("Inter Context")
.blurb("Context name of the inter elements to share with")
.default_value(Some(DEFAULT_INTER_CONTEXT))
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
glib::ParamSpecUInt::builder("max-size-buffers")
.nick("Max Size Buffers")
.blurb("Maximum number of buffers to queue (0=unlimited)")
.default_value(DEFAULT_MAX_SIZE_BUFFERS)
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
glib::ParamSpecUInt::builder("max-size-bytes")
.nick("Max Size Bytes")
.blurb("Maximum number of bytes to queue (0=unlimited)")
.default_value(DEFAULT_MAX_SIZE_BYTES)
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
glib::ParamSpecUInt64::builder("max-size-time")
.nick("Max Size Time")
.blurb("Maximum number of nanoseconds to queue (0=unlimited)")
.maximum(u64::MAX - 1)
.default_value(DEFAULT_MAX_SIZE_TIME.nseconds())
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => {
settings.max_size_buffers = value.get().expect("type checked upstream");
}
"max-size-bytes" => {
settings.max_size_bytes = value.get().expect("type checked upstream");
}
"max-size-time" => {
settings.max_size_time = value.get::<u64>().unwrap().nseconds();
}
"context" => {
settings.context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
"context-wait" => {
settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"inter-context" => {
settings.inter_context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_INTER_CONTEXT.into());
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(),
"max-size-time" => settings.max_size_time.nseconds().to_value(),
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"inter-context" => settings.inter_context.to_value(),
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(self.srcpad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
impl GstObjectImpl for InterSrc {}
impl ElementImpl for InterSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Thread-sharing inter source",
"Source/Generic",
"Thread-sharing inter-pipelines source",
"François Laignel <francois@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn send_event(&self, event: gst::Event) -> bool {
gst::log!(CAT, imp = self, "Handling {event:?}");
if let Some(ref ts_ctx) = *self.ts_ctx.lock().unwrap() {
gst::log!(CAT, imp = self, "Handling {event:?}");
let obj = self.obj().clone();
ts_ctx.spawn(async move {
let imp = obj.imp();
if let gst::EventView::FlushStart(_) = event.view() {
let _ = obj.imp().flush_start();
}
imp.srcpad.gst_pad().push_event(event)
});
true
} else {
gst::info!(CAT, imp = self, "Not ready to handle {event:?}");
false
}
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp = self, "Changing state {transition:?}");
match transition {
gst::StateChange::NullToReady => {
self.prepare().map_err(|err| {
self.post_error_message(err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
}
_ => (),
}
let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
Ok(success)
}
}

View file

@ -0,0 +1,27 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use gst::glib;
mod imp;
glib::wrapper! {
pub struct InterSrc(ObjectSubclass<imp::InterSrc>) @extends gst::Element, gst::Object;
}

View file

@ -18,6 +18,7 @@ mod appsrc;
mod audiotestsrc;
pub mod dataqueue;
mod inputselector;
mod inter;
mod jitterbuffer;
mod proxy;
mod queue;
@ -34,6 +35,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
appsrc::register(plugin)?;
audiotestsrc::register(plugin)?;
inputselector::register(plugin)?;
inter::register(plugin)?;
jitterbuffer::register(plugin)?;
proxy::register(plugin)?;
queue::register(plugin)?;

View file

@ -901,7 +901,7 @@ impl PadSink {
gst::fixme!(
RUNTIME_CAT,
obj = gst_pad,
"Serialized Query not supported"
"Serialized Query not supported {query:?}"
);
false
}

View file

@ -0,0 +1,473 @@
// Copyright (C) 2025 François Laignel <francois@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::channel::oneshot;
use futures::prelude::*;
use gst::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstthreadshare::plugin_register_static().expect("gstthreadshare inter test");
});
}
#[test]
fn one_to_one_down_first() {
init();
let pipe_up = gst::Pipeline::with_name("upstream::one_to_one_down_first");
let audiotestsrc = gst::ElementFactory::make("audiotestsrc")
.name("testsrc::one_to_one_down_first")
.property("num-buffers", 20i32)
.property("is-live", true)
.build()
.unwrap();
let intersink = gst::ElementFactory::make("ts-intersink")
.name("intersink::one_to_one_down_first")
.property("inter-context", "inter::one_to_one_down_first")
.build()
.unwrap();
let pipe_down = gst::Pipeline::with_name("downstream::one_to_one_down_first");
let intersrc = gst::ElementFactory::make("ts-intersrc")
.name("intersrc::one_to_one_down_first")
.property("inter-context", "inter::one_to_one_down_first")
.property("context", "inter::test")
.property("context-wait", 20u32)
.build()
.unwrap();
let appsink = gst_app::AppSink::builder()
.name("appsink::one_to_one_down_first")
.build();
let upstream_elems = [&audiotestsrc, &intersink];
pipe_up.add_many(upstream_elems).unwrap();
gst::Element::link_many(upstream_elems).unwrap();
pipe_down
.add_many([&intersrc, appsink.upcast_ref()])
.unwrap();
intersrc.link(&appsink).unwrap();
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
let samples = Arc::new(AtomicU32::new(0));
let (eos_tx, mut eos_rx) = oneshot::channel::<()>();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let samples = samples.clone();
move |appsink| {
let _ = appsink.pull_sample().unwrap();
samples.fetch_add(1, Ordering::SeqCst);
Ok(gst::FlowSuccess::Ok)
}
})
.eos({
let mut eos_tx = Some(eos_tx);
move |_| eos_tx.take().unwrap().send(()).unwrap()
})
.build(),
);
// Starting downstream first => we will get all buffers
pipe_down.set_state(gst::State::Playing).unwrap();
pipe_up.set_state(gst::State::Playing).unwrap();
let mut got_eos_evt = false;
let mut got_eos_msg = false;
futures::executor::block_on(async {
use gst::MessageView::*;
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
loop {
futures::select! {
_ = eos_rx => {
println!("inter::one_to_one_down_first got eos notif");
got_eos_evt = true;
if got_eos_msg {
break;
}
}
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Eos(_) => {
println!("inter::one_to_one_down_first got eos msg");
got_eos_msg = true;
if got_eos_evt {
break;
}
}
Error(err) => unreachable!("inter::one_to_one_down_first {err:?}"),
_ => (),
}
}
msg = bus_up_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_one_down_first {err:?}"),
_ => (),
}
}
};
}
});
assert_eq!(samples.load(Ordering::SeqCst), 20);
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
}
#[test]
fn one_to_one_up_first() {
init();
let pipe_up = gst::Pipeline::with_name("upstream::one_to_one_up_first");
let audiotestsrc = gst::ElementFactory::make("audiotestsrc")
.name("testsrc::one_to_one_up_first")
.property("is-live", true)
.build()
.unwrap();
let opusenc = gst::ElementFactory::make("opusenc")
.name("opusenc::one_to_one_down_first")
.build()
.unwrap();
let intersink = gst::ElementFactory::make("ts-intersink")
.name("intersink::one_to_one_up_first")
.property("inter-context", "inter::one_to_one_up_first")
.build()
.unwrap();
let pipe_down = gst::Pipeline::with_name("downstream::one_to_one_up_first");
let intersrc = gst::ElementFactory::make("ts-intersrc")
.name("intersrc::one_to_one_up_first")
.property("inter-context", "inter::one_to_one_up_first")
.property("context", "inter::one_to_one_up_first")
.property("context-wait", 20u32)
.build()
.unwrap();
let appsink = gst_app::AppSink::builder()
.name("appsink::one_to_one_up_first")
.build();
let upstream_elems = [&audiotestsrc, &opusenc, &intersink];
pipe_up.add_many(upstream_elems).unwrap();
gst::Element::link_many(upstream_elems).unwrap();
let downstream_elems = [&intersrc, appsink.upcast_ref()];
pipe_down.add_many(downstream_elems).unwrap();
gst::Element::link_many(downstream_elems).unwrap();
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
let (eos_tx, mut eos_rx) = oneshot::channel::<()>();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let mut samples = 0;
let mut eos_tx = Some(eos_tx);
move |appsink| {
let _ = appsink.pull_sample().unwrap();
samples += 1;
if samples == 100 {
eos_tx.take().unwrap().send(()).unwrap();
return Err(gst::FlowError::Eos);
}
Ok(gst::FlowSuccess::Ok)
}
})
.build(),
);
// Starting upstream first
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
futures::executor::block_on(async {
use gst::MessageView::*;
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
loop {
futures::select! {
_ = eos_rx => {
println!("inter::one_to_one_up_first got eos notif");
break;
}
msg = bus_up_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_one_up_first {err:?}"),
_ => (),
}
}
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_one_up_first {err:?}"),
_ => (),
}
}
};
}
});
let mut q = gst::query::Latency::new();
assert!(pipe_up.query(&mut q));
// Upstream latency: testsrc (20ms) + opusenc (20ms)
let up_latency = q.result().1;
assert!(
up_latency >= 40.mseconds(),
"unexpected upstream latency {up_latency}"
);
let mut q = gst::query::Latency::new();
assert!(pipe_down.query(&mut q));
// Downstream latency: upstream latency + intersrc (context-wait 20ms) + appsink
let down_latency = q.result().1;
assert!(
down_latency > up_latency + 20.mseconds(),
"unexpected downstream latency {down_latency}"
);
pipe_down.set_state(gst::State::Null).unwrap();
pipe_up.set_state(gst::State::Null).unwrap();
}
#[test]
fn one_to_many_up_first() {
use gstthreadshare::runtime::executor as ts_executor;
init();
fn build_pipe_down(
i: u32,
num_buf: impl Into<Option<u32>>,
) -> (
gst::Pipeline,
Arc<AtomicU32>,
Option<futures::channel::oneshot::Receiver<()>>,
) {
let num_buf = num_buf.into();
let pipe_down =
gst::Pipeline::with_name(&format!("downstream-{i:02}::one_to_many_up_first"));
let intersrc = gst::ElementFactory::make("ts-intersrc")
.name(format!("intersrc-{i:02}::one_to_many_up_first"))
.property("inter-context", "inter::one_to_many_up_first")
.property("context", "inter::one_to_many_up_first")
.property("context-wait", 20u32)
.build()
.unwrap();
let appsink = gst_app::AppSink::builder()
.name(format!("appsink-{i:02}::one_to_many_up_first"))
.sync(false)
.build();
pipe_down
.add_many([&intersrc, appsink.upcast_ref()])
.unwrap();
intersrc.link(&appsink).unwrap();
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
let samples = Arc::new(AtomicU32::new(0));
let (mut eos_tx, eos_rx) = if num_buf.is_some() {
let (eos_tx, eos_rx) = oneshot::channel::<()>();
(Some(eos_tx), Some(eos_rx))
} else {
(None, None)
};
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let samples = samples.clone();
move |appsink| {
let _ = appsink.pull_sample().unwrap();
let cur = samples.fetch_add(1, Ordering::SeqCst);
if let Some(num_buf) = num_buf {
if cur + 1 == num_buf {
eos_tx.take().unwrap().send(()).unwrap();
return Err(gst::FlowError::Eos);
}
}
Ok(gst::FlowSuccess::Ok)
}
})
.build(),
);
(pipe_down, samples, eos_rx)
}
let pipe_up = gst::Pipeline::with_name("upstream::one_to_many_up_first");
let audiotestsrc = gst::ElementFactory::make("audiotestsrc")
.name("testsrc::one_to_many_up_first")
.property("is-live", true)
.build()
.unwrap();
let intersink = gst::ElementFactory::make("ts-intersink")
.name("intersink::one_to_many_up_first")
.property("inter-context", "inter::one_to_many_up_first")
.build()
.unwrap();
let upstream_elems = [&audiotestsrc, &intersink];
pipe_up.add_many(upstream_elems).unwrap();
gst::Element::link_many(upstream_elems).unwrap();
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
// Starting upstream first
pipe_up.set_state(gst::State::Playing).unwrap();
let (pipe_down_1, samples_1, eos_rx_1) = build_pipe_down(1, 20);
let mut eos_rx_1 = eos_rx_1.unwrap();
pipe_down_1.set_state(gst::State::Playing).unwrap();
let (pipe_down_2, samples_2, eos_rx_2) = build_pipe_down(2, 20);
let eos_rx_2 = eos_rx_2.unwrap();
pipe_down_2.set_state(gst::State::Playing).unwrap();
futures::executor::block_on(async {
use gst::MessageView::*;
let mut bus_down_stream_1 = pipe_down_1.bus().unwrap().stream();
let mut bus_down_stream_2 = pipe_down_2.bus().unwrap().stream();
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
loop {
futures::select! {
_ = eos_rx_1 => {
println!("inter::one_to_many_up_first got eos notif");
break;
}
msg_down_1 = bus_down_stream_1.next() => {
let Some(msg) = msg_down_1 else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down_1.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_many_up_first {err:?}"),
_ => (),
}
}
msg_down_2 = bus_down_stream_2.next() => {
let Some(msg) = msg_down_2 else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down_2.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_many_up_first {err:?}"),
_ => (),
}
}
msg_up = bus_up_stream.next() => {
let Some(msg) = msg_up else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up.recalculate_latency();
}
Error(err) => unreachable!("inter::one_to_many_up_first {err:?}"),
_ => (),
}
}
}
}
});
pipe_down_1.set_state(gst::State::Null).unwrap();
// pipe_down_1 was set to stop after 20 buffers
assert_eq!(samples_1.load(Ordering::SeqCst), 20);
// Waiting for pipe_down_2 to handle its buffers too
futures::executor::block_on(eos_rx_2).unwrap();
pipe_down_2.set_state(gst::State::Null).unwrap();
assert_eq!(samples_2.load(Ordering::SeqCst), 20);
pipe_up.set_state(gst::State::Null).unwrap();
println!("up null");
let (pipe_down_3, samples_3, _) = build_pipe_down(3, None);
pipe_down_3.set_state(gst::State::Playing).unwrap();
let bus_down_3 = pipe_down_3.bus().unwrap();
ts_executor::block_on(async move {
let mut bus_down_stream_3 = bus_down_3.stream();
let mut timer = ts_executor::timer::delay_for(Duration::from_millis(200)).fuse();
loop {
futures::select! {
_ = timer => {
break;
}
msg_down_3 = bus_down_stream_3.next() => {
let Some(msg) = msg_down_3 else { continue };
if let gst::MessageView::Error(err) = msg.view() {
unreachable!("inter::one_to_many_up_first {err:?}");
}
}
}
}
});
println!("down3 to null");
pipe_down_3.set_state(gst::State::Null).unwrap();
// pipe_down_3 was set to Playing after pipe_up was shutdown
assert!(samples_3.load(Ordering::SeqCst) == 0);
}