From ed90b338f899f670239a1c0ea33055f2b5dad531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 29 Sep 2021 18:21:08 +0200 Subject: [PATCH] ts: runtime: add delay_for_at_least The time driver for the threadshare runtime assigns the timer entries to the nearest throttling time frame so that the timer fires as close as possible to the expected instant. This means that the timer might fire before or after the expected instant (at most `wait / 2` away). In some cases, we don't want the timer to fire early. The new function `delay_for_at_least` ensures that the timer is assigned to the time frame after the expected instant. --- generic/threadshare/src/runtime/executor.rs | 9 +++ generic/threadshare/src/runtime/time.rs | 72 ++++++++++++++++++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/generic/threadshare/src/runtime/executor.rs b/generic/threadshare/src/runtime/executor.rs index 445c56eb..a4f614fc 100644 --- a/generic/threadshare/src/runtime/executor.rs +++ b/generic/threadshare/src/runtime/executor.rs @@ -209,6 +209,7 @@ impl ContextThread { let context = Context(Arc::new(ContextInner { real: Some(ContextRealInner { name: self.name.clone(), + wait_duration: wait, handle: Mutex::new(runtime.handle().clone()), shutdown, }), @@ -352,6 +353,7 @@ impl fmt::Debug for JoinHandle { struct ContextRealInner { name: String, handle: Mutex, + wait_duration: Duration, // Only used for dropping shutdown: ContextShutdown, } @@ -444,6 +446,13 @@ impl Context { } } + pub fn wait_duration(&self) -> Duration { + match self.0.real { + Some(ref real) => real.wait_duration, + None => Duration::ZERO, + } + } + /// Returns `true` if a `Context` is running on current thread. pub fn is_context_thread() -> bool { CURRENT_THREAD_CONTEXT.with(|cur_ctx| cur_ctx.borrow().is_some()) diff --git a/generic/threadshare/src/runtime/time.rs b/generic/threadshare/src/runtime/time.rs index 5dac6c49..b085a2a0 100644 --- a/generic/threadshare/src/runtime/time.rs +++ b/generic/threadshare/src/runtime/time.rs @@ -22,18 +22,88 @@ use futures::stream::StreamExt; use std::time::Duration; +use super::Context; + /// Wait until the given `delay` has elapsed. /// /// This must be called from within the target runtime environment. +/// +/// When throttling is activated (i.e. when using a non-`0` `wait` +/// duration in `Context::acquire`), timer entries are assigned to +/// the nearest time frame, meaning that the delay might elapse +/// `wait` / 2 ms earlier or later than the expected instant. +/// +/// Use [`delay_for_at_least`] when it's preferable not to return +/// before the expected instant. pub async fn delay_for(delay: Duration) { - if delay > Duration::from_nanos(0) { + if delay > Duration::ZERO { tokio::time::delay_for(delay).map(drop).await; } } +/// Wait until at least the given `delay` has elapsed. +/// +/// This must be called from within the target runtime environment. +/// +/// See [`delay_for`] for details. This method won't return before +/// the expected delay has elapsed. +pub async fn delay_for_at_least(delay: Duration) { + if delay > Duration::ZERO { + tokio::time::delay_for( + delay + Context::current().map_or(Duration::ZERO, |ctx| ctx.wait_duration() / 2), + ) + .map(drop) + .await; + } +} + /// Builds a `Stream` that yields at `interval. /// /// This must be called from within the target runtime environment. pub fn interval(interval: Duration) -> impl Stream { tokio::time::interval(interval).map(drop) } + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use crate::runtime::Context; + + const MAX_THROTTLING: Duration = Duration::from_millis(10); + const DELAY: Duration = Duration::from_millis(12); + + #[tokio::test] + async fn delay_for() { + gst::init().unwrap(); + + let context = Context::acquire("delay_for", MAX_THROTTLING).unwrap(); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + })) + .unwrap(); + + // Due to throttling, timer may be fired earlier + assert!(elapsed + MAX_THROTTLING / 2 >= DELAY); + } + + #[tokio::test] + async fn delay_for_at_least() { + gst::init().unwrap(); + + let context = Context::acquire("delay_for_at_least", MAX_THROTTLING).unwrap(); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for_at_least(DELAY).await; + now.elapsed() + })) + .unwrap(); + + // Never returns earlier that DELAY + assert!(elapsed >= DELAY); + } +}