From 8eb8ea0e7d969726fffaa00d18d90a4ba926963b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 22 Mar 2022 17:18:18 +0100 Subject: [PATCH] ts/rt/Task: use light weight executor blocking on ack or join handle Previous version used the Context::block_on_or_add_sub_task which spawns a full-fledged executor with timer and io Reactor for no reason when we just need to wait for a Receiver or JoinHandle. --- generic/threadshare/src/runtime/task.rs | 55 ++++++++++++++++++------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 8b7be397..13d8e4d9 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -31,7 +31,7 @@ use std::ops::Deref; use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; -use super::executor::{block_on_or_add_sub_task, TaskId}; +use super::executor::TaskId; use super::{Context, RUNTIME_CAT}; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] @@ -524,25 +524,35 @@ impl Task { match state_machine_handle { Some(state_machine_handle) => { - gst::log!( - RUNTIME_CAT, - "Synchronously waiting for the state machine {:?}", - state_machine_handle, - ); - let join_fut = block_on_or_add_sub_task(async { + let state_machine_end_fut = async { state_machine_handle.await; drop(triggering_evt_tx); drop(context); gst::debug!(RUNTIME_CAT, "Task unprepared"); - }); + }; + + if let Some((cur_context, cur_task_id)) = Context::current_task() { + gst::log!( + RUNTIME_CAT, + "Will wait for state machine termination completion in subtask to task {:?} on context {}", + cur_task_id, + cur_context.name() + ); + let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(()))); - if join_fut.is_none() { return Ok(TransitionStatus::Async { trigger: Trigger::Unprepare, origin, }); + } else { + gst::log!( + RUNTIME_CAT, + "Waiting for state machine termination on current thread" + ); + // Use a light-weight executor, no timer nor async io. + futures::executor::block_on(state_machine_end_fut) } } None => { @@ -652,7 +662,7 @@ impl Task { drop(inner); - block_on_or_add_sub_task(async move { + let ack_await_fut = async move { gst::trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger); let res = ack_rx.await.unwrap(); @@ -663,11 +673,28 @@ impl Task { } res - }) - .unwrap_or({ - // Future was spawned as a subtask + }; + + if let Some((cur_context, cur_task_id)) = Context::current_task() { + gst::log!( + RUNTIME_CAT, + "Will await ack for {:?} in subtask to task {:?} on context {}", + trigger, + cur_task_id, + cur_context.name() + ); + let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(()))); + Ok(TransitionStatus::Async { trigger, origin }) - }) + } else { + gst::log!( + RUNTIME_CAT, + "Awaiting ack for {:?} on current thread", + trigger + ); + // Use a light-weight executor, no timer nor async io. + futures::executor::block_on(ack_await_fut) + } } fn spawn_state_machine(