From 244f6dd6f7a015cb9a9ee3a18b6dc6f70e43b86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 25 May 2020 17:18:31 +0200 Subject: [PATCH] threadshare: fix Transition naming --- generic/threadshare/src/runtime/task.rs | 521 ++++++++++++------------ generic/threadshare/src/tcpclientsrc.rs | 16 +- 2 files changed, 268 insertions(+), 269 deletions(-) diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 636f486b..53ed66e0 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -49,7 +49,7 @@ pub enum TaskState { } #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum Transition { +pub enum Trigger { Error, FlushStart, FlushStop, @@ -60,10 +60,10 @@ pub enum Transition { Unprepare, } -/// TransitionRequest error details. +/// TriggeringEvent error details. #[derive(Clone, Debug, Eq, PartialEq)] pub struct TransitionError { - pub transition: Transition, + pub trigger: Trigger, pub state: TaskState, pub err_msg: gst::ErrorMessage, } @@ -73,7 +73,7 @@ impl fmt::Display for TransitionError { write!( f, "{:?} from state {:?}: {:?}", - self.transition, self.state, self.err_msg + self.trigger, self.state, self.err_msg ) } } @@ -86,7 +86,9 @@ impl From for gst::ErrorMessage { } } -/// Transition request handling details. +/// Transition details. +/// +/// A state transition occurs as a result of a triggering event. #[derive(Clone, Debug, Eq, PartialEq)] pub enum TransitionStatus { /// Transition completed successfully. @@ -94,36 +96,24 @@ pub enum TransitionStatus { origin: TaskState, target: TaskState, }, - /// The transition acknowledgement was spawned in a subtask. + /// Asynchronously awaiting for transition completion in a subtask. /// - /// This occurs when the transition is requested from a `Context`. - Async { - transition: Transition, - origin: TaskState, - }, + /// This occurs when the event is triggered from a `Context`. + Async { trigger: Trigger, origin: TaskState }, /// Not waiting for transition completion. /// /// This is to prevent: - /// - A deadlock when executing from a `TaskImpl` hook. + /// - A deadlock when executing a transition action. /// - A potential infinite wait when pausing a running loop /// which could be awaiting for an `iterate` to complete. - NotWaiting { - transition: Transition, - origin: TaskState, - }, - /// Skipping transition due to current state. - Skipped { - transition: Transition, - state: TaskState, - }, + NotWaiting { trigger: Trigger, origin: TaskState }, + /// Skipping triggering event due to current state. + Skipped { trigger: Trigger, state: TaskState }, } /// Implementation trait for `Task`s. /// -/// Defines implementations for specific state transitions. -/// -/// In the event of a failure, the implementer must call -/// `gst_element_error!`. +/// Defines implementations for state transition actions and error handlers. pub trait TaskImpl: Send + 'static { fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { future::ok(()).boxed() @@ -161,16 +151,16 @@ pub trait TaskImpl: Send + 'static { /// This handler also catches errors returned by subtasks spawned by the iteration. /// /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message` - /// and return `Transition::Error`. + /// and return `Trigger::Error`. /// /// Otherwise, handle the error and return the requested `Transition` to recover. /// /// Default behaviour depends on the `err`: /// - /// - `FlowError::Flushing` -> `Transition::FlushStart`. - /// - `FlowError::Eos` -> `Transition::Stop`. - /// - Other `FlowError` -> `Transition::Error`. - fn handle_iterate_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Transition> { + /// - `FlowError::Flushing` -> `Trigger::FlushStart`. + /// - `FlowError::Eos` -> `Trigger::Stop`. + /// - Other `FlowError` -> `Trigger::Error`. + fn handle_iterate_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> { async move { match err { gst::FlowError::Flushing => { @@ -178,11 +168,11 @@ pub trait TaskImpl: Send + 'static { RUNTIME_CAT, "TaskImpl iterate returned Flushing. Posting FlushStart" ); - Transition::FlushStart + Trigger::FlushStart } gst::FlowError::Eos => { gst_debug!(RUNTIME_CAT, "TaskImpl iterate returned Eos. Posting Stop"); - Transition::Stop + Trigger::Stop } other => { gst_error!( @@ -190,58 +180,58 @@ pub trait TaskImpl: Send + 'static { "TaskImpl iterate returned {:?}. Posting Error", other ); - Transition::Error + Trigger::Error } } } .boxed() } - /// Handles an error occuring during the execution of a transition hook. + /// Handles an error occuring during the execution of a transition action. /// - /// This handler also catches errors returned by subtasks spawned by the transition hook. + /// This handler also catches errors returned by subtasks spawned by the transition action. /// /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message` - /// and return `Transition::Error`. + /// and return `Trigger::Error`. /// - /// Otherwise, handle the error and return the requested `Transition` to recover. + /// Otherwise, handle the error and return the recovering `Trigger`. /// - /// Default is to `gst_error` log and return `Transition::Error`. - fn handle_hook_error( + /// Default is to `gst_error` log and return `Trigger::Error`. + fn handle_action_error( &mut self, - transition: Transition, + trigger: Trigger, state: TaskState, err: gst::ErrorMessage, - ) -> BoxFuture<'_, Transition> { + ) -> BoxFuture<'_, Trigger> { async move { gst_error!( RUNTIME_CAT, - "TaskImpl hook error during {:?} from {:?}: {:?}. Posting Transition::Error", - transition, + "TaskImpl transition action error during {:?} from {:?}: {:?}. Posting Trigger::Error", + trigger, state, err, ); - Transition::Error + Trigger::Error } .boxed() } } -struct TransitionRequest { - kind: Transition, +struct TriggeringEvent { + trigger: Trigger, ack_tx: oneshot::Sender>, } -impl TransitionRequest { +impl TriggeringEvent { fn new( - kind: Transition, + trigger: Trigger, ) -> ( Self, oneshot::Receiver>, ) { let (ack_tx, ack_rx) = oneshot::channel(); - let req = TransitionRequest { kind, ack_tx }; + let req = TriggeringEvent { trigger, ack_tx }; (req, ack_rx) } @@ -252,13 +242,13 @@ impl TransitionRequest { fn send_err_ack(self) { let res = Err(TransitionError { - transition: self.kind, + trigger: self.trigger, state: TaskState::Error, err_msg: gst_error_msg!( gst::CoreError::StateChange, [ - "Transition {:?} failed due to a previous unrecoverable error", - self.kind, + "Triggering Event {:?} rejected due to a previous unrecoverable error", + self.trigger, ] ), }); @@ -267,10 +257,10 @@ impl TransitionRequest { } } -impl fmt::Debug for TransitionRequest { +impl fmt::Debug for TriggeringEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TransitionRequest") - .field("kind", &self.kind) + f.debug_struct("TriggeringEvent") + .field("trigger", &self.trigger) .finish() } } @@ -280,7 +270,7 @@ struct TaskInner { context: Option, state: TaskState, state_machine_handle: Option>, - transition_tx: Option>, + triggering_evt_tx: Option>, prepare_abort_handle: Option, loop_abort_handle: Option, spawned_task_id: Option, @@ -292,7 +282,7 @@ impl Default for TaskInner { context: None, state: TaskState::Unprepared, state_machine_handle: None, - transition_tx: None, + triggering_evt_tx: None, prepare_abort_handle: None, loop_abort_handle: None, spawned_task_id: None, @@ -301,65 +291,65 @@ impl Default for TaskInner { } impl TaskInner { - fn switch_to_state(&mut self, target_state: TaskState, transition_req: TransitionRequest) { + fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) { let res = Ok(TransitionStatus::Complete { origin: self.state, target: target_state, }); self.state = target_state; - transition_req.send_ack(res); + triggering_evt.send_ack(res); } - fn switch_to_err(&mut self, transition_req: TransitionRequest) { + fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) { let res = Err(TransitionError { - transition: transition_req.kind, + trigger: triggering_evt.trigger, state: self.state, err_msg: gst_error_msg!( gst::CoreError::StateChange, [ "Unrecoverable error for {:?} from state {:?}", - transition_req, + triggering_evt, self.state, ] ), }); self.state = TaskState::Error; - transition_req.send_ack(res); + triggering_evt.send_ack(res); } - fn skip_transition(&mut self, transition_req: TransitionRequest) { + fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) { let res = Ok(TransitionStatus::Skipped { - transition: transition_req.kind, + trigger: triggering_evt.trigger, state: self.state, }); - transition_req.send_ack(res); + triggering_evt.send_ack(res); } - fn request_transition( + fn trigger( &mut self, - kind: Transition, + trigger: Trigger, ) -> Result>, TransitionError> { - let transition_tx = self.transition_tx.as_mut().unwrap(); + let triggering_evt_tx = self.triggering_evt_tx.as_mut().unwrap(); - let (transition_req, ack_rx) = TransitionRequest::new(kind); + let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); - gst_log!(RUNTIME_CAT, "Pushing {:?}", transition_req); + gst_log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); - transition_tx.try_send(transition_req).or_else(|err| { + triggering_evt_tx.try_send(triggering_evt).or_else(|err| { let resource_err = if err.is_full() { gst::ResourceError::NoSpaceLeft } else { gst::ResourceError::Close }; - gst_warning!(RUNTIME_CAT, "Unable to send {:?}: {:?}", kind, err); + gst_warning!(RUNTIME_CAT, "Unable to send {:?}: {:?}", trigger, err); Err(TransitionError { - transition: kind, + trigger, state: self.state, - err_msg: gst_error_msg!(resource_err, ["Unable to send {:?}: {:?}", kind, err]), + err_msg: gst_error_msg!(resource_err, ["Unable to send {:?}: {:?}", trigger, err]), }) })?; @@ -427,14 +417,14 @@ impl Task { TaskState::Prepared | TaskState::Preparing => { gst_debug!(RUNTIME_CAT, "Task already {:?}", origin); return Ok(TransitionStatus::Skipped { - transition: Transition::Prepare, + trigger: Trigger::Prepare, state: origin, }); } state => { gst_warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state); return Err(TransitionError { - transition: Transition::Prepare, + trigger: Trigger::Prepare, state: inner.state, err_msg: gst_error_msg!( gst::CoreError::StateChange, @@ -452,15 +442,15 @@ impl Task { // FIXME allow configuration of the channel buffer size, // this determines the contention on the Task. - let (transition_tx, transition_rx) = async_mpsc::channel(4); - let state_machine = StateMachine::new(Box::new(task_impl), transition_rx); + let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4); + let state_machine = StateMachine::new(Box::new(task_impl), triggering_evt_rx); inner.state_machine_handle = Some(self.spawn_state_machine(state_machine, &context)); - inner.transition_tx = Some(transition_tx); + inner.triggering_evt_tx = Some(triggering_evt_tx); inner.context = Some(context); Ok(TransitionStatus::Async { - transition: Transition::Prepare, + trigger: Trigger::Prepare, origin, }) } @@ -476,7 +466,7 @@ impl Task { TaskState::Unprepared | TaskState::Unpreparing => { gst_debug!(RUNTIME_CAT, "Task already {:?}", origin); return Ok(TransitionStatus::Skipped { - transition: Transition::Unprepare, + trigger: Trigger::Unprepare, state: origin, }); } @@ -487,7 +477,7 @@ impl Task { state ); return Err(TransitionError { - transition: Transition::Unprepare, + trigger: Trigger::Unprepare, state: inner.state, err_msg: gst_error_msg!( gst::CoreError::StateChange, @@ -503,8 +493,8 @@ impl Task { loop_abort_handle.abort(); } - let _ = inner.request_transition(Transition::Unprepare).unwrap(); - let transition_tx = inner.transition_tx.take().unwrap(); + let _ = inner.trigger(Trigger::Unprepare).unwrap(); + let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap(); let state_machine_handle = inner.state_machine_handle.take(); let context = inner.context.take().unwrap(); @@ -525,7 +515,7 @@ impl Task { let join_fut = block_on_or_add_sub_task(async { state_machine_handle.await; - drop(transition_tx); + drop(triggering_evt_tx); drop(context); gst_debug!(RUNTIME_CAT, "Task unprepared"); @@ -533,13 +523,13 @@ impl Task { if join_fut.is_none() { return Ok(TransitionStatus::Async { - transition: Transition::Unprepare, + trigger: Trigger::Unprepare, origin, }); } } None => { - drop(transition_tx); + drop(triggering_evt_tx); drop(context); } } @@ -556,16 +546,16 @@ impl Task { pub fn start(&self) -> Result { let mut inner = self.0.lock().unwrap(); - let ack_rx = inner.request_transition(Transition::Start)?; + let ack_rx = inner.trigger(Trigger::Start)?; if let TaskState::Started = inner.state { return Ok(TransitionStatus::NotWaiting { - transition: Transition::Start, + trigger: Trigger::Start, origin: TaskState::Started, }); } - Self::await_ack(inner, ack_rx, Transition::Start) + Self::await_ack(inner, ack_rx, Trigger::Start) } /// Requests the `Task` loop to pause. @@ -576,16 +566,16 @@ impl Task { pub fn pause(&self) -> Result { let mut inner = self.0.lock().unwrap(); - let ack_rx = inner.request_transition(Transition::Pause)?; + let ack_rx = inner.trigger(Trigger::Pause)?; if let TaskState::Started = inner.state { return Ok(TransitionStatus::NotWaiting { - transition: Transition::Pause, + trigger: Trigger::Pause, origin: TaskState::Started, }); } - Self::await_ack(inner, ack_rx, Transition::Pause) + Self::await_ack(inner, ack_rx, Trigger::Pause) } pub fn flush_start(&self) -> Result { @@ -595,7 +585,7 @@ impl Task { loop_abort_handle.abort(); } - Self::push_and_await_transition(inner, Transition::FlushStart) + Self::push_and_await_transition(inner, Trigger::FlushStart) } pub fn flush_stop(&self) -> Result { @@ -605,7 +595,7 @@ impl Task { loop_abort_handle.abort(); } - Self::push_and_await_transition(inner, Transition::FlushStop) + Self::push_and_await_transition(inner, Trigger::FlushStop) } /// Stops the `Started` `Task` and wait for it to finish. @@ -616,27 +606,27 @@ impl Task { loop_abort_handle.abort(); } - Self::push_and_await_transition(inner, Transition::Stop) + Self::push_and_await_transition(inner, Trigger::Stop) } fn push_and_await_transition( mut inner: MutexGuard, - transition: Transition, + trigger: Trigger, ) -> Result { - let ack_rx = inner.request_transition(transition)?; + let ack_rx = inner.trigger(trigger)?; - Self::await_ack(inner, ack_rx, transition) + Self::await_ack(inner, ack_rx, trigger) } fn await_ack( inner: MutexGuard, ack_rx: oneshot::Receiver>, - transition: Transition, + trigger: Trigger, ) -> Result { let origin = inner.state; - // Since transition handling is serialized by the state machine and - // we hold a lock on TaskInner, we can verify if current spawned loop / tansition hook + // Since triggering events handling is serialized by the state machine and + // we hold a lock on TaskInner, we can verify if current spawned loop / tansition action // task_id matches the task_id of current subtask, if any. if let Some(spawned_task_id) = inner.spawned_task_id { if let Some((cur_context, cur_task_id)) = Context::current_task() { @@ -645,10 +635,10 @@ impl Task { // Don't block as this would deadlock gst_log!( RUNTIME_CAT, - "Requested {:?} from loop or transition hook, not waiting", - transition, + "Requested {:?} from loop or transition action, not waiting", + trigger, ); - return Ok(TransitionStatus::NotWaiting { transition, origin }); + return Ok(TransitionStatus::NotWaiting { trigger, origin }); } } } @@ -656,20 +646,20 @@ impl Task { drop(inner); block_on_or_add_sub_task(async move { - gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", transition); + gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger); let res = ack_rx.await.unwrap(); if res.is_ok() { - gst_log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, transition); + gst_log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); } else { - gst_error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, transition); + gst_error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); } res }) .unwrap_or_else(|| { // Future was spawned as a subtask - Ok(TransitionStatus::Async { transition, origin }) + Ok(TransitionStatus::Async { trigger, origin }) }) } @@ -692,27 +682,27 @@ impl Task { struct StateMachine { task_impl: Box, - transition_rx: async_mpsc::Receiver, - pending_transition: Option, + triggering_evt_rx: async_mpsc::Receiver, + pending_triggering_evt: Option, } -// Make sure the Context doesn't throttle otherwise we end up with long delays -// executing transition in a pipeline with many elements. This is because pipeline -// serializes the transitions and the Context's scheduler gets a chance to reach its -// throttling state between 2 elements. +// Make sure the Context doesn't throttle otherwise we end up with long delays executing +// transition actions in a pipeline with many elements. This is because pipeline serializes +// the transition actions and the Context's scheduler gets a chance to reach its throttling +// state between 2 elements. -macro_rules! exec_hook { - ($self:ident, $hook:ident, $transition_req:expr, $origin:expr, $task_inner:expr, $context:expr) => {{ - let transition = $transition_req.kind; +macro_rules! exec_action { + ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr, $context:expr) => {{ + let trigger = $triggering_evt.trigger; - let hook_fut = async move { - let mut res = $self.task_impl.$hook().await; + let action_fut = async move { + let mut res = $self.task_impl.$action().await; if res.is_ok() { while Context::current_has_sub_tasks() { - gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($hook)); + gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); res = Context::drain_sub_tasks().await.map_err(|err| { - let msg = format!("{} subtask returned {:?}", stringify!($hook), err); + let msg = format!("{} subtask returned {:?}", stringify!($action), err); gst_log!(RUNTIME_CAT, "{}", &msg); gst_error_msg!(gst::CoreError::StateChange, ["{}", &msg]) }); @@ -726,11 +716,11 @@ macro_rules! exec_hook { let res = match res { Ok(()) => Ok(()), Err(err) => { - let next_transition_req = $self + let next_triggering_evt = $self .task_impl - .handle_hook_error(transition, $origin, err) + .handle_action_error(trigger, $origin, err) .await; - Err(next_transition_req) + Err(next_triggering_evt) } }; @@ -739,7 +729,7 @@ macro_rules! exec_hook { let join_handle = { let mut task_inner = $task_inner.lock().unwrap(); - let join_handle = $context.awake_and_spawn(hook_fut); + let join_handle = $context.awake_and_spawn(action_fut); task_inner.spawned_task_id = Some(join_handle.task_id()); join_handle @@ -749,18 +739,18 @@ macro_rules! exec_hook { $self = this; match res { - Ok(()) => Ok($transition_req), - Err(next_transition_req) => { - // Convert transition according to the error handler's decision + Ok(()) => Ok($triggering_evt), + Err(next_trigger) => { + // Convert triggering event according to the error handler's decision gst_trace!( RUNTIME_CAT, - "TaskImpl hook error: converting {:?} to {:?}", - $transition_req.kind, - next_transition_req, + "TaskImpl transition action error: converting {:?} to {:?}", + $triggering_evt.trigger, + next_trigger, ); - $transition_req.kind = next_transition_req; - $self.pending_transition = Some($transition_req); + $triggering_evt.trigger = next_trigger; + $self.pending_triggering_evt = Some($triggering_evt); Err(()) } @@ -773,12 +763,12 @@ impl StateMachine { // without inducing any significant performance penalties. fn new( task_impl: Box, - transition_rx: async_mpsc::Receiver, + triggering_evt_rx: async_mpsc::Receiver, ) -> Self { StateMachine { task_impl, - transition_rx, - pending_transition: None, + triggering_evt_rx, + pending_triggering_evt: None, } } @@ -786,59 +776,59 @@ impl StateMachine { gst_trace!(RUNTIME_CAT, "Preparing task"); { - let (mut transition_req, _) = TransitionRequest::new(Transition::Prepare); - let res = exec_hook!( + let (mut triggering_evt, _) = TriggeringEvent::new(Trigger::Prepare); + let res = exec_action!( self, prepare, - transition_req, + triggering_evt, TaskState::Preparing, &task_inner, &context ); - if let Ok(transition_req) = res { + if let Ok(triggering_evt) = res { task_inner .lock() .unwrap() - .switch_to_state(TaskState::Prepared, transition_req); + .switch_to_state(TaskState::Prepared, triggering_evt); gst_trace!(RUNTIME_CAT, "Task Prepared"); } } loop { - let mut transition_req = match self.pending_transition.take() { - Some(pending_transition_req) => pending_transition_req, + let mut triggering_evt = match self.pending_triggering_evt.take() { + Some(pending_triggering_evt) => pending_triggering_evt, None => self - .transition_rx + .triggering_evt_rx .next() .await - .expect("transition_rx dropped"), + .expect("triggering_evt_rx dropped"), }; - gst_trace!(RUNTIME_CAT, "State machine popped {:?}", transition_req); + gst_trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt); - match transition_req.kind { - Transition::Error => { + match triggering_evt.trigger { + Trigger::Error => { let mut task_inner = task_inner.lock().unwrap(); - task_inner.switch_to_err(transition_req); + task_inner.switch_to_err(triggering_evt); gst_trace!(RUNTIME_CAT, "Switched to Error"); } - Transition::Start => { + Trigger::Start => { let origin = { let mut task_inner = task_inner.lock().unwrap(); let origin = task_inner.state; match origin { TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (), TaskState::PausedFlushing => { - task_inner.switch_to_state(TaskState::Flushing, transition_req); + task_inner.switch_to_state(TaskState::Flushing, triggering_evt); gst_trace!(RUNTIME_CAT, "Switched from PausedFlushing to Flushing"); continue; } TaskState::Error => { - transition_req.send_err_ack(); + triggering_evt.send_err_ack(); continue; } state => { - task_inner.skip_transition(transition_req); + task_inner.skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped Start in state {:?}", state); continue; } @@ -848,10 +838,10 @@ impl StateMachine { }; self = - Self::spawn_loop(self, transition_req, origin, &task_inner, &context).await; - // next/pending transition handled in next iteration + Self::spawn_loop(self, triggering_evt, origin, &task_inner, &context).await; + // next/pending triggering event handled in next iteration } - Transition::Pause => { + Trigger::Pause => { let (origin, target) = { let mut task_inner = task_inner.lock().unwrap(); let origin = task_inner.state; @@ -861,11 +851,11 @@ impl StateMachine { } TaskState::Flushing => (origin, TaskState::PausedFlushing), TaskState::Error => { - transition_req.send_err_ack(); + triggering_evt.send_err_ack(); continue; } state => { - task_inner.skip_transition(transition_req); + task_inner.skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state); continue; } @@ -873,16 +863,16 @@ impl StateMachine { }; let res = - exec_hook!(self, pause, transition_req, origin, &task_inner, &context); - if let Ok(transition_req) = res { + exec_action!(self, pause, triggering_evt, origin, &task_inner, &context); + if let Ok(triggering_evt) = res { task_inner .lock() .unwrap() - .switch_to_state(target, transition_req); + .switch_to_state(target, triggering_evt); gst_trace!(RUNTIME_CAT, "Task loop {:?}", target); } } - Transition::Stop => { + Trigger::Stop => { let origin = { let mut task_inner = task_inner.lock().unwrap(); let origin = task_inner.state; @@ -892,27 +882,28 @@ impl StateMachine { | TaskState::PausedFlushing | TaskState::Flushing => origin, TaskState::Error => { - transition_req.send_err_ack(); + triggering_evt.send_err_ack(); continue; } state => { - task_inner.skip_transition(transition_req); + task_inner.skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped Stop in state {:?}", state); continue; } } }; - let res = exec_hook!(self, stop, transition_req, origin, &task_inner, &context); - if let Ok(transition_req) = res { + let res = + exec_action!(self, stop, triggering_evt, origin, &task_inner, &context); + if let Ok(triggering_evt) = res { task_inner .lock() .unwrap() - .switch_to_state(TaskState::Stopped, transition_req); + .switch_to_state(TaskState::Stopped, triggering_evt); gst_trace!(RUNTIME_CAT, "Task loop Stopped"); } } - Transition::FlushStart => { + Trigger::FlushStart => { let (origin, target) = { let mut task_inner = task_inner.lock().unwrap(); let origin = task_inner.state; @@ -920,78 +911,81 @@ impl StateMachine { TaskState::Started => (origin, TaskState::Flushing), TaskState::Paused => (origin, TaskState::PausedFlushing), TaskState::Error => { - transition_req.send_err_ack(); + triggering_evt.send_err_ack(); continue; } state => { - task_inner.skip_transition(transition_req); + task_inner.skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped FlushStart in state {:?}", state); continue; } } }; - let res = exec_hook!( + let res = exec_action!( self, flush_start, - transition_req, + triggering_evt, origin, &task_inner, &context ); - if let Ok(transition_req) = res { + if let Ok(triggering_evt) = res { task_inner .lock() .unwrap() - .switch_to_state(target, transition_req); + .switch_to_state(target, triggering_evt); gst_trace!(RUNTIME_CAT, "Task {:?}", target); } } - Transition::FlushStop => { + Trigger::FlushStop => { let origin = task_inner.lock().unwrap().state; let is_paused = match origin { TaskState::Flushing => false, TaskState::PausedFlushing => true, TaskState::Error => { - transition_req.send_err_ack(); + triggering_evt.send_err_ack(); continue; } state => { - task_inner.lock().unwrap().skip_transition(transition_req); + task_inner + .lock() + .unwrap() + .skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped FlushStop in state {:?}", state); continue; } }; - let res = exec_hook!( + let res = exec_action!( self, flush_stop, - transition_req, + triggering_evt, origin, &task_inner, &context ); - if let Ok(transition_req) = res { + if let Ok(triggering_evt) = res { if is_paused { task_inner .lock() .unwrap() - .switch_to_state(TaskState::Paused, transition_req); + .switch_to_state(TaskState::Paused, triggering_evt); gst_trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused"); } else { self = Self::spawn_loop( self, - transition_req, + triggering_evt, origin, &task_inner, &context, ) .await; - // next/pending transition handled in next iteration + // next/pending triggering event handled in next iteration } } } - Transition::Unprepare => { + Trigger::Unprepare => { // Unprepare is not joined by an ack_rx but by joining the state machine // handle, so we don't need to keep track of the spwaned_task_id context @@ -1015,11 +1009,11 @@ impl StateMachine { task_inner .lock() .unwrap() - .switch_to_state(TaskState::Unprepared, transition_req); + .switch_to_state(TaskState::Unprepared, triggering_evt); break; } - _ => unreachable!("State machine handler {:?}", transition_req), + _ => unreachable!("State machine handler {:?}", triggering_evt), } } @@ -1028,7 +1022,7 @@ impl StateMachine { async fn spawn_loop( mut self, - mut transition_req: TransitionRequest, + mut triggering_evt: TriggeringEvent, origin: TaskState, task_inner: &Arc>, context: &Context, @@ -1059,7 +1053,7 @@ impl StateMachine { let mut task_inner = task_inner_clone.lock().unwrap(); task_inner.loop_abort_handle = Some(loop_abort_handle); - task_inner.switch_to_state(TaskState::Started, transition_req); + task_inner.switch_to_state(TaskState::Started, triggering_evt); abortable_task_loop }; @@ -1068,32 +1062,32 @@ impl StateMachine { match abortable_task_loop.await { Ok(Ok(())) => (), Ok(Err(err)) => { - let next_transition = self.task_impl.handle_iterate_error(err).await; - let (transition_req, _) = TransitionRequest::new(next_transition); - self.pending_transition = Some(transition_req); + let next_trigger = self.task_impl.handle_iterate_error(err).await; + let (triggering_evt, _) = TriggeringEvent::new(next_trigger); + self.pending_triggering_evt = Some(triggering_evt); } Err(Aborted) => gst_trace!(RUNTIME_CAT, "Task loop aborted"), } } Err(err) => { - // Error while executing start hook - let next_transition = self + // Error while executing start transition action + let next_trigger = self .task_impl - .handle_hook_error(transition_req.kind, origin, err) + .handle_action_error(triggering_evt.trigger, origin, err) .await; gst_log!( RUNTIME_CAT, - "TaskImpl hook error: converting Start to {:?}", - next_transition, + "TaskImpl transition action error: converting Start to {:?}", + next_trigger, ); - transition_req.kind = next_transition; - self.pending_transition = Some(transition_req); + triggering_evt.trigger = next_trigger; + self.pending_triggering_evt = Some(triggering_evt); } } - // next/pending transition handled in state machine loop + // next/pending triggering event handled in state machine loop self }; @@ -1113,35 +1107,37 @@ impl StateMachine { gst_trace!(RUNTIME_CAT, "Task loop started"); loop { - // Check if there is any pending transition_req - while let Ok(Some(transition_req)) = self.transition_rx.try_next() { - gst_trace!(RUNTIME_CAT, "Task loop popped {:?}", transition_req); + while let Ok(Some(triggering_evt)) = self.triggering_evt_rx.try_next() { + gst_trace!(RUNTIME_CAT, "Task loop popped {:?}", triggering_evt); - match transition_req.kind { - Transition::Start => { - task_inner.lock().unwrap().skip_transition(transition_req); + match triggering_evt.trigger { + Trigger::Start => { + task_inner + .lock() + .unwrap() + .skip_triggering_evt(triggering_evt); gst_trace!(RUNTIME_CAT, "Skipped Start in state Started"); } _ => { gst_trace!( RUNTIME_CAT, "Task loop handing {:?} to state machine", - transition_req, + triggering_evt, ); - self.pending_transition = Some(transition_req); + self.pending_triggering_evt = Some(triggering_evt); return Ok(()); } } } - // Run the iteration + // Run the iteration function self.task_impl.iterate().await.map_err(|err| { gst_log!(RUNTIME_CAT, "Task loop iterate impl returned {:?}", err); err })?; while Context::current_has_sub_tasks() { - gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($hook)); + gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); Context::drain_sub_tasks().await.map_err(|err| { gst_log!(RUNTIME_CAT, "Task loop iterate subtask returned {:?}", err); err @@ -1290,7 +1286,7 @@ mod tests { assert_eq!( res, TransitionStatus::Async { - transition: Transition::Prepare, + trigger: Trigger::Prepare, origin: TaskState::Unprepared, } ); @@ -1321,7 +1317,7 @@ mod tests { assert_eq!( task.start().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::Start, + trigger: Trigger::Start, origin: TaskState::Started, }, ); @@ -1331,7 +1327,7 @@ mod tests { assert_eq!( task.pause().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::Pause, + trigger: Trigger::Pause, origin: TaskState::Started, }, ); @@ -1346,7 +1342,7 @@ mod tests { } } - gst_debug!(RUNTIME_CAT, "task_iterate: awaiting pause hook ack"); + gst_debug!(RUNTIME_CAT, "task_iterate: awaiting pause ack"); paused_receiver.next().await.unwrap(); gst_debug!(RUNTIME_CAT, "task_iterate: starting (after pause)"); @@ -1394,7 +1390,7 @@ mod tests { .await .unwrap(); - gst_debug!(RUNTIME_CAT, "task_iterate: awaiting stop hook ack"); + gst_debug!(RUNTIME_CAT, "task_iterate: awaiting stop ack"); stopped_receiver.next().await.unwrap(); // Wait for state machine to reach Stopped @@ -1419,7 +1415,7 @@ mod tests { .await .unwrap(); - gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start hook ack"); + gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start ack"); flush_start_receiver.next().await.unwrap(); // Wait for state machine to reach Flushing @@ -1456,7 +1452,7 @@ mod tests { let err = task.start().unwrap_err(); match err { TransitionError { - transition: Transition::Start, + trigger: Trigger::Start, state: TaskState::Error, .. } => (), @@ -1495,25 +1491,25 @@ mod tests { .boxed() } - fn handle_hook_error( + fn handle_action_error( &mut self, - transition: Transition, + trigger: Trigger, state: TaskState, err: gst::ErrorMessage, - ) -> BoxFuture<'_, Transition> { + ) -> BoxFuture<'_, Trigger> { async move { gst_debug!( RUNTIME_CAT, "prepare_error: handling prepare error {:?}", err ); - match (transition, state) { - (Transition::Prepare, TaskState::Preparing) => { + match (trigger, state) { + (Trigger::Prepare, TaskState::Preparing) => { self.prepare_error_sender.send(()).await.unwrap(); } other => unreachable!("{:?}", other), } - Transition::Error + Trigger::Error } .boxed() } @@ -1538,7 +1534,10 @@ mod tests { ) .unwrap(); - gst_debug!(RUNTIME_CAT, "prepare_error: await error hook notification"); + gst_debug!( + RUNTIME_CAT, + "prepare_error: await action error notification" + ); prepare_error_receiver.next().await.unwrap(); // Wait for state machine to reach Error @@ -1549,7 +1548,7 @@ mod tests { let res = task.start().unwrap_err(); match res { TransitionError { - transition: Transition::Start, + trigger: Trigger::Start, state: TaskState::Error, .. } => (), @@ -1583,12 +1582,12 @@ mod tests { .boxed() } - fn handle_hook_error( + fn handle_action_error( &mut self, - _transition: Transition, + _trigger: Trigger, _state: TaskState, _err: gst::ErrorMessage, - ) -> BoxFuture<'_, Transition> { + ) -> BoxFuture<'_, Trigger> { unreachable!("prepare_start_ok: handle_prepare_error"); } @@ -1622,7 +1621,7 @@ mod tests { assert_eq!( task_clone.start().unwrap(), TransitionStatus::Async { - transition: Transition::Start, + trigger: Trigger::Start, origin: TaskState::Preparing, } ); @@ -1633,7 +1632,7 @@ mod tests { assert_eq!( task.stop().unwrap(), TransitionStatus::Async { - transition: Transition::Stop, + trigger: Trigger::Stop, origin: TaskState::Started, }, ); @@ -1643,7 +1642,7 @@ mod tests { assert_eq!( task.unprepare().unwrap(), TransitionStatus::Async { - transition: Transition::Unprepare, + trigger: Trigger::Unprepare, origin: TaskState::Stopped, }, ); @@ -1689,25 +1688,25 @@ mod tests { .boxed() } - fn handle_hook_error( + fn handle_action_error( &mut self, - transition: Transition, + trigger: Trigger, state: TaskState, err: gst::ErrorMessage, - ) -> BoxFuture<'_, Transition> { + ) -> BoxFuture<'_, Trigger> { async move { gst_debug!( RUNTIME_CAT, "prepare_start_error: handling prepare error {:?}", err ); - match (transition, state) { - (Transition::Prepare, TaskState::Preparing) => { + match (trigger, state) { + (Trigger::Prepare, TaskState::Preparing) => { self.prepare_error_sender.send(()).await.unwrap(); } - other => unreachable!("hook error for {:?}", other), + other => unreachable!("action error for {:?}", other), } - Transition::Error + Trigger::Error } .boxed() } @@ -1749,7 +1748,7 @@ mod tests { assert_eq!( task.unprepare().unwrap(), TransitionStatus::Async { - transition: Transition::Unprepare, + trigger: Trigger::Unprepare, origin: TaskState::Error, }, ); @@ -1843,7 +1842,7 @@ mod tests { assert_eq!( task.pause().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::Pause, + trigger: Trigger::Pause, origin: TaskState::Started, }, ); @@ -2075,7 +2074,7 @@ mod tests { assert_eq!( task_clone.flush_start().unwrap(), TransitionStatus::Async { - transition: Transition::FlushStart, + trigger: Trigger::FlushStart, origin: TaskState::Started, }, ); @@ -2086,7 +2085,7 @@ mod tests { assert_eq!( task_clone.flush_stop().unwrap(), TransitionStatus::Async { - transition: Transition::FlushStop, + trigger: Trigger::FlushStop, origin: TaskState::Flushing, }, ); @@ -2157,7 +2156,7 @@ mod tests { assert_eq!( task_clone.flush_start().unwrap(), TransitionStatus::Async { - transition: Transition::FlushStart, + trigger: Trigger::FlushStart, origin: TaskState::Started, }, ); @@ -2168,7 +2167,7 @@ mod tests { assert_eq!( task_clone.flush_stop().unwrap(), TransitionStatus::Async { - transition: Transition::FlushStop, + trigger: Trigger::FlushStop, origin: TaskState::Flushing, }, ); @@ -2185,7 +2184,7 @@ mod tests { #[tokio::test] async fn flush_from_loop() { - // Purpose: make sure a flush_start transition triggered from an iteration doesn't block. + // Purpose: make sure a flush_start triggered from an iteration doesn't block. gst::init().unwrap(); struct TaskFlushTest { @@ -2200,7 +2199,7 @@ mod tests { assert_eq!( self.task.flush_start().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::FlushStart, + trigger: Trigger::FlushStart, origin: TaskState::Started, }, ); @@ -2253,7 +2252,7 @@ mod tests { #[tokio::test] async fn pause_from_loop() { - // Purpose: make sure a start transition triggered from an iteration doesn't block. + // Purpose: make sure a start triggered from an iteration doesn't block. // E.g. an auto pause cancellation after a delay. gst::init().unwrap(); @@ -2273,7 +2272,7 @@ mod tests { assert_eq!( self.task.pause().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::Pause, + trigger: Trigger::Pause, origin: TaskState::Started, }, ); @@ -2284,7 +2283,7 @@ mod tests { fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst_debug!(RUNTIME_CAT, "pause_from_loop: entering pause hook"); + gst_debug!(RUNTIME_CAT, "pause_from_loop: entering pause action"); self.pause_sender.send(()).await.unwrap(); Ok(()) } @@ -2316,8 +2315,8 @@ mod tests { } #[tokio::test] - async fn transition_from_hook() { - // Purpose: make sure a transition triggered from a transition hook doesn't block. + async fn trigger_from_action() { + // Purpose: make sure an event triggered from a transition action doesn't block. gst::init().unwrap(); struct TaskFlushTest { @@ -2334,12 +2333,12 @@ mod tests { async move { gst_debug!( RUNTIME_CAT, - "transition_from_hook: flush_start triggering flush_stop" + "trigger_from_action: flush_start triggering flush_stop" ); assert_eq!( self.task.flush_stop().unwrap(), TransitionStatus::NotWaiting { - transition: Transition::FlushStop, + trigger: Trigger::FlushStop, origin: TaskState::Started, }, ); @@ -2350,7 +2349,7 @@ mod tests { fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst_debug!(RUNTIME_CAT, "transition_from_hook: stopped flushing"); + gst_debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing"); self.flush_stop_sender.send(()).await.unwrap(); Ok(()) } @@ -2358,7 +2357,7 @@ mod tests { } } - let context = Context::acquire("transition_from_hook", 2).unwrap(); + let context = Context::acquire("trigger_from_action", 2).unwrap(); let task = Task::default(); @@ -2377,7 +2376,7 @@ mod tests { gst_debug!( RUNTIME_CAT, - "transition_from_hook: awaiting flush_stop notification" + "trigger_from_action: awaiting flush_stop notification" ); flush_stop_receiver.next().await.unwrap(); @@ -2478,7 +2477,7 @@ mod tests { assert_eq!(task.state(), TaskState::Paused); flush_stop_receiver.next().await; - // start hook not executed + // start action not executed started_receiver.try_next().unwrap_err(); gst_debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing"); @@ -2576,7 +2575,7 @@ mod tests { ); assert_eq!(task.state(), TaskState::Flushing); - // start hook not executed + // start action not executed started_receiver.try_next().unwrap_err(); gst_debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush"); @@ -2659,11 +2658,11 @@ mod tests { let res = task_clone.flush_start().unwrap(); match res { TransitionStatus::Async { - transition: Transition::FlushStart, + trigger: Trigger::FlushStart, origin: TaskState::Paused, } => (), TransitionStatus::Async { - transition: Transition::FlushStart, + trigger: Trigger::FlushStart, origin: TaskState::Started, } => (), other => unreachable!("{:?}", other), diff --git a/generic/threadshare/src/tcpclientsrc.rs b/generic/threadshare/src/tcpclientsrc.rs index c51d4359..f2333eb5 100644 --- a/generic/threadshare/src/tcpclientsrc.rs +++ b/generic/threadshare/src/tcpclientsrc.rs @@ -41,7 +41,7 @@ use std::u32; use tokio::io::AsyncReadExt; use crate::runtime::prelude::*; -use crate::runtime::task::Transition; +use crate::runtime::task; use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; use super::socket::{Socket, SocketError, SocketRead}; @@ -391,21 +391,21 @@ impl TaskImpl for TcpClientSrcTask { .boxed() } - fn handle_hook_error( + fn handle_action_error( &mut self, - transition: Transition, + trigger: task::Trigger, state: TaskState, err: gst::ErrorMessage, - ) -> BoxFuture<'_, Transition> { + ) -> BoxFuture<'_, task::Trigger> { async move { - match transition { - Transition::Prepare => { + match trigger { + task::Trigger::Prepare => { gst_error!(CAT, "Task preparation failed: {:?}", err); self.element.post_error_message(&err); - Transition::Error + task::Trigger::Error } - other => unreachable!("Hook error {:?} in state {:?}", other, state), + other => unreachable!("Action error for {:?} in state {:?}", other, state), } } .boxed()