diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d7e634..a89b9d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Added configuration option for automatic assigning of "read-only user" role after registration. - Added `set-role` command. +### Changed + +- Don't retry activity if fetcher recursion limit has been reached. + ### Deprecated - `registrations_open` configuration option. diff --git a/src/activitypub/fetcher/fetchers.rs b/src/activitypub/fetcher/fetchers.rs index 13f2227..000beab 100644 --- a/src/activitypub/fetcher/fetchers.rs +++ b/src/activitypub/fetcher/fetchers.rs @@ -38,6 +38,9 @@ pub enum FetchError { #[error(transparent)] FileError(#[from] std::io::Error), + #[error("too many objects")] + RecursionError, + #[error("{0}")] OtherError(&'static str), } diff --git a/src/activitypub/fetcher/helpers.rs b/src/activitypub/fetcher/helpers.rs index 051d0f5..e486304 100644 --- a/src/activitypub/fetcher/helpers.rs +++ b/src/activitypub/fetcher/helpers.rs @@ -24,6 +24,7 @@ use super::fetchers::{ fetch_actor, fetch_object, perform_webfinger_query, + FetchError, }; pub async fn get_or_import_profile_by_actor_id( @@ -160,6 +161,8 @@ pub async fn get_or_import_profile_by_actor_address( Ok(profile) } +const RECURSION_DEPTH_MAX: usize = 50; + pub async fn import_post( config: &Config, db_client: &mut impl DatabaseClient, @@ -220,9 +223,9 @@ pub async fn import_post( let object = match maybe_object { Some(object) => object, None => { - if fetch_count >= 50 { + if fetch_count >= RECURSION_DEPTH_MAX { // TODO: create tombstone - return Err(ValidationError("too many objects").into()); + return Err(FetchError::RecursionError.into()); }; let object = fetch_object(&instance, &object_id).await .map_err(|err| { diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index bd60886..1b3d8a8 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -21,7 +21,8 @@ use crate::models::{ users::queries::get_user_by_id, }; use super::deliverer::{OutgoingActivity, Recipient}; -use super::receiver::handle_activity; +use super::fetcher::fetchers::FetchError; +use super::receiver::{handle_activity, HandlerError}; #[derive(Deserialize, Serialize)] pub struct IncomingActivityJobData { @@ -73,28 +74,27 @@ pub async fn process_queued_incoming_activities( let mut job_data: IncomingActivityJobData = serde_json::from_value(job.job_data) .map_err(|_| DatabaseTypeError)?; - let is_error = match handle_activity( + if let Err(error) = handle_activity( config, db_client, &job_data.activity, job_data.is_authenticated, ).await { - Ok(_) => false, - Err(error) => { - job_data.failure_count += 1; - log::warn!( - "failed to process activity ({}) (attempt #{}): {}", - error, - job_data.failure_count, - job_data.activity, - ); - true - }, - }; - if is_error && job_data.failure_count <= max_retries { - // Re-queue - log::info!("activity re-queued"); - job_data.into_job(db_client, retry_after).await?; + job_data.failure_count += 1; + log::warn!( + "failed to process activity ({}) (attempt #{}): {}", + error, + job_data.failure_count, + job_data.activity, + ); + if job_data.failure_count <= max_retries && + // Don't retry after fetcher recursion error + !matches!(error, HandlerError::FetchError(FetchError::RecursionError)) + { + // Re-queue + log::info!("activity re-queued"); + job_data.into_job(db_client, retry_after).await?; + }; }; delete_job_from_queue(db_client, &job.id).await?; };