threadshare: Implement error handling for IO/data flow errors

Also create a proper stream id
This commit is contained in:
Sebastian Dröge 2018-03-16 19:11:53 +02:00
parent 212b00ef2f
commit 7cd2945268
4 changed files with 99 additions and 55 deletions

View file

@ -14,6 +14,7 @@ tokio-executor = { git = "https://github.com/tokio-rs/tokio" }
futures = "0.1" futures = "0.1"
lazy_static = "1.0" lazy_static = "1.0"
either = "1.0" either = "1.0"
rand = "0.4"
[lib] [lib]
name = "gstthreadshare" name = "gstthreadshare"

View file

@ -30,6 +30,8 @@ extern crate tokio_reactor;
extern crate either; extern crate either;
extern crate rand;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;

View file

@ -15,17 +15,19 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use std::marker::PhantomData;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::io;
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
use futures::{Async, Future, IntoFuture, Poll, Stream}; use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::{future, task}; use futures::task;
use futures::sync::oneshot; use futures::sync::oneshot;
use tokio::net; use tokio::net;
use either::Either;
use iocontext::*; use iocontext::*;
lazy_static!{ lazy_static!{
@ -36,30 +38,6 @@ lazy_static!{
); );
} }
// FIXME: Workaround for https://github.com/tokio-rs/tokio/issues/207
struct YieldOnce<E>(Option<()>, PhantomData<E>);
impl<E> YieldOnce<E> {
fn new() -> YieldOnce<E> {
YieldOnce(None, PhantomData)
}
}
impl<E> Future for YieldOnce<E> {
type Item = ();
type Error = E;
fn poll(&mut self) -> Poll<(), E> {
if let Some(_) = self.0.take() {
Ok(Async::Ready(()))
} else {
self.0 = Some(());
task::current().notify();
Ok(Async::NotReady)
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Socket(Arc<Mutex<SocketInner>>); pub struct Socket(Arc<Mutex<SocketInner>>);
@ -100,11 +78,13 @@ impl Socket {
}))) })))
} }
pub fn schedule<F: Fn(gst::Buffer) -> Result<(), gst::FlowError> + Send + 'static>( pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G)
&self, where
io_context: &IOContext, F: Fn(gst::Buffer) -> U + Send + 'static,
func: F, U: IntoFuture<Item = (), Error = gst::FlowError> + 'static,
) { <U as IntoFuture>::Future: Send + 'static,
G: FnOnce(Either<gst::FlowError, io::Error>) + Send + 'static,
{
// Ready->Paused // Ready->Paused
// //
// Need to wait for a possible shutdown to finish first // Need to wait for a possible shutdown to finish first
@ -128,17 +108,19 @@ impl Socket {
let element_clone = inner.element.clone(); let element_clone = inner.element.clone();
io_context.spawn( io_context.spawn(
stream stream
.for_each(move |buffer| { .for_each(move |buffer| func(buffer).into_future().map_err(Either::Left))
let res = func(buffer);
match res {
Ok(()) => future::Either::A(Ok(()).into_future()),
//Ok(()) => future::Either::A(YieldOnce::new()),
Err(err) => future::Either::B(Err(err).into_future()),
}
})
.then(move |res| { .then(move |res| {
gst_debug!(SOCKET_CAT, obj: &element_clone, "Socket finished {:?}", res); gst_debug!(
// TODO: Do something with errors here? SOCKET_CAT,
obj: &element_clone,
"Socket finished: {:?}",
res
);
if let Err(err) = res {
err_func(err);
}
let _ = sender.send(()); let _ = sender.send(());
Ok(()) Ok(())
@ -232,7 +214,7 @@ struct SocketStream(Socket, Option<gst::MappedBuffer<gst::buffer::Writable>>);
impl Stream for SocketStream { impl Stream for SocketStream {
type Item = gst::Buffer; type Item = gst::Buffer;
type Error = gst::FlowError; type Error = Either<gst::FlowError, io::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut inner = (self.0).0.lock().unwrap(); let mut inner = (self.0).0.lock().unwrap();
@ -258,7 +240,7 @@ impl Stream for SocketStream {
} }
Err(err) => { Err(err) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err); gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err);
return Err(err.into_result().unwrap_err()); return Err(Either::Left(err.into_result().unwrap_err()));
} }
}, },
}; };
@ -271,7 +253,7 @@ impl Stream for SocketStream {
} }
Err(err) => { Err(err) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err); gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err);
return Err(gst::FlowError::Error); return Err(Either::Right(err));
} }
Ok(Async::Ready(len)) => { Ok(Async::Ready(len)) => {
let time = inner.clock.as_ref().unwrap().get_time(); let time = inner.clock.as_ref().unwrap().get_time();
@ -291,8 +273,6 @@ impl Stream for SocketStream {
buffer.set_dts(time); buffer.set_dts(time);
} }
// TODO: Only ever poll the second again in Xms, using tokio-timer
Ok(Async::Ready(Some(buffer))) Ok(Async::Ready(Some(buffer)))
} }
} }

View file

@ -29,6 +29,10 @@ use std::u16;
use tokio::net; use tokio::net;
use either::Either;
use rand;
use iocontext::*; use iocontext::*;
use udpsocket::*; use udpsocket::*;
@ -293,8 +297,8 @@ impl UdpSrc {
if state.need_initial_events { if state.need_initial_events {
gst_debug!(self.cat, obj: element, "Pushing initial events"); gst_debug!(self.cat, obj: element, "Pushing initial events");
// TODO: Invent a stream id let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(gst::Event::new_stream_start("meh").build()); events.push(gst::Event::new_stream_start(&stream_id).build());
if let Some(ref caps) = self.settings.lock().unwrap().caps { if let Some(ref caps) = self.settings.lock().unwrap().caps {
events.push(gst::Event::new_caps(&caps).build()); events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone()); state.configured_caps = Some(caps.clone());
@ -310,8 +314,38 @@ impl UdpSrc {
self.src_pad.push_event(event); self.src_pad.push_event(event);
} }
// TODO: Error handling match self.src_pad.push(buffer).into_result() {
self.src_pad.push(buffer).into_result().map(|_| ()) Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed buffer");
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(self.cat, obj: element, "Flushing");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
}
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
}
Ok(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
Err(gst::FlowError::CustomError)
}
}
} }
fn prepare(&self, element: &Element) -> Result<(), ()> { fn prepare(&self, element: &Element) -> Result<(), ()> {
@ -321,9 +355,9 @@ impl UdpSrc {
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
// TODO: Error handling
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
// TODO: Error handling
let io_context = IOContext::new( let io_context = IOContext::new(
&settings.context, &settings.context,
settings.context_threads as isize, settings.context_threads as isize,
@ -388,10 +422,37 @@ impl UdpSrc {
let socket = Socket::new(&element.clone().upcast(), socket, buffer_pool); let socket = Socket::new(&element.clone().upcast(), socket, buffer_pool);
let element_clone = element.clone(); let element_clone = element.clone();
socket.schedule(&io_context, move |buffer| { let element_clone2 = element.clone();
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap(); socket.schedule(
udpsrc.push_buffer(&element_clone, buffer) &io_context,
}); move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let udpsrc = element_clone2.get_impl().downcast_ref::<UdpSrc>().unwrap();
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
Either::Right(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
},
);
state.socket = Some(socket); state.socket = Some(socket);
state.io_context = Some(io_context); state.io_context = Some(io_context);