diff --git a/CHANGELOG.md b/CHANGELOG.md
index a03347e..3bd27c6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Added `federation` parameter group to configuration.
 - Add empty `spoiler_text` property to Mastodon API Status object.
 - Added `error` and `error_description` fields to Mastodon API error responses.
+- Store information about failed activity deliveries in database.
 
 ### Changed
 
diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs
index 21401ee..a9ceea1 100644
--- a/src/activitypub/deliverer.rs
+++ b/src/activitypub/deliverer.rs
@@ -6,16 +6,13 @@ use reqwest::{Client, Proxy};
 use rsa::RsaPrivateKey;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use tokio::time::sleep;
 
 use mitra_config::Instance;
 use mitra_utils::crypto_rsa::deserialize_private_key;
 
 use crate::database::{
-    get_database_client,
     DatabaseClient,
     DatabaseError,
-    DbPool,
 };
 use crate::http_signatures::create::{
     create_http_signature,
@@ -26,10 +23,7 @@ use crate::json_signatures::create::{
     sign_object,
     JsonSignatureError,
 };
-use crate::models::{
-    profiles::queries::set_reachability_status,
-    users::types::User,
-};
+use crate::models::users::types::User;
 use super::actors::types::Actor;
 use super::constants::AP_MEDIA_TYPE;
 use super::identifiers::{local_actor_id, local_actor_key_id};
@@ -122,26 +116,19 @@ async fn send_activity(
     Ok(())
 }
 
-// 30 secs, 5 mins, 50 mins, 8 hours
-fn backoff(retry_count: u32) -> Duration {
-    debug_assert!(retry_count > 0);
-    Duration::from_secs(3 * 10_u64.pow(retry_count))
-}
-
 #[derive(Deserialize, Serialize)]
 pub struct Recipient {
-    id: String,
+    pub id: String,
     inbox: String,
     #[serde(default)]
-    is_delivered: bool, // default to false if serialized data contains no value
+    pub is_delivered: bool, // default to false if serialized data contains no value
 }
 
 async fn deliver_activity_worker(
-    maybe_db_pool: Option<DbPool>,
     instance: Instance,
     sender: User,
     activity: Value,
-    mut recipients: Vec<Recipient>,
+    recipients: &mut [Recipient],
 ) -> Result<(), DelivererError> {
     let actor_key = deserialize_private_key(&sender.private_key)?;
     let actor_id = local_actor_id(
@@ -157,56 +144,30 @@ async fn deliver_activity_worker(
     };
     let activity_json = serde_json::to_string(&activity_signed)?;
 
-    let mut retry_count = 0;
-    let max_retries = 2;
-
-    while recipients.iter().any(|recipient| !recipient.is_delivered) &&
-        retry_count <= max_retries
-    {
-        if retry_count > 0 {
-            // Wait before next attempt
-            sleep(backoff(retry_count)).await;
+    for recipient in recipients.iter_mut() {
+        if recipient.is_delivered {
+            continue;
         };
-        for recipient in recipients.iter_mut() {
-            if recipient.is_delivered {
-                continue;
-            };
-            if let Err(error) = send_activity(
-                &instance,
-                &actor_key,
-                &actor_key_id,
-                &activity_json,
-                &recipient.inbox,
-            ).await {
-                log::warn!(
-                    "failed to deliver activity to {} (attempt #{}): {}",
-                    recipient.inbox,
-                    retry_count + 1,
-                    error,
-                );
-            } else {
-                recipient.is_delivered = true;
-            };
-        };
-        retry_count += 1;
-    };
-
-    if let Some(ref db_pool) = maybe_db_pool {
-        // Get connection from pool only after finishing delivery
-        let db_client = &**get_database_client(db_pool).await?;
-        for recipient in recipients {
-            set_reachability_status(
-                db_client,
-                &recipient.id,
-                recipient.is_delivered,
-            ).await?;
+        if let Err(error) = send_activity(
+            &instance,
+            &actor_key,
+            &actor_key_id,
+            &activity_json,
+            &recipient.inbox,
+        ).await {
+            log::warn!(
+                "failed to deliver activity to {}: {}",
+                recipient.inbox,
+                error,
+            );
+        } else {
+            recipient.is_delivered = true;
         };
     };
     Ok(())
 }
 
 pub struct OutgoingActivity {
-    pub db_pool: Option<DbPool>, // needed to track unreachable actors (optional)
     pub instance: Instance,
     pub sender: User,
     pub activity: Value,
@@ -233,7 +194,6 @@ impl OutgoingActivity {
             };
         };
         Self {
-            db_pool: None,
             instance: instance.clone(),
             sender: sender.clone(),
             activity: serde_json::to_value(activity)
@@ -243,23 +203,15 @@ impl OutgoingActivity {
     }
 
     pub(super) async fn deliver(
-        self,
-    ) -> Result<(), DelivererError> {
+        mut self,
+    ) -> Result<Vec<Recipient>, DelivererError> {
         deliver_activity_worker(
-            self.db_pool,
             self.instance,
             self.sender,
             self.activity,
-            self.recipients,
-        ).await
-    }
-
-    pub(super) fn spawn_deliver(self) -> () {
-        tokio::spawn(async move {
-            self.deliver().await.unwrap_or_else(|err| {
-                log::error!("{}", err);
-            });
-        });
+            &mut self.recipients,
+        ).await?;
+        Ok(self.recipients)
     }
 
     pub async fn enqueue(
@@ -278,18 +230,8 @@ impl OutgoingActivity {
             activity: self.activity,
             sender_id: self.sender.id,
             recipients: self.recipients,
+            failure_count: 0,
         };
-        job_data.into_job(db_client).await
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_backoff() {
-        assert_eq!(backoff(1).as_secs(), 30);
-        assert_eq!(backoff(2).as_secs(), 300);
+        job_data.into_job(db_client, 0).await
     }
 }
diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs
index 0585093..46574a7 100644
--- a/src/activitypub/queues.rs
+++ b/src/activitypub/queues.rs
@@ -19,6 +19,7 @@ use crate::models::{
         delete_job_from_queue,
     },
     background_jobs::types::JobType,
+    profiles::queries::set_reachability_status,
     users::queries::get_user_by_id,
 };
 use super::deliverer::{OutgoingActivity, Recipient};
@@ -97,9 +98,9 @@ pub async fn process_queued_incoming_activities(
                 !matches!(error, HandlerError::FetchError(FetchError::RecursionError))
             {
                 // Re-queue
-                log::info!("activity re-queued");
                 let retry_after = incoming_queue_backoff(job_data.failure_count);
                 job_data.into_job(db_client, retry_after).await?;
+                log::info!("activity re-queued");
             };
         };
         delete_job_from_queue(db_client, &job.id).await?;
@@ -112,16 +113,18 @@ pub struct OutgoingActivityJobData {
     pub activity: Value,
     pub sender_id: Uuid,
     pub recipients: Vec<Recipient>,
+    pub failure_count: u32,
 }
 
 impl OutgoingActivityJobData {
     pub async fn into_job(
         self,
         db_client: &impl DatabaseClient,
+        delay: u32,
     ) -> Result<(), DatabaseError> {
         let job_data = serde_json::to_value(self)
             .expect("activity should be serializable");
-        let scheduled_for = Utc::now();
+        let scheduled_for = Utc::now() + Duration::seconds(delay.into());
         enqueue_job(
             db_client,
             &JobType::OutgoingActivity,
@@ -132,6 +135,13 @@ impl OutgoingActivityJobData {
 }
 
 const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1;
+const OUTGOING_QUEUE_RETRIES_MAX: u32 = 2;
+
+// 30 secs, 5 mins, 50 mins, 8 hours
+pub fn outgoing_queue_backoff(failure_count: u32) -> u32 {
+    debug_assert!(failure_count > 0);
+    3 * 10_u32.pow(failure_count)
+}
 
 pub async fn process_queued_outgoing_activities(
     config: &Config,
@@ -144,19 +154,64 @@ pub async fn process_queued_outgoing_activities(
         OUTGOING_QUEUE_BATCH_SIZE,
     ).await?;
     for job in batch {
-        let job_data: OutgoingActivityJobData =
+        let mut job_data: OutgoingActivityJobData =
             serde_json::from_value(job.job_data)
                 .map_err(|_| DatabaseTypeError)?;
         let sender = get_user_by_id(db_client, &job_data.sender_id).await?;
         let outgoing_activity = OutgoingActivity {
-            db_pool: Some(db_pool.clone()),
             instance: config.instance(),
             sender,
-            activity: job_data.activity,
+            activity: job_data.activity.clone(),
             recipients: job_data.recipients,
         };
-        outgoing_activity.spawn_deliver();
+
+        let recipients = match outgoing_activity.deliver().await {
+            Ok(recipients) => recipients,
+            Err(error) => {
+                // Unexpected error
+                log::error!("{}", error);
+                delete_job_from_queue(db_client, &job.id).await?;
+                return Ok(());
+            },
+        };
+        log::info!(
+            "delivery job: {} delivered, {} errors (attempt #{})",
+            recipients.iter().filter(|item| item.is_delivered).count(),
+            recipients.iter().filter(|item| !item.is_delivered).count(),
+            job_data.failure_count + 1,
+        );
+        if recipients.iter().any(|recipient| !recipient.is_delivered) &&
+            job_data.failure_count < OUTGOING_QUEUE_RETRIES_MAX
+        {
+            job_data.failure_count += 1;
+            // Re-queue if some deliveries are not successful
+            job_data.recipients = recipients;
+            let retry_after = outgoing_queue_backoff(job_data.failure_count);
+            job_data.into_job(db_client, retry_after).await?;
+            log::info!("delivery job re-queued");
+        } else {
+            // Update inbox status if all deliveries are successful
+            // or if retry limit is reached
+            for recipient in recipients {
+                set_reachability_status(
+                    db_client,
+                    &recipient.id,
+                    recipient.is_delivered,
+                ).await?;
+            };
+        };
         delete_job_from_queue(db_client, &job.id).await?;
     };
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_outgoing_queue_backoff() {
+        assert_eq!(outgoing_queue_backoff(1), 30);
+        assert_eq!(outgoing_queue_backoff(2), 300);
+    }
+}