Restart stalled background jobs

This commit is contained in:
silverpill 2023-03-25 20:14:48 +00:00
parent 5ba8b8d6ae
commit 0521f1f731
4 changed files with 18 additions and 4 deletions

View file

@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed ### Fixed
- Fixed error in emoji update SQL query. - Fixed error in emoji update SQL query.
- Restart stalled background jobs.
## [1.18.0] - 2023-03-21 ## [1.18.0] - 2023-03-21

View file

@ -26,6 +26,8 @@ use super::deliverer::{OutgoingActivity, Recipient};
use super::fetcher::fetchers::FetchError; use super::fetcher::fetchers::FetchError;
use super::receiver::{handle_activity, HandlerError}; use super::receiver::{handle_activity, HandlerError};
const JOB_TIMEOUT: u32 = 3600; // 1 hour
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub struct IncomingActivityJobData { pub struct IncomingActivityJobData {
activity: Value, activity: Value,
@ -75,6 +77,7 @@ pub async fn process_queued_incoming_activities(
db_client, db_client,
&JobType::IncomingActivity, &JobType::IncomingActivity,
INCOMING_QUEUE_BATCH_SIZE, INCOMING_QUEUE_BATCH_SIZE,
JOB_TIMEOUT,
).await?; ).await?;
for job in batch { for job in batch {
let mut job_data: IncomingActivityJobData = let mut job_data: IncomingActivityJobData =
@ -152,6 +155,7 @@ pub async fn process_queued_outgoing_activities(
db_client, db_client,
&JobType::OutgoingActivity, &JobType::OutgoingActivity,
OUTGOING_QUEUE_BATCH_SIZE, OUTGOING_QUEUE_BATCH_SIZE,
JOB_TIMEOUT,
).await?; ).await?;
for job in batch { for job in batch {
let mut job_data: OutgoingActivityJobData = let mut job_data: OutgoingActivityJobData =

View file

@ -114,6 +114,7 @@ pub async fn incoming_activity_queue_executor(
db_pool: &DbPool, db_pool: &DbPool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let db_client = &mut **get_database_client(db_pool).await?; let db_client = &mut **get_database_client(db_pool).await?;
// See also: activitypub::queues::JOB_TIMEOUT
let duration_max = Duration::from_secs(600); let duration_max = Duration::from_secs(600);
let completed = process_queued_incoming_activities(config, db_client); let completed = process_queued_incoming_activities(config, db_client);
match tokio::time::timeout(duration_max, completed).await { match tokio::time::timeout(duration_max, completed).await {

View file

@ -31,7 +31,10 @@ pub async fn get_job_batch(
db_client: &impl DatabaseClient, db_client: &impl DatabaseClient,
job_type: &JobType, job_type: &JobType,
batch_size: u32, batch_size: u32,
job_timeout: u32,
) -> Result<Vec<DbBackgroundJob>, DatabaseError> { ) -> Result<Vec<DbBackgroundJob>, DatabaseError> {
// https://github.com/sfackler/rust-postgres/issues/60
let job_timeout_pg = format!("{}S", job_timeout); // interval
let rows = db_client.query( let rows = db_client.query(
" "
UPDATE background_job UPDATE background_job
@ -43,8 +46,12 @@ pub async fn get_job_batch(
FROM background_job FROM background_job
WHERE WHERE
job_type = $2 job_type = $2
AND job_status = $3
AND scheduled_for < CURRENT_TIMESTAMP AND scheduled_for < CURRENT_TIMESTAMP
AND (
job_status = $3 --queued
OR job_status = $1 --running
AND updated_at < CURRENT_TIMESTAMP - $5::text::interval
)
ORDER BY scheduled_for ASC ORDER BY scheduled_for ASC
LIMIT $4 LIMIT $4
) )
@ -55,6 +62,7 @@ pub async fn get_job_batch(
&job_type, &job_type,
&JobStatus::Queued, &JobStatus::Queued,
&i64::from(batch_size), &i64::from(batch_size),
&job_timeout_pg,
], ],
).await?; ).await?;
let jobs = rows.iter() let jobs = rows.iter()
@ -100,18 +108,18 @@ mod tests {
let scheduled_for = Utc::now(); let scheduled_for = Utc::now();
enqueue_job(db_client, &job_type, &job_data, &scheduled_for).await.unwrap(); enqueue_job(db_client, &job_type, &job_data, &scheduled_for).await.unwrap();
let batch_1 = get_job_batch(db_client, &job_type, 10).await.unwrap(); let batch_1 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap();
assert_eq!(batch_1.len(), 1); assert_eq!(batch_1.len(), 1);
let job = &batch_1[0]; let job = &batch_1[0];
assert_eq!(job.job_type, job_type); assert_eq!(job.job_type, job_type);
assert_eq!(job.job_data, job_data); assert_eq!(job.job_data, job_data);
assert_eq!(job.job_status, JobStatus::Running); assert_eq!(job.job_status, JobStatus::Running);
let batch_2 = get_job_batch(db_client, &job_type, 10).await.unwrap(); let batch_2 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap();
assert_eq!(batch_2.len(), 0); assert_eq!(batch_2.len(), 0);
delete_job_from_queue(db_client, &job.id).await.unwrap(); delete_job_from_queue(db_client, &job.id).await.unwrap();
let batch_3 = get_job_batch(db_client, &job_type, 10).await.unwrap(); let batch_3 = get_job_batch(db_client, &job_type, 10, 3600).await.unwrap();
assert_eq!(batch_3.len(), 0); assert_eq!(batch_3.len(), 0);
} }
} }