ts/rt/Task: use light weight executor blocking on ack or join handle

Previous version used the Context::block_on_or_add_sub_task which
spawns a full-fledged executor with timer and io Reactor for no
reason when we just need to wait for a Receiver or JoinHandle.
This commit is contained in:
François Laignel 2022-03-22 17:18:18 +01:00 committed by Sebastian Dröge
parent c1615d01e6
commit 8eb8ea0e7d

View file

@ -31,7 +31,7 @@ use std::ops::Deref;
use std::stringify; use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use super::executor::{block_on_or_add_sub_task, TaskId}; use super::executor::TaskId;
use super::{Context, RUNTIME_CAT}; use super::{Context, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
@ -524,25 +524,35 @@ impl Task {
match state_machine_handle { match state_machine_handle {
Some(state_machine_handle) => { Some(state_machine_handle) => {
gst::log!( let state_machine_end_fut = async {
RUNTIME_CAT,
"Synchronously waiting for the state machine {:?}",
state_machine_handle,
);
let join_fut = block_on_or_add_sub_task(async {
state_machine_handle.await; state_machine_handle.await;
drop(triggering_evt_tx); drop(triggering_evt_tx);
drop(context); drop(context);
gst::debug!(RUNTIME_CAT, "Task unprepared"); gst::debug!(RUNTIME_CAT, "Task unprepared");
}); };
if let Some((cur_context, cur_task_id)) = Context::current_task() {
gst::log!(
RUNTIME_CAT,
"Will wait for state machine termination completion in subtask to task {:?} on context {}",
cur_task_id,
cur_context.name()
);
let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(())));
if join_fut.is_none() {
return Ok(TransitionStatus::Async { return Ok(TransitionStatus::Async {
trigger: Trigger::Unprepare, trigger: Trigger::Unprepare,
origin, origin,
}); });
} else {
gst::log!(
RUNTIME_CAT,
"Waiting for state machine termination on current thread"
);
// Use a light-weight executor, no timer nor async io.
futures::executor::block_on(state_machine_end_fut)
} }
} }
None => { None => {
@ -652,7 +662,7 @@ impl Task {
drop(inner); drop(inner);
block_on_or_add_sub_task(async move { let ack_await_fut = async move {
gst::trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger); gst::trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger);
let res = ack_rx.await.unwrap(); let res = ack_rx.await.unwrap();
@ -663,11 +673,28 @@ impl Task {
} }
res res
}) };
.unwrap_or({
// Future was spawned as a subtask if let Some((cur_context, cur_task_id)) = Context::current_task() {
gst::log!(
RUNTIME_CAT,
"Will await ack for {:?} in subtask to task {:?} on context {}",
trigger,
cur_task_id,
cur_context.name()
);
let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(())));
Ok(TransitionStatus::Async { trigger, origin }) Ok(TransitionStatus::Async { trigger, origin })
}) } else {
gst::log!(
RUNTIME_CAT,
"Awaiting ack for {:?} on current thread",
trigger
);
// Use a light-weight executor, no timer nor async io.
futures::executor::block_on(ack_await_fut)
}
} }
fn spawn_state_machine( fn spawn_state_machine(