ts/scheduler: rename awake / wake_up as unpark

This commit is contained in:
François Laignel 2022-07-30 13:51:28 +02:00 committed by Sebastian Dröge
parent 833331ab66
commit 28a62e622e
4 changed files with 36 additions and 39 deletions

View file

@ -377,7 +377,7 @@ impl<T: Send + 'static> Drop for Async<T> {
if let Some(io) = self.io.take() { if let Some(io) = self.io.take() {
if let Some(sched) = self.sched.upgrade() { if let Some(sched) = self.sched.upgrade() {
let source = Arc::clone(&self.source); let source = Arc::clone(&self.source);
sched.spawn_and_awake(async move { sched.spawn_and_unpark(async move {
Reactor::with_mut(|reactor| { Reactor::with_mut(|reactor| {
if let Err(err) = reactor.remove_io(&source) { if let Err(err) = reactor.remove_io(&source) {
gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err); gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err);

View file

@ -246,23 +246,23 @@ impl Context {
self.0.spawn(future) self.0.spawn(future)
} }
pub fn spawn_and_awake<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output> pub fn spawn_and_unpark<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where where
Fut: Future + Send + 'static, Fut: Future + Send + 'static,
Fut::Output: Send + 'static, Fut::Output: Send + 'static,
{ {
self.0.spawn_and_awake(future) self.0.spawn_and_unpark(future)
} }
/// Forces the scheduler to wake up. /// Forces the scheduler to unpark.
/// ///
/// This is not needed by elements implementors as they are /// This is not needed by elements implementors as they are
/// supposed to call [`Self::spawn_and_awake`] when needed. /// supposed to call [`Self::spawn_and_unpark`] when needed.
/// However, it's useful for lower level implementations such as /// However, it's useful for lower level implementations such as
/// `runtime::Task` so as to make sure the iteration loop yields /// `runtime::Task` so as to make sure the iteration loop yields
/// as soon as possible when a transition is requested. /// as soon as possible when a transition is requested.
pub(in crate::runtime) fn wake_up(&self) { pub(in crate::runtime) fn unpark(&self) {
self.0.wake_up(); self.0.unpark();
} }
pub fn current_has_sub_tasks() -> bool { pub fn current_has_sub_tasks() -> bool {

View file

@ -32,8 +32,8 @@ pub(super) struct Scheduler {
context_name: Arc<str>, context_name: Arc<str>,
max_throttling: Duration, max_throttling: Duration,
tasks: TaskQueue, tasks: TaskQueue,
must_awake: Mutex<bool>, must_unpark: Mutex<bool>,
must_awake_cvar: Condvar, must_unpark_cvar: Condvar,
} }
impl Scheduler { impl Scheduler {
@ -102,8 +102,8 @@ impl Scheduler {
context_name: context_name.clone(), context_name: context_name.clone(),
max_throttling, max_throttling,
tasks: TaskQueue::new(context_name), tasks: TaskQueue::new(context_name),
must_awake: Mutex::new(false), must_unpark: Mutex::new(false),
must_awake_cvar: Condvar::new(), must_unpark_cvar: Condvar::new(),
})); }));
*cur_scheduler = Some(handle.downgrade()); *cur_scheduler = Some(handle.downgrade());
@ -198,32 +198,32 @@ impl Scheduler {
})?; })?;
} }
let mut must_awake = self.must_awake.lock().unwrap(); let mut must_unpark = self.must_unpark.lock().unwrap();
loop { loop {
if *must_awake { if *must_unpark {
*must_awake = false; *must_unpark = false;
break; break;
} }
if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) { if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) {
let result = self let result = self
.must_awake_cvar .must_unpark_cvar
.wait_timeout(must_awake, wait_duration) .wait_timeout(must_unpark, wait_duration)
.unwrap(); .unwrap();
must_awake = result.0; must_unpark = result.0;
} else { } else {
*must_awake = false; *must_unpark = false;
break; break;
} }
} }
} }
} }
fn wake_up(&self) { fn unpark(&self) {
let mut must_awake = self.must_awake.lock().unwrap(); let mut must_unpark = self.must_unpark.lock().unwrap();
*must_awake = true; *must_unpark = true;
self.must_awake_cvar.notify_one(); self.must_unpark_cvar.notify_one();
} }
fn close(context_name: Arc<str>) { fn close(context_name: Arc<str>) {
@ -384,7 +384,7 @@ impl Handle {
// ensures that the lifetime bounds satisfy the safety // ensures that the lifetime bounds satisfy the safety
// requirements for `TaskQueue::add_sync`. // requirements for `TaskQueue::add_sync`.
let task = unsafe { self.0.scheduler.tasks.add_sync(f) }; let task = unsafe { self.0.scheduler.tasks.add_sync(f) };
self.0.scheduler.wake_up(); self.0.scheduler.unpark();
futures::executor::block_on(task) futures::executor::block_on(task)
} }
@ -397,18 +397,18 @@ impl Handle {
JoinHandle::new(task_id, task, self) JoinHandle::new(task_id, task, self)
} }
pub fn spawn_and_awake<F>(&self, future: F) -> JoinHandle<F::Output> pub fn spawn_and_unpark<F>(&self, future: F) -> JoinHandle<F::Output>
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
{ {
let (task_id, task) = self.0.scheduler.tasks.add(future); let (task_id, task) = self.0.scheduler.tasks.add(future);
self.0.scheduler.wake_up(); self.0.scheduler.unpark();
JoinHandle::new(task_id, task, self) JoinHandle::new(task_id, task, self)
} }
pub(super) fn wake_up(&self) { pub(super) fn unpark(&self) {
self.0.scheduler.wake_up(); self.0.scheduler.unpark();
} }
pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {

View file

@ -353,7 +353,7 @@ impl TaskInner {
} }
})?; })?;
self.context.as_ref().unwrap().wake_up(); self.context.as_ref().unwrap().unpark();
Ok(ack_rx) Ok(ack_rx)
} }
@ -594,28 +594,25 @@ impl Task {
} }
pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> { pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> {
self.abort_push_wakeup_await(Trigger::FlushStart) self.abort_push_await(Trigger::FlushStart)
} }
pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> { pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> {
self.abort_push_wakeup_await(Trigger::FlushStop) self.abort_push_await(Trigger::FlushStop)
} }
/// Stops the `Started` `Task` and wait for it to finish. /// Stops the `Started` `Task` and wait for it to finish.
pub fn stop(&self) -> Result<TransitionStatus, TransitionError> { pub fn stop(&self) -> Result<TransitionStatus, TransitionError> {
self.abort_push_wakeup_await(Trigger::Stop) self.abort_push_await(Trigger::Stop)
} }
/// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP. /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP.
/// ///
/// This function: /// This function:
/// - Makes sure the iteration loop aborts as soon as possible. /// - Aborts the iteration loop aborts.
/// - Pushes the provided [`Trigger`]. /// - Pushes the provided [`Trigger`].
/// - Awaits for the expected transition as usual. /// - Awaits for the expected transition as usual.
fn abort_push_wakeup_await( fn abort_push_await(&self, trigger: Trigger) -> Result<TransitionStatus, TransitionError> {
&self,
trigger: Trigger,
) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
inner.abort_task_loop(); inner.abort_task_loop();
@ -751,7 +748,7 @@ macro_rules! exec_action {
let join_handle = { let join_handle = {
let mut task_inner = $task_inner.lock().unwrap(); let mut task_inner = $task_inner.lock().unwrap();
let join_handle = $context.spawn_and_awake(action_fut); let join_handle = $context.spawn_and_unpark(action_fut);
task_inner.spawned_task_id = Some(join_handle.task_id()); task_inner.spawned_task_id = Some(join_handle.task_id());
join_handle join_handle
@ -1014,7 +1011,7 @@ impl StateMachine {
// Unprepare is not joined by an ack_rx but by joining the state machine // Unprepare is not joined by an ack_rx but by joining the state machine
// handle, so we don't need to keep track of the spwaned_task_id // handle, so we don't need to keep track of the spwaned_task_id
context context
.spawn_and_awake(async move { .spawn_and_unpark(async move {
self.task_impl.unprepare().await; self.task_impl.unprepare().await;
while Context::current_has_sub_tasks() { while Context::current_has_sub_tasks() {
@ -1119,7 +1116,7 @@ impl StateMachine {
let join_handle = { let join_handle = {
let mut task_inner = task_inner.lock().unwrap(); let mut task_inner = task_inner.lock().unwrap();
let join_handle = context.spawn_and_awake(loop_fut); let join_handle = context.spawn_and_unpark(loop_fut);
task_inner.spawned_task_id = Some(join_handle.task_id()); task_inner.spawned_task_id = Some(join_handle.task_id());
join_handle join_handle