ts/Task: don't drain sub tasks after state transition and iteration

Sub tasks are used when async processing needs to execute a
`Future` from within a sync function (e.g. a call to a C function).
Calling `Context::block_on` there would block the whole `Context`,
leading to a deadlock.
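
A minimal sketch of the mechanism (hypothetical helpers; in the
actual runtime the queue is owned by the `Context` and keyed by
scheduler task): the sync code queues the `Future` instead of
blocking on it, and async code awaits the queue at a point where
suspending is safe.

    use std::cell::RefCell;
    use std::collections::VecDeque;

    use futures::future::BoxFuture;

    thread_local! {
        // Hypothetical stand-in for the per-Context sub task queue.
        static SUB_TASKS: RefCell<VecDeque<BoxFuture<'static, Result<(), ()>>>> =
            RefCell::new(VecDeque::new());
    }

    // Callable from sync code: queue the `Future` instead of blocking on it.
    fn add_sub_task(fut: BoxFuture<'static, Result<(), ()>>) {
        SUB_TASKS.with(|q| q.borrow_mut().push_back(fut));
    }

    // Awaited from async code, where suspending is safe:
    // execute the queued sub tasks in order.
    async fn drain_sub_tasks() -> Result<(), ()> {
        while let Some(fut) = SUB_TASKS.with(|q| q.borrow_mut().pop_front()) {
            fut.await?;
        }
        Ok(())
    }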

The main use case for this is the `Pad{Src,Sink}` functions: when
we `PadSrc::push` and the peer pad is a `PadSink`, we want
`PadSrc::push` to complete only after the async function on the
`PadSink` has completed. In this case, the `PadSink` async function
is added as a sub task of the current scheduler task, and
`PadSrc::push` returns only once that sub task has executed.
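
Schematically (a simplified sketch reusing the hypothetical helpers
above, not the actual `Pad{Src,Sink}` code; `Item` and `sink_chain`
are placeholders for the pushed data and the `PadSink` async
function):

    struct Item;

    // Placeholder for the PadSink async function.
    async fn sink_chain(_item: Item) -> Result<(), ()> {
        Ok(())
    }

    // `push` queues the peer's handler as a sub task, then drains the
    // queue, so it completes only once the handler has actually run.
    async fn push(item: Item) -> Result<(), ()> {
        add_sub_task(Box::pin(sink_chain(item)));
        drain_sub_tasks().await
    }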

In `runtime::Task` (`Task` here being the execution Task with a
state machine, not a scheduler task), we used to spawn the state
transition actions and the iteration loop (which resulted in a new
scheduler Task). At the time, it seemed convenient to automatically
drain sub tasks after a state transition action or an iteration:
users wouldn't have to worry about this, similarly to the
`Pad{Src,Sink}` case.

In the current implementation, the `Task` state machine operates
directly on the target `Context`: state transition actions and the
iteration loop are no longer spawned. Abstracting the sub tasks
draining away from the user now seems pointless: either they
transitively use a mechanism such as `Pad{Src,Sink}`, which already
handles this automatically, or they add sub tasks on purpose, in
which case they know best when those sub tasks must be drained.
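
For the second case, a sketch of what explicit draining looks like
with the hypothetical helpers above: the implementation awaits its
sub tasks where it decides, not implicitly after each iteration.

    // Hypothetical item handler adding a sub task on purpose.
    async fn handle_item(item: Item) -> Result<(), ()> {
        add_sub_task(Box::pin(sink_chain(item)));

        // ... other processing that may run before the sub task ...

        drain_sub_tasks().await
    }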
Author:    François Laignel, 2022-08-19 14:50:40 +02:00
Committer: Sebastian Dröge
Parent:    af12bce141
Commit:    d39aabe054


@@ -28,7 +28,6 @@ use futures::prelude::*;
 use std::fmt;
 use std::ops::Deref;
 use std::pin::Pin;
-use std::stringify;
 use std::sync::{Arc, Mutex, MutexGuard};
 use std::task::Poll;
@@ -376,8 +375,7 @@ pub trait TaskImpl: Send + 'static {
     /// Handles an error occuring during the execution of the `Task` loop.
     ///
-    /// This include errors returned by [`Self::try_next`], [`Self::handle_item`]
-    /// as well as errors returned by subtasks drain at the end of an iteration.
+    /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
     ///
     /// If the error is unrecoverable, implementations might use
     /// `gst::Element::post_error_message` and return `Trigger::Error`.
@@ -817,27 +815,10 @@ struct StateMachine<Item: Send + 'static> {
     pending_triggering_evt: Option<TriggeringEvent>,
 }
 
-// Make sure the Context doesn't throttle otherwise we end up with long delays executing
-// transition actions in a pipeline with many elements. This is because pipeline serializes
-// the transition actions and the Context's scheduler gets a chance to reach its throttling
-// state between 2 elements.
 macro_rules! exec_action {
     ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
         match $self.task_impl.$action().await {
-            Ok(()) => {
-                gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
-                if let Err(err) = Context::drain_sub_tasks().await {
-                    gst::log!(
-                        RUNTIME_CAT,
-                        "{} subtask returned {:?}",
-                        stringify!($action),
-                        err
-                    );
-                }
-
-                Ok($triggering_evt)
-            }
+            Ok(()) => Ok($triggering_evt),
             Err(err) => {
                 // FIXME problem is that we loose the origin trigger in the
                 // final TransitionStatus.
@@ -1093,11 +1074,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
         // Unprepare is not joined by an ack_rx but by joining the state machine handle
         self.task_impl.unprepare().await;
 
-        gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
-        if let Err(err) = Context::drain_sub_tasks().await {
-            gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
-        }
-
         task_inner
             .lock()
             .unwrap()
@@ -1174,11 +1150,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
                     gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err);
                     err
                 })?;
-
-                gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
-                if let Err(err) = Context::drain_sub_tasks().await {
-                    gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
-                }
             }
         }
     }
@@ -1187,10 +1158,19 @@ impl<Item: Send + 'static> StateMachine<Item> {
 mod tests {
     use futures::channel::{mpsc, oneshot};
     use futures::executor::block_on;
+    use futures::future::BoxFuture;
     use futures::prelude::*;
     use std::time::Duration;
 
-    use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *};
-    use crate::runtime::Context;
+    use super::{
+        Task, TaskImpl,
+        TaskState::{self, *},
+        TransitionError,
+        TransitionOk::*,
+        TransitionStatus::*,
+        Trigger::{self, *},
+    };
+    use crate::runtime::{Context, RUNTIME_CAT};
 
     #[track_caller]
     fn stop_then_unprepare(task: Task) {
@@ -1534,7 +1514,7 @@ mod tests {
                     err
                 );
                 match (trigger, state) {
-                    (Trigger::Prepare, TaskState::Unprepared) => {
+                    (Prepare, Unprepared) => {
                         self.prepare_error_sender.send(()).await.unwrap();
                     }
                     other => unreachable!("{:?}", other),
@@ -1772,7 +1752,7 @@ mod tests {
                     err
                 );
                 match (trigger, state) {
-                    (Trigger::Prepare, TaskState::Unprepared) => {
+                    (Prepare, Unprepared) => {
                         self.prepare_error_sender.send(()).await.unwrap();
                     }
                     other => panic!("action error for {:?}", other),
@@ -2789,11 +2769,11 @@ mod tests {
         gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
         match block_on(task.start()) {
-            Ok(TransitionOk::Complete {
+            Ok(Complete {
                 origin: Paused,
                 target: Started,
             }) => (),
-            Ok(TransitionOk::Complete {
+            Ok(Complete {
                 origin: PausedFlushing,
                 target: Flushing,
             }) => (),