diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index b8ecfd3f..1b14bc62 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -53,7 +53,7 @@ struct Args { #[clap(short, long, default_value_t = 6000)] num_buffers: i32, - /// Enable statistics logging (use GST_DEBUG=ts-standalone*:4). + /// Enables statistics logging (use GST_DEBUG=ts-standalone*:4). #[clap(short, long)] log_stats: bool, } diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs index 371af220..f1555627 100644 --- a/generic/threadshare/src/runtime/executor/context.rs +++ b/generic/threadshare/src/runtime/executor/context.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -55,7 +55,7 @@ where cur_task_id, cur_context.name() ); - let _ = Context::add_sub_task(async move { + let _ = cur_context.add_sub_task(cur_task_id, async move { future.await; Ok(()) }); @@ -199,7 +199,10 @@ impl Context { /// Returns the `TaskId` running on current thread, if any. pub fn current_task() -> Option<(Context, TaskId)> { - Scheduler::current().map(Context).zip(TaskId::current()) + Scheduler::current().map(|scheduler| { + // Context users always operate on a Task + (Context(scheduler), TaskId::current().unwrap()) + }) } /// Executes the provided function relatively to this [`Context`]. @@ -265,31 +268,11 @@ impl Context { self.0.unpark(); } - pub fn current_has_sub_tasks() -> bool { - let (ctx, task_id) = match Context::current_task() { - Some(task) => task, - None => { - gst::trace!(RUNTIME_CAT, "No current task"); - return false; - } - }; - - ctx.0.has_sub_tasks(task_id) - } - - pub fn add_sub_task(sub_task: T) -> Result<(), T> + pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, { - let (ctx, task_id) = match Context::current_task() { - Some(task) => task, - None => { - gst::trace!(RUNTIME_CAT, "No current task"); - return Err(sub_task); - } - }; - - ctx.0.add_sub_task(task_id, sub_task) + self.0.add_sub_task(task_id, sub_task) } pub async fn drain_sub_tasks() -> SubTaskOutput { @@ -339,7 +322,7 @@ mod tests { assert_eq!(ctx.name(), Scheduler::DUMMY_NAME); assert_eq!(task_id, super::TaskId(0)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, super::TaskId(0)); Ok(()) @@ -381,10 +364,10 @@ mod tests { let ctx_weak = context.downgrade(); let join_handle = context.spawn(async move { - let (_ctx, task_id) = Context::current_task().unwrap(); + let (ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(0)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(0)); Ok(()) @@ -395,10 +378,10 @@ mod tests { .upgrade() .unwrap() .spawn(async { - let (_ctx, task_id) = Context::current_task().unwrap(); + let (ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(1)); - let res = Context::add_sub_task(async move { + let res = ctx.add_sub_task(task_id, async move { let (_ctx, task_id) = Context::current_task().unwrap(); assert_eq!(task_id, TaskId(1)); Ok(()) @@ -433,14 +416,19 @@ mod tests { let add_sub_task = move |item| { let sender = sender.clone(); - Context::add_sub_task(async move { - sender - .lock() - .await - .send(item) - .await - .map_err(|_| gst::FlowError::Error) - }) + Context::current_task() + .ok_or(()) + .and_then(|(ctx, task_id)| { + ctx.add_sub_task(task_id, async move { + sender + .lock() + .await + .send(item) + .await + .map_err(|_| gst::FlowError::Error) + }) + .map_err(drop) + }) }; // Tests @@ -450,7 +438,7 @@ mod tests { drain_fut.await.unwrap(); // Add a subtask - add_sub_task(0).map_err(drop).unwrap(); + add_sub_task(0).unwrap(); // Check that it was not executed yet receiver.try_next().unwrap_err(); @@ -461,7 +449,7 @@ mod tests { assert_eq!(receiver.try_next().unwrap(), Some(0)); // Add another task and check that it's not executed yet - add_sub_task(1).map_err(drop).unwrap(); + add_sub_task(1).unwrap(); receiver.try_next().unwrap_err(); // Return the receiver diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index a5f1eaf6..53a7fab7 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -132,11 +132,7 @@ impl Scheduler { let res = future.await; let task_id = TaskId::current().unwrap(); - while handle.has_sub_tasks(task_id) { - if handle.drain_sub_tasks(task_id).await.is_err() { - break; - } - } + let _ = handle.drain_sub_tasks(task_id).await; res }); @@ -411,10 +407,6 @@ impl Handle { self.0.scheduler.unpark(); } - pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { - self.0.scheduler.tasks.has_sub_tasks(task_id) - } - pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, diff --git a/generic/threadshare/src/runtime/executor/task.rs b/generic/threadshare/src/runtime/executor/task.rs index 935b5f9e..4b883ced 100644 --- a/generic/threadshare/src/runtime/executor/task.rs +++ b/generic/threadshare/src/runtime/executor/task.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018-2020 Sebastian Dröge -// Copyright (C) 2019-2021 François Laignel +// Copyright (C) 2019-2022 François Laignel // // Take a look at the license at the top of the repository in the LICENSE file. @@ -91,10 +91,6 @@ impl Task { { self.sub_tasks.push_back(sub_task.boxed()); } - - fn drain_sub_tasks(&mut self) -> VecDeque> { - std::mem::take(&mut self.sub_tasks) - } } impl fmt::Debug for Task { @@ -240,15 +236,6 @@ impl TaskQueue { self.runnables.pop() } - pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { - self.tasks - .lock() - .unwrap() - .get(task_id.0) - .map(|t| !t.sub_tasks.is_empty()) - .unwrap_or(false) - } - pub fn add_sub_task(&self, task_id: TaskId, sub_task: T) -> Result<(), T> where T: Future + Send + 'static, @@ -272,35 +259,24 @@ impl TaskQueue { } } - pub fn drain_sub_tasks( - &self, - task_id: TaskId, - ) -> impl Future + Send + 'static { - let sub_tasks = self - .tasks - .lock() - .unwrap() - .get_mut(task_id.0) - .map(|task| (task.drain_sub_tasks(), Arc::clone(&self.context_name))); + pub async fn drain_sub_tasks(&self, task_id: TaskId) -> SubTaskOutput { + loop { + let mut sub_tasks = match self.tasks.lock().unwrap().get_mut(task_id.0) { + Some(task) if !task.sub_tasks.is_empty() => std::mem::take(&mut task.sub_tasks), + _ => return Ok(()), + }; - async move { - if let Some((mut sub_tasks, context_name)) = sub_tasks { - if !sub_tasks.is_empty() { - gst::log!( - RUNTIME_CAT, - "Scheduling draining {} sub tasks from {:?} on '{}'", - sub_tasks.len(), - task_id, - &context_name, - ); + gst::trace!( + RUNTIME_CAT, + "Scheduling draining {} sub tasks from {:?} on '{}'", + sub_tasks.len(), + task_id, + self.context_name, + ); - for sub_task in sub_tasks.drain(..) { - sub_task.await?; - } - } + for sub_task in sub_tasks.drain(..) { + sub_task.await?; } - - Ok(()) } } } diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index e5453708..89e14477 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2020 François Laignel +// Copyright (C) 2019-2022 François Laignel // Copyright (C) 2020 Sebastian Dröge // // This library is free software; you can redistribute it and/or @@ -80,7 +80,7 @@ use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; -use super::executor::{block_on_or_add_sub_task, Context}; +use super::executor::{self, Context}; use super::RUNTIME_CAT; #[inline] @@ -234,9 +234,7 @@ impl PadSrcInner { })?; gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } + Context::drain_sub_tasks().await?; Ok(success) } @@ -255,9 +253,7 @@ impl PadSrcInner { })?; gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - Context::drain_sub_tasks().await?; - } + Context::drain_sub_tasks().await?; Ok(success) } @@ -268,10 +264,8 @@ impl PadSrcInner { let was_handled = self.gst_pad().push_event(event); gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks"); - while Context::current_has_sub_tasks() { - if Context::drain_sub_tasks().await.is_err() { - return false; - } + if Context::drain_sub_tasks().await.is_err() { + return false; } was_handled @@ -758,18 +752,6 @@ impl<'a> PadSinkRef<'a> { Ok(()) } - - fn handle_future( - &self, - fut: impl Future> + Send + 'static, - ) -> Result { - if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) { - block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok))) - .unwrap_or(Ok(gst::FlowSuccess::Ok)) - } else { - Ok(gst::FlowSuccess::Ok) - } - } } impl<'a> Deref for PadSinkRef<'a> { @@ -876,7 +858,7 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp, element| { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -889,7 +871,8 @@ impl PadSink { this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; handler.sink_chain(&this_ref, imp, &element, buffer).await }; - let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = + ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { @@ -900,7 +883,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), buffer, ); - this_ref.handle_future(chain_fut) + executor::block_on(chain_fut) } }, ) @@ -916,7 +899,7 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp, element| { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -931,7 +914,8 @@ impl PadSink { .sink_chain_list(&this_ref, imp, &element, list) .await }; - let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = + ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { @@ -942,7 +926,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), list, ); - this_ref.handle_future(chain_list_fut) + executor::block_on(chain_list_fut) } }, ) @@ -961,7 +945,7 @@ impl PadSink { || Err(FlowError::Error), move |imp, element| { if event.is_serialized() { - if Context::current_has_sub_tasks() { + if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let handler = handler.clone(); let element = @@ -980,8 +964,10 @@ impl PadSink { ) .await }; - let _ = - Context::add_sub_task(delayed_fut.map(|res| res.map(drop))); + let _ = ctx.add_sub_task( + task_id, + delayed_fut.map(|res| res.map(drop)), + ); Ok(gst::FlowSuccess::Ok) } else { @@ -992,7 +978,7 @@ impl PadSink { element.dynamic_cast_ref::().unwrap(), event, ); - this_ref.handle_future(event_fut) + executor::block_on(event_fut) } } else { let this_ref = PadSinkRef::new(inner_arc); diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index abf23e1b..137ceef9 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -203,14 +203,14 @@ impl TransitionStatus { origin, res_fut, } => { - if let Some(cur_ctx) = Context::current() { + if let Some((ctx, task_id)) = Context::current_task() { gst::debug!( RUNTIME_CAT, "Awaiting for {:?} ack in a subtask on context {}", trigger, - cur_ctx.name() + ctx.name() ); - let _ = Context::add_sub_task(async move { + let _ = ctx.add_sub_task(task_id, async move { let res = res_fut.await; if res.is_ok() { gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger); @@ -826,18 +826,14 @@ macro_rules! exec_action { ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{ match $self.task_impl.$action().await { Ok(()) => { - let mut res; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); - res = Context::drain_sub_tasks().await.map_err(|err| { - let msg = format!("{} subtask returned {:?}", stringify!($action), err); - gst::log!(RUNTIME_CAT, "{}", &msg); - gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg]) - }); - - if res.is_err() { - break; - } + gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); + if let Err(err) = Context::drain_sub_tasks().await { + gst::log!( + RUNTIME_CAT, + "{} subtask returned {:?}", + stringify!($action), + err + ); } Ok($triggering_evt) @@ -1097,15 +1093,9 @@ impl StateMachine { // Unprepare is not joined by an ack_rx but by joining the state machine handle self.task_impl.unprepare().await; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); - let res = Context::drain_sub_tasks().await.map_err(|err| { - gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); - err - }); - if res.is_err() { - break; - } + gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); + if let Err(err) = Context::drain_sub_tasks().await { + gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); } task_inner @@ -1185,12 +1175,9 @@ impl StateMachine { err })?; - while Context::current_has_sub_tasks() { - gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); - Context::drain_sub_tasks().await.map_err(|err| { - gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); - err - })?; + gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); + if let Err(err) = Context::drain_sub_tasks().await { + gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); } } }