From 7cd294526828ff32d1e766817b36211e32e36d21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 16 Mar 2018 19:11:53 +0200 Subject: [PATCH] threadshare: Implement error handling for IO/data flow errors Also create a proper stream id --- gst-plugin-threadshare/Cargo.toml | 1 + gst-plugin-threadshare/src/lib.rs | 2 + gst-plugin-threadshare/src/udpsocket.rs | 72 ++++++++-------------- gst-plugin-threadshare/src/udpsrc.rs | 79 ++++++++++++++++++++++--- 4 files changed, 99 insertions(+), 55 deletions(-) diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 7437218f..d9b9b631 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -14,6 +14,7 @@ tokio-executor = { git = "https://github.com/tokio-rs/tokio" } futures = "0.1" lazy_static = "1.0" either = "1.0" +rand = "0.4" [lib] name = "gstthreadshare" diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index a177c0a3..733c5daf 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -30,6 +30,8 @@ extern crate tokio_reactor; extern crate either; +extern crate rand; + #[macro_use] extern crate lazy_static; diff --git a/gst-plugin-threadshare/src/udpsocket.rs b/gst-plugin-threadshare/src/udpsocket.rs index 7a3d5e04..12804689 100644 --- a/gst-plugin-threadshare/src/udpsocket.rs +++ b/gst-plugin-threadshare/src/udpsocket.rs @@ -15,17 +15,19 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use std::marker::PhantomData; use std::sync::{Arc, Mutex}; +use std::io; use gst; use gst::prelude::*; use futures::{Async, Future, IntoFuture, Poll, Stream}; -use futures::{future, task}; +use futures::task; use futures::sync::oneshot; use tokio::net; +use either::Either; + use iocontext::*; lazy_static!{ @@ -36,30 +38,6 @@ lazy_static!{ ); } -// FIXME: Workaround for https://github.com/tokio-rs/tokio/issues/207 -struct YieldOnce(Option<()>, PhantomData); - -impl YieldOnce { - fn new() -> YieldOnce { - YieldOnce(None, PhantomData) - } -} - -impl Future for YieldOnce { - type Item = (); - type Error = E; - - fn poll(&mut self) -> Poll<(), E> { - if let Some(_) = self.0.take() { - Ok(Async::Ready(())) - } else { - self.0 = Some(()); - task::current().notify(); - Ok(Async::NotReady) - } - } -} - #[derive(Clone)] pub struct Socket(Arc>); @@ -100,11 +78,13 @@ impl Socket { }))) } - pub fn schedule Result<(), gst::FlowError> + Send + 'static>( - &self, - io_context: &IOContext, - func: F, - ) { + pub fn schedule(&self, io_context: &IOContext, func: F, err_func: G) + where + F: Fn(gst::Buffer) -> U + Send + 'static, + U: IntoFuture + 'static, + ::Future: Send + 'static, + G: FnOnce(Either) + Send + 'static, + { // Ready->Paused // // Need to wait for a possible shutdown to finish first @@ -128,17 +108,19 @@ impl Socket { let element_clone = inner.element.clone(); io_context.spawn( stream - .for_each(move |buffer| { - let res = func(buffer); - match res { - Ok(()) => future::Either::A(Ok(()).into_future()), - //Ok(()) => future::Either::A(YieldOnce::new()), - Err(err) => future::Either::B(Err(err).into_future()), - } - }) + .for_each(move |buffer| func(buffer).into_future().map_err(Either::Left)) .then(move |res| { - gst_debug!(SOCKET_CAT, obj: &element_clone, "Socket finished {:?}", res); - // TODO: Do something with errors here? + gst_debug!( + SOCKET_CAT, + obj: &element_clone, + "Socket finished: {:?}", + res + ); + + if let Err(err) = res { + err_func(err); + } + let _ = sender.send(()); Ok(()) @@ -232,7 +214,7 @@ struct SocketStream(Socket, Option>); impl Stream for SocketStream { type Item = gst::Buffer; - type Error = gst::FlowError; + type Error = Either; fn poll(&mut self) -> Poll, Self::Error> { let mut inner = (self.0).0.lock().unwrap(); @@ -258,7 +240,7 @@ impl Stream for SocketStream { } Err(err) => { gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err); - return Err(err.into_result().unwrap_err()); + return Err(Either::Left(err.into_result().unwrap_err())); } }, }; @@ -271,7 +253,7 @@ impl Stream for SocketStream { } Err(err) => { gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err); - return Err(gst::FlowError::Error); + return Err(Either::Right(err)); } Ok(Async::Ready(len)) => { let time = inner.clock.as_ref().unwrap().get_time(); @@ -291,8 +273,6 @@ impl Stream for SocketStream { buffer.set_dts(time); } - // TODO: Only ever poll the second again in Xms, using tokio-timer - Ok(Async::Ready(Some(buffer))) } } diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 2ef8eeaf..21eab8f6 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -29,6 +29,10 @@ use std::u16; use tokio::net; +use either::Either; + +use rand; + use iocontext::*; use udpsocket::*; @@ -293,8 +297,8 @@ impl UdpSrc { if state.need_initial_events { gst_debug!(self.cat, obj: element, "Pushing initial events"); - // TODO: Invent a stream id - events.push(gst::Event::new_stream_start("meh").build()); + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + events.push(gst::Event::new_stream_start(&stream_id).build()); if let Some(ref caps) = self.settings.lock().unwrap().caps { events.push(gst::Event::new_caps(&caps).build()); state.configured_caps = Some(caps.clone()); @@ -310,8 +314,38 @@ impl UdpSrc { self.src_pad.push_event(event); } - // TODO: Error handling - self.src_pad.push(buffer).into_result().map(|_| ()) + match self.src_pad.push(buffer).into_result() { + Ok(_) => { + gst_log!(self.cat, obj: element, "Successfully pushed buffer"); + Ok(()) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(self.cat, obj: element, "Flushing"); + let state = self.state.lock().unwrap(); + if let Some(ref socket) = state.socket { + socket.pause(); + } + Ok(()) + } + Err(gst::FlowError::Eos) => { + gst_debug!(self.cat, obj: element, "EOS"); + let state = self.state.lock().unwrap(); + if let Some(ref socket) = state.socket { + socket.pause(); + } + Ok(()) + } + Err(err) => { + gst_error!(self.cat, obj: element, "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + Err(gst::FlowError::CustomError) + } + } } fn prepare(&self, element: &Element) -> Result<(), ()> { @@ -321,9 +355,9 @@ impl UdpSrc { let settings = self.settings.lock().unwrap().clone(); - // TODO: Error handling let mut state = self.state.lock().unwrap(); + // TODO: Error handling let io_context = IOContext::new( &settings.context, settings.context_threads as isize, @@ -388,10 +422,37 @@ impl UdpSrc { let socket = Socket::new(&element.clone().upcast(), socket, buffer_pool); let element_clone = element.clone(); - socket.schedule(&io_context, move |buffer| { - let udpsrc = element_clone.get_impl().downcast_ref::().unwrap(); - udpsrc.push_buffer(&element_clone, buffer) - }); + let element_clone2 = element.clone(); + socket.schedule( + &io_context, + move |buffer| { + let udpsrc = element_clone.get_impl().downcast_ref::().unwrap(); + udpsrc.push_buffer(&element_clone, buffer) + }, + move |err| { + let udpsrc = element_clone2.get_impl().downcast_ref::().unwrap(); + gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err); + match err { + Either::Left(gst::FlowError::CustomError) => (), + Either::Left(err) => { + gst_element_error!( + element_clone2, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + Either::Right(err) => { + gst_element_error!( + element_clone2, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + } + } + }, + ); state.socket = Some(socket); state.io_context = Some(io_context);