diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index feb8a88f3..e75ff265a 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -207,6 +207,7 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result>, } impl Data { @@ -507,7 +509,8 @@ mod test { let (inbox_sender, inbox_receiver) = unbounded_channel(); // listen for received activities in background - let wait_stop_server = listen_activities(inbox_sender)?; + let respond_with_error = Arc::new(RwLock::new(false)); + let wait_stop_server = listen_activities(inbox_sender, respond_with_error.clone())?; let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS") .ok() @@ -537,6 +540,7 @@ mod test { wait_stop_server, cleaned_up: false, is_concurrent: concurrent_sends_per_instance > 1, + respond_with_error, }) } @@ -674,22 +678,73 @@ mod test { assert!(instance.updated.is_some()); - data.cleanup().await?; + Ok(()) + } + + #[test_context(Data)] + #[tokio::test] + #[serial] + async fn test_errors(data: &mut Data) -> LemmyResult<()> { + let form = InstanceForm::new(data.instance.domain.clone()); + Instance::update(&mut data.context.pool(), data.instance.id, form).await?; + + // check initial state + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(0, rcv.state.fail_count); + assert_eq!(data.instance.id, rcv.state.instance_id); + + // set receiver to return error for all inbox requests + *data.respond_with_error.write().unwrap() = true; + + // send a few activities + send_activity(data.person.ap_id.clone(), &data.context, false).await?; + send_activity(data.person.ap_id.clone(), &data.context, false).await?; + send_activity(data.person.ap_id.clone(), &data.context, false).await?; + send_activity(data.person.ap_id.clone(), &data.context, false).await?; + + // it immediately performs first retry giving us 2 failures + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(2, rcv.state.fail_count); + + // another automatic retry after short sleep + sleep(Duration::try_from_secs_f64(1.25)?).await; + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(3, rcv.state.fail_count); + + // now make sends successfu + *data.respond_with_error.write().unwrap() = false; + + // give it time for retry and to deliver activities + sleep(Duration::from_secs(5)).await; + + // fail count goes back to 0 + let rcv = data.stats_receiver.recv().await.unwrap(); + assert_eq!(0, rcv.state.fail_count); Ok(()) } - fn listen_activities(inbox_sender: UnboundedSender) -> LemmyResult { + fn listen_activities( + inbox_sender: UnboundedSender, + respond_with_error: Arc>, + ) -> LemmyResult { let run = HttpServer::new(move || { App::new() .app_data(actix_web::web::Data::new(inbox_sender.clone())) + .app_data(actix_web::web::Data::new(respond_with_error.clone())) .route( "/inbox", web::post().to( - |inbox_sender: actix_web::web::Data>, body: String| async move { + move |inbox_sender: actix_web::web::Data>, + respond_with_error: actix_web::web::Data>>, + body: String| async move { tracing::debug!("received activity: {:?}", body); inbox_sender.send(body.clone()).unwrap(); - HttpResponse::new(actix_web::http::StatusCode::OK) + if *respond_with_error.read().unwrap() { + HttpResponse::new(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR) + } else { + HttpResponse::new(actix_web::http::StatusCode::OK) + } }, ), ) @@ -713,18 +768,18 @@ mod test { wait: bool, ) -> LemmyResult { // create outgoing activity + let id = format!( + "http://ds9.lemmy.ml/activities/like/{}", + uuid::Uuid::new_v4() + ); let data = json!({ "actor": "http://ds9.lemmy.ml/u/lemmy_alpha", "object": "http://ds9.lemmy.ml/comment/1", "type": "Like", - "id": format!("http://ds9.lemmy.ml/activities/like/{}", uuid::Uuid::new_v4()), + "id": id, }); let form = SentActivityForm { - ap_id: Url::parse(&format!( - "http://local.com/activity/{}", - uuid::Uuid::new_v4() - ))? - .into(), + ap_id: Url::parse(&id)?.into(), data, sensitive: false, send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],