threadshare: queue & proxy: fix race condition stopping

When stopping `ts-queue` & `ts-proxysink`, the pending queue is removed. There
is a short period of time when a buffer can be handled after the pending queue
is removed but before the element is actually stopped. Since the dataqueue no
longer accepts buffers and the pending queue is removed, a new pending queue was
created and the element was set to wait for queue space.

The stopping procedure is handled with `last_res` `Mutex` locked and set to
`Flushing`.

This commit adds a check to `last_res` before creating a new pending queue and
aborts if the element is `Flushing`.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2428>
This commit is contained in:
François Laignel 2025-08-05 20:24:00 +02:00 committed by GStreamer Marge Bot
parent b61d790081
commit dccdda96b6
3 changed files with 25 additions and 2 deletions

View file

@ -75,6 +75,18 @@ path = "tests/inter.rs"
name = "ts-pad"
path = "tests/pad.rs"
[[test]]
name = "ts-pipeline"
path = "tests/pipeline.rs"
[[test]]
name = "ts-proxy"
path = "tests/proxy.rs"
[[test]]
name = "ts-queue"
path = "tests/queue.rs"
[[test]]
name = "ts-rtpdtmfsrc"
path = "tests/rtpdtmfsrc.rs"

View file

@ -406,6 +406,10 @@ impl ProxySink {
.unwrap_or(true)
{
if shared_ctx.pending_queue.is_none() {
if shared_ctx.last_res == Err(gst::FlowError::Flushing) {
return Err(gst::FlowError::Flushing);
}
shared_ctx.pending_queue = Some(PendingQueue::default());
}

View file

@ -336,12 +336,12 @@ impl TaskImpl for QueueTask {
self.dataqueue.stop();
self.dataqueue.clear();
*last_res = Err(gst::FlowError::Flushing);
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
}
@ -471,6 +471,13 @@ impl Queue {
.unwrap_or(true)
{
if pending_queue.is_none() {
// Lock order: last_res, then pending_queue
drop(pending_queue);
if *self.last_res.lock().unwrap() == Err(gst::FlowError::Flushing) {
return Err(gst::FlowError::Flushing);
}
pending_queue = self.pending_queue.lock().unwrap();
*pending_queue = Some(PendingQueue {
more_queue_space_sender: None,
scheduled: false,