From 28a62e622e058cb77f50a4f2aadebb3b4d6c7275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Sat, 30 Jul 2022 13:51:28 +0200 Subject: [PATCH] ts/scheduler: rename awake / wake_up as unpark --- .../src/runtime/executor/async_wrapper.rs | 2 +- .../src/runtime/executor/context.rs | 12 +++--- .../src/runtime/executor/scheduler.rs | 40 +++++++++---------- generic/threadshare/src/runtime/task.rs | 21 +++++----- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/generic/threadshare/src/runtime/executor/async_wrapper.rs b/generic/threadshare/src/runtime/executor/async_wrapper.rs index dfb17b70..434533aa 100644 --- a/generic/threadshare/src/runtime/executor/async_wrapper.rs +++ b/generic/threadshare/src/runtime/executor/async_wrapper.rs @@ -377,7 +377,7 @@ impl Drop for Async { if let Some(io) = self.io.take() { if let Some(sched) = self.sched.upgrade() { let source = Arc::clone(&self.source); - sched.spawn_and_awake(async move { + sched.spawn_and_unpark(async move { Reactor::with_mut(|reactor| { if let Err(err) = reactor.remove_io(&source) { gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err); diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs index 6ed22d45..371af220 100644 --- a/generic/threadshare/src/runtime/executor/context.rs +++ b/generic/threadshare/src/runtime/executor/context.rs @@ -246,23 +246,23 @@ impl Context { self.0.spawn(future) } - pub fn spawn_and_awake(&self, future: Fut) -> JoinHandle + pub fn spawn_and_unpark(&self, future: Fut) -> JoinHandle where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - self.0.spawn_and_awake(future) + self.0.spawn_and_unpark(future) } - /// Forces the scheduler to wake up. + /// Forces the scheduler to unpark. /// /// This is not needed by elements implementors as they are - /// supposed to call [`Self::spawn_and_awake`] when needed. + /// supposed to call [`Self::spawn_and_unpark`] when needed. /// However, it's useful for lower level implementations such as /// `runtime::Task` so as to make sure the iteration loop yields /// as soon as possible when a transition is requested. - pub(in crate::runtime) fn wake_up(&self) { - self.0.wake_up(); + pub(in crate::runtime) fn unpark(&self) { + self.0.unpark(); } pub fn current_has_sub_tasks() -> bool { diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index 14cc4340..a5f1eaf6 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -32,8 +32,8 @@ pub(super) struct Scheduler { context_name: Arc, max_throttling: Duration, tasks: TaskQueue, - must_awake: Mutex, - must_awake_cvar: Condvar, + must_unpark: Mutex, + must_unpark_cvar: Condvar, } impl Scheduler { @@ -102,8 +102,8 @@ impl Scheduler { context_name: context_name.clone(), max_throttling, tasks: TaskQueue::new(context_name), - must_awake: Mutex::new(false), - must_awake_cvar: Condvar::new(), + must_unpark: Mutex::new(false), + must_unpark_cvar: Condvar::new(), })); *cur_scheduler = Some(handle.downgrade()); @@ -198,32 +198,32 @@ impl Scheduler { })?; } - let mut must_awake = self.must_awake.lock().unwrap(); + let mut must_unpark = self.must_unpark.lock().unwrap(); loop { - if *must_awake { - *must_awake = false; + if *must_unpark { + *must_unpark = false; break; } if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) { let result = self - .must_awake_cvar - .wait_timeout(must_awake, wait_duration) + .must_unpark_cvar + .wait_timeout(must_unpark, wait_duration) .unwrap(); - must_awake = result.0; + must_unpark = result.0; } else { - *must_awake = false; + *must_unpark = false; break; } } } } - fn wake_up(&self) { - let mut must_awake = self.must_awake.lock().unwrap(); - *must_awake = true; - self.must_awake_cvar.notify_one(); + fn unpark(&self) { + let mut must_unpark = self.must_unpark.lock().unwrap(); + *must_unpark = true; + self.must_unpark_cvar.notify_one(); } fn close(context_name: Arc) { @@ -384,7 +384,7 @@ impl Handle { // ensures that the lifetime bounds satisfy the safety // requirements for `TaskQueue::add_sync`. let task = unsafe { self.0.scheduler.tasks.add_sync(f) }; - self.0.scheduler.wake_up(); + self.0.scheduler.unpark(); futures::executor::block_on(task) } @@ -397,18 +397,18 @@ impl Handle { JoinHandle::new(task_id, task, self) } - pub fn spawn_and_awake(&self, future: F) -> JoinHandle + pub fn spawn_and_unpark(&self, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { let (task_id, task) = self.0.scheduler.tasks.add(future); - self.0.scheduler.wake_up(); + self.0.scheduler.unpark(); JoinHandle::new(task_id, task, self) } - pub(super) fn wake_up(&self) { - self.0.scheduler.wake_up(); + pub(super) fn unpark(&self) { + self.0.scheduler.unpark(); } pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 761af91a..37ca756c 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -353,7 +353,7 @@ impl TaskInner { } })?; - self.context.as_ref().unwrap().wake_up(); + self.context.as_ref().unwrap().unpark(); Ok(ack_rx) } @@ -594,28 +594,25 @@ impl Task { } pub fn flush_start(&self) -> Result { - self.abort_push_wakeup_await(Trigger::FlushStart) + self.abort_push_await(Trigger::FlushStart) } pub fn flush_stop(&self) -> Result { - self.abort_push_wakeup_await(Trigger::FlushStop) + self.abort_push_await(Trigger::FlushStop) } /// Stops the `Started` `Task` and wait for it to finish. pub fn stop(&self) -> Result { - self.abort_push_wakeup_await(Trigger::Stop) + self.abort_push_await(Trigger::Stop) } /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP. /// /// This function: - /// - Makes sure the iteration loop aborts as soon as possible. + /// - Aborts the iteration loop aborts. /// - Pushes the provided [`Trigger`]. /// - Awaits for the expected transition as usual. - fn abort_push_wakeup_await( - &self, - trigger: Trigger, - ) -> Result { + fn abort_push_await(&self, trigger: Trigger) -> Result { let mut inner = self.0.lock().unwrap(); inner.abort_task_loop(); @@ -751,7 +748,7 @@ macro_rules! exec_action { let join_handle = { let mut task_inner = $task_inner.lock().unwrap(); - let join_handle = $context.spawn_and_awake(action_fut); + let join_handle = $context.spawn_and_unpark(action_fut); task_inner.spawned_task_id = Some(join_handle.task_id()); join_handle @@ -1014,7 +1011,7 @@ impl StateMachine { // Unprepare is not joined by an ack_rx but by joining the state machine // handle, so we don't need to keep track of the spwaned_task_id context - .spawn_and_awake(async move { + .spawn_and_unpark(async move { self.task_impl.unprepare().await; while Context::current_has_sub_tasks() { @@ -1119,7 +1116,7 @@ impl StateMachine { let join_handle = { let mut task_inner = task_inner.lock().unwrap(); - let join_handle = context.spawn_and_awake(loop_fut); + let join_handle = context.spawn_and_unpark(loop_fut); task_inner.spawned_task_id = Some(join_handle.task_id()); join_handle