Move activity queue helpers to activitypub::queues module

This commit is contained in:
silverpill 2022-12-31 00:06:33 +00:00
parent 471442a22a
commit 0ecf682984
4 changed files with 100 additions and 92 deletions

View file

@@ -8,6 +8,7 @@ mod deliverer;
pub mod fetcher;
mod handlers;
pub mod identifiers;
pub mod receiver;
pub mod queues;
mod receiver;
pub mod views;
mod vocabulary;

94
src/activitypub/queues.rs Normal file
View file

@@ -0,0 +1,94 @@
use chrono::{Duration, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio_postgres::GenericClient;
use crate::config::Config;
use crate::database::{DatabaseError, DatabaseTypeError};
use crate::models::{
background_jobs::queries::{
enqueue_job,
get_job_batch,
delete_job_from_queue,
},
background_jobs::types::JobType,
};
use super::receiver::handle_activity;
// A remote activity waiting to be processed, stored as JSON job data
// in the background-jobs queue (JobType::IncomingActivity).
#[derive(Deserialize, Serialize)]
pub struct IncomingActivity {
    // Raw activity JSON as received from the remote server
    activity: Value,
    // NOTE(review): presumably set when the request/activity signature
    // was verified (receiver imports verify_signed_request /
    // verify_signed_activity) — confirm against the caller
    is_authenticated: bool,
    // Number of failed processing attempts so far; starts at 0
    failure_count: i32,
}
impl IncomingActivity {
    /// Wraps an activity JSON value for queueing; the failure counter
    /// starts at zero.
    pub fn new(activity: &Value, is_authenticated: bool) -> Self {
        Self {
            activity: activity.clone(),
            is_authenticated,
            failure_count: 0,
        }
    }

    /// Serializes this activity and stores it as a background job
    /// scheduled to run `delay` seconds from now.
    pub async fn enqueue(
        self,
        db_client: &impl GenericClient,
        delay: i64,
    ) -> Result<(), DatabaseError> {
        let scheduled_for = Utc::now() + Duration::seconds(delay);
        // Serialization of a Value-backed struct can't fail in practice
        let job_data = serde_json::to_value(self)
            .expect("activity should be serializable");
        enqueue_job(
            db_client,
            &JobType::IncomingActivity,
            &job_data,
            &scheduled_for,
        ).await
    }
}
/// Pulls a batch of queued incoming activities, processes each one,
/// and re-queues failures until the retry budget is exhausted.
/// Every fetched job is removed from the queue afterwards, whether it
/// succeeded, was re-queued, or was dropped.
pub async fn process_queued_activities(
    config: &Config,
    db_client: &mut impl GenericClient,
) -> Result<(), DatabaseError> {
    // Tuning knobs: jobs per run, extra attempts per job, retry spacing
    let batch_size = 10;
    let max_retries = 2;
    let retry_after = 60 * 10; // 10 minutes
    let batch = get_job_batch(
        db_client,
        &JobType::IncomingActivity,
        batch_size,
    ).await?;
    for job in batch {
        // Malformed job data indicates a database-level problem
        let mut incoming_activity: IncomingActivity =
            serde_json::from_value(job.job_data)
                .map_err(|_| DatabaseTypeError)?;
        let outcome = handle_activity(
            config,
            db_client,
            &incoming_activity.activity,
            incoming_activity.is_authenticated,
        ).await;
        if let Err(error) = outcome {
            incoming_activity.failure_count += 1;
            log::warn!(
                "failed to process activity ({}) (attempt #{}): {}",
                error,
                incoming_activity.failure_count,
                incoming_activity.activity,
            );
            if incoming_activity.failure_count <= max_retries {
                // Re-queue
                log::info!("activity re-queued");
                incoming_activity.enqueue(db_client, retry_after).await?;
            };
        };
        // The original job is always consumed; retries live as new jobs
        delete_job_from_queue(db_client, &job.id).await?;
    };
    Ok(())
}

View file

@@ -1,9 +1,7 @@
use actix_web::HttpRequest;
use chrono::{Duration, Utc};
use serde::{
Deserialize,
Deserializer,
Serialize,
de::DeserializeOwned,
de::Error as DeserializerError,
};
@@ -11,20 +9,12 @@ use serde_json::Value;
use tokio_postgres::GenericClient;
use crate::config::Config;
use crate::database::{DatabaseError, DatabaseTypeError};
use crate::database::DatabaseError;
use crate::errors::{
ConversionError,
HttpError,
ValidationError,
};
use crate::models::{
background_jobs::queries::{
enqueue_job,
get_job_batch,
delete_job_from_queue,
},
background_jobs::types::JobType,
};
use super::authentication::{
verify_signed_activity,
verify_signed_request,
@@ -45,6 +35,7 @@ use super::handlers::{
undo::handle_undo,
update::handle_update,
};
use super::queues::IncomingActivity;
use super::vocabulary::*;
#[derive(thiserror::Error, Debug)]
@@ -152,7 +143,7 @@ pub fn deserialize_into_object_id<'de, D>(
Ok(object_id)
}
async fn handle_activity(
pub async fn handle_activity(
config: &Config,
db_client: &mut impl GenericClient,
activity: &Value,
@@ -344,84 +335,6 @@ pub async fn receive_activity(
).await
}
// (removed in this commit) moved to activitypub::queues, where the
// struct and its fields became `pub` as needed
#[derive(Deserialize, Serialize)]
struct IncomingActivity {
    activity: Value,
    is_authenticated: bool,
    failure_count: i32,
}
// (removed in this commit) moved to activitypub::queues; `new` and
// `enqueue` became `pub` there, otherwise identical
impl IncomingActivity {
fn new(activity: &Value, is_authenticated: bool) -> Self {
Self {
activity: activity.clone(),
is_authenticated,
failure_count: 0,
}
}
// Serializes the activity and schedules it as a background job
// `delay` seconds in the future
async fn enqueue(
self,
db_client: &impl GenericClient,
delay: i64,
) -> Result<(), DatabaseError> {
let job_data = serde_json::to_value(self)
.expect("activity should be serializable");
let scheduled_for = Utc::now() + Duration::seconds(delay);
enqueue_job(
db_client,
&JobType::IncomingActivity,
&job_data,
&scheduled_for,
).await
}
}
// (removed in this commit) moved verbatim to activitypub::queues
pub async fn process_queued_activities(
config: &Config,
db_client: &mut impl GenericClient,
) -> Result<(), DatabaseError> {
let batch_size = 10;
let max_retries = 2;
let retry_after = 60 * 10; // 10 minutes
let batch = get_job_batch(
db_client,
&JobType::IncomingActivity,
batch_size,
).await?;
for job in batch {
// Malformed job data is treated as a database-level error
let mut incoming_activity: IncomingActivity =
serde_json::from_value(job.job_data)
.map_err(|_| DatabaseTypeError)?;
let is_error = match handle_activity(
config,
db_client,
&incoming_activity.activity,
incoming_activity.is_authenticated,
).await {
Ok(_) => false,
Err(error) => {
incoming_activity.failure_count += 1;
log::warn!(
"failed to process activity ({}) (attempt #{}): {}",
error,
incoming_activity.failure_count,
incoming_activity.activity,
);
true
},
};
// Failed jobs get re-queued until the retry budget is spent
if is_error && incoming_activity.failure_count <= max_retries {
// Re-queue
log::info!("activity re-queued");
incoming_activity.enqueue(db_client, retry_after).await?;
};
// The fetched job is always consumed; retries live as new jobs
delete_job_from_queue(db_client, &job.id).await?;
};
Ok(())
}
#[cfg(test)]
mod tests {
use serde_json::json;

View file

@@ -5,7 +5,7 @@ use anyhow::Error;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::activitypub::receiver::process_queued_activities;
use crate::activitypub::queues::process_queued_activities;
use crate::config::{Config, Instance};
use crate::database::{get_database_client, DbPool};
use crate::ethereum::contracts::Blockchain;