From 8267df83117db1699e4f4b40cf1758deb79c3294 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 17 Nov 2022 23:48:54 -0600 Subject: [PATCH] Explicitly drop arbiters, use notify_waiters not notify_one --- jobs-actix/src/lib.rs | 59 ++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index b387607..8beb2dc 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -156,7 +156,7 @@ impl Timer for ActixTimer { /// Manager attempts to restart workers as their arbiters die pub struct Manager { // the manager arbiter - _arbiter: ArbiterDropper, + arbiter_dropper: Option, // handle for queueing queue_handle: QueueHandle, @@ -175,45 +175,43 @@ impl Manager { where State: Clone, { - let arbiter = Arbiter::new(); + let manager_arbiter = Arbiter::new(); let worker_arbiter = Arbiter::new(); let notifier = Arc::new(tokio::sync::Notify::new()); let queue_handle = worker_config.queue_handle.clone(); let drop_notifier = DropNotifier::new(Arc::clone(¬ifier)); - arbiter.spawn(async move { + manager_arbiter.spawn(async move { let mut drop_notifier = drop_notifier; - let mut arbiter = ArbiterDropper { - arbiter: Some(worker_arbiter), - }; + let mut arbiter_dropper = ArbiterDropper::from(worker_arbiter); loop { - worker_config.start_managed(&arbiter.handle(), &drop_notifier); + let notified = notifier.notified(); + worker_config.start_managed(&arbiter_dropper.handle(), &drop_notifier); - notifier.notified().await; + notified.await; + tracing::warn!("Recovering from dead worker arbiter"); // drop_notifier needs to live at least as long as notifier.notified().await // in order to ensure we get notified by ticker or a worker, and not ourselves drop(drop_notifier); // Assume arbiter is dead if we were notified - let online = arbiter.spawn(async {}); + let online = arbiter_dropper.spawn(async {}); if online { panic!("Arbiter should be dead by now"); } - arbiter = ArbiterDropper { - arbiter: Some(Arbiter::new()), - }; + drop(arbiter_dropper); + + arbiter_dropper = ArbiterDropper::default(); drop_notifier = DropNotifier::new(Arc::clone(¬ifier)); } }); Manager { - _arbiter: ArbiterDropper { - arbiter: Some(arbiter), - }, + arbiter_dropper: Some(ArbiterDropper::from(manager_arbiter)), queue_handle, } } @@ -232,6 +230,13 @@ impl Deref for Manager { } } +impl Drop for Manager { + fn drop(&mut self) { + tracing::warn!("Dropping manager, tearing down workers"); + self.arbiter_dropper.take(); + } +} + impl Deref for ArbiterDropper { type Target = Arbiter; @@ -242,8 +247,26 @@ impl Deref for ArbiterDropper { impl Drop for ArbiterDropper { fn drop(&mut self) { - self.stop(); - let _ = self.arbiter.take().unwrap().join(); + tracing::warn!("Stopping and joining arbiter"); + let arbiter = self.arbiter.take().unwrap(); + arbiter.stop(); + let _ = arbiter.join(); + } +} + +impl From for ArbiterDropper { + fn from(arbiter: Arbiter) -> Self { + Self { + arbiter: Some(arbiter), + } + } +} + +impl Default for ArbiterDropper { + fn default() -> Self { + ArbiterDropper { + arbiter: Some(Arbiter::new()), + } } } @@ -263,7 +286,7 @@ impl DropNotifier { impl Drop for DropNotifier { fn drop(&mut self) { if let Some(notifier) = self.inner.lock().unwrap().take() { - notifier.notify_one(); + notifier.notify_waiters(); } } }