diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs index ec5e5073..41b0d607 100644 --- a/gst-plugin-threadshare/src/iocontext.rs +++ b/gst-plugin-threadshare/src/iocontext.rs @@ -170,20 +170,37 @@ impl IOContextRunner { let elapsed = now.elapsed(); gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed); - if elapsed < wait { - gst_trace!( - CONTEXT_CAT, - "Waiting for {:?} before polling again", - wait - elapsed - ); - thread::sleep(wait - elapsed); - gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed()); - } + if wait == time::Duration::from_millis(0) { + let timers = timers.lock().unwrap(); + let wait = match timers.peek().map(|entry| entry.time) { + None => None, + Some(time) => Some({ + let tmp = time::Instant::now(); - if wait > time::Duration::from_millis(0) { - now += wait; - } else { + if time < tmp { + time::Duration::from_millis(0) + } else { + time.duration_since(tmp) + } + }), + }; + + gst_trace!(CONTEXT_CAT, "Sleeping for up to {:?}", wait); + current_thread.enter(enter).turn(wait).unwrap(); + gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed()); now = time::Instant::now(); + } else { + if elapsed < wait { + gst_trace!( + CONTEXT_CAT, + "Waiting for {:?} before polling again", + wait - elapsed + ); + thread::sleep(wait - elapsed); + gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed()); + } + + now += wait; } }) }); @@ -409,6 +426,8 @@ pub struct Interval { impl Interval { #[allow(unused)] pub fn new(context: &IOContext, interval: time::Duration) -> Self { + use tokio_executor::park::Unpark; + let (sender, receiver) = futures_mpsc::unbounded(); let mut timers = context.0.timers.lock().unwrap(); @@ -419,6 +438,7 @@ impl Interval { sender, }; timers.push(entry); + context.reactor_handle().unpark(); Self { receiver } }