From 625fce3934470698abd7facfe4dba1804a84fa3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 26 Jul 2022 09:23:52 +0200 Subject: [PATCH] ts/Task: spawn StateMachine on ts Context Task state machines used to execute in an executor from the Futures crate. State transitions actions and iteration functions were then spawned on the target threadshare Context. This commit directly spawns the task state machine on the threadshare Context. This simplifies code a bit and paves the way for the changes described in [1]. Also introduces struct `StateMachineHandle`, which gather together fields to communicate and synchronize with the StateMachine. Renamed `StateMachine::run` as `spawn` and return `StateMachineHandle`. [1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793#note_1464400 --- generic/threadshare/Cargo.toml | 6 +- generic/threadshare/src/runtime/task.rs | 1126 +++++++++++------------ 2 files changed, 566 insertions(+), 566 deletions(-) diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 0d3f1692..774773ff 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -9,10 +9,10 @@ edition = "2021" rust-version = "1.57" [dependencies] -async-task = "4.0.3" +async-task = "4.3.0" concurrent-queue = "1.2.2" flume = "0.10.13" -futures = { version = "0.3.17", features = ["thread-pool"] } +futures = "0.3.21" libc = "0.2" gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } @@ -22,7 +22,7 @@ once_cell = "1" pin-project-lite = "0.2.0" polling = "2.2.0" rand = "0.8" -slab = "0.4.2" +slab = "0.4.7" socket2 = {features = ["all"], version = "0.4"} waker-fn = "1.1" diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 37ca756c..d3fdf39f 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -22,7 +22,7 @@ use futures::channel::mpsc as async_mpsc; use futures::channel::oneshot; -use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture, RemoteHandle}; +use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture}; use futures::prelude::*; use futures::stream::StreamExt; @@ -31,8 +31,7 @@ use std::ops::Deref; use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; -use super::executor::TaskId; -use super::{Context, RUNTIME_CAT}; +use super::{Context, JoinHandle, RUNTIME_CAT}; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] pub enum TaskState { @@ -45,7 +44,6 @@ pub enum TaskState { Started, Stopped, Unprepared, - Unpreparing, } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -86,20 +84,26 @@ impl From for gst::ErrorMessage { } } +// FIXME impl Future so that we can await without matching on the variant /// Transition details. /// /// A state transition occurs as a result of a triggering event. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Debug)] pub enum TransitionStatus { /// Transition completed successfully. Complete { origin: TaskState, target: TaskState, }, - /// Asynchronously awaiting for transition completion in a subtask. + /// Asynchronously awaiting for transition completion. /// /// This occurs when the event is triggered from a `Context`. - Async { trigger: Trigger, origin: TaskState }, + Async { + trigger: Trigger, + origin: TaskState, + ack_handle: JoinHandle>, + }, + // FIXME remove or edit doc /// Not waiting for transition completion. /// /// This is to prevent: @@ -218,18 +222,16 @@ pub trait TaskImpl: Send + 'static { } } +type AckSender = oneshot::Sender>; +type AckReceiver = oneshot::Receiver>; + struct TriggeringEvent { trigger: Trigger, - ack_tx: oneshot::Sender>, + ack_tx: AckSender, } impl TriggeringEvent { - fn new( - trigger: Trigger, - ) -> ( - Self, - oneshot::Receiver>, - ) { + fn new(trigger: Trigger) -> (Self, AckReceiver) { let (ack_tx, ack_rx) = oneshot::channel(); let req = TriggeringEvent { trigger, ack_tx }; @@ -265,27 +267,50 @@ impl fmt::Debug for TriggeringEvent { } } +#[derive(Debug)] +struct StateMachineHandle { + join_handle: JoinHandle<()>, + triggering_evt_tx: async_mpsc::Sender, + context: Context, +} + +impl StateMachineHandle { + fn trigger(&mut self, trigger: Trigger) -> AckReceiver { + let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); + + gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); + + self.triggering_evt_tx + .try_send(triggering_evt) + .unwrap_or_else(|err| { + panic!("trigger channel failure: {}", err); + }); + + self.context.unpark(); + + ack_rx + } + + async fn join(self) { + self.join_handle + .await + .expect("state machine shouldn't have been cancelled"); + } +} + #[derive(Debug)] struct TaskInner { - context: Option, state: TaskState, - state_machine_handle: Option>, - triggering_evt_tx: Option>, - prepare_abort_handle: Option, + state_machine_handle: Option, loop_abort_handle: Option, - spawned_task_id: Option, } impl Default for TaskInner { fn default() -> Self { TaskInner { - context: None, state: TaskState::Unprepared, state_machine_handle: None, - triggering_evt_tx: None, - prepare_abort_handle: None, loop_abort_handle: None, - spawned_task_id: None, } } } @@ -328,34 +353,25 @@ impl TaskInner { triggering_evt.send_ack(res); } - fn trigger( - &mut self, - trigger: Trigger, - ) -> Result>, TransitionError> { - let triggering_evt_tx = self.triggering_evt_tx.as_mut().unwrap(); - - let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); - - gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); - - triggering_evt_tx.try_send(triggering_evt).map_err(|err| { - let resource_err = if err.is_full() { - gst::ResourceError::NoSpaceLeft - } else { - gst::ResourceError::Close - }; - - gst::warning!(RUNTIME_CAT, "Unable to send {:?}: {:?}", trigger, err); - TransitionError { - trigger, - state: self.state, - err_msg: gst::error_msg!(resource_err, ["Unable to send {:?}: {:?}", trigger, err]), - } - })?; - - self.context.as_ref().unwrap().unpark(); - - Ok(ack_rx) + fn trigger(&mut self, trigger: Trigger) -> Result { + self.state_machine_handle + .as_mut() + .map(|state_machine| state_machine.trigger(trigger)) + .ok_or_else(|| { + gst::warning!( + RUNTIME_CAT, + "Unable to send {:?}: no state machine", + trigger + ); + TransitionError { + trigger, + state: TaskState::Unprepared, + err_msg: gst::error_msg!( + gst::ResourceError::NotFound, + ["Unable to send {:?}: no state machine", trigger] + ), + } + }) } fn abort_task_loop(&mut self) { @@ -408,10 +424,6 @@ impl Task { TaskStateGuard(self.0.lock().unwrap()) } - pub fn context(&self) -> Option { - self.0.lock().unwrap().context.as_ref().cloned() - } - pub fn prepare( &self, task_impl: impl TaskImpl, @@ -447,19 +459,32 @@ impl Task { inner.state = TaskState::Preparing; gst::log!(RUNTIME_CAT, "Spawning task state machine"); + inner.state_machine_handle = Some(StateMachine::spawn( + self.0.clone(), + Box::new(task_impl), + context.clone(), + )); - // FIXME allow configuration of the channel buffer size, - // this determines the contention on the Task. - let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4); - let state_machine = StateMachine::new(Box::new(task_impl), triggering_evt_rx); - inner.state_machine_handle = Some(self.spawn_state_machine(state_machine, &context)); + let ack_rx = inner.trigger(Trigger::Prepare)?; + drop(inner); - inner.triggering_evt_tx = Some(triggering_evt_tx); - inner.context = Some(context); + let ack_await_fut = async move { + gst::trace!(RUNTIME_CAT, "Awaiting ack for Prepare"); + + let res = ack_rx.await.unwrap(); + if res.is_ok() { + gst::log!(RUNTIME_CAT, "Received ack {:?} for Prepare", res); + } else { + gst::error!(RUNTIME_CAT, "Received ack {:?} for Prepare", res); + } + + res + }; Ok(TransitionStatus::Async { trigger: Trigger::Prepare, - origin, + origin: TaskState::Unprepared, + ack_handle: context.spawn_and_unpark(ack_await_fut), }) } @@ -467,17 +492,25 @@ impl Task { let mut inner = self.0.lock().unwrap(); let origin = inner.state; - match origin { - TaskState::Stopped | TaskState::Error | TaskState::Prepared | TaskState::Preparing => { - gst::debug!(RUNTIME_CAT, "Unpreparing task"); - } - TaskState::Unprepared | TaskState::Unpreparing => { - gst::debug!(RUNTIME_CAT, "Task already {:?}", origin); - return Ok(TransitionStatus::Skipped { - trigger: Trigger::Unprepare, - state: origin, - }); - } + let mut state_machine_handle = match origin { + TaskState::Stopped + | TaskState::Error + | TaskState::Prepared + | TaskState::Preparing + | TaskState::Unprepared => match inner.state_machine_handle.take() { + Some(state_machine_handle) => { + gst::debug!(RUNTIME_CAT, "Unpreparing task"); + + state_machine_handle + } + None => { + gst::debug!(RUNTIME_CAT, "Task already unpreparing"); + return Ok(TransitionStatus::Skipped { + trigger: Trigger::Unprepare, + state: origin, + }); + } + }, state => { gst::warning!( RUNTIME_CAT, @@ -493,60 +526,46 @@ impl Task { ), }); } - } - - inner.state = TaskState::Unpreparing; - - if let Some(prepare_abort_handle) = inner.prepare_abort_handle.take() { - prepare_abort_handle.abort(); - } + }; inner.abort_task_loop(); - - let _ = inner.trigger(Trigger::Unprepare).unwrap(); - let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap(); - - let state_machine_handle = inner.state_machine_handle.take(); - let context = inner.context.take().unwrap(); + let ack_rx = state_machine_handle.trigger(Trigger::Unprepare); drop(inner); - match state_machine_handle { - Some(state_machine_handle) => { - let state_machine_end_fut = async { - state_machine_handle.await; + let state_machine_end_fut = async { + state_machine_handle.join().await; - drop(triggering_evt_tx); - drop(context); - - gst::debug!(RUNTIME_CAT, "Task unprepared"); - }; - - if let Some((cur_context, cur_task_id)) = Context::current_task() { - gst::log!( - RUNTIME_CAT, - "Will wait for state machine termination completion in subtask to task {:?} on context {}", - cur_task_id, - cur_context.name() - ); - let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(()))); - - return Ok(TransitionStatus::Async { - trigger: Trigger::Unprepare, - origin, - }); - } else { - gst::log!( - RUNTIME_CAT, - "Waiting for state machine termination on current thread" - ); - // Use a light-weight executor, no timer nor async io. - futures::executor::block_on(state_machine_end_fut) - } - } - None => { - drop(triggering_evt_tx); - drop(context); + let res = ack_rx.await.unwrap(); + if res.is_ok() { + gst::log!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res); + } else { + gst::error!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res); } + + res + }; + + if let Some(cur_context) = Context::current() { + let ack_handle = cur_context.spawn_and_unpark(state_machine_end_fut); + gst::log!( + RUNTIME_CAT, + "Will wait for state machine termination completion in {:?} on context {}", + ack_handle.task_id(), + cur_context.name() + ); + + return Ok(TransitionStatus::Async { + trigger: Trigger::Unprepare, + origin, + ack_handle, + }); + } else { + gst::log!( + RUNTIME_CAT, + "Waiting for state machine termination on current thread" + ); + // Use a light-weight executor, no timer nor async io. + futures::executor::block_on(state_machine_end_fut.map(drop)) } Ok(TransitionStatus::Complete { @@ -564,9 +583,9 @@ impl Task { let ack_rx = inner.trigger(Trigger::Start)?; if let TaskState::Started = inner.state { - return Ok(TransitionStatus::NotWaiting { + return Ok(TransitionStatus::Skipped { trigger: Trigger::Start, - origin: TaskState::Started, + state: TaskState::Started, }); } @@ -584,6 +603,7 @@ impl Task { let ack_rx = inner.trigger(Trigger::Pause)?; if let TaskState::Started = inner.state { + // FIXME this could be async return Ok(TransitionStatus::NotWaiting { trigger: Trigger::Pause, origin: TaskState::Started, @@ -626,25 +646,6 @@ impl Task { trigger: Trigger, ) -> Result { let origin = inner.state; - - // Since triggering events handling is serialized by the state machine and - // we hold a lock on TaskInner, we can verify if current spawned loop / tansition action - // task_id matches the task_id of current subtask, if any. - if let Some(spawned_task_id) = inner.spawned_task_id { - if let Some((cur_context, cur_task_id)) = Context::current_task() { - if cur_task_id == spawned_task_id && &cur_context == inner.context.as_ref().unwrap() - { - // Don't block as this would deadlock - gst::log!( - RUNTIME_CAT, - "Requested {:?} from loop or transition action, not waiting", - trigger, - ); - return Ok(TransitionStatus::NotWaiting { trigger, origin }); - } - } - } - drop(inner); let ack_await_fut = async move { @@ -660,17 +661,22 @@ impl Task { res }; - if let Some((cur_context, cur_task_id)) = Context::current_task() { + if let Some(cur_context) = Context::current() { + let ack_handle = cur_context.spawn(ack_await_fut); + gst::log!( RUNTIME_CAT, - "Will await ack for {:?} in subtask to task {:?} on context {}", + "Awaiting ack for {:?} in {:?} on context {}", trigger, - cur_task_id, + ack_handle.task_id(), cur_context.name() ); - let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(()))); - Ok(TransitionStatus::Async { trigger, origin }) + Ok(TransitionStatus::Async { + trigger, + origin, + ack_handle, + }) } else { gst::log!( RUNTIME_CAT, @@ -681,22 +687,6 @@ impl Task { futures::executor::block_on(ack_await_fut) } } - - fn spawn_state_machine( - &self, - state_machine: StateMachine, - context: &Context, - ) -> RemoteHandle<()> { - use futures::executor::ThreadPool; - use futures::task::SpawnExt; - use once_cell::sync::OnceCell; - - static EXECUTOR: OnceCell = OnceCell::new(); - EXECUTOR - .get_or_init(|| ThreadPool::builder().pool_size(1).create().unwrap()) - .spawn_with_handle(state_machine.run(Arc::clone(&self.0), context.clone())) - .unwrap() - } } struct StateMachine { @@ -711,13 +701,10 @@ struct StateMachine { // state between 2 elements. macro_rules! exec_action { - ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr, $context:expr) => {{ - let trigger = $triggering_evt.trigger; - - let action_fut = async move { - let mut res = $self.task_impl.$action().await; - - if res.is_ok() { + ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{ + match $self.task_impl.$action().await { + Ok(()) => { + let mut res; while Context::current_has_sub_tasks() { gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); res = Context::drain_sub_tasks().await.map_err(|err| { @@ -730,36 +717,18 @@ macro_rules! exec_action { break; } } + + Ok($triggering_evt) } + Err(err) => { + // FIXME problem is that we loose the origin trigger in the + // final TransitionStatus. - let res = match res { - Ok(()) => Ok(()), - Err(err) => { - let next_triggering_evt = $self - .task_impl - .handle_action_error(trigger, $origin, err) - .await; - Err(next_triggering_evt) - } - }; + let next_trigger = $self + .task_impl + .handle_action_error($triggering_evt.trigger, $origin, err) + .await; - ($self, res) - }; - - let join_handle = { - let mut task_inner = $task_inner.lock().unwrap(); - let join_handle = $context.spawn_and_unpark(action_fut); - task_inner.spawned_task_id = Some(join_handle.task_id()); - - join_handle - }; - - let (this, res) = join_handle.await.unwrap(); - $self = this; - - match res { - Ok(()) => Ok($triggering_evt), - Err(next_trigger) => { // Convert triggering event according to the error handler's decision gst::trace!( RUNTIME_CAT, @@ -780,41 +749,63 @@ macro_rules! exec_action { impl StateMachine { // Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization // without inducing any significant performance penalties. - fn new( + fn spawn( + task_inner: Arc>, task_impl: Box, - triggering_evt_rx: async_mpsc::Receiver, - ) -> Self { - StateMachine { + context: Context, + ) -> StateMachineHandle { + let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4); + + let state_machine = StateMachine { task_impl, triggering_evt_rx, pending_triggering_evt: None, + }; + + StateMachineHandle { + join_handle: context.spawn_and_unpark(state_machine.run(task_inner)), + triggering_evt_tx, + context, } } - async fn run(mut self, task_inner: Arc>, context: Context) { - gst::trace!(RUNTIME_CAT, "Preparing task"); + async fn run(mut self, task_inner: Arc>) { + let context = Context::current().expect("must be spawed on a Context"); + + let mut triggering_evt = self + .triggering_evt_rx + .next() + .await + .expect("triggering_evt_rx dropped"); + + if let Trigger::Prepare = triggering_evt.trigger { + gst::trace!(RUNTIME_CAT, "Preparing task"); - { - let (mut triggering_evt, _) = TriggeringEvent::new(Trigger::Prepare); let res = exec_action!( self, prepare, triggering_evt, - TaskState::Preparing, - &task_inner, - &context + TaskState::Unprepared, + &task_inner ); if let Ok(triggering_evt) = res { - task_inner - .lock() - .unwrap() - .switch_to_state(TaskState::Prepared, triggering_evt); + let mut task_inner = task_inner.lock().unwrap(); + let res = Ok(TransitionStatus::Complete { + origin: TaskState::Unprepared, + target: TaskState::Prepared, + }); + + task_inner.state = TaskState::Prepared; + triggering_evt.send_ack(res); + gst::trace!(RUNTIME_CAT, "Task Prepared"); } + } else { + panic!("Unexpected initial trigger {:?}", triggering_evt.trigger); } loop { - let mut triggering_evt = match self.pending_triggering_evt.take() { + triggering_evt = match self.pending_triggering_evt.take() { Some(pending_triggering_evt) => pending_triggering_evt, None => self .triggering_evt_rx @@ -859,8 +850,7 @@ impl StateMachine { origin }; - self = - Self::spawn_loop(self, triggering_evt, origin, &task_inner, &context).await; + self = Self::start(self, triggering_evt, origin, &task_inner, &context).await; // next/pending triggering event handled in next iteration } Trigger::Pause => { @@ -884,8 +874,7 @@ impl StateMachine { } }; - let res = - exec_action!(self, pause, triggering_evt, origin, &task_inner, &context); + let res = exec_action!(self, pause, triggering_evt, origin, &task_inner); if let Ok(triggering_evt) = res { task_inner .lock() @@ -915,8 +904,7 @@ impl StateMachine { } }; - let res = - exec_action!(self, stop, triggering_evt, origin, &task_inner, &context); + let res = exec_action!(self, stop, triggering_evt, origin, &task_inner); if let Ok(triggering_evt) = res { task_inner .lock() @@ -944,14 +932,7 @@ impl StateMachine { } }; - let res = exec_action!( - self, - flush_start, - triggering_evt, - origin, - &task_inner, - &context - ); + let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner); if let Ok(triggering_evt) = res { task_inner .lock() @@ -979,14 +960,7 @@ impl StateMachine { } }; - let res = exec_action!( - self, - flush_stop, - triggering_evt, - origin, - &task_inner, - &context - ); + let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner); if let Ok(triggering_evt) = res { if is_paused { task_inner @@ -995,38 +969,26 @@ impl StateMachine { .switch_to_state(TaskState::Paused, triggering_evt); gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused"); } else { - self = Self::spawn_loop( - self, - triggering_evt, - origin, - &task_inner, - &context, - ) - .await; + self = Self::start(self, triggering_evt, origin, &task_inner, &context) + .await; // next/pending triggering event handled in next iteration } } } Trigger::Unprepare => { - // Unprepare is not joined by an ack_rx but by joining the state machine - // handle, so we don't need to keep track of the spwaned_task_id - context - .spawn_and_unpark(async move { - self.task_impl.unprepare().await; + // Unprepare is not joined by an ack_rx but by joining the state machine handle + self.task_impl.unprepare().await; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); - let res = Context::drain_sub_tasks().await.map_err(|err| { - gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); - err - }); - if res.is_err() { - break; - } - } - }) - .await - .unwrap(); + while Context::current_has_sub_tasks() { + gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); + let res = Context::drain_sub_tasks().await.map_err(|err| { + gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); + err + }); + if res.is_err() { + break; + } + } task_inner .lock() @@ -1042,87 +1004,45 @@ impl StateMachine { gst::trace!(RUNTIME_CAT, "Task state machine terminated"); } - async fn spawn_loop( + async fn start( mut self, mut triggering_evt: TriggeringEvent, origin: TaskState, task_inner: &Arc>, context: &Context, ) -> Self { - let task_inner_clone = Arc::clone(task_inner); - let loop_fut = async move { - let mut res = self.task_impl.start().await; - if res.is_ok() { - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for start"); - res = Context::drain_sub_tasks().await.map_err(|err| { - let msg = format!("start subtask returned {:?}", err); - gst::log!(RUNTIME_CAT, "{}", &msg); - gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg]) - }); + let triggering_evt = match exec_action!(self, start, triggering_evt, origin, &task_inner) { + Ok(triggering_evt) => triggering_evt, + Err(_) => return self, + }; - if res.is_err() { - break; - } - } + let task_inner_cl = Arc::clone(task_inner); + let loop_fut = async move { + let (abortable_task_loop, loop_abort_handle) = + abortable(self.run_loop(Arc::clone(&task_inner_cl))); + + { + let mut task_inner = task_inner_cl.lock().unwrap(); + task_inner.loop_abort_handle = Some(loop_abort_handle); + task_inner.switch_to_state(TaskState::Started, triggering_evt); + + gst::trace!(RUNTIME_CAT, "Starting task loop"); } - match res { - Ok(()) => { - let abortable_task_loop = { - let (abortable_task_loop, loop_abort_handle) = - abortable(self.run_loop(Arc::clone(&task_inner_clone))); - - let mut task_inner = task_inner_clone.lock().unwrap(); - task_inner.loop_abort_handle = Some(loop_abort_handle); - task_inner.switch_to_state(TaskState::Started, triggering_evt); - - abortable_task_loop - }; - - gst::trace!(RUNTIME_CAT, "Starting task loop"); - match abortable_task_loop.await { - Ok(Ok(())) => (), - Ok(Err(err)) => { - let next_trigger = self.task_impl.handle_iterate_error(err).await; - let (triggering_evt, _) = TriggeringEvent::new(next_trigger); - self.pending_triggering_evt = Some(triggering_evt); - } - Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"), - } - } - Err(err) => { - // Error while executing start transition action - let next_trigger = self - .task_impl - .handle_action_error(triggering_evt.trigger, origin, err) - .await; - - gst::log!( - RUNTIME_CAT, - "TaskImpl transition action error: converting Start to {:?}", - next_trigger, - ); - - triggering_evt.trigger = next_trigger; + match abortable_task_loop.await { + Ok(Ok(())) => (), + Ok(Err(err)) => { + let next_trigger = self.task_impl.handle_iterate_error(err).await; + let (triggering_evt, _) = TriggeringEvent::new(next_trigger); self.pending_triggering_evt = Some(triggering_evt); } + Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"), } - // next/pending triggering event handled in state machine loop - self }; - let join_handle = { - let mut task_inner = task_inner.lock().unwrap(); - let join_handle = context.spawn_and_unpark(loop_fut); - task_inner.spawned_task_id = Some(join_handle.task_id()); - - join_handle - }; - - join_handle.await.unwrap() + context.spawn_and_unpark(loop_fut).await.unwrap() } async fn run_loop(&mut self, task_inner: Arc>) -> Result<(), gst::FlowError> { @@ -1291,40 +1211,48 @@ mod tests { let (stopped_sender, mut stopped_receiver) = mpsc::channel(1); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1); - let res = task - .prepare( - TaskTest { - prepared_sender, - started_sender, - iterate_sender, - complete_iterate_receiver, - paused_sender, - stopped_sender, - flush_start_sender, - unprepared_sender, - }, - context, - ) - .unwrap(); - assert_eq!( - res, - TransitionStatus::Async { + let res = task.prepare( + TaskTest { + prepared_sender, + started_sender, + iterate_sender, + complete_iterate_receiver, + paused_sender, + stopped_sender, + flush_start_sender, + unprepared_sender, + }, + context, + ); + + let prepare_ack_handle = match res { + Ok(TransitionStatus::Async { trigger: Trigger::Prepare, origin: TaskState::Unprepared, - } - ); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; gst::debug!(RUNTIME_CAT, "task_iterate: starting (initial)"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Prepared, - target: TaskState::Started - }, - ); + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); - // At this point, prepared must be completed + // At this point, preparation must be complete + match block_on(prepare_ack_handle).unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Unprepared, + target: TaskState::Prepared, + }) => (), + other => panic!("unexpected {:?}", other), + } block_on(prepared_receiver.next()).unwrap(); // ... and start executed block_on(started_receiver.next()).unwrap(); @@ -1336,24 +1264,24 @@ mod tests { block_on(complete_iterate_sender.send(Ok(()))).unwrap(); gst::debug!(RUNTIME_CAT, "task_iterate: starting (redundant)"); - // start will return immediately - assert_eq!( - task.start().unwrap(), - TransitionStatus::NotWaiting { + // already started + match task.start() { + Ok(TransitionStatus::Skipped { trigger: Trigger::Start, - origin: TaskState::Started, - }, - ); + state: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); gst::debug!(RUNTIME_CAT, "task_iterate: pause (initial)"); - assert_eq!( - task.pause().unwrap(), - TransitionStatus::NotWaiting { + match task.pause() { + Ok(TransitionStatus::NotWaiting { trigger: Trigger::Pause, origin: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } // Pause transition is asynchronous while TaskState::Paused != task.state() { @@ -1369,26 +1297,26 @@ mod tests { block_on(paused_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "task_iterate: starting (after pause)"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Paused, - target: TaskState::Started - }, - ); + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); // Paused -> Started let _ = block_on(started_receiver.next()); gst::debug!(RUNTIME_CAT, "task_iterate: stopping"); - assert_eq!( - task.stop().unwrap(), - TransitionStatus::Complete { + match task.stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Started, - target: TaskState::Stopped - }, - ); + target: TaskState::Stopped, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Stopped); let _ = block_on(stopped_receiver.next()); @@ -1397,13 +1325,13 @@ mod tests { let _ = iterate_receiver.try_next(); gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Stopped, - target: TaskState::Started - }, - ); + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } let _ = block_on(started_receiver.next()); gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos"); @@ -1419,13 +1347,13 @@ mod tests { } gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Stopped, - target: TaskState::Started - }, - ); + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } let _ = block_on(started_receiver.next()); gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing"); @@ -1441,13 +1369,13 @@ mod tests { } gst::debug!(RUNTIME_CAT, "task_iterate: stop flushing"); - assert_eq!( - task.flush_stop().unwrap(), - TransitionStatus::Complete { + match task.flush_stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Flushing, - target: TaskState::Started - }, - ); + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } let _ = block_on(started_receiver.next()); gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error"); @@ -1463,23 +1391,22 @@ mod tests { RUNTIME_CAT, "task_iterate: attempting to start (after Error)" ); - let err = task.start().unwrap_err(); - match err { - TransitionError { + match task.start() { + Err(TransitionError { trigger: Trigger::Start, state: TaskState::Error, .. - } => (), + }) => (), _ => unreachable!(), } - assert_eq!( - task.unprepare().unwrap(), - TransitionStatus::Complete { + match task.unprepare() { + Ok(TransitionStatus::Complete { origin: TaskState::Error, - target: TaskState::Unprepared - }, - ); + target: TaskState::Unprepared, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Unprepared); let _ = block_on(unprepared_receiver.next()); @@ -1518,7 +1445,7 @@ mod tests { err ); match (trigger, state) { - (Trigger::Prepare, TaskState::Preparing) => { + (Trigger::Prepare, TaskState::Unprepared) => { self.prepare_error_sender.send(()).await.unwrap(); } other => unreachable!("{:?}", other), @@ -1559,13 +1486,12 @@ mod tests { std::thread::sleep(Duration::from_millis(2)); } - let res = task.start().unwrap_err(); - match res { - TransitionError { + match task.start() { + Err(TransitionError { trigger: Trigger::Start, state: TaskState::Error, .. - } => (), + }) => (), other => unreachable!("{:?}", other), } @@ -1627,41 +1553,61 @@ mod tests { .unwrap(); let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); - let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { - assert_eq!(task_clone.state(), TaskState::Preparing); + assert_eq!(task.state(), TaskState::Preparing); gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting"); - assert_eq!( - task_clone.start().unwrap(), - TransitionStatus::Async { + let ack_handle = match task.start() { + Ok(TransitionStatus::Async { trigger: Trigger::Start, origin: TaskState::Preparing, - } - ); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; ready_sender.send(()).unwrap(); - Context::drain_sub_tasks().await.unwrap(); - assert_eq!(task_clone.state(), TaskState::Started); + match ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Prepared, + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } + assert_eq!(task.state(), TaskState::Started); - assert_eq!( - task.stop().unwrap(), - TransitionStatus::Async { + let ack_handle = match task.stop() { + Ok(TransitionStatus::Async { trigger: Trigger::Stop, origin: TaskState::Started, - }, - ); - Context::drain_sub_tasks().await.unwrap(); - assert_eq!(task_clone.state(), TaskState::Stopped); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Started, + target: TaskState::Stopped, + }) => (), + other => panic!("unexpected {:?}", other), + } + assert_eq!(task.state(), TaskState::Stopped); - assert_eq!( - task.unprepare().unwrap(), - TransitionStatus::Async { + let ack_handle = match task.unprepare() { + Ok(TransitionStatus::Async { trigger: Trigger::Unprepare, origin: TaskState::Stopped, - }, - ); - Context::drain_sub_tasks().await.unwrap(); - assert_eq!(task_clone.state(), TaskState::Unprepared); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Stopped, + target: TaskState::Unprepared, + }) => (), + other => panic!("unexpected {:?}", other), + } + assert_eq!(task.state(), TaskState::Unprepared); }); gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx"); @@ -1715,10 +1661,10 @@ mod tests { err ); match (trigger, state) { - (Trigger::Prepare, TaskState::Preparing) => { + (Trigger::Prepare, TaskState::Unprepared) => { self.prepare_error_sender.send(()).await.unwrap(); } - other => unreachable!("action error for {:?}", other), + other => panic!("action error for {:?}", other), } Trigger::Error } @@ -1740,33 +1686,55 @@ mod tests { let (mut prepare_sender, prepare_receiver) = mpsc::channel(1); let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); - task.prepare( + let res = task.prepare( TaskPrepareTest { prepare_receiver, prepare_error_sender, }, context, - ) - .unwrap(); + ); + let prepare_ack = match res { + Ok(TransitionStatus::Async { + trigger: Trigger::Prepare, + origin: TaskState::Unprepared, + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap(); - let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { - assert_eq!(task_clone.state(), TaskState::Preparing); gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)"); - task_clone.start().unwrap(); + task.start().unwrap(); ready_sender.send(()).unwrap(); - Context::drain_sub_tasks().await.unwrap(); + // FIXME we loose the origin Trigger (Start) + // and only get the Trigger returned by handle_action_error + // see also: comment in exec_action! + match prepare_ack.await.unwrap() { + Err(TransitionError { + trigger: Trigger::Error, + state: TaskState::Preparing, + .. + }) => (), + other => panic!("unexpected transition res {:?}", other), + } - assert_eq!( - task.unprepare().unwrap(), + let ack_handle = match task.unprepare().unwrap() { TransitionStatus::Async { trigger: Trigger::Unprepare, origin: TaskState::Error, - }, - ); - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + } => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Error, + target: TaskState::Unprepared, + }) => (), + other => panic!("unexpected {:?}", other), + } }); gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx"); @@ -1840,26 +1808,26 @@ mod tests { .unwrap(); gst::debug!(RUNTIME_CAT, "pause_start: starting"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Prepared, target: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration"); block_on(iterate_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)"); - assert_eq!( - task.pause().unwrap(), - TransitionStatus::NotWaiting { + match task.pause() { + Ok(TransitionStatus::NotWaiting { trigger: Trigger::Pause, origin: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion"); complete_sender.try_send(()).unwrap(); @@ -1874,14 +1842,13 @@ mod tests { // Loop held on due to Pause iterate_receiver.try_next().unwrap_err(); - - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Paused, target: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration"); @@ -1995,25 +1962,25 @@ mod tests { task.start().unwrap(); gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush"); - assert_eq!( - task.flush_start().unwrap(), - TransitionStatus::Complete { + match task.flush_start() { + Ok(TransitionStatus::Complete { origin: TaskState::Started, target: TaskState::Flushing, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Flushing); block_on(flush_start_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush"); - assert_eq!( - task.flush_stop().unwrap(), - TransitionStatus::Complete { + match task.flush_stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Flushing, target: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); block_on(flush_stop_receiver.next()).unwrap(); @@ -2090,25 +2057,39 @@ mod tests { let task_clone = task.clone(); let flush_handle = oob_context.spawn(async move { - assert_eq!( - task_clone.flush_start().unwrap(), - TransitionStatus::Async { + let flush_ack_handle = match task_clone.flush_start() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStart, origin: TaskState::Started, - }, - ); - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match flush_ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Started, + target: TaskState::Flushing, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task_clone.state(), TaskState::Flushing); flush_start_receiver.next().await.unwrap(); - assert_eq!( - task_clone.flush_stop().unwrap(), - TransitionStatus::Async { + let flush_stop_ack_handle = match task_clone.flush_stop() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStop, origin: TaskState::Flushing, - }, - ); - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match flush_stop_ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Flushing, + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task_clone.state(), TaskState::Started); }); @@ -2165,33 +2146,47 @@ mod tests { flush_start_sender, flush_stop_sender, }, - context, + context.clone(), ) .unwrap(); task.start().unwrap(); let task_clone = task.clone(); - let flush_handle = task.context().as_ref().unwrap().spawn(async move { - assert_eq!( - task_clone.flush_start().unwrap(), - TransitionStatus::Async { + let flush_handle = context.spawn(async move { + let flush_ack_handle = match task_clone.flush_start() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStart, origin: TaskState::Started, - }, - ); - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match flush_ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Started, + target: TaskState::Flushing, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task_clone.state(), TaskState::Flushing); flush_start_receiver.next().await.unwrap(); - assert_eq!( - task_clone.flush_stop().unwrap(), - TransitionStatus::Async { + let flush_stop_ack_handle = match task_clone.flush_stop() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStop, origin: TaskState::Flushing, - }, - ); - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + }) => ack_handle, + other => panic!("unexpected {:?}", other), + }; + match flush_stop_ack_handle.await.unwrap() { + Ok(TransitionStatus::Complete { + origin: TaskState::Flushing, + target: TaskState::Started, + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task_clone.state(), TaskState::Started); }); @@ -2216,13 +2211,14 @@ mod tests { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration"); - assert_eq!( - self.task.flush_start().unwrap(), - TransitionStatus::NotWaiting { + match self.task.flush_start() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStart, origin: TaskState::Started, - }, - ); + .. + }) => (), + other => panic!("unexpected {:?}", other), + } Ok(()) } .boxed() @@ -2260,13 +2256,13 @@ mod tests { ); block_on(flush_start_receiver.next()).unwrap(); - assert_eq!( - task.stop().unwrap(), - TransitionStatus::Complete { + match task.stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Flushing, target: TaskState::Stopped, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } task.unprepare().unwrap(); } @@ -2289,13 +2285,14 @@ mod tests { crate::runtime::time::delay_for(Duration::from_millis(50)).await; gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration"); - assert_eq!( - self.task.pause().unwrap(), - TransitionStatus::NotWaiting { + match self.task.pause() { + Ok(TransitionStatus::NotWaiting { trigger: Trigger::Pause, origin: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } + Ok(()) } .boxed() @@ -2355,13 +2352,15 @@ mod tests { RUNTIME_CAT, "trigger_from_action: flush_start triggering flush_stop" ); - assert_eq!( - self.task.flush_stop().unwrap(), - TransitionStatus::NotWaiting { + match self.task.flush_stop() { + Ok(TransitionStatus::Async { trigger: Trigger::FlushStop, origin: TaskState::Started, - }, - ); + .. + }) => (), + other => panic!("unexpected {:?}", other), + } + Ok(()) } .boxed() @@ -2467,33 +2466,33 @@ mod tests { // Pause, FlushStart, FlushStop, Start gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing"); - assert_eq!( - task.pause().unwrap(), - TransitionStatus::Complete { + match task.pause() { + Ok(TransitionStatus::Complete { origin: TaskState::Prepared, target: TaskState::Paused, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush"); - assert_eq!( - task.flush_start().unwrap(), - TransitionStatus::Complete { + match task.flush_start() { + Ok(TransitionStatus::Complete { origin: TaskState::Paused, target: TaskState::PausedFlushing, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::PausedFlushing); block_on(flush_start_receiver.next()); gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush"); - assert_eq!( - task.flush_stop().unwrap(), - TransitionStatus::Complete { + match task.flush_stop() { + Ok(TransitionStatus::Complete { origin: TaskState::PausedFlushing, target: TaskState::Paused, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Paused); block_on(flush_stop_receiver.next()); @@ -2501,13 +2500,13 @@ mod tests { started_receiver.try_next().unwrap_err(); gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::Paused, target: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); block_on(started_receiver.next()); @@ -2586,26 +2585,26 @@ mod tests { block_on(flush_start_receiver.next()); gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing"); - assert_eq!( - task.start().unwrap(), - TransitionStatus::Complete { + match task.start() { + Ok(TransitionStatus::Complete { origin: TaskState::PausedFlushing, target: TaskState::Flushing, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Flushing); // start action not executed started_receiver.try_next().unwrap_err(); gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush"); - assert_eq!( - task.flush_stop().unwrap(), - TransitionStatus::Complete { + match task.flush_stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Flushing, target: TaskState::Started, - }, - ); + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); block_on(flush_stop_receiver.next()); block_on(started_receiver.next()); @@ -2677,18 +2676,20 @@ mod tests { gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start"); ready_sender.send(()).unwrap(); let res = task_clone.flush_start().unwrap(); - match res { + let ack_handle = match res { TransitionStatus::Async { trigger: Trigger::FlushStart, origin: TaskState::Paused, - } => (), + ack_handle, + } => ack_handle, TransitionStatus::Async { trigger: Trigger::FlushStart, origin: TaskState::Started, - } => (), - other => unreachable!("{:?}", other), - } - Context::drain_sub_tasks().await.unwrap(); + ack_handle, + } => ack_handle, + other => panic!("unexpected {:?}", other), + }; + ack_handle.await.unwrap().unwrap(); flush_start_receiver.next().await.unwrap(); }); @@ -2709,20 +2710,19 @@ mod tests { origin: TaskState::PausedFlushing, target: TaskState::Flushing, } => (), - other => unreachable!("{:?}", other), + other => panic!("unexpected {:?}", other), } block_on(flush_start_handle).unwrap(); gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop"); - assert_eq!( - task.flush_stop().unwrap(), - TransitionStatus::Complete { + match task.flush_stop() { + Ok(TransitionStatus::Complete { origin: TaskState::Flushing, target: TaskState::Started, - }, - ); - + }) => (), + other => panic!("unexpected {:?}", other), + } assert_eq!(task.state(), TaskState::Started); block_on(flush_stop_receiver.next());