threadshare: have Task log its obj

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2476>
This commit is contained in:
François Laignel 2025-08-19 13:38:51 +02:00
parent a005cf8e05
commit 243a8a93ea
13 changed files with 309 additions and 52 deletions

View file

@ -127,6 +127,10 @@ impl TaskSinkTask {
impl TaskImpl for TaskSinkTask { impl TaskImpl for TaskSinkTask {
type Item = StreamItem; type Item = StreamItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.elem
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Preparing Task"); log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Preparing Task");
Ok(()) Ok(())

View file

@ -97,6 +97,10 @@ impl SrcTask {
impl TaskImpl for SrcTask { impl TaskImpl for SrcTask {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.elem
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
let imp = self.elem.imp(); let imp = self.elem.imp();
let settings = imp.settings.lock().unwrap(); let settings = imp.settings.lock().unwrap();

View file

@ -225,6 +225,10 @@ impl AppSrcTask {
impl TaskImpl for AppSrcTask { impl TaskImpl for AppSrcTask {
type Item = StreamItem; type Item = StreamItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn try_next(&mut self) -> Result<StreamItem, gst::FlowError> { async fn try_next(&mut self) -> Result<StreamItem, gst::FlowError> {
self.receiver self.receiver
.next() .next()

View file

@ -296,6 +296,10 @@ impl AudioTestSrcTask {
impl TaskImpl for AudioTestSrcTask { impl TaskImpl for AudioTestSrcTask {
type Item = gst::Buffer; type Item = gst::Buffer;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.elem
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Preparing Task"); gst::log!(CAT, obj = self.elem, "Preparing Task");

View file

@ -436,6 +436,10 @@ impl InterSrcTask {
impl TaskImpl for InterSrcTask { impl TaskImpl for InterSrcTask {
type Item = DataQueueItem; type Item = DataQueueItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.elem
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Starting task"); gst::log!(CAT, obj = self.elem, "Starting task");
self.dataqueue.start(); self.dataqueue.start();

View file

@ -1098,6 +1098,10 @@ impl JitterBufferTask {
impl TaskImpl for JitterBufferTask { impl TaskImpl for JitterBufferTask {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task"); gst::log!(CAT, obj = self.element, "Starting task");

View file

@ -793,6 +793,10 @@ impl ProxySrcTask {
impl TaskImpl for ProxySrcTask { impl TaskImpl for ProxySrcTask {
type Item = DataQueueItem; type Item = DataQueueItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Starting task"); gst::log!(SRC_CAT, obj = self.element, "Starting task");

View file

@ -263,6 +263,10 @@ impl QueueTask {
impl TaskImpl for QueueTask { impl TaskImpl for QueueTask {
type Item = DataQueueItem; type Item = DataQueueItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task"); gst::log!(CAT, obj = self.element, "Starting task");

View file

@ -410,6 +410,10 @@ enum TaskItem {
impl TaskImpl for RTPDTMFSrcTask { impl TaskImpl for RTPDTMFSrcTask {
type Item = TaskItem; type Item = TaskItem;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.elem
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Starting Task"); gst::log!(CAT, obj = self.elem, "Starting Task");

View file

@ -30,6 +30,9 @@ use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll; use std::task::Poll;
use gst::glib;
use gst::glib::prelude::*;
use super::{Context, JoinHandle, RUNTIME_CAT}; use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
@ -318,6 +321,8 @@ impl fmt::Debug for TransitionStatus {
pub trait TaskImpl: Send + 'static { pub trait TaskImpl: Send + 'static {
type Item: Send + 'static; type Item: Send + 'static;
fn obj(&self) -> &impl IsA<glib::Object>;
fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send { fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ok(()) future::ok(())
} }
@ -395,16 +400,25 @@ pub trait TaskImpl: Send + 'static {
gst::FlowError::Flushing => { gst::FlowError::Flushing => {
gst::debug!( gst::debug!(
RUNTIME_CAT, RUNTIME_CAT,
obj = self.obj(),
"Task loop returned Flushing. Posting FlushStart" "Task loop returned Flushing. Posting FlushStart"
); );
Trigger::FlushStart Trigger::FlushStart
} }
gst::FlowError::Eos => { gst::FlowError::Eos => {
gst::debug!(RUNTIME_CAT, "Task loop returned Eos. Posting Stop"); gst::debug!(
RUNTIME_CAT,
obj = self.obj(),
"Task loop returned Eos. Posting Stop"
);
Trigger::Stop Trigger::Stop
} }
other => { other => {
gst::error!(RUNTIME_CAT, "Task loop returned {:?}. Posting Error", other); gst::error!(
RUNTIME_CAT,
obj = self.obj(),
"Task loop returned {other:?}. Posting Error",
);
Trigger::Error Trigger::Error
} }
} }
@ -430,10 +444,8 @@ pub trait TaskImpl: Send + 'static {
async move { async move {
gst::error!( gst::error!(
RUNTIME_CAT, RUNTIME_CAT,
"TaskImpl transition action error during {:?} from {:?}: {:?}. Posting Trigger::Error", obj = self.obj(),
trigger, "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
state,
err,
); );
Trigger::Error Trigger::Error
@ -497,7 +509,7 @@ impl StateMachineHandle {
fn trigger(&mut self, trigger: Trigger) -> AckReceiver { fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger); let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt); gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
self.triggering_evt_tx.try_send(triggering_evt).unwrap(); self.triggering_evt_tx.try_send(triggering_evt).unwrap();
self.context.unpark(); self.context.unpark();
@ -545,8 +557,7 @@ impl TaskInner {
err_msg: gst::error_msg!( err_msg: gst::error_msg!(
gst::CoreError::StateChange, gst::CoreError::StateChange,
[ [
"Unrecoverable error for {:?} from state {:?}", "Unrecoverable error for {triggering_evt:?} from state {:?}",
triggering_evt,
self.state, self.state,
] ]
), ),
@ -570,17 +581,13 @@ impl TaskInner {
.as_mut() .as_mut()
.map(|state_machine| state_machine.trigger(trigger)) .map(|state_machine| state_machine.trigger(trigger))
.ok_or_else(|| { .ok_or_else(|| {
gst::warning!( gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
RUNTIME_CAT,
"Unable to send {:?}: no state machine",
trigger
);
TransitionError { TransitionError {
trigger, trigger,
state: TaskState::Unprepared, state: TaskState::Unprepared,
err_msg: gst::error_msg!( err_msg: gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Unable to send {:?}: no state machine", trigger] ["Unable to send {trigger:?}: no state machine"]
), ),
} }
}) })
@ -637,7 +644,11 @@ impl Task {
match origin { match origin {
TaskState::Unprepared => (), TaskState::Unprepared => (),
TaskState::Prepared | TaskState::Preparing => { TaskState::Prepared | TaskState::Preparing => {
gst::debug!(RUNTIME_CAT, "Task already {:?}", origin); gst::debug!(
RUNTIME_CAT,
obj = task_impl.obj(),
"Task already {origin:?}",
);
return TransitionOk::Skipped { return TransitionOk::Skipped {
trigger: Trigger::Prepare, trigger: Trigger::Prepare,
state: origin, state: origin,
@ -645,13 +656,17 @@ impl Task {
.into(); .into();
} }
state => { state => {
gst::warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state); gst::warning!(
RUNTIME_CAT,
obj = task_impl.obj(),
"Attempt to prepare Task in state {state:?}"
);
return TransitionError { return TransitionError {
trigger: Trigger::Prepare, trigger: Trigger::Prepare,
state: inner.state, state: inner.state,
err_msg: gst::error_msg!( err_msg: gst::error_msg!(
gst::CoreError::StateChange, gst::CoreError::StateChange,
["Attempt to prepare Task in state {:?}", state] ["Attempt to prepare Task in state {state:?}"]
), ),
} }
.into(); .into();
@ -662,7 +677,11 @@ impl Task {
inner.state = TaskState::Preparing; inner.state = TaskState::Preparing;
gst::log!(RUNTIME_CAT, "Spawning task state machine"); gst::log!(
RUNTIME_CAT,
obj = task_impl.obj(),
"Spawning task state machine"
);
inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context)); inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
let ack_rx = match inner.trigger(Trigger::Prepare) { let ack_rx = match inner.trigger(Trigger::Prepare) {
@ -703,17 +722,13 @@ impl Task {
} }
}, },
state => { state => {
gst::warning!( gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
RUNTIME_CAT,
"Attempt to unprepare Task in state {:?}",
state
);
return TransitionError { return TransitionError {
trigger: Trigger::Unprepare, trigger: Trigger::Unprepare,
state: inner.state, state: inner.state,
err_msg: gst::error_msg!( err_msg: gst::error_msg!(
gst::CoreError::StateChange, gst::CoreError::StateChange,
["Attempt to unprepare Task in state {:?}", state] ["Attempt to unprepare Task in state {state:?}"]
), ),
} }
.into(); .into();
@ -827,9 +842,9 @@ macro_rules! exec_action {
// Convert triggering event according to the error handler's decision // Convert triggering event according to the error handler's decision
gst::trace!( gst::trace!(
RUNTIME_CAT, RUNTIME_CAT,
"TaskImpl transition action error: converting {:?} to {:?}", obj = $self.task_impl.obj(),
"TaskImpl transition action error: converting {:?} to {next_trigger:?}",
$triggering_evt.trigger, $triggering_evt.trigger,
next_trigger,
); );
$triggering_evt.trigger = next_trigger; $triggering_evt.trigger = next_trigger;
@ -870,7 +885,7 @@ impl<Task: TaskImpl> StateMachine<Task> {
.expect("triggering_evt_rx dropped"); .expect("triggering_evt_rx dropped");
if let Trigger::Prepare = triggering_evt.trigger { if let Trigger::Prepare = triggering_evt.trigger {
gst::trace!(RUNTIME_CAT, "Preparing task"); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");
let res = exec_action!( let res = exec_action!(
self, self,
@ -889,7 +904,7 @@ impl<Task: TaskImpl> StateMachine<Task> {
task_inner.state = TaskState::Prepared; task_inner.state = TaskState::Prepared;
triggering_evt.send_ack(res); triggering_evt.send_ack(res);
gst::trace!(RUNTIME_CAT, "Task Prepared"); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
} }
} else { } else {
panic!("Unexpected initial trigger {:?}", triggering_evt.trigger); panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
@ -904,13 +919,17 @@ impl<Task: TaskImpl> StateMachine<Task> {
.await .await
.expect("triggering_evt_rx dropped"), .expect("triggering_evt_rx dropped"),
}; };
gst::trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"State machine popped {triggering_evt:?}"
);
match triggering_evt.trigger { match triggering_evt.trigger {
Trigger::Error => { Trigger::Error => {
let mut task_inner = task_inner.lock().unwrap(); let mut task_inner = task_inner.lock().unwrap();
task_inner.switch_to_err(triggering_evt); task_inner.switch_to_err(triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched to Error"); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
} }
Trigger::Start => { Trigger::Start => {
let origin = { let origin = {
@ -922,6 +941,7 @@ impl<Task: TaskImpl> StateMachine<Task> {
task_inner.switch_to_state(TaskState::Flushing, triggering_evt); task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
gst::trace!( gst::trace!(
RUNTIME_CAT, RUNTIME_CAT,
obj = self.task_impl.obj(),
"Switched from PausedFlushing to Flushing" "Switched from PausedFlushing to Flushing"
); );
continue; continue;
@ -932,7 +952,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
state => { state => {
task_inner.skip_triggering_evt(triggering_evt); task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Start in state {:?}", state); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Skipped Start in state {state:?}"
);
continue; continue;
} }
} }
@ -958,7 +982,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
state => { state => {
task_inner.skip_triggering_evt(triggering_evt); task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Skipped Pause in state {state:?}"
);
continue; continue;
} }
} }
@ -970,7 +998,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
.lock() .lock()
.unwrap() .unwrap()
.switch_to_state(target, triggering_evt); .switch_to_state(target, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task loop {:?}", target); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Task loop {target:?}"
);
} }
} }
Trigger::Stop => { Trigger::Stop => {
@ -988,7 +1020,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
state => { state => {
task_inner.skip_triggering_evt(triggering_evt); task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Stop in state {:?}", state); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Skipped Stop in state {state:?}"
);
continue; continue;
} }
} }
@ -1000,7 +1036,7 @@ impl<Task: TaskImpl> StateMachine<Task> {
.lock() .lock()
.unwrap() .unwrap()
.switch_to_state(TaskState::Stopped, triggering_evt); .switch_to_state(TaskState::Stopped, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task loop Stopped"); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop Stopped");
} }
} }
Trigger::FlushStart => { Trigger::FlushStart => {
@ -1016,7 +1052,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
state => { state => {
task_inner.skip_triggering_evt(triggering_evt); task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped FlushStart in state {:?}", state); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Skipped FlushStart in state {state:?}"
);
continue; continue;
} }
} }
@ -1028,7 +1068,7 @@ impl<Task: TaskImpl> StateMachine<Task> {
.lock() .lock()
.unwrap() .unwrap()
.switch_to_state(target, triggering_evt); .switch_to_state(target, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task {:?}", target); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
} }
} }
Trigger::FlushStop => { Trigger::FlushStop => {
@ -1045,7 +1085,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
.lock() .lock()
.unwrap() .unwrap()
.skip_triggering_evt(triggering_evt); .skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped FlushStop in state {:?}", state); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Skipped FlushStop in state {state:?}"
);
continue; continue;
} }
}; };
@ -1057,7 +1101,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
.lock() .lock()
.unwrap() .unwrap()
.switch_to_state(TaskState::Paused, triggering_evt); .switch_to_state(TaskState::Paused, triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused"); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Switched from PausedFlushing to Paused"
);
} else { } else {
self.start(triggering_evt, origin, &task_inner).await; self.start(triggering_evt, origin, &task_inner).await;
// next/pending triggering event handled in next iteration // next/pending triggering event handled in next iteration
@ -1079,7 +1127,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
} }
gst::trace!(RUNTIME_CAT, "Task state machine terminated"); gst::trace!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"Task state machine terminated"
);
} }
async fn start( async fn start(
@ -1110,11 +1162,11 @@ impl<Task: TaskImpl> StateMachine<Task> {
} }
async fn run_loop(&mut self) -> Result<(), gst::FlowError> { async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
gst::trace!(RUNTIME_CAT, "Task loop started"); gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");
let mut item; let mut try_next_res;
loop { loop {
item = { try_next_res = {
// select_biased requires the selected futures to implement // select_biased requires the selected futures to implement
// `FusedFuture`. Because async trait functions are not stable, // `FusedFuture`. Because async trait functions are not stable,
// we use `BoxFuture` for the `TaskImpl` function, including // we use `BoxFuture` for the `TaskImpl` function, including
@ -1133,14 +1185,24 @@ impl<Task: TaskImpl> StateMachine<Task> {
self.pending_triggering_evt = Some(triggering_evt); self.pending_triggering_evt = Some(triggering_evt);
return Ok(()); return Ok(());
} }
try_next_res = try_next_fut => try_next_res.inspect_err(|&err| { try_next_res = try_next_fut => try_next_res,
gst::debug!(RUNTIME_CAT, "TaskImpl::try_next returned {:?}", err);
})?,
} }
}; };
let item = try_next_res.inspect_err(|err| {
gst::debug!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"TaskImpl::try_next returned {err:?}"
);
})?;
self.task_impl.handle_item(item).await.inspect_err(|&err| { self.task_impl.handle_item(item).await.inspect_err(|&err| {
gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err); gst::debug!(
RUNTIME_CAT,
obj = self.task_impl.obj(),
"TaskImpl::handle_item returned {err:?}"
);
})?; })?;
} }
} }
@ -1151,6 +1213,8 @@ mod tests {
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::executor::block_on; use futures::executor::block_on;
use futures::prelude::*; use futures::prelude::*;
use gst::glib;
use gst::glib::prelude::*;
use std::future::pending; use std::future::pending;
use std::time::Duration; use std::time::Duration;
@ -1175,6 +1239,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskTest { struct TaskTest {
obj: gst::Object,
prepared_sender: mpsc::Sender<()>, prepared_sender: mpsc::Sender<()>,
started_sender: mpsc::Sender<()>, started_sender: mpsc::Sender<()>,
try_next_ready_sender: mpsc::Sender<()>, try_next_ready_sender: mpsc::Sender<()>,
@ -1189,6 +1254,10 @@ mod tests {
impl TaskImpl for TaskTest { impl TaskImpl for TaskTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "nominal: prepared"); gst::debug!(RUNTIME_CAT, "nominal: prepared");
self.prepared_sender.send(()).await.unwrap(); self.prepared_sender.send(()).await.unwrap();
@ -1257,6 +1326,10 @@ mod tests {
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1); let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
let prepare_status = task.prepare( let prepare_status = task.prepare(
TaskTest { TaskTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::nominal")
.build()
.into(),
prepared_sender, prepared_sender,
started_sender, started_sender,
try_next_ready_sender, try_next_ready_sender,
@ -1455,12 +1528,17 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskPrepareTest { struct TaskPrepareTest {
obj: gst::Object,
prepare_error_sender: mpsc::Sender<()>, prepare_error_sender: mpsc::Sender<()>,
} }
impl TaskImpl for TaskPrepareTest { impl TaskImpl for TaskPrepareTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error"); gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
Err(gst::error_msg!( Err(gst::error_msg!(
@ -1507,6 +1585,10 @@ mod tests {
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
let prepare_status = task.prepare( let prepare_status = task.prepare(
TaskPrepareTest { TaskPrepareTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::prepare_error")
.build()
.into(),
prepare_error_sender, prepare_error_sender,
}, },
context, context,
@ -1551,12 +1633,17 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskPrepareTest { struct TaskPrepareTest {
obj: gst::Object,
prepare_receiver: mpsc::Receiver<()>, prepare_receiver: mpsc::Receiver<()>,
} }
impl TaskImpl for TaskPrepareTest { impl TaskImpl for TaskPrepareTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!( gst::debug!(
RUNTIME_CAT, RUNTIME_CAT,
@ -1595,7 +1682,16 @@ mod tests {
let task = Task::default(); let task = Task::default();
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1); let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
let fut = task.prepare(TaskPrepareTest { prepare_receiver }, context); let fut = task.prepare(
TaskPrepareTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::prepare_start_ok")
.build()
.into(),
prepare_receiver,
},
context,
);
drop(fut); drop(fut);
let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
@ -1675,6 +1771,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskPrepareTest { struct TaskPrepareTest {
obj: gst::Object,
prepare_receiver: mpsc::Receiver<()>, prepare_receiver: mpsc::Receiver<()>,
prepare_error_sender: mpsc::Sender<()>, prepare_error_sender: mpsc::Sender<()>,
} }
@ -1682,6 +1779,10 @@ mod tests {
impl TaskImpl for TaskPrepareTest { impl TaskImpl for TaskPrepareTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!( gst::debug!(
RUNTIME_CAT, RUNTIME_CAT,
@ -1737,6 +1838,10 @@ mod tests {
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1); let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
let prepare_status = task.prepare( let prepare_status = task.prepare(
TaskPrepareTest { TaskPrepareTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::prepare_start_error")
.build()
.into(),
prepare_receiver, prepare_receiver,
prepare_error_sender, prepare_error_sender,
}, },
@ -1811,12 +1916,17 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskTest { struct TaskTest {
obj: gst::Object,
try_next_receiver: mpsc::Receiver<gst::FlowError>, try_next_receiver: mpsc::Receiver<gst::FlowError>,
} }
impl TaskImpl for TaskTest { impl TaskImpl for TaskTest {
type Item = gst::FlowError; type Item = gst::FlowError;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> { async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next"); gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
Ok(self.try_next_receiver.next().await.unwrap()) Ok(self.try_next_receiver.next().await.unwrap())
@ -1832,9 +1942,18 @@ mod tests {
let task = Task::default(); let task = Task::default();
gst::debug!(RUNTIME_CAT, "item_error: prepare and start"); gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
let (mut try_next_sender, try_next_receiver) = mpsc::channel(1); let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
task.prepare(TaskTest { try_next_receiver }, context) task.prepare(
.block_on() TaskTest {
.unwrap(); obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::item_error")
.build()
.into(),
try_next_receiver,
},
context,
)
.block_on()
.unwrap();
task.start().block_on().unwrap(); task.start().block_on().unwrap();
gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos"); gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
@ -1884,6 +2003,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
} }
@ -1891,6 +2011,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await pending().await
} }
@ -1920,6 +2044,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::flush_regular_sync")
.build()
.into(),
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
}, },
@ -1965,6 +2093,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
} }
@ -1972,6 +2101,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await pending().await
} }
@ -2008,6 +2141,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::flush_regular_different_context")
.build()
.into(),
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
}, },
@ -2078,6 +2215,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
} }
@ -2085,6 +2223,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await pending().await
} }
@ -2115,6 +2257,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::flush_regular_same_context")
.build()
.into(),
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
}, },
@ -2176,6 +2322,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
task: Task, task: Task,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
} }
@ -2183,6 +2330,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
Ok(()) Ok(())
} }
@ -2214,6 +2365,10 @@ mod tests {
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1); let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::flush_from_loop")
.build()
.into(),
task: task.clone(), task: task.clone(),
flush_start_sender, flush_start_sender,
}, },
@ -2247,6 +2402,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskStartTest { struct TaskStartTest {
obj: gst::Object,
task: Task, task: Task,
pause_sender: mpsc::Sender<()>, pause_sender: mpsc::Sender<()>,
} }
@ -2254,6 +2410,10 @@ mod tests {
impl TaskImpl for TaskStartTest { impl TaskImpl for TaskStartTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
Ok(()) Ok(())
} }
@ -2290,6 +2450,10 @@ mod tests {
let (pause_sender, mut pause_receiver) = mpsc::channel(1); let (pause_sender, mut pause_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskStartTest { TaskStartTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::pause_from_loop")
.build()
.into(),
task: task.clone(), task: task.clone(),
pause_sender, pause_sender,
}, },
@ -2312,6 +2476,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
task: Task, task: Task,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
} }
@ -2319,6 +2484,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await pending().await
} }
@ -2358,6 +2527,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::trigger_from_action")
.build()
.into(),
task: task.clone(), task: task.clone(),
flush_stop_sender, flush_stop_sender,
}, },
@ -2383,6 +2556,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
started_sender: mpsc::Sender<()>, started_sender: mpsc::Sender<()>,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
@ -2391,6 +2565,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started"); gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
self.started_sender.send(()).await.unwrap(); self.started_sender.send(()).await.unwrap();
@ -2427,6 +2605,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::pause_flush_start")
.build()
.into(),
started_sender, started_sender,
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
@ -2490,6 +2672,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskFlushTest { struct TaskFlushTest {
obj: gst::Object,
started_sender: mpsc::Sender<()>, started_sender: mpsc::Sender<()>,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
@ -2498,6 +2681,10 @@ mod tests {
impl TaskImpl for TaskFlushTest { impl TaskImpl for TaskFlushTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started"); gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
self.started_sender.send(()).await.unwrap(); self.started_sender.send(()).await.unwrap();
@ -2534,6 +2721,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskFlushTest { TaskFlushTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::pause_flushing_start")
.build()
.into(),
started_sender, started_sender,
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
@ -2588,6 +2779,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskStartTest { struct TaskStartTest {
obj: gst::Object,
flush_start_sender: mpsc::Sender<()>, flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>, flush_stop_sender: mpsc::Sender<()>,
} }
@ -2595,6 +2787,10 @@ mod tests {
impl TaskImpl for TaskStartTest { impl TaskImpl for TaskStartTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn try_next(&mut self) -> Result<(), gst::FlowError> { async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await pending().await
} }
@ -2624,6 +2820,10 @@ mod tests {
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1); let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let fut = task.prepare( let fut = task.prepare(
TaskStartTest { TaskStartTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::flush_concurrent_start")
.build()
.into(),
flush_start_sender, flush_start_sender,
flush_stop_sender, flush_stop_sender,
}, },
@ -2705,6 +2905,7 @@ mod tests {
gst::init().unwrap(); gst::init().unwrap();
struct TaskTimerTest { struct TaskTimerTest {
obj: gst::Object,
timer: Option<timer::Oneshot>, timer: Option<timer::Oneshot>,
timer_elapsed_sender: Option<oneshot::Sender<()>>, timer_elapsed_sender: Option<oneshot::Sender<()>>,
} }
@ -2712,6 +2913,10 @@ mod tests {
impl TaskImpl for TaskTimerTest { impl TaskImpl for TaskTimerTest {
type Item = (); type Item = ();
fn obj(&self) -> &impl IsA<glib::Object> {
&self.obj
}
async fn start(&mut self) -> Result<(), gst::ErrorMessage> { async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50))); self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
gst::debug!(RUNTIME_CAT, "start_timer: started"); gst::debug!(RUNTIME_CAT, "start_timer: started");
@ -2741,6 +2946,10 @@ mod tests {
let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel(); let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
let fut = task.prepare( let fut = task.prepare(
TaskTimerTest { TaskTimerTest {
obj: gst::Pad::builder(gst::PadDirection::Unknown)
.name("runtime::Task::start_timer")
.build()
.into(),
timer: None, timer: None,
timer_elapsed_sender: Some(timer_elapsed_sender), timer_elapsed_sender: Some(timer_elapsed_sender),
}, },

View file

@ -272,6 +272,10 @@ impl TcpClientSrcTask {
impl TaskImpl for TcpClientSrcTask { impl TaskImpl for TcpClientSrcTask {
type Item = gst::Buffer; type Item = gst::Buffer;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!( gst::log!(
CAT, CAT,

View file

@ -217,6 +217,10 @@ impl UdpSrcTask {
impl TaskImpl for UdpSrcTask { impl TaskImpl for UdpSrcTask {
type Item = gst::Buffer; type Item = gst::Buffer;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> { async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
let udpsrc = self.element.imp(); let udpsrc = self.element.imp();
let mut settings = udpsrc.settings.lock().unwrap(); let mut settings = udpsrc.settings.lock().unwrap();

View file

@ -144,6 +144,10 @@ mod imp_src {
impl TaskImpl for ElementSrcTestTask { impl TaskImpl for ElementSrcTestTask {
type Item = Item; type Item = Item;
fn obj(&self) -> &impl IsA<glib::Object> {
&self.element
}
async fn try_next(&mut self) -> Result<Item, gst::FlowError> { async fn try_next(&mut self) -> Result<Item, gst::FlowError> {
self.receiver.next().await.ok_or_else(|| { self.receiver.next().await.ok_or_else(|| {
gst::log!(SRC_CAT, obj = self.element, "SrcPad channel aborted"); gst::log!(SRC_CAT, obj = self.element, "SrcPad channel aborted");