From 243a8a93eacf76b3ec701d6bb863edd54f5d8ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 19 Aug 2025 13:38:51 +0200 Subject: [PATCH] threadshare: have Task log its obj Part-of: --- .../examples/standalone/sink/task/imp.rs | 4 + .../examples/standalone/src/imp.rs | 4 + generic/threadshare/src/appsrc/imp.rs | 4 + generic/threadshare/src/audiotestsrc/imp.rs | 4 + generic/threadshare/src/inter/src/imp.rs | 4 + generic/threadshare/src/jitterbuffer/imp.rs | 4 + generic/threadshare/src/proxy/imp.rs | 4 + generic/threadshare/src/queue/imp.rs | 4 + generic/threadshare/src/rtpdtmfsrc/imp.rs | 4 + generic/threadshare/src/runtime/task.rs | 313 +++++++++++++++--- generic/threadshare/src/tcpclientsrc/imp.rs | 4 + generic/threadshare/src/udpsrc/imp.rs | 4 + generic/threadshare/tests/pad.rs | 4 + 13 files changed, 309 insertions(+), 52 deletions(-) diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs index f29f249c3..482e685f1 100644 --- a/generic/threadshare/examples/standalone/sink/task/imp.rs +++ b/generic/threadshare/examples/standalone/sink/task/imp.rs @@ -127,6 +127,10 @@ impl TaskSinkTask { impl TaskImpl for TaskSinkTask { type Item = StreamItem; + fn obj(&self) -> &impl IsA { + &self.elem + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Preparing Task"); Ok(()) diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index df64b603b..18fe3f92f 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -97,6 +97,10 @@ impl SrcTask { impl TaskImpl for SrcTask { type Item = (); + fn obj(&self) -> &impl IsA { + &self.elem + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { let imp = self.elem.imp(); let settings = imp.settings.lock().unwrap(); diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 52ba570f7..d7fceea3a 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -225,6 +225,10 @@ impl AppSrcTask { impl TaskImpl for AppSrcTask { type Item = StreamItem; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn try_next(&mut self) -> Result { self.receiver .next() diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs index f03cd7f90..a41e95f5d 100644 --- a/generic/threadshare/src/audiotestsrc/imp.rs +++ b/generic/threadshare/src/audiotestsrc/imp.rs @@ -296,6 +296,10 @@ impl AudioTestSrcTask { impl TaskImpl for AudioTestSrcTask { type Item = gst::Buffer; + fn obj(&self) -> &impl IsA { + &self.elem + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.elem, "Preparing Task"); diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 00f00bdd0..4c28428e7 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -436,6 +436,10 @@ impl InterSrcTask { impl TaskImpl for InterSrcTask { type Item = DataQueueItem; + fn obj(&self) -> &impl IsA { + &self.elem + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.elem, "Starting task"); self.dataqueue.start(); diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 96affa3cd..32868d485 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -1098,6 +1098,10 @@ impl JitterBufferTask { impl TaskImpl for JitterBufferTask { type Item = (); + fn obj(&self) -> &impl IsA { + &self.element + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.element, "Starting task"); diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 5390c1bd5..7af9adb7a 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -793,6 +793,10 @@ impl ProxySrcTask { impl TaskImpl for ProxySrcTask { type Item = DataQueueItem; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(SRC_CAT, obj = self.element, "Starting task"); diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 3620a432c..ca78b3fb2 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -263,6 +263,10 @@ impl QueueTask { impl TaskImpl for QueueTask { type Item = DataQueueItem; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.element, "Starting task"); diff --git a/generic/threadshare/src/rtpdtmfsrc/imp.rs b/generic/threadshare/src/rtpdtmfsrc/imp.rs index dee260e38..9dbeef9dc 100644 --- a/generic/threadshare/src/rtpdtmfsrc/imp.rs +++ b/generic/threadshare/src/rtpdtmfsrc/imp.rs @@ -410,6 +410,10 @@ enum TaskItem { impl TaskImpl for RTPDTMFSrcTask { type Item = TaskItem; + fn obj(&self) -> &impl IsA { + &self.elem + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!(CAT, obj = self.elem, "Starting Task"); diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 2c6747749..e26aee747 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -30,6 +30,9 @@ use std::pin::{pin, Pin}; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::Poll; +use gst::glib; +use gst::glib::prelude::*; + use super::{Context, JoinHandle, RUNTIME_CAT}; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] @@ -318,6 +321,8 @@ impl fmt::Debug for TransitionStatus { pub trait TaskImpl: Send + 'static { type Item: Send + 'static; + fn obj(&self) -> &impl IsA; + fn prepare(&mut self) -> impl Future> + Send { future::ok(()) } @@ -395,16 +400,25 @@ pub trait TaskImpl: Send + 'static { gst::FlowError::Flushing => { gst::debug!( RUNTIME_CAT, + obj = self.obj(), "Task loop returned Flushing. Posting FlushStart" ); Trigger::FlushStart } gst::FlowError::Eos => { - gst::debug!(RUNTIME_CAT, "Task loop returned Eos. Posting Stop"); + gst::debug!( + RUNTIME_CAT, + obj = self.obj(), + "Task loop returned Eos. Posting Stop" + ); Trigger::Stop } other => { - gst::error!(RUNTIME_CAT, "Task loop returned {:?}. Posting Error", other); + gst::error!( + RUNTIME_CAT, + obj = self.obj(), + "Task loop returned {other:?}. Posting Error", + ); Trigger::Error } } @@ -430,10 +444,8 @@ pub trait TaskImpl: Send + 'static { async move { gst::error!( RUNTIME_CAT, - "TaskImpl transition action error during {:?} from {:?}: {:?}. Posting Trigger::Error", - trigger, - state, - err, + obj = self.obj(), + "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error", ); Trigger::Error @@ -497,7 +509,7 @@ impl StateMachineHandle { fn trigger(&mut self, trigger: Trigger) -> AckReceiver { let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); - gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); + gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}"); self.triggering_evt_tx.try_send(triggering_evt).unwrap(); self.context.unpark(); @@ -545,8 +557,7 @@ impl TaskInner { err_msg: gst::error_msg!( gst::CoreError::StateChange, [ - "Unrecoverable error for {:?} from state {:?}", - triggering_evt, + "Unrecoverable error for {triggering_evt:?} from state {:?}", self.state, ] ), @@ -570,17 +581,13 @@ impl TaskInner { .as_mut() .map(|state_machine| state_machine.trigger(trigger)) .ok_or_else(|| { - gst::warning!( - RUNTIME_CAT, - "Unable to send {:?}: no state machine", - trigger - ); + gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",); TransitionError { trigger, state: TaskState::Unprepared, err_msg: gst::error_msg!( gst::ResourceError::NotFound, - ["Unable to send {:?}: no state machine", trigger] + ["Unable to send {trigger:?}: no state machine"] ), } }) @@ -637,7 +644,11 @@ impl Task { match origin { TaskState::Unprepared => (), TaskState::Prepared | TaskState::Preparing => { - gst::debug!(RUNTIME_CAT, "Task already {:?}", origin); + gst::debug!( + RUNTIME_CAT, + obj = task_impl.obj(), + "Task already {origin:?}", + ); return TransitionOk::Skipped { trigger: Trigger::Prepare, state: origin, @@ -645,13 +656,17 @@ impl Task { .into(); } state => { - gst::warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state); + gst::warning!( + RUNTIME_CAT, + obj = task_impl.obj(), + "Attempt to prepare Task in state {state:?}" + ); return TransitionError { trigger: Trigger::Prepare, state: inner.state, err_msg: gst::error_msg!( gst::CoreError::StateChange, - ["Attempt to prepare Task in state {:?}", state] + ["Attempt to prepare Task in state {state:?}"] ), } .into(); @@ -662,7 +677,11 @@ impl Task { inner.state = TaskState::Preparing; - gst::log!(RUNTIME_CAT, "Spawning task state machine"); + gst::log!( + RUNTIME_CAT, + obj = task_impl.obj(), + "Spawning task state machine" + ); inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context)); let ack_rx = match inner.trigger(Trigger::Prepare) { @@ -703,17 +722,13 @@ impl Task { } }, state => { - gst::warning!( - RUNTIME_CAT, - "Attempt to unprepare Task in state {:?}", - state - ); + gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}"); return TransitionError { trigger: Trigger::Unprepare, state: inner.state, err_msg: gst::error_msg!( gst::CoreError::StateChange, - ["Attempt to unprepare Task in state {:?}", state] + ["Attempt to unprepare Task in state {state:?}"] ), } .into(); @@ -827,9 +842,9 @@ macro_rules! exec_action { // Convert triggering event according to the error handler's decision gst::trace!( RUNTIME_CAT, - "TaskImpl transition action error: converting {:?} to {:?}", + obj = $self.task_impl.obj(), + "TaskImpl transition action error: converting {:?} to {next_trigger:?}", $triggering_evt.trigger, - next_trigger, ); $triggering_evt.trigger = next_trigger; @@ -870,7 +885,7 @@ impl StateMachine { .expect("triggering_evt_rx dropped"); if let Trigger::Prepare = triggering_evt.trigger { - gst::trace!(RUNTIME_CAT, "Preparing task"); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task"); let res = exec_action!( self, @@ -889,7 +904,7 @@ impl StateMachine { task_inner.state = TaskState::Prepared; triggering_evt.send_ack(res); - gst::trace!(RUNTIME_CAT, "Task Prepared"); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared"); } } else { panic!("Unexpected initial trigger {:?}", triggering_evt.trigger); @@ -904,13 +919,17 @@ impl StateMachine { .await .expect("triggering_evt_rx dropped"), }; - gst::trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "State machine popped {triggering_evt:?}" + ); match triggering_evt.trigger { Trigger::Error => { let mut task_inner = task_inner.lock().unwrap(); task_inner.switch_to_err(triggering_evt); - gst::trace!(RUNTIME_CAT, "Switched to Error"); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error"); } Trigger::Start => { let origin = { @@ -922,6 +941,7 @@ impl StateMachine { task_inner.switch_to_state(TaskState::Flushing, triggering_evt); gst::trace!( RUNTIME_CAT, + obj = self.task_impl.obj(), "Switched from PausedFlushing to Flushing" ); continue; @@ -932,7 +952,11 @@ impl StateMachine { } state => { task_inner.skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped Start in state {:?}", state); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Skipped Start in state {state:?}" + ); continue; } } @@ -958,7 +982,11 @@ impl StateMachine { } state => { task_inner.skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Skipped Pause in state {state:?}" + ); continue; } } @@ -970,7 +998,11 @@ impl StateMachine { .lock() .unwrap() .switch_to_state(target, triggering_evt); - gst::trace!(RUNTIME_CAT, "Task loop {:?}", target); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Task loop {target:?}" + ); } } Trigger::Stop => { @@ -988,7 +1020,11 @@ impl StateMachine { } state => { task_inner.skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped Stop in state {:?}", state); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Skipped Stop in state {state:?}" + ); continue; } } @@ -1000,7 +1036,7 @@ impl StateMachine { .lock() .unwrap() .switch_to_state(TaskState::Stopped, triggering_evt); - gst::trace!(RUNTIME_CAT, "Task loop Stopped"); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop Stopped"); } } Trigger::FlushStart => { @@ -1016,7 +1052,11 @@ impl StateMachine { } state => { task_inner.skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped FlushStart in state {:?}", state); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Skipped FlushStart in state {state:?}" + ); continue; } } @@ -1028,7 +1068,7 @@ impl StateMachine { .lock() .unwrap() .switch_to_state(target, triggering_evt); - gst::trace!(RUNTIME_CAT, "Task {:?}", target); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}"); } } Trigger::FlushStop => { @@ -1045,7 +1085,11 @@ impl StateMachine { .lock() .unwrap() .skip_triggering_evt(triggering_evt); - gst::trace!(RUNTIME_CAT, "Skipped FlushStop in state {:?}", state); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Skipped FlushStop in state {state:?}" + ); continue; } }; @@ -1057,7 +1101,11 @@ impl StateMachine { .lock() .unwrap() .switch_to_state(TaskState::Paused, triggering_evt); - gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused"); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Switched from PausedFlushing to Paused" + ); } else { self.start(triggering_evt, origin, &task_inner).await; // next/pending triggering event handled in next iteration @@ -1079,7 +1127,11 @@ impl StateMachine { } } - gst::trace!(RUNTIME_CAT, "Task state machine terminated"); + gst::trace!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "Task state machine terminated" + ); } async fn start( @@ -1110,11 +1162,11 @@ impl StateMachine { } async fn run_loop(&mut self) -> Result<(), gst::FlowError> { - gst::trace!(RUNTIME_CAT, "Task loop started"); + gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started"); - let mut item; + let mut try_next_res; loop { - item = { + try_next_res = { // select_biased requires the selected futures to implement // `FusedFuture`. Because async trait functions are not stable, // we use `BoxFuture` for the `TaskImpl` function, including @@ -1133,14 +1185,24 @@ impl StateMachine { self.pending_triggering_evt = Some(triggering_evt); return Ok(()); } - try_next_res = try_next_fut => try_next_res.inspect_err(|&err| { - gst::debug!(RUNTIME_CAT, "TaskImpl::try_next returned {:?}", err); - })?, + try_next_res = try_next_fut => try_next_res, } }; + let item = try_next_res.inspect_err(|err| { + gst::debug!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "TaskImpl::try_next returned {err:?}" + ); + })?; + self.task_impl.handle_item(item).await.inspect_err(|&err| { - gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err); + gst::debug!( + RUNTIME_CAT, + obj = self.task_impl.obj(), + "TaskImpl::handle_item returned {err:?}" + ); })?; } } @@ -1151,6 +1213,8 @@ mod tests { use futures::channel::{mpsc, oneshot}; use futures::executor::block_on; use futures::prelude::*; + use gst::glib; + use gst::glib::prelude::*; use std::future::pending; use std::time::Duration; @@ -1175,6 +1239,7 @@ mod tests { gst::init().unwrap(); struct TaskTest { + obj: gst::Object, prepared_sender: mpsc::Sender<()>, started_sender: mpsc::Sender<()>, try_next_ready_sender: mpsc::Sender<()>, @@ -1189,6 +1254,10 @@ mod tests { impl TaskImpl for TaskTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!(RUNTIME_CAT, "nominal: prepared"); self.prepared_sender.send(()).await.unwrap(); @@ -1257,6 +1326,10 @@ mod tests { let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1); let prepare_status = task.prepare( TaskTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::nominal") + .build() + .into(), prepared_sender, started_sender, try_next_ready_sender, @@ -1455,12 +1528,17 @@ mod tests { gst::init().unwrap(); struct TaskPrepareTest { + obj: gst::Object, prepare_error_sender: mpsc::Sender<()>, } impl TaskImpl for TaskPrepareTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error"); Err(gst::error_msg!( @@ -1507,6 +1585,10 @@ mod tests { let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); let prepare_status = task.prepare( TaskPrepareTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::prepare_error") + .build() + .into(), prepare_error_sender, }, context, @@ -1551,12 +1633,17 @@ mod tests { gst::init().unwrap(); struct TaskPrepareTest { + obj: gst::Object, prepare_receiver: mpsc::Receiver<()>, } impl TaskImpl for TaskPrepareTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!( RUNTIME_CAT, @@ -1595,7 +1682,16 @@ mod tests { let task = Task::default(); let (mut prepare_sender, prepare_receiver) = mpsc::channel(1); - let fut = task.prepare(TaskPrepareTest { prepare_receiver }, context); + let fut = task.prepare( + TaskPrepareTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::prepare_start_ok") + .build() + .into(), + prepare_receiver, + }, + context, + ); drop(fut); let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); @@ -1675,6 +1771,7 @@ mod tests { gst::init().unwrap(); struct TaskPrepareTest { + obj: gst::Object, prepare_receiver: mpsc::Receiver<()>, prepare_error_sender: mpsc::Sender<()>, } @@ -1682,6 +1779,10 @@ mod tests { impl TaskImpl for TaskPrepareTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!( RUNTIME_CAT, @@ -1737,6 +1838,10 @@ mod tests { let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); let prepare_status = task.prepare( TaskPrepareTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::prepare_start_error") + .build() + .into(), prepare_receiver, prepare_error_sender, }, @@ -1811,12 +1916,17 @@ mod tests { gst::init().unwrap(); struct TaskTest { + obj: gst::Object, try_next_receiver: mpsc::Receiver, } impl TaskImpl for TaskTest { type Item = gst::FlowError; + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result { gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next"); Ok(self.try_next_receiver.next().await.unwrap()) @@ -1832,9 +1942,18 @@ mod tests { let task = Task::default(); gst::debug!(RUNTIME_CAT, "item_error: prepare and start"); let (mut try_next_sender, try_next_receiver) = mpsc::channel(1); - task.prepare(TaskTest { try_next_receiver }, context) - .block_on() - .unwrap(); + task.prepare( + TaskTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::item_error") + .build() + .into(), + try_next_receiver, + }, + context, + ) + .block_on() + .unwrap(); task.start().block_on().unwrap(); gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos"); @@ -1884,6 +2003,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, } @@ -1891,6 +2011,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { pending().await } @@ -1920,6 +2044,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::flush_regular_sync") + .build() + .into(), flush_start_sender, flush_stop_sender, }, @@ -1965,6 +2093,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, } @@ -1972,6 +2101,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { pending().await } @@ -2008,6 +2141,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::flush_regular_different_context") + .build() + .into(), flush_start_sender, flush_stop_sender, }, @@ -2078,6 +2215,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, } @@ -2085,6 +2223,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { pending().await } @@ -2115,6 +2257,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::flush_regular_same_context") + .build() + .into(), flush_start_sender, flush_stop_sender, }, @@ -2176,6 +2322,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, task: Task, flush_start_sender: mpsc::Sender<()>, } @@ -2183,6 +2330,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { Ok(()) } @@ -2214,6 +2365,10 @@ mod tests { let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::flush_from_loop") + .build() + .into(), task: task.clone(), flush_start_sender, }, @@ -2247,6 +2402,7 @@ mod tests { gst::init().unwrap(); struct TaskStartTest { + obj: gst::Object, task: Task, pause_sender: mpsc::Sender<()>, } @@ -2254,6 +2410,10 @@ mod tests { impl TaskImpl for TaskStartTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { Ok(()) } @@ -2290,6 +2450,10 @@ mod tests { let (pause_sender, mut pause_receiver) = mpsc::channel(1); let fut = task.prepare( TaskStartTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::pause_from_loop") + .build() + .into(), task: task.clone(), pause_sender, }, @@ -2312,6 +2476,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, task: Task, flush_stop_sender: mpsc::Sender<()>, } @@ -2319,6 +2484,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { pending().await } @@ -2358,6 +2527,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::trigger_from_action") + .build() + .into(), task: task.clone(), flush_stop_sender, }, @@ -2383,6 +2556,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, started_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, @@ -2391,6 +2565,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!(RUNTIME_CAT, "pause_flush_start: started"); self.started_sender.send(()).await.unwrap(); @@ -2427,6 +2605,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::pause_flush_start") + .build() + .into(), started_sender, flush_start_sender, flush_stop_sender, @@ -2490,6 +2672,7 @@ mod tests { gst::init().unwrap(); struct TaskFlushTest { + obj: gst::Object, started_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, @@ -2498,6 +2681,10 @@ mod tests { impl TaskImpl for TaskFlushTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { gst::debug!(RUNTIME_CAT, "pause_flushing_start: started"); self.started_sender.send(()).await.unwrap(); @@ -2534,6 +2721,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskFlushTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::pause_flushing_start") + .build() + .into(), started_sender, flush_start_sender, flush_stop_sender, @@ -2588,6 +2779,7 @@ mod tests { gst::init().unwrap(); struct TaskStartTest { + obj: gst::Object, flush_start_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>, } @@ -2595,6 +2787,10 @@ mod tests { impl TaskImpl for TaskStartTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn try_next(&mut self) -> Result<(), gst::FlowError> { pending().await } @@ -2624,6 +2820,10 @@ mod tests { let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let fut = task.prepare( TaskStartTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::flush_concurrent_start") + .build() + .into(), flush_start_sender, flush_stop_sender, }, @@ -2705,6 +2905,7 @@ mod tests { gst::init().unwrap(); struct TaskTimerTest { + obj: gst::Object, timer: Option, timer_elapsed_sender: Option>, } @@ -2712,6 +2913,10 @@ mod tests { impl TaskImpl for TaskTimerTest { type Item = (); + fn obj(&self) -> &impl IsA { + &self.obj + } + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50))); gst::debug!(RUNTIME_CAT, "start_timer: started"); @@ -2741,6 +2946,10 @@ mod tests { let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel(); let fut = task.prepare( TaskTimerTest { + obj: gst::Pad::builder(gst::PadDirection::Unknown) + .name("runtime::Task::start_timer") + .build() + .into(), timer: None, timer_elapsed_sender: Some(timer_elapsed_sender), }, diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index dd837eea0..906bbeea7 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -272,6 +272,10 @@ impl TcpClientSrcTask { impl TaskImpl for TcpClientSrcTask { type Item = gst::Buffer; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { gst::log!( CAT, diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 0cb3e5918..a51c1d892 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -217,6 +217,10 @@ impl UdpSrcTask { impl TaskImpl for UdpSrcTask { type Item = gst::Buffer; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { let udpsrc = self.element.imp(); let mut settings = udpsrc.settings.lock().unwrap(); diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 95d327f41..51d53490c 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -144,6 +144,10 @@ mod imp_src { impl TaskImpl for ElementSrcTestTask { type Item = Item; + fn obj(&self) -> &impl IsA { + &self.element + } + async fn try_next(&mut self) -> Result { self.receiver.next().await.ok_or_else(|| { gst::log!(SRC_CAT, obj = self.element, "SrcPad channel aborted");