From 33cb599464c23360086fa642ddf089bb3fb0a88f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 18 Apr 2019 13:38:10 +0300 Subject: [PATCH] threadshare/iocontext: Add custom interval timer implementation This knows about our throttled event loop and ensures that timers are triggered at most 1 throttle time later instead of 2. --- gst-plugin-threadshare/src/iocontext.rs | 132 +++++++++++++++++++++++- 1 file changed, 127 insertions(+), 5 deletions(-) diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs index 336d7c26..aee1545a 100644 --- a/gst-plugin-threadshare/src/iocontext.rs +++ b/gst-plugin-threadshare/src/iocontext.rs @@ -15,15 +15,18 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use std::collections::HashMap; +use std::cmp; +use std::collections::{BinaryHeap, HashMap}; use std::io; use std::mem; use std::sync::{atomic, mpsc}; use std::sync::{Arc, Mutex, Weak}; use std::thread; +use std::time; use futures::future; use futures::stream::futures_unordered::FuturesUnordered; +use futures::sync::mpsc as futures_mpsc; use futures::sync::oneshot; use futures::{Future, Stream}; use tokio::reactor; @@ -56,6 +59,7 @@ impl IOContextRunner { name: &str, wait: u32, reactor: reactor::Reactor, + timers: Arc>>, ) -> (tokio_current_thread::Handle, IOContextShutdown) { let handle = reactor.handle().clone(); let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING)); @@ -70,7 +74,7 @@ impl IOContextRunner { let (sender, receiver) = mpsc::channel(); let join = thread::spawn(move || { - runner.run(wait, reactor, sender); + runner.run(wait, reactor, sender, timers); }); let shutdown = IOContextShutdown { @@ -90,8 +94,8 @@ impl IOContextRunner { wait: u32, reactor: reactor::Reactor, sender: mpsc::Sender, + timers: Arc>>, ) { - use std::time; let wait = time::Duration::from_millis(wait as u64); gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name); @@ -112,6 +116,48 @@ impl IOContextRunner { break; } + gst_trace!(CONTEXT_CAT, "Elapsed {:?} since last loop", now.elapsed()); + + // Handle timers + { + // Trigger all timers that would be expired before the middle of the loop wait + // time + let timer_threshold = now + wait / 2; + let mut timers = timers.lock().unwrap(); + while timers + .peek() + .and_then(|entry| { + if entry.time < timer_threshold { + Some(()) + } else { + None + } + }) + .is_some() + { + let TimerEntry { + time, + interval, + sender, + .. + } = timers.pop().unwrap(); + + if sender.is_closed() { + continue; + } + + let _ = sender.unbounded_send(()); + if let Some(interval) = interval { + timers.push(TimerEntry { + time: time + interval, + id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed), + interval: Some(interval), + sender, + }); + } + } + } + gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); while current_thread .enter(enter) @@ -122,6 +168,8 @@ impl IOContextRunner { gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name); let elapsed = now.elapsed(); + gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed); + if elapsed < wait { gst_trace!( CONTEXT_CAT, @@ -129,8 +177,14 @@ impl IOContextRunner { wait - elapsed ); thread::sleep(wait - elapsed); + gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed()); + } + + if wait > time::Duration::from_millis(0) { + now += wait; + } else { + now = time::Instant::now(); } - now += wait; }) }); } @@ -179,6 +233,7 @@ struct IOContextInner { name: String, runtime_handle: Mutex, reactor_handle: reactor::Handle, + timers: Arc>>, // Only used for dropping _shutdown: IOContextShutdown, pending_futures: Mutex<( @@ -208,12 +263,16 @@ impl IOContext { let reactor = reactor::Reactor::new()?; let reactor_handle = reactor.handle().clone(); - let (runtime_handle, shutdown) = IOContextRunner::start(name, wait, reactor); + let timers = Arc::new(Mutex::new(BinaryHeap::new())); + + let (runtime_handle, shutdown) = + IOContextRunner::start(name, wait, reactor, timers.clone()); let context = Arc::new(IOContextInner { name: name.into(), runtime_handle: Mutex::new(runtime_handle), reactor_handle, + timers, _shutdown: shutdown, pending_futures: Mutex::new((0, HashMap::new())), }); @@ -306,3 +365,66 @@ impl glib::subclass::boxed::BoxedType for PendingFutureId { } glib_boxed_derive_traits!(PendingFutureId); + +static TIMER_ENTRY_ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0); + +// Ad-hoc interval timer implementation for our throttled event loop above +pub struct TimerEntry { + time: time::Instant, + id: usize, // for producing a total order + interval: Option, + sender: futures_mpsc::UnboundedSender<()>, +} + +impl PartialEq for TimerEntry { + fn eq(&self, other: &Self) -> bool { + self.time.eq(&other.time) && self.id.eq(&other.id) + } +} + +impl Eq for TimerEntry {} + +impl PartialOrd for TimerEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(&other)) + } +} + +impl Ord for TimerEntry { + fn cmp(&self, other: &Self) -> cmp::Ordering { + other + .time + .cmp(&self.time) + .then_with(|| other.id.cmp(&self.id)) + } +} + +pub struct Interval { + receiver: futures_mpsc::UnboundedReceiver<()>, +} + +impl Interval { + pub fn new(context: &IOContext, interval: time::Duration) -> Self { + let (sender, receiver) = futures_mpsc::unbounded(); + + let mut timers = context.0.timers.lock().unwrap(); + let entry = TimerEntry { + time: time::Instant::now(), + id: TIMER_ENTRY_ID.fetch_add(1, atomic::Ordering::Relaxed), + interval: Some(interval), + sender, + }; + timers.push(entry); + + Self { receiver } + } +} + +impl Stream for Interval { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll, Self::Error> { + self.receiver.poll() + } +}