ts-intersrc: handle dynamic inter-ctx changes

Users can now update the `inter-context` property of a `ts-intersrc`:
the element disconnects from the `ts-intersink` associated with the
previous value of `inter-context` and attempts to connect to the
`ts-intersink` associated with the new value.
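
As a rough illustration, the sketch below shows how an application might
use this, loosely modelled on the `changing_inter_ctx` test added in this
commit; the pipeline descriptions, the element name `src` and the context
names `inter-a`/`inter-b` are illustrative, not part of this change.

```rust
// Sketch only: retarget a ts-intersrc to a different ts-intersink at runtime.
use gst::prelude::*;

fn main() {
    gst::init().unwrap();

    // Two producer pipelines, each publishing on its own inter-context.
    let up_a = gst::parse::launch(
        "audiotestsrc is-live=true ! ts-intersink inter-context=inter-a",
    )
    .unwrap();
    let up_b = gst::parse::launch(
        "audiotestsrc is-live=true ! ts-intersink inter-context=inter-b",
    )
    .unwrap();

    // One consumer pipeline, initially connected to inter-a.
    let down = gst::parse::launch(
        "ts-intersrc name=src inter-context=inter-a context-wait=20 ! fakesink",
    )
    .unwrap();

    for pipeline in [&up_a, &up_b, &down] {
        pipeline.set_state(gst::State::Playing).unwrap();
    }

    // Later, while everything keeps running: the ts-intersrc leaves inter-a,
    // flushes its source pad and joins inter-b.
    let src = down
        .downcast_ref::<gst::Bin>()
        .unwrap()
        .by_name("src")
        .unwrap();
    src.set_property("inter-context", "inter-b");

    // ... drive the buses / main loop as usual, then tear down.
    for pipeline in [&down, &up_b, &up_a] {
        pipeline.set_state(gst::State::Null).unwrap();
    }
}
```

When the element is PAUSED or PLAYING, the switch flushes the source pad
around joining the new context, as implemented by `change_inter_ctx_blocking`
below.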

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2378>
Authored by François Laignel on 2025-07-16 00:42:59 +02:00; committed by GStreamer Marge Bot.
parent 858aee39b5
commit 50c6e42b58
4 changed files with 391 additions and 45 deletions


@@ -16622,7 +16622,7 @@
"blurb": "Context name of the inter elements to share with",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"construct-only": false,
"controllable": false,
"default": "",
"mutable": "null",


@@ -39,7 +39,7 @@ getifaddrs = "0.2"
[dev-dependencies]
gst-check.workspace = true
gst-app.workspace = true
gst-app = { workspace = true, features = [ "v1_20" ] }
# Used by examples
clap = { version = "4", features = ["derive"] }
flume = "0.11"


@@ -18,8 +18,6 @@
* to function as though they were one without having to manually shuttle buffers,
* events, queries, etc.
*
* This doesn't support dynamically changing `ts-intersink` for now.
*
* The `ts-intersink` & `ts-intersrc` elements take advantage of the `threadshare`
* runtime, reducing the number of threads & context switches which would be
* necessary with other forms of inter-pipelines elements.
@@ -534,6 +532,123 @@ impl InterSrc {
local_ctx.as_ref().expect("set in prepare").shared.clone()
}
fn join_inter_ctx_blocking(
&self,
inter_ctx_name: &str,
ts_ctx: Context,
dataqueue: DataQueue,
) -> Result<(), gst::ErrorMessage> {
let elem = self.obj();
let inter_src_capacity = self.settings.lock().unwrap().inter_src_capacity;
let src_ctx = block_on(InterContextSrc::add(
inter_ctx_name.to_string(),
inter_src_capacity,
dataqueue.clone(),
elem.clone(),
));
if self
.task
.prepare(
InterSrcTask::new(elem.clone(), dataqueue.clone()),
ts_ctx.clone(),
)
.block_on()
.is_err()
{
return Err(gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to start Task"]
));
}
*self.src_ctx.lock().unwrap() = Some(src_ctx);
Ok(())
}
fn change_inter_ctx_blocking(&self, target_inter_ctx: &str) -> Result<(), gst::ErrorMessage> {
if self.obj().current_state() <= gst::State::Null {
// Element not prepared yet => nothing to change
return Ok(());
}
let seqnum = gst::Seqnum::next();
if self.obj().current_state() != gst::State::Ready {
if let Err(err) = self.task.stop().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to stop current Task: {err}"]
));
}
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build());
}
if let Err(err) = self.task.unprepare().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to unprepare current Task: {err}"]
));
}
// Remove the InterContextSrc from the InterContext
drop(self.src_ctx.lock().unwrap().take());
*self.upstream_latency.lock().unwrap() = None;
let dataqueue = self
.dataqueue
.lock()
.unwrap()
.clone()
.expect("set in prepare");
let ts_ctx = self.ts_ctx.lock().unwrap().clone().expect("set in prepare");
gst::info!(
CAT,
imp = self,
"Joining new inter-context {target_inter_ctx}"
);
self.join_inter_ctx_blocking(target_inter_ctx, ts_ctx, dataqueue)?;
match self.obj().current_state() {
gst::State::Paused => {
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build());
if let Err(err) = self.task.pause().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to set new Task in Pause: {err}"]
));
}
}
gst::State::Playing => {
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build());
if let Err(err) = self.task.start().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to start new Task: {err}"]
));
}
}
_ => (),
}
Ok(())
}
// Sets the upstream latency without blocking the caller.
pub fn set_upstream_latency(&self, up_latency: gst::ClockTime) {
if let Some(ref ts_ctx) = *self.ts_ctx.lock().unwrap() {
@@ -585,6 +700,7 @@ impl InterSrc {
["Failed to acquire Context: {err}"]
)
})?;
*self.ts_ctx.lock().unwrap() = Some(ts_ctx.clone());
let dataqueue = DataQueue::new(
&self.obj().clone().upcast(),
@@ -605,38 +721,13 @@ impl InterSrc {
Some(settings.max_size_time)
},
);
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
let obj = self.obj().clone();
let (ctx_name, inter_src_capacity) = {
let settings = self.settings.lock().unwrap();
(settings.inter_context.clone(), settings.inter_src_capacity)
};
let inter_ctx_name = self.settings.lock().unwrap().inter_context.to_string();
self.join_inter_ctx_blocking(&inter_ctx_name, ts_ctx, dataqueue)?;
block_on(async move {
let imp = obj.imp();
let src_ctx =
InterContextSrc::add(ctx_name, inter_src_capacity, dataqueue.clone(), obj.clone())
.await;
*imp.src_ctx.lock().unwrap() = Some(src_ctx);
*imp.ts_ctx.lock().unwrap() = Some(ts_ctx.clone());
*imp.dataqueue.lock().unwrap() = Some(dataqueue.clone());
if imp
.task
.prepare(InterSrcTask::new(obj.clone(), dataqueue), ts_ctx)
.await
.is_ok()
{
gst::debug!(CAT, imp = imp, "Prepared");
gst::debug!(CAT, imp = self, "Prepared");
Ok(())
} else {
Err(gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to start Task"]
))
}
})
}
fn unprepare(&self) {
@@ -760,7 +851,6 @@ impl ObjectImpl for InterSrc {
.blurb("Context name of the inter elements to share with")
.default_value(Some(DEFAULT_INTER_CONTEXT))
.readwrite()
.construct_only()
.build(),
glib::ParamSpecUInt::builder("max-size-buffers")
.nick("Max Size Buffers")
@@ -791,33 +881,45 @@
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"max-size-buffers" => {
settings.max_size_buffers = value.get().expect("type checked upstream");
self.settings.lock().unwrap().max_size_buffers =
value.get().expect("type checked upstream");
}
"max-size-bytes" => {
settings.max_size_bytes = value.get().expect("type checked upstream");
self.settings.lock().unwrap().max_size_bytes =
value.get().expect("type checked upstream");
}
"max-size-time" => {
settings.max_size_time = value.get::<u64>().unwrap().nseconds();
self.settings.lock().unwrap().max_size_time =
value.get::<u64>().unwrap().nseconds();
}
"context" => {
settings.context = value
self.settings.lock().unwrap().context = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| "".into());
}
"context-wait" => {
settings.context_wait = Duration::from_millis(
self.settings.lock().unwrap().context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"inter-context" => {
settings.inter_context = value
let target_inter_ctx = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_INTER_CONTEXT.into());
if target_inter_ctx == self.settings.lock().unwrap().inter_context {
return;
}
if let Err(err) = self.change_inter_ctx_blocking(&target_inter_ctx) {
gst::error!(CAT, imp = self, "Failed to change inter-context: {err}");
} else {
self.settings.lock().unwrap().inter_context = target_inter_ctx;
}
}
_ => unimplemented!(),
}


@@ -17,12 +17,14 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use gst::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
};
use std::time::Duration;
fn init() {
@@ -471,3 +473,245 @@ fn one_to_many_up_first() {
// pipe_down_3 was set to Playing after pipe_up was shutdown
assert!(samples_3.load(Ordering::SeqCst) == 0);
}
#[test]
fn changing_inter_ctx() {
init();
let pipe_up1 = gst::Pipeline::with_name("upstream1::changing_inter_ctx");
let src1 = gst::ElementFactory::make("audiotestsrc")
.name("testsrc1::changing_inter_ctx")
.property("is-live", true)
.build()
.unwrap();
let capsfilter1 = gst::ElementFactory::make("capsfilter")
.name("capsfilter1::one_to_one_down_first")
.property(
"caps",
gst::Caps::builder("audio/x-raw")
.field("channels", 1i32)
.build(),
)
.build()
.unwrap();
let intersink1 = gst::ElementFactory::make("ts-intersink")
.name("intersink1::changing_inter_ctx")
.property("inter-context", "inter1::changing_inter_ctx")
.build()
.unwrap();
let upstream1_elems = [&src1, &capsfilter1, &intersink1];
pipe_up1.add_many(upstream1_elems).unwrap();
gst::Element::link_many(upstream1_elems).unwrap();
let pipe_up2 = gst::Pipeline::with_name("upstream2::changing_inter_ctx");
let src2 = gst::ElementFactory::make("audiotestsrc")
.name("testsrc2::changing_inter_ctx")
.property("is-live", true)
.build()
.unwrap();
let capsfilter2 = gst::ElementFactory::make("capsfilter")
.name("capsfilter2::one_to_one_down_first")
.property(
"caps",
gst::Caps::builder("audio/x-raw")
.field("channels", 2i32)
.build(),
)
.build()
.unwrap();
let intersink2 = gst::ElementFactory::make("ts-intersink")
.name("intersink2::changing_inter_ctx")
.property("inter-context", "inter2::changing_inter_ctx")
.build()
.unwrap();
let upstream2_elems = [&src2, &capsfilter2, &intersink2];
pipe_up2.add_many(upstream2_elems).unwrap();
gst::Element::link_many(upstream2_elems).unwrap();
let pipe_down = gst::Pipeline::with_name("downstream::changing_inter_ctx");
let intersrc = gst::ElementFactory::make("ts-intersrc")
.name("intersrc::changing_inter_ctx")
.property("context", "inter::changing_inter_ctx")
.property("context-wait", 20u32)
.build()
.unwrap();
let appsink = gst_app::AppSink::builder()
.name("appsink::changing_inter_ctx")
.build();
let downstream_elems = [&intersrc, appsink.upcast_ref()];
pipe_down.add_many(downstream_elems).unwrap();
gst::Element::link_many(downstream_elems).unwrap();
pipe_up1.set_base_time(gst::ClockTime::ZERO);
pipe_up1.set_start_time(gst::ClockTime::NONE);
pipe_up2.set_base_time(gst::ClockTime::ZERO);
pipe_up2.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
let (mut caps_tx, mut caps_rx) = mpsc::channel(1);
let (mut n_buffers_tx, mut n_buffers_rx) = mpsc::channel(1);
let starting = Arc::new(AtomicBool::new(true));
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let starting = starting.clone();
let mut cur_caps = None;
let mut samples = 0;
let mut start_count = 0;
move |appsink| {
if starting.fetch_and(false, Ordering::SeqCst) {
cur_caps = None;
samples = 0;
start_count += 1;
}
let sample = appsink.pull_sample().unwrap();
if let Some(caps) = sample.caps() {
if cur_caps.as_ref().is_none() {
cur_caps = Some(caps.to_owned());
caps_tx.try_send(caps.to_owned()).unwrap();
}
}
samples += 1;
if samples == 10 {
n_buffers_tx.try_send(()).unwrap();
if start_count == 2 {
return Err(gst::FlowError::Eos);
}
}
Ok(gst::FlowSuccess::Ok)
}
})
.new_event(move |appsink| {
let obj = appsink.pull_object().unwrap();
if let Some(event) = obj.downcast_ref::<gst::Event>() {
if let gst::EventView::FlushStop(_) = event.view() {
println!("inter::changing_inter_ctx: appsink got FlushStop");
starting.store(true, Ordering::SeqCst);
}
}
// let basesink handle the event
false
})
.build(),
);
// Starting upstream first
pipe_up1.set_state(gst::State::Playing).unwrap();
pipe_up2.set_state(gst::State::Playing).unwrap();
// Connect downstream to pipe_up1 initially
intersrc.set_property("inter-context", "inter1::changing_inter_ctx");
pipe_down.set_state(gst::State::Playing).unwrap();
let mut bus_up1_stream = pipe_up1.bus().unwrap().stream();
let mut bus_up2_stream = pipe_up2.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
futures::executor::block_on(async {
use gst::MessageView::*;
loop {
futures::select! {
caps = caps_rx.next() => {
println!("inter::changing_inter_ctx: caps 1: {caps:?}");
if let Some(caps) = caps {
let s = caps.structure(0).unwrap();
assert_eq!(s.get::<i32>("channels").unwrap(), 1);
}
}
_ = n_buffers_rx.next() => {
println!("inter::changing_inter_ctx: got n buffers 1");
break;
}
msg = bus_up1_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up1.recalculate_latency();
}
Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"),
_ => (),
}
}
msg = bus_up2_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up2.recalculate_latency();
}
Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"),
_ => (),
}
}
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"),
_ => (),
}
}
};
}
});
println!("inter::changing_inter_ctx: changing now");
intersrc.set_property("inter-context", "inter2::changing_inter_ctx");
futures::executor::block_on(async {
use gst::MessageView::*;
loop {
println!("changing_inter_ctx: iter 2");
futures::select! {
caps = caps_rx.next() => {
println!("inter::changing_inter_ctx: caps 2: {caps:?}");
if let Some(caps) = caps {
let s = caps.structure(0).unwrap();
assert_eq!(s.get::<i32>("channels").unwrap(), 2);
}
}
_ = n_buffers_rx.next() => {
println!("inter::changing_inter_ctx: got n buffers 2");
break;
}
msg = bus_up2_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_up2.recalculate_latency();
}
Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"),
_ => (),
}
}
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"),
_ => (),
}
}
};
}
});
println!("inter::changing_inter_ctx: stopping");
pipe_down.set_state(gst::State::Null).unwrap();
pipe_up2.set_state(gst::State::Null).unwrap();
pipe_up1.set_state(gst::State::Null).unwrap();
}