Store information about failed activity deliveries in database

This commit is contained in:
silverpill 2023-02-24 20:17:32 +00:00
parent e4254e7a3d
commit c201f3ea2b
3 changed files with 90 additions and 92 deletions

View file

@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Added `federation` parameter group to configuration. - Added `federation` parameter group to configuration.
- Add empty `spoiler_text` property to Mastodon API Status object. - Add empty `spoiler_text` property to Mastodon API Status object.
- Added `error` and `error_description` fields to Mastodon API error responses. - Added `error` and `error_description` fields to Mastodon API error responses.
- Store information about failed activity deliveries in database.
### Changed ### Changed

View file

@ -6,16 +6,13 @@ use reqwest::{Client, Proxy};
use rsa::RsaPrivateKey; use rsa::RsaPrivateKey;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::time::sleep;
use mitra_config::Instance; use mitra_config::Instance;
use mitra_utils::crypto_rsa::deserialize_private_key; use mitra_utils::crypto_rsa::deserialize_private_key;
use crate::database::{ use crate::database::{
get_database_client,
DatabaseClient, DatabaseClient,
DatabaseError, DatabaseError,
DbPool,
}; };
use crate::http_signatures::create::{ use crate::http_signatures::create::{
create_http_signature, create_http_signature,
@ -26,10 +23,7 @@ use crate::json_signatures::create::{
sign_object, sign_object,
JsonSignatureError, JsonSignatureError,
}; };
use crate::models::{ use crate::models::users::types::User;
profiles::queries::set_reachability_status,
users::types::User,
};
use super::actors::types::Actor; use super::actors::types::Actor;
use super::constants::AP_MEDIA_TYPE; use super::constants::AP_MEDIA_TYPE;
use super::identifiers::{local_actor_id, local_actor_key_id}; use super::identifiers::{local_actor_id, local_actor_key_id};
@ -122,26 +116,19 @@ async fn send_activity(
Ok(()) Ok(())
} }
// 30 secs, 5 mins, 50 mins, 8 hours
fn backoff(retry_count: u32) -> Duration {
debug_assert!(retry_count > 0);
Duration::from_secs(3 * 10_u64.pow(retry_count))
}
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub struct Recipient { pub struct Recipient {
id: String, pub id: String,
inbox: String, inbox: String,
#[serde(default)] #[serde(default)]
is_delivered: bool, // default to false if serialized data contains no value pub is_delivered: bool, // default to false if serialized data contains no value
} }
async fn deliver_activity_worker( async fn deliver_activity_worker(
maybe_db_pool: Option<DbPool>,
instance: Instance, instance: Instance,
sender: User, sender: User,
activity: Value, activity: Value,
mut recipients: Vec<Recipient>, recipients: &mut [Recipient],
) -> Result<(), DelivererError> { ) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?; let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_id = local_actor_id( let actor_id = local_actor_id(
@ -157,56 +144,30 @@ async fn deliver_activity_worker(
}; };
let activity_json = serde_json::to_string(&activity_signed)?; let activity_json = serde_json::to_string(&activity_signed)?;
let mut retry_count = 0; for recipient in recipients.iter_mut() {
let max_retries = 2; if recipient.is_delivered {
continue;
while recipients.iter().any(|recipient| !recipient.is_delivered) &&
retry_count <= max_retries
{
if retry_count > 0 {
// Wait before next attempt
sleep(backoff(retry_count)).await;
}; };
for recipient in recipients.iter_mut() { if let Err(error) = send_activity(
if recipient.is_delivered { &instance,
continue; &actor_key,
}; &actor_key_id,
if let Err(error) = send_activity( &activity_json,
&instance, &recipient.inbox,
&actor_key, ).await {
&actor_key_id, log::warn!(
&activity_json, "failed to deliver activity to {}: {}",
&recipient.inbox, recipient.inbox,
).await { error,
log::warn!( );
"failed to deliver activity to {} (attempt #{}): {}", } else {
recipient.inbox, recipient.is_delivered = true;
retry_count + 1,
error,
);
} else {
recipient.is_delivered = true;
};
};
retry_count += 1;
};
if let Some(ref db_pool) = maybe_db_pool {
// Get connection from pool only after finishing delivery
let db_client = &**get_database_client(db_pool).await?;
for recipient in recipients {
set_reachability_status(
db_client,
&recipient.id,
recipient.is_delivered,
).await?;
}; };
}; };
Ok(()) Ok(())
} }
pub struct OutgoingActivity { pub struct OutgoingActivity {
pub db_pool: Option<DbPool>, // needed to track unreachable actors (optional)
pub instance: Instance, pub instance: Instance,
pub sender: User, pub sender: User,
pub activity: Value, pub activity: Value,
@ -233,7 +194,6 @@ impl OutgoingActivity {
}; };
}; };
Self { Self {
db_pool: None,
instance: instance.clone(), instance: instance.clone(),
sender: sender.clone(), sender: sender.clone(),
activity: serde_json::to_value(activity) activity: serde_json::to_value(activity)
@ -243,23 +203,15 @@ impl OutgoingActivity {
} }
pub(super) async fn deliver( pub(super) async fn deliver(
self, mut self,
) -> Result<(), DelivererError> { ) -> Result<Vec<Recipient>, DelivererError> {
deliver_activity_worker( deliver_activity_worker(
self.db_pool,
self.instance, self.instance,
self.sender, self.sender,
self.activity, self.activity,
self.recipients, &mut self.recipients,
).await ).await?;
} Ok(self.recipients)
pub(super) fn spawn_deliver(self) -> () {
tokio::spawn(async move {
self.deliver().await.unwrap_or_else(|err| {
log::error!("{}", err);
});
});
} }
pub async fn enqueue( pub async fn enqueue(
@ -278,18 +230,8 @@ impl OutgoingActivity {
activity: self.activity, activity: self.activity,
sender_id: self.sender.id, sender_id: self.sender.id,
recipients: self.recipients, recipients: self.recipients,
failure_count: 0,
}; };
job_data.into_job(db_client).await job_data.into_job(db_client, 0).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backoff() {
assert_eq!(backoff(1).as_secs(), 30);
assert_eq!(backoff(2).as_secs(), 300);
} }
} }

View file

@ -19,6 +19,7 @@ use crate::models::{
delete_job_from_queue, delete_job_from_queue,
}, },
background_jobs::types::JobType, background_jobs::types::JobType,
profiles::queries::set_reachability_status,
users::queries::get_user_by_id, users::queries::get_user_by_id,
}; };
use super::deliverer::{OutgoingActivity, Recipient}; use super::deliverer::{OutgoingActivity, Recipient};
@ -97,9 +98,9 @@ pub async fn process_queued_incoming_activities(
!matches!(error, HandlerError::FetchError(FetchError::RecursionError)) !matches!(error, HandlerError::FetchError(FetchError::RecursionError))
{ {
// Re-queue // Re-queue
log::info!("activity re-queued");
let retry_after = incoming_queue_backoff(job_data.failure_count); let retry_after = incoming_queue_backoff(job_data.failure_count);
job_data.into_job(db_client, retry_after).await?; job_data.into_job(db_client, retry_after).await?;
log::info!("activity re-queued");
}; };
}; };
delete_job_from_queue(db_client, &job.id).await?; delete_job_from_queue(db_client, &job.id).await?;
@ -112,16 +113,18 @@ pub struct OutgoingActivityJobData {
pub activity: Value, pub activity: Value,
pub sender_id: Uuid, pub sender_id: Uuid,
pub recipients: Vec<Recipient>, pub recipients: Vec<Recipient>,
pub failure_count: u32,
} }
impl OutgoingActivityJobData { impl OutgoingActivityJobData {
pub async fn into_job( pub async fn into_job(
self, self,
db_client: &impl DatabaseClient, db_client: &impl DatabaseClient,
delay: u32,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
let job_data = serde_json::to_value(self) let job_data = serde_json::to_value(self)
.expect("activity should be serializable"); .expect("activity should be serializable");
let scheduled_for = Utc::now(); let scheduled_for = Utc::now() + Duration::seconds(delay.into());
enqueue_job( enqueue_job(
db_client, db_client,
&JobType::OutgoingActivity, &JobType::OutgoingActivity,
@ -132,6 +135,13 @@ impl OutgoingActivityJobData {
} }
const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1; const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1;
const OUTGOING_QUEUE_RETRIES_MAX: u32 = 2;
// 30 secs, 5 mins, 50 mins, 8 hours
pub fn outgoing_queue_backoff(failure_count: u32) -> u32 {
debug_assert!(failure_count > 0);
3 * 10_u32.pow(failure_count)
}
pub async fn process_queued_outgoing_activities( pub async fn process_queued_outgoing_activities(
config: &Config, config: &Config,
@ -144,19 +154,64 @@ pub async fn process_queued_outgoing_activities(
OUTGOING_QUEUE_BATCH_SIZE, OUTGOING_QUEUE_BATCH_SIZE,
).await?; ).await?;
for job in batch { for job in batch {
let job_data: OutgoingActivityJobData = let mut job_data: OutgoingActivityJobData =
serde_json::from_value(job.job_data) serde_json::from_value(job.job_data)
.map_err(|_| DatabaseTypeError)?; .map_err(|_| DatabaseTypeError)?;
let sender = get_user_by_id(db_client, &job_data.sender_id).await?; let sender = get_user_by_id(db_client, &job_data.sender_id).await?;
let outgoing_activity = OutgoingActivity { let outgoing_activity = OutgoingActivity {
db_pool: Some(db_pool.clone()),
instance: config.instance(), instance: config.instance(),
sender, sender,
activity: job_data.activity, activity: job_data.activity.clone(),
recipients: job_data.recipients, recipients: job_data.recipients,
}; };
outgoing_activity.spawn_deliver();
let recipients = match outgoing_activity.deliver().await {
Ok(recipients) => recipients,
Err(error) => {
// Unexpected error
log::error!("{}", error);
delete_job_from_queue(db_client, &job.id).await?;
return Ok(());
},
};
log::info!(
"delivery job: {} delivered, {} errors (attempt #{})",
recipients.iter().filter(|item| item.is_delivered).count(),
recipients.iter().filter(|item| !item.is_delivered).count(),
job_data.failure_count + 1,
);
if recipients.iter().any(|recipient| !recipient.is_delivered) &&
job_data.failure_count < OUTGOING_QUEUE_RETRIES_MAX
{
job_data.failure_count += 1;
// Re-queue if some deliveries are not successful
job_data.recipients = recipients;
let retry_after = outgoing_queue_backoff(job_data.failure_count);
job_data.into_job(db_client, retry_after).await?;
log::info!("delivery job re-queued");
} else {
// Update inbox status if all deliveries are successful
// or if retry limit is reached
for recipient in recipients {
set_reachability_status(
db_client,
&recipient.id,
recipient.is_delivered,
).await?;
};
};
delete_job_from_queue(db_client, &job.id).await?; delete_job_from_queue(db_client, &job.id).await?;
}; };
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_outgoing_queue_backoff() {
assert_eq!(outgoing_queue_backoff(1), 30);
assert_eq!(outgoing_queue_backoff(2), 300);
}
}