// fedimovies/src/activitypub/queues.rs

use chrono::{Duration, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use fedimovies_config::Config;
use fedimovies_models::{
    background_jobs::queries::{delete_job_from_queue, enqueue_job, get_job_batch},
    background_jobs::types::JobType,
    database::{get_database_client, DatabaseClient, DatabaseError, DatabaseTypeError, DbPool},
    profiles::queries::set_reachability_status,
    users::queries::get_user_by_id,
};

use super::deliverer::{OutgoingActivity, Recipient};
use super::fetcher::fetchers::FetchError;
use super::receiver::{handle_activity, HandlerError};
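
// Likely the interval after which a job picked up by a worker is considered
// stalled and handed out again by `get_job_batch` (an assumption based on how
// the constant is passed below; the queue semantics are not defined here).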
const JOB_TIMEOUT: u32 = 3600; // 1 hour
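
/// Queued payload for an incoming federated activity: the raw JSON,
/// whether the request was authenticated, and the number of failed attempts.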
#[derive(Deserialize, Serialize)]
pub struct IncomingActivityJobData {
    activity: Value,
    is_authenticated: bool,
    failure_count: u32,
}

impl IncomingActivityJobData {
    pub fn new(activity: &Value, is_authenticated: bool) -> Self {
        Self {
            activity: activity.clone(),
            is_authenticated,
            failure_count: 0,
        }
    }
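
    /// Serializes this payload and enqueues it as an `IncomingActivity` job,
    /// scheduled to run `delay` seconds from now.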
    pub async fn into_job(
        self,
        db_client: &impl DatabaseClient,
        delay: u32,
    ) -> Result<(), DatabaseError> {
        let job_data = serde_json::to_value(self).expect("activity should be serializable");
        let scheduled_for = Utc::now() + Duration::seconds(delay.into());
        enqueue_job(
            db_client,
            &JobType::IncomingActivity,
            &job_data,
            &scheduled_for,
        )
        .await
    }
}

const INCOMING_QUEUE_BATCH_SIZE: u32 = 10;
const INCOMING_QUEUE_RETRIES_MAX: u32 = 2;

const fn incoming_queue_backoff(_failure_count: u32) -> u32 {
    // Constant, 10 minutes
    60 * 10
}
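
/// Fetches a batch of queued incoming activities and processes each one,
/// aborting handlers that exceed the 10-minute timeout. Failed jobs are
/// re-queued with a constant backoff until the retry limit is reached
/// (fetcher recursion errors are never retried).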
pub async fn process_queued_incoming_activities(
    config: &Config,
    db_client: &mut impl DatabaseClient,
) -> Result<(), DatabaseError> {
    let batch = get_job_batch(
        db_client,
        &JobType::IncomingActivity,
        INCOMING_QUEUE_BATCH_SIZE,
        JOB_TIMEOUT,
    )
    .await?;
    for job in batch {
        let mut job_data: IncomingActivityJobData =
            serde_json::from_value(job.job_data).map_err(|_| DatabaseTypeError)?;
        // See also: activitypub::queues::JOB_TIMEOUT
        let duration_max = std::time::Duration::from_secs(600);
        let handler_future = handle_activity(
            config,
            db_client,
            &job_data.activity,
            job_data.is_authenticated,
        );
        let handler_result = match tokio::time::timeout(duration_max, handler_future).await {
            Ok(result) => result,
            Err(_) => {
                log::error!(
                    "failed to process activity (timeout): {}",
                    job_data.activity,
                );
                delete_job_from_queue(db_client, &job.id).await?;
                continue;
            }
        };
        if let Err(error) = handler_result {
            job_data.failure_count += 1;
            if let HandlerError::DatabaseError(DatabaseError::DatabaseClientError(ref pg_error)) =
                error
            {
                log::error!("database client error: {}", pg_error);
            };
            log::warn!(
                "failed to process activity ({}) (attempt #{}): {}",
                error,
                job_data.failure_count,
                job_data.activity,
            );
            if job_data.failure_count <= INCOMING_QUEUE_RETRIES_MAX &&
                // Don't retry after fetcher recursion error
                !matches!(error, HandlerError::FetchError(FetchError::RecursionError))
            {
                // Re-queue
                let retry_after = incoming_queue_backoff(job_data.failure_count);
                job_data.into_job(db_client, retry_after).await?;
                log::info!("activity re-queued");
            };
        };
        delete_job_from_queue(db_client, &job.id).await?;
    }
    Ok(())
}
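
/// Queued payload for an outgoing activity: the serialized activity, the
/// local sender, and the delivery status of each recipient.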
#[derive(Deserialize, Serialize)]
pub struct OutgoingActivityJobData {
    pub activity: Value,
    pub sender_id: Uuid,
    pub recipients: Vec<Recipient>,
    pub failure_count: u32,
}

impl OutgoingActivityJobData {
    pub async fn into_job(
        self,
        db_client: &impl DatabaseClient,
        delay: u32,
    ) -> Result<(), DatabaseError> {
        let job_data = serde_json::to_value(self).expect("activity should be serializable");
        let scheduled_for = Utc::now() + Duration::seconds(delay.into());
        enqueue_job(
            db_client,
            &JobType::OutgoingActivity,
            &job_data,
            &scheduled_for,
        )
        .await
    }
}

const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1;
const OUTGOING_QUEUE_RETRIES_MAX: u32 = 3;

// Exponential backoff, 30 * 10^n seconds: 5 mins, 50 mins, ~8.3 hours
pub fn outgoing_queue_backoff(failure_count: u32) -> u32 {
    debug_assert!(failure_count > 0);
    30 * 10_u32.pow(failure_count)
}
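
/// Fetches queued outgoing activities (one per run, since the batch size is 1)
/// and attempts delivery. If some inboxes could not be reached and the retry
/// limit has not been exceeded, the job is re-queued with exponential backoff;
/// otherwise the reachability status of each recipient is recorded.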
pub async fn process_queued_outgoing_activities(
    config: &Config,
    db_pool: &DbPool,
) -> Result<(), DatabaseError> {
    let db_client = &**get_database_client(db_pool).await?;
    let batch = get_job_batch(
        db_client,
        &JobType::OutgoingActivity,
        OUTGOING_QUEUE_BATCH_SIZE,
        JOB_TIMEOUT,
    )
    .await?;
    for job in batch {
        let mut job_data: OutgoingActivityJobData =
            serde_json::from_value(job.job_data).map_err(|_| DatabaseTypeError)?;
        let sender = get_user_by_id(db_client, &job_data.sender_id).await?;
        let outgoing_activity = OutgoingActivity {
            instance: config.instance(),
            sender,
            activity: job_data.activity.clone(),
            recipients: job_data.recipients,
        };
        let recipients = match outgoing_activity.deliver().await {
            Ok(recipients) => recipients,
            Err(error) => {
                // Unexpected error
                log::error!("{}", error);
                delete_job_from_queue(db_client, &job.id).await?;
                return Ok(());
            }
        };
        log::info!(
            "delivery job: {} delivered, {} errors (attempt #{})",
            recipients.iter().filter(|item| item.is_delivered).count(),
            recipients.iter().filter(|item| !item.is_delivered).count(),
            job_data.failure_count + 1,
        );
        if recipients.iter().any(|recipient| !recipient.is_delivered)
            && job_data.failure_count < OUTGOING_QUEUE_RETRIES_MAX
        {
            job_data.failure_count += 1;
            // Re-queue if some deliveries are not successful
            job_data.recipients = recipients;
            let retry_after = outgoing_queue_backoff(job_data.failure_count);
            job_data.into_job(db_client, retry_after).await?;
            log::info!("delivery job re-queued");
        } else {
            // Update inbox status if all deliveries are successful
            // or if retry limit is reached
            for recipient in recipients {
                set_reachability_status(db_client, &recipient.id, recipient.is_delivered).await?;
            }
        };
        delete_job_from_queue(db_client, &job.id).await?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_outgoing_queue_backoff() {
        assert_eq!(outgoing_queue_backoff(1), 300);
        assert_eq!(outgoing_queue_backoff(2), 3000);
    }
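
    // Sanity check that the incoming-queue backoff stays at its documented
    // constant value (10 minutes) regardless of the failure count.
    #[test]
    fn test_incoming_queue_backoff() {
        assert_eq!(incoming_queue_backoff(1), 600);
        assert_eq!(incoming_queue_backoff(2), 600);
    }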
}