From 3aaa52820ce2b0dfc4cc0959fdae9f01e7bf628c Mon Sep 17 00:00:00 2001 From: Nutomic Date: Fri, 6 Jun 2025 11:09:32 +0000 Subject: [PATCH] fix send test (#5756) --- crates/federate/src/worker.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index f8ca4687d..a9466827f 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -454,6 +454,7 @@ mod test { protocol::context::WithContext, }; use actix_web::{dev::ServerHandle, web, App, HttpResponse, HttpServer}; + use futures::future::try_join_all; use lemmy_api_common::utils::generate_inbox_url; use lemmy_db_schema::{ newtypes::DbUrl, @@ -622,10 +623,9 @@ mod test { assert_eq!(data.instance.id, rcv.state.instance_id); // assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id); // let last_id_before = rcv.state.last_successful_id.unwrap(); - let mut sent = Vec::new(); - for _ in 0..40 { - sent.push(send_activity(data.person.ap_id.clone(), &data.context, false).await?); - } + let sent = + try_join_all((0..40).map(|_| send_activity(data.person.ap_id.clone(), &data.context, false))) + .await?; sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; tracing::debug!("sent activity"); compare_sent_with_receive(data, sent).await?; @@ -651,10 +651,10 @@ mod test { let counts = vec![15, 20, 35]; for count in counts { tracing::debug!("sending {} activities", count); - let mut sent = Vec::new(); - for _ in 0..count { - sent.push(send_activity(data.person.ap_id.clone(), &data.context, false).await?); - } + let sent = try_join_all( + (0..count).map(|_| send_activity(data.person.ap_id.clone(), &data.context, false)), + ) + .await?; sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await; tracing::debug!("sent activity"); compare_sent_with_receive(data, sent).await?; @@ -697,10 +697,8 @@ mod test { *data.respond_with_error.write().unwrap() = true; // send a few activities - send_activity(data.person.ap_id.clone(), &data.context, false).await?; - send_activity(data.person.ap_id.clone(), &data.context, false).await?; - send_activity(data.person.ap_id.clone(), &data.context, false).await?; - send_activity(data.person.ap_id.clone(), &data.context, false).await?; + try_join_all((0..5).map(|_| send_activity(data.person.ap_id.clone(), &data.context, false))) + .await?; // it immediately performs first retry giving us 2 failures let rcv = data.stats_receiver.recv().await.unwrap(); @@ -711,15 +709,16 @@ mod test { let rcv = data.stats_receiver.recv().await.unwrap(); assert_eq!(3, rcv.state.fail_count); - // now make sends successfu + // now make sends successful *data.respond_with_error.write().unwrap() = false; // give it time for retry and to deliver activities - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(2)).await; - // fail count goes back to 0 - let rcv = data.stats_receiver.recv().await.unwrap(); - assert_eq!(0, rcv.state.fail_count); + // fail count goes back to 0 (need to skip all intermediary stats and get the final one) + let mut buffer = vec![]; + data.stats_receiver.recv_many(&mut buffer, 10).await; + assert_eq!(0, buffer.last().unwrap().state.fail_count); Ok(()) } @@ -797,6 +796,7 @@ mod test { Ok(sent) } async fn compare_sent_with_receive(data: &mut Data, mut sent: Vec) -> Result<()> { + assert!(!sent.is_empty()); let check_order = !data.is_concurrent; // allow out-of order receiving when running parallel let mut received = Vec::new(); for _ in 0..sent.len() {