From 458e59979f5f0a2b9edb50796f499c0b59f102d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 12 Aug 2025 18:39:52 +0200 Subject: [PATCH] threadshare: inter: store upstream latency in InterContext ... instead of retrieving the element following the sink pad. Part-of: --- generic/threadshare/src/inter/mod.rs | 2 ++ generic/threadshare/src/inter/sink/imp.rs | 21 +++++++++++---------- generic/threadshare/src/inter/src/imp.rs | 16 +--------------- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/generic/threadshare/src/inter/mod.rs b/generic/threadshare/src/inter/mod.rs index ba348de92..d3b5df259 100644 --- a/generic/threadshare/src/inter/mod.rs +++ b/generic/threadshare/src/inter/mod.rs @@ -49,6 +49,7 @@ struct InterContextInner { dataqueues: Slab, sources: Slab, sinkpad: Option, + upstream_latency: Option, } impl InterContextInner { @@ -58,6 +59,7 @@ impl InterContextInner { dataqueues: Slab::new(), sources: Slab::new(), sinkpad: None, + upstream_latency: None, } } } diff --git a/generic/threadshare/src/inter/sink/imp.rs b/generic/threadshare/src/inter/sink/imp.rs index c7f661402..8ef0d7a09 100644 --- a/generic/threadshare/src/inter/sink/imp.rs +++ b/generic/threadshare/src/inter/sink/imp.rs @@ -214,7 +214,6 @@ impl PadSinkHandler for InterSinkPadHandler { pub struct InterSink { sinkpad: PadSink, sink_ctx: Mutex>, - upstream_latency: Mutex>, got_first_buffer: AtomicBool, settings: Mutex, } @@ -248,10 +247,6 @@ impl InterSink { Ok(gst::FlowSuccess::Ok) } - pub fn latency(&self) -> Option { - *self.upstream_latency.lock().unwrap() - } - fn prepare(&self) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, imp = self, "Preparing"); @@ -288,9 +283,15 @@ impl InterSink { } fn stop(&self) { - gst::debug!(CAT, imp = self, "Stopped"); + gst::debug!(CAT, imp = self, "Stopping"); + self.got_first_buffer.store(false, Ordering::SeqCst); - *self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE; + + let shared_ctx = self.shared_ctx(); + block_on(async move { + shared_ctx.write().await.upstream_latency = None; + }); + gst::debug!(CAT, imp = self, "Stopped"); } } @@ -308,7 +309,6 @@ impl ObjectSubclass for InterSink { InterSinkPadHandler, ), sink_ctx: Mutex::new(None), - upstream_latency: Mutex::new(gst::ClockTime::NONE), got_first_buffer: AtomicBool::new(false), settings: Mutex::new(Settings::default()), } @@ -388,13 +388,14 @@ impl ElementImpl for InterSink { if let gst::EventView::Latency(lat_evt) = event.view() { let latency = lat_evt.latency(); - *self.upstream_latency.lock().unwrap() = Some(latency); let obj = self.obj().clone(); let shared_ctx = self.shared_ctx(); let _ = block_on_or_add_sub_task(async move { - let shared_ctx = shared_ctx.read().await; + let mut shared_ctx = shared_ctx.write().await; + shared_ctx.upstream_latency = Some(latency); + if shared_ctx.sources.is_empty() { gst::info!(CAT, obj = obj, "No sources to set upstream latency"); } else { diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index dc8a5c14e..00f00bdd0 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -362,21 +362,7 @@ impl InterSrcTask { let shared_ctx = imp.shared_ctx(); let shared_ctx = shared_ctx.read().await; - let Some(ref sinkpad) = shared_ctx.sinkpad else { - gst::info!( - CAT, - imp = imp, - "sinkpad is gone before we could get latency" - ); - return Err(gst::FlowError::Error); - }; - - let sinkpad_parent = sinkpad.parent().expect("sinkpad should have a parent"); - let intersink = sinkpad_parent - .downcast_ref::() - .expect("sinkpad parent should be a ts-intersink"); - - if let Some(latency) = intersink.imp().latency() { + if let Some(latency) = shared_ctx.upstream_latency { imp.set_upstream_latency_priv(latency); } else { gst::log!(CAT, imp = imp, "Upstream latency is still unknown");