threadshare/runtime: Add a dummy context when blocking the current thread

This allows downstreams to function as if a normal context thread is
operating upstream apart from not being able to spawn new tasks.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/issues/96
This commit is contained in:
Sebastian Dröge 2020-03-16 15:03:15 +02:00
parent 12dcca3f17
commit 56895f35bc
3 changed files with 157 additions and 41 deletions

View file

@ -77,28 +77,86 @@ tokio::task_local! {
static CURRENT_TASK_ID: TaskId; static CURRENT_TASK_ID: TaskId;
} }
/// Blocks on `future` in one way or another if possible.
///
/// IO & time related `Future`s must be handled within their own [`Context`].
/// Wait for the result using a [`JoinHandle`] or a `channel`.
///
/// If there's currently an active `Context` with a task, then the future is only queued up as a
/// pending sub task for that task.
///
/// Otherwise the current thread is blocking and the passed in future is executed.
///
/// Note that you must not pass any futures here that wait for the currently active task in one way
/// or another as this would deadlock!
pub fn block_on_or_add_sub_task<Fut: Future + Send + 'static>(future: Fut) -> Option<Fut::Output> {
if let Some((cur_context, cur_task_id)) = Context::current_task() {
gst_debug!(
RUNTIME_CAT,
"Adding subtask to task {:?} on context {}",
cur_task_id,
cur_context.name()
);
let _ = Context::add_sub_task(async move {
future.await;
Ok(())
});
return None;
}
// Not running in a Context thread so we can block
Some(block_on(future))
}
/// Blocks on `future`. /// Blocks on `future`.
/// ///
/// IO & time related `Future`s must be handled within their own [`Context`]. /// IO & time related `Future`s must be handled within their own [`Context`].
/// Wait for the result using a [`JoinHandle`] or a `channel`. /// Wait for the result using a [`JoinHandle`] or a `channel`.
/// ///
/// This function must NOT be called within a [`Context`] thread. /// The current thread is blocking and the passed in future is executed.
/// The reason is this would prevent any task operating on the
/// [`Context`] from making progress.
/// ///
/// # Panics /// # Panics
/// ///
/// This function panics if called within a [`Context`] thread. /// This function panics if called within a [`Context`] thread.
///
/// [`Context`]: struct.Context.html
/// [`JoinHandle`]: enum.JoinHandle.html
pub fn block_on<Fut: Future>(future: Fut) -> Fut::Output { pub fn block_on<Fut: Future>(future: Fut) -> Fut::Output {
if Context::is_context_thread() { assert!(!Context::is_context_thread());
panic!("Attempt to `block_on` within a `Context` thread");
}
// Not running in a Context thread so we can block // Not running in a Context thread so we can block
futures::executor::block_on(future) gst_debug!(RUNTIME_CAT, "Blocking on new dummy context");
let context = Context(Arc::new(ContextInner {
real: None,
task_queues: Mutex::new((0, HashMap::new())),
}));
CURRENT_THREAD_CONTEXT.with(move |cur_ctx| {
*cur_ctx.borrow_mut() = Some(context.downgrade());
let res = futures::executor::block_on(async move {
let res = CURRENT_TASK_ID
.scope(TaskId(0), async move {
let task_id = CURRENT_TASK_ID.try_with(|task_id| *task_id).ok();
assert_eq!(task_id, Some(TaskId(0)));
let res = future.await;
while Context::current_has_sub_tasks() {
if Context::drain_sub_tasks().await.is_err() {
break;
}
}
res
})
.await;
res
});
*cur_ctx.borrow_mut() = None;
res
})
} }
struct ContextThread { struct ContextThread {
@ -114,7 +172,15 @@ impl ContextThread {
}); });
let context = context_receiver.recv().expect("Context thread init failed"); let context = context_receiver.recv().expect("Context thread init failed");
*context.0.shutdown.join.lock().unwrap() = Some(join); *context
.0
.real
.as_ref()
.unwrap()
.shutdown
.join
.lock()
.unwrap() = Some(join);
context context
} }
@ -139,9 +205,11 @@ impl ContextThread {
}; };
let context = Context(Arc::new(ContextInner { let context = Context(Arc::new(ContextInner {
name: self.name.clone(), real: Some(ContextRealInner {
handle: Mutex::new(runtime.handle().clone()), name: self.name.clone(),
shutdown, handle: Mutex::new(runtime.handle().clone()),
shutdown,
}),
task_queues: Mutex::new((0, HashMap::new())), task_queues: Mutex::new((0, HashMap::new())),
})); }));
@ -279,19 +347,27 @@ impl<T> fmt::Debug for JoinHandle<T> {
} }
#[derive(Debug)] #[derive(Debug)]
struct ContextInner { struct ContextRealInner {
name: String, name: String,
handle: Mutex<tokio::runtime::Handle>, handle: Mutex<tokio::runtime::Handle>,
// Only used for dropping // Only used for dropping
shutdown: ContextShutdown, shutdown: ContextShutdown,
}
#[derive(Debug)]
struct ContextInner {
// Otherwise a dummy context
real: Option<ContextRealInner>,
task_queues: Mutex<(u64, HashMap<u64, SubTaskQueue>)>, task_queues: Mutex<(u64, HashMap<u64, SubTaskQueue>)>,
} }
impl Drop for ContextInner { impl Drop for ContextInner {
fn drop(&mut self) { fn drop(&mut self) {
let mut contexts = CONTEXTS.lock().unwrap(); if let Some(ref real) = self.real {
gst_debug!(RUNTIME_CAT, "Finalizing context '{}'", self.name); let mut contexts = CONTEXTS.lock().unwrap();
contexts.remove(&self.name); gst_debug!(RUNTIME_CAT, "Finalizing context '{}'", real.name);
contexts.remove(&real.name);
}
} }
} }
@ -329,11 +405,17 @@ impl Eq for Context {}
impl Context { impl Context {
pub fn acquire(context_name: &str, wait: u32) -> Result<Self, io::Error> { pub fn acquire(context_name: &str, wait: u32) -> Result<Self, io::Error> {
assert_ne!(context_name, "DUMMY");
let mut contexts = CONTEXTS.lock().unwrap(); let mut contexts = CONTEXTS.lock().unwrap();
if let Some(inner_weak) = contexts.get(context_name) { if let Some(inner_weak) = contexts.get(context_name) {
if let Some(inner_strong) = inner_weak.upgrade() { if let Some(inner_strong) = inner_weak.upgrade() {
gst_debug!(RUNTIME_CAT, "Joining Context '{}'", inner_strong.name); gst_debug!(
RUNTIME_CAT,
"Joining Context '{}'",
inner_strong.real.as_ref().unwrap().name
);
return Ok(Context(inner_strong)); return Ok(Context(inner_strong));
} }
} }
@ -341,7 +423,11 @@ impl Context {
let context = ContextThread::start(context_name, wait); let context = ContextThread::start(context_name, wait);
contexts.insert(context_name.into(), Arc::downgrade(&context.0)); contexts.insert(context_name.into(), Arc::downgrade(&context.0));
gst_debug!(RUNTIME_CAT, "New Context '{}'", context.0.name); gst_debug!(
RUNTIME_CAT,
"New Context '{}'",
context.0.real.as_ref().unwrap().name
);
Ok(context) Ok(context)
} }
@ -350,7 +436,10 @@ impl Context {
} }
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
self.0.name.as_str() match self.0.real {
Some(ref real) => real.name.as_str(),
None => "DUMMY",
}
} }
/// Returns `true` if a `Context` is running on current thread. /// Returns `true` if a `Context` is running on current thread.
@ -376,7 +465,7 @@ impl Context {
.as_ref() .as_ref()
.and_then(|ctx_weak| ctx_weak.upgrade()) .and_then(|ctx_weak| ctx_weak.upgrade())
.and_then(|ctx| { .and_then(|ctx| {
let task_id = ctx.enter(|| CURRENT_TASK_ID.try_with(|task_id| *task_id).ok()); let task_id = CURRENT_TASK_ID.try_with(|task_id| *task_id).ok();
task_id.map(move |task_id| (ctx, task_id)) task_id.map(move |task_id| (ctx, task_id))
}) })
@ -387,7 +476,12 @@ impl Context {
where where
F: FnOnce() -> R, F: FnOnce() -> R,
{ {
self.0.handle.lock().unwrap().enter(f) let real = match self.0.real {
Some(ref real) => real,
None => panic!("Can't enter on dummy context"),
};
real.handle.lock().unwrap().enter(f)
} }
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output> pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
@ -395,6 +489,11 @@ impl Context {
Fut: Future + Send + 'static, Fut: Future + Send + 'static,
Fut::Output: Send + 'static, Fut::Output: Send + 'static,
{ {
let real = match self.0.real {
Some(ref real) => real,
None => panic!("Can't spawn new tasks on dummy context"),
};
let mut task_queues = self.0.task_queues.lock().unwrap(); let mut task_queues = self.0.task_queues.lock().unwrap();
let id = task_queues.0; let id = task_queues.0;
task_queues.0 += 1; task_queues.0 += 1;
@ -405,16 +504,18 @@ impl Context {
RUNTIME_CAT, RUNTIME_CAT,
"Spawning new task {:?} on context {}", "Spawning new task {:?} on context {}",
id, id,
self.0.name real.name
); );
let join_handle = self.0.handle.lock().unwrap().spawn(async move { let join_handle = real.handle.lock().unwrap().spawn(async move {
let ctx = Context::current().unwrap(); let ctx = Context::current().unwrap();
let real = ctx.0.real.as_ref().unwrap();
gst_trace!( gst_trace!(
RUNTIME_CAT, RUNTIME_CAT,
"Running task {:?} on context {}", "Running task {:?} on context {}",
id, id,
ctx.0.name real.name
); );
let res = CURRENT_TASK_ID.scope(id, future).await; let res = CURRENT_TASK_ID.scope(id, future).await;
@ -428,14 +529,14 @@ impl Context {
RUNTIME_CAT, RUNTIME_CAT,
"Task {:?} on context {} has {} pending sub tasks", "Task {:?} on context {} has {} pending sub tasks",
id, id,
ctx.0.name, real.name,
l l
); );
} }
} }
} }
gst_trace!(RUNTIME_CAT, "Task {:?} on context {} done", id, ctx.0.name); gst_trace!(RUNTIME_CAT, "Task {:?} on context {} done", id, real.name);
res res
}); });
@ -479,12 +580,20 @@ impl Context {
let mut task_queues = ctx.0.task_queues.lock().unwrap(); let mut task_queues = ctx.0.task_queues.lock().unwrap();
match task_queues.1.get_mut(&task_id.0) { match task_queues.1.get_mut(&task_id.0) {
Some(task_queue) => { Some(task_queue) => {
gst_trace!( if let Some(ref real) = ctx.0.real {
RUNTIME_CAT, gst_trace!(
"Adding subtask to {:?} on context {}", RUNTIME_CAT,
task_id, "Adding subtask to {:?} on context {}",
ctx.0.name task_id,
); real.name
);
} else {
gst_trace!(
RUNTIME_CAT,
"Adding subtask to {:?} on dummy context",
task_id,
);
}
task_queue.0.push_back(sub_task.boxed()); task_queue.0.push_back(sub_task.boxed());
Ok(()) Ok(())
} }
@ -519,7 +628,12 @@ impl Context {
} }
}; };
let name = self.0.name.clone(); let name = self
.0
.real
.as_ref()
.map(|r| r.name.clone())
.unwrap_or_else(|| String::from("DUMMY"));
async move { async move {
if !task_queue.0.is_empty() { if !task_queue.0.is_empty() {
gst_log!( gst_log!(

View file

@ -83,7 +83,7 @@ use std::marker::PhantomData;
use std::sync; use std::sync;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use super::executor::Context; use super::executor::{block_on_or_add_sub_task, Context};
use super::task::Task; use super::task::Task;
use super::RUNTIME_CAT; use super::RUNTIME_CAT;
@ -986,7 +986,8 @@ impl<'a> PadSinkRef<'a> {
// Note: we don't use `crate::runtime::executor::block_on` here // Note: we don't use `crate::runtime::executor::block_on` here
// because `Context::is_context_thread()` is checked in the `if` // because `Context::is_context_thread()` is checked in the `if`
// statement above. // statement above.
futures::executor::block_on(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok))) block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
.unwrap_or(Ok(gst::FlowSuccess::Ok))
} else { } else {
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} }

View file

@ -27,6 +27,7 @@ use gst::{gst_debug, gst_log, gst_trace, gst_warning};
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use super::executor::block_on;
use super::{Context, JoinHandle, RUNTIME_CAT}; use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
@ -209,7 +210,7 @@ impl Task {
cur_task_id, cur_task_id,
cur_context.name() cur_context.name()
); );
let _ = futures::executor::block_on(prepare_handle); let _ = block_on(prepare_handle);
} }
} else { } else {
gst_debug!( gst_debug!(
@ -217,7 +218,7 @@ impl Task {
"Synchronously waiting for task {:?}", "Synchronously waiting for task {:?}",
prepare_handle prepare_handle
); );
let _ = futures::executor::block_on(prepare_handle); let _ = block_on(prepare_handle);
} }
} }
@ -422,7 +423,7 @@ impl Task {
cur_task_id, cur_task_id,
cur_context.name() cur_context.name()
); );
let _ = futures::executor::block_on(loop_handle); let _ = block_on(loop_handle);
} }
} else { } else {
gst_debug!( gst_debug!(
@ -430,7 +431,7 @@ impl Task {
"Synchronously waiting for task {:?}", "Synchronously waiting for task {:?}",
loop_handle loop_handle
); );
let _ = futures::executor::block_on(loop_handle); let _ = block_on(loop_handle);
} }
} }