diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs index fabd0937..17a8c840 100644 --- a/generic/threadshare/src/runtime/executor/context.rs +++ b/generic/threadshare/src/runtime/executor/context.rs @@ -249,6 +249,17 @@ impl Context { self.0.spawn_and_awake(future) } + /// Forces the scheduler to wake up. + /// + /// This is not needed by elements implementors as they are + /// supposed to call [`Self::spawn_and_awake`] when needed. + /// However, it's useful for lower level implementations such as + /// `runtime::Task` so as to make sure the iteration loop yields + /// as soon as possible when a transition is requested. + pub(in crate::runtime) fn wake_up(&self) { + self.0.wake_up(); + } + pub fn current_has_sub_tasks() -> bool { let (ctx, task_id) = match Context::current_task() { Some(task) => task, diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index dab53de0..d6d76be2 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -420,6 +420,10 @@ impl Handle { JoinHandle::new(task_id, task, self) } + pub(super) fn wake_up(&self) { + self.0.scheduler.wake_up(); + } + pub fn remove_soure(&self, source: Arc) { if self .0 diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index e729a6fc..8b7be397 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -355,6 +355,25 @@ impl TaskInner { Ok(ack_rx) } + + /// Aborts the task iteration loop ASAP. + /// + /// When the iteration loop is throttling, the call to `abort` + /// on the `loop_abort_handle` returns immediately, but the + /// actual `Future` for the iteration loop is aborted only when + /// the scheduler throttling completes. + /// + /// This function aborts the task iteration loop and awakes the + /// iteration scheduler. + fn abort_task_loop(&mut self) { + if let Some(loop_abort_handle) = self.loop_abort_handle.take() { + loop_abort_handle.abort(); + + if let Some(context) = self.context.as_ref() { + context.wake_up(); + } + } + } } impl Drop for TaskInner { @@ -489,9 +508,7 @@ impl Task { inner.state = TaskState::Unpreparing; - if let Some(loop_abort_handle) = inner.loop_abort_handle.take() { - loop_abort_handle.abort(); - } + inner.abort_task_loop(); let _ = inner.trigger(Trigger::Unprepare).unwrap(); let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap(); @@ -579,42 +596,32 @@ impl Task { } pub fn flush_start(&self) -> Result { - let mut inner = self.0.lock().unwrap(); - - if let Some(loop_abort_handle) = inner.loop_abort_handle.take() { - loop_abort_handle.abort(); - } - - Self::push_and_await_transition(inner, Trigger::FlushStart) + self.abort_push_wakeup_await(Trigger::FlushStart) } pub fn flush_stop(&self) -> Result { - let mut inner = self.0.lock().unwrap(); - - if let Some(loop_abort_handle) = inner.loop_abort_handle.take() { - loop_abort_handle.abort(); - } - - Self::push_and_await_transition(inner, Trigger::FlushStop) + self.abort_push_wakeup_await(Trigger::FlushStop) } /// Stops the `Started` `Task` and wait for it to finish. pub fn stop(&self) -> Result { - let mut inner = self.0.lock().unwrap(); - - if let Some(loop_abort_handle) = inner.loop_abort_handle.take() { - loop_abort_handle.abort(); - } - - Self::push_and_await_transition(inner, Trigger::Stop) + self.abort_push_wakeup_await(Trigger::Stop) } - fn push_and_await_transition( - mut inner: MutexGuard, + /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP. + /// + /// This function: + /// - Makes sure the iteration loop aborts as soon as possible. + /// - Pushes the provided [`Trigger`]. + /// - Awaits for the expected transition as usual. + fn abort_push_wakeup_await( + &self, trigger: Trigger, ) -> Result { - let ack_rx = inner.trigger(trigger)?; + let mut inner = self.0.lock().unwrap(); + inner.abort_task_loop(); + let ack_rx = inner.trigger(trigger)?; Self::await_ack(inner, ack_rx, trigger) }