diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 55908618..abf23e1b 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -22,9 +22,8 @@ use futures::channel::mpsc as async_mpsc; use futures::channel::oneshot; -use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture}; +use futures::future::{self, BoxFuture}; use futures::prelude::*; -use futures::stream::StreamExt; use std::fmt; use std::ops::Deref; @@ -73,7 +72,7 @@ pub enum TransitionOk { /// This is to prevent: /// - A deadlock when executing a transition action. /// - A potential infinite wait when pausing a running loop - /// which could be awaiting for an `iterate` to complete. + /// which could be awaiting for an `nominal` to complete. NotWaiting { trigger: Trigger, origin: TaskState }, /// Skipping triggering event due to current state. Skipped { trigger: Trigger, state: TaskState }, @@ -319,6 +318,8 @@ impl fmt::Debug for TransitionStatus { /// /// Defines implementations for state transition actions and error handlers. pub trait TaskImpl: Send + 'static { + type Item: Send + 'static; + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { future::ok(()).boxed() } @@ -331,8 +332,31 @@ pub trait TaskImpl: Send + 'static { future::ok(()).boxed() } - /// Executes an iteration in `TaskState::Started`. - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>>; + /// Tries to retrieve the next item to process. + /// + /// With [`Self::handle_item`], this is one of the two `Task` loop + /// functions. They are executed in a loop in the `Started` state. + /// + /// Function `try_next` is awaited at the beginning of each iteration, + /// and can be cancelled at `await` point if a state transition is requested. + /// + /// If `Ok(item)` is returned, the iteration calls [`Self::handle_item`] + /// with said `Item`. + /// + /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`]. + fn try_next(&mut self) -> BoxFuture<'_, Result>; + + /// Does whatever needs to be done with the `item`. + /// + /// With [`Self::try_next`], this is one of the two `Task` loop + /// functions. They are executed in a loop in the `Started` state. + /// + /// Function `handle_item` asynchronously processes an `item` previously + /// retrieved by [`Self::try_next`]. Processing is guaranteed to run + /// to completion even if a state transition is requested. + /// + /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`]. + fn handle_item(&mut self, _item: Self::Item) -> BoxFuture<'_, Result<(), gst::FlowError>>; fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { future::ok(()).boxed() @@ -350,12 +374,13 @@ pub trait TaskImpl: Send + 'static { future::ok(()).boxed() } - /// Handles an error occuring during the execution of an iteration. + /// Handles an error occuring during the execution of the `Task` loop. /// - /// This handler also catches errors returned by subtasks spawned by the iteration. + /// This include errors returned by [`Self::try_next`], [`Self::handle_item`] + /// as well as errors returned by subtasks drain at the end of an iteration. /// - /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message` - /// and return `Trigger::Error`. + /// If the error is unrecoverable, implementations might use + /// `gst::Element::post_error_message` and return `Trigger::Error`. /// /// Otherwise, handle the error and return the requested `Transition` to recover. /// @@ -364,26 +389,22 @@ pub trait TaskImpl: Send + 'static { /// - `FlowError::Flushing` -> `Trigger::FlushStart`. /// - `FlowError::Eos` -> `Trigger::Stop`. /// - Other `FlowError` -> `Trigger::Error`. - fn handle_iterate_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> { + fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> { async move { match err { gst::FlowError::Flushing => { gst::debug!( RUNTIME_CAT, - "TaskImpl iterate returned Flushing. Posting FlushStart" + "Task loop returned Flushing. Posting FlushStart" ); Trigger::FlushStart } gst::FlowError::Eos => { - gst::debug!(RUNTIME_CAT, "TaskImpl iterate returned Eos. Posting Stop"); + gst::debug!(RUNTIME_CAT, "Task loop returned Eos. Posting Stop"); Trigger::Stop } other => { - gst::error!( - RUNTIME_CAT, - "TaskImpl iterate returned {:?}. Posting Error", - other - ); + gst::error!(RUNTIME_CAT, "Task loop returned {:?}. Posting Error", other); Trigger::Error } } @@ -497,7 +518,6 @@ impl StateMachineHandle { struct TaskInner { state: TaskState, state_machine_handle: Option, - loop_abort_handle: Option, } impl Default for TaskInner { @@ -505,7 +525,6 @@ impl Default for TaskInner { TaskInner { state: TaskState::Unprepared, state_machine_handle: None, - loop_abort_handle: None, } } } @@ -568,12 +587,6 @@ impl TaskInner { } }) } - - fn abort_task_loop(&mut self) { - if let Some(loop_abort_handle) = self.loop_abort_handle.take() { - loop_abort_handle.abort(); - } - } } impl Drop for TaskInner { @@ -713,7 +726,6 @@ impl Task { } }; - inner.abort_task_loop(); let ack_rx = state_machine_handle.trigger(Trigger::Unprepare); drop(inner); @@ -735,11 +747,6 @@ impl Task { pub fn start(&self) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); - let ack_rx = match inner.trigger(Trigger::Start) { - Ok(ack_rx) => ack_rx, - Err(err) => return err.into(), - }; - if let TaskState::Started = inner.state { return TransitionOk::Skipped { trigger: Trigger::Start, @@ -748,6 +755,11 @@ impl Task { .into(); } + let ack_rx = match inner.trigger(Trigger::Start) { + Ok(ack_rx) => ack_rx, + Err(err) => return err.into(), + }; + let origin = inner.state; drop(inner); @@ -760,59 +772,29 @@ impl Task { /// Requests the `Task` loop to pause. /// - /// If an iteration is in progress, it will run to completion, - /// then no more iteration will be executed before `start` is called again. - /// Therefore, it is not guaranteed that `Paused` is reached when `pause` returns. + /// If an item handling is in progress, it will run to completion, + /// then no iterations will be executed before `start` is called again. pub fn pause(&self) -> TransitionStatus { - let mut inner = self.0.lock().unwrap(); - - let ack_rx = match inner.trigger(Trigger::Pause) { - Ok(ack_rx) => ack_rx, - Err(err) => return err.into(), - }; - - if let TaskState::Started = inner.state { - // FIXME this could be async when iterate is split into next_item / handle_item - return TransitionOk::NotWaiting { - trigger: Trigger::Pause, - origin: TaskState::Started, - } - .into(); - } - - let origin = inner.state; - drop(inner); - - TransitionStatus::Pending { - trigger: Trigger::Pause, - origin, - res_fut: Box::pin(ack_rx.map(Result::unwrap)), - } + self.push_pending(Trigger::Pause) } pub fn flush_start(&self) -> TransitionStatus { - self.abort_push_await(Trigger::FlushStart) + self.push_pending(Trigger::FlushStart) } pub fn flush_stop(&self) -> TransitionStatus { - self.abort_push_await(Trigger::FlushStop) + self.push_pending(Trigger::FlushStop) } /// Stops the `Started` `Task` and wait for it to finish. pub fn stop(&self) -> TransitionStatus { - self.abort_push_await(Trigger::Stop) + self.push_pending(Trigger::Stop) } - /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP. - /// - /// This function: - /// - Aborts the iteration loop aborts. - /// - Pushes the provided [`Trigger`]. - /// - Awaits for the expected transition as usual. - fn abort_push_await(&self, trigger: Trigger) -> TransitionStatus { + /// Pushes a [`Trigger`] and returns TransitionStatus::Pending. + fn push_pending(&self, trigger: Trigger) -> TransitionStatus { let mut inner = self.0.lock().unwrap(); - inner.abort_task_loop(); let ack_rx = match inner.trigger(trigger) { Ok(ack_rx) => ack_rx, Err(err) => return err.into(), @@ -829,8 +811,8 @@ impl Task { } } -struct StateMachine { - task_impl: Box, +struct StateMachine { + task_impl: Box>, triggering_evt_rx: async_mpsc::Receiver, pending_triggering_evt: Option, } @@ -886,12 +868,12 @@ macro_rules! exec_action { }}; } -impl StateMachine { +impl StateMachine { // Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization // without inducing any significant performance penalties. fn spawn( task_inner: Arc>, - task_impl: Box, + task_impl: Box>, context: Context, ) -> StateMachineHandle { let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4); @@ -910,8 +892,6 @@ impl StateMachine { } async fn run(mut self, task_inner: Arc>) { - let context = Context::current().expect("must be spawed on a Context"); - let mut triggering_evt = self .triggering_evt_rx .next() @@ -953,7 +933,6 @@ impl StateMachine { .await .expect("triggering_evt_rx dropped"), }; - gst::trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt); match triggering_evt.trigger { @@ -990,7 +969,7 @@ impl StateMachine { origin }; - self = Self::start(self, triggering_evt, origin, &task_inner, &context).await; + self.start(triggering_evt, origin, &task_inner).await; // next/pending triggering event handled in next iteration } Trigger::Pause => { @@ -1109,8 +1088,7 @@ impl StateMachine { .switch_to_state(TaskState::Paused, triggering_evt); gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused"); } else { - self = Self::start(self, triggering_evt, origin, &task_inner, &context) - .await; + self.start(triggering_evt, origin, &task_inner).await; // next/pending triggering event handled in next iteration } } @@ -1145,62 +1123,48 @@ impl StateMachine { } async fn start( - mut self, + &mut self, mut triggering_evt: TriggeringEvent, origin: TaskState, task_inner: &Arc>, - context: &Context, - ) -> Self { - let triggering_evt = match exec_action!(self, start, triggering_evt, origin, &task_inner) { - Ok(triggering_evt) => triggering_evt, - Err(_) => return self, - }; - - let task_inner_cl = Arc::clone(task_inner); - let loop_fut = async move { - let (abortable_task_loop, loop_abort_handle) = - abortable(self.run_loop(Arc::clone(&task_inner_cl))); - - { - let mut task_inner = task_inner_cl.lock().unwrap(); - task_inner.loop_abort_handle = Some(loop_abort_handle); + ) { + match exec_action!(self, start, triggering_evt, origin, &task_inner) { + Ok(triggering_evt) => { + let mut task_inner = task_inner.lock().unwrap(); task_inner.switch_to_state(TaskState::Started, triggering_evt); - - gst::trace!(RUNTIME_CAT, "Starting task loop"); } - - match abortable_task_loop.await { - Ok(Ok(())) => (), - Ok(Err(err)) => { - let next_trigger = self.task_impl.handle_iterate_error(err).await; - let (triggering_evt, _) = TriggeringEvent::new(next_trigger); - self.pending_triggering_evt = Some(triggering_evt); - } - Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"), + Err(_) => { + // error handled by exec_action + return; } + } - self - }; - - context.spawn_and_unpark(loop_fut).await.unwrap() + match self.run_loop().await { + Ok(()) => (), + Err(err) => { + let next_trigger = self.task_impl.handle_loop_error(err).await; + let (triggering_evt, _) = TriggeringEvent::new(next_trigger); + self.pending_triggering_evt = Some(triggering_evt); + } + } } - async fn run_loop(&mut self, task_inner: Arc>) -> Result<(), gst::FlowError> { + async fn run_loop(&mut self) -> Result<(), gst::FlowError> { gst::trace!(RUNTIME_CAT, "Task loop started"); + let mut item; loop { - while let Ok(Some(triggering_evt)) = self.triggering_evt_rx.try_next() { - gst::trace!(RUNTIME_CAT, "Task loop popped {:?}", triggering_evt); - - match triggering_evt.trigger { - Trigger::Start => { - task_inner - .lock() - .unwrap() - .skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped Start in state Started"); - } - _ => { + item = { + // select_biased requires the selected futures to implement + // `FusedFuture`. Because async trait functions are not stable, + // we use `BoxFuture` for the `TaskImpl` function, including + // `try_next`. Since we need to get a new `BoxFuture` at + // each iteration, we can guarantee that the future is + // always valid for use in `select_biased`. + let mut try_next_fut = self.task_impl.try_next().fuse(); + futures::select_biased! { + triggering_evt = self.triggering_evt_rx.next() => { + let triggering_evt = triggering_evt.expect("broken state machine channel"); gst::trace!( RUNTIME_CAT, "Task loop handing {:?} to state machine", @@ -1209,19 +1173,22 @@ impl StateMachine { self.pending_triggering_evt = Some(triggering_evt); return Ok(()); } + try_next_res = try_next_fut => try_next_res.map_err(|err| { + gst::debug!(RUNTIME_CAT, "TaskImpl::try_next returned {:?}", err); + err + })?, } - } + }; - // Run the iteration function - self.task_impl.iterate().await.map_err(|err| { - gst::log!(RUNTIME_CAT, "Task loop iterate impl returned {:?}", err); + self.task_impl.handle_item(item).await.map_err(|err| { + gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err); err })?; while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); + gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); Context::drain_sub_tasks().await.map_err(|err| { - gst::log!(RUNTIME_CAT, "Task loop iterate subtask returned {:?}", err); + gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); err })?; } @@ -1245,24 +1212,27 @@ mod tests { } #[test] - fn iterate() { + fn nominal() { gst::init().unwrap(); struct TaskTest { prepared_sender: mpsc::Sender<()>, started_sender: mpsc::Sender<()>, - iterate_sender: mpsc::Sender<()>, - complete_iterate_receiver: mpsc::Receiver>, + try_next_ready_sender: mpsc::Sender<()>, + try_next_receiver: mpsc::Receiver<()>, + handle_item_ready_sender: mpsc::Sender<()>, + handle_item_sender: mpsc::Sender<()>, paused_sender: mpsc::Sender<()>, stopped_sender: mpsc::Sender<()>, - flush_start_sender: mpsc::Sender<()>, unprepared_sender: mpsc::Sender<()>, } impl TaskImpl for TaskTest { + type Item = (); + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "iterate: prepared"); + gst::debug!(RUNTIME_CAT, "nominal: prepared"); self.prepared_sender.send(()).await.unwrap(); Ok(()) } @@ -1271,39 +1241,41 @@ mod tests { fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "iterate: started"); + gst::debug!(RUNTIME_CAT, "nominal: started"); self.started_sender.send(()).await.unwrap(); Ok(()) } .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - gst::debug!(RUNTIME_CAT, "iterate: entering iterate"); - self.iterate_sender.send(()).await.unwrap(); + gst::debug!(RUNTIME_CAT, "nominal: entering try_next"); + self.try_next_ready_sender.send(()).await.unwrap(); + gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next"); + self.try_next_receiver.next().await.unwrap(); + Ok(()) + } + .boxed() + } - gst::debug!(RUNTIME_CAT, "iterate: awaiting complete_iterate_receiver"); + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + gst::debug!(RUNTIME_CAT, "nominal: entering handle_item"); + self.handle_item_ready_sender.send(()).await.unwrap(); - let res = self.complete_iterate_receiver.next().await.unwrap(); - if res.is_ok() { - gst::debug!(RUNTIME_CAT, "iterate: received Ok => keep looping"); - } else { - gst::debug!( - RUNTIME_CAT, - "iterate: received {:?} => cancelling loop", - res - ); - } + gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item"); + self.handle_item_sender.send(()).await.unwrap(); + gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item"); - res + Ok(()) } .boxed() } fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "iterate: paused"); + gst::debug!(RUNTIME_CAT, "nominal: paused"); self.paused_sender.send(()).await.unwrap(); Ok(()) } @@ -1312,56 +1284,49 @@ mod tests { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::debug!(RUNTIME_CAT, "iterate: stopped"); + gst::debug!(RUNTIME_CAT, "nominal: stopped"); self.stopped_sender.send(()).await.unwrap(); Ok(()) } .boxed() } - fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst::debug!(RUNTIME_CAT, "iterate: stopped"); - self.flush_start_sender.send(()).await.unwrap(); - Ok(()) - } - .boxed() - } - fn unprepare(&mut self) -> BoxFuture<'_, ()> { async move { - gst::debug!(RUNTIME_CAT, "iterate: unprepared"); + gst::debug!(RUNTIME_CAT, "nominal: unprepared"); self.unprepared_sender.send(()).await.unwrap(); } .boxed() } } - let context = Context::acquire("iterate", Duration::from_millis(2)).unwrap(); + let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap(); let task = Task::default(); assert_eq!(task.state(), Unprepared); - gst::debug!(RUNTIME_CAT, "iterate: preparing"); + gst::debug!(RUNTIME_CAT, "nominal: preparing"); let (prepared_sender, mut prepared_receiver) = mpsc::channel(1); let (started_sender, mut started_receiver) = mpsc::channel(1); - let (iterate_sender, mut iterate_receiver) = mpsc::channel(1); - let (mut complete_iterate_sender, complete_iterate_receiver) = mpsc::channel(1); + let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1); + let (mut try_next_sender, try_next_receiver) = mpsc::channel(1); + let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1); + let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0); let (paused_sender, mut paused_receiver) = mpsc::channel(1); let (stopped_sender, mut stopped_receiver) = mpsc::channel(1); - let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1); let prepare_status = task.prepare( TaskTest { prepared_sender, started_sender, - iterate_sender, - complete_iterate_receiver, + try_next_ready_sender, + try_next_receiver, + handle_item_ready_sender, + handle_item_sender, paused_sender, stopped_sender, - flush_start_sender, unprepared_sender, }, context, @@ -1377,18 +1342,10 @@ mod tests { other => panic!("{:?}", other), }; - gst::debug!(RUNTIME_CAT, "iterate: starting (async prepare)"); - // also tests await_maybe_on_context - assert_eq!( - task.start().await_maybe_on_context().unwrap(), - Complete { - origin: Prepared, - target: Started, - } - ); - assert_eq!(task.state(), Started); + gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)"); + let start_status = task.start().check().unwrap(); - // At this point, preparation must be complete + block_on(prepared_receiver.next()).unwrap(); // also tests await_maybe_on_context assert_eq!( prepare_status.await_maybe_on_context().unwrap(), @@ -1397,17 +1354,24 @@ mod tests { target: Prepared, }, ); - block_on(prepared_receiver.next()).unwrap(); - // ... and start executed - block_on(started_receiver.next()).unwrap(); + block_on(started_receiver.next()).unwrap(); + assert_eq!( + start_status.block_on().unwrap(), + Complete { + origin: Prepared, + target: Started, + } + ); assert_eq!(task.state(), Started); // unlock task loop and keep looping - block_on(iterate_receiver.next()).unwrap(); - block_on(complete_iterate_sender.send(Ok(()))).unwrap(); + block_on(try_next_ready_receiver.next()).unwrap(); + block_on(try_next_sender.send(())).unwrap(); + block_on(handle_item_ready_receiver.next()).unwrap(); + block_on(handle_item_receiver.next()).unwrap(); - gst::debug!(RUNTIME_CAT, "iterate: starting (redundant)"); + gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)"); // already started assert_eq!( task.start().block_on().unwrap(), @@ -1428,45 +1392,81 @@ mod tests { other => panic!("{:?}", other), } - gst::debug!(RUNTIME_CAT, "iterate: pause (initial)"); - let pause_status = task.pause(); - assert!(pause_status.is_ready()); - // also tests `check` - match pause_status.check().unwrap() { - Ready(Ok(NotWaiting { - trigger: Pause, - origin: Started, - })) => (), - other => panic!("{:?}", other), - } + gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next"); + block_on(try_next_ready_receiver.next()).unwrap(); - // Pause transition is asynchronous FIXME - while TaskState::Paused != task.state() { - std::thread::sleep(Duration::from_millis(2)); - - if let Ok(Some(())) = iterate_receiver.try_next() { - // unlock iteration - block_on(complete_iterate_sender.send(Ok(()))).unwrap(); - } - } - - gst::debug!(RUNTIME_CAT, "iterate: awaiting pause ack"); + let pause_status = task.pause().check().unwrap(); + gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack"); block_on(paused_receiver.next()).unwrap(); - - gst::debug!(RUNTIME_CAT, "iterate: starting (after pause)"); assert_eq!( - task.start().block_on().unwrap(), + pause_status.block_on().unwrap(), + Complete { + origin: Started, + target: Paused, + }, + ); + + // handle_item not reached + assert!(handle_item_ready_receiver.try_next().is_err()); + // try_next not reached again + assert!(try_next_ready_receiver.try_next().is_err()); + + gst::debug!( + RUNTIME_CAT, + "nominal: starting (after pause cancelling try_next)" + ); + let start_receiver = task.start().check().unwrap(); + block_on(started_receiver.next()).unwrap(); + assert_eq!( + start_receiver.block_on().unwrap(), Complete { origin: Paused, target: Started, }, ); - assert_eq!(task.state(), Started); - // Paused -> Started - let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "iterate: stopping"); + gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item"); + block_on(try_next_ready_receiver.next()).unwrap(); + block_on(try_next_sender.send(())).unwrap(); + // Make sure item is picked + block_on(handle_item_ready_receiver.next()).unwrap(); + + gst::debug!(RUNTIME_CAT, "nominal: requesting to pause"); + let pause_status = task.pause().check().unwrap(); + + gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling"); + block_on(handle_item_receiver.next()).unwrap(); + + gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack"); + block_on(paused_receiver.next()).unwrap(); + assert_eq!( + pause_status.block_on().unwrap(), + Complete { + origin: Started, + target: Paused, + }, + ); + + // try_next not reached again + assert!(try_next_ready_receiver.try_next().is_err()); + + gst::debug!( + RUNTIME_CAT, + "nominal: starting (after pause // handle_item)" + ); + let start_receiver = task.start().check().unwrap(); + block_on(started_receiver.next()).unwrap(); + assert_eq!( + start_receiver.block_on().unwrap(), + Complete { + origin: Paused, + target: Started, + }, + ); + assert_eq!(task.state(), Started); + + gst::debug!(RUNTIME_CAT, "nominal: stopping"); assert_eq!( task.stop().block_on().unwrap(), Complete { @@ -1479,9 +1479,9 @@ mod tests { let _ = block_on(stopped_receiver.next()); // purge remaining iteration received before stop if any - let _ = iterate_receiver.try_next(); + let _ = try_next_ready_receiver.try_next(); - gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)"); + gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)"); assert_eq!( task.start().block_on().unwrap(), Complete { @@ -1491,73 +1491,19 @@ mod tests { ); let _ = block_on(started_receiver.next()); - gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Eos"); - block_on(iterate_receiver.next()).unwrap(); - block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap(); - - gst::debug!(RUNTIME_CAT, "iterate: awaiting stop ack"); - block_on(stopped_receiver.next()).unwrap(); - - // Wait for state machine to reach Stopped - while TaskState::Stopped != task.state() { - std::thread::sleep(Duration::from_millis(2)); - } - - gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)"); + gst::debug!(RUNTIME_CAT, "nominal: stopping"); assert_eq!( - task.start().block_on().unwrap(), + task.stop().block_on().unwrap(), Complete { - origin: Stopped, - target: Started, + origin: Started, + target: Stopped, }, ); - let _ = block_on(started_receiver.next()); - - gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Flushing"); - block_on(iterate_receiver.next()).unwrap(); - block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap(); - - gst::debug!(RUNTIME_CAT, "iterate: awaiting flush_start ack"); - block_on(flush_start_receiver.next()).unwrap(); - - // Wait for state machine to reach Flushing - while TaskState::Flushing != task.state() { - std::thread::sleep(Duration::from_millis(2)); - } - - gst::debug!(RUNTIME_CAT, "iterate: stop flushing"); - assert_eq!( - task.flush_stop().block_on().unwrap(), - Complete { - origin: Flushing, - target: Started, - }, - ); - let _ = block_on(started_receiver.next()); - - gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Error"); - block_on(iterate_receiver.next()).unwrap(); - block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap(); - - // Wait for state machine to reach Error - while TaskState::Error != task.state() { - std::thread::sleep(Duration::from_millis(2)); - } - - gst::debug!(RUNTIME_CAT, "iterate: attempting to start (after Error)"); - match task.start().block_on().unwrap_err() { - TransitionError { - trigger: Start, - state: TaskState::Error, - .. - } => (), - other => panic!("{:?}", other), - } assert_eq!( task.unprepare().block_on().unwrap(), Complete { - origin: TaskState::Error, + origin: Stopped, target: Unprepared, }, ); @@ -1575,6 +1521,8 @@ mod tests { } impl TaskImpl for TaskPrepareTest { + type Item = (); + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error"); @@ -1609,8 +1557,12 @@ mod tests { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - future::ok(()).boxed() + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("prepare_error: try_next"); + } + + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("prepare_error: handle_item"); } } @@ -1671,6 +1623,8 @@ mod tests { } impl TaskImpl for TaskPrepareTest { + type Item = (); + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!( @@ -1701,9 +1655,13 @@ mod tests { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("prepare_start_ok: handle_item"); + } } let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap(); @@ -1795,6 +1753,8 @@ mod tests { } impl TaskImpl for TaskPrepareTest { + type Item = (); + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!( @@ -1839,8 +1799,12 @@ mod tests { unreachable!("prepare_start_error: start"); } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - unreachable!("prepare_start_error: iterate"); + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("prepare_start_error: try_next"); + } + + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("prepare_start_error: handle_item"); } } @@ -1921,154 +1885,85 @@ mod tests { } #[test] - fn pause_start() { + fn item_error() { gst::init().unwrap(); - struct TaskPauseStartTest { - iterate_sender: mpsc::Sender<()>, - complete_receiver: mpsc::Receiver<()>, - paused_sender: mpsc::Sender<()>, + struct TaskTest { + try_next_receiver: mpsc::Receiver, } - impl TaskImpl for TaskPauseStartTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + impl TaskImpl for TaskTest { + type Item = gst::FlowError; + + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - gst::debug!(RUNTIME_CAT, "pause_start: entering iteration"); - self.iterate_sender.send(()).await.unwrap(); - - gst::debug!(RUNTIME_CAT, "pause_start: iteration awaiting completion"); - self.complete_receiver.next().await.unwrap(); - gst::debug!(RUNTIME_CAT, "pause_start: iteration complete"); - - Ok(()) + gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next"); + Ok(self.try_next_receiver.next().await.unwrap()) } .boxed() } - fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + fn handle_item( + &mut self, + item: gst::FlowError, + ) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - gst::debug!(RUNTIME_CAT, "pause_start: paused"); - self.paused_sender.send(()).await.unwrap(); - Ok(()) + gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item); + Err(item) } .boxed() } } - let context = Context::acquire("pause_start", Duration::from_millis(2)).unwrap(); - + let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap(); let task = Task::default(); + gst::debug!(RUNTIME_CAT, "item_error: prepare and start"); + let (mut try_next_sender, try_next_receiver) = mpsc::channel(1); + task.prepare(TaskTest { try_next_receiver }, context) + .block_on() + .unwrap(); + task.start().block_on().unwrap(); - let (iterate_sender, mut iterate_receiver) = mpsc::channel(1); - let (mut complete_sender, complete_receiver) = mpsc::channel(0); - let (paused_sender, mut paused_receiver) = mpsc::channel(1); - let _ = task.prepare( - TaskPauseStartTest { - iterate_sender, - complete_receiver, - paused_sender, - }, - context, - ); + gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos"); + block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap(); + // Wait for state machine to reach Stopped + while Stopped != task.state() { + std::thread::sleep(Duration::from_millis(2)); + } - gst::debug!(RUNTIME_CAT, "pause_start: starting"); + gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)"); assert_eq!( task.start().block_on().unwrap(), Complete { - origin: Prepared, + origin: Stopped, target: Started, }, ); - assert_eq!(task.state(), Started); - gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration"); - block_on(iterate_receiver.next()).unwrap(); + gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error"); + block_on(try_next_sender.send(gst::FlowError::Error)).unwrap(); + // Wait for state machine to reach Error + while TaskState::Error != task.state() { + std::thread::sleep(Duration::from_millis(2)); + } - gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)"); - match task.pause() { - Ready(Ok(NotWaiting { - trigger: Pause, - origin: Started, - })) => (), + gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)"); + match task.start().block_on().unwrap_err() { + TransitionError { + trigger: Start, + state: TaskState::Error, + .. + } => (), other => panic!("{:?}", other), } - gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion"); - complete_sender.try_send(()).unwrap(); - - // Pause transition is asynchronous FIXME - while TaskState::Paused != task.state() { - std::thread::sleep(Duration::from_millis(5)); - } - - gst::debug!(RUNTIME_CAT, "pause_start: awaiting paused"); - let _ = block_on(paused_receiver.next()); - - // Loop held on due to Pause - iterate_receiver.try_next().unwrap_err(); assert_eq!( - task.start().block_on().unwrap(), + task.unprepare().block_on().unwrap(), Complete { - origin: Paused, - target: Started, + origin: TaskState::Error, + target: Unprepared, }, ); - assert_eq!(task.state(), Started); - - gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration"); - block_on(iterate_receiver.next()).unwrap(); - - gst::debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion"); - complete_sender.try_send(()).unwrap(); - - stop_then_unprepare(task); - } - - #[test] - fn successive_pause_start() { - // Purpose: check pause cancellation. - gst::init().unwrap(); - - struct TaskPauseStartTest { - iterate_sender: mpsc::Sender<()>, - } - - impl TaskImpl for TaskPauseStartTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - gst::debug!(RUNTIME_CAT, "successive_pause_start: iteration"); - self.iterate_sender.send(()).await.unwrap(); - - Ok(()) - } - .boxed() - } - } - - let context = Context::acquire("successive_pause_start", Duration::from_millis(2)).unwrap(); - - let task = Task::default(); - - let (iterate_sender, mut iterate_receiver) = mpsc::channel(1); - let _ = task.prepare(TaskPauseStartTest { iterate_sender }, context); - - gst::debug!(RUNTIME_CAT, "successive_pause_start: starting"); - block_on(task.start()).unwrap(); - - gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1"); - block_on(iterate_receiver.next()).unwrap(); - - gst::debug!(RUNTIME_CAT, "successive_pause_start: pause and start"); - let _ = task.pause(); - block_on(task.start()).unwrap(); - - assert_eq!(task.state(), Started); - - gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2"); - block_on(iterate_receiver.next()).unwrap(); - - gst::debug!(RUNTIME_CAT, "successive_pause_start: stopping"); - stop_then_unprepare(task); } #[test] @@ -2081,10 +1976,16 @@ mod tests { } impl TaskImpl for TaskFlushTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("flush_regular_sync: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing"); @@ -2160,10 +2061,16 @@ mod tests { } impl TaskImpl for TaskFlushTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("flush_regular_different_context: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!( @@ -2272,10 +2179,16 @@ mod tests { } impl TaskImpl for TaskFlushTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("flush_regular_same_context: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing"); @@ -2369,11 +2282,16 @@ mod tests { } impl TaskImpl for TaskFlushTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + future::ok(()).boxed() + } + + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration"); - let flush_status = self.task.flush_start(); - match flush_status { + gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item"); + match self.task.flush_start() { Pending { trigger: FlushStart, origin: Started, @@ -2381,7 +2299,6 @@ mod tests { } => (), other => panic!("{:?}", other), } - flush_status.await.unwrap(); Ok(()) } .boxed() @@ -2440,18 +2357,25 @@ mod tests { } impl TaskImpl for TaskStartTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + future::ok(()).boxed() + } + + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - gst::debug!(RUNTIME_CAT, "pause_from_loop: entering iteration"); + gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item"); crate::runtime::time::delay_for(Duration::from_millis(50)).await; - gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration"); + gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item"); match self.task.pause() { - Ready(Ok(TransitionOk::NotWaiting { + Pending { trigger: Pause, origin: Started, - })) => (), + .. + } => (), other => panic!("{:?}", other), } @@ -2502,10 +2426,16 @@ mod tests { } impl TaskImpl for TaskFlushTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("trigger_from_action: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!( @@ -2572,6 +2502,8 @@ mod tests { } impl TaskImpl for TaskFlushTest { + type Item = (); + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "pause_flush_start: started"); @@ -2581,10 +2513,14 @@ mod tests { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("pause_flush_start: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing"); @@ -2681,6 +2617,8 @@ mod tests { } impl TaskImpl for TaskFlushTest { + type Item = (); + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "pause_flushing_start: started"); @@ -2690,10 +2628,14 @@ mod tests { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("pause_flushing_start: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing"); @@ -2779,10 +2721,16 @@ mod tests { } impl TaskImpl for TaskStartTest { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + type Item = (); + + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { future::pending::>().boxed() } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + unreachable!("flush_concurrent_start: handle_item"); + } + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing"); @@ -2893,6 +2841,8 @@ mod tests { } impl TaskImpl for TaskTimerTest { + type Item = (); + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { self.timer = Some(crate::runtime::time::delay_for(Duration::from_millis(50))); @@ -2902,12 +2852,18 @@ mod tests { .boxed() } - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer"); self.timer.take().unwrap().await; - gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed"); + Ok(()) + } + .boxed() + } + fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed"); if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() { timer_elapsed_sender.send(()).unwrap(); }