ts/runtime: slight optimizations for sub tasks related operations

Using callgrind with the standalone test showed opportunities for
improvements for sub tasks addition and drain.

All sub task additions were performed after making sure we were
operating on a Context Task. The Context and Task were checked
again when adding the sub task.

Draining sub tasks was perfomed in a loop on every call places,
checking whether there were remaining sub tasks first. This
commit implements the loop and checks directly in
`executor::Task::drain_subtasks`, saving one `Mutex` lock and
one `thread_local` access per iteration when there are sub
tasks to drain.

The `PadSink` functions wrapper were performing redundant checks
on the `Context` presence and were adding the delayed Future only
when there were already sub tasks.
This commit is contained in:
François Laignel 2022-08-17 19:14:37 +02:00
parent 57da8e649d
commit 2bb071a950
6 changed files with 84 additions and 155 deletions

View file

@ -53,7 +53,7 @@ struct Args {
#[clap(short, long, default_value_t = 6000)] #[clap(short, long, default_value_t = 6000)]
num_buffers: i32, num_buffers: i32,
/// Enable statistics logging (use GST_DEBUG=ts-standalone*:4). /// Enables statistics logging (use GST_DEBUG=ts-standalone*:4).
#[clap(short, long)] #[clap(short, long)]
log_stats: bool, log_stats: bool,
} }

View file

@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com> // Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr> // Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// //
// Take a look at the license at the top of the repository in the LICENSE file. // Take a look at the license at the top of the repository in the LICENSE file.
@ -55,7 +55,7 @@ where
cur_task_id, cur_task_id,
cur_context.name() cur_context.name()
); );
let _ = Context::add_sub_task(async move { let _ = cur_context.add_sub_task(cur_task_id, async move {
future.await; future.await;
Ok(()) Ok(())
}); });
@ -199,7 +199,10 @@ impl Context {
/// Returns the `TaskId` running on current thread, if any. /// Returns the `TaskId` running on current thread, if any.
pub fn current_task() -> Option<(Context, TaskId)> { pub fn current_task() -> Option<(Context, TaskId)> {
Scheduler::current().map(Context).zip(TaskId::current()) Scheduler::current().map(|scheduler| {
// Context users always operate on a Task
(Context(scheduler), TaskId::current().unwrap())
})
} }
/// Executes the provided function relatively to this [`Context`]. /// Executes the provided function relatively to this [`Context`].
@ -265,31 +268,11 @@ impl Context {
self.0.unpark(); self.0.unpark();
} }
pub fn current_has_sub_tasks() -> bool { pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
let (ctx, task_id) = match Context::current_task() {
Some(task) => task,
None => {
gst::trace!(RUNTIME_CAT, "No current task");
return false;
}
};
ctx.0.has_sub_tasks(task_id)
}
pub fn add_sub_task<T>(sub_task: T) -> Result<(), T>
where where
T: Future<Output = SubTaskOutput> + Send + 'static, T: Future<Output = SubTaskOutput> + Send + 'static,
{ {
let (ctx, task_id) = match Context::current_task() { self.0.add_sub_task(task_id, sub_task)
Some(task) => task,
None => {
gst::trace!(RUNTIME_CAT, "No current task");
return Err(sub_task);
}
};
ctx.0.add_sub_task(task_id, sub_task)
} }
pub async fn drain_sub_tasks() -> SubTaskOutput { pub async fn drain_sub_tasks() -> SubTaskOutput {
@ -339,7 +322,7 @@ mod tests {
assert_eq!(ctx.name(), Scheduler::DUMMY_NAME); assert_eq!(ctx.name(), Scheduler::DUMMY_NAME);
assert_eq!(task_id, super::TaskId(0)); assert_eq!(task_id, super::TaskId(0));
let res = Context::add_sub_task(async move { let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap(); let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0)); assert_eq!(task_id, super::TaskId(0));
Ok(()) Ok(())
@ -381,10 +364,10 @@ mod tests {
let ctx_weak = context.downgrade(); let ctx_weak = context.downgrade();
let join_handle = context.spawn(async move { let join_handle = context.spawn(async move {
let (_ctx, task_id) = Context::current_task().unwrap(); let (ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(0)); assert_eq!(task_id, TaskId(0));
let res = Context::add_sub_task(async move { let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap(); let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(0)); assert_eq!(task_id, TaskId(0));
Ok(()) Ok(())
@ -395,10 +378,10 @@ mod tests {
.upgrade() .upgrade()
.unwrap() .unwrap()
.spawn(async { .spawn(async {
let (_ctx, task_id) = Context::current_task().unwrap(); let (ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(1)); assert_eq!(task_id, TaskId(1));
let res = Context::add_sub_task(async move { let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap(); let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(1)); assert_eq!(task_id, TaskId(1));
Ok(()) Ok(())
@ -433,14 +416,19 @@ mod tests {
let add_sub_task = move |item| { let add_sub_task = move |item| {
let sender = sender.clone(); let sender = sender.clone();
Context::add_sub_task(async move { Context::current_task()
sender .ok_or(())
.lock() .and_then(|(ctx, task_id)| {
.await ctx.add_sub_task(task_id, async move {
.send(item) sender
.await .lock()
.map_err(|_| gst::FlowError::Error) .await
}) .send(item)
.await
.map_err(|_| gst::FlowError::Error)
})
.map_err(drop)
})
}; };
// Tests // Tests
@ -450,7 +438,7 @@ mod tests {
drain_fut.await.unwrap(); drain_fut.await.unwrap();
// Add a subtask // Add a subtask
add_sub_task(0).map_err(drop).unwrap(); add_sub_task(0).unwrap();
// Check that it was not executed yet // Check that it was not executed yet
receiver.try_next().unwrap_err(); receiver.try_next().unwrap_err();
@ -461,7 +449,7 @@ mod tests {
assert_eq!(receiver.try_next().unwrap(), Some(0)); assert_eq!(receiver.try_next().unwrap(), Some(0));
// Add another task and check that it's not executed yet // Add another task and check that it's not executed yet
add_sub_task(1).map_err(drop).unwrap(); add_sub_task(1).unwrap();
receiver.try_next().unwrap_err(); receiver.try_next().unwrap_err();
// Return the receiver // Return the receiver

View file

@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com> // Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr> // Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// //
// Take a look at the license at the top of the repository in the LICENSE file. // Take a look at the license at the top of the repository in the LICENSE file.
@ -132,11 +132,7 @@ impl Scheduler {
let res = future.await; let res = future.await;
let task_id = TaskId::current().unwrap(); let task_id = TaskId::current().unwrap();
while handle.has_sub_tasks(task_id) { let _ = handle.drain_sub_tasks(task_id).await;
if handle.drain_sub_tasks(task_id).await.is_err() {
break;
}
}
res res
}); });
@ -411,10 +407,6 @@ impl Handle {
self.0.scheduler.unpark(); self.0.scheduler.unpark();
} }
pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {
self.0.scheduler.tasks.has_sub_tasks(task_id)
}
pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T> pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
where where
T: Future<Output = SubTaskOutput> + Send + 'static, T: Future<Output = SubTaskOutput> + Send + 'static,

View file

@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com> // Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr> // Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// //
// Take a look at the license at the top of the repository in the LICENSE file. // Take a look at the license at the top of the repository in the LICENSE file.
@ -91,10 +91,6 @@ impl Task {
{ {
self.sub_tasks.push_back(sub_task.boxed()); self.sub_tasks.push_back(sub_task.boxed());
} }
fn drain_sub_tasks(&mut self) -> VecDeque<BoxFuture<'static, SubTaskOutput>> {
std::mem::take(&mut self.sub_tasks)
}
} }
impl fmt::Debug for Task { impl fmt::Debug for Task {
@ -240,15 +236,6 @@ impl TaskQueue {
self.runnables.pop() self.runnables.pop()
} }
pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {
self.tasks
.lock()
.unwrap()
.get(task_id.0)
.map(|t| !t.sub_tasks.is_empty())
.unwrap_or(false)
}
pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T> pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
where where
T: Future<Output = SubTaskOutput> + Send + 'static, T: Future<Output = SubTaskOutput> + Send + 'static,
@ -272,35 +259,24 @@ impl TaskQueue {
} }
} }
pub fn drain_sub_tasks( pub async fn drain_sub_tasks(&self, task_id: TaskId) -> SubTaskOutput {
&self, loop {
task_id: TaskId, let mut sub_tasks = match self.tasks.lock().unwrap().get_mut(task_id.0) {
) -> impl Future<Output = SubTaskOutput> + Send + 'static { Some(task) if !task.sub_tasks.is_empty() => std::mem::take(&mut task.sub_tasks),
let sub_tasks = self _ => return Ok(()),
.tasks };
.lock()
.unwrap()
.get_mut(task_id.0)
.map(|task| (task.drain_sub_tasks(), Arc::clone(&self.context_name)));
async move { gst::trace!(
if let Some((mut sub_tasks, context_name)) = sub_tasks { RUNTIME_CAT,
if !sub_tasks.is_empty() { "Scheduling draining {} sub tasks from {:?} on '{}'",
gst::log!( sub_tasks.len(),
RUNTIME_CAT, task_id,
"Scheduling draining {} sub tasks from {:?} on '{}'", self.context_name,
sub_tasks.len(), );
task_id,
&context_name,
);
for sub_task in sub_tasks.drain(..) { for sub_task in sub_tasks.drain(..) {
sub_task.await?; sub_task.await?;
}
}
} }
Ok(())
} }
} }
} }

View file

@ -1,4 +1,4 @@
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr> // Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com> // Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
// //
// This library is free software; you can redistribute it and/or // This library is free software; you can redistribute it and/or
@ -80,7 +80,7 @@ use std::marker::PhantomData;
use std::ops::Deref; use std::ops::Deref;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use super::executor::{block_on_or_add_sub_task, Context}; use super::executor::{self, Context};
use super::RUNTIME_CAT; use super::RUNTIME_CAT;
#[inline] #[inline]
@ -234,9 +234,7 @@ impl PadSrcInner {
})?; })?;
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() { Context::drain_sub_tasks().await?;
Context::drain_sub_tasks().await?;
}
Ok(success) Ok(success)
} }
@ -255,9 +253,7 @@ impl PadSrcInner {
})?; })?;
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() { Context::drain_sub_tasks().await?;
Context::drain_sub_tasks().await?;
}
Ok(success) Ok(success)
} }
@ -268,10 +264,8 @@ impl PadSrcInner {
let was_handled = self.gst_pad().push_event(event); let was_handled = self.gst_pad().push_event(event);
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() { if Context::drain_sub_tasks().await.is_err() {
if Context::drain_sub_tasks().await.is_err() { return false;
return false;
}
} }
was_handled was_handled
@ -758,18 +752,6 @@ impl<'a> PadSinkRef<'a> {
Ok(()) Ok(())
} }
fn handle_future(
&self,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
.unwrap_or(Ok(gst::FlowSuccess::Ok))
} else {
Ok(gst::FlowSuccess::Ok)
}
}
} }
impl<'a> Deref for PadSinkRef<'a> { impl<'a> Deref for PadSinkRef<'a> {
@ -876,7 +858,7 @@ impl PadSink {
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp, element| { move |imp, element| {
if Context::current_has_sub_tasks() { if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone(); let handler = handler.clone();
let element = let element =
@ -889,7 +871,8 @@ impl PadSink {
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler.sink_chain(&this_ref, imp, &element, buffer).await handler.sink_chain(&this_ref, imp, &element, buffer).await
}; };
let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
@ -900,7 +883,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(), element.dynamic_cast_ref::<gst::Element>().unwrap(),
buffer, buffer,
); );
this_ref.handle_future(chain_fut) executor::block_on(chain_fut)
} }
}, },
) )
@ -916,7 +899,7 @@ impl PadSink {
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp, element| { move |imp, element| {
if Context::current_has_sub_tasks() { if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone(); let handler = handler.clone();
let element = let element =
@ -931,7 +914,8 @@ impl PadSink {
.sink_chain_list(&this_ref, imp, &element, list) .sink_chain_list(&this_ref, imp, &element, list)
.await .await
}; };
let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
@ -942,7 +926,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(), element.dynamic_cast_ref::<gst::Element>().unwrap(),
list, list,
); );
this_ref.handle_future(chain_list_fut) executor::block_on(chain_list_fut)
} }
}, },
) )
@ -961,7 +945,7 @@ impl PadSink {
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp, element| { move |imp, element| {
if event.is_serialized() { if event.is_serialized() {
if Context::current_has_sub_tasks() { if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone(); let handler = handler.clone();
let element = let element =
@ -980,8 +964,10 @@ impl PadSink {
) )
.await .await
}; };
let _ = let _ = ctx.add_sub_task(
Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); task_id,
delayed_fut.map(|res| res.map(drop)),
);
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
@ -992,7 +978,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(), element.dynamic_cast_ref::<gst::Element>().unwrap(),
event, event,
); );
this_ref.handle_future(event_fut) executor::block_on(event_fut)
} }
} else { } else {
let this_ref = PadSinkRef::new(inner_arc); let this_ref = PadSinkRef::new(inner_arc);

View file

@ -203,14 +203,14 @@ impl TransitionStatus {
origin, origin,
res_fut, res_fut,
} => { } => {
if let Some(cur_ctx) = Context::current() { if let Some((ctx, task_id)) = Context::current_task() {
gst::debug!( gst::debug!(
RUNTIME_CAT, RUNTIME_CAT,
"Awaiting for {:?} ack in a subtask on context {}", "Awaiting for {:?} ack in a subtask on context {}",
trigger, trigger,
cur_ctx.name() ctx.name()
); );
let _ = Context::add_sub_task(async move { let _ = ctx.add_sub_task(task_id, async move {
let res = res_fut.await; let res = res_fut.await;
if res.is_ok() { if res.is_ok() {
gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
@ -826,18 +826,14 @@ macro_rules! exec_action {
($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{ ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
match $self.task_impl.$action().await { match $self.task_impl.$action().await {
Ok(()) => { Ok(()) => {
let mut res; gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
while Context::current_has_sub_tasks() { if let Err(err) = Context::drain_sub_tasks().await {
gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); gst::log!(
res = Context::drain_sub_tasks().await.map_err(|err| { RUNTIME_CAT,
let msg = format!("{} subtask returned {:?}", stringify!($action), err); "{} subtask returned {:?}",
gst::log!(RUNTIME_CAT, "{}", &msg); stringify!($action),
gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg]) err
}); );
if res.is_err() {
break;
}
} }
Ok($triggering_evt) Ok($triggering_evt)
@ -1097,15 +1093,9 @@ impl<Item: Send + 'static> StateMachine<Item> {
// Unprepare is not joined by an ack_rx but by joining the state machine handle // Unprepare is not joined by an ack_rx but by joining the state machine handle
self.task_impl.unprepare().await; self.task_impl.unprepare().await;
while Context::current_has_sub_tasks() { gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); if let Err(err) = Context::drain_sub_tasks().await {
let res = Context::drain_sub_tasks().await.map_err(|err| { gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
err
});
if res.is_err() {
break;
}
} }
task_inner task_inner
@ -1185,12 +1175,9 @@ impl<Item: Send + 'static> StateMachine<Item> {
err err
})?; })?;
while Context::current_has_sub_tasks() { gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); if let Err(err) = Context::drain_sub_tasks().await {
Context::drain_sub_tasks().await.map_err(|err| { gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
err
})?;
} }
} }
} }