threadshare: Integrate new tokio-timer into the IOContext

This commit is contained in:
Sebastian Dröge 2018-04-02 10:53:40 +03:00
parent 82cc63551c
commit ab22d81f12
2 changed files with 58 additions and 32 deletions

View file

@ -11,6 +11,8 @@ gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" }
tokio = { git = "https://github.com/tokio-rs/tokio" } tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio-reactor = { git = "https://github.com/tokio-rs/tokio" } tokio-reactor = { git = "https://github.com/tokio-rs/tokio" }
tokio-executor = { git = "https://github.com/tokio-rs/tokio" } tokio-executor = { git = "https://github.com/tokio-rs/tokio" }
tokio-timer = { git = "https://github.com/tokio-rs/tokio" }
tokio-threadpool = { git = "https://github.com/tokio-rs/tokio" }
futures = "0.1" futures = "0.1"
lazy_static = "1.0" lazy_static = "1.0"
either = "1.0" either = "1.0"

View file

@ -16,16 +16,17 @@
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic;
use std::thread;
use std::io; use std::io;
use std::mem; use std::mem;
use std::sync::atomic;
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use futures::{Future, Stream};
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::{Future, Stream};
use tokio::executor::thread_pool; use tokio::executor::thread_pool;
use tokio::reactor; use tokio::reactor;
use tokio_timer::timer;
use gst; use gst;
@ -125,35 +126,39 @@ impl IOContextRunner {
let handle = reactor.handle(); let handle = reactor.handle();
let mut enter = ::tokio_executor::enter().unwrap(); let mut enter = ::tokio_executor::enter().unwrap();
let mut current_thread = current_thread::CurrentThread::new_with_park(reactor); let timer = timer::Timer::new(reactor);
let timer_handle = timer.handle();
let mut current_thread = current_thread::CurrentThread::new_with_park(timer);
::tokio_reactor::with_default(&handle, &mut enter, |enter| loop { ::tokio_reactor::with_default(&handle, &mut enter, |mut enter| {
let now = time::Instant::now(); ::tokio_timer::with_default(&timer_handle, &mut enter, |enter| loop {
let now = time::Instant::now();
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
break; break;
}
{
let mut pending_futures = pending_futures.lock().unwrap();
while let Some(future) = pending_futures.pop() {
current_thread.spawn(future);
} }
}
gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); {
current_thread.enter(enter).turn(None).unwrap(); let mut pending_futures = pending_futures.lock().unwrap();
gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name); while let Some(future) = pending_futures.pop() {
current_thread.spawn(future);
}
}
let elapsed = now.elapsed(); gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name);
if elapsed < wait { current_thread.enter(enter).turn(None).unwrap();
gst_trace!( gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
CONTEXT_CAT,
"Waiting for {:?} before polling again", let elapsed = now.elapsed();
wait - elapsed if elapsed < wait {
); gst_trace!(
thread::sleep(wait - elapsed); CONTEXT_CAT,
} "Waiting for {:?} before polling again",
wait - elapsed
);
thread::sleep(wait - elapsed);
}
})
}); });
} else { } else {
let mut reactor = reactor; let mut reactor = reactor;
@ -265,14 +270,33 @@ impl IOContext {
let (pool, shutdown) = if n_threads >= 0 { let (pool, shutdown) = if n_threads >= 0 {
let handle = reactor.handle().clone(); let handle = reactor.handle().clone();
let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new()));
let t1 = timers.clone();
let shutdown = IOContextRunner::start(name, wait, reactor); let shutdown = IOContextRunner::start(name, wait, reactor);
let mut pool_builder = thread_pool::Builder::new(); let mut pool_builder = thread_pool::Builder::new();
pool_builder.around_worker(move |w, enter| { pool_builder
::tokio_reactor::with_default(&handle, enter, |_| { .around_worker(move |w, enter| {
w.run(); let timer_handle = t1.lock().unwrap().get(w.id()).unwrap().clone();
::tokio_reactor::with_default(&handle, enter, |enter| {
timer::with_default(&timer_handle, enter, |_| {
w.run();
});
});
})
.custom_park(move |worker_id| {
// Create a new timer
let timer = timer::Timer::new(::tokio_threadpool::park::DefaultPark::new());
timers
.lock()
.unwrap()
.insert(worker_id.clone(), timer.handle());
timer
}); });
});
if n_threads > 0 { if n_threads > 0 {
pool_builder.pool_size(n_threads as usize); pool_builder.pool_size(n_threads as usize);