diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index fa07ffe9..a9efaca3 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -19,6 +19,7 @@ use futures::future::BoxFuture; use futures::prelude::*; +use futures::stream::Peekable; use gst::glib; use gst::prelude::*; @@ -34,7 +35,9 @@ use crate::socket::{wrap_socket, GioSocketWrapper}; use std::collections::BTreeSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::pin::Pin; use std::sync::Mutex; +use std::task::Poll; use std::time::Duration; use std::u16; use std::u8; @@ -220,10 +223,9 @@ enum Command { SetSync(bool), } -#[derive(Debug)] struct UdpSinkTask { element: super::UdpSink, - item_receiver: flume::Receiver, + item_receiver: Peekable>, cmd_receiver: flume::Receiver, clients: BTreeSet, socket: Option>, @@ -241,7 +243,7 @@ impl UdpSinkTask { ) -> Self { UdpSinkTask { element: element.clone(), - item_receiver, + item_receiver: item_receiver.into_stream().peekable(), cmd_receiver, clients: Default::default(), socket: None, @@ -252,6 +254,11 @@ impl UdpSinkTask { } } + async fn flush(&mut self) { + // Purge the channel + while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {} + } + fn process_command(&mut self, cmd: Command) { use Command::*; match cmd { @@ -619,17 +626,6 @@ impl UdpSinkTask { /// Buffer handling. impl UdpSinkTask { async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> { - if self.sync { - let rtime = self.segment.as_ref().and_then(|segment| { - segment - .downcast_ref::() - .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency)) - }); - if let Some(rtime) = rtime { - self.sync(rtime).await; - } - } - let data = buffer.map_readable().map_err(|_| { element_error!( self.element, @@ -681,11 +677,9 @@ impl UdpSinkTask { async fn sync(&self, running_time: gst::ClockTime) { let now = self.element.current_running_time(); - match running_time.opt_checked_sub(now) { - Ok(Some(delay)) => { - runtime::time::delay_for(delay.into()).await; - } - _ => runtime::executor::yield_now().await, + if let Ok(Some(delay)) = running_time.opt_checked_sub(now) { + gst::trace!(CAT, obj: &self.element, "sync: waiting {}", delay); + runtime::time::delay_for(delay.into()).await; } } } @@ -730,12 +724,39 @@ impl TaskImpl for UdpSinkTask { fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { loop { + gst::info!(CAT, obj: &self.element, "Awaiting next item or command"); futures::select_biased! { cmd = self.cmd_receiver.recv_async() => { self.process_command(cmd.unwrap()); } - item = self.item_receiver.recv_async() => { - break item.map_err(|_| panic!("Internal channel sender dropped while Task is Started")) + item_opt = Pin::new(&mut self.item_receiver).peek() => { + // Check the peeked item in case we need to sync. + // The item will still be available in the channel + // in case this is cancelled by a state transition. + match item_opt { + Some(TaskItem::Buffer(buffer)) => { + if self.sync { + let rtime = self.segment.as_ref().and_then(|segment| { + segment + .downcast_ref::() + .and_then(|segment| { + segment.to_running_time(buffer.pts()).opt_add(self.latency) + }) + }); + if let Some(rtime) = rtime { + // This can be cancelled by a state transition. + self.sync(rtime).await; + } + } + } + Some(_) => (), + None => { + panic!("Internal channel sender dropped while Task is Started"); + } + } + + // An item was peeked above, we can now pop it without losing it. + return Ok(self.item_receiver.next().await.unwrap()); } } } @@ -745,6 +766,8 @@ impl TaskImpl for UdpSinkTask { fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { + gst::info!(CAT, obj: &self.element, "Handling {:?}", item); + match item { TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| { element_error!( @@ -774,6 +797,24 @@ impl TaskImpl for UdpSinkTask { } .boxed() } + + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async { + gst::info!(CAT, obj: &self.element, "Stopping Task"); + self.flush().await; + Ok(()) + } + .boxed() + } + + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async { + gst::info!(CAT, obj: &self.element, "Starting Task Flush"); + self.flush().await; + Ok(()) + } + .boxed() + } } #[derive(Debug)]