threadshare: fix flush for ts-queue ts-proxy

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2400>
This commit is contained in:
François Laignel 2025-07-29 16:26:04 +02:00
parent 9f4a8d3eeb
commit ecc198e5c2
3 changed files with 31 additions and 42 deletions

View file

@ -74,6 +74,10 @@ path = "examples/inter/simple.rs"
name = "ts-inter"
path = "tests/inter.rs"
[[test]]
name = "ts-pad"
path = "tests/pad.rs"
[[test]]
name = "ts-rtpdtmfsrc"
path = "tests/rtpdtmfsrc.rs"

View file

@ -679,7 +679,6 @@ impl PadSrcHandler for ProxySrcPadHandler {
("Internal data stream error"),
["FlushStart failed {:?}", err]
);
return false;
}
}
EventView::FlushStop(..) => {
@ -691,7 +690,6 @@ impl PadSrcHandler for ProxySrcPadHandler {
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
}
_ => (),
@ -798,13 +796,8 @@ impl TaskImpl for ProxySrcTask {
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
self.dataqueue.start();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
gst::log!(SRC_CAT, obj = self.element, "Task started");
Ok(())
@ -858,13 +851,20 @@ impl TaskImpl for ProxySrcTask {
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Stopping task");
self.flush_start().await?;
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Task flush start");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
self.dataqueue.stop();
self.dataqueue.clear();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
@ -872,22 +872,14 @@ impl TaskImpl for ProxySrcTask {
pending_queue.notify_more_queue_space();
}
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Starting task flush");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Task flush stop");
self.start().await?;
gst::log!(SRC_CAT, obj = self.element, "Task flush stopped");
Ok(())
}
}

View file

@ -133,7 +133,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let imp = elem.imp();
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
if let Err(err) = imp.task.flush_stop().await {
gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
@ -141,7 +141,6 @@ impl PadSinkHandler for QueuePadSinkHandler {
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
}
@ -188,7 +187,6 @@ impl PadSrcHandler for QueuePadSrcHandler {
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
}
_ => (),
@ -272,7 +270,6 @@ impl TaskImpl for QueueTask {
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.start();
*last_res = Ok(gst::FlowSuccess::Ok);
gst::log!(CAT, obj = self.element, "Task started");
@ -320,6 +317,13 @@ impl TaskImpl for QueueTask {
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task");
self.flush_start().await?;
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Task flush start");
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
@ -333,25 +337,14 @@ impl TaskImpl for QueueTask {
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task stopped");
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task flush");
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.clear();
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task flush started");
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Task flush stop");
self.start().await?;
gst::log!(CAT, obj = self.element, "Task flush stopped");
Ok(())
}
}