threadshare/iocontext: Add custom interval timer implementation

This knows about our throttled event loop and ensures that timers are
triggered at most 1 throttle time later instead of 2.
This commit is contained in:
Sebastian Dröge 2019-04-18 13:38:10 +03:00 committed by Mathieu Duponchelle
parent b7e55836c1
commit 33cb599464

View file

@ -15,15 +15,18 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use std::collections::HashMap; use std::cmp;
use std::collections::{BinaryHeap, HashMap};
use std::io; use std::io;
use std::mem; use std::mem;
use std::sync::{atomic, mpsc}; use std::sync::{atomic, mpsc};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::thread; use std::thread;
use std::time;
use futures::future; use futures::future;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::sync::mpsc as futures_mpsc;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio::reactor; use tokio::reactor;
@ -56,6 +59,7 @@ impl IOContextRunner {
name: &str, name: &str,
wait: u32, wait: u32,
reactor: reactor::Reactor, reactor: reactor::Reactor,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) -> (tokio_current_thread::Handle, IOContextShutdown) { ) -> (tokio_current_thread::Handle, IOContextShutdown) {
let handle = reactor.handle().clone(); let handle = reactor.handle().clone();
let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING)); let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING));
@ -70,7 +74,7 @@ impl IOContextRunner {
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
let join = thread::spawn(move || { let join = thread::spawn(move || {
runner.run(wait, reactor, sender); runner.run(wait, reactor, sender, timers);
}); });
let shutdown = IOContextShutdown { let shutdown = IOContextShutdown {
@ -90,8 +94,8 @@ impl IOContextRunner {
wait: u32, wait: u32,
reactor: reactor::Reactor, reactor: reactor::Reactor,
sender: mpsc::Sender<tokio_current_thread::Handle>, sender: mpsc::Sender<tokio_current_thread::Handle>,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
) { ) {
use std::time;
let wait = time::Duration::from_millis(wait as u64); let wait = time::Duration::from_millis(wait as u64);
gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name); gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name);
@ -112,6 +116,48 @@ impl IOContextRunner {
break; break;
} }
gst_trace!(CONTEXT_CAT, "Elapsed {:?} since last loop", now.elapsed());
// Handle timers
{
// Trigger all timers that would be expired before the middle of the loop wait
// time
let timer_threshold = now + wait / 2;
let mut timers = timers.lock().unwrap();
while timers
.peek()
.and_then(|entry| {
if entry.time < timer_threshold {
Some(())
} else {
None
}
})
.is_some()
{
let TimerEntry {
time,
interval,
sender,
..
} = timers.pop().unwrap();
if sender.is_closed() {
continue;
}
let _ = sender.unbounded_send(());
if let Some(interval) = interval {
timers.push(TimerEntry {
time: time + interval,
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval: Some(interval),
sender,
});
}
}
}
gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name);
while current_thread while current_thread
.enter(enter) .enter(enter)
@ -122,6 +168,8 @@ impl IOContextRunner {
gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name); gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
let elapsed = now.elapsed(); let elapsed = now.elapsed();
gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed);
if elapsed < wait { if elapsed < wait {
gst_trace!( gst_trace!(
CONTEXT_CAT, CONTEXT_CAT,
@ -129,8 +177,14 @@ impl IOContextRunner {
wait - elapsed wait - elapsed
); );
thread::sleep(wait - elapsed); thread::sleep(wait - elapsed);
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
}
if wait > time::Duration::from_millis(0) {
now += wait;
} else {
now = time::Instant::now();
} }
now += wait;
}) })
}); });
} }
@ -179,6 +233,7 @@ struct IOContextInner {
name: String, name: String,
runtime_handle: Mutex<tokio_current_thread::Handle>, runtime_handle: Mutex<tokio_current_thread::Handle>,
reactor_handle: reactor::Handle, reactor_handle: reactor::Handle,
timers: Arc<Mutex<BinaryHeap<TimerEntry>>>,
// Only used for dropping // Only used for dropping
_shutdown: IOContextShutdown, _shutdown: IOContextShutdown,
pending_futures: Mutex<( pending_futures: Mutex<(
@ -208,12 +263,16 @@ impl IOContext {
let reactor = reactor::Reactor::new()?; let reactor = reactor::Reactor::new()?;
let reactor_handle = reactor.handle().clone(); let reactor_handle = reactor.handle().clone();
let (runtime_handle, shutdown) = IOContextRunner::start(name, wait, reactor); let timers = Arc::new(Mutex::new(BinaryHeap::new()));
let (runtime_handle, shutdown) =
IOContextRunner::start(name, wait, reactor, timers.clone());
let context = Arc::new(IOContextInner { let context = Arc::new(IOContextInner {
name: name.into(), name: name.into(),
runtime_handle: Mutex::new(runtime_handle), runtime_handle: Mutex::new(runtime_handle),
reactor_handle, reactor_handle,
timers,
_shutdown: shutdown, _shutdown: shutdown,
pending_futures: Mutex::new((0, HashMap::new())), pending_futures: Mutex::new((0, HashMap::new())),
}); });
@ -306,3 +365,66 @@ impl glib::subclass::boxed::BoxedType for PendingFutureId {
} }
glib_boxed_derive_traits!(PendingFutureId); glib_boxed_derive_traits!(PendingFutureId);
static TIMER_ENTRY_ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
// Ad-hoc interval timer implementation for our throttled event loop above
pub struct TimerEntry {
time: time::Instant,
id: usize, // for producing a total order
interval: Option<time::Duration>,
sender: futures_mpsc::UnboundedSender<()>,
}
impl PartialEq for TimerEntry {
fn eq(&self, other: &Self) -> bool {
self.time.eq(&other.time) && self.id.eq(&other.id)
}
}
impl Eq for TimerEntry {}
impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(&other))
}
}
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other
.time
.cmp(&self.time)
.then_with(|| other.id.cmp(&self.id))
}
}
pub struct Interval {
receiver: futures_mpsc::UnboundedReceiver<()>,
}
impl Interval {
pub fn new(context: &IOContext, interval: time::Duration) -> Self {
let (sender, receiver) = futures_mpsc::unbounded();
let mut timers = context.0.timers.lock().unwrap();
let entry = TimerEntry {
time: time::Instant::now(),
id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed),
interval: Some(interval),
sender,
};
timers.push(entry);
Self { receiver }
}
}
impl Stream for Interval {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.receiver.poll()
}
}