From e4e14cf5ca2399a3471b87942bc7c2d7a9c5fb00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 12 Aug 2025 18:28:52 +0200 Subject: [PATCH] threadshare: intersink: return from blocking tasks when stopping This commit cancels blocking tasks when the element is requested to stop. This is to prevent deadlocks. Part-of: --- generic/threadshare/src/inter/sink/imp.rs | 102 +++++++++++++++------- 1 file changed, 69 insertions(+), 33 deletions(-) diff --git a/generic/threadshare/src/inter/sink/imp.rs b/generic/threadshare/src/inter/sink/imp.rs index 8ef0d7a09..d8c928f31 100644 --- a/generic/threadshare/src/inter/sink/imp.rs +++ b/generic/threadshare/src/inter/sink/imp.rs @@ -6,6 +6,7 @@ // // SPDX-License-Identifier: MPL-2.0 +use gio::glib::ControlFlow; /** * SECTION:element-ts-intersink * @see_also: ts-intersrc, ts-proxysink, ts-proxysrc, intersink, intersrc @@ -72,6 +73,7 @@ impl Default for Settings { #[derive(Debug)] struct InterContextSink { shared: InterContext, + got_first_buffer: bool, } impl InterContextSink { @@ -98,7 +100,10 @@ impl InterContextSink { shared }; - Some(InterContextSink { shared }) + Some(InterContextSink { + shared, + got_first_buffer: false, + }) } } @@ -162,6 +167,11 @@ impl PadSinkHandler for InterSinkPadHandler { "Forwarding non-serialized downstream {event:?}" ); for (_, source) in shared_ctx.sources.iter() { + if imp.stop.load(Ordering::SeqCst) { + gst::log!(CAT, imp = imp, "Stop requested"); + return false; + } + if !source.send_event(event.clone()) { gst::warning!( CAT, @@ -172,9 +182,10 @@ impl PadSinkHandler for InterSinkPadHandler { } } } - }); - true + true + }) + .unwrap_or(false) } else { gst::debug!( CAT, @@ -196,13 +207,8 @@ impl PadSinkHandler for InterSinkPadHandler { let imp = elem.imp(); - use gst::EventView; - match event.view() { - EventView::Eos(..) => { - let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build()); - } - EventView::FlushStop(..) => imp.start(), - _ => (), + if let gst::EventView::Eos(..) = event.view() { + let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build()); } gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event); @@ -214,7 +220,7 @@ impl PadSinkHandler for InterSinkPadHandler { pub struct InterSink { sinkpad: PadSink, sink_ctx: Mutex>, - got_first_buffer: AtomicBool, + stop: AtomicBool, settings: Mutex, } @@ -225,20 +231,40 @@ impl InterSink { } async fn enqueue_item(&self, item: DataQueueItem) -> Result { - if !self.got_first_buffer.load(Ordering::SeqCst) - && matches!( - item, - DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) - ) - { - self.got_first_buffer.store(true, Ordering::SeqCst); - let _ = self.post_message(gst::message::Latency::new()); - } + let shared_ctx = { + let mut local_ctx_grd = self.sink_ctx.lock().unwrap(); + let local_ctx = local_ctx_grd.as_mut().expect("set in prepare"); + + if self.stop.load(Ordering::SeqCst) { + gst::log!(CAT, imp = self, "Stop requested"); + return Err(gst::FlowError::Flushing); + } + + let shared_ctx = local_ctx.shared.clone(); + + if !local_ctx.got_first_buffer + && matches!( + item, + DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) + ) + { + local_ctx.got_first_buffer = true; + drop(local_ctx_grd); + + let _ = self.post_message(gst::message::Latency::new()); + } + + shared_ctx + }; - let shared_ctx = self.shared_ctx(); let shared_ctx = shared_ctx.read().await; for (_, dq) in shared_ctx.dataqueues.iter() { + if self.stop.load(Ordering::SeqCst) { + gst::log!(CAT, imp = self, "Stop requested"); + return Err(gst::FlowError::Flushing); + } + if dq.push(item.clone()).is_err() { gst::debug!(CAT, imp = self, "Failed to enqueue item: {item:?}"); } @@ -280,14 +306,19 @@ impl InterSink { fn start(&self) { gst::debug!(CAT, imp = self, "Started"); + self.stop.store(false, Ordering::SeqCst); } fn stop(&self) { gst::debug!(CAT, imp = self, "Stopping"); - self.got_first_buffer.store(false, Ordering::SeqCst); + self.stop.store(true, Ordering::SeqCst); - let shared_ctx = self.shared_ctx(); + let mut local_ctx = self.sink_ctx.lock().unwrap(); + let local_ctx = local_ctx.as_mut().expect("set in prepare"); + local_ctx.got_first_buffer = false; + + let shared_ctx = local_ctx.shared.clone(); block_on(async move { shared_ctx.write().await.upstream_latency = None; }); @@ -309,7 +340,7 @@ impl ObjectSubclass for InterSink { InterSinkPadHandler, ), sink_ctx: Mutex::new(None), - got_first_buffer: AtomicBool::new(false), + stop: AtomicBool::new(true), settings: Mutex::new(Settings::default()), } } @@ -376,13 +407,6 @@ impl ElementImpl for InterSink { Some(&*ELEMENT_METADATA) } - fn query(&self, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, imp = self, "Got {query:?}"); - let res = self.parent_query(query); - gst::log!(CAT, imp = self, "Parent returned {res}, {query:?}"); - res - } - fn send_event(&self, event: gst::Event) -> bool { gst::log!(CAT, imp = self, "Got {event:?}"); @@ -392,7 +416,7 @@ impl ElementImpl for InterSink { let obj = self.obj().clone(); let shared_ctx = self.shared_ctx(); - let _ = block_on_or_add_sub_task(async move { + let res = block_on_or_add_sub_task(async move { let mut shared_ctx = shared_ctx.write().await; shared_ctx.upstream_latency = Some(latency); @@ -401,10 +425,22 @@ impl ElementImpl for InterSink { } else { gst::log!(CAT, obj = obj, "Setting upstream latency {latency}"); for (_, src) in shared_ctx.sources.iter() { + if obj.imp().stop.load(Ordering::SeqCst) { + gst::log!(CAT, obj = obj, "Stop requested"); + return ControlFlow::Break; + } + src.imp().set_upstream_latency(latency); } } - }); + ControlFlow::Continue + }) + .unwrap_or(ControlFlow::Break); + + if res.is_break() { + // We are stopping, don't propagate upstream + return false; + } } self.sinkpad.gst_pad().push_event(event)