diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 296a75659..0cf9a8f6b 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -75,6 +75,18 @@ path = "tests/inter.rs" name = "ts-pad" path = "tests/pad.rs" +[[test]] +name = "ts-pipeline" +path = "tests/pipeline.rs" + +[[test]] +name = "ts-proxy" +path = "tests/proxy.rs" + +[[test]] +name = "ts-queue" +path = "tests/queue.rs" + [[test]] name = "ts-rtpdtmfsrc" path = "tests/rtpdtmfsrc.rs" diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 6490a0f3e..5390c1bd5 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -406,6 +406,10 @@ impl ProxySink { .unwrap_or(true) { if shared_ctx.pending_queue.is_none() { + if shared_ctx.last_res == Err(gst::FlowError::Flushing) { + return Err(gst::FlowError::Flushing); + } + shared_ctx.pending_queue = Some(PendingQueue::default()); } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 5298641df..3620a432c 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -336,12 +336,12 @@ impl TaskImpl for QueueTask { self.dataqueue.stop(); self.dataqueue.clear(); + *last_res = Err(gst::FlowError::Flushing); + if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() { pending_queue.notify_more_queue_space(); } - *last_res = Err(gst::FlowError::Flushing); - gst::log!(CAT, obj = self.element, "Task flush started"); Ok(()) } @@ -471,6 +471,13 @@ impl Queue { .unwrap_or(true) { if pending_queue.is_none() { + // Lock order: last_res, then pending_queue + drop(pending_queue); + if *self.last_res.lock().unwrap() == Err(gst::FlowError::Flushing) { + return Err(gst::FlowError::Flushing); + } + pending_queue = self.pending_queue.lock().unwrap(); + *pending_queue = Some(PendingQueue { more_queue_space_sender: None, scheduled: false,