From fb7929dda68bcf091811750eebe26a8ff1c10c0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Fri, 5 Aug 2022 14:44:50 +0200 Subject: [PATCH] ts: update elements for new TransitionStatus See previous commit --- generic/threadshare/src/appsrc/imp.rs | 19 ++++------- generic/threadshare/src/jitterbuffer/imp.rs | 21 +++++------- generic/threadshare/src/proxy/imp.rs | 19 ++++------- generic/threadshare/src/queue/imp.rs | 21 +++++------- generic/threadshare/src/tcpclientsrc/imp.rs | 35 +++++++++++-------- generic/threadshare/src/udpsink/imp.rs | 17 ++++------ generic/threadshare/src/udpsrc/imp.rs | 19 ++++------- generic/threadshare/tests/pad.rs | 37 ++++++++++----------- 8 files changed, 81 insertions(+), 107 deletions(-) diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 120ed55f..0fa4c282 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -94,8 +94,8 @@ impl PadSrcHandler for AppSrcPadHandler { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { - EventView::FlushStart(..) => appsrc.task.flush_start().is_ok(), - EventView::FlushStop(..) => appsrc.task.flush_stop().is_ok(), + EventView::FlushStart(..) => appsrc.task.flush_start().await_maybe_on_context().is_ok(), + EventView::FlushStop(..) => appsrc.task.flush_stop().await_maybe_on_context().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -396,12 +396,7 @@ impl AppSrc { self.task .prepare(AppSrcTask::new(element.clone(), receiver), context) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(CAT, obj: element, "Prepared"); @@ -412,28 +407,28 @@ impl AppSrc { gst::debug!(CAT, obj: element, "Unpreparing"); *self.sender.lock().unwrap() = None; - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().block_on()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().block_on()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - self.task.pause()?; + let _ = self.task.pause().check()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 16018ac3..76f589ea 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -590,7 +590,7 @@ impl PadSinkHandler for SinkHandler { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); if let EventView::FlushStart(..) = event.view() { - if let Err(err) = jb.task.flush_start() { + if let Err(err) = jb.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::element_error!( element, @@ -632,7 +632,7 @@ impl PadSinkHandler for SinkHandler { state.segment = e.segment().clone().downcast::().unwrap(); } EventView::FlushStop(..) => { - if let Err(err) = jb.task.flush_stop() { + if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( element, @@ -908,7 +908,7 @@ impl PadSrcHandler for SrcHandler { match event.view() { EventView::FlushStart(..) => { - if let Err(err) = jb.task.flush_start() { + if let Err(err) = jb.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::element_error!( element, @@ -920,7 +920,7 @@ impl PadSrcHandler for SrcHandler { } } EventView::FlushStop(..) => { - if let Err(err) = jb.task.flush_stop() { + if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( element, @@ -1286,12 +1286,7 @@ impl JitterBuffer { JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler), context, ) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(CAT, obj: element, "Prepared"); @@ -1300,20 +1295,20 @@ impl JitterBuffer { fn unprepare(&self, element: &super::JitterBuffer) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().block_on()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().block_on()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 731218c9..3d47e366 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -734,7 +734,7 @@ impl PadSrcHandler for ProxySrcPadHandler { match event.view() { EventView::FlushStart(..) => { - if let Err(err) = proxysrc.task.flush_start() { + if let Err(err) = proxysrc.task.flush_start().await_maybe_on_context() { gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::element_error!( element, @@ -746,7 +746,7 @@ impl PadSrcHandler for ProxySrcPadHandler { } } EventView::FlushStop(..) => { - if let Err(err) = proxysrc.task.flush_stop() { + if let Err(err) = proxysrc.task.flush_stop().await_maybe_on_context() { gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( element, @@ -1045,12 +1045,7 @@ impl ProxySrc { self.task .prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(SRC_CAT, obj: element, "Prepared"); @@ -1066,7 +1061,7 @@ impl ProxySrc { proxy_src_pads.remove(&settings.proxy_context); } - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); *self.dataqueue.lock().unwrap() = None; *self.proxy_ctx.lock().unwrap() = None; @@ -1076,21 +1071,21 @@ impl ProxySrc { fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { gst::debug!(SRC_CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().await_maybe_on_context()?; gst::debug!(SRC_CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { gst::debug!(SRC_CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().await_maybe_on_context()?; gst::debug!(SRC_CAT, obj: element, "Started"); Ok(()) } fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { gst::debug!(SRC_CAT, obj: element, "Pausing"); - self.task.pause()?; + let _ = self.task.pause().check()?; gst::debug!(SRC_CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 029a003a..9ed640b1 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -135,7 +135,7 @@ impl PadSinkHandler for QueuePadSinkHandler { gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); if let EventView::FlushStart(..) = event.view() { - if let Err(err) = queue.task.flush_start() { + if let Err(err) = queue.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::element_error!( element, @@ -169,7 +169,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let queue = element.imp(); if let EventView::FlushStop(..) = event.view() { - if let Err(err) = queue.task.flush_stop() { + if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( element, @@ -229,12 +229,12 @@ impl PadSrcHandler for QueuePadSrcHandler { match event.view() { EventView::FlushStart(..) => { - if let Err(err) = queue.task.flush_start() { + if let Err(err) = queue.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); } } EventView::FlushStop(..) => { - if let Err(err) = queue.task.flush_stop() { + if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( element, @@ -639,12 +639,7 @@ impl Queue { self.task .prepare(QueueTask::new(element.clone(), dataqueue), context) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(CAT, obj: element, "Prepared"); @@ -654,7 +649,7 @@ impl Queue { fn unprepare(&self, element: &super::Queue) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); *self.dataqueue.lock().unwrap() = None; *self.pending_queue.lock().unwrap() = None; @@ -666,14 +661,14 @@ impl Queue { fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().await_maybe_on_context()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().await_maybe_on_context()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 69fe7b2b..32ef55e5 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -108,8 +108,16 @@ impl PadSrcHandler for TcpClientSrcPadHandler { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { - EventView::FlushStart(..) => tcpclientsrc.task.flush_start().is_ok(), - EventView::FlushStop(..) => tcpclientsrc.task.flush_stop().is_ok(), + EventView::FlushStart(..) => tcpclientsrc + .task + .flush_start() + .await_maybe_on_context() + .is_ok(), + EventView::FlushStop(..) => tcpclientsrc + .task + .flush_stop() + .await_maybe_on_context() + .is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -440,46 +448,45 @@ impl TcpClientSrc { let saddr = SocketAddr::new(host, port as u16); - self.task + // Don't block on `prepare` as the socket connection takes time. + // This will be performed in the background and we'll block on + // `start` which will also ensure `prepare` completed successfully. + let _ = self + .task .prepare( TcpClientSrcTask::new(element.clone(), saddr, buffer_pool), context, ) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .check()?; - gst::debug!(CAT, obj: element, "Prepared"); + gst::debug!(CAT, obj: element, "Preparing asynchronously"); Ok(()) } fn unprepare(&self, element: &super::TcpClientSrc) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().block_on()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().block_on()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - self.task.pause()?; + let _ = self.task.pause().check()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 9a09ba80..1a909c91 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -186,7 +186,7 @@ impl PadSinkHandler for UdpSinkPadHandler { async move { if let EventView::FlushStop(_) = event.view() { let udpsink = element.imp(); - return udpsink.task.flush_stop().is_ok(); + return udpsink.task.flush_stop().await_maybe_on_context().is_ok(); } else if sender.send_async(TaskItem::Event(event)).await.is_err() { gst::debug!(CAT, obj: &element, "Flushing"); } @@ -204,7 +204,7 @@ impl PadSinkHandler for UdpSinkPadHandler { event: gst::Event, ) -> bool { if let EventView::FlushStart(..) = event.view() { - return udpsink.task.flush_start().is_ok(); + return udpsink.task.flush_start().await_maybe_on_context().is_ok(); } true @@ -805,12 +805,7 @@ impl UdpSink { let (item_sender, item_receiver) = flume::bounded(0); let (cmd_sender, cmd_receiver) = flume::unbounded(); let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver); - self.task.prepare(task_impl, context).map_err(|err| { - error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + self.task.prepare(task_impl, context).block_on()?; *self.item_sender.lock().unwrap() = Some(item_sender); *self.cmd_sender.lock().unwrap() = Some(cmd_sender); @@ -822,20 +817,20 @@ impl UdpSink { fn unprepare(&self, element: &super::UdpSink) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().block_on()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().block_on()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index fb3265b8..5c66f0ac 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -125,8 +125,8 @@ impl PadSrcHandler for UdpSrcPadHandler { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { - EventView::FlushStart(..) => udpsrc.task.flush_start().is_ok(), - EventView::FlushStop(..) => udpsrc.task.flush_stop().is_ok(), + EventView::FlushStart(..) => udpsrc.task.flush_start().await_maybe_on_context().is_ok(), + EventView::FlushStop(..) => udpsrc.task.flush_stop().await_maybe_on_context().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -587,12 +587,7 @@ impl UdpSrc { *self.configured_caps.lock().unwrap() = None; self.task .prepare(UdpSrcTask::new(element.clone()), context) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(CAT, obj: element, "Prepared"); @@ -601,27 +596,27 @@ impl UdpSrc { fn unprepare(&self, element: &super::UdpSrc) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Stopping"); - self.task.stop()?; + self.task.stop().block_on()?; gst::debug!(CAT, obj: element, "Stopped"); Ok(()) } fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Starting"); - self.task.start()?; + self.task.start().block_on()?; gst::debug!(CAT, obj: element, "Started"); Ok(()) } fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - self.task.pause()?; + let _ = self.task.pause().check()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 106d7b99..89917b52 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -97,15 +97,17 @@ mod imp_src { gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { - EventView::FlushStart(..) => { - elem_src_test.task.flush_start().unwrap(); - true - } + EventView::FlushStart(..) => elem_src_test + .task + .flush_start() + .await_maybe_on_context() + .is_ok(), EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, - EventView::FlushStop(..) => { - elem_src_test.task.flush_stop().unwrap(); - true - } + EventView::FlushStop(..) => elem_src_test + .task + .flush_stop() + .await_maybe_on_context() + .is_ok(), _ => false, }; @@ -243,12 +245,7 @@ mod imp_src { self.task .prepare(ElementSrcTestTask::new(element.clone(), receiver), context) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["Error preparing Task: {:?}", err] - ) - })?; + .block_on()?; gst::debug!(SRC_CAT, obj: element, "Prepared"); @@ -259,26 +256,26 @@ mod imp_src { gst::debug!(SRC_CAT, obj: element, "Unpreparing"); *self.sender.lock().unwrap() = None; - self.task.unprepare().unwrap(); + self.task.unprepare().block_on().unwrap(); gst::debug!(SRC_CAT, obj: element, "Unprepared"); } fn stop(&self, element: &super::ElementSrcTest) { gst::debug!(SRC_CAT, obj: element, "Stopping"); - self.task.stop().unwrap(); + self.task.stop().await_maybe_on_context().unwrap(); gst::debug!(SRC_CAT, obj: element, "Stopped"); } fn start(&self, element: &super::ElementSrcTest) { gst::debug!(SRC_CAT, obj: element, "Starting"); - self.task.start().unwrap(); + self.task.start().await_maybe_on_context().unwrap(); gst::debug!(SRC_CAT, obj: element, "Started"); } fn pause(&self, element: &super::ElementSrcTest) { gst::debug!(SRC_CAT, obj: element, "Pausing"); - self.task.pause().unwrap(); + let _ = self.task.pause().check().unwrap(); gst::debug!(SRC_CAT, obj: element, "Paused"); } } @@ -421,10 +418,10 @@ mod imp_src { fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { match event.view() { EventView::FlushStart(..) => { - self.task.flush_start().unwrap(); + self.task.flush_start().await_maybe_on_context().unwrap(); } EventView::FlushStop(..) => { - self.task.flush_stop().unwrap(); + self.task.flush_stop().await_maybe_on_context().unwrap(); } _ => (), }