diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 56f3e6cb3..5dc3eec26 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -16622,7 +16622,7 @@ "blurb": "Context name of the inter elements to share with", "conditionally-available": false, "construct": false, - "construct-only": true, + "construct-only": false, "controllable": false, "default": "", "mutable": "null", diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index c17490fe1..a61bb1ebc 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -39,7 +39,7 @@ getifaddrs = "0.2" [dev-dependencies] gst-check.workspace = true -gst-app.workspace = true +gst-app = { workspace = true, features = [ "v1_20" ] } # Used by examples clap = { version = "4", features = ["derive"] } flume = "0.11" diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 675e45895..120d3afed 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -18,8 +18,6 @@ * to function as though they were one without having to manually shuttle buffers, * events, queries, etc. * - * This doesn't support dynamically changing `ts-intersink` for now. - * * The `ts-intersink` & `ts-intersrc` elements take advantage of the `threadshare` * runtime, reducing the number of threads & context switches which would be * necessary with other forms of inter-pipelines elements. @@ -534,6 +532,123 @@ impl InterSrc { local_ctx.as_ref().expect("set in prepare").shared.clone() } + fn join_inter_ctx_blocking( + &self, + inter_ctx_name: &str, + ts_ctx: Context, + dataqueue: DataQueue, + ) -> Result<(), gst::ErrorMessage> { + let elem = self.obj(); + + let inter_src_capacity = self.settings.lock().unwrap().inter_src_capacity; + + let src_ctx = block_on(InterContextSrc::add( + inter_ctx_name.to_string(), + inter_src_capacity, + dataqueue.clone(), + elem.clone(), + )); + + if self + .task + .prepare( + InterSrcTask::new(elem.clone(), dataqueue.clone()), + ts_ctx.clone(), + ) + .block_on() + .is_err() + { + return Err(gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to start Task"] + )); + } + + *self.src_ctx.lock().unwrap() = Some(src_ctx); + + Ok(()) + } + + fn change_inter_ctx_blocking(&self, target_inter_ctx: &str) -> Result<(), gst::ErrorMessage> { + if self.obj().current_state() <= gst::State::Null { + // Element not prepared yet => nothing to change + return Ok(()); + } + + let seqnum = gst::Seqnum::next(); + + if self.obj().current_state() != gst::State::Ready { + if let Err(err) = self.task.stop().block_on() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to stop current Task: {err}"] + )); + } + + self.srcpad + .gst_pad() + .push_event(gst::event::FlushStart::builder().seqnum(seqnum).build()); + } + + if let Err(err) = self.task.unprepare().block_on() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to unprepare current Task: {err}"] + )); + } + + // Remove the InterContextSrc from the InterContext + drop(self.src_ctx.lock().unwrap().take()); + + *self.upstream_latency.lock().unwrap() = None; + + let dataqueue = self + .dataqueue + .lock() + .unwrap() + .clone() + .expect("set in prepare"); + let ts_ctx = self.ts_ctx.lock().unwrap().clone().expect("set in prepare"); + + gst::info!( + CAT, + imp = self, + "Joining new inter-context {target_inter_ctx}" + ); + + self.join_inter_ctx_blocking(target_inter_ctx, ts_ctx, dataqueue)?; + + match self.obj().current_state() { + gst::State::Paused => { + self.srcpad + .gst_pad() + .push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build()); + + if let Err(err) = self.task.pause().block_on() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to set new Task in Pause: {err}"] + )); + } + } + gst::State::Playing => { + self.srcpad + .gst_pad() + .push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build()); + + if let Err(err) = self.task.start().block_on() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to start new Task: {err}"] + )); + } + } + _ => (), + } + + Ok(()) + } + // Sets the upstream latency without blocking the caller. pub fn set_upstream_latency(&self, up_latency: gst::ClockTime) { if let Some(ref ts_ctx) = *self.ts_ctx.lock().unwrap() { @@ -585,6 +700,7 @@ impl InterSrc { ["Failed to acquire Context: {err}"] ) })?; + *self.ts_ctx.lock().unwrap() = Some(ts_ctx.clone()); let dataqueue = DataQueue::new( &self.obj().clone().upcast(), @@ -605,38 +721,13 @@ impl InterSrc { Some(settings.max_size_time) }, ); + *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); - let obj = self.obj().clone(); - let (ctx_name, inter_src_capacity) = { - let settings = self.settings.lock().unwrap(); - (settings.inter_context.clone(), settings.inter_src_capacity) - }; + let inter_ctx_name = self.settings.lock().unwrap().inter_context.to_string(); + self.join_inter_ctx_blocking(&inter_ctx_name, ts_ctx, dataqueue)?; - block_on(async move { - let imp = obj.imp(); - let src_ctx = - InterContextSrc::add(ctx_name, inter_src_capacity, dataqueue.clone(), obj.clone()) - .await; - - *imp.src_ctx.lock().unwrap() = Some(src_ctx); - *imp.ts_ctx.lock().unwrap() = Some(ts_ctx.clone()); - *imp.dataqueue.lock().unwrap() = Some(dataqueue.clone()); - - if imp - .task - .prepare(InterSrcTask::new(obj.clone(), dataqueue), ts_ctx) - .await - .is_ok() - { - gst::debug!(CAT, imp = imp, "Prepared"); - Ok(()) - } else { - Err(gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to start Task"] - )) - } - }) + gst::debug!(CAT, imp = self, "Prepared"); + Ok(()) } fn unprepare(&self) { @@ -760,7 +851,6 @@ impl ObjectImpl for InterSrc { .blurb("Context name of the inter elements to share with") .default_value(Some(DEFAULT_INTER_CONTEXT)) .readwrite() - .construct_only() .build(), glib::ParamSpecUInt::builder("max-size-buffers") .nick("Max Size Buffers") @@ -791,33 +881,45 @@ impl ObjectImpl for InterSrc { } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); match pspec.name() { "max-size-buffers" => { - settings.max_size_buffers = value.get().expect("type checked upstream"); + self.settings.lock().unwrap().max_size_buffers = + value.get().expect("type checked upstream"); } "max-size-bytes" => { - settings.max_size_bytes = value.get().expect("type checked upstream"); + self.settings.lock().unwrap().max_size_bytes = + value.get().expect("type checked upstream"); } "max-size-time" => { - settings.max_size_time = value.get::().unwrap().nseconds(); + self.settings.lock().unwrap().max_size_time = + value.get::().unwrap().nseconds(); } "context" => { - settings.context = value + self.settings.lock().unwrap().context = value .get::>() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = Duration::from_millis( + self.settings.lock().unwrap().context_wait = Duration::from_millis( value.get::().expect("type checked upstream").into(), ); } "inter-context" => { - settings.inter_context = value + let target_inter_ctx = value .get::>() .expect("type checked upstream") .unwrap_or_else(|| DEFAULT_INTER_CONTEXT.into()); + + if target_inter_ctx == self.settings.lock().unwrap().inter_context { + return; + } + + if let Err(err) = self.change_inter_ctx_blocking(&target_inter_ctx) { + gst::error!(CAT, imp = self, "Failed to change inter-context: {err}"); + } else { + self.settings.lock().unwrap().inter_context = target_inter_ctx; + } } _ => unimplemented!(), } diff --git a/generic/threadshare/tests/inter.rs b/generic/threadshare/tests/inter.rs index aff45b67f..1326aabc8 100644 --- a/generic/threadshare/tests/inter.rs +++ b/generic/threadshare/tests/inter.rs @@ -17,12 +17,14 @@ // // SPDX-License-Identifier: LGPL-2.1-or-later -use futures::channel::oneshot; +use futures::channel::{mpsc, oneshot}; use futures::prelude::*; use gst::prelude::*; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, +}; use std::time::Duration; fn init() { @@ -471,3 +473,245 @@ fn one_to_many_up_first() { // pipe_down_3 was set to Playing after pipe_up was shutdown assert!(samples_3.load(Ordering::SeqCst) == 0); } + +#[test] +fn changing_inter_ctx() { + init(); + + let pipe_up1 = gst::Pipeline::with_name("upstream1::changing_inter_ctx"); + let src1 = gst::ElementFactory::make("audiotestsrc") + .name("testsrc1::changing_inter_ctx") + .property("is-live", true) + .build() + .unwrap(); + let capsfilter1 = gst::ElementFactory::make("capsfilter") + .name("capsfilter1::one_to_one_down_first") + .property( + "caps", + gst::Caps::builder("audio/x-raw") + .field("channels", 1i32) + .build(), + ) + .build() + .unwrap(); + let intersink1 = gst::ElementFactory::make("ts-intersink") + .name("intersink1::changing_inter_ctx") + .property("inter-context", "inter1::changing_inter_ctx") + .build() + .unwrap(); + + let upstream1_elems = [&src1, &capsfilter1, &intersink1]; + pipe_up1.add_many(upstream1_elems).unwrap(); + gst::Element::link_many(upstream1_elems).unwrap(); + + let pipe_up2 = gst::Pipeline::with_name("upstream2::changing_inter_ctx"); + let src2 = gst::ElementFactory::make("audiotestsrc") + .name("testsrc2::changing_inter_ctx") + .property("is-live", true) + .build() + .unwrap(); + let capsfilter2 = gst::ElementFactory::make("capsfilter") + .name("capsfilter2::one_to_one_down_first") + .property( + "caps", + gst::Caps::builder("audio/x-raw") + .field("channels", 2i32) + .build(), + ) + .build() + .unwrap(); + let intersink2 = gst::ElementFactory::make("ts-intersink") + .name("intersink2::changing_inter_ctx") + .property("inter-context", "inter2::changing_inter_ctx") + .build() + .unwrap(); + + let upstream2_elems = [&src2, &capsfilter2, &intersink2]; + pipe_up2.add_many(upstream2_elems).unwrap(); + gst::Element::link_many(upstream2_elems).unwrap(); + + let pipe_down = gst::Pipeline::with_name("downstream::changing_inter_ctx"); + let intersrc = gst::ElementFactory::make("ts-intersrc") + .name("intersrc::changing_inter_ctx") + .property("context", "inter::changing_inter_ctx") + .property("context-wait", 20u32) + .build() + .unwrap(); + let appsink = gst_app::AppSink::builder() + .name("appsink::changing_inter_ctx") + .build(); + + let downstream_elems = [&intersrc, appsink.upcast_ref()]; + pipe_down.add_many(downstream_elems).unwrap(); + gst::Element::link_many(downstream_elems).unwrap(); + + pipe_up1.set_base_time(gst::ClockTime::ZERO); + pipe_up1.set_start_time(gst::ClockTime::NONE); + pipe_up2.set_base_time(gst::ClockTime::ZERO); + pipe_up2.set_start_time(gst::ClockTime::NONE); + pipe_down.set_base_time(gst::ClockTime::ZERO); + pipe_down.set_start_time(gst::ClockTime::NONE); + + let (mut caps_tx, mut caps_rx) = mpsc::channel(1); + let (mut n_buffers_tx, mut n_buffers_rx) = mpsc::channel(1); + let starting = Arc::new(AtomicBool::new(true)); + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample({ + let starting = starting.clone(); + let mut cur_caps = None; + let mut samples = 0; + let mut start_count = 0; + move |appsink| { + if starting.fetch_and(false, Ordering::SeqCst) { + cur_caps = None; + samples = 0; + start_count += 1; + } + + let sample = appsink.pull_sample().unwrap(); + if let Some(caps) = sample.caps() { + if cur_caps.as_ref().is_none() { + cur_caps = Some(caps.to_owned()); + caps_tx.try_send(caps.to_owned()).unwrap(); + } + } + + samples += 1; + + if samples == 10 { + n_buffers_tx.try_send(()).unwrap(); + if start_count == 2 { + return Err(gst::FlowError::Eos); + } + } + + Ok(gst::FlowSuccess::Ok) + } + }) + .new_event(move |appsink| { + let obj = appsink.pull_object().unwrap(); + if let Some(event) = obj.downcast_ref::() { + if let gst::EventView::FlushStop(_) = event.view() { + println!("inter::changing_inter_ctx: appsink got FlushStop"); + starting.store(true, Ordering::SeqCst); + } + } + // let basesink handle the event + false + }) + .build(), + ); + + // Starting upstream first + pipe_up1.set_state(gst::State::Playing).unwrap(); + pipe_up2.set_state(gst::State::Playing).unwrap(); + + // Connect downstream to pipe_up1 initially + intersrc.set_property("inter-context", "inter1::changing_inter_ctx"); + pipe_down.set_state(gst::State::Playing).unwrap(); + + let mut bus_up1_stream = pipe_up1.bus().unwrap().stream(); + let mut bus_up2_stream = pipe_up1.bus().unwrap().stream(); + let mut bus_down_stream = pipe_down.bus().unwrap().stream(); + + futures::executor::block_on(async { + use gst::MessageView::*; + + loop { + futures::select! { + caps = caps_rx.next() => { + println!("inter::changing_inter_ctx: caps 1: {caps:?}"); + if let Some(caps) = caps { + let s = caps.structure(0).unwrap(); + assert_eq!(s.get::("channels").unwrap(), 1); + } + } + _ = n_buffers_rx.next() => { + println!("inter::changing_inter_ctx: got n buffers 1"); + break; + } + msg = bus_up1_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipe_up1.recalculate_latency(); + } + Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"), + _ => (), + } + } + msg = bus_up2_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipe_up2.recalculate_latency(); + } + Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"), + _ => (), + } + } + msg = bus_down_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipe_down.recalculate_latency(); + } + Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"), + _ => (), + } + } + }; + } + }); + + println!("inter::changing_inter_ctx: changing now"); + intersrc.set_property("inter-context", "inter2::changing_inter_ctx"); + + futures::executor::block_on(async { + use gst::MessageView::*; + + loop { + println!("changing_inter_ctx: iter 2"); + + futures::select! { + caps = caps_rx.next() => { + println!("inter::changing_inter_ctx: caps 2: {caps:?}"); + if let Some(caps) = caps { + let s = caps.structure(0).unwrap(); + assert_eq!(s.get::("channels").unwrap(), 2); + } + } + _ = n_buffers_rx.next() => { + println!("inter::changing_inter_ctx: got n buffers 2"); + break; + } + msg = bus_up2_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipe_up2.recalculate_latency(); + } + Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"), + _ => (), + } + } + msg = bus_down_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipe_down.recalculate_latency(); + } + Error(err) => unreachable!("inter::changing_inter_ctx {err:?}"), + _ => (), + } + } + }; + } + }); + + println!("inter::changing_inter_ctx: stopping"); + pipe_down.set_state(gst::State::Null).unwrap(); + pipe_up2.set_state(gst::State::Null).unwrap(); + pipe_up1.set_state(gst::State::Null).unwrap(); +}