From 1be30b8ecc94167f2a0cf4c05408311fdc1ab3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Sun, 11 Sep 2022 19:57:36 +0200 Subject: [PATCH] ts/scheduler: fix shutdown A strong handle reference was held in the `block_on_priv` `Result` handler in the thread for the `Scheduler::start` code path, which lead to the `Handler` strong count not dropping to 0 when it should, leading to the shutdown request not being triggered. Use an Arc instead of a oneshot channel for shutdown. The main Future is always polled and never relies on a waker, a `poll_fn` is cheap and does the job. Unpark the scheduler after posting a request to shutdown. --- .../src/runtime/executor/scheduler.rs | 116 ++++++++---------- 1 file changed, 54 insertions(+), 62 deletions(-) diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index 4ce1c008..bf7c65c4 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -3,7 +3,7 @@ // // Take a look at the license at the top of the repository in the LICENSE file. -use futures::channel::oneshot; +use futures::future::poll_fn; use futures::pin_mut; use gio::glib::clone::Downgrade; @@ -12,7 +12,8 @@ use std::cell::RefCell; use std::future::Future; use std::panic; #[cfg(feature = "tuning")] -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc as sync_mpsc; use std::sync::{Arc, Condvar, Mutex, Weak}; use std::task::Poll; @@ -49,7 +50,6 @@ impl Scheduler { let thread = thread::Builder::new().name(context_name.to_string()); let (handle_sender, handle_receiver) = sync_mpsc::channel(); - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); let context_name = Arc::from(context_name); let thread_ctx_name = Arc::clone(&context_name); let join = thread @@ -62,9 +62,20 @@ impl Scheduler { let handle = Scheduler::init(Arc::clone(&thread_ctx_name), max_throttling); let this = Arc::clone(&handle.0.scheduler); - handle_sender.send(handle.clone()).unwrap(); + let must_shutdown = handle.0.must_shutdown.clone(); + let handle_weak = handle.downgrade(); + handle_sender.send(handle).unwrap(); - match this.block_on_priv(shutdown_receiver) { + let shutdown_fut = poll_fn(move |_| { + if must_shutdown.load(Ordering::SeqCst) { + Poll::Ready(()) + } else { + Poll::Pending + } + }); + + // Blocking on `shutdown_fut` which is cheap to `poll`. + match this.block_on_priv(shutdown_fut) { Ok(_) => { gst::debug!( RUNTIME_CAT, @@ -79,9 +90,8 @@ impl Scheduler { thread_ctx_name ); - // We are shutting down on our own initiative - if let Ok(mut shutdown) = handle.0.shutdown.lock() { - shutdown.clear(); + if let Some(handle) = handle_weak.upgrade() { + handle.self_shutdown(); } panic::resume_unwind(e); @@ -91,7 +101,7 @@ impl Scheduler { .expect("Failed to spawn Scheduler thread"); let handle = handle_receiver.recv().expect("Context thread init failed"); - handle.set_shutdown(shutdown_sender, join); + handle.set_join_handle(join); handle } @@ -132,9 +142,11 @@ impl Scheduler { !Scheduler::is_scheduler_thread(), "Attempt to block within an existing Scheduler thread." ); + let handle = Scheduler::init(Scheduler::DUMMY_NAME.into(), Duration::ZERO); let this = Arc::clone(&handle.0.scheduler); + // Move the (only) handle for this scheduler in the main task. let (task_id, task) = this.tasks.add(async move { let res = future.await; @@ -154,6 +166,7 @@ impl Scheduler { ); }); + // Blocking on `task` which is cheap to `poll`. match this.block_on_priv(task) { Ok(res) => res, Err(e) => { @@ -168,14 +181,21 @@ impl Scheduler { } } - fn block_on_priv(&self, future: F) -> std::thread::Result + // Important: the `termination_future` MUST be cheap to poll. + // + // Examples of appropriate `termination_future` are: + // + // - an `executor::Task` returned by `self.tasks.add(..)`. + // - a `JoinHandle` returned by `Handle::spawn`. + // - a custom future with few cycles (ex. checking an `AtomicBool`). + fn block_on_priv(&self, termination_future: F) -> std::thread::Result where F: Future + Send + 'static, F::Output: Send + 'static, { let waker = waker_fn(|| ()); let cx = &mut std::task::Context::from_waker(&waker); - pin_mut!(future); + pin_mut!(termination_future); let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name))); @@ -191,7 +211,7 @@ impl Scheduler { Reactor::with_mut(|reactor| reactor.react(now).ok()); } - if let Poll::Ready(t) = future.as_mut().poll(cx) { + if let Poll::Ready(t) = termination_future.as_mut().poll(cx) { return Ok(t); } @@ -210,15 +230,6 @@ impl Scheduler { tasks_checked += 1; } else { - // No more ready tasks. - if tasks_checked > 0 { - // Check if the main future is ready before parking. - if let Poll::Ready(t) = future.as_mut().poll(cx) { - return Ok(t); - } - } - // else: main future has just been checked. - let mut must_unpark = self.must_unpark.lock().unwrap(); loop { if *must_unpark { @@ -297,51 +308,38 @@ impl Scheduler { } } -impl Drop for Scheduler { - fn drop(&mut self) { - gst::debug!( - RUNTIME_CAT, - "Terminated: Scheduler for Context {}", - self.context_name - ); - } -} - #[derive(Debug)] -struct SchedulerShutdown { +struct HandleInner { scheduler: Arc, - sender: Option>, - join: Option>, + must_shutdown: Arc, + join: Mutex>>, } -impl SchedulerShutdown { +impl HandleInner { fn new(scheduler: Arc) -> Self { - SchedulerShutdown { + HandleInner { scheduler, - sender: None, - join: None, + must_shutdown: Default::default(), + join: Default::default(), } } - - fn clear(&mut self) { - self.sender = None; - self.join = None; - } } -impl Drop for SchedulerShutdown { +impl Drop for HandleInner { fn drop(&mut self) { - if let Some(sender) = self.sender.take() { - gst::debug!( + if !self.must_shutdown.fetch_or(true, Ordering::SeqCst) { + // Was not already shutting down. + self.scheduler.unpark(); + + gst::trace!( RUNTIME_CAT, "Shutting down Scheduler thread for Context {}", self.scheduler.context_name ); - drop(sender); // Don't block shutting down itself if !self.scheduler.is_current() { - if let Some(join_handler) = self.join.take() { + if let Some(join_handler) = self.join.lock().unwrap().take() { gst::trace!( RUNTIME_CAT, "Waiting for Scheduler thread to shutdown for Context {}", @@ -355,12 +353,6 @@ impl Drop for SchedulerShutdown { } } -#[derive(Debug)] -struct HandleInner { - scheduler: Arc, - shutdown: Mutex, -} - #[derive(Clone, Debug)] pub(super) struct HandleWeak(Weak); @@ -375,16 +367,16 @@ pub(super) struct Handle(Arc); impl Handle { fn new(scheduler: Arc) -> Self { - Handle(Arc::new(HandleInner { - shutdown: Mutex::new(SchedulerShutdown::new(Arc::clone(&scheduler))), - scheduler, - })) + Handle(Arc::new(HandleInner::new(scheduler))) } - fn set_shutdown(&self, sender: oneshot::Sender<()>, join: thread::JoinHandle<()>) { - let mut shutdown = self.0.shutdown.lock().unwrap(); - shutdown.sender = Some(sender); - shutdown.join = Some(join); + fn set_join_handle(&self, join: thread::JoinHandle<()>) { + *self.0.join.lock().unwrap() = Some(join); + } + + fn self_shutdown(self) { + self.0.must_shutdown.store(true, Ordering::SeqCst); + *self.0.join.lock().unwrap() = None; } pub fn context_name(&self) -> &str {