threadshare: intersink: return from blocking tasks when stopping

This commit cancels blocking tasks when the element is requested to stop.
This is to prevent deadlocks.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2465>
This commit is contained in:
François Laignel 2025-08-12 18:28:52 +02:00 committed by GStreamer Marge Bot
parent 458e59979f
commit e4e14cf5ca

View file

@ -6,6 +6,7 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use gio::glib::ControlFlow;
/** /**
* SECTION:element-ts-intersink * SECTION:element-ts-intersink
* @see_also: ts-intersrc, ts-proxysink, ts-proxysrc, intersink, intersrc * @see_also: ts-intersrc, ts-proxysink, ts-proxysrc, intersink, intersrc
@ -72,6 +73,7 @@ impl Default for Settings {
#[derive(Debug)] #[derive(Debug)]
struct InterContextSink { struct InterContextSink {
shared: InterContext, shared: InterContext,
got_first_buffer: bool,
} }
impl InterContextSink { impl InterContextSink {
@ -98,7 +100,10 @@ impl InterContextSink {
shared shared
}; };
Some(InterContextSink { shared }) Some(InterContextSink {
shared,
got_first_buffer: false,
})
} }
} }
@ -162,6 +167,11 @@ impl PadSinkHandler for InterSinkPadHandler {
"Forwarding non-serialized downstream {event:?}" "Forwarding non-serialized downstream {event:?}"
); );
for (_, source) in shared_ctx.sources.iter() { for (_, source) in shared_ctx.sources.iter() {
if imp.stop.load(Ordering::SeqCst) {
gst::log!(CAT, imp = imp, "Stop requested");
return false;
}
if !source.send_event(event.clone()) { if !source.send_event(event.clone()) {
gst::warning!( gst::warning!(
CAT, CAT,
@ -172,9 +182,10 @@ impl PadSinkHandler for InterSinkPadHandler {
} }
} }
} }
});
true true
})
.unwrap_or(false)
} else { } else {
gst::debug!( gst::debug!(
CAT, CAT,
@ -196,14 +207,9 @@ impl PadSinkHandler for InterSinkPadHandler {
let imp = elem.imp(); let imp = elem.imp();
use gst::EventView; if let gst::EventView::Eos(..) = event.view() {
match event.view() {
EventView::Eos(..) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build()); let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
} }
EventView::FlushStop(..) => imp.start(),
_ => (),
}
gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event); gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
@ -214,7 +220,7 @@ impl PadSinkHandler for InterSinkPadHandler {
pub struct InterSink { pub struct InterSink {
sinkpad: PadSink, sinkpad: PadSink,
sink_ctx: Mutex<Option<InterContextSink>>, sink_ctx: Mutex<Option<InterContextSink>>,
got_first_buffer: AtomicBool, stop: AtomicBool,
settings: Mutex<Settings>, settings: Mutex<Settings>,
} }
@ -225,20 +231,40 @@ impl InterSink {
} }
async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> { async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> {
if !self.got_first_buffer.load(Ordering::SeqCst) let shared_ctx = {
let mut local_ctx_grd = self.sink_ctx.lock().unwrap();
let local_ctx = local_ctx_grd.as_mut().expect("set in prepare");
if self.stop.load(Ordering::SeqCst) {
gst::log!(CAT, imp = self, "Stop requested");
return Err(gst::FlowError::Flushing);
}
let shared_ctx = local_ctx.shared.clone();
if !local_ctx.got_first_buffer
&& matches!( && matches!(
item, item,
DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_)
) )
{ {
self.got_first_buffer.store(true, Ordering::SeqCst); local_ctx.got_first_buffer = true;
drop(local_ctx_grd);
let _ = self.post_message(gst::message::Latency::new()); let _ = self.post_message(gst::message::Latency::new());
} }
let shared_ctx = self.shared_ctx(); shared_ctx
};
let shared_ctx = shared_ctx.read().await; let shared_ctx = shared_ctx.read().await;
for (_, dq) in shared_ctx.dataqueues.iter() { for (_, dq) in shared_ctx.dataqueues.iter() {
if self.stop.load(Ordering::SeqCst) {
gst::log!(CAT, imp = self, "Stop requested");
return Err(gst::FlowError::Flushing);
}
if dq.push(item.clone()).is_err() { if dq.push(item.clone()).is_err() {
gst::debug!(CAT, imp = self, "Failed to enqueue item: {item:?}"); gst::debug!(CAT, imp = self, "Failed to enqueue item: {item:?}");
} }
@ -280,14 +306,19 @@ impl InterSink {
fn start(&self) { fn start(&self) {
gst::debug!(CAT, imp = self, "Started"); gst::debug!(CAT, imp = self, "Started");
self.stop.store(false, Ordering::SeqCst);
} }
fn stop(&self) { fn stop(&self) {
gst::debug!(CAT, imp = self, "Stopping"); gst::debug!(CAT, imp = self, "Stopping");
self.got_first_buffer.store(false, Ordering::SeqCst); self.stop.store(true, Ordering::SeqCst);
let shared_ctx = self.shared_ctx(); let mut local_ctx = self.sink_ctx.lock().unwrap();
let local_ctx = local_ctx.as_mut().expect("set in prepare");
local_ctx.got_first_buffer = false;
let shared_ctx = local_ctx.shared.clone();
block_on(async move { block_on(async move {
shared_ctx.write().await.upstream_latency = None; shared_ctx.write().await.upstream_latency = None;
}); });
@ -309,7 +340,7 @@ impl ObjectSubclass for InterSink {
InterSinkPadHandler, InterSinkPadHandler,
), ),
sink_ctx: Mutex::new(None), sink_ctx: Mutex::new(None),
got_first_buffer: AtomicBool::new(false), stop: AtomicBool::new(true),
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
} }
} }
@ -376,13 +407,6 @@ impl ElementImpl for InterSink {
Some(&*ELEMENT_METADATA) Some(&*ELEMENT_METADATA)
} }
fn query(&self, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, imp = self, "Got {query:?}");
let res = self.parent_query(query);
gst::log!(CAT, imp = self, "Parent returned {res}, {query:?}");
res
}
fn send_event(&self, event: gst::Event) -> bool { fn send_event(&self, event: gst::Event) -> bool {
gst::log!(CAT, imp = self, "Got {event:?}"); gst::log!(CAT, imp = self, "Got {event:?}");
@ -392,7 +416,7 @@ impl ElementImpl for InterSink {
let obj = self.obj().clone(); let obj = self.obj().clone();
let shared_ctx = self.shared_ctx(); let shared_ctx = self.shared_ctx();
let _ = block_on_or_add_sub_task(async move { let res = block_on_or_add_sub_task(async move {
let mut shared_ctx = shared_ctx.write().await; let mut shared_ctx = shared_ctx.write().await;
shared_ctx.upstream_latency = Some(latency); shared_ctx.upstream_latency = Some(latency);
@ -401,10 +425,22 @@ impl ElementImpl for InterSink {
} else { } else {
gst::log!(CAT, obj = obj, "Setting upstream latency {latency}"); gst::log!(CAT, obj = obj, "Setting upstream latency {latency}");
for (_, src) in shared_ctx.sources.iter() { for (_, src) in shared_ctx.sources.iter() {
if obj.imp().stop.load(Ordering::SeqCst) {
gst::log!(CAT, obj = obj, "Stop requested");
return ControlFlow::Break;
}
src.imp().set_upstream_latency(latency); src.imp().set_upstream_latency(latency);
} }
} }
}); ControlFlow::Continue
})
.unwrap_or(ControlFlow::Break);
if res.is_break() {
// We are stopping, don't propagate upstream
return false;
}
} }
self.sinkpad.gst_pad().push_event(event) self.sinkpad.gst_pad().push_event(event)