diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink.rs index 3c95f25c..18cd414f 100644 --- a/generic/threadshare/src/udpsink.rs +++ b/generic/threadshare/src/udpsink.rs @@ -773,14 +773,18 @@ impl PadSinkHandler for UdpSinkPadHandler { &self, _pad: &PadSinkRef, _udpsink: &UdpSink, - _element: &gst::Element, + element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone(); async move { if let Some(sender) = sender.lock().await.as_mut() { - sender.send(TaskItem::Buffer(buffer)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } } Ok(gst::FlowSuccess::Ok) } @@ -791,15 +795,19 @@ impl PadSinkHandler for UdpSinkPadHandler { &self, _pad: &PadSinkRef, _udpsink: &UdpSink, - _element: &gst::Element, + element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone(); async move { if let Some(sender) = sender.lock().await.as_mut() { for buffer in list.iter_owned() { - sender.send(TaskItem::Buffer(buffer)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } } } @@ -823,7 +831,9 @@ impl PadSinkHandler for UdpSinkPadHandler { let udpsink = UdpSink::from_instance(&element); let _ = udpsink.start(&element); } else if let Some(sender) = sender.lock().await.as_mut() { - sender.send(TaskItem::Event(event)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Event(event)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + } } true