threadshare: Handle context-wait==0 by waiting on IO

This commit is contained in:
Sebastian Dröge 2019-09-09 12:16:09 +03:00
parent b05fb902f9
commit 5f9d649021

View file

@ -170,20 +170,37 @@ impl IOContextRunner {
let elapsed = now.elapsed(); let elapsed = now.elapsed();
gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed); gst_trace!(CONTEXT_CAT, "Elapsed {:?} after handling futures", elapsed);
if elapsed < wait { if wait == time::Duration::from_millis(0) {
gst_trace!( let timers = timers.lock().unwrap();
CONTEXT_CAT, let wait = match timers.peek().map(|entry| entry.time) {
"Waiting for {:?} before polling again", None => None,
wait - elapsed Some(time) => Some({
); let tmp = time::Instant::now();
thread::sleep(wait - elapsed);
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
}
if wait > time::Duration::from_millis(0) { if time < tmp {
now += wait; time::Duration::from_millis(0)
} else { } else {
time.duration_since(tmp)
}
}),
};
gst_trace!(CONTEXT_CAT, "Sleeping for up to {:?}", wait);
current_thread.enter(enter).turn(wait).unwrap();
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
now = time::Instant::now(); now = time::Instant::now();
} else {
if elapsed < wait {
gst_trace!(
CONTEXT_CAT,
"Waiting for {:?} before polling again",
wait - elapsed
);
thread::sleep(wait - elapsed);
gst_trace!(CONTEXT_CAT, "Slept for {:?}", now.elapsed());
}
now += wait;
} }
}) })
}); });
@ -409,6 +426,8 @@ pub struct Interval {
impl Interval { impl Interval {
#[allow(unused)] #[allow(unused)]
pub fn new(context: &IOContext, interval: time::Duration) -> Self { pub fn new(context: &IOContext, interval: time::Duration) -> Self {
use tokio_executor::park::Unpark;
let (sender, receiver) = futures_mpsc::unbounded(); let (sender, receiver) = futures_mpsc::unbounded();
let mut timers = context.0.timers.lock().unwrap(); let mut timers = context.0.timers.lock().unwrap();
@ -419,6 +438,7 @@ impl Interval {
sender, sender,
}; };
timers.push(entry); timers.push(entry);
context.reactor_handle().unpark();
Self { receiver } Self { receiver }
} }