diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 95ee2f74..0ba917b6 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -10,6 +10,7 @@ gstreamer = { git = "https://github.com/sdroege/gstreamer-rs" } gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" } tokio = { git = "https://github.com/tokio-rs/tokio" } tokio-reactor = { git = "https://github.com/tokio-rs/tokio" } +tokio-executor = { git = "https://github.com/tokio-rs/tokio" } futures = "0.1" lazy_static = "1.0" diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index f969d357..b52cb8e7 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -25,6 +25,7 @@ extern crate gstreamer as gst; extern crate futures; extern crate tokio; +extern crate tokio_executor; extern crate tokio_reactor; #[macro_use] diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index d299a405..fa2ef960 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -25,6 +25,8 @@ use gst_plugin::object::*; use gst_plugin::element::*; use std::sync::{Arc, Mutex, Weak}; +use std::sync::atomic; +use std::thread; use std::u16; use futures::{Async, Future, IntoFuture, Poll, Stream}; @@ -50,13 +52,94 @@ lazy_static!{ ); } +// Our own simplified implementation of reactor::Background to allow hooking into its internals +const RUNNING: usize = 0; +const SHUTDOWN_NOW: usize = 1; +const SHUTDOWN: usize = 2; + +struct IOContextRunner { + name: String, + reactor: reactor::Reactor, + shutdown: Arc, +} + +impl IOContextRunner { + fn start(name: &str, reactor: reactor::Reactor) -> IOContextShutdown { + let handle = reactor.handle().clone(); + let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING)); + let shutdown_clone = shutdown.clone(); + let name_clone = name.into(); + let join = thread::spawn(move || { + let mut runner = IOContextRunner { + reactor: reactor, + shutdown: shutdown_clone, + name: name_clone, + }; + runner.run(); + }); + + IOContextShutdown { + name: name.into(), + shutdown: shutdown, + handle: handle, + join: Some(join), + } + } + + fn run(&mut self) { + gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name); + loop { + if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { + break; + } + + gst_trace!(CONTEXT_CAT, "Turning reactor '{}'", self.name); + self.reactor.turn(None).unwrap(); + } + } +} + +impl Drop for IOContextRunner { + fn drop(&mut self) { + self.shutdown.store(SHUTDOWN, atomic::Ordering::SeqCst); + gst_debug!(CONTEXT_CAT, "Shut down reactor thread '{}'", self.name); + } +} + +struct IOContextShutdown { + name: String, + shutdown: Arc, + handle: reactor::Handle, + join: Option>, +} + +impl Drop for IOContextShutdown { + fn drop(&mut self) { + gst_debug!(CONTEXT_CAT, "Shutting down reactor thread '{}'", self.name); + self.shutdown.store(SHUTDOWN_NOW, atomic::Ordering::SeqCst); + loop { + use tokio_executor::park::Unpark; + + // XXX: Not nice but good enough + gst_trace!(CONTEXT_CAT, "Waiting for reactor '{}' shutdown", self.name); + self.handle.unpark(); + thread::yield_now(); + if self.shutdown.load(atomic::Ordering::SeqCst) == SHUTDOWN { + break; + } + } + let _ = self.join.take().unwrap().join(); + } +} + #[derive(Clone)] struct IOContext(Arc); struct IOContextInner { name: String, - reactor: reactor::Background, pool: thread_pool::ThreadPool, + // Only used for dropping + _shutdown: IOContextShutdown, } impl Drop for IOContextInner { @@ -77,7 +160,7 @@ impl IOContext { } } - let reactor = reactor::Reactor::new().unwrap().background().unwrap(); + let reactor = reactor::Reactor::new().unwrap(); let handle = reactor.handle().clone(); @@ -93,10 +176,12 @@ impl IOContext { } let pool = pool_builder.build(); + let shutdown = IOContextRunner::start(name, reactor); + let context = Arc::new(IOContextInner { name: name.into(), - reactor, pool, + _shutdown: shutdown, }); contexts.insert(name.into(), Arc::downgrade(&context)); @@ -112,7 +197,7 @@ impl IOContext { } } -/* Workaround for https://github.com/tokio-rs/tokio/issues/207 */ +// FIXME: Workaround for https://github.com/tokio-rs/tokio/issues/207 struct YieldOnce(Option<()>, PhantomData); impl YieldOnce { @@ -316,7 +401,7 @@ impl Stream for SocketStream { return Ok(Async::NotReady); } - assert_ne!(inner.state, SocketState::Unscheduled); + assert_eq!(inner.state, SocketState::Running); gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data"); let (len, time) = { @@ -362,6 +447,8 @@ impl Stream for SocketStream { buffer.set_dts(time); } + // TODO: Only ever poll the second again in Xms, using tokio-timer + Ok(Async::Ready(Some(buffer))) } }