diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index ba4d6fa6..4ce1c008 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -42,6 +42,7 @@ pub(super) struct Scheduler { impl Scheduler { pub const DUMMY_NAME: &'static str = "DUMMY"; + const MAX_SUCCESSIVE_TASKS: usize = 64; pub fn start(context_name: &str, max_throttling: Duration) -> Handle { // Name the thread so that it appears in panic messages. @@ -178,52 +179,73 @@ impl Scheduler { let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name))); - if let Poll::Ready(t) = future.as_mut().poll(cx) { - return Ok(t); - } - - let mut last; - loop { - last = Instant::now(); - Reactor::with_mut(|reactor| reactor.react(last).ok()); + let mut now; + // This is to ensure reactor invocation on the first iteration. + let mut last_react = Instant::now() - self.max_throttling; + let mut tasks_checked; + 'main: loop { + // Only check I/O and timers every `max_throttling`. + now = Instant::now(); + if now - last_react >= self.max_throttling { + last_react = now; + Reactor::with_mut(|reactor| reactor.react(now).ok()); + } if let Poll::Ready(t) = future.as_mut().poll(cx) { return Ok(t); } - while let Ok(runnable) = self.tasks.pop_runnable() { - panic::catch_unwind(|| runnable.run()).map_err(|err| { - gst::error!( - RUNTIME_CAT, - "A task has panicked within Context {}", - self.context_name - ); + tasks_checked = 0; + while tasks_checked < Self::MAX_SUCCESSIVE_TASKS { + if let Ok(runnable) = self.tasks.pop_runnable() { + panic::catch_unwind(|| runnable.run()).map_err(|err| { + gst::error!( + RUNTIME_CAT, + "A task has panicked within Context {}", + self.context_name + ); - err - })?; - } + err + })?; - let mut must_unpark = self.must_unpark.lock().unwrap(); - loop { - if *must_unpark { - *must_unpark = false; - break; - } - - if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) { - let result = self - .must_unpark_cvar - .wait_timeout(must_unpark, parking_duration) - .unwrap(); - - #[cfg(feature = "tuning")] - self.parked_duration - .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed); - - must_unpark = result.0; + tasks_checked += 1; } else { - *must_unpark = false; - break; + // No more ready tasks. + if tasks_checked > 0 { + // Check if the main future is ready before parking. + if let Poll::Ready(t) = future.as_mut().poll(cx) { + return Ok(t); + } + } + // else: main future has just been checked. + + let mut must_unpark = self.must_unpark.lock().unwrap(); + loop { + if *must_unpark { + *must_unpark = false; + continue 'main; + } + + if let Some(parking_duration) = + self.max_throttling.checked_sub(last_react.elapsed()) + { + #[cfg(feature = "tuning")] + self.parked_duration.fetch_add( + parking_duration.subsec_nanos() as u64, + Ordering::Relaxed, + ); + + let result = self + .must_unpark_cvar + .wait_timeout(must_unpark, parking_duration) + .unwrap(); + + must_unpark = result.0; + } else { + *must_unpark = false; + continue 'main; + } + } } } }