From 0b615409ac5acb05fa6af402da3ce39d42f6e173 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 22 Mar 2022 17:18:18 +0100 Subject: [PATCH] ts/rt/Task: use light weight executor blocking on ack or join handle Previous version used the Context::block_on_or_add_sub_task which spawns a full-fledged executor with timer and io Reactor for no reason when we just need to wait for a Receiver or JoinHandle. --- generic/threadshare/src/runtime/task.rs | 55 ++++++++++++++++++------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 8110042f5..903d5a5ec 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -33,7 +33,7 @@ use std::ops::Deref; use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; -use super::executor::{block_on_or_add_sub_task, TaskId}; +use super::executor::TaskId; use super::{Context, RUNTIME_CAT}; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] @@ -526,25 +526,35 @@ impl Task { match state_machine_handle { Some(state_machine_handle) => { - gst_log!( - RUNTIME_CAT, - "Synchronously waiting for the state machine {:?}", - state_machine_handle, - ); - let join_fut = block_on_or_add_sub_task(async { + let state_machine_end_fut = async { state_machine_handle.await; drop(triggering_evt_tx); drop(context); gst_debug!(RUNTIME_CAT, "Task unprepared"); - }); + }; + + if let Some((cur_context, cur_task_id)) = Context::current_task() { + gst_log!( + RUNTIME_CAT, + "Will wait for state machine termination completion in subtask to task {:?} on context {}", + cur_task_id, + cur_context.name() + ); + let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(()))); - if join_fut.is_none() { return Ok(TransitionStatus::Async { trigger: Trigger::Unprepare, origin, }); + } else { + gst_log!( + RUNTIME_CAT, + "Waiting for state machine termination on current thread" + ); + // Use a light-weight executor, no timer nor async io. + futures::executor::block_on(state_machine_end_fut) } } None => { @@ -654,7 +664,7 @@ impl Task { drop(inner); - block_on_or_add_sub_task(async move { + let ack_await_fut = async move { gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger); let res = ack_rx.await.unwrap(); @@ -665,11 +675,28 @@ impl Task { } res - }) - .unwrap_or({ - // Future was spawned as a subtask + }; + + if let Some((cur_context, cur_task_id)) = Context::current_task() { + gst_log!( + RUNTIME_CAT, + "Will await ack for {:?} in subtask to task {:?} on context {}", + trigger, + cur_task_id, + cur_context.name() + ); + let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(()))); + Ok(TransitionStatus::Async { trigger, origin }) - }) + } else { + gst_log!( + RUNTIME_CAT, + "Awaiting ack for {:?} on current thread", + trigger + ); + // Use a light-weight executor, no timer nor async io. + futures::executor::block_on(ack_await_fut) + } } fn spawn_state_machine(