threadshare: separate blocking & throttling schedulers

Rationale:

* Detecting whether the current thread uses a throttling or a blocking
  scheduler is now immediate instead of relying on a sentinel name (see
  the sketch below).
* Each scheduler can be optimized for its purpose.
* The blocking scheduler can now be reused, saving a few CPU cycles on
  entering and leaving.
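
A minimal sketch of the new per-thread detection, with simplified
stand-ins for the real types (in this commit, `Blocking` owns a task
queue and `ThrottlingHandleWeak` wraps a weak pointer to the scheduler):

    use std::cell::OnceCell;

    // Simplified placeholders; see the `scheduler` module diff below
    // for the real definitions.
    #[derive(Debug, Default)]
    struct Blocking;
    #[derive(Debug)]
    struct ThrottlingHandleWeak;

    // One scheduler per thread, installed once and then reused.
    #[derive(Debug)]
    enum Scheduler {
        Blocking(Blocking),
        Throttling(ThrottlingHandleWeak),
    }

    thread_local! {
        static CURRENT_SCHEDULER: OnceCell<Scheduler> =
            const { OnceCell::new() };
    }

    // Detection is an enum match on the thread-local instead of a
    // comparison of the context name against the old DUMMY sentinel.
    fn is_throttling_thread() -> bool {
        CURRENT_SCHEDULER
            .with(|cur| matches!(cur.get(), Some(Scheduler::Throttling(_))))
    }

    fn main() {
        assert!(!is_throttling_thread());

        // The blocking scheduler is installed lazily and then reused,
        // so repeated block_on calls skip the setup and teardown cost.
        CURRENT_SCHEDULER.with(|cur| {
            cur.get_or_init(|| Scheduler::Blocking(Blocking::default()));
        });
        assert!(!is_throttling_thread());
    }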

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2526>
François Laignel 2025-08-29 15:45:31 +02:00
parent e92250847d
commit 83d16a1cca
8 changed files with 279 additions and 298 deletions

View file

@@ -32,7 +32,7 @@ use rustix::net::addr::SocketAddrArg;
use crate::runtime::RUNTIME_CAT;
use super::scheduler::{self, Scheduler};
use super::scheduler;
use super::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned};
/// Async adapter for I/O types.
@@ -98,8 +98,8 @@ pub struct Async<T: Send + 'static> {
/// The inner I/O handle.
pub(super) io: Option<T>,
// The [`Handle`] on the [`Scheduler`] on which this Async wrapper is registered.
pub(super) sched: scheduler::HandleWeak,
// The [`ThrottlingHandle`] on the [`scheduler::Throttling`] on which this Async wrapper is registered.
pub(super) throttling_sched_hdl: Option<scheduler::ThrottlingHandleWeak>,
}
impl<T: Send + 'static> Unpin for Async<T> {}
@@ -151,9 +151,9 @@ impl<T: AsFd + Send + 'static> Async<T> {
Ok(Async {
source,
io: Some(io),
sched: Scheduler::current()
.expect("Attempt to create an Async wrapper outside of a Context")
.downgrade(),
throttling_sched_hdl: scheduler::Throttling::current()
.as_ref()
.map(scheduler::ThrottlingHandle::downgrade),
})
}
}
@@ -239,9 +239,9 @@ impl<T: AsSocket + Send + 'static> Async<T> {
Ok(Async {
source,
io: Some(io),
sched: Scheduler::current()
.expect("Attempt to create an Async wrapper outside of a Context")
.downgrade(),
throttling_sched_hdl: scheduler::Throttling::current()
.as_ref()
.map(scheduler::ThrottlingHandle::downgrade),
})
}
}
@@ -461,23 +461,32 @@ impl<T: Send + 'static> AsRef<T> for Async<T> {
impl<T: Send + 'static> Drop for Async<T> {
fn drop(&mut self) {
if let Some(io) = self.io.take() {
if let Some(sched) = self.sched.upgrade() {
let source = Arc::clone(&self.source);
sched.spawn_and_unpark(async move {
Reactor::with_mut(|reactor| {
if let Err(err) = reactor.remove_io(&source) {
gst::error!(
RUNTIME_CAT,
"Failed to remove fd {:?}: {}",
source.registration,
err
);
}
if let Some(throttling_sched_hdl) = self.throttling_sched_hdl.take() {
if let Some(sched) = throttling_sched_hdl.upgrade() {
let source = Arc::clone(&self.source);
sched.spawn_and_unpark(async move {
Reactor::with_mut(|reactor| {
if let Err(err) = reactor.remove_io(&source) {
gst::error!(
RUNTIME_CAT,
"Failed to remove fd {:?}: {err}",
source.registration,
);
}
});
drop(io);
});
drop(io);
});
}
} else {
drop(io);
Reactor::with_mut(|reactor| {
if let Err(err) = reactor.remove_io(&self.source) {
gst::error!(
RUNTIME_CAT,
"Failed to remove fd {:?}: {err}",
self.source.registration,
);
}
});
}
}
}

View file

@@ -14,7 +14,7 @@ use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use std::time::Duration;
use super::{Handle, HandleWeak, JoinHandle, Scheduler, SubTaskOutput, TaskId};
use super::{scheduler, JoinHandle, SubTaskOutput, TaskId};
use crate::runtime::RUNTIME_CAT;
// We are bound to using `sync` for the `runtime` `Mutex`es. Attempts to use `async` `Mutex`es
@@ -82,15 +82,8 @@
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
if let Some(context) = Context::current() {
let msg = format!("Attempt to block within Context {}", context.name());
gst::error!(RUNTIME_CAT, "{}", msg);
panic!("{}", msg);
}
// Not running in a Context thread so we can block
gst::debug!(RUNTIME_CAT, "Blocking on new dummy context");
Scheduler::block_on(future)
gst::log!(RUNTIME_CAT, "Blocking on local thread");
scheduler::Blocking::block_on(future)
}
/// Yields execution back to the runtime.
@@ -118,7 +111,7 @@ impl Future for YieldNow {
}
#[derive(Clone, Debug)]
pub struct ContextWeak(HandleWeak);
pub struct ContextWeak(scheduler::ThrottlingHandleWeak);
impl ContextWeak {
pub fn upgrade(&self) -> Option<Context> {
@@ -137,7 +130,7 @@ impl ContextWeak {
/// [`PadSrc`]: ../pad/struct.PadSrc.html
/// [`PadSink`]: ../pad/struct.PadSink.html
#[derive(Clone, Debug)]
pub struct Context(Handle);
pub struct Context(scheduler::ThrottlingHandle);
impl PartialEq for Context {
fn eq(&self, other: &Self) -> bool {
@@ -149,8 +142,6 @@ impl Eq for Context {}
impl Context {
pub fn acquire(context_name: &str, wait: Duration) -> Result<Self, io::Error> {
assert_ne!(context_name, Scheduler::DUMMY_NAME);
let mut contexts = CONTEXTS.lock().unwrap();
if let Some(context_weak) = contexts.get(context_name) {
@@ -160,14 +151,13 @@
}
}
let context = Context(Scheduler::start(context_name, wait));
let context = Context(scheduler::Throttling::start(context_name, wait));
contexts.insert(context_name.into(), context.downgrade());
gst::debug!(
RUNTIME_CAT,
"New Context '{}' throttling {:?}",
"New Context '{}' throttling {wait:?}",
context.name(),
wait,
);
Ok(context)
}
@@ -197,20 +187,20 @@ impl Context {
/// Returns `true` if a `Context` is running on current thread.
pub fn is_context_thread() -> bool {
Scheduler::is_scheduler_thread()
scheduler::Throttling::is_throttling_thread()
}
/// Returns the `Context` running on current thread, if any.
pub fn current() -> Option<Context> {
Scheduler::current().map(Context)
scheduler::Throttling::current().map(Context)
}
/// Returns the `TaskId` running on current thread, if any.
pub fn current_task() -> Option<(Context, TaskId)> {
Scheduler::current().map(|scheduler| {
// Context users always operate on a Task
(Context(scheduler), TaskId::current().unwrap())
})
Option::zip(
scheduler::Throttling::current().map(Context),
TaskId::current(),
)
}
/// Executes the provided function relatively to this [`Context`].
@@ -293,8 +283,8 @@ impl Context {
}
}
impl From<Handle> for Context {
fn from(handle: Handle) -> Self {
impl From<scheduler::ThrottlingHandle> for Context {
fn from(handle: scheduler::ThrottlingHandle) -> Self {
Context(handle)
}
}
@@ -309,7 +299,6 @@ mod tests {
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::super::Scheduler;
use super::Context;
use crate::runtime::Async;
@@ -319,29 +308,6 @@ mod tests {
const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS);
const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10);
#[test]
fn block_on_task_id() {
gst::init().unwrap();
assert!(!Context::is_context_thread());
crate::runtime::executor::block_on(async {
let (ctx, task_id) = Context::current_task().unwrap();
assert_eq!(ctx.name(), Scheduler::DUMMY_NAME);
assert_eq!(task_id, super::TaskId(0));
let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0));
Ok(())
});
assert!(res.is_ok());
assert!(Context::is_context_thread());
});
assert!(!Context::is_context_thread());
}
#[test]
fn block_on_timer() {
gst::init().unwrap();

View file

@@ -10,9 +10,8 @@ use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use super::context::Context;
use super::TaskId;
use super::{Handle, Scheduler};
use super::{context::Context, scheduler};
#[derive(Debug)]
pub struct JoinError(TaskId);
@@ -28,14 +27,18 @@ impl std::error::Error for JoinError {}
pub struct JoinHandle<T> {
task: Option<async_task::Task<T>>,
task_id: TaskId,
scheduler: Handle,
scheduler: scheduler::ThrottlingHandle,
}
unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}
impl<T> JoinHandle<T> {
pub(super) fn new(task_id: TaskId, task: async_task::Task<T>, scheduler: &Handle) -> Self {
pub(super) fn new(
task_id: TaskId,
task: async_task::Task<T>,
scheduler: &scheduler::ThrottlingHandle,
) -> Self {
JoinHandle {
task: Some(task),
task_id,
@@ -44,7 +47,9 @@ impl<T> JoinHandle<T> {
}
pub fn is_current(&self) -> bool {
if let Some((cur_scheduler, task_id)) = Scheduler::current().zip(TaskId::current()) {
if let Some((cur_scheduler, task_id)) =
scheduler::Throttling::current().zip(TaskId::current())
{
cur_scheduler == self.scheduler && task_id == self.task_id
} else {
false

View file

@@ -35,7 +35,6 @@ use reactor::{Reactor, Readable, ReadableOwned, Registration, Source, Writable,
// We need the `Mutex<bool>` to work in pair with `Condvar`.
#[allow(clippy::mutex_atomic)]
mod scheduler;
use scheduler::{Handle, HandleWeak, Scheduler};
mod task;
pub use task::{SubTaskOutput, TaskId};

View file

@@ -182,7 +182,7 @@ impl Reactor {
f(reactor
.borrow()
.as_ref()
.expect("Not running in a Context."))
.expect("Reactor initialized at this point"))
})
}
@@ -207,7 +207,7 @@ impl Reactor {
f(reactor
.borrow_mut()
.as_mut()
.expect("Not running in a Context."))
.expect("Reactor initialized at this point"))
})
}

View file

@@ -6,9 +6,7 @@
use futures::future::poll_fn;
use futures::pin_mut;
use gio::glib::clone::Downgrade;
use std::cell::RefCell;
use std::cell::OnceCell;
use std::future::Future;
use std::panic;
#[cfg(feature = "tuning")]
@@ -27,40 +25,144 @@ use super::{CallOnDrop, JoinHandle, Reactor};
use crate::runtime::RUNTIME_CAT;
thread_local! {
static CURRENT_SCHEDULER: RefCell<Option<HandleWeak>> = const { RefCell::new(None) };
static CURRENT_SCHEDULER: OnceCell<Scheduler> = const { OnceCell::new() };
}
#[derive(Debug)]
pub(super) struct Scheduler {
pub(super) enum Scheduler {
Blocking(Blocking),
Throttling(ThrottlingHandleWeak),
}
impl Scheduler {
pub fn is_blocking(&self) -> bool {
matches!(*self, Scheduler::Blocking(..))
}
}
#[derive(Debug, Default)]
pub(super) struct Blocking {
tasks_queue: TaskQueue,
}
impl Blocking {
const MAX_SUCCESSIVE_TASKS: usize = 64;
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
CURRENT_SCHEDULER.with(|cur_sched| {
let cur_sched = cur_sched.get_or_init(|| {
Reactor::init(Duration::ZERO);
Scheduler::Blocking(Blocking::default())
});
match cur_sched {
Scheduler::Throttling(hdl_weak) => {
let msg = if let Some(hdl) = hdl_weak.upgrade() {
format!(
"Attempt to block on existing Context {}",
hdl.context_name()
)
} else {
"Attempt to block on terminated Context".to_string()
};
gst::error!(RUNTIME_CAT, "{msg}");
panic!("{msg}");
}
Scheduler::Blocking(sched) => {
let (task_id, task) = sched.tasks_queue.add(future);
gst::trace!(RUNTIME_CAT, "Blocking on current thread with {task_id:?}");
let _guard = CallOnDrop::new(|| {
gst::trace!(
RUNTIME_CAT,
"Done blocking on current thread with {task_id:?}",
);
});
// Blocking on `task` which is cheap to `poll`.
let waker = waker_fn(|| ());
let cx = &mut std::task::Context::from_waker(&waker);
pin_mut!(task);
let mut now;
let mut tasks_checked;
'main: loop {
now = Instant::now();
Reactor::with_mut(|reactor| reactor.react(now).ok());
if let Poll::Ready(t) = task.as_mut().poll(cx) {
return t;
}
tasks_checked = 0;
while tasks_checked < Self::MAX_SUCCESSIVE_TASKS {
let Ok(runnable) = sched.tasks_queue.pop_runnable() else {
continue 'main;
};
if let Err(err) = panic::catch_unwind(|| runnable.run()) {
gst::error!(
RUNTIME_CAT,
"A task panicked blocking on current thread"
);
panic::resume_unwind(err);
}
tasks_checked += 1;
}
}
}
}
})
}
}
#[derive(Debug, Default)]
pub(super) struct Throttling {
context_name: Arc<str>,
max_throttling: Duration,
tasks: TaskQueue,
task_queue: TaskQueue,
must_unpark: Mutex<bool>,
must_unpark_cvar: Condvar,
#[cfg(feature = "tuning")]
parked_duration: AtomicU64,
}
impl Scheduler {
pub const DUMMY_NAME: &'static str = "DUMMY";
impl Throttling {
const MAX_SUCCESSIVE_TASKS: usize = 64;
pub fn start(context_name: &str, max_throttling: Duration) -> Handle {
pub fn start(context_name: &str, max_throttling: Duration) -> ThrottlingHandle {
// Name the thread so that it appears in panic messages.
let thread = thread::Builder::new().name(context_name.to_string());
let (handle_sender, handle_receiver) = sync_mpsc::channel();
let context_name = Arc::from(context_name);
let thread_ctx_name = Arc::clone(&context_name);
let context_name: Arc<str> = Arc::from(context_name);
let join = thread
.spawn(move || {
gst::debug!(
RUNTIME_CAT,
"Started Scheduler thread for Context {}",
thread_ctx_name
"Started Scheduler thread for Context {context_name}",
);
let handle = Scheduler::init(Arc::clone(&thread_ctx_name), max_throttling);
let handle = CURRENT_SCHEDULER.with(|cur_sched| {
Reactor::init(max_throttling);
let handle = ThrottlingHandle::new(Arc::new(Throttling {
context_name: context_name.clone(),
max_throttling,
..Default::default()
}));
cur_sched.set(Scheduler::Throttling(handle.downgrade())).expect("new thread");
handle
});
let this = Arc::clone(&handle.0.scheduler);
let must_shutdown = handle.0.must_shutdown.clone();
let handle_weak = handle.downgrade();
@@ -74,20 +176,24 @@ impl Scheduler {
}
});
let _guard = CallOnDrop::new(|| {
gst::trace!(RUNTIME_CAT, "Closing Scheduler for Context {context_name}");
Reactor::clear();
});
// Blocking on `shutdown_fut` which is cheap to `poll`.
match this.block_on_priv(shutdown_fut) {
Ok(_) => {
gst::debug!(
RUNTIME_CAT,
"Scheduler thread shut down for Context {}",
thread_ctx_name
"Scheduler thread shut down for Context {context_name}",
);
}
Err(e) => {
gst::error!(
RUNTIME_CAT,
"Scheduler thread shut down due to an error within Context {}",
thread_ctx_name
"Scheduler thread shut down due to an error within Context {context_name}",
);
if let Some(handle) = handle_weak.upgrade() {
@@ -106,81 +212,6 @@ impl Scheduler {
handle
}
fn init(context_name: Arc<str>, max_throttling: Duration) -> Handle {
let handle = CURRENT_SCHEDULER.with(|cur_scheduler| {
let mut cur_scheduler = cur_scheduler.borrow_mut();
if cur_scheduler.is_some() {
panic!("Attempt to initialize an Scheduler on thread where another Scheduler is running.");
}
let handle = Handle::new(Arc::new(Scheduler {
context_name: context_name.clone(),
max_throttling,
tasks: TaskQueue::new(context_name),
must_unpark: Mutex::new(false),
must_unpark_cvar: Condvar::new(),
#[cfg(feature = "tuning")]
parked_duration: AtomicU64::new(0),
}));
*cur_scheduler = Some(handle.downgrade());
handle
});
Reactor::init(handle.max_throttling());
handle
}
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
assert!(
!Scheduler::is_scheduler_thread(),
"Attempt to block within an existing Scheduler thread."
);
let handle = Scheduler::init(Scheduler::DUMMY_NAME.into(), Duration::ZERO);
let this = Arc::clone(&handle.0.scheduler);
// Move the (only) handle for this scheduler in the main task.
let (task_id, task) = this.tasks.add(async move {
let res = future.await;
let task_id = TaskId::current().unwrap();
let _ = handle.drain_sub_tasks(task_id).await;
res
});
gst::trace!(RUNTIME_CAT, "Blocking on current thread with {:?}", task_id);
let _guard = CallOnDrop::new(|| {
gst::trace!(
RUNTIME_CAT,
"Blocking on current thread with {:?} done",
task_id,
);
});
// Blocking on `task` which is cheap to `poll`.
match this.block_on_priv(task) {
Ok(res) => res,
Err(e) => {
gst::error!(
RUNTIME_CAT,
"Panic blocking on Context {}",
&Scheduler::DUMMY_NAME
);
panic::resume_unwind(e);
}
}
}
// Important: the `termination_future` MUST be cheap to poll.
//
// Examples of appropriate `termination_future` are:
@@ -197,8 +228,6 @@ impl Scheduler {
let cx = &mut std::task::Context::from_waker(&waker);
pin_mut!(termination_future);
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
let mut now;
// This is to ensure reactor invocation on the first iteration.
let mut last_react = Instant::now().checked_sub(self.max_throttling).unwrap();
@@ -217,13 +246,9 @@ impl Scheduler {
tasks_checked = 0;
while tasks_checked < Self::MAX_SUCCESSIVE_TASKS {
if let Ok(runnable) = self.tasks.pop_runnable() {
if let Ok(runnable) = self.task_queue.pop_runnable() {
panic::catch_unwind(|| runnable.run()).inspect_err(|_err| {
gst::error!(
RUNTIME_CAT,
"A task has panicked within Context {}",
self.context_name
);
gst::error!(RUNTIME_CAT, "A task panicked {}", self.context_name);
})?;
tasks_checked += 1;
@@ -266,62 +291,59 @@ impl Scheduler {
self.must_unpark_cvar.notify_one();
}
fn close(context_name: Arc<str>) {
gst::trace!(
RUNTIME_CAT,
"Closing Scheduler for Context {}",
context_name,
);
Reactor::clear();
let _ = CURRENT_SCHEDULER.try_with(|cur_scheduler| {
*cur_scheduler.borrow_mut() = None;
});
}
pub fn is_scheduler_thread() -> bool {
CURRENT_SCHEDULER.with(|cur_scheduler| cur_scheduler.borrow().is_some())
}
pub fn current() -> Option<Handle> {
pub fn is_throttling_thread() -> bool {
CURRENT_SCHEDULER.with(|cur_scheduler| {
cur_scheduler
.borrow()
.as_ref()
.and_then(HandleWeak::upgrade)
let Some(sched) = cur_scheduler.get() else {
return false;
};
!sched.is_blocking()
})
}
pub fn is_current(&self) -> bool {
CURRENT_SCHEDULER.with(|cur_scheduler| {
cur_scheduler
.borrow()
.as_ref()
.and_then(HandleWeak::upgrade)
.is_some_and(|cur| std::ptr::eq(self, Arc::as_ptr(&cur.0.scheduler)))
pub fn current() -> Option<ThrottlingHandle> {
CURRENT_SCHEDULER.with(|cur_scheduler| match cur_scheduler.get()? {
Scheduler::Blocking(_) => None,
Scheduler::Throttling(hdl_weak) => hdl_weak.upgrade(),
})
}
fn is_current(&self) -> bool {
CURRENT_SCHEDULER.with(|cur_sched| match cur_sched.get() {
None | Some(Scheduler::Blocking(_)) => false,
Some(Scheduler::Throttling(hdl_weak)) => {
if let Some(hdl) = hdl_weak.upgrade() {
std::ptr::eq(self, Arc::as_ptr(&hdl.0.scheduler))
} else {
false
}
}
})
}
}
#[derive(Debug)]
struct HandleInner {
scheduler: Arc<Scheduler>,
struct ThrottlingHandleInner {
scheduler: Arc<Throttling>,
must_shutdown: Arc<AtomicBool>,
join: Mutex<Option<thread::JoinHandle<()>>>,
}
impl HandleInner {
fn new(scheduler: Arc<Scheduler>) -> Self {
HandleInner {
impl ThrottlingHandleInner {
fn new(scheduler: Arc<Throttling>) -> Self {
ThrottlingHandleInner {
scheduler,
must_shutdown: Default::default(),
join: Default::default(),
}
}
#[track_caller]
fn context_name(&self) -> &str {
self.scheduler.context_name.as_ref()
}
}
impl Drop for HandleInner {
impl Drop for ThrottlingHandleInner {
fn drop(&mut self) {
if !self.must_shutdown.fetch_or(true, Ordering::SeqCst) {
// Was not already shutting down.
@@ -330,7 +352,7 @@ impl Drop for HandleInner {
gst::trace!(
RUNTIME_CAT,
"Shutting down Scheduler thread for Context {}",
self.scheduler.context_name
self.context_name(),
);
// Don't block shutting down itself
@@ -350,20 +372,20 @@
}
#[derive(Clone, Debug)]
pub(super) struct HandleWeak(Weak<HandleInner>);
pub(super) struct ThrottlingHandleWeak(Weak<ThrottlingHandleInner>);
impl HandleWeak {
pub(super) fn upgrade(&self) -> Option<Handle> {
self.0.upgrade().map(Handle)
impl ThrottlingHandleWeak {
pub(super) fn upgrade(&self) -> Option<ThrottlingHandle> {
self.0.upgrade().map(ThrottlingHandle)
}
}
#[derive(Clone, Debug)]
pub(super) struct Handle(Arc<HandleInner>);
pub(super) struct ThrottlingHandle(Arc<ThrottlingHandleInner>);
impl Handle {
fn new(scheduler: Arc<Scheduler>) -> Self {
Handle(Arc::new(HandleInner::new(scheduler)))
impl ThrottlingHandle {
fn new(scheduler: Arc<Throttling>) -> Self {
ThrottlingHandle(Arc::new(ThrottlingHandleInner::new(scheduler)))
}
fn set_join_handle(&self, join: thread::JoinHandle<()>) {
@@ -376,7 +398,7 @@ impl Handle {
}
pub fn context_name(&self) -> &str {
&self.0.scheduler.context_name
self.0.context_name()
}
pub fn max_throttling(&self) -> Duration {
@@ -402,12 +424,15 @@ impl Handle {
F: FnOnce() -> O + Send + 'a,
O: Send + 'a,
{
assert!(!self.0.scheduler.is_current());
assert!(
!self.0.scheduler.is_current(),
"Attempting to `enter()` current `Context` from itself"
);
// Safety: bounding `self` to `'a` and blocking on the task
// ensures that the lifetime bounds satisfy the safety
// requirements for `TaskQueue::add_sync`.
let task = unsafe { self.0.scheduler.tasks.add_sync(f) };
let task = unsafe { self.0.scheduler.task_queue.add_sync(f) };
self.0.scheduler.unpark();
futures::executor::block_on(task)
}
@@ -417,7 +442,7 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task_id, task) = self.0.scheduler.tasks.add(future);
let (task_id, task) = self.0.scheduler.task_queue.add(future);
JoinHandle::new(task_id, task, self)
}
@@ -426,7 +451,7 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task_id, task) = self.0.scheduler.tasks.add(future);
let (task_id, task) = self.0.scheduler.task_queue.add(future);
self.0.scheduler.unpark();
JoinHandle::new(task_id, task, self)
}
@@ -439,20 +464,20 @@ impl Handle {
where
T: Future<Output = SubTaskOutput> + Send + 'static,
{
self.0.scheduler.tasks.add_sub_task(task_id, sub_task)
self.0.scheduler.task_queue.add_sub_task(task_id, sub_task)
}
pub fn downgrade(&self) -> HandleWeak {
HandleWeak(self.0.downgrade())
pub fn downgrade(&self) -> ThrottlingHandleWeak {
ThrottlingHandleWeak(Arc::downgrade(&self.0))
}
pub async fn drain_sub_tasks(&self, task_id: TaskId) -> SubTaskOutput {
let sub_tasks_fut = self.0.scheduler.tasks.drain_sub_tasks(task_id);
let sub_tasks_fut = self.0.scheduler.task_queue.drain_sub_tasks(task_id);
sub_tasks_fut.await
}
}
impl PartialEq for Handle {
impl PartialEq for ThrottlingHandle {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
@@ -460,9 +485,12 @@ impl PartialEq for Handle {
#[cfg(test)]
mod tests {
use super::super::*;
use std::sync::Arc;
use std::time::Duration;
use super::{Blocking, Reactor, Throttling, ThrottlingHandle};
use crate::runtime::timer;
#[test]
fn block_on_task_join_handle() {
use futures::channel::oneshot;
@@ -472,8 +500,14 @@ mod tests {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
std::thread::spawn(move || {
let handle =
Scheduler::init("block_on_task_join_handle".into(), Duration::from_millis(2));
let handle = ThrottlingHandle::new(Arc::new(Throttling {
context_name: "block_on_task_join_handle".into(),
max_throttling: Duration::from_millis(2),
..Default::default()
}));
Reactor::init(Duration::from_millis(2));
let join_handle = handle.spawn(async {
timer::delay_for(Duration::from_millis(5)).await;
42
@@ -484,7 +518,7 @@ mod tests {
});
let task_join_handle = join_receiver.recv().unwrap();
let res = Scheduler::block_on(task_join_handle).unwrap();
let res = Blocking::block_on(task_join_handle).unwrap();
let _ = shutdown_sender.send(());
@@ -493,7 +527,7 @@ mod tests {
#[test]
fn block_on_timer() {
let res = Scheduler::block_on(async {
let res = Blocking::block_on(async {
timer::delay_for(Duration::from_millis(5)).await;
42
});
@@ -503,7 +537,7 @@ mod tests {
#[test]
fn enter_non_static() {
let handle = Scheduler::start("enter_non_static", Duration::from_millis(2));
let handle = Throttling::start("enter_non_static", Duration::from_millis(2));
let mut flag = false;
handle.enter(|| flag = true);

View file

@@ -72,7 +72,7 @@ impl<F: Future> Future for TaskFuture<F> {
}
}
struct Task {
pub(super) struct Task {
id: TaskId,
sub_tasks: VecDeque<BoxFuture<'static, SubTaskOutput>>,
}
@@ -102,63 +102,49 @@ impl fmt::Debug for Task {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(super) struct TaskQueue {
runnables: Arc<ConcurrentQueue<Runnable>>,
// FIXME good point about using a slab is that it's probably faster than a HashMap
// However since we reuse the vacant entries, we get the same TaskId
// which can harm debugging. If this is not acceptable, I'll switch back to using
// a HashMap.
tasks: Arc<Mutex<Slab<Task>>>,
context_name: Arc<str>,
}
impl TaskQueue {
pub fn new(context_name: Arc<str>) -> Self {
impl Default for TaskQueue {
fn default() -> Self {
TaskQueue {
runnables: Arc::new(ConcurrentQueue::unbounded()),
tasks: Arc::new(Mutex::new(Slab::new())),
context_name,
}
}
}
impl TaskQueue {
pub fn add<F>(&self, future: F) -> (TaskId, async_task::Task<<F as Future>::Output>)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let tasks_clone = Arc::clone(&self.tasks);
let tasks_weak = Arc::downgrade(&self.tasks);
let mut tasks = self.tasks.lock().unwrap();
let task_id = TaskId(tasks.vacant_entry().key());
let context_name = Arc::clone(&self.context_name);
let task_fut = async move {
gst::trace!(
RUNTIME_CAT,
"Running {:?} on context {}",
task_id,
context_name
);
gst::trace!(RUNTIME_CAT, "Running {task_id:?}");
let _guard = CallOnDrop::new(move || {
if let Some(task) = tasks_clone.lock().unwrap().try_remove(task_id.0) {
if let Some(task) = tasks_weak
.upgrade()
.and_then(|tasks| tasks.lock().unwrap().try_remove(task_id.0))
{
if !task.sub_tasks.is_empty() {
gst::warning!(
RUNTIME_CAT,
"Task {:?} on context {} has {} pending sub tasks",
task_id,
context_name,
"Task {task_id:?} has {} pending sub tasks",
task.sub_tasks.len(),
);
}
}
gst::trace!(
RUNTIME_CAT,
"Done {:?} on context {}",
task_id,
context_name
);
gst::trace!(RUNTIME_CAT, "Done {task_id:?}",);
});
TaskFuture {
@@ -195,24 +181,13 @@ impl TaskQueue {
let mut tasks = self.tasks.lock().unwrap();
let task_id = TaskId(tasks.vacant_entry().key());
let context_name = Arc::clone(&self.context_name);
let task_fut = async move {
gst::trace!(
RUNTIME_CAT,
"Executing sync function on context {} as {:?}",
context_name,
task_id,
);
gst::trace!(RUNTIME_CAT, "Executing sync function as {task_id:?}");
let _guard = CallOnDrop::new(move || {
let _ = tasks_clone.lock().unwrap().try_remove(task_id.0);
gst::trace!(
RUNTIME_CAT,
"Done executing sync function on context {} as {:?}",
context_name,
task_id,
);
gst::trace!(RUNTIME_CAT, "Done executing sync function as {task_id:?}");
});
f()
@@ -243,12 +218,7 @@ impl TaskQueue {
let mut state = self.tasks.lock().unwrap();
match state.get_mut(task_id.0) {
Some(task) => {
gst::trace!(
RUNTIME_CAT,
"Adding subtask to {:?} on context {}",
task_id,
self.context_name
);
gst::trace!(RUNTIME_CAT, "Adding subtask to {task_id:?}");
task.add_sub_task(sub_task);
Ok(())
}
@@ -268,10 +238,8 @@ impl TaskQueue {
gst::trace!(
RUNTIME_CAT,
"Scheduling draining {} sub tasks from {:?} on '{}'",
"Draining {} sub tasks from {task_id:?}",
sub_tasks.len(),
task_id,
self.context_name,
);
for sub_task in sub_tasks.drain(..) {

View file

@@ -508,7 +508,7 @@ impl FusedStream for IntervalAfter {
mod tests {
use std::time::{Duration, Instant};
use crate::runtime::executor::Scheduler;
use crate::runtime::executor::scheduler;
const MAX_THROTTLING: Duration = Duration::from_millis(10);
const DELAY: Duration = Duration::from_millis(12);
@@ -518,7 +518,7 @@ mod tests {
fn delay_for_regular() {
gst::init().unwrap();
let handle = Scheduler::start("delay_for_regular", MAX_THROTTLING);
let handle = scheduler::Throttling::start("delay_for_regular", MAX_THROTTLING);
futures::executor::block_on(handle.spawn(async {
let start = Instant::now();
@@ -533,7 +533,7 @@ mod tests {
fn delay_for_at_least() {
gst::init().unwrap();
let handle = Scheduler::start("delay_for_at_least", MAX_THROTTLING);
let handle = scheduler::Throttling::start("delay_for_at_least", MAX_THROTTLING);
futures::executor::block_on(handle.spawn(async {
let start = Instant::now();
@@ -550,7 +550,7 @@ mod tests {
gst::init().unwrap();
let handle = Scheduler::start("interval_regular", MAX_THROTTLING);
let handle = scheduler::Throttling::start("interval_regular", MAX_THROTTLING);
let join_handle = handle.spawn(async move {
let mut acc = Duration::ZERO;
@@ -578,7 +578,7 @@ mod tests {
gst::init().unwrap();
let handle = Scheduler::start("interval_after", MAX_THROTTLING);
let handle = scheduler::Throttling::start("interval_after", MAX_THROTTLING);
let join_handle = handle.spawn(async move {
let mut acc = DELAY;