ts/rt/Task: awake the iteration loop when it needs to be aborted

When the iteration loop is throttling, the call to `abort` on the
`loop_abort_handle` returns immediately, but the actual `Future`
for the iteration loop is aborted only when the scheduler throttling
completes. State transitions which requires the loop to be aborted &
which are serialized at the pipeline level can incur long delays.

This commit makes sure the Task Context's scheduler is awaken as soon
as the task loop is aborted.
This commit is contained in:
François Laignel 2022-03-21 12:40:28 +01:00 committed by Sebastian Dröge
parent 97985d6442
commit c1615d01e6
3 changed files with 49 additions and 27 deletions

View file

@ -249,6 +249,17 @@ impl Context {
self.0.spawn_and_awake(future)
}
/// Forces the scheduler to wake up.
///
/// This is not needed by elements implementors as they are
/// supposed to call [`Self::spawn_and_awake`] when needed.
/// However, it's useful for lower level implementations such as
/// `runtime::Task` so as to make sure the iteration loop yields
/// as soon as possible when a transition is requested.
pub(in crate::runtime) fn wake_up(&self) {
self.0.wake_up();
}
pub fn current_has_sub_tasks() -> bool {
let (ctx, task_id) = match Context::current_task() {
Some(task) => task,

View file

@ -420,6 +420,10 @@ impl Handle {
JoinHandle::new(task_id, task, self)
}
pub(super) fn wake_up(&self) {
self.0.scheduler.wake_up();
}
pub fn remove_soure(&self, source: Arc<Source>) {
if self
.0

View file

@ -355,6 +355,25 @@ impl TaskInner {
Ok(ack_rx)
}
/// Aborts the task iteration loop ASAP.
///
/// When the iteration loop is throttling, the call to `abort`
/// on the `loop_abort_handle` returns immediately, but the
/// actual `Future` for the iteration loop is aborted only when
/// the scheduler throttling completes.
///
/// This function aborts the task iteration loop and awakes the
/// iteration scheduler.
fn abort_task_loop(&mut self) {
if let Some(loop_abort_handle) = self.loop_abort_handle.take() {
loop_abort_handle.abort();
if let Some(context) = self.context.as_ref() {
context.wake_up();
}
}
}
}
impl Drop for TaskInner {
@ -489,9 +508,7 @@ impl Task {
inner.state = TaskState::Unpreparing;
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
inner.abort_task_loop();
let _ = inner.trigger(Trigger::Unprepare).unwrap();
let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap();
@ -579,42 +596,32 @@ impl Task {
}
pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
Self::push_and_await_transition(inner, Trigger::FlushStart)
self.abort_push_wakeup_await(Trigger::FlushStart)
}
pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
Self::push_and_await_transition(inner, Trigger::FlushStop)
self.abort_push_wakeup_await(Trigger::FlushStop)
}
/// Stops the `Started` `Task` and wait for it to finish.
pub fn stop(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
Self::push_and_await_transition(inner, Trigger::Stop)
self.abort_push_wakeup_await(Trigger::Stop)
}
fn push_and_await_transition(
mut inner: MutexGuard<TaskInner>,
/// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP.
///
/// This function:
/// - Makes sure the iteration loop aborts as soon as possible.
/// - Pushes the provided [`Trigger`].
/// - Awaits for the expected transition as usual.
fn abort_push_wakeup_await(
&self,
trigger: Trigger,
) -> Result<TransitionStatus, TransitionError> {
let ack_rx = inner.trigger(trigger)?;
let mut inner = self.0.lock().unwrap();
inner.abort_task_loop();
let ack_rx = inner.trigger(trigger)?;
Self::await_ack(inner, ack_rx, trigger)
}