diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index a9466827f..7874c7cba 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -701,28 +701,34 @@ mod test { .await?; // it immediately performs first retry giving us 2 failures - let rcv = data.stats_receiver.recv().await.unwrap(); - assert_eq!(2, rcv.state.fail_count); + wait_receive(2, &mut data.stats_receiver).await; - // another automatic retry after short sleep - sleep(Duration::try_from_secs_f64(1.25)?).await; - let rcv = data.stats_receiver.recv().await.unwrap(); - assert_eq!(3, rcv.state.fail_count); + // another automatic retry after short wait + wait_receive(3, &mut data.stats_receiver).await; // now make sends successful *data.respond_with_error.write().unwrap() = false; - // give it time for retry and to deliver activities - sleep(Duration::from_secs(2)).await; - - // fail count goes back to 0 (need to skip all intermediary stats and get the final one) - let mut buffer = vec![]; - data.stats_receiver.recv_many(&mut buffer, 10).await; - assert_eq!(0, buffer.last().unwrap().state.fail_count); + // fail count goes back to 0 + wait_receive(0, &mut data.stats_receiver).await; Ok(()) } + async fn wait_receive( + expected_fail_count: i32, + rec: &mut UnboundedReceiver, + ) { + // loop until we get the latest event + for _ in 0..5 { + let rcv = rec.recv().await.unwrap(); + if expected_fail_count == rcv.state.fail_count { + return; + } + } + panic!(); + } + fn listen_activities( inbox_sender: UnboundedSender, respond_with_error: Arc>,