diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index 539d1f1..0585093 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -29,7 +29,7 @@ use super::receiver::{handle_activity, HandlerError}; pub struct IncomingActivityJobData { activity: Value, is_authenticated: bool, - failure_count: i32, + failure_count: u32, } impl IncomingActivityJobData { @@ -44,11 +44,11 @@ impl IncomingActivityJobData { pub async fn into_job( self, db_client: &impl DatabaseClient, - delay: i64, + delay: u32, ) -> Result<(), DatabaseError> { let job_data = serde_json::to_value(self) .expect("activity should be serializable"); - let scheduled_for = Utc::now() + Duration::seconds(delay); + let scheduled_for = Utc::now() + Duration::seconds(delay.into()); enqueue_job( db_client, &JobType::IncomingActivity, @@ -58,18 +58,22 @@ impl IncomingActivityJobData { } } +const INCOMING_QUEUE_BATCH_SIZE: u32 = 10; +const INCOMING_QUEUE_RETRIES_MAX: u32 = 2; + +const fn incoming_queue_backoff(_failure_count: u32) -> u32 { + // Constant, 10 minutes + 60 * 10 +} + pub async fn process_queued_incoming_activities( config: &Config, db_client: &mut impl DatabaseClient, ) -> Result<(), DatabaseError> { - let batch_size = 10; - let max_retries = 2; - let retry_after = 60 * 10; // 10 minutes - let batch = get_job_batch( db_client, &JobType::IncomingActivity, - batch_size, + INCOMING_QUEUE_BATCH_SIZE, ).await?; for job in batch { let mut job_data: IncomingActivityJobData = @@ -88,12 +92,13 @@ pub async fn process_queued_incoming_activities( job_data.failure_count, job_data.activity, ); - if job_data.failure_count <= max_retries && + if job_data.failure_count <= INCOMING_QUEUE_RETRIES_MAX && // Don't retry after fetcher recursion error !matches!(error, HandlerError::FetchError(FetchError::RecursionError)) { // Re-queue log::info!("activity re-queued"); + let retry_after = incoming_queue_backoff(job_data.failure_count); job_data.into_job(db_client, retry_after).await?; }; }; @@ -126,16 +131,17 @@ impl OutgoingActivityJobData { } } +const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1; + pub async fn process_queued_outgoing_activities( config: &Config, db_pool: &DbPool, ) -> Result<(), DatabaseError> { let db_client = &**get_database_client(db_pool).await?; - let batch_size = 1; let batch = get_job_batch( db_client, &JobType::OutgoingActivity, - batch_size, + OUTGOING_QUEUE_BATCH_SIZE, ).await?; for job in batch { let job_data: OutgoingActivityJobData = diff --git a/src/models/background_jobs/queries.rs b/src/models/background_jobs/queries.rs index 63ebe7f..e26ad02 100644 --- a/src/models/background_jobs/queries.rs +++ b/src/models/background_jobs/queries.rs @@ -30,7 +30,7 @@ pub async fn enqueue_job( pub async fn get_job_batch( db_client: &impl DatabaseClient, job_type: &JobType, - batch_size: i64, + batch_size: u32, ) -> Result, DatabaseError> { let rows = db_client.query( " @@ -54,7 +54,7 @@ pub async fn get_job_batch( &JobStatus::Running, &job_type, &JobStatus::Queued, - &batch_size, + &i64::from(batch_size), ], ).await?; let jobs = rows.iter()