diff --git a/CHANGELOG.md b/CHANGELOG.md index b4decde..b50874a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Added - Added `/api/v1/accounts/lookup` Mastodon API endpoint. +- Implemented activity delivery queue. ### Changed diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs index 98b3c6c..76300fe 100644 --- a/src/activitypub/deliverer.rs +++ b/src/activitypub/deliverer.rs @@ -4,11 +4,13 @@ use std::time::Duration; use actix_web::http::Method; use reqwest::{Client, Proxy}; use rsa::RsaPrivateKey; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::time::sleep; +use tokio_postgres::GenericClient; use crate::config::Instance; +use crate::database::DatabaseError; use crate::http_signatures::create::{ create_http_signature, HttpSignatureError, @@ -23,6 +25,7 @@ use crate::utils::crypto_rsa::deserialize_private_key; use super::actors::types::Actor; use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX}; use super::identifiers::local_actor_id; +use super::queues::OutgoingActivityJobData; #[derive(thiserror::Error, Debug)] pub enum DelivererError { @@ -107,8 +110,8 @@ fn backoff(retry_count: u32) -> Duration { Duration::from_secs(3 * 10_u64.pow(retry_count)) } -#[allow(dead_code)] -struct Recipient { +#[derive(Deserialize, Serialize)] +pub struct Recipient { id: String, inbox: String, } @@ -186,10 +189,10 @@ async fn deliver_activity_worker( } pub struct OutgoingActivity { - instance: Instance, - sender: User, + pub instance: Instance, + pub sender: User, pub activity: Value, - recipients: Vec, + pub recipients: Vec, } impl OutgoingActivity { @@ -239,6 +242,18 @@ impl OutgoingActivity { self.deliver_or_log().await; }); } + + pub async fn enqueue( + self, + db_client: &impl GenericClient, + ) -> Result<(), DatabaseError> { + let job_data = OutgoingActivityJobData { + activity: self.activity, + sender_id: self.sender.id, + recipients: self.recipients, + }; + job_data.into_job(db_client).await + } } #[cfg(test)] diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index a968041..3abdbc8 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -2,6 +2,7 @@ use chrono::{Duration, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio_postgres::GenericClient; +use uuid::Uuid; use crate::config::Config; use crate::database::{DatabaseError, DatabaseTypeError}; @@ -12,7 +13,9 @@ use crate::models::{ delete_job_from_queue, }, background_jobs::types::JobType, + users::queries::get_user_by_id, }; +use super::deliverer::{OutgoingActivity, Recipient}; use super::receiver::handle_activity; #[derive(Deserialize, Serialize)] @@ -48,7 +51,7 @@ impl IncomingActivityJobData { } } -pub async fn process_queued_activities( +pub async fn process_queued_incoming_activities( config: &Config, db_client: &mut impl GenericClient, ) -> Result<(), DatabaseError> { @@ -92,3 +95,54 @@ pub async fn process_queued_activities( }; Ok(()) } + +#[derive(Deserialize, Serialize)] +pub struct OutgoingActivityJobData { + pub activity: Value, + pub sender_id: Uuid, + pub recipients: Vec, +} + +impl OutgoingActivityJobData { + pub async fn into_job( + self, + db_client: &impl GenericClient, + ) -> Result<(), DatabaseError> { + let job_data = serde_json::to_value(self) + .expect("activity should be serializable"); + let scheduled_for = Utc::now(); + enqueue_job( + db_client, + &JobType::OutgoingActivity, + &job_data, + &scheduled_for, + ).await + } +} + +pub async fn process_queued_outgoing_activities( + config: &Config, + db_client: &impl GenericClient, +) -> Result<(), DatabaseError> { + let batch_size = 1; + let batch = get_job_batch( + db_client, + &JobType::OutgoingActivity, + batch_size, + ).await?; + for job in batch { + let job_data: OutgoingActivityJobData = + serde_json::from_value(job.job_data) + .map_err(|_| DatabaseTypeError)?; + let sender = get_user_by_id(db_client, &job_data.sender_id).await?; + let outgoing_activity = OutgoingActivity { + instance: config.instance(), + sender, + activity: job_data.activity, + recipients: job_data.recipients, + }; + outgoing_activity.spawn_deliver(); + delete_job_from_queue(db_client, &job.id).await?; + }; + Ok(()) +} diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index 787889f..2a184af 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -5,7 +5,10 @@ use anyhow::Error; use chrono::{DateTime, Utc}; use uuid::Uuid; -use crate::activitypub::queues::process_queued_activities; +use crate::activitypub::queues::{ + process_queued_incoming_activities, + process_queued_outgoing_activities, +}; use crate::config::{Config, Instance}; use crate::database::{get_database_client, DbPool}; use crate::ethereum::contracts::Blockchain; @@ -23,6 +26,7 @@ enum Task { SubscriptionExpirationMonitor, MoneroPaymentMonitor, IncomingActivityQueue, + OutgoingActivityQueue, } impl Task { @@ -34,6 +38,7 @@ impl Task { Self::SubscriptionExpirationMonitor => 300, Self::MoneroPaymentMonitor => 30, Self::IncomingActivityQueue => 5, + Self::OutgoingActivityQueue => 5, } } } @@ -117,7 +122,16 @@ async fn incoming_activity_queue_task( db_pool: &DbPool, ) -> Result<(), Error> { let db_client = &mut **get_database_client(db_pool).await?; - process_queued_activities(config, db_client).await?; + process_queued_incoming_activities(config, db_client).await?; + Ok(()) +} + +async fn outgoing_activity_queue_task( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + let db_client = &**get_database_client(db_pool).await?; + process_queued_outgoing_activities(config, db_client).await?; Ok(()) } @@ -133,6 +147,7 @@ pub fn run( (Task::SubscriptionExpirationMonitor, None), (Task::MoneroPaymentMonitor, None), (Task::IncomingActivityQueue, None), + (Task::OutgoingActivityQueue, None), ]); let mut interval = tokio::time::interval(Duration::from_secs(5)); @@ -171,6 +186,9 @@ pub fn run( Task::IncomingActivityQueue => { incoming_activity_queue_task(&config, &db_pool).await }, + Task::OutgoingActivityQueue => { + outgoing_activity_queue_task(&config, &db_pool).await + }, }; task_result.unwrap_or_else(|err| { log::error!("{:?}: {}", task, err); diff --git a/src/models/background_jobs/types.rs b/src/models/background_jobs/types.rs index 674cf40..3219482 100644 --- a/src/models/background_jobs/types.rs +++ b/src/models/background_jobs/types.rs @@ -11,12 +11,14 @@ use crate::database::{ #[derive(Debug, PartialEq)] pub enum JobType { IncomingActivity, + OutgoingActivity, } impl From<&JobType> for i16 { fn from(value: &JobType) -> i16 { match value { JobType::IncomingActivity => 1, + JobType::OutgoingActivity => 2, } } } @@ -27,6 +29,7 @@ impl TryFrom for JobType { fn try_from(value: i16) -> Result { let job_type = match value { 1 => Self::IncomingActivity, + 2 => Self::OutgoingActivity, _ => return Err(DatabaseTypeError), }; Ok(job_type)