ts/rt/Task: use light weight executor blocking on ack or join handle

Previous version used the Context::block_on_or_add_sub_task which
spawns a full-fledged executor with timer and io Reactor for no
reason when we just need to wait for a Receiver or JoinHandle.
This commit is contained in:
François Laignel 2022-03-22 17:18:18 +01:00 committed by Sebastian Dröge
parent a17cdf9903
commit 0b615409ac

View file

@ -33,7 +33,7 @@ use std::ops::Deref;
use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
use super::executor::{block_on_or_add_sub_task, TaskId};
use super::executor::TaskId;
use super::{Context, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
@ -526,25 +526,35 @@ impl Task {
match state_machine_handle {
Some(state_machine_handle) => {
gst_log!(
RUNTIME_CAT,
"Synchronously waiting for the state machine {:?}",
state_machine_handle,
);
let join_fut = block_on_or_add_sub_task(async {
let state_machine_end_fut = async {
state_machine_handle.await;
drop(triggering_evt_tx);
drop(context);
gst_debug!(RUNTIME_CAT, "Task unprepared");
});
};
if let Some((cur_context, cur_task_id)) = Context::current_task() {
gst_log!(
RUNTIME_CAT,
"Will wait for state machine termination completion in subtask to task {:?} on context {}",
cur_task_id,
cur_context.name()
);
let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(())));
if join_fut.is_none() {
return Ok(TransitionStatus::Async {
trigger: Trigger::Unprepare,
origin,
});
} else {
gst_log!(
RUNTIME_CAT,
"Waiting for state machine termination on current thread"
);
// Use a light-weight executor, no timer nor async io.
futures::executor::block_on(state_machine_end_fut)
}
}
None => {
@ -654,7 +664,7 @@ impl Task {
drop(inner);
block_on_or_add_sub_task(async move {
let ack_await_fut = async move {
gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger);
let res = ack_rx.await.unwrap();
@ -665,11 +675,28 @@ impl Task {
}
res
})
.unwrap_or({
// Future was spawned as a subtask
};
if let Some((cur_context, cur_task_id)) = Context::current_task() {
gst_log!(
RUNTIME_CAT,
"Will await ack for {:?} in subtask to task {:?} on context {}",
trigger,
cur_task_id,
cur_context.name()
);
let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(())));
Ok(TransitionStatus::Async { trigger, origin })
})
} else {
gst_log!(
RUNTIME_CAT,
"Awaiting ack for {:?} on current thread",
trigger
);
// Use a light-weight executor, no timer nor async io.
futures::executor::block_on(ack_await_fut)
}
}
fn spawn_state_machine(