diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index d3fdf39f..55908618 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2020 François Laignel +// Copyright (C) 2019-2022 François Laignel // Copyright (C) 2020 Sebastian Dröge // // This library is free software; you can redistribute it and/or @@ -28,8 +28,10 @@ use futures::stream::StreamExt; use std::fmt; use std::ops::Deref; +use std::pin::Pin; use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::Poll; use super::{Context, JoinHandle, RUNTIME_CAT}; @@ -58,6 +60,25 @@ pub enum Trigger { Unprepare, } +/// Transition success details. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum TransitionOk { + /// Transition completed successfully. + Complete { + origin: TaskState, + target: TaskState, + }, + /// Not waiting for transition result. + /// + /// This is to prevent: + /// - A deadlock when executing a transition action. + /// - A potential infinite wait when pausing a running loop + /// which could be awaiting for an `iterate` to complete. + NotWaiting { trigger: Trigger, origin: TaskState }, + /// Skipping triggering event due to current state. + Skipped { trigger: Trigger, state: TaskState }, +} + /// TriggeringEvent error details. #[derive(Clone, Debug, Eq, PartialEq)] pub struct TransitionError { @@ -84,35 +105,214 @@ impl From for gst::ErrorMessage { } } -// FIXME impl Future so that we can await without matching on the variant -/// Transition details. +/// Transition status. /// /// A state transition occurs as a result of a triggering event. -#[derive(Debug)] +/// The triggering event is asynchronously handled by a state machine +/// running on a [`Context`]. +#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"] pub enum TransitionStatus { - /// Transition completed successfully. - Complete { - origin: TaskState, - target: TaskState, - }, - /// Asynchronously awaiting for transition completion. - /// - /// This occurs when the event is triggered from a `Context`. - Async { + /// Transition result is ready. + Ready(Result), + /// Transition is pending. + Pending { trigger: Trigger, origin: TaskState, - ack_handle: JoinHandle>, + res_fut: Pin> + Send>>, }, - // FIXME remove or edit doc - /// Not waiting for transition completion. +} + +impl TransitionStatus { + pub fn is_ready(&self) -> bool { + matches!(self, TransitionStatus::Ready { .. }) + } + + pub fn is_pending(&self) -> bool { + matches!(self, TransitionStatus::Pending { .. }) + } + + /// Converts the `TransitionStatus` into a `Result`. /// - /// This is to prevent: - /// - A deadlock when executing a transition action. - /// - A potential infinite wait when pausing a running loop - /// which could be awaiting for an `iterate` to complete. - NotWaiting { trigger: Trigger, origin: TaskState }, - /// Skipping triggering event due to current state. - Skipped { trigger: Trigger, state: TaskState }, + /// This function allows getting the `TransitionError` when + /// the transition result is ready without `await`ing nor blocking. + /// + /// See also [`Self::await_maybe_on_context`]. + // FIXME once stabilized, this could use https://github.com/rust-lang/rust/issues/84277 + pub fn check(self) -> Result { + match self { + TransitionStatus::Ready(Err(err)) => Err(err), + other => Ok(other), + } + } + + /// Awaits for this transition to complete, possibly while running on a [`Context`]. + /// + /// Notes: + /// + /// - When running in an `async` block within a running transition or + /// task iteration, don't await for the transition as it would deadlock. + /// Use [`Self::check`] to make sure the state transition is valid. + /// - When running in an `async` block out of a running transition or + /// task iteration, just `.await` normally. E.g.: + /// + /// ``` + /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError}; + /// # async fn async_fn() -> Result { + /// # let task = Task::default(); + /// let flush_ok = task.flush_start().await?; + /// # Ok(flush_ok) + /// # } + /// ``` + /// + /// This function makes sure the transition completes successfully or + /// produces an error. It must be used in situations where we don't know + /// whether we are running on a [`Context`] or not. This is the case for + /// functions in [`PadSrc`] and [`PadSink`] as well as the synchronous + /// functions transitively called from them. + /// + /// As an example, a `PadSrc::src_event` function which handles a + /// `FlushStart` could call: + /// + /// ``` + /// # fn src_event() -> bool { + /// # let task = gstthreadshare::runtime::Task::default(); + /// return task + /// .flush_start() + /// .await_maybe_on_context() + /// .is_ok(); + /// # } + /// ``` + /// + /// If the transition is already complete, the result is returned immediately. + /// + /// If we are NOT running on a [`Context`], the transition result is awaited + /// by blocking on current thread and the result is returned. + /// + /// If we are running on a [`Context`], the transition result is awaited + /// in a sub task for current [`Context`]'s Scheduler task. As a consequence, + /// the sub task will be awaited in usual [`Context::drain_sub_tasks`] + /// rendezvous, ensuring some kind of synchronization. To avoid deadlocks, + /// `Ok(TransitionOk::NotWaiting { .. })` is immediately returned. + /// + /// [`PadSrc`]: ../pad/struct.PadSrc.html + /// [`PadSink`]: ../pad/struct.PadSink.html + pub fn await_maybe_on_context(self) -> Result { + use TransitionStatus::*; + match self { + Pending { + trigger, + origin, + res_fut, + } => { + if let Some(cur_ctx) = Context::current() { + gst::debug!( + RUNTIME_CAT, + "Awaiting for {:?} ack in a subtask on context {}", + trigger, + cur_ctx.name() + ); + let _ = Context::add_sub_task(async move { + let res = res_fut.await; + if res.is_ok() { + gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); + } else { + gst::error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); + } + + Ok(()) + }); + + Ok(TransitionOk::NotWaiting { trigger, origin }) + } else { + gst::debug!( + RUNTIME_CAT, + "Awaiting for {:?} ack on current thread", + trigger, + ); + futures::executor::block_on(res_fut) + } + } + Ready(res) => res, + } + } + + /// Awaits for this transition to complete by blocking current thread. + /// + /// This function blocks until the transition completes successfully or + /// produces an error. + /// + /// In situations where we don't know whether we are running on a [`Context`] + /// or not, use [`Self::await_maybe_on_context`] instead. + /// + /// # Panics + /// + /// Panics if current thread is a [`Context`] thread. + pub fn block_on(self) -> Result { + assert!(!Context::is_context_thread()); + use TransitionStatus::*; + match self { + Pending { + trigger, res_fut, .. + } => { + gst::debug!( + RUNTIME_CAT, + "Awaiting for {:?} ack on current thread", + trigger, + ); + futures::executor::block_on(res_fut) + } + Ready(res) => res, + } + } +} + +impl Future for TransitionStatus { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + use TransitionStatus::*; + + match &mut *self { + Ready(res) => Poll::Ready(res.clone()), + Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(res) => { + *self = Ready(res.clone()); + + Poll::Ready(res) + } + }, + } + } +} + +impl From for TransitionStatus { + fn from(ok: TransitionOk) -> Self { + Self::Ready(Ok(ok)) + } +} + +impl From for TransitionStatus { + fn from(err: TransitionError) -> Self { + Self::Ready(Err(err)) + } +} + +// Explicit impl due to `res_fut` not implementing `Debug`. +impl fmt::Debug for TransitionStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use TransitionStatus::*; + match self { + Ready(res) => f.debug_tuple("Ready").field(res).finish(), + Pending { + trigger, origin, .. + } => f + .debug_struct("Pending") + .field("trigger", trigger) + .field("origin", origin) + .finish(), + } + } } /// Implementation trait for `Task`s. @@ -222,8 +422,8 @@ pub trait TaskImpl: Send + 'static { } } -type AckSender = oneshot::Sender>; -type AckReceiver = oneshot::Receiver>; +type AckSender = oneshot::Sender>; +type AckReceiver = oneshot::Receiver>; struct TriggeringEvent { trigger: Trigger, @@ -238,7 +438,7 @@ impl TriggeringEvent { (req, ack_rx) } - fn send_ack(self, res: Result) { + fn send_ack(self, res: Result) { let _ = self.ack_tx.send(res); } @@ -279,12 +479,7 @@ impl StateMachineHandle { let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); - - self.triggering_evt_tx - .try_send(triggering_evt) - .unwrap_or_else(|err| { - panic!("trigger channel failure: {}", err); - }); + self.triggering_evt_tx.try_send(triggering_evt).unwrap(); self.context.unpark(); @@ -317,7 +512,7 @@ impl Default for TaskInner { impl TaskInner { fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) { - let res = Ok(TransitionStatus::Complete { + let res = Ok(TransitionOk::Complete { origin: self.state, target: target_state, }); @@ -345,7 +540,7 @@ impl TaskInner { } fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) { - let res = Ok(TransitionStatus::Skipped { + let res = Ok(TransitionOk::Skipped { trigger: triggering_evt.trigger, state: self.state, }); @@ -424,11 +619,7 @@ impl Task { TaskStateGuard(self.0.lock().unwrap()) } - pub fn prepare( - &self, - task_impl: impl TaskImpl, - context: Context, - ) -> Result { + pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); let origin = inner.state; @@ -436,21 +627,23 @@ impl Task { TaskState::Unprepared => (), TaskState::Prepared | TaskState::Preparing => { gst::debug!(RUNTIME_CAT, "Task already {:?}", origin); - return Ok(TransitionStatus::Skipped { + return TransitionOk::Skipped { trigger: Trigger::Prepare, state: origin, - }); + } + .into(); } state => { gst::warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state); - return Err(TransitionError { + return TransitionError { trigger: Trigger::Prepare, state: inner.state, err_msg: gst::error_msg!( gst::CoreError::StateChange, ["Attempt to prepare Task in state {:?}", state] ), - }); + } + .into(); } } @@ -462,33 +655,23 @@ impl Task { inner.state_machine_handle = Some(StateMachine::spawn( self.0.clone(), Box::new(task_impl), - context.clone(), + context, )); - let ack_rx = inner.trigger(Trigger::Prepare)?; + let ack_rx = match inner.trigger(Trigger::Prepare) { + Ok(ack_rx) => ack_rx, + Err(err) => return err.into(), + }; drop(inner); - let ack_await_fut = async move { - gst::trace!(RUNTIME_CAT, "Awaiting ack for Prepare"); - - let res = ack_rx.await.unwrap(); - if res.is_ok() { - gst::log!(RUNTIME_CAT, "Received ack {:?} for Prepare", res); - } else { - gst::error!(RUNTIME_CAT, "Received ack {:?} for Prepare", res); - } - - res - }; - - Ok(TransitionStatus::Async { + TransitionStatus::Pending { trigger: Trigger::Prepare, origin: TaskState::Unprepared, - ack_handle: context.spawn_and_unpark(ack_await_fut), - }) + res_fut: Box::pin(ack_rx.map(Result::unwrap)), + } } - pub fn unprepare(&self) -> Result { + pub fn unprepare(&self) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); let origin = inner.state; @@ -505,10 +688,11 @@ impl Task { } None => { gst::debug!(RUNTIME_CAT, "Task already unpreparing"); - return Ok(TransitionStatus::Skipped { + return TransitionOk::Skipped { trigger: Trigger::Unprepare, state: origin, - }); + } + .into(); } }, state => { @@ -517,14 +701,15 @@ impl Task { "Attempt to unprepare Task in state {:?}", state ); - return Err(TransitionError { + return TransitionError { trigger: Trigger::Unprepare, state: inner.state, err_msg: gst::error_msg!( gst::CoreError::StateChange, ["Attempt to unprepare Task in state {:?}", state] ), - }); + } + .into(); } }; @@ -534,62 +719,43 @@ impl Task { let state_machine_end_fut = async { state_machine_handle.join().await; - - let res = ack_rx.await.unwrap(); - if res.is_ok() { - gst::log!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res); - } else { - gst::error!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res); - } - - res + ack_rx.await.unwrap() }; - if let Some(cur_context) = Context::current() { - let ack_handle = cur_context.spawn_and_unpark(state_machine_end_fut); - gst::log!( - RUNTIME_CAT, - "Will wait for state machine termination completion in {:?} on context {}", - ack_handle.task_id(), - cur_context.name() - ); - - return Ok(TransitionStatus::Async { - trigger: Trigger::Unprepare, - origin, - ack_handle, - }); - } else { - gst::log!( - RUNTIME_CAT, - "Waiting for state machine termination on current thread" - ); - // Use a light-weight executor, no timer nor async io. - futures::executor::block_on(state_machine_end_fut.map(drop)) - } - - Ok(TransitionStatus::Complete { + TransitionStatus::Pending { + trigger: Trigger::Unprepare, origin, - target: TaskState::Unprepared, - }) + res_fut: Box::pin(state_machine_end_fut), + } } /// Starts the `Task`. /// /// The execution occurs on the `Task` context. - pub fn start(&self) -> Result { + pub fn start(&self) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); - let ack_rx = inner.trigger(Trigger::Start)?; + let ack_rx = match inner.trigger(Trigger::Start) { + Ok(ack_rx) => ack_rx, + Err(err) => return err.into(), + }; if let TaskState::Started = inner.state { - return Ok(TransitionStatus::Skipped { + return TransitionOk::Skipped { trigger: Trigger::Start, state: TaskState::Started, - }); + } + .into(); } - Self::await_ack(inner, ack_rx, Trigger::Start) + let origin = inner.state; + drop(inner); + + TransitionStatus::Pending { + trigger: Trigger::Start, + origin, + res_fut: Box::pin(ack_rx.map(Result::unwrap)), + } } /// Requests the `Task` loop to pause. @@ -597,32 +763,43 @@ impl Task { /// If an iteration is in progress, it will run to completion, /// then no more iteration will be executed before `start` is called again. /// Therefore, it is not guaranteed that `Paused` is reached when `pause` returns. - pub fn pause(&self) -> Result { + pub fn pause(&self) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); - let ack_rx = inner.trigger(Trigger::Pause)?; + let ack_rx = match inner.trigger(Trigger::Pause) { + Ok(ack_rx) => ack_rx, + Err(err) => return err.into(), + }; if let TaskState::Started = inner.state { - // FIXME this could be async - return Ok(TransitionStatus::NotWaiting { + // FIXME this could be async when iterate is split into next_item / handle_item + return TransitionOk::NotWaiting { trigger: Trigger::Pause, origin: TaskState::Started, - }); + } + .into(); } - Self::await_ack(inner, ack_rx, Trigger::Pause) + let origin = inner.state; + drop(inner); + + TransitionStatus::Pending { + trigger: Trigger::Pause, + origin, + res_fut: Box::pin(ack_rx.map(Result::unwrap)), + } } - pub fn flush_start(&self) -> Result { + pub fn flush_start(&self) -> TransitionStatus { self.abort_push_await(Trigger::FlushStart) } - pub fn flush_stop(&self) -> Result { + pub fn flush_stop(&self) -> TransitionStatus { self.abort_push_await(Trigger::FlushStop) } /// Stops the `Started` `Task` and wait for it to finish. - pub fn stop(&self) -> Result { + pub fn stop(&self) -> TransitionStatus { self.abort_push_await(Trigger::Stop) } @@ -632,59 +809,22 @@ impl Task { /// - Aborts the iteration loop aborts. /// - Pushes the provided [`Trigger`]. /// - Awaits for the expected transition as usual. - fn abort_push_await(&self, trigger: Trigger) -> Result { + fn abort_push_await(&self, trigger: Trigger) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); inner.abort_task_loop(); - let ack_rx = inner.trigger(trigger)?; - Self::await_ack(inner, ack_rx, trigger) - } + let ack_rx = match inner.trigger(trigger) { + Ok(ack_rx) => ack_rx, + Err(err) => return err.into(), + }; - fn await_ack( - inner: MutexGuard, - ack_rx: oneshot::Receiver>, - trigger: Trigger, - ) -> Result { let origin = inner.state; drop(inner); - let ack_await_fut = async move { - gst::trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger); - - let res = ack_rx.await.unwrap(); - if res.is_ok() { - gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); - } else { - gst::error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); - } - - res - }; - - if let Some(cur_context) = Context::current() { - let ack_handle = cur_context.spawn(ack_await_fut); - - gst::log!( - RUNTIME_CAT, - "Awaiting ack for {:?} in {:?} on context {}", - trigger, - ack_handle.task_id(), - cur_context.name() - ); - - Ok(TransitionStatus::Async { - trigger, - origin, - ack_handle, - }) - } else { - gst::log!( - RUNTIME_CAT, - "Awaiting ack for {:?} on current thread", - trigger - ); - // Use a light-weight executor, no timer nor async io. - futures::executor::block_on(ack_await_fut) + TransitionStatus::Pending { + trigger, + origin, + res_fut: Box::pin(ack_rx.map(Result::unwrap)), } } } @@ -790,7 +930,7 @@ impl StateMachine { ); if let Ok(triggering_evt) = res { let mut task_inner = task_inner.lock().unwrap(); - let res = Ok(TransitionStatus::Complete { + let res = Ok(TransitionOk::Complete { origin: TaskState::Unprepared, target: TaskState::Prepared, }); @@ -1095,9 +1235,14 @@ mod tests { use futures::executor::block_on; use std::time::Duration; + use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *}; use crate::runtime::Context; - use super::*; + #[track_caller] + fn stop_then_unprepare(task: Task) { + task.stop().block_on().unwrap(); + task.unprepare().block_on().unwrap(); + } #[test] fn iterate() { @@ -1117,7 +1262,7 @@ mod tests { impl TaskImpl for TaskTest { fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: prepared"); + gst::debug!(RUNTIME_CAT, "iterate: prepared"); self.prepared_sender.send(()).await.unwrap(); Ok(()) } @@ -1126,7 +1271,7 @@ mod tests { fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: started"); + gst::debug!(RUNTIME_CAT, "iterate: started"); self.started_sender.send(()).await.unwrap(); Ok(()) } @@ -1135,21 +1280,18 @@ mod tests { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: entering iterate"); + gst::debug!(RUNTIME_CAT, "iterate: entering iterate"); self.iterate_sender.send(()).await.unwrap(); - gst::debug!( - RUNTIME_CAT, - "task_iterate: awaiting complete_iterate_receiver" - ); + gst::debug!(RUNTIME_CAT, "iterate: awaiting complete_iterate_receiver"); let res = self.complete_iterate_receiver.next().await.unwrap(); if res.is_ok() { - gst::debug!(RUNTIME_CAT, "task_iterate: received Ok => keep looping"); + gst::debug!(RUNTIME_CAT, "iterate: received Ok => keep looping"); } else { gst::debug!( RUNTIME_CAT, - "task_iterate: received {:?} => cancelling loop", + "iterate: received {:?} => cancelling loop", res ); } @@ -1161,7 +1303,7 @@ mod tests { fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: paused"); + gst::debug!(RUNTIME_CAT, "iterate: paused"); self.paused_sender.send(()).await.unwrap(); Ok(()) } @@ -1170,7 +1312,7 @@ mod tests { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: stopped"); + gst::debug!(RUNTIME_CAT, "iterate: stopped"); self.stopped_sender.send(()).await.unwrap(); Ok(()) } @@ -1179,7 +1321,7 @@ mod tests { fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: stopped"); + gst::debug!(RUNTIME_CAT, "iterate: stopped"); self.flush_start_sender.send(()).await.unwrap(); Ok(()) } @@ -1188,20 +1330,20 @@ mod tests { fn unprepare(&mut self) -> BoxFuture<'_, ()> { async move { - gst::debug!(RUNTIME_CAT, "task_iterate: unprepared"); + gst::debug!(RUNTIME_CAT, "iterate: unprepared"); self.unprepared_sender.send(()).await.unwrap(); } .boxed() } } - let context = Context::acquire("task_iterate", Duration::from_millis(2)).unwrap(); + let context = Context::acquire("iterate", Duration::from_millis(2)).unwrap(); let task = Task::default(); - assert_eq!(task.state(), TaskState::Unprepared); + assert_eq!(task.state(), Unprepared); - gst::debug!(RUNTIME_CAT, "task_iterate: preparing"); + gst::debug!(RUNTIME_CAT, "iterate: preparing"); let (prepared_sender, mut prepared_receiver) = mpsc::channel(1); let (started_sender, mut started_receiver) = mpsc::channel(1); @@ -1211,7 +1353,7 @@ mod tests { let (stopped_sender, mut stopped_receiver) = mpsc::channel(1); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1); - let res = task.prepare( + let prepare_status = task.prepare( TaskTest { prepared_sender, started_sender, @@ -1225,65 +1367,80 @@ mod tests { context, ); - let prepare_ack_handle = match res { - Ok(TransitionStatus::Async { - trigger: Trigger::Prepare, - origin: TaskState::Unprepared, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + assert!(prepare_status.is_pending()); + match prepare_status { + Pending { + trigger: Prepare, + origin: Unprepared, + .. + } => (), + other => panic!("{:?}", other), }; - gst::debug!(RUNTIME_CAT, "task_iterate: starting (initial)"); + gst::debug!(RUNTIME_CAT, "iterate: starting (async prepare)"); + // also tests await_maybe_on_context + assert_eq!( + task.start().await_maybe_on_context().unwrap(), + Complete { + origin: Prepared, + target: Started, + } + ); + assert_eq!(task.state(), Started); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Prepared, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - - assert_eq!(task.state(), TaskState::Started); // At this point, preparation must be complete - match block_on(prepare_ack_handle).unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Unprepared, - target: TaskState::Prepared, - }) => (), - other => panic!("unexpected {:?}", other), - } + // also tests await_maybe_on_context + assert_eq!( + prepare_status.await_maybe_on_context().unwrap(), + Complete { + origin: Unprepared, + target: Prepared, + }, + ); block_on(prepared_receiver.next()).unwrap(); // ... and start executed block_on(started_receiver.next()).unwrap(); - assert_eq!(task.state(), TaskState::Started); + assert_eq!(task.state(), Started); // unlock task loop and keep looping block_on(iterate_receiver.next()).unwrap(); block_on(complete_iterate_sender.send(Ok(()))).unwrap(); - gst::debug!(RUNTIME_CAT, "task_iterate: starting (redundant)"); + gst::debug!(RUNTIME_CAT, "iterate: starting (redundant)"); // already started - match task.start() { - Ok(TransitionStatus::Skipped { - trigger: Trigger::Start, - state: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.start().block_on().unwrap(), + Skipped { + trigger: Start, + state: Started, + }, + ); + assert_eq!(task.state(), Started); - gst::debug!(RUNTIME_CAT, "task_iterate: pause (initial)"); - match task.pause() { - Ok(TransitionStatus::NotWaiting { - trigger: Trigger::Pause, - origin: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), + // Attempt to prepare Task in state Started (also tests check) + match task.unprepare().check().unwrap_err() { + TransitionError { + trigger: Unprepare, + state: Started, + .. + } => (), + other => panic!("{:?}", other), } - // Pause transition is asynchronous + gst::debug!(RUNTIME_CAT, "iterate: pause (initial)"); + let pause_status = task.pause(); + assert!(pause_status.is_ready()); + // also tests `check` + match pause_status.check().unwrap() { + Ready(Ok(NotWaiting { + trigger: Pause, + origin: Started, + })) => (), + other => panic!("{:?}", other), + } + + // Pause transition is asynchronous FIXME while TaskState::Paused != task.state() { std::thread::sleep(Duration::from_millis(2)); @@ -1293,52 +1450,52 @@ mod tests { } } - gst::debug!(RUNTIME_CAT, "task_iterate: awaiting pause ack"); + gst::debug!(RUNTIME_CAT, "iterate: awaiting pause ack"); block_on(paused_receiver.next()).unwrap(); - gst::debug!(RUNTIME_CAT, "task_iterate: starting (after pause)"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Paused, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } + gst::debug!(RUNTIME_CAT, "iterate: starting (after pause)"); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Paused, + target: Started, + }, + ); - assert_eq!(task.state(), TaskState::Started); + assert_eq!(task.state(), Started); // Paused -> Started let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "task_iterate: stopping"); - match task.stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Started, - target: TaskState::Stopped, - }) => (), - other => panic!("unexpected {:?}", other), - } + gst::debug!(RUNTIME_CAT, "iterate: stopping"); + assert_eq!( + task.stop().block_on().unwrap(), + Complete { + origin: Started, + target: Stopped, + }, + ); - assert_eq!(task.state(), TaskState::Stopped); + assert_eq!(task.state(), Stopped); let _ = block_on(stopped_receiver.next()); // purge remaining iteration received before stop if any let _ = iterate_receiver.try_next(); - gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Stopped, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } + gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)"); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Stopped, + target: Started, + }, + ); let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos"); + gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Eos"); block_on(iterate_receiver.next()).unwrap(); block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap(); - gst::debug!(RUNTIME_CAT, "task_iterate: awaiting stop ack"); + gst::debug!(RUNTIME_CAT, "iterate: awaiting stop ack"); block_on(stopped_receiver.next()).unwrap(); // Wait for state machine to reach Stopped @@ -1346,21 +1503,21 @@ mod tests { std::thread::sleep(Duration::from_millis(2)); } - gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Stopped, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } + gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)"); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Stopped, + target: Started, + }, + ); let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing"); + gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Flushing"); block_on(iterate_receiver.next()).unwrap(); block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap(); - gst::debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start ack"); + gst::debug!(RUNTIME_CAT, "iterate: awaiting flush_start ack"); block_on(flush_start_receiver.next()).unwrap(); // Wait for state machine to reach Flushing @@ -1368,17 +1525,17 @@ mod tests { std::thread::sleep(Duration::from_millis(2)); } - gst::debug!(RUNTIME_CAT, "task_iterate: stop flushing"); - match task.flush_stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } + gst::debug!(RUNTIME_CAT, "iterate: stop flushing"); + assert_eq!( + task.flush_stop().block_on().unwrap(), + Complete { + origin: Flushing, + target: Started, + }, + ); let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error"); + gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Error"); block_on(iterate_receiver.next()).unwrap(); block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap(); @@ -1387,28 +1544,25 @@ mod tests { std::thread::sleep(Duration::from_millis(2)); } - gst::debug!( - RUNTIME_CAT, - "task_iterate: attempting to start (after Error)" - ); - match task.start() { - Err(TransitionError { - trigger: Trigger::Start, + gst::debug!(RUNTIME_CAT, "iterate: attempting to start (after Error)"); + match task.start().block_on().unwrap_err() { + TransitionError { + trigger: Start, state: TaskState::Error, .. - }) => (), - _ => unreachable!(), + } => (), + other => panic!("{:?}", other), } - match task.unprepare() { - Ok(TransitionStatus::Complete { + assert_eq!( + task.unprepare().block_on().unwrap(), + Complete { origin: TaskState::Error, - target: TaskState::Unprepared, - }) => (), - other => panic!("unexpected {:?}", other), - } + target: Unprepared, + }, + ); - assert_eq!(task.state(), TaskState::Unprepared); + assert_eq!(task.state(), Unprepared); let _ = block_on(unprepared_receiver.next()); } @@ -1464,16 +1618,15 @@ mod tests { let task = Task::default(); - assert_eq!(task.state(), TaskState::Unprepared); + assert_eq!(task.state(), Unprepared); let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); - task.prepare( + let prepare_status = task.prepare( TaskPrepareTest { prepare_error_sender, }, context, - ) - .unwrap(); + ); gst::debug!( RUNTIME_CAT, @@ -1481,21 +1634,30 @@ mod tests { ); block_on(prepare_error_receiver.next()).unwrap(); + match prepare_status.block_on().unwrap_err() { + TransitionError { + trigger: Trigger::Error, + state: Preparing, + .. + } => (), + other => panic!("{:?}", other), + } + // Wait for state machine to reach Error while TaskState::Error != task.state() { std::thread::sleep(Duration::from_millis(2)); } - match task.start() { - Err(TransitionError { - trigger: Trigger::Start, + match task.start().block_on().unwrap_err() { + TransitionError { + trigger: Start, state: TaskState::Error, .. - }) => (), - other => unreachable!("{:?}", other), + } => (), + other => panic!("{:?}", other), } - task.unprepare().unwrap(); + block_on(task.unprepare()).unwrap(); } #[test] @@ -1549,65 +1711,67 @@ mod tests { let task = Task::default(); let (mut prepare_sender, prepare_receiver) = mpsc::channel(1); - task.prepare(TaskPrepareTest { prepare_receiver }, context) - .unwrap(); + let _ = task.prepare(TaskPrepareTest { prepare_receiver }, context); let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { - assert_eq!(task.state(), TaskState::Preparing); + assert_eq!(task.state(), Preparing); gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting"); - let ack_handle = match task.start() { - Ok(TransitionStatus::Async { - trigger: Trigger::Start, - origin: TaskState::Preparing, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), - }; + let start_status = task.start(); + match start_status { + Pending { + trigger: Start, + origin: Preparing, + .. + } => (), + other => panic!("{:?}", other), + } ready_sender.send(()).unwrap(); - match ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Prepared, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + start_status.await.unwrap(), + Complete { + origin: Prepared, + target: Started, + }, + ); + assert_eq!(task.state(), Started); - let ack_handle = match task.stop() { - Ok(TransitionStatus::Async { - trigger: Trigger::Stop, - origin: TaskState::Started, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), - }; - match ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Started, - target: TaskState::Stopped, - }) => (), - other => panic!("unexpected {:?}", other), + let stop_status = task.stop(); + match stop_status { + Pending { + trigger: Stop, + origin: Started, + .. + } => (), + other => panic!("{:?}", other), } - assert_eq!(task.state(), TaskState::Stopped); + assert_eq!( + stop_status.await.unwrap(), + Complete { + origin: Started, + target: Stopped, + }, + ); + assert_eq!(task.state(), Stopped); - let ack_handle = match task.unprepare() { - Ok(TransitionStatus::Async { - trigger: Trigger::Unprepare, - origin: TaskState::Stopped, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + let unprepare_status = task.unprepare(); + match unprepare_status { + Pending { + trigger: Unprepare, + origin: Stopped, + .. + } => (), + other => panic!("{:?}", other), }; - match ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Stopped, - target: TaskState::Unprepared, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Unprepared); + assert_eq!( + unprepare_status.await.unwrap(), + Complete { + origin: Stopped, + target: Unprepared, + }, + ); + assert_eq!(task.state(), Unprepared); }); gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx"); @@ -1686,55 +1850,56 @@ mod tests { let (mut prepare_sender, prepare_receiver) = mpsc::channel(1); let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); - let res = task.prepare( + let prepare_status = task.prepare( TaskPrepareTest { prepare_receiver, prepare_error_sender, }, context, ); - let prepare_ack = match res { - Ok(TransitionStatus::Async { - trigger: Trigger::Prepare, - origin: TaskState::Unprepared, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + match prepare_status { + Pending { + trigger: Prepare, + origin: Unprepared, + .. + } => (), + other => panic!("{:?}", other), }; let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)"); - task.start().unwrap(); + let _ = task.start(); ready_sender.send(()).unwrap(); // FIXME we loose the origin Trigger (Start) // and only get the Trigger returned by handle_action_error // see also: comment in exec_action! - match prepare_ack.await.unwrap() { + match prepare_status.await { Err(TransitionError { trigger: Trigger::Error, - state: TaskState::Preparing, + state: Preparing, .. }) => (), - other => panic!("unexpected transition res {:?}", other), + other => panic!("{:?}", other), } - let ack_handle = match task.unprepare().unwrap() { - TransitionStatus::Async { - trigger: Trigger::Unprepare, + let unprepare_status = task.unprepare(); + match unprepare_status { + Pending { + trigger: Unprepare, origin: TaskState::Error, - ack_handle, - } => ack_handle, - other => panic!("unexpected {:?}", other), + .. + } => (), + other => panic!("{:?}", other), }; - match ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { + assert_eq!( + unprepare_status.await.unwrap(), + Complete { origin: TaskState::Error, - target: TaskState::Unprepared, - }) => (), - other => panic!("unexpected {:?}", other), - } + target: Unprepared, + }, + ); }); gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx"); @@ -1797,42 +1962,41 @@ mod tests { let (iterate_sender, mut iterate_receiver) = mpsc::channel(1); let (mut complete_sender, complete_receiver) = mpsc::channel(0); let (paused_sender, mut paused_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskPauseStartTest { iterate_sender, complete_receiver, paused_sender, }, context, - ) - .unwrap(); + ); gst::debug!(RUNTIME_CAT, "pause_start: starting"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Prepared, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Prepared, + target: Started, + }, + ); + assert_eq!(task.state(), Started); gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration"); block_on(iterate_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)"); match task.pause() { - Ok(TransitionStatus::NotWaiting { - trigger: Trigger::Pause, - origin: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), + Ready(Ok(NotWaiting { + trigger: Pause, + origin: Started, + })) => (), + other => panic!("{:?}", other), } gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion"); complete_sender.try_send(()).unwrap(); - // Pause transition is asynchronous + // Pause transition is asynchronous FIXME while TaskState::Paused != task.state() { std::thread::sleep(Duration::from_millis(5)); } @@ -1842,14 +2006,14 @@ mod tests { // Loop held on due to Pause iterate_receiver.try_next().unwrap_err(); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Paused, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Paused, + target: Started, + }, + ); + assert_eq!(task.state(), Started); gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration"); block_on(iterate_receiver.next()).unwrap(); @@ -1857,8 +2021,7 @@ mod tests { gst::debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion"); complete_sender.try_send(()).unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -1887,27 +2050,25 @@ mod tests { let task = Task::default(); let (iterate_sender, mut iterate_receiver) = mpsc::channel(1); - task.prepare(TaskPauseStartTest { iterate_sender }, context) - .unwrap(); + let _ = task.prepare(TaskPauseStartTest { iterate_sender }, context); gst::debug!(RUNTIME_CAT, "successive_pause_start: starting"); - task.start().unwrap(); + block_on(task.start()).unwrap(); gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1"); block_on(iterate_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "successive_pause_start: pause and start"); - task.pause().unwrap(); - task.start().unwrap(); + let _ = task.pause(); + block_on(task.start()).unwrap(); - assert_eq!(task.state(), TaskState::Started); + assert_eq!(task.state(), Started); gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2"); block_on(iterate_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "successive_pause_start: stopping"); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -1949,45 +2110,43 @@ mod tests { let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { flush_start_sender, flush_stop_sender, }, context, - ) - .unwrap(); + ); gst::debug!(RUNTIME_CAT, "flush_regular_sync: start"); - task.start().unwrap(); + block_on(task.start()).unwrap(); gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush"); - match task.flush_start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Started, - target: TaskState::Flushing, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Flushing); + assert_eq!( + task.flush_start().block_on().unwrap(), + Complete { + origin: Started, + target: Flushing, + }, + ); + assert_eq!(task.state(), Flushing); block_on(flush_start_receiver.next()).unwrap(); gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush"); - match task.flush_stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.flush_stop().await_maybe_on_context().unwrap(), + Complete { + origin: Flushing, + target: Started, + }, + ); + assert_eq!(task.state(), Started); block_on(flush_stop_receiver.next()).unwrap(); - task.pause().unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + let _ = task.pause(); + stop_then_unprepare(task); } #[test] @@ -2037,17 +2196,16 @@ mod tests { let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { flush_start_sender, flush_stop_sender, }, context, - ) - .unwrap(); + ); gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start"); - task.start().unwrap(); + task.start().block_on().unwrap(); let oob_context = Context::acquire( "flush_regular_different_context_oob", @@ -2056,48 +2214,51 @@ mod tests { .unwrap(); let task_clone = task.clone(); - let flush_handle = oob_context.spawn(async move { - let flush_ack_handle = match task_clone.flush_start() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStart, - origin: TaskState::Started, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + let flush_res_fut = oob_context.spawn(async move { + let flush_start_status = task_clone.flush_start(); + match flush_start_status { + Pending { + trigger: FlushStart, + origin: Started, + .. + } => (), + other => panic!("{:?}", other), }; - match flush_ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Started, - target: TaskState::Flushing, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task_clone.state(), TaskState::Flushing); + assert_eq!( + flush_start_status.await.unwrap(), + Complete { + origin: Started, + target: Flushing, + }, + ); + assert_eq!(task_clone.state(), Flushing); flush_start_receiver.next().await.unwrap(); - let flush_stop_ack_handle = match task_clone.flush_stop() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStop, - origin: TaskState::Flushing, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + let flush_stop_status = task_clone.flush_stop(); + match flush_stop_status { + Pending { + trigger: FlushStop, + origin: Flushing, + .. + } => (), + other => panic!("{:?}", other), }; - match flush_stop_ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task_clone.state(), TaskState::Started); + assert_eq!( + flush_stop_status.await_maybe_on_context().unwrap(), + NotWaiting { + trigger: FlushStop, + origin: Flushing, + }, + ); + + Context::drain_sub_tasks().await.unwrap(); + assert_eq!(task_clone.state(), Started); }); - block_on(flush_handle).unwrap(); + block_on(flush_res_fut).unwrap(); block_on(flush_stop_receiver.next()).unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2141,60 +2302,60 @@ mod tests { let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { flush_start_sender, flush_stop_sender, }, context.clone(), - ) - .unwrap(); + ); - task.start().unwrap(); + block_on(task.start()).unwrap(); let task_clone = task.clone(); let flush_handle = context.spawn(async move { - let flush_ack_handle = match task_clone.flush_start() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStart, - origin: TaskState::Started, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + let flush_start_status = task_clone.flush_start(); + match flush_start_status { + Pending { + trigger: FlushStart, + origin: Started, + .. + } => (), + other => panic!("{:?}", other), }; - match flush_ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Started, - target: TaskState::Flushing, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task_clone.state(), TaskState::Flushing); + assert_eq!( + flush_start_status.await.unwrap(), + Complete { + origin: Started, + target: Flushing, + }, + ); + assert_eq!(task_clone.state(), Flushing); flush_start_receiver.next().await.unwrap(); - let flush_stop_ack_handle = match task_clone.flush_stop() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStop, - origin: TaskState::Flushing, - ack_handle, - }) => ack_handle, - other => panic!("unexpected {:?}", other), + let flush_stop_status = task_clone.flush_stop(); + match flush_stop_status { + Pending { + trigger: FlushStop, + origin: Flushing, + .. + } => (), + other => panic!("{:?}", other), }; - match flush_stop_ack_handle.await.unwrap() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task_clone.state(), TaskState::Started); + assert_eq!( + flush_stop_status.await.unwrap(), + Complete { + origin: Flushing, + target: Started, + }, + ); + assert_eq!(task_clone.state(), Started); }); block_on(flush_handle).unwrap(); block_on(flush_stop_receiver.next()).unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2211,14 +2372,16 @@ mod tests { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration"); - match self.task.flush_start() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStart, - origin: TaskState::Started, + let flush_status = self.task.flush_start(); + match flush_status { + Pending { + trigger: FlushStart, + origin: Started, .. - }) => (), - other => panic!("unexpected {:?}", other), + } => (), + other => panic!("{:?}", other), } + flush_status.await.unwrap(); Ok(()) } .boxed() @@ -2239,16 +2402,15 @@ mod tests { let task = Task::default(); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { task: task.clone(), flush_start_sender, }, context, - ) - .unwrap(); + ); - task.start().unwrap(); + let _ = task.start(); gst::debug!( RUNTIME_CAT, @@ -2256,14 +2418,14 @@ mod tests { ); block_on(flush_start_receiver.next()).unwrap(); - match task.stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Stopped, - }) => (), - other => panic!("unexpected {:?}", other), - } - task.unprepare().unwrap(); + assert_eq!( + task.stop().block_on().unwrap(), + Complete { + origin: Flushing, + target: Stopped, + }, + ); + task.unprepare().block_on().unwrap(); } #[test] @@ -2286,11 +2448,11 @@ mod tests { gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration"); match self.task.pause() { - Ok(TransitionStatus::NotWaiting { - trigger: Trigger::Pause, - origin: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), + Ready(Ok(TransitionOk::NotWaiting { + trigger: Pause, + origin: Started, + })) => (), + other => panic!("{:?}", other), } Ok(()) @@ -2313,22 +2475,20 @@ mod tests { let task = Task::default(); let (pause_sender, mut pause_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskStartTest { task: task.clone(), pause_sender, }, context, - ) - .unwrap(); + ); - task.start().unwrap(); + let _ = task.start(); gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification"); block_on(pause_receiver.next()).unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2353,12 +2513,12 @@ mod tests { "trigger_from_action: flush_start triggering flush_stop" ); match self.task.flush_stop() { - Ok(TransitionStatus::Async { - trigger: Trigger::FlushStop, - origin: TaskState::Started, + Pending { + trigger: FlushStop, + origin: Started, .. - }) => (), - other => panic!("unexpected {:?}", other), + } => (), + other => panic!("{:?}", other), } Ok(()) @@ -2381,17 +2541,16 @@ mod tests { let task = Task::default(); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { task: task.clone(), flush_stop_sender, }, context, - ) - .unwrap(); + ); - task.start().unwrap(); - task.flush_start().unwrap(); + task.start().block_on().unwrap(); + let _ = task.flush_start(); gst::debug!( RUNTIME_CAT, @@ -2399,8 +2558,7 @@ mod tests { ); block_on(flush_stop_receiver.next()).unwrap(); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2453,65 +2611,63 @@ mod tests { let (started_sender, mut started_receiver) = mpsc::channel(1); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { started_sender, flush_start_sender, flush_stop_sender, }, context, - ) - .unwrap(); + ); // Pause, FlushStart, FlushStop, Start gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing"); - match task.pause() { - Ok(TransitionStatus::Complete { - origin: TaskState::Prepared, - target: TaskState::Paused, - }) => (), - other => panic!("unexpected {:?}", other), - } + assert_eq!( + task.pause().block_on().unwrap(), + Complete { + origin: Prepared, + target: Paused, + }, + ); gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush"); - match task.flush_start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Paused, - target: TaskState::PausedFlushing, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::PausedFlushing); + assert_eq!( + task.flush_start().block_on().unwrap(), + Complete { + origin: Paused, + target: PausedFlushing, + }, + ); + assert_eq!(task.state(), PausedFlushing); block_on(flush_start_receiver.next()); gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush"); - match task.flush_stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::PausedFlushing, - target: TaskState::Paused, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Paused); + assert_eq!( + task.flush_stop().block_on().unwrap(), + Complete { + origin: PausedFlushing, + target: Paused, + }, + ); + assert_eq!(task.state(), Paused); block_on(flush_stop_receiver.next()); // start action not executed started_receiver.try_next().unwrap_err(); gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::Paused, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: Paused, + target: Started, + }, + ); + assert_eq!(task.state(), Started); block_on(started_receiver.next()); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2564,53 +2720,51 @@ mod tests { let (started_sender, mut started_receiver) = mpsc::channel(1); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskFlushTest { started_sender, flush_start_sender, flush_stop_sender, }, context, - ) - .unwrap(); + ); // Pause, FlushStart, Start, FlushStop gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing"); - task.pause().unwrap(); + let _ = task.pause(); gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush"); - task.flush_start().unwrap(); - assert_eq!(task.state(), TaskState::PausedFlushing); + block_on(task.flush_start()).unwrap(); + assert_eq!(task.state(), PausedFlushing); block_on(flush_start_receiver.next()); gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing"); - match task.start() { - Ok(TransitionStatus::Complete { - origin: TaskState::PausedFlushing, - target: TaskState::Flushing, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Flushing); + assert_eq!( + task.start().block_on().unwrap(), + Complete { + origin: PausedFlushing, + target: Flushing, + }, + ); + assert_eq!(task.state(), Flushing); // start action not executed started_receiver.try_next().unwrap_err(); gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush"); - match task.flush_stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.flush_stop().block_on().unwrap(), + Complete { + origin: Flushing, + target: Started, + }, + ); + assert_eq!(task.state(), Started); block_on(flush_stop_receiver.next()); block_on(started_receiver.next()); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2654,20 +2808,19 @@ mod tests { let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); - task.prepare( + let _ = task.prepare( TaskStartTest { flush_start_sender, flush_stop_sender, }, context, - ) - .unwrap(); + ); let oob_context = Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap(); let task_clone = task.clone(); - task.pause().unwrap(); + block_on(task.pause()).unwrap(); // Launch flush_start // start let (ready_sender, ready_receiver) = oneshot::channel(); @@ -2675,21 +2828,21 @@ mod tests { let flush_start_handle = oob_context.spawn(async move { gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start"); ready_sender.send(()).unwrap(); - let res = task_clone.flush_start().unwrap(); - let ack_handle = match res { - TransitionStatus::Async { - trigger: Trigger::FlushStart, - origin: TaskState::Paused, - ack_handle, - } => ack_handle, - TransitionStatus::Async { - trigger: Trigger::FlushStart, - origin: TaskState::Started, - ack_handle, - } => ack_handle, - other => panic!("unexpected {:?}", other), + let status = task_clone.flush_start(); + match status { + Pending { + trigger: FlushStart, + origin: Paused, + .. + } => (), + Pending { + trigger: FlushStart, + origin: Started, + .. + } => (), + other => panic!("{:?}", other), }; - ack_handle.await.unwrap().unwrap(); + status.await.unwrap(); flush_start_receiver.next().await.unwrap(); }); @@ -2700,34 +2853,32 @@ mod tests { block_on(ready_receiver).unwrap(); gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start"); - let res = task.start().unwrap(); - match res { - TransitionStatus::Complete { - origin: TaskState::Paused, - target: TaskState::Started, - } => (), - TransitionStatus::Complete { - origin: TaskState::PausedFlushing, - target: TaskState::Flushing, - } => (), - other => panic!("unexpected {:?}", other), + match block_on(task.start()) { + Ok(TransitionOk::Complete { + origin: Paused, + target: Started, + }) => (), + Ok(TransitionOk::Complete { + origin: PausedFlushing, + target: Flushing, + }) => (), + other => panic!("{:?}", other), } block_on(flush_start_handle).unwrap(); gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop"); - match task.flush_stop() { - Ok(TransitionStatus::Complete { - origin: TaskState::Flushing, - target: TaskState::Started, - }) => (), - other => panic!("unexpected {:?}", other), - } - assert_eq!(task.state(), TaskState::Started); + assert_eq!( + task.flush_stop().block_on().unwrap(), + Complete { + origin: Flushing, + target: Started, + }, + ); + assert_eq!(task.state(), Started); block_on(flush_stop_receiver.next()); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } #[test] @@ -2772,22 +2923,20 @@ mod tests { let task = Task::default(); let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel(); - task.prepare( + let _ = task.prepare( TaskTimerTest { timer: None, timer_elapsed_sender: Some(timer_elapsed_sender), }, context, - ) - .unwrap(); + ); gst::debug!(RUNTIME_CAT, "start_timer: start"); - task.start().unwrap(); + let _ = task.start(); block_on(timer_elapsed_receiver).unwrap(); gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received"); - task.stop().unwrap(); - task.unprepare().unwrap(); + stop_then_unprepare(task); } }