From 0521f1f731b099b74a199335ff0c597693cb2285 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sat, 25 Mar 2023 20:14:48 +0000 Subject: [PATCH] Restart stalled background jobs --- CHANGELOG.md | 1 + src/activitypub/queues.rs | 4 ++++ src/job_queue/periodic_tasks.rs | 1 + src/models/background_jobs/queries.rs | 16 ++++++++++++---- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26772a8..954c05e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Fixed error in emoji update SQL query. +- Restart stalled background jobs. ## [1.18.0] - 2023-03-21 diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index b52637d..3b54001 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -26,6 +26,8 @@ use super::deliverer::{OutgoingActivity, Recipient}; use super::fetcher::fetchers::FetchError; use super::receiver::{handle_activity, HandlerError}; +const JOB_TIMEOUT: u32 = 3600; // 1 hour + #[derive(Deserialize, Serialize)] pub struct IncomingActivityJobData { activity: Value, @@ -75,6 +77,7 @@ pub async fn process_queued_incoming_activities( db_client, &JobType::IncomingActivity, INCOMING_QUEUE_BATCH_SIZE, + JOB_TIMEOUT, ).await?; for job in batch { let mut job_data: IncomingActivityJobData = @@ -152,6 +155,7 @@ pub async fn process_queued_outgoing_activities( db_client, &JobType::OutgoingActivity, OUTGOING_QUEUE_BATCH_SIZE, + JOB_TIMEOUT, ).await?; for job in batch { let mut job_data: OutgoingActivityJobData = diff --git a/src/job_queue/periodic_tasks.rs b/src/job_queue/periodic_tasks.rs index 58b701d..eeaeb4a 100644 --- a/src/job_queue/periodic_tasks.rs +++ b/src/job_queue/periodic_tasks.rs @@ -114,6 +114,7 @@ pub async fn incoming_activity_queue_executor( db_pool: &DbPool, ) -> Result<(), Error> { let db_client = &mut **get_database_client(db_pool).await?; + // See also: activitypub::queues::JOB_TIMEOUT let duration_max = Duration::from_secs(600); let completed = process_queued_incoming_activities(config, db_client); match tokio::time::timeout(duration_max, completed).await { diff --git a/src/models/background_jobs/queries.rs b/src/models/background_jobs/queries.rs index e26ad02..cedca1b 100644 --- a/src/models/background_jobs/queries.rs +++ b/src/models/background_jobs/queries.rs @@ -31,7 +31,10 @@ pub async fn get_job_batch( db_client: &impl DatabaseClient, job_type: &JobType, batch_size: u32, + job_timeout: u32, ) -> Result, DatabaseError> { + // https://github.com/sfackler/rust-postgres/issues/60 + let job_timeout_pg = format!("{}S", job_timeout); // interval let rows = db_client.query( " UPDATE background_job @@ -43,8 +46,12 @@ pub async fn get_job_batch( FROM background_job WHERE job_type = $2 - AND job_status = $3 AND scheduled_for < CURRENT_TIMESTAMP + AND ( + job_status = $3 --queued + OR job_status = $1 --running + AND updated_at < CURRENT_TIMESTAMP - $5::text::interval + ) ORDER BY scheduled_for ASC LIMIT $4 ) @@ -55,6 +62,7 @@ pub async fn get_job_batch( &job_type, &JobStatus::Queued, &i64::from(batch_size), + &job_timeout_pg, ], ).await?; let jobs = rows.iter() @@ -100,18 +108,18 @@ mod tests { let scheduled_for = Utc::now(); enqueue_job(db_client, &job_type, &job_data, &scheduled_for).await.unwrap(); - let batch_1 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + let batch_1 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap(); assert_eq!(batch_1.len(), 1); let job = &batch_1[0]; assert_eq!(job.job_type, job_type); assert_eq!(job.job_data, job_data); assert_eq!(job.job_status, JobStatus::Running); - let batch_2 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + let batch_2 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap(); assert_eq!(batch_2.len(), 0); delete_job_from_queue(db_client, &job.id).await.unwrap(); - let batch_3 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + let batch_3 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap(); assert_eq!(batch_3.len(), 0); } }