Explicitly drop arbiters, use notify_waiters not notify_one

This commit is contained in:
asonix 2022-11-17 23:48:54 -06:00
parent 2b6e1b3be3
commit 8267df8311

View file

@ -156,7 +156,7 @@ impl Timer for ActixTimer {
/// Manager attempts to restart workers as their arbiters die
pub struct Manager {
// the manager arbiter
_arbiter: ArbiterDropper,
arbiter_dropper: Option<ArbiterDropper>,
// handle for queueing
queue_handle: QueueHandle,
@ -175,45 +175,43 @@ impl Manager {
where
State: Clone,
{
let arbiter = Arbiter::new();
let manager_arbiter = Arbiter::new();
let worker_arbiter = Arbiter::new();
let notifier = Arc::new(tokio::sync::Notify::new());
let queue_handle = worker_config.queue_handle.clone();
let drop_notifier = DropNotifier::new(Arc::clone(&notifier));
arbiter.spawn(async move {
manager_arbiter.spawn(async move {
let mut drop_notifier = drop_notifier;
let mut arbiter = ArbiterDropper {
arbiter: Some(worker_arbiter),
};
let mut arbiter_dropper = ArbiterDropper::from(worker_arbiter);
loop {
worker_config.start_managed(&arbiter.handle(), &drop_notifier);
let notified = notifier.notified();
worker_config.start_managed(&arbiter_dropper.handle(), &drop_notifier);
notifier.notified().await;
notified.await;
tracing::warn!("Recovering from dead worker arbiter");
// drop_notifier needs to live at least as long as notifier.notified().await
// in order to ensure we get notified by ticker or a worker, and not ourselves
drop(drop_notifier);
// Assume arbiter is dead if we were notified
let online = arbiter.spawn(async {});
let online = arbiter_dropper.spawn(async {});
if online {
panic!("Arbiter should be dead by now");
}
arbiter = ArbiterDropper {
arbiter: Some(Arbiter::new()),
};
drop(arbiter_dropper);
arbiter_dropper = ArbiterDropper::default();
drop_notifier = DropNotifier::new(Arc::clone(&notifier));
}
});
Manager {
_arbiter: ArbiterDropper {
arbiter: Some(arbiter),
},
arbiter_dropper: Some(ArbiterDropper::from(manager_arbiter)),
queue_handle,
}
}
@ -232,6 +230,13 @@ impl Deref for Manager {
}
}
impl Drop for Manager {
fn drop(&mut self) {
tracing::warn!("Dropping manager, tearing down workers");
self.arbiter_dropper.take();
}
}
impl Deref for ArbiterDropper {
type Target = Arbiter;
@ -242,8 +247,26 @@ impl Deref for ArbiterDropper {
impl Drop for ArbiterDropper {
fn drop(&mut self) {
self.stop();
let _ = self.arbiter.take().unwrap().join();
tracing::warn!("Stopping and joining arbiter");
let arbiter = self.arbiter.take().unwrap();
arbiter.stop();
let _ = arbiter.join();
}
}
impl From<Arbiter> for ArbiterDropper {
fn from(arbiter: Arbiter) -> Self {
Self {
arbiter: Some(arbiter),
}
}
}
impl Default for ArbiterDropper {
fn default() -> Self {
ArbiterDropper {
arbiter: Some(Arbiter::new()),
}
}
}
@ -263,7 +286,7 @@ impl DropNotifier {
impl Drop for DropNotifier {
fn drop(&mut self) {
if let Some(notifier) = self.inner.lock().unwrap().take() {
notifier.notify_one();
notifier.notify_waiters();
}
}
}