diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index d533233..a968041 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -16,13 +16,13 @@ use crate::models::{ use super::receiver::handle_activity; #[derive(Deserialize, Serialize)] -pub struct IncomingActivity { +pub struct IncomingActivityJobData { activity: Value, is_authenticated: bool, failure_count: i32, } -impl IncomingActivity { +impl IncomingActivityJobData { pub fn new(activity: &Value, is_authenticated: bool) -> Self { Self { activity: activity.clone(), @@ -31,7 +31,7 @@ impl IncomingActivity { } } - pub async fn enqueue( + pub async fn into_job( self, db_client: &impl GenericClient, delay: i64, @@ -62,31 +62,31 @@ pub async fn process_queued_activities( batch_size, ).await?; for job in batch { - let mut incoming_activity: IncomingActivity = + let mut job_data: IncomingActivityJobData = serde_json::from_value(job.job_data) .map_err(|_| DatabaseTypeError)?; let is_error = match handle_activity( config, db_client, - &incoming_activity.activity, - incoming_activity.is_authenticated, + &job_data.activity, + job_data.is_authenticated, ).await { Ok(_) => false, Err(error) => { - incoming_activity.failure_count += 1; + job_data.failure_count += 1; log::warn!( "failed to process activity ({}) (attempt #{}): {}", error, - incoming_activity.failure_count, - incoming_activity.activity, + job_data.failure_count, + job_data.activity, ); true }, }; - if is_error && incoming_activity.failure_count <= max_retries { + if is_error && job_data.failure_count <= max_retries { // Re-queue log::info!("activity re-queued"); - incoming_activity.enqueue(db_client, retry_after).await?; + job_data.into_job(db_client, retry_after).await?; }; delete_job_from_queue(db_client, &job.id).await?; }; diff --git a/src/activitypub/receiver.rs b/src/activitypub/receiver.rs index e694b46..6e7407c 100644 --- a/src/activitypub/receiver.rs +++ b/src/activitypub/receiver.rs @@ -35,7 +35,7 @@ use super::handlers::{ undo::handle_undo, update::handle_update, }; -use super::queues::IncomingActivity; +use super::queues::IncomingActivityJobData; use super::vocabulary::*; #[derive(thiserror::Error, Debug)] @@ -321,8 +321,8 @@ pub async fn receive_activity( if let ANNOUNCE | CREATE | UPDATE = activity_type { // Add activity to job queue and release lock - IncomingActivity::new(activity, is_authenticated) - .enqueue(db_client, 0).await?; + IncomingActivityJobData::new(activity, is_authenticated) + .into_job(db_client, 0).await?; log::debug!("activity added to the queue: {}", activity_type); return Ok(()); };