threadshare: Stop using reactor::Background and implement it ourselves for now

This commit is contained in:
Sebastian Dröge 2018-03-12 12:24:37 +02:00
parent 299c69185e
commit b939607693
3 changed files with 94 additions and 5 deletions

View file

@ -10,6 +10,7 @@ gstreamer = { git = "https://github.com/sdroege/gstreamer-rs" }
gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" }
tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio-reactor = { git = "https://github.com/tokio-rs/tokio" }
tokio-executor = { git = "https://github.com/tokio-rs/tokio" }
futures = "0.1"
lazy_static = "1.0"

View file

@ -25,6 +25,7 @@ extern crate gstreamer as gst;
extern crate futures;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_reactor;
#[macro_use]

View file

@ -25,6 +25,8 @@ use gst_plugin::object::*;
use gst_plugin::element::*;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic;
use std::thread;
use std::u16;
use futures::{Async, Future, IntoFuture, Poll, Stream};
@ -50,13 +52,94 @@ lazy_static!{
);
}
// Our own simplified implementation of reactor::Background to allow hooking into its internals
const RUNNING: usize = 0;
const SHUTDOWN_NOW: usize = 1;
const SHUTDOWN: usize = 2;
struct IOContextRunner {
name: String,
reactor: reactor::Reactor,
shutdown: Arc<atomic::AtomicUsize>,
}
impl IOContextRunner {
fn start(name: &str, reactor: reactor::Reactor) -> IOContextShutdown {
let handle = reactor.handle().clone();
let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING));
let shutdown_clone = shutdown.clone();
let name_clone = name.into();
let join = thread::spawn(move || {
let mut runner = IOContextRunner {
reactor: reactor,
shutdown: shutdown_clone,
name: name_clone,
};
runner.run();
});
IOContextShutdown {
name: name.into(),
shutdown: shutdown,
handle: handle,
join: Some(join),
}
}
fn run(&mut self) {
gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name);
loop {
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
break;
}
gst_trace!(CONTEXT_CAT, "Turning reactor '{}'", self.name);
self.reactor.turn(None).unwrap();
}
}
}
impl Drop for IOContextRunner {
fn drop(&mut self) {
self.shutdown.store(SHUTDOWN, atomic::Ordering::SeqCst);
gst_debug!(CONTEXT_CAT, "Shut down reactor thread '{}'", self.name);
}
}
struct IOContextShutdown {
name: String,
shutdown: Arc<atomic::AtomicUsize>,
handle: reactor::Handle,
join: Option<thread::JoinHandle<()>>,
}
impl Drop for IOContextShutdown {
fn drop(&mut self) {
gst_debug!(CONTEXT_CAT, "Shutting down reactor thread '{}'", self.name);
self.shutdown.store(SHUTDOWN_NOW, atomic::Ordering::SeqCst);
loop {
use tokio_executor::park::Unpark;
// XXX: Not nice but good enough
gst_trace!(CONTEXT_CAT, "Waiting for reactor '{}' shutdown", self.name);
self.handle.unpark();
thread::yield_now();
if self.shutdown.load(atomic::Ordering::SeqCst) == SHUTDOWN {
break;
}
}
let _ = self.join.take().unwrap().join();
}
}
#[derive(Clone)]
struct IOContext(Arc<IOContextInner>);
struct IOContextInner {
name: String,
reactor: reactor::Background,
pool: thread_pool::ThreadPool,
// Only used for dropping
_shutdown: IOContextShutdown,
}
impl Drop for IOContextInner {
@ -77,7 +160,7 @@ impl IOContext {
}
}
let reactor = reactor::Reactor::new().unwrap().background().unwrap();
let reactor = reactor::Reactor::new().unwrap();
let handle = reactor.handle().clone();
@ -93,10 +176,12 @@ impl IOContext {
}
let pool = pool_builder.build();
let shutdown = IOContextRunner::start(name, reactor);
let context = Arc::new(IOContextInner {
name: name.into(),
reactor,
pool,
_shutdown: shutdown,
});
contexts.insert(name.into(), Arc::downgrade(&context));
@ -112,7 +197,7 @@ impl IOContext {
}
}
/* Workaround for https://github.com/tokio-rs/tokio/issues/207 */
// FIXME: Workaround for https://github.com/tokio-rs/tokio/issues/207
struct YieldOnce<E>(Option<()>, PhantomData<E>);
impl<E> YieldOnce<E> {
@ -316,7 +401,7 @@ impl Stream for SocketStream {
return Ok(Async::NotReady);
}
assert_ne!(inner.state, SocketState::Unscheduled);
assert_eq!(inner.state, SocketState::Running);
gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data");
let (len, time) = {
@ -362,6 +447,8 @@ impl Stream for SocketStream {
buffer.set_dts(time);
}
// TODO: Only ever poll the second again in Xms, using tokio-timer
Ok(Async::Ready(Some(buffer)))
}
}