Test case for activity send errors (#5747)

This commit is contained in:
Nutomic 2025-06-04 13:59:45 +00:00 committed by GitHub
parent fa9be76148
commit ced74b40cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 67 additions and 11 deletions

View file

@ -207,6 +207,7 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
/// the domain name is needed for logging, pass it to the stats printer so it doesn't need to look
/// up the domain itself
#[derive(Debug)]
pub(crate) struct FederationQueueStateWithDomain {
pub domain: String,
pub state: FederationQueueState,

View file

@ -467,6 +467,7 @@ mod test {
use lemmy_utils::error::LemmyResult;
use serde_json::{json, Value};
use serial_test::serial;
use std::sync::{Arc, RwLock};
use test_context::{test_context, AsyncTestContext};
use tokio::{
spawn,
@ -485,6 +486,7 @@ mod test {
cleaned_up: bool,
wait_stop_server: ServerHandle,
is_concurrent: bool,
respond_with_error: Arc<RwLock<bool>>,
}
impl Data {
@ -507,7 +509,8 @@ mod test {
let (inbox_sender, inbox_receiver) = unbounded_channel();
// listen for received activities in background
let wait_stop_server = listen_activities(inbox_sender)?;
let respond_with_error = Arc::new(RwLock::new(false));
let wait_stop_server = listen_activities(inbox_sender, respond_with_error.clone())?;
let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS")
.ok()
@ -537,6 +540,7 @@ mod test {
wait_stop_server,
cleaned_up: false,
is_concurrent: concurrent_sends_per_instance > 1,
respond_with_error,
})
}
@ -674,22 +678,73 @@ mod test {
assert!(instance.updated.is_some());
data.cleanup().await?;
Ok(())
}
#[test_context(Data)]
#[tokio::test]
#[serial]
async fn test_errors(data: &mut Data) -> LemmyResult<()> {
let form = InstanceForm::new(data.instance.domain.clone());
Instance::update(&mut data.context.pool(), data.instance.id, form).await?;
// check initial state
let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(0, rcv.state.fail_count);
assert_eq!(data.instance.id, rcv.state.instance_id);
// set receiver to return error for all inbox requests
*data.respond_with_error.write().unwrap() = true;
// send a few activities
send_activity(data.person.ap_id.clone(), &data.context, false).await?;
send_activity(data.person.ap_id.clone(), &data.context, false).await?;
send_activity(data.person.ap_id.clone(), &data.context, false).await?;
send_activity(data.person.ap_id.clone(), &data.context, false).await?;
// it immediately performs first retry giving us 2 failures
let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(2, rcv.state.fail_count);
// another automatic retry after short sleep
sleep(Duration::try_from_secs_f64(1.25)?).await;
let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(3, rcv.state.fail_count);
// now make sends successfu
*data.respond_with_error.write().unwrap() = false;
// give it time for retry and to deliver activities
sleep(Duration::from_secs(5)).await;
// fail count goes back to 0
let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(0, rcv.state.fail_count);
Ok(())
}
fn listen_activities(inbox_sender: UnboundedSender<String>) -> LemmyResult<ServerHandle> {
fn listen_activities(
inbox_sender: UnboundedSender<String>,
respond_with_error: Arc<RwLock<bool>>,
) -> LemmyResult<ServerHandle> {
let run = HttpServer::new(move || {
App::new()
.app_data(actix_web::web::Data::new(inbox_sender.clone()))
.app_data(actix_web::web::Data::new(respond_with_error.clone()))
.route(
"/inbox",
web::post().to(
|inbox_sender: actix_web::web::Data<UnboundedSender<String>>, body: String| async move {
move |inbox_sender: actix_web::web::Data<UnboundedSender<String>>,
respond_with_error: actix_web::web::Data<Arc<RwLock<bool>>>,
body: String| async move {
tracing::debug!("received activity: {:?}", body);
inbox_sender.send(body.clone()).unwrap();
HttpResponse::new(actix_web::http::StatusCode::OK)
if *respond_with_error.read().unwrap() {
HttpResponse::new(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR)
} else {
HttpResponse::new(actix_web::http::StatusCode::OK)
}
},
),
)
@ -713,18 +768,18 @@ mod test {
wait: bool,
) -> LemmyResult<SentActivity> {
// create outgoing activity
let id = format!(
"http://ds9.lemmy.ml/activities/like/{}",
uuid::Uuid::new_v4()
);
let data = json!({
"actor": "http://ds9.lemmy.ml/u/lemmy_alpha",
"object": "http://ds9.lemmy.ml/comment/1",
"type": "Like",
"id": format!("http://ds9.lemmy.ml/activities/like/{}", uuid::Uuid::new_v4()),
"id": id,
});
let form = SentActivityForm {
ap_id: Url::parse(&format!(
"http://local.com/activity/{}",
uuid::Uuid::new_v4()
))?
.into(),
ap_id: Url::parse(&id)?.into(),
data,
sensitive: false,
send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],