From 7173790da26ef054bf30c74f43bf7585b52f35cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 6 Apr 2020 14:01:16 +0300 Subject: [PATCH] threadshare/udpsink: Return Flushing if the sender gets disconnected This can only happen if the receiver is dropped, which only happens when the task is stopped. As such, Flushing should be returned instead of panicking. --- generic/threadshare/src/udpsink.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink.rs index 3c95f25c..18cd414f 100644 --- a/generic/threadshare/src/udpsink.rs +++ b/generic/threadshare/src/udpsink.rs @@ -773,14 +773,18 @@ impl PadSinkHandler for UdpSinkPadHandler { &self, _pad: &PadSinkRef, _udpsink: &UdpSink, - _element: &gst::Element, + element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone(); async move { if let Some(sender) = sender.lock().await.as_mut() { - sender.send(TaskItem::Buffer(buffer)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } } Ok(gst::FlowSuccess::Ok) } @@ -791,15 +795,19 @@ impl PadSinkHandler for UdpSinkPadHandler { &self, _pad: &PadSinkRef, _udpsink: &UdpSink, - _element: &gst::Element, + element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { let sender = Arc::clone(&self.0.read().unwrap().sender); + let element = element.clone(); async move { if let Some(sender) = sender.lock().await.as_mut() { for buffer in list.iter_owned() { - sender.send(TaskItem::Buffer(buffer)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); + } } } @@ -823,7 +831,9 @@ impl PadSinkHandler for UdpSinkPadHandler { let udpsink = UdpSink::from_instance(&element); let _ = udpsink.start(&element); } else if let Some(sender) = sender.lock().await.as_mut() { - sender.send(TaskItem::Event(event)).await.unwrap(); + if let Err(_) = sender.send(TaskItem::Event(event)).await { + gst_debug!(CAT, obj: &element, "Flushing"); + } } true