diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index fcdc400b3..6b61693d3 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -74,6 +74,10 @@ path = "examples/inter/simple.rs" name = "ts-inter" path = "tests/inter.rs" +[[test]] +name = "ts-pad" +path = "tests/pad.rs" + [[test]] name = "ts-rtpdtmfsrc" path = "tests/rtpdtmfsrc.rs" diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index cbf88adeb..ece0b8cc2 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -679,7 +679,6 @@ impl PadSrcHandler for ProxySrcPadHandler { ("Internal data stream error"), ["FlushStart failed {:?}", err] ); - return false; } } EventView::FlushStop(..) => { @@ -691,7 +690,6 @@ impl PadSrcHandler for ProxySrcPadHandler { ("Internal data stream error"), ["FlushStop failed {:?}", err] ); - return false; } } _ => (), @@ -798,13 +796,8 @@ impl TaskImpl for ProxySrcTask { let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); - - if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() { - pending_queue.notify_more_queue_space(); - } - self.dataqueue.start(); + shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); gst::log!(SRC_CAT, obj = self.element, "Task started"); Ok(()) @@ -858,13 +851,20 @@ impl TaskImpl for ProxySrcTask { async fn stop(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(SRC_CAT, obj = self.element, "Stopping task"); + self.flush_start().await?; + gst::log!(SRC_CAT, obj = self.element, "Task stopped"); + Ok(()) + } + + async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(SRC_CAT, obj = self.element, "Task flush start"); let proxysrc = self.element.imp(); let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - self.dataqueue.clear(); self.dataqueue.stop(); + self.dataqueue.clear(); shared_ctx.last_res = Err(gst::FlowError::Flushing); @@ -872,22 +872,14 @@ impl TaskImpl for ProxySrcTask { pending_queue.notify_more_queue_space(); } - gst::log!(SRC_CAT, obj = self.element, "Task stopped"); + gst::log!(SRC_CAT, obj = self.element, "Task flush started"); Ok(()) } - async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> { - gst::log!(SRC_CAT, obj = self.element, "Starting task flush"); - - let proxysrc = self.element.imp(); - let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); - let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared(); - - self.dataqueue.clear(); - - shared_ctx.last_res = Err(gst::FlowError::Flushing); - - gst::log!(SRC_CAT, obj = self.element, "Task flush started"); + async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(SRC_CAT, obj = self.element, "Task flush stop"); + self.start().await?; + gst::log!(SRC_CAT, obj = self.element, "Task flush stopped"); Ok(()) } } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index dccde1325..040ba46a4 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -133,7 +133,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let imp = elem.imp(); if let gst::EventView::FlushStop(..) = event.view() { - if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { + if let Err(err) = imp.task.flush_stop().await { gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err); gst::element_imp_error!( imp, @@ -141,7 +141,6 @@ impl PadSinkHandler for QueuePadSinkHandler { ("Internal data stream error"), ["FlushStop failed {:?}", err] ); - return false; } } @@ -188,7 +187,6 @@ impl PadSrcHandler for QueuePadSrcHandler { ("Internal data stream error"), ["FlushStop failed {:?}", err] ); - return false; } } _ => (), @@ -272,7 +270,6 @@ impl TaskImpl for QueueTask { let mut last_res = queue.last_res.lock().unwrap(); self.dataqueue.start(); - *last_res = Ok(gst::FlowSuccess::Ok); gst::log!(CAT, obj = self.element, "Task started"); @@ -320,6 +317,13 @@ impl TaskImpl for QueueTask { async fn stop(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.element, "Stopping task"); + self.flush_start().await?; + gst::log!(CAT, obj = self.element, "Task stopped"); + Ok(()) + } + + async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(CAT, obj = self.element, "Task flush start"); let queue = self.element.imp(); let mut last_res = queue.last_res.lock().unwrap(); @@ -333,25 +337,14 @@ impl TaskImpl for QueueTask { *last_res = Err(gst::FlowError::Flushing); - gst::log!(CAT, obj = self.element, "Task stopped"); + gst::log!(CAT, obj = self.element, "Task flush started"); Ok(()) } - async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> { - gst::log!(CAT, obj = self.element, "Starting task flush"); - - let queue = self.element.imp(); - let mut last_res = queue.last_res.lock().unwrap(); - - self.dataqueue.clear(); - - if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() { - pending_queue.notify_more_queue_space(); - } - - *last_res = Err(gst::FlowError::Flushing); - - gst::log!(CAT, obj = self.element, "Task flush started"); + async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(CAT, obj = self.element, "Task flush stop"); + self.start().await?; + gst::log!(CAT, obj = self.element, "Task flush stopped"); Ok(()) } }