Rename IncomingActivity type

This commit is contained in:
silverpill 2022-12-31 13:28:25 +00:00
parent 0ecf682984
commit b4f68aaec8
2 changed files with 14 additions and 14 deletions

View file

@ -16,13 +16,13 @@ use crate::models::{
use super::receiver::handle_activity; use super::receiver::handle_activity;
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub struct IncomingActivity { pub struct IncomingActivityJobData {
activity: Value, activity: Value,
is_authenticated: bool, is_authenticated: bool,
failure_count: i32, failure_count: i32,
} }
impl IncomingActivity { impl IncomingActivityJobData {
pub fn new(activity: &Value, is_authenticated: bool) -> Self { pub fn new(activity: &Value, is_authenticated: bool) -> Self {
Self { Self {
activity: activity.clone(), activity: activity.clone(),
@ -31,7 +31,7 @@ impl IncomingActivity {
} }
} }
pub async fn enqueue( pub async fn into_job(
self, self,
db_client: &impl GenericClient, db_client: &impl GenericClient,
delay: i64, delay: i64,
@ -62,31 +62,31 @@ pub async fn process_queued_activities(
batch_size, batch_size,
).await?; ).await?;
for job in batch { for job in batch {
let mut incoming_activity: IncomingActivity = let mut job_data: IncomingActivityJobData =
serde_json::from_value(job.job_data) serde_json::from_value(job.job_data)
.map_err(|_| DatabaseTypeError)?; .map_err(|_| DatabaseTypeError)?;
let is_error = match handle_activity( let is_error = match handle_activity(
config, config,
db_client, db_client,
&incoming_activity.activity, &job_data.activity,
incoming_activity.is_authenticated, job_data.is_authenticated,
).await { ).await {
Ok(_) => false, Ok(_) => false,
Err(error) => { Err(error) => {
incoming_activity.failure_count += 1; job_data.failure_count += 1;
log::warn!( log::warn!(
"failed to process activity ({}) (attempt #{}): {}", "failed to process activity ({}) (attempt #{}): {}",
error, error,
incoming_activity.failure_count, job_data.failure_count,
incoming_activity.activity, job_data.activity,
); );
true true
}, },
}; };
if is_error && incoming_activity.failure_count <= max_retries { if is_error && job_data.failure_count <= max_retries {
// Re-queue // Re-queue
log::info!("activity re-queued"); log::info!("activity re-queued");
incoming_activity.enqueue(db_client, retry_after).await?; job_data.into_job(db_client, retry_after).await?;
}; };
delete_job_from_queue(db_client, &job.id).await?; delete_job_from_queue(db_client, &job.id).await?;
}; };

View file

@ -35,7 +35,7 @@ use super::handlers::{
undo::handle_undo, undo::handle_undo,
update::handle_update, update::handle_update,
}; };
use super::queues::IncomingActivity; use super::queues::IncomingActivityJobData;
use super::vocabulary::*; use super::vocabulary::*;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
@ -321,8 +321,8 @@ pub async fn receive_activity(
if let ANNOUNCE | CREATE | UPDATE = activity_type { if let ANNOUNCE | CREATE | UPDATE = activity_type {
// Add activity to job queue and release lock // Add activity to job queue and release lock
IncomingActivity::new(activity, is_authenticated) IncomingActivityJobData::new(activity, is_authenticated)
.enqueue(db_client, 0).await?; .into_job(db_client, 0).await?;
log::debug!("activity added to the queue: {}", activity_type); log::debug!("activity added to the queue: {}", activity_type);
return Ok(()); return Ok(());
}; };