threadshare: Implement error handling

This commit is contained in:
Sebastian Dröge 2018-03-16 20:24:36 +02:00
parent 7cd2945268
commit 21f905739f
3 changed files with 100 additions and 49 deletions

View file

@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic;
use std::thread;
use std::io;
use futures::Future;
use tokio::executor::thread_pool;
@ -245,16 +246,16 @@ impl Drop for IOContextInner {
}
impl IOContext {
pub fn new(name: &str, n_threads: isize, wait: u32) -> Self {
pub fn new(name: &str, n_threads: isize, wait: u32) -> Result<Self, io::Error> {
let mut contexts = CONTEXTS.lock().unwrap();
if let Some(context) = contexts.get(name) {
if let Some(context) = context.upgrade() {
gst_debug!(CONTEXT_CAT, "Reusing existing context '{}'", name);
return IOContext(context);
return Ok(IOContext(context));
}
}
let reactor = reactor::Reactor::new().unwrap();
let reactor = reactor::Reactor::new()?;
let (pool, shutdown) = if n_threads >= 0 {
let handle = reactor.handle().clone();
@ -286,7 +287,7 @@ impl IOContext {
contexts.insert(name.into(), Arc::downgrade(&context));
gst_debug!(CONTEXT_CAT, "Created new context '{}'", name);
IOContext(context)
Ok(IOContext(context))
}
pub fn spawn<F>(&self, future: F)

View file

@ -78,7 +78,7 @@ impl Socket {
})))
}
pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G)
pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G) -> Result<(), ()>
where
F: Fn(gst::Buffer) -> U + Send + 'static,
U: IntoFuture<Item = (), Error = gst::FlowError> + 'static,
@ -95,12 +95,15 @@ impl Socket {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Scheduling socket");
if inner.state == SocketState::Scheduled {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already scheduled");
return;
return Ok(());
}
assert_eq!(inner.state, SocketState::Unscheduled);
inner.state = SocketState::Scheduled;
inner.buffer_pool.set_active(true).unwrap();
if let Err(_) = inner.buffer_pool.set_active(true) {
gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to activate buffer pool");
return Err(());
}
let (sender, receiver) = oneshot::channel::<()>();
inner.shutdown_receiver = Some(receiver);
@ -126,6 +129,7 @@ impl Socket {
Ok(())
}),
);
Ok(())
}
pub fn unpause(&self, clock: gst::Clock, base_time: gst::ClockTime) {
@ -195,11 +199,11 @@ impl Socket {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Waiting for socket to shut down");
drop(inner);
shutdown_receiver.wait().unwrap();
shutdown_receiver.wait().expect("Already shut down");
let mut inner = self.0.lock().unwrap();
inner.state = SocketState::Unscheduled;
inner.buffer_pool.set_active(false).unwrap();
let _ = inner.buffer_pool.set_active(false);
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket shut down");
}
}

View file

@ -348,7 +348,7 @@ impl UdpSrc {
}
}
fn prepare(&self, element: &Element) -> Result<(), ()> {
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
@ -357,17 +357,31 @@ impl UdpSrc {
let mut state = self.state.lock().unwrap();
// TODO: Error handling
let io_context = IOContext::new(
&settings.context,
settings.context_threads as isize,
settings.context_wait,
);
).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create IO context: {}", err]
)
})?;
let addr: IpAddr = match settings.address {
None => return Err(()),
None => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
["No address set"]
))
}
Some(ref addr) => match addr.parse() {
Err(_) => return Err(()),
Err(err) => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
["Invalid address '{}' set: {}", addr, err]
))
}
Ok(addr) => addr,
},
};
@ -391,17 +405,32 @@ impl UdpSrc {
addr
);
let socket = net::UdpSocket::bind(&saddr).unwrap();
let socket = net::UdpSocket::bind(&saddr).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to bind socket: {}", err]
)
})?;
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr::V4(addr) => {
socket
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.unwrap();
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.join_multicast_v6(&addr, 0).unwrap();
socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
@ -409,7 +438,12 @@ impl UdpSrc {
} else {
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
let socket = net::UdpSocket::bind(&saddr).unwrap();
let socket = net::UdpSocket::bind(&saddr).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to bind socket: {}", err]
)
})?;
socket
};
@ -417,42 +451,51 @@ impl UdpSrc {
let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config();
config.set_params(None, settings.mtu, 0, 0);
buffer_pool.set_config(config).unwrap();
buffer_pool.set_config(config).map_err(|_| {
gst_error_msg!(
gst::ResourceError::Settings,
["Failed to configure buffer pool"]
)
})?;
let socket = Socket::new(&element.clone().upcast(), socket, buffer_pool);
let element_clone = element.clone();
let element_clone2 = element.clone();
socket.schedule(
&io_context,
move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let udpsrc = element_clone2.get_impl().downcast_ref::<UdpSrc>().unwrap();
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
socket
.schedule(
&io_context,
move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let udpsrc = element_clone2.get_impl().downcast_ref::<UdpSrc>().unwrap();
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
Either::Right(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
Either::Right(err) => {
gst_element_error!(
element_clone2,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
},
);
},
)
.map_err(|_| {
gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to schedule socket"])
})?;
state.socket = Some(socket);
state.io_context = Some(io_context);
@ -588,7 +631,10 @@ impl ElementImpl<Element> for UdpSrc {
match transition {
gst::StateChange::NullToReady => match self.prepare(element) {
Err(_) => return gst::StateChangeReturn::Failure,
Err(err) => {
element.post_error_message(&err);
return gst::StateChangeReturn::Failure;
}
Ok(_) => (),
},
gst::StateChange::PlayingToPaused => match self.stop(element) {