From 1efbf5a3fb5fa75308c4cacc983980a4c1c57ee3 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sun, 11 Dec 2022 23:29:57 +0000 Subject: [PATCH] Process Create() activities in background jobs --- src/activitypub/mod.rs | 2 +- src/activitypub/receiver.rs | 98 ++++++++++++++++++++++++++++++++++++- src/job_queue/scheduler.rs | 18 ++++++- 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/src/activitypub/mod.rs b/src/activitypub/mod.rs index ffc5b6e..6d68087 100644 --- a/src/activitypub/mod.rs +++ b/src/activitypub/mod.rs @@ -8,6 +8,6 @@ mod deliverer; pub mod fetcher; mod handlers; pub mod identifiers; -mod receiver; +pub mod receiver; pub mod views; mod vocabulary; diff --git a/src/activitypub/receiver.rs b/src/activitypub/receiver.rs index f546b4f..844996f 100644 --- a/src/activitypub/receiver.rs +++ b/src/activitypub/receiver.rs @@ -1,7 +1,9 @@ use actix_web::HttpRequest; +use chrono::{Duration, Utc}; use serde::{ Deserialize, Deserializer, + Serialize, de::DeserializeOwned, de::Error as DeserializerError, }; @@ -9,12 +11,20 @@ use serde_json::Value; use tokio_postgres::GenericClient; use crate::config::Config; -use crate::database::DatabaseError; +use crate::database::{DatabaseError, DatabaseTypeError}; use crate::errors::{ ConversionError, HttpError, ValidationError, }; +use crate::models::{ + background_jobs::queries::{ + enqueue_job, + get_job_batch, + delete_job_from_queue, + }, + background_jobs::types::JobType, +}; use super::authentication::{ verify_signed_activity, verify_signed_request, @@ -306,6 +316,14 @@ pub async fn receive_activity( }; }; + if activity_type == CREATE { + // Add activity to job queue and release lock + IncomingActivity::new(activity, is_authenticated) + .enqueue(db_client, 0).await?; + log::info!("activity added to the queue: {}", activity_type); + return Ok(()); + }; + handle_activity( config, db_client, @@ -314,6 +332,84 @@ pub async fn receive_activity( ).await } +#[derive(Deserialize, Serialize)] +struct 
IncomingActivity { + activity: Value, + is_authenticated: bool, + failure_count: i32, +} + +impl IncomingActivity { + fn new(activity: &Value, is_authenticated: bool) -> Self { + Self { + activity: activity.clone(), + is_authenticated, + failure_count: 0, + } + } + + async fn enqueue( + self, + db_client: &impl GenericClient, + delay: i64, + ) -> Result<(), DatabaseError> { + let job_data = serde_json::to_value(self) + .expect("activity should be serializable"); + let scheduled_for = Utc::now() + Duration::seconds(delay); + enqueue_job( + db_client, + &JobType::IncomingActivity, + &job_data, + &scheduled_for, + ).await + } +} + +pub async fn process_queued_activities( + config: &Config, + db_client: &mut impl GenericClient, +) -> Result<(), DatabaseError> { + let batch_size = 10; + let max_retries = 2; + let retry_after = 60 * 10; // 10 minutes + + let batch = get_job_batch( + db_client, + &JobType::IncomingActivity, + batch_size, + ).await?; + for job in batch { + let mut incoming_activity: IncomingActivity = + serde_json::from_value(job.job_data) + .map_err(|_| DatabaseTypeError)?; + let is_error = match handle_activity( + config, + db_client, + &incoming_activity.activity, + incoming_activity.is_authenticated, + ).await { + Ok(_) => false, + Err(error) => { + incoming_activity.failure_count += 1; + log::warn!( + "failed to process activity ({}) (attempt #{}): {}", + error, + incoming_activity.failure_count, + incoming_activity.activity, + ); + true + }, + }; + if is_error && incoming_activity.failure_count <= max_retries { + // Re-queue + log::info!("activity re-queued"); + incoming_activity.enqueue(db_client, retry_after).await?; + }; + delete_job_from_queue(db_client, &job.id).await?; + }; + Ok(()) +} + #[cfg(test)] mod tests { use serde_json::json; diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index 061a9ed..6bfdda3 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -5,8 +5,9 @@ use anyhow::Error; use 
chrono::{DateTime, Utc}; use uuid::Uuid; +use crate::activitypub::receiver::process_queued_activities; use crate::config::{Config, Instance}; -use crate::database::DbPool; +use crate::database::{get_database_client, DbPool}; use crate::ethereum::contracts::Blockchain; use crate::ethereum::nft::process_nft_events; use crate::ethereum::subscriptions::{ @@ -21,6 +22,7 @@ enum Task { EthereumSubscriptionMonitor, SubscriptionExpirationMonitor, MoneroPaymentMonitor, + IncomingActivityQueue, } impl Task { @@ -31,6 +33,7 @@ impl Task { Self::EthereumSubscriptionMonitor => 300, Self::SubscriptionExpirationMonitor => 300, Self::MoneroPaymentMonitor => 30, + Self::IncomingActivityQueue => 5, } } } @@ -109,6 +112,15 @@ async fn monero_payment_monitor_task( Ok(()) } +async fn incoming_activity_queue_task( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + let db_client = &mut **get_database_client(db_pool).await?; + process_queued_activities(config, db_client).await?; + Ok(()) +} + pub fn run( config: Config, mut maybe_blockchain: Option<Blockchain>, @@ -120,6 +132,7 @@ pub fn run( scheduler_state.insert(Task::EthereumSubscriptionMonitor, None); scheduler_state.insert(Task::SubscriptionExpirationMonitor, None); scheduler_state.insert(Task::MoneroPaymentMonitor, None); + scheduler_state.insert(Task::IncomingActivityQueue, None); let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new(); @@ -154,6 +167,9 @@ pub fn run( Task::MoneroPaymentMonitor => { monero_payment_monitor_task(&config, &db_pool).await }, + Task::IncomingActivityQueue => { + incoming_activity_queue_task(&config, &db_pool).await + }, }; task_result.unwrap_or_else(|err| { log::error!("{:?}: {}", task, err);