Implement activity delivery queue

This commit is contained in:
silverpill 2022-12-11 18:41:08 +00:00
parent b392d9164b
commit 534812efa2
5 changed files with 100 additions and 9 deletions

View file

@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added ### Added
- Added `/api/v1/accounts/lookup` Mastodon API endpoint. - Added `/api/v1/accounts/lookup` Mastodon API endpoint.
- Implemented activity delivery queue.
### Changed ### Changed

View file

@ -4,11 +4,13 @@ use std::time::Duration;
use actix_web::http::Method; use actix_web::http::Method;
use reqwest::{Client, Proxy}; use reqwest::{Client, Proxy};
use rsa::RsaPrivateKey; use rsa::RsaPrivateKey;
use serde::Serialize; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_postgres::GenericClient;
use crate::config::Instance; use crate::config::Instance;
use crate::database::DatabaseError;
use crate::http_signatures::create::{ use crate::http_signatures::create::{
create_http_signature, create_http_signature,
HttpSignatureError, HttpSignatureError,
@ -23,6 +25,7 @@ use crate::utils::crypto_rsa::deserialize_private_key;
use super::actors::types::Actor; use super::actors::types::Actor;
use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX}; use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX};
use super::identifiers::local_actor_id; use super::identifiers::local_actor_id;
use super::queues::OutgoingActivityJobData;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum DelivererError { pub enum DelivererError {
@ -107,8 +110,8 @@ fn backoff(retry_count: u32) -> Duration {
Duration::from_secs(3 * 10_u64.pow(retry_count)) Duration::from_secs(3 * 10_u64.pow(retry_count))
} }
#[allow(dead_code)] #[derive(Deserialize, Serialize)]
struct Recipient { pub struct Recipient {
id: String, id: String,
inbox: String, inbox: String,
} }
@ -186,10 +189,10 @@ async fn deliver_activity_worker(
} }
pub struct OutgoingActivity { pub struct OutgoingActivity {
instance: Instance, pub instance: Instance,
sender: User, pub sender: User,
pub activity: Value, pub activity: Value,
recipients: Vec<Recipient>, pub recipients: Vec<Recipient>,
} }
impl OutgoingActivity { impl OutgoingActivity {
@ -239,6 +242,18 @@ impl OutgoingActivity {
self.deliver_or_log().await; self.deliver_or_log().await;
}); });
} }
pub async fn enqueue(
self,
db_client: &impl GenericClient,
) -> Result<(), DatabaseError> {
let job_data = OutgoingActivityJobData {
activity: self.activity,
sender_id: self.sender.id,
recipients: self.recipients,
};
job_data.into_job(db_client).await
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -2,6 +2,7 @@ use chrono::{Duration, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio_postgres::GenericClient; use tokio_postgres::GenericClient;
use uuid::Uuid;
use crate::config::Config; use crate::config::Config;
use crate::database::{DatabaseError, DatabaseTypeError}; use crate::database::{DatabaseError, DatabaseTypeError};
@ -12,7 +13,9 @@ use crate::models::{
delete_job_from_queue, delete_job_from_queue,
}, },
background_jobs::types::JobType, background_jobs::types::JobType,
users::queries::get_user_by_id,
}; };
use super::deliverer::{OutgoingActivity, Recipient};
use super::receiver::handle_activity; use super::receiver::handle_activity;
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
@ -48,7 +51,7 @@ impl IncomingActivityJobData {
} }
} }
pub async fn process_queued_activities( pub async fn process_queued_incoming_activities(
config: &Config, config: &Config,
db_client: &mut impl GenericClient, db_client: &mut impl GenericClient,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
@ -92,3 +95,54 @@ pub async fn process_queued_activities(
}; };
Ok(()) Ok(())
} }
#[derive(Deserialize, Serialize)]
pub struct OutgoingActivityJobData {
pub activity: Value,
pub sender_id: Uuid,
pub recipients: Vec<Recipient>,
}
impl OutgoingActivityJobData {
pub async fn into_job(
self,
db_client: &impl GenericClient,
) -> Result<(), DatabaseError> {
let job_data = serde_json::to_value(self)
.expect("activity should be serializable");
let scheduled_for = Utc::now();
enqueue_job(
db_client,
&JobType::OutgoingActivity,
&job_data,
&scheduled_for,
).await
}
}
pub async fn process_queued_outgoing_activities(
config: &Config,
db_client: &impl GenericClient,
) -> Result<(), DatabaseError> {
let batch_size = 1;
let batch = get_job_batch(
db_client,
&JobType::OutgoingActivity,
batch_size,
).await?;
for job in batch {
let job_data: OutgoingActivityJobData =
serde_json::from_value(job.job_data)
.map_err(|_| DatabaseTypeError)?;
let sender = get_user_by_id(db_client, &job_data.sender_id).await?;
let outgoing_activity = OutgoingActivity {
instance: config.instance(),
sender,
activity: job_data.activity,
recipients: job_data.recipients,
};
outgoing_activity.spawn_deliver();
delete_job_from_queue(db_client, &job.id).await?;
};
Ok(())
}

View file

@ -5,7 +5,10 @@ use anyhow::Error;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use uuid::Uuid; use uuid::Uuid;
use crate::activitypub::queues::process_queued_activities; use crate::activitypub::queues::{
process_queued_incoming_activities,
process_queued_outgoing_activities,
};
use crate::config::{Config, Instance}; use crate::config::{Config, Instance};
use crate::database::{get_database_client, DbPool}; use crate::database::{get_database_client, DbPool};
use crate::ethereum::contracts::Blockchain; use crate::ethereum::contracts::Blockchain;
@ -23,6 +26,7 @@ enum Task {
SubscriptionExpirationMonitor, SubscriptionExpirationMonitor,
MoneroPaymentMonitor, MoneroPaymentMonitor,
IncomingActivityQueue, IncomingActivityQueue,
OutgoingActivityQueue,
} }
impl Task { impl Task {
@ -34,6 +38,7 @@ impl Task {
Self::SubscriptionExpirationMonitor => 300, Self::SubscriptionExpirationMonitor => 300,
Self::MoneroPaymentMonitor => 30, Self::MoneroPaymentMonitor => 30,
Self::IncomingActivityQueue => 5, Self::IncomingActivityQueue => 5,
Self::OutgoingActivityQueue => 5,
} }
} }
} }
@ -117,7 +122,16 @@ async fn incoming_activity_queue_task(
db_pool: &DbPool, db_pool: &DbPool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let db_client = &mut **get_database_client(db_pool).await?; let db_client = &mut **get_database_client(db_pool).await?;
process_queued_activities(config, db_client).await?; process_queued_incoming_activities(config, db_client).await?;
Ok(())
}
async fn outgoing_activity_queue_task(
config: &Config,
db_pool: &DbPool,
) -> Result<(), Error> {
let db_client = &**get_database_client(db_pool).await?;
process_queued_outgoing_activities(config, db_client).await?;
Ok(()) Ok(())
} }
@ -133,6 +147,7 @@ pub fn run(
(Task::SubscriptionExpirationMonitor, None), (Task::SubscriptionExpirationMonitor, None),
(Task::MoneroPaymentMonitor, None), (Task::MoneroPaymentMonitor, None),
(Task::IncomingActivityQueue, None), (Task::IncomingActivityQueue, None),
(Task::OutgoingActivityQueue, None),
]); ]);
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
@ -171,6 +186,9 @@ pub fn run(
Task::IncomingActivityQueue => { Task::IncomingActivityQueue => {
incoming_activity_queue_task(&config, &db_pool).await incoming_activity_queue_task(&config, &db_pool).await
}, },
Task::OutgoingActivityQueue => {
outgoing_activity_queue_task(&config, &db_pool).await
},
}; };
task_result.unwrap_or_else(|err| { task_result.unwrap_or_else(|err| {
log::error!("{:?}: {}", task, err); log::error!("{:?}: {}", task, err);

View file

@ -11,12 +11,14 @@ use crate::database::{
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum JobType { pub enum JobType {
IncomingActivity, IncomingActivity,
OutgoingActivity,
} }
impl From<&JobType> for i16 { impl From<&JobType> for i16 {
fn from(value: &JobType) -> i16 { fn from(value: &JobType) -> i16 {
match value { match value {
JobType::IncomingActivity => 1, JobType::IncomingActivity => 1,
JobType::OutgoingActivity => 2,
} }
} }
} }
@ -27,6 +29,7 @@ impl TryFrom<i16> for JobType {
fn try_from(value: i16) -> Result<Self, Self::Error> { fn try_from(value: i16) -> Result<Self, Self::Error> {
let job_type = match value { let job_type = match value {
1 => Self::IncomingActivity, 1 => Self::IncomingActivity,
2 => Self::OutgoingActivity,
_ => return Err(DatabaseTypeError), _ => return Err(DatabaseTypeError),
}; };
Ok(job_type) Ok(job_type)