From dccdda96b6e19749fc3ae1f75c17ed322acc238a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 5 Aug 2025 20:24:00 +0200 Subject: [PATCH] threadshare: queue & proxy: fix race condition stopping When stopping `ts-queue` & `ts-proxysink`, the pending queue is removed. There is a short period of time when a buffer can be handled after the pending queue is removed but before the element is actually stopped. Since the dataqueue no longer accepts buffers and the pending queue is removed, a new pending queue was created and the element was set to wait for queue space. The stopping procedure is handled with `last_res` `Mutex` locked and set to `Flushing`. This commit adds a check to `last_res` before creating a new pending queue and aborts if the element is `Flushing`. Part-of: --- generic/threadshare/Cargo.toml | 12 ++++++++++++ generic/threadshare/src/proxy/imp.rs | 4 ++++ generic/threadshare/src/queue/imp.rs | 11 +++++++++-- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 296a75659..0cf9a8f6b 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -75,6 +75,18 @@ path = "tests/inter.rs" name = "ts-pad" path = "tests/pad.rs" +[[test]] +name = "ts-pipeline" +path = "tests/pipeline.rs" + +[[test]] +name = "ts-proxy" +path = "tests/proxy.rs" + +[[test]] +name = "ts-queue" +path = "tests/queue.rs" + [[test]] name = "ts-rtpdtmfsrc" path = "tests/rtpdtmfsrc.rs" diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 6490a0f3e..5390c1bd5 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -406,6 +406,10 @@ impl ProxySink { .unwrap_or(true) { if shared_ctx.pending_queue.is_none() { + if shared_ctx.last_res == Err(gst::FlowError::Flushing) { + return Err(gst::FlowError::Flushing); + } + shared_ctx.pending_queue = Some(PendingQueue::default()); } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 5298641df..3620a432c 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -336,12 +336,12 @@ impl TaskImpl for QueueTask { self.dataqueue.stop(); self.dataqueue.clear(); + *last_res = Err(gst::FlowError::Flushing); + if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() { pending_queue.notify_more_queue_space(); } - *last_res = Err(gst::FlowError::Flushing); - gst::log!(CAT, obj = self.element, "Task flush started"); Ok(()) } @@ -471,6 +471,13 @@ impl Queue { .unwrap_or(true) { if pending_queue.is_none() { + // Lock order: last_res, then pending_queue + drop(pending_queue); + if *self.last_res.lock().unwrap() == Err(gst::FlowError::Flushing) { + return Err(gst::FlowError::Flushing); + } + pending_queue = self.pending_queue.lock().unwrap(); + *pending_queue = Some(PendingQueue { more_queue_space_sender: None, scheduled: false,