From db8dabf195f1cd891482ad76065de63f79d15574 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 20 Aug 2025 18:05:08 +0200 Subject: [PATCH] threadshare: update examples Part-of: --- generic/threadshare/examples/inter/simple.rs | 20 +-- .../threadshare/examples/standalone/main.rs | 114 ++++++++++-------- .../standalone/sink/async_mutex/imp.rs | 8 +- .../examples/standalone/sink/stats.rs | 96 +++++++-------- .../standalone/sink/sync_mutex/imp.rs | 8 +- .../examples/standalone/sink/task/imp.rs | 8 +- generic/threadshare/src/inter/src/imp.rs | 5 +- 7 files changed, 135 insertions(+), 124 deletions(-) diff --git a/generic/threadshare/examples/inter/simple.rs b/generic/threadshare/examples/inter/simple.rs index 7daad8f25..2ae7c385f 100644 --- a/generic/threadshare/examples/inter/simple.rs +++ b/generic/threadshare/examples/inter/simple.rs @@ -6,11 +6,11 @@ fn main() { gst::init().unwrap(); let pipe_up = gst::parse::launch( - " - audiotestsrc is-live=true num-buffers=2000 volume=0.02 + "pipeline. (name=upstream + ts-audiotestsrc is-live=true num-buffers=2000 volume=0.02 context=ts-group-1 context-wait=20 ! opusenc ! ts-intersink inter-context=my-inter-ctx - ", + )", ) .unwrap() .downcast::() @@ -19,14 +19,15 @@ fn main() { // A downstream pipeline which will receive the Opus encoded audio stream // and render it locally. let pipe_down = gst::parse::launch( - " - ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20 + "pipeline. (name=downstream + ts-intersrc inter-context=my-inter-ctx context=ts-group-1 context-wait=20 ! opusdec ! audioconvert ! audioresample - ! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0 + ! ts-queue context=ts-group-1 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0 + ! ts-blocking-adapter ! autoaudiosink - ", + )", ) .unwrap() .downcast::() @@ -46,10 +47,11 @@ fn main() { // started at the same time. However, an application that dynamically // generates pipelines must ensure that all the pipelines that will be // connected together share the same base time. - pipe_up.set_base_time(gst::ClockTime::ZERO); + let base_time = clock.time(); pipe_up.set_start_time(gst::ClockTime::NONE); - pipe_down.set_base_time(gst::ClockTime::ZERO); + pipe_up.set_base_time(base_time); pipe_down.set_start_time(gst::ClockTime::NONE); + pipe_down.set_base_time(base_time); pipe_up.set_state(gst::State::Playing).unwrap(); pipe_down.set_state(gst::State::Playing).unwrap(); diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index 4c8d00747..2e3f47606 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -50,6 +50,9 @@ fn main() { gstthreadshare::plugin_register_static().unwrap(); self::plugin_register_static().unwrap(); + let main_context = glib::MainContext::default(); + let _guard = main_context.acquire().unwrap(); + #[cfg(debug_assertions)] gst::warning!(CAT, "RUNNING DEBUG BUILD"); @@ -115,67 +118,72 @@ fn main() { let l = glib::MainLoop::new(None, false); - let _bus_watch = pipeline - .bus() - .unwrap() - .add_watch({ - let terminated_count = Arc::new(AtomicU32::new(0)); - let l = l.clone(); - let pipeline = pipeline.clone(); - move |_, msg| { - use gst::MessageView::*; - match msg.view() { - Eos(_) => { - // Actually, we don't post EOS (see sinks impl). - gst::info!(CAT, "Received eos"); - l.quit(); + let bus = pipeline.bus().unwrap(); + let mut bus_stream = bus.stream(); + let pipeline_weak = pipeline.downgrade(); + let l_clone = l.clone(); + main_context.spawn_local(async move { + use futures::prelude::*; - return glib::ControlFlow::Break; - } - Error(msg) => { - if let gst::MessageView::Error(msg) = msg.message().view() { - if msg.error().matches(gst::LibraryError::Shutdown) - && terminated_count.fetch_add(1, Ordering::SeqCst) - == args.streams - 1 - { + let terminated_count = Arc::new(AtomicU32::new(0)); + + while let Some(msg) = bus_stream.next().await { + use gst::MessageView::*; + + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; + + match msg.view() { + Eos(_) => { + // Actually, we don't post EOS (see sinks impl). + gst::info!(CAT, "Received eos"); + l_clone.quit(); + + break; + } + Error(msg) => { + if let gst::MessageView::Error(msg) = msg.message().view() { + if msg.error().matches(gst::LibraryError::Shutdown) { + if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { gst::info!(CAT, "Received all shutdown requests"); - l.quit(); + l_clone.quit(); - return glib::ControlFlow::Break; + break; + } else { + continue; } } - - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l.quit(); - - return glib::ControlFlow::Break; } - Latency(msg) => { - gst::info!( - CAT, - "Latency requirements have changed for element {}", - msg.src() - .map(|src| src.name()) - .as_deref() - .unwrap_or("UNKNOWN"), - ); - if let Err(err) = pipeline.recalculate_latency() { - gst::error!(CAT, "Error recalculating latency: {err}"); - } - } - _ => (), + + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.name()), + msg.error(), + msg.debug() + ); + l_clone.quit(); + + break; } - - glib::ControlFlow::Continue + Latency(msg) => { + gst::info!( + CAT, + "Latency requirements have changed for element {}", + msg.src() + .map(|src| src.name()) + .as_deref() + .unwrap_or("UNKNOWN"), + ); + if let Err(err) = pipeline.recalculate_latency() { + gst::error!(CAT, "Error recalculating latency: {err}"); + } + } + _ => (), } - }) - .expect("Failed to add bus watch"); + } + }); gst::info!(CAT, "Switching to Ready"); let start = Instant::now(); diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs index 3c0a4b218..9cb51e41b 100644 --- a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs +++ b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs @@ -57,19 +57,19 @@ impl PadSinkHandlerInner { .expect("ts before Segment start"); if let Some(last_ts) = self.last_ts { - let cur_ts = elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - ts).into(); + let rt = elem.current_running_time().unwrap(); + let lateness: Duration = (rt - ts).into(); let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { - stats.add_buffer(latency, interval); + stats.add_buffer(lateness, interval); } debug_or_trace!( CAT, self.is_main_elem, obj = elem, - "o latency {latency:.2?}" + "o lateness {lateness:.2?}" ); debug_or_trace!( CAT, diff --git a/generic/threadshare/examples/standalone/sink/stats.rs b/generic/threadshare/examples/standalone/sink/stats.rs index 69370d9cd..727d03c5b 100644 --- a/generic/threadshare/examples/standalone/sink/stats.rs +++ b/generic/threadshare/examples/standalone/sink/stats.rs @@ -16,14 +16,14 @@ pub struct Stats { max_buffers: Option, buffer_count: f32, buffer_count_delta: f32, - latency_sum: f32, - latency_square_sum: f32, - latency_sum_delta: f32, - latency_square_sum_delta: f32, - latency_min: Duration, - latency_min_delta: Duration, - latency_max: Duration, - latency_max_delta: Duration, + lateness_sum: f32, + lateness_square_sum: f32, + lateness_sum_delta: f32, + lateness_square_sum_delta: f32, + lateness_min: Duration, + lateness_min_delta: Duration, + lateness_max: Duration, + lateness_max_delta: Duration, interval_sum: f32, interval_square_sum: f32, interval_sum_delta: f32, @@ -51,14 +51,14 @@ impl Stats { pub fn start(&mut self) { self.buffer_count = 0.0; self.buffer_count_delta = 0.0; - self.latency_sum = 0.0; - self.latency_square_sum = 0.0; - self.latency_sum_delta = 0.0; - self.latency_square_sum_delta = 0.0; - self.latency_min = Duration::MAX; - self.latency_min_delta = Duration::MAX; - self.latency_max = Duration::ZERO; - self.latency_max_delta = Duration::ZERO; + self.lateness_sum = 0.0; + self.lateness_square_sum = 0.0; + self.lateness_sum_delta = 0.0; + self.lateness_square_sum_delta = 0.0; + self.lateness_min = Duration::MAX; + self.lateness_min_delta = Duration::MAX; + self.lateness_max = Duration::ZERO; + self.lateness_max_delta = Duration::ZERO; self.interval_sum = 0.0; self.interval_square_sum = 0.0; self.interval_sum_delta = 0.0; @@ -105,7 +105,7 @@ impl Stats { } } - pub fn add_buffer(&mut self, latency: Duration, interval: Duration) { + pub fn add_buffer(&mut self, lateness: Duration, interval: Duration) { if !self.is_active() { return; } @@ -113,19 +113,19 @@ impl Stats { self.buffer_count += 1.0; self.buffer_count_delta += 1.0; - // Latency - let latency_f32 = latency.as_nanos() as f32; - let latency_square = latency_f32.powi(2); + // Lateness + let lateness_f32 = lateness.as_nanos() as f32; + let lateness_square = lateness_f32.powi(2); - self.latency_sum += latency_f32; - self.latency_square_sum += latency_square; - self.latency_min = self.latency_min.min(latency); - self.latency_max = self.latency_max.max(latency); + self.lateness_sum += lateness_f32; + self.lateness_square_sum += lateness_square; + self.lateness_min = self.lateness_min.min(lateness); + self.lateness_max = self.lateness_max.max(lateness); - self.latency_sum_delta += latency_f32; - self.latency_square_sum_delta += latency_square; - self.latency_min_delta = self.latency_min_delta.min(latency); - self.latency_max_delta = self.latency_max_delta.max(latency); + self.lateness_sum_delta += lateness_f32; + self.lateness_square_sum_delta += lateness_square; + self.lateness_min_delta = self.lateness_min_delta.min(lateness); + self.lateness_max_delta = self.lateness_max_delta.max(lateness); // Interval let interval_f32 = interval.as_nanos() as f32; @@ -186,24 +186,24 @@ impl Stats { self.interval_max_delta = Duration::ZERO; self.interval_late_count_delta = 0.0; - let latency_mean = self.latency_sum_delta / self.buffer_count_delta; - let latency_std_dev = f32::sqrt( - self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2), + let lateness_mean = self.lateness_sum_delta / self.buffer_count_delta; + let lateness_std_dev = f32::sqrt( + self.lateness_square_sum_delta / self.buffer_count_delta - lateness_mean.powi(2), ); gst::info!( CAT, - "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(latency_mean as u64), - Duration::from_nanos(latency_std_dev as u64), - self.latency_min_delta, - self.latency_max_delta, + "o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(lateness_mean as u64), + Duration::from_nanos(lateness_std_dev as u64), + self.lateness_min_delta, + self.lateness_max_delta, ); - self.latency_sum_delta = 0.0; - self.latency_square_sum_delta = 0.0; - self.latency_min_delta = Duration::MAX; - self.latency_max_delta = Duration::ZERO; + self.lateness_sum_delta = 0.0; + self.lateness_square_sum_delta = 0.0; + self.lateness_min_delta = Duration::MAX; + self.lateness_max_delta = Duration::ZERO; self.buffer_count_delta = 0.0; } @@ -254,17 +254,17 @@ impl Stats { ); } - let latency_mean = self.latency_sum / self.buffer_count; - let latency_std_dev = - f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2)); + let lateness_mean = self.lateness_sum / self.buffer_count; + let lateness_std_dev = + f32::sqrt(self.lateness_square_sum / self.buffer_count - lateness_mean.powi(2)); gst::info!( CAT, - "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", - Duration::from_nanos(latency_mean as u64), - Duration::from_nanos(latency_std_dev as u64), - self.latency_min, - self.latency_max, + "o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(lateness_mean as u64), + Duration::from_nanos(lateness_std_dev as u64), + self.lateness_min, + self.lateness_max, ); } } diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs index 743b0f7e5..d0ab6e979 100644 --- a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs +++ b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs @@ -56,19 +56,19 @@ impl PadSinkHandlerInner { .expect("ts before Segment start"); if let Some(last_ts) = self.last_ts { - let cur_ts = elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - ts).into(); + let rt = elem.current_running_time().unwrap(); + let lateness: Duration = (rt - ts).into(); let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { - stats.add_buffer(latency, interval); + stats.add_buffer(lateness, interval); } debug_or_trace!( CAT, self.is_main_elem, obj = elem, - "o latency {latency:.2?}" + "o lateness {lateness:.2?}" ); debug_or_trace!( CAT, diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs index 482e685f1..e147053ac 100644 --- a/generic/threadshare/examples/standalone/sink/task/imp.rs +++ b/generic/threadshare/examples/standalone/sink/task/imp.rs @@ -169,19 +169,19 @@ impl TaskImpl for TaskSinkTask { .expect("dts before Segment start"); if let Some(last_ts) = self.last_ts { - let cur_ts = self.elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - ts).into(); + let rt = self.elem.current_running_time().unwrap(); + let lateness: Duration = (rt - ts).into(); let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { - stats.add_buffer(latency, interval); + stats.add_buffer(lateness, interval); } debug_or_trace!( CAT, self.is_main_elem, obj = self.elem, - "o latency {latency:.2?}", + "o lateness {lateness:.2?}", ); debug_or_trace!( CAT, diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 34d818809..4805f6e98 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -74,10 +74,11 @@ * // started at the same time. However, an application that dynamically * // generates pipelines must ensure that all the pipelines that will be * // connected together share the same base time. - * pipe_up.set_base_time(gst::ClockTime::ZERO); + * let base_time = clock.time(); * pipe_up.set_start_time(gst::ClockTime::NONE); - * pipe_down.set_base_time(gst::ClockTime::ZERO); + * pipe_up.set_base_time(base_time); * pipe_down.set_start_time(gst::ClockTime::NONE); + * pipe_down.set_base_time(base_time); * * pipe_up.set_state(gst::State::Playing).unwrap(); * pipe_down.set_state(gst::State::Playing).unwrap();