handle todo about smooth exit

This commit is contained in:
phiresky 2024-07-15 16:42:57 +02:00
parent 5ab598a6f0
commit 4849ac0138
2 changed files with 17 additions and 4 deletions

View file

@ -129,8 +129,9 @@ impl<'a> SendRetryTask<'a> {
tokio::select! { tokio::select! {
() = sleep(retry_delay) => {}, () = sleep(retry_delay) => {},
() = stop.cancelled() => { () = stop.cancelled() => {
// save state to db and exit // cancel sending without reporting any result.
// TODO: do we need to report state here to prevent hang on exit? // the InstanceWorker needs to be careful to not hang on receive of that
// channel when cancelled (see handle_send_results)
return Ok(()); return Ok(());
} }
} }

View file

@ -241,8 +241,20 @@ impl InstanceWorker {
async fn handle_send_results(&mut self) -> Result<(), anyhow::Error> { async fn handle_send_results(&mut self) -> Result<(), anyhow::Error> {
let mut force_write = false; let mut force_write = false;
let mut events = Vec::new(); let mut events = Vec::new();
// wait for at least one event but if there's multiple handle them all // Wait for at least one event but if there's multiple handle them all.
self.receive_send_result.recv_many(&mut events, 1000).await; // We need to listen to the cancel event here as well in order to prevent a hang on shutdown:
// If the SendRetryTask gets cancelled, it immediately exits without reporting any state.
// So if the worker is waiting for a send result and all SendRetryTask gets cancelled, this recv
// could hang indefinitely otherwise. The tasks will also drop their handle of
// report_send_result which would cause the recv_many method to return 0 elements, but since
// InstanceWorker holds a copy of the send result channel as well, that won't happen.
tokio::select! {
_ = self.receive_send_result.recv_many(&mut events, 1000) => {},
() = self.stop.cancelled() => {
tracing::debug!("cancelled worker loop while waiting for send results");
return Ok(());
}
}
for event in events { for event in events {
match event { match event {
SendActivityResult::Success(s) => { SendActivityResult::Success(s) => {