From 4849ac0138c9c28f78b0b1b611c7a95c839e1b4f Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Jul 2024 16:42:57 +0200 Subject: [PATCH] handle todo about smooth exit --- crates/federate/src/send.rs | 5 +++-- crates/federate/src/worker.rs | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs index 8ab8bae73..634aaf8dc 100644 --- a/crates/federate/src/send.rs +++ b/crates/federate/src/send.rs @@ -129,8 +129,9 @@ impl<'a> SendRetryTask<'a> { tokio::select! { () = sleep(retry_delay) => {}, () = stop.cancelled() => { - // save state to db and exit - // TODO: do we need to report state here to prevent hang on exit? + // cancel sending without reporting any result. + // the InstanceWorker needs to be careful to not hang on receive of the result + // channel when cancelled (see handle_send_results) return Ok(()); } } diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index b2f887670..247071c36 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -241,8 +241,20 @@ impl InstanceWorker { async fn handle_send_results(&mut self) -> Result<(), anyhow::Error> { let mut force_write = false; let mut events = Vec::new(); - // wait for at least one event but if there's multiple handle them all - self.receive_send_result.recv_many(&mut events, 1000).await; + // Wait for at least one event, but if there are multiple, handle them all. + // We need to listen to the cancel event here as well in order to prevent a hang on shutdown: + // If the SendRetryTask gets cancelled, it immediately exits without reporting any state. + // So if the worker is waiting for a send result and all SendRetryTasks get cancelled, this recv + // could hang indefinitely otherwise.
The tasks will also drop their handle of + // report_send_result which would cause the recv_many method to return 0 elements, but since + // InstanceWorker holds a copy of the send result channel as well, that won't happen. + tokio::select! { + _ = self.receive_send_result.recv_many(&mut events, 1000) => {}, + () = self.stop.cancelled() => { + tracing::debug!("cancelled worker loop while waiting for send results"); + return Ok(()); + } + } for event in events { match event { SendActivityResult::Success(s) => {