From 2bb071a950c1b78b8e324c916294251ca822fb37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 17 Aug 2022 19:14:37 +0200 Subject: [PATCH] ts/runtime: slight optimizations for sub tasks related operations Using callgrind with the standalone test showed opportunities for improvements for sub tasks addition and drain. All sub task additions were performed after making sure we were operating on a Context Task. The Context and Task were checked again when adding the sub task. Draining sub tasks was perfomed in a loop on every call places, checking whether there were remaining sub tasks first. This commit implements the loop and checks directly in `executor::Task::drain_subtasks`, saving one `Mutex` lock and one `thread_local` access per iteration when there are sub tasks to drain. The `PadSink` functions wrapper were performing redundant checks on the `Context` presence and were adding the delayed Future only when there were already sub tasks. --- .../threadshare/examples/standalone/main.rs | 2 +- .../src/runtime/executor/context.rs | 68 ++++++++----------- .../src/runtime/executor/scheduler.rs | 12 +--- .../threadshare/src/runtime/executor/task.rs | 56 +++++---------- generic/threadshare/src/runtime/pad.rs | 54 ++++++--------- generic/threadshare/src/runtime/task.rs | 47 +++++-------- 6 files changed, 84 insertions(+), 155 deletions(-) diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index b8ecfd3f..1b14bc62 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -53,7 +53,7 @@ struct Args { #[clap(short, long, default_value_t = 6000)] num_buffers: i32, - /// Enable statistics logging (use GST_DEBUG=ts-standalone*:4). + /// Enables statistics logging (use GST_DEBUG=ts-standalone*:4). #[clap(short, long)] log_stats: bool, } diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs index 371af220..f1555627 100644 --- a/generic/threadshare/src/runtime/executor/context.rs +++ b/generic/threadshare/src/runtime/executor/context.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -55,7 +55,7 @@ where cur_task_id, cur_context.name() ); - let _ = Context::add_sub_task(async move { + let _ = cur_context.add_sub_task(cur_task_id, async move { future.await; Ok(()) }); @@ -199,7 +199,10 @@ impl Context { /// Returns the `TaskId` running on current thread, if any. pub fn current_task() -> Option<(Context, TaskId)> { - Scheduler::current().map(Context).zip(TaskId::current()) + Scheduler::current().map(|scheduler| { + // Context users always operate on a Task + (Context(scheduler), TaskId::current().unwrap()) + }) } /// Executes the provided function relatively to this [`Context`]. @@ -265,31 +268,11 @@ impl Context { self.0.unpark(); } - pub fn current_has_sub_tasks() -> bool { - let (ctx, task_id) = match Context::current_task() { - Some(task) => task, - None => { - gst::trace!(RUNTIME_CAT, "No current task"); - return false; - } - }; - - ctx.0.has_sub_tasks(task_id) - } - - pub fn add_sub_task(sub_task: T) -> Result<(), T> + pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, { - let (ctx, task_id) = match Context::current_task() { - Some(task) => task, - None => { - gst::trace!(RUNTIME_CAT, "No current task"); - return Err(sub_task); - } - }; - - ctx.0.add_sub_task(task_id, sub_task) + self.0.add_sub_task(task_id, sub_task) } pub async fn drain_sub_tasks() -> SubTaskOutput { @@ -339,7 +322,7 @@ mod tests { assert_eq!(ctx.name(), Scheduler::DUMMY_NAME); assert_eq!(task_id, super::TaskId(0)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, super::TaskId(0)); Ok(()) @@ -381,10 +364,10 @@ mod tests { let ctx_weak = context.downgrade(); let join_handle = context.spawn(async move { - let (_ctx, task_id) = Context::current_task().unwrap(); + let (ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(0)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(0)); Ok(()) @@ -395,10 +378,10 @@ mod tests { .upgrade() .unwrap() .spawn(async { - let (_ctx, task_id) = Context::current_task().unwrap(); + let (ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(1)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(1)); Ok(()) @@ -433,14 +416,19 @@ mod tests { let add_sub_task = move |item| { let sender = sender.clone(); - Context::add_sub_task(async move { - sender - .lock() - .await - .send(item) - .await - .map_err(|_| gst::FlowError::Error) - }) + Context::current_task() + .ok_or(()) + .and_then(|(ctx, task_id)| { + ctx.add_sub_task(task_id, async move { + sender + .lock() + .await + .send(item) + .await + .map_err(|_| gst::FlowError::Error) + }) + .map_err(drop) + }) }; // Tests @@ -450,7 +438,7 @@ mod tests { drain_fut.await.unwrap(); // Add a subtask - add_sub_task(0).map_err(drop).unwrap(); + add_sub_task(0).unwrap(); // Check that it was not executed yet receiver.try_next().unwrap_err(); @@ -461,7 +449,7 @@ mod tests { assert_eq!(receiver.try_next().unwrap(), Some(0)); // Add another task and check that it's not executed yet - add_sub_task(1).map_err(drop).unwrap(); + add_sub_task(1).unwrap(); receiver.try_next().unwrap_err(); // Return the receiver diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index a5f1eaf6..53a7fab7 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -132,11 +132,7 @@ impl Scheduler { let res = future.await; let task_id = TaskId::current().unwrap(); - while handle.has_sub_tasks(task_id) { - if handle.drain_sub_tasks(task_id).await.is_err() { - break; - } - } + let _ = handle.drain_sub_tasks(task_id).await; res }); @@ -411,10 +407,6 @@ impl Handle { self.0.scheduler.unpark(); } - pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { - self.0.scheduler.tasks.has_sub_tasks(task_id) - } - pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, diff --git a/generic/threadshare/src/runtime/executor/task.rs b/generic/threadshare/src/runtime/executor/task.rs index 935b5f9e..4b883ced 100644 --- a/generic/threadshare/src/runtime/executor/task.rs +++ b/generic/threadshare/src/runtime/executor/task.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -91,10 +91,6 @@ impl Task { { self.sub_tasks.push_back(sub_task.boxed()); } - - fn drain_sub_tasks(&mut self) -> VecDeque> { - std::mem::take(&mut self.sub_tasks) - } } impl fmt::Debug for Task { @@ -240,15 +236,6 @@ impl TaskQueue { self.runnables.pop() } - pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { - self.tasks - .lock() - .unwrap() - .get(task_id.0) - .map(|t| !t.sub_tasks.is_empty()) - .unwrap_or(false) - } - pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, @@ -272,35 +259,24 @@ impl TaskQueue { } } - pub fn drain_sub_tasks( - &self, - task_id: TaskId, - ) -> impl Future + Send + 'static { - let sub_tasks = self - .tasks - .lock() - .unwrap() - .get_mut(task_id.0) - .map(|task| (task.drain_sub_tasks(), Arc::clone(&self.context_name))); + pub async fn drain_sub_tasks(&self, task_id: TaskId) -> SubTaskOutput { + loop { + let mut sub_tasks = match self.tasks.lock().unwrap().get_mut(task_id.0) { + Some(task) if !task.sub_tasks.is_empty() => std::mem::take(&mut task.sub_tasks), + _ => return Ok(()), + }; - async move { - if let Some((mut sub_tasks, context_name)) = sub_tasks { - if !sub_tasks.is_empty() { - gst::log!( - RUNTIME_CAT, - "Scheduling draining {} sub tasks from {:?} on '{}'", - sub_tasks.len(), - task_id, - &context_name, - ); + gst::trace!( + RUNTIME_CAT, + "Scheduling draining {} sub tasks from {:?} on '{}'", + sub_tasks.len(), + task_id, + self.context_name, + ); - for sub_task in sub_tasks.drain(..) { - sub_task.await?; - } - } + for sub_task in sub_tasks.drain(..) { + sub_task.await?; } - - Ok(()) } } } diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index e5453708..89e14477 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2020 François Laignel +// Copyright (C) 2019-2022 François Laignel // Copyright (C) 2020 Sebastian Dröge // // This library is free software; you can redistribute it and/or @@ -80,7 +80,7 @@ use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; -use super::executor::{block_on_or_add_sub_task, Context}; +use super::executor::{self, Context}; use super::RUNTIME_CAT; #[inline] @@ -234,9 +234,7 @@ impl PadSrcInner { })?; gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } + Context::drain_sub_tasks().await?; Ok(success) } @@ -255,9 +253,7 @@ impl PadSrcInner { })?; gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } + Context::drain_sub_tasks().await?; Ok(success) } @@ -268,10 +264,8 @@ impl PadSrcInner { let was_handled = self.gst_pad().push_event(event); gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - if Context::drain_sub_tasks().await.is_err() { - return false; - } + if Context::drain_sub_tasks().await.is_err() { + return false; } was_handled @@ -758,18 +752,6 @@ impl<'a> PadSinkRef<'a> { Ok(()) } - - fn handle_future( - &self, - fut: impl Future> + Send + 'static, - ) -> Result { - if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) { - block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok))) - .unwrap_or(Ok(gst::FlowSuccess::Ok)) - } else { - Ok(gst::FlowSuccess::Ok) - } - } } impl<'a> Deref for PadSinkRef<'a> { @@ -876,7 +858,7 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp, element| { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -889,7 +871,8 @@ impl PadSink { this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; handler.sink_chain(&this_ref, imp, &element, buffer).await }; - let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = + ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { @@ -900,7 +883,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), buffer, ); - this_ref.handle_future(chain_fut) + executor::block_on(chain_fut) } }, ) @@ -916,7 +899,7 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp, element| { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -931,7 +914,8 @@ impl PadSink { .sink_chain_list(&this_ref, imp, &element, list) .await }; - let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = + ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { @@ -942,7 +926,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), list, ); - this_ref.handle_future(chain_list_fut) + executor::block_on(chain_list_fut) } }, ) @@ -961,7 +945,7 @@ impl PadSink { || Err(FlowError::Error), move |imp, element| { if event.is_serialized() { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -980,8 +964,10 @@ impl PadSink { ) .await }; - let _ = - Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = ctx.add_sub_task( + task_id, + delayed_fut.map(|res| res.map(drop)), + ); Ok(gst::FlowSuccess::Ok) } else { @@ -992,7 +978,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), event, ); - this_ref.handle_future(event_fut) + executor::block_on(event_fut) } } else { let this_ref = PadSinkRef::new(inner_arc); diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index abf23e1b..137ceef9 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -203,14 +203,14 @@ impl TransitionStatus { origin, res_fut, } => { - if let Some(cur_ctx) = Context::current() { + if let Some((ctx, task_id)) = Context::current_task() { gst::debug!( RUNTIME_CAT, "Awaiting for {:?} ack in a subtask on context {}", trigger, - cur_ctx.name() + ctx.name() ); - let _ = Context::add_sub_task(async move { + let _ = ctx.add_sub_task(task_id, async move { let res = res_fut.await; if res.is_ok() { gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); @@ -826,18 +826,14 @@ macro_rules! exec_action { ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{ match $self.task_impl.$action().await { Ok(()) => { - let mut res; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); - res = Context::drain_sub_tasks().await.map_err(|err| { - let msg = format!("{} subtask returned {:?}", stringify!($action), err); - gst::log!(RUNTIME_CAT, "{}", &msg); - gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg]) - }); - - if res.is_err() { - break; - } + gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); + if let Err(err) = Context::drain_sub_tasks().await { + gst::log!( + RUNTIME_CAT, + "{} subtask returned {:?}", + stringify!($action), + err + ); } Ok($triggering_evt) @@ -1097,15 +1093,9 @@ impl StateMachine { // Unprepare is not joined by an ack_rx but by joining the state machine handle self.task_impl.unprepare().await; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); - let res = Context::drain_sub_tasks().await.map_err(|err| { - gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); - err - }); - if res.is_err() { - break; - } + gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); + if let Err(err) = Context::drain_sub_tasks().await { + gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); } task_inner @@ -1185,12 +1175,9 @@ impl StateMachine { err })?; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); - Context::drain_sub_tasks().await.map_err(|err| { - gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); - err - })?; + gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); + if let Err(err) = Context::drain_sub_tasks().await { + gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); } } }