From 0ecf6829841309052a649c45e5cbc7d9a79152dc Mon Sep 17 00:00:00 2001 From: silverpill Date: Sat, 31 Dec 2022 00:06:33 +0000 Subject: [PATCH] Move activity queue helpers to activitypub::queues module --- src/activitypub/mod.rs | 3 +- src/activitypub/queues.rs | 94 +++++++++++++++++++++++++++++++++++++ src/activitypub/receiver.rs | 93 ++---------------------------------- src/job_queue/scheduler.rs | 2 +- 4 files changed, 100 insertions(+), 92 deletions(-) create mode 100644 src/activitypub/queues.rs diff --git a/src/activitypub/mod.rs b/src/activitypub/mod.rs index 6d68087..db6e13a 100644 --- a/src/activitypub/mod.rs +++ b/src/activitypub/mod.rs @@ -8,6 +8,7 @@ mod deliverer; pub mod fetcher; mod handlers; pub mod identifiers; -pub mod receiver; +pub mod queues; +mod receiver; pub mod views; mod vocabulary; diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs new file mode 100644 index 0000000..d533233 --- /dev/null +++ b/src/activitypub/queues.rs @@ -0,0 +1,94 @@ +use chrono::{Duration, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio_postgres::GenericClient; + +use crate::config::Config; +use crate::database::{DatabaseError, DatabaseTypeError}; +use crate::models::{ + background_jobs::queries::{ + enqueue_job, + get_job_batch, + delete_job_from_queue, + }, + background_jobs::types::JobType, +}; +use super::receiver::handle_activity; + +#[derive(Deserialize, Serialize)] +pub struct IncomingActivity { + activity: Value, + is_authenticated: bool, + failure_count: i32, +} + +impl IncomingActivity { + pub fn new(activity: &Value, is_authenticated: bool) -> Self { + Self { + activity: activity.clone(), + is_authenticated, + failure_count: 0, + } + } + + pub async fn enqueue( + self, + db_client: &impl GenericClient, + delay: i64, + ) -> Result<(), DatabaseError> { + let job_data = serde_json::to_value(self) + .expect("activity should be serializable"); + let scheduled_for = Utc::now() + Duration::seconds(delay); + enqueue_job( + db_client, + &JobType::IncomingActivity, + &job_data, + &scheduled_for, + ).await + } +} + +pub async fn process_queued_activities( + config: &Config, + db_client: &mut impl GenericClient, +) -> Result<(), DatabaseError> { + let batch_size = 10; + let max_retries = 2; + let retry_after = 60 * 10; // 10 minutes + + let batch = get_job_batch( + db_client, + &JobType::IncomingActivity, + batch_size, + ).await?; + for job in batch { + let mut incoming_activity: IncomingActivity = + serde_json::from_value(job.job_data) + .map_err(|_| DatabaseTypeError)?; + let is_error = match handle_activity( + config, + db_client, + &incoming_activity.activity, + incoming_activity.is_authenticated, + ).await { + Ok(_) => false, + Err(error) => { + incoming_activity.failure_count += 1; + log::warn!( + "failed to process activity ({}) (attempt #{}): {}", + error, + incoming_activity.failure_count, + incoming_activity.activity, + ); + true + }, + }; + if is_error && incoming_activity.failure_count <= max_retries { + // Re-queue + log::info!("activity re-queued"); + incoming_activity.enqueue(db_client, retry_after).await?; + }; + delete_job_from_queue(db_client, &job.id).await?; + }; + Ok(()) +} diff --git a/src/activitypub/receiver.rs b/src/activitypub/receiver.rs index fd22dce..e694b46 100644 --- a/src/activitypub/receiver.rs +++ b/src/activitypub/receiver.rs @@ -1,9 +1,7 @@ use actix_web::HttpRequest; -use chrono::{Duration, Utc}; use serde::{ Deserialize, Deserializer, - Serialize, de::DeserializeOwned, de::Error as DeserializerError, }; @@ -11,20 +9,12 @@ use serde_json::Value; use tokio_postgres::GenericClient; use crate::config::Config; -use crate::database::{DatabaseError, DatabaseTypeError}; +use crate::database::DatabaseError; use crate::errors::{ ConversionError, HttpError, ValidationError, }; -use crate::models::{ - background_jobs::queries::{ - enqueue_job, - get_job_batch, - delete_job_from_queue, - }, - background_jobs::types::JobType, -}; use super::authentication::{ verify_signed_activity, verify_signed_request, @@ -45,6 +35,7 @@ use super::handlers::{ undo::handle_undo, update::handle_update, }; +use super::queues::IncomingActivity; use super::vocabulary::*; #[derive(thiserror::Error, Debug)] @@ -152,7 +143,7 @@ pub fn deserialize_into_object_id<'de, D>( Ok(object_id) } -async fn handle_activity( +pub async fn handle_activity( config: &Config, db_client: &mut impl GenericClient, activity: &Value, @@ -344,84 +335,6 @@ pub async fn receive_activity( ).await } -#[derive(Deserialize, Serialize)] -struct IncomingActivity { - activity: Value, - is_authenticated: bool, - failure_count: i32, -} - -impl IncomingActivity { - fn new(activity: &Value, is_authenticated: bool) -> Self { - Self { - activity: activity.clone(), - is_authenticated, - failure_count: 0, - } - } - - async fn enqueue( - self, - db_client: &impl GenericClient, - delay: i64, - ) -> Result<(), DatabaseError> { - let job_data = serde_json::to_value(self) - .expect("activity should be serializable"); - let scheduled_for = Utc::now() + Duration::seconds(delay); - enqueue_job( - db_client, - &JobType::IncomingActivity, - &job_data, - &scheduled_for, - ).await - } -} - -pub async fn process_queued_activities( - config: &Config, - db_client: &mut impl GenericClient, -) -> Result<(), DatabaseError> { - let batch_size = 10; - let max_retries = 2; - let retry_after = 60 * 10; // 10 minutes - - let batch = get_job_batch( - db_client, - &JobType::IncomingActivity, - batch_size, - ).await?; - for job in batch { - let mut incoming_activity: IncomingActivity = - serde_json::from_value(job.job_data) - .map_err(|_| DatabaseTypeError)?; - let is_error = match handle_activity( - config, - db_client, - &incoming_activity.activity, - incoming_activity.is_authenticated, - ).await { - Ok(_) => false, - Err(error) => { - incoming_activity.failure_count += 1; - log::warn!( - "failed to process activity ({}) (attempt #{}): {}", - error, - incoming_activity.failure_count, - incoming_activity.activity, - ); - true - }, - }; - if is_error && incoming_activity.failure_count <= max_retries { - // Re-queue - log::info!("activity re-queued"); - incoming_activity.enqueue(db_client, retry_after).await?; - }; - delete_job_from_queue(db_client, &job.id).await?; - }; - Ok(()) -} - #[cfg(test)] mod tests { use serde_json::json; diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index bbf433a..787889f 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -5,7 +5,7 @@ use anyhow::Error; use chrono::{DateTime, Utc}; use uuid::Uuid; -use crate::activitypub::receiver::process_queued_activities; +use crate::activitypub::queues::process_queued_activities; use crate::config::{Config, Instance}; use crate::database::{get_database_client, DbPool}; use crate::ethereum::contracts::Blockchain;