diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 0fa4c282..7f5a6360 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -238,19 +238,20 @@ impl AppSrcTask { } impl TaskImpl for AppSrcTask { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - let item = self.receiver.next().await.ok_or_else(|| { - gst::error!(CAT, obj: &self.element, "SrcPad channel aborted"); - gst::element_error!( - &self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason: channel aborted"] - ); - gst::FlowError::Flushing - })?; + type Item = StreamItem; + fn try_next(&mut self) -> BoxFuture<'_, Result> { + async move { + self.receiver + .next() + .await + .ok_or_else(|| panic!("Internal channel sender dropped while Task is Started")) + } + .boxed() + } + + fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { let res = self.push_item(item).await; match res { Ok(_) => { @@ -428,7 +429,7 @@ impl AppSrc { fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - let _ = self.task.pause().check()?; + self.task.pause().block_on()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 76f589ea..cb96c963 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -1066,6 +1066,8 @@ impl JitterBufferTask { } impl TaskImpl for JitterBufferTask { + type Item = (); + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Starting task"); @@ -1087,7 +1089,17 @@ impl TaskImpl for JitterBufferTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + // FIXME this function was migrated to the try_next / handle_item model + // but hasn't been touched as there are pending changes to jitterbuffer + // in https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/756. + // It should be possible to remove the loop below as try_next / handle_item + // are executed in a loop by the Task state machine. + // It should also be possible to store latency and context_wait as + // fields of JitterBufferTask so as to avoid locking the settings. + // If latency can change during processing, a command based mechanism + // could be implemented. See the command implemention for ts-udpsink as + // an example. + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { let jb = self.element.imp(); let (latency, context_wait) = { @@ -1223,6 +1235,10 @@ impl TaskImpl for JitterBufferTask { .boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + future::ok(()).boxed() + } + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 3d47e366..06e9e7d6 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -858,6 +858,8 @@ impl ProxySrcTask { } impl TaskImpl for ProxySrcTask { + type Item = DataQueueItem; + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(SRC_CAT, obj: &self.element, "Starting task"); @@ -880,18 +882,18 @@ impl TaskImpl for ProxySrcTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = self.dataqueue.next().await; - - let item = match item { - Some(item) => item, - None => { - gst::log!(SRC_CAT, obj: &self.element, "DataQueue Stopped"); - return Err(gst::FlowError::Flushing); - } - }; + self.dataqueue + .next() + .await + .ok_or_else(|| panic!("DataQueue stopped while Task is Started")) + } + .boxed() + } + fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { let res = self.push_item(item).await; let proxysrc = self.element.imp(); match res { @@ -1085,7 +1087,7 @@ impl ProxySrc { fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> { gst::debug!(SRC_CAT, obj: element, "Pausing"); - let _ = self.task.pause().check()?; + self.task.pause().block_on()?; gst::debug!(SRC_CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 9ed640b1..5ee8fc1d 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -328,6 +328,8 @@ impl QueueTask { } impl TaskImpl for QueueTask { + type Item = DataQueueItem; + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Starting task"); @@ -345,18 +347,18 @@ impl TaskImpl for QueueTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = self.dataqueue.next().await; - - let item = match item { - Some(item) => item, - None => { - gst::log!(CAT, obj: &self.element, "DataQueue Stopped"); - return Err(gst::FlowError::Flushing); - } - }; + self.dataqueue + .next() + .await + .ok_or_else(|| panic!("DataQueue stopped while Task is Started")) + } + .boxed() + } + fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async { let res = self.push_item(item).await; let queue = self.element.imp(); match res { diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index 790170b9..aa53d610 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -117,13 +117,13 @@ impl fmt::Display for SocketError { } } -pub type SocketStreamItem = Result<(gst::Buffer, Option), SocketError>; - impl Socket { // Can't implement this as a Stream trait because we end up using things like // tokio::net::UdpSocket which don't implement pollable functions. #[allow(clippy::should_implement_trait)] - pub async fn next(&mut self) -> Option { + pub async fn try_next( + &mut self, + ) -> Result<(gst::Buffer, Option), SocketError> { gst::log!(SOCKET_CAT, obj: &self.element, "Trying to read data"); if self.mapped_buffer.is_none() { @@ -133,7 +133,7 @@ impl Socket { } Err(err) => { gst::debug!(SOCKET_CAT, obj: &self.element, "Failed to acquire buffer {:?}", err); - return Some(Err(SocketError::Gst(err))); + return Err(SocketError::Gst(err)); } } } @@ -172,12 +172,12 @@ impl Socket { buffer.set_dts(dts); } - Some(Ok((buffer, saddr))) + Ok((buffer, saddr)) } Err(err) => { gst::debug!(SOCKET_CAT, obj: &self.element, "Read error {:?}", err); - Some(Err(SocketError::Io(err))) + Err(SocketError::Io(err)) } } } diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 32ef55e5..6ebe1253 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -278,6 +278,8 @@ impl TcpClientSrcTask { } impl TaskImpl for TcpClientSrcTask { + type Item = gst::Buffer; + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr); @@ -331,41 +333,44 @@ impl TaskImpl for TcpClientSrcTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { - gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); - gst::FlowError::Flushing - })?; - - let (buffer, _) = item.map_err(|err| { - gst::error!(CAT, obj: &self.element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); + self.socket + .as_mut() + .unwrap() + .try_next() + .await + .map(|(buffer, _saddr)| buffer) + .map_err(|err| { + gst::error!(CAT, obj: &self.element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } } - SocketError::Io(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } - } - gst::FlowError::Error - })?; - - self.push_buffer(buffer).await.map(drop) + gst::FlowError::Error + }) } .boxed() } + fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> { + self.push_buffer(buffer).map_ok(drop).boxed() + } + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); @@ -486,7 +491,7 @@ impl TcpClientSrc { fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - let _ = self.task.pause().check()?; + self.task.pause().block_on()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 1a909c91..fa07ffe9 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -691,6 +691,8 @@ impl UdpSinkTask { } impl TaskImpl for UdpSinkTask { + type Item = TaskItem; + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::info!(CAT, obj: &self.element, "Preparing Task"); @@ -725,17 +727,25 @@ impl TaskImpl for UdpSinkTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = futures::select_biased! { - cmd = self.cmd_receiver.recv_async() => { - self.process_command(cmd.unwrap()); - return Ok(()); + loop { + futures::select_biased! { + cmd = self.cmd_receiver.recv_async() => { + self.process_command(cmd.unwrap()); + } + item = self.item_receiver.recv_async() => { + break item.map_err(|_| panic!("Internal channel sender dropped while Task is Started")) + } } - item = self.item_receiver.recv_async() => item, - }; + } + } + .boxed() + } - match item.map_err(|_| gst::FlowError::Flushing)? { + fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + match item { TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| { element_error!( &self.element, diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 5c66f0ac..1f6ded99 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -208,67 +208,11 @@ impl UdpSrcTask { need_segment: true, } } - - async fn push_buffer( - &mut self, - buffer: gst::Buffer, - ) -> Result { - gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); - let udpsrc = self.element.imp(); - - if self.need_initial_events { - gst::debug!(CAT, obj: &self.element, "Pushing initial events"); - - let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - let stream_start_evt = gst::event::StreamStart::builder(&stream_id) - .group_id(gst::GroupId::next()) - .build(); - udpsrc.src_pad.push_event(stream_start_evt).await; - - let caps = udpsrc.settings.lock().unwrap().caps.clone(); - if let Some(caps) = caps { - udpsrc - .src_pad - .push_event(gst::event::Caps::new(&caps)) - .await; - *udpsrc.configured_caps.lock().unwrap() = Some(caps); - } - - self.need_initial_events = false; - } - - if self.need_segment { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); - udpsrc.src_pad.push_event(segment_evt).await; - - self.need_segment = false; - } - - let res = udpsrc.src_pad.push(buffer).await; - match res { - Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"), - Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"), - Err(gst::FlowError::Eos) => { - gst::debug!(CAT, obj: &self.element, "EOS"); - udpsrc.src_pad.push_event(gst::event::Eos::new()).await; - } - Err(err) => { - gst::error!(CAT, obj: &self.element, "Got error {}", err); - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - } - - res - } } impl TaskImpl for UdpSrcTask { + type Item = gst::Buffer; + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { let udpsrc = self.element.imp(); @@ -489,46 +433,105 @@ impl TaskImpl for UdpSrcTask { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { - gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); - gst::FlowError::Flushing - })?; + self.socket + .as_mut() + .unwrap() + .try_next() + .await + .map(|(mut buffer, saddr)| { + if let Some(saddr) = saddr { + if self.retrieve_sender_address { + NetAddressMeta::add( + buffer.get_mut().unwrap(), + &gio::InetSocketAddress::from(saddr), + ); + } + } + buffer + }) + .map_err(|err| { + gst::error!(CAT, obj: &self.element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } + } + gst::FlowError::Error + }) + } + .boxed() + } - let (mut buffer, saddr) = item.map_err(|err| { - gst::error!(CAT, obj: &self.element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } + fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async { + gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); + let udpsrc = self.element.imp(); + + if self.need_initial_events { + gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + + let stream_id = + format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + udpsrc.src_pad.push_event(stream_start_evt).await; + + let caps = udpsrc.settings.lock().unwrap().caps.clone(); + if let Some(caps) = caps { + udpsrc + .src_pad + .push_event(gst::event::Caps::new(&caps)) + .await; + *udpsrc.configured_caps.lock().unwrap() = Some(caps); } - gst::FlowError::Error - })?; - if let Some(saddr) = saddr { - if self.retrieve_sender_address { - NetAddressMeta::add( - buffer.get_mut().unwrap(), - &gio::InetSocketAddress::from(saddr), + self.need_initial_events = false; + } + + if self.need_segment { + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + udpsrc.src_pad.push_event(segment_evt).await; + + self.need_segment = false; + } + + let res = udpsrc.src_pad.push(buffer).await.map(drop); + match res { + Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"), + Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"), + Err(gst::FlowError::Eos) => { + gst::debug!(CAT, obj: &self.element, "EOS"); + udpsrc.src_pad.push_event(gst::event::Eos::new()).await; + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Got error {}", err); + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] ); } } - self.push_buffer(buffer).await.map(drop) + res } .boxed() } @@ -616,7 +619,7 @@ impl UdpSrc { fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pausing"); - let _ = self.task.pause().check()?; + self.task.pause().block_on()?; gst::debug!(CAT, obj: element, "Paused"); Ok(()) } diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 89917b52..4920e2c7 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -155,19 +155,21 @@ mod imp_src { } impl TaskImpl for ElementSrcTestTask { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = Item; + + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - let item = self.receiver.next().await; + self.receiver.next().await.ok_or_else(|| { + gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted"); + gst::FlowError::Eos + }) + } + .boxed() + } - let item = match item { - Some(item) => item, - None => { - gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted"); - return Err(gst::FlowError::Eos); - } - }; - - let res = self.push_item(item).await; + fn handle_item(&mut self, item: Item) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let res = self.push_item(item).await.map(drop); match res { Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"), Err(gst::FlowError::Flushing) => { @@ -176,7 +178,7 @@ mod imp_src { Err(err) => panic!("Got error {}", err), } - res.map(drop) + res } .boxed() } @@ -275,7 +277,7 @@ mod imp_src { fn pause(&self, element: &super::ElementSrcTest) { gst::debug!(SRC_CAT, obj: element, "Pausing"); - let _ = self.task.pause().check().unwrap(); + self.task.pause().block_on().unwrap(); gst::debug!(SRC_CAT, obj: element, "Paused"); } }