Replace magic numbers in activitypub::queues module with constants
This commit is contained in:
parent
f5c012769f
commit
49f51f44d8
2 changed files with 19 additions and 13 deletions
|
@ -29,7 +29,7 @@ use super::receiver::{handle_activity, HandlerError};
|
||||||
pub struct IncomingActivityJobData {
|
pub struct IncomingActivityJobData {
|
||||||
activity: Value,
|
activity: Value,
|
||||||
is_authenticated: bool,
|
is_authenticated: bool,
|
||||||
failure_count: i32,
|
failure_count: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IncomingActivityJobData {
|
impl IncomingActivityJobData {
|
||||||
|
@ -44,11 +44,11 @@ impl IncomingActivityJobData {
|
||||||
pub async fn into_job(
|
pub async fn into_job(
|
||||||
self,
|
self,
|
||||||
db_client: &impl DatabaseClient,
|
db_client: &impl DatabaseClient,
|
||||||
delay: i64,
|
delay: u32,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let job_data = serde_json::to_value(self)
|
let job_data = serde_json::to_value(self)
|
||||||
.expect("activity should be serializable");
|
.expect("activity should be serializable");
|
||||||
let scheduled_for = Utc::now() + Duration::seconds(delay);
|
let scheduled_for = Utc::now() + Duration::seconds(delay.into());
|
||||||
enqueue_job(
|
enqueue_job(
|
||||||
db_client,
|
db_client,
|
||||||
&JobType::IncomingActivity,
|
&JobType::IncomingActivity,
|
||||||
|
@ -58,18 +58,22 @@ impl IncomingActivityJobData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const INCOMING_QUEUE_BATCH_SIZE: u32 = 10;
|
||||||
|
const INCOMING_QUEUE_RETRIES_MAX: u32 = 2;
|
||||||
|
|
||||||
|
const fn incoming_queue_backoff(_failure_count: u32) -> u32 {
|
||||||
|
// Constant, 10 minutes
|
||||||
|
60 * 10
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn process_queued_incoming_activities(
|
pub async fn process_queued_incoming_activities(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
db_client: &mut impl DatabaseClient,
|
db_client: &mut impl DatabaseClient,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let batch_size = 10;
|
|
||||||
let max_retries = 2;
|
|
||||||
let retry_after = 60 * 10; // 10 minutes
|
|
||||||
|
|
||||||
let batch = get_job_batch(
|
let batch = get_job_batch(
|
||||||
db_client,
|
db_client,
|
||||||
&JobType::IncomingActivity,
|
&JobType::IncomingActivity,
|
||||||
batch_size,
|
INCOMING_QUEUE_BATCH_SIZE,
|
||||||
).await?;
|
).await?;
|
||||||
for job in batch {
|
for job in batch {
|
||||||
let mut job_data: IncomingActivityJobData =
|
let mut job_data: IncomingActivityJobData =
|
||||||
|
@ -88,12 +92,13 @@ pub async fn process_queued_incoming_activities(
|
||||||
job_data.failure_count,
|
job_data.failure_count,
|
||||||
job_data.activity,
|
job_data.activity,
|
||||||
);
|
);
|
||||||
if job_data.failure_count <= max_retries &&
|
if job_data.failure_count <= INCOMING_QUEUE_RETRIES_MAX &&
|
||||||
// Don't retry after fetcher recursion error
|
// Don't retry after fetcher recursion error
|
||||||
!matches!(error, HandlerError::FetchError(FetchError::RecursionError))
|
!matches!(error, HandlerError::FetchError(FetchError::RecursionError))
|
||||||
{
|
{
|
||||||
// Re-queue
|
// Re-queue
|
||||||
log::info!("activity re-queued");
|
log::info!("activity re-queued");
|
||||||
|
let retry_after = incoming_queue_backoff(job_data.failure_count);
|
||||||
job_data.into_job(db_client, retry_after).await?;
|
job_data.into_job(db_client, retry_after).await?;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
@ -126,16 +131,17 @@ impl OutgoingActivityJobData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const OUTGOING_QUEUE_BATCH_SIZE: u32 = 1;
|
||||||
|
|
||||||
pub async fn process_queued_outgoing_activities(
|
pub async fn process_queued_outgoing_activities(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
db_pool: &DbPool,
|
db_pool: &DbPool,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let db_client = &**get_database_client(db_pool).await?;
|
let db_client = &**get_database_client(db_pool).await?;
|
||||||
let batch_size = 1;
|
|
||||||
let batch = get_job_batch(
|
let batch = get_job_batch(
|
||||||
db_client,
|
db_client,
|
||||||
&JobType::OutgoingActivity,
|
&JobType::OutgoingActivity,
|
||||||
batch_size,
|
OUTGOING_QUEUE_BATCH_SIZE,
|
||||||
).await?;
|
).await?;
|
||||||
for job in batch {
|
for job in batch {
|
||||||
let job_data: OutgoingActivityJobData =
|
let job_data: OutgoingActivityJobData =
|
||||||
|
|
|
@ -30,7 +30,7 @@ pub async fn enqueue_job(
|
||||||
pub async fn get_job_batch(
|
pub async fn get_job_batch(
|
||||||
db_client: &impl DatabaseClient,
|
db_client: &impl DatabaseClient,
|
||||||
job_type: &JobType,
|
job_type: &JobType,
|
||||||
batch_size: i64,
|
batch_size: u32,
|
||||||
) -> Result<Vec<DbBackgroundJob>, DatabaseError> {
|
) -> Result<Vec<DbBackgroundJob>, DatabaseError> {
|
||||||
let rows = db_client.query(
|
let rows = db_client.query(
|
||||||
"
|
"
|
||||||
|
@ -54,7 +54,7 @@ pub async fn get_job_batch(
|
||||||
&JobStatus::Running,
|
&JobStatus::Running,
|
||||||
&job_type,
|
&job_type,
|
||||||
&JobStatus::Queued,
|
&JobStatus::Queued,
|
||||||
&batch_size,
|
&i64::from(batch_size),
|
||||||
],
|
],
|
||||||
).await?;
|
).await?;
|
||||||
let jobs = rows.iter()
|
let jobs = rows.iter()
|
||||||
|
|
Loading…
Reference in a new issue