From 9a32fb9c8093ed0b2cd47db1e61a8f4a7c8409a2 Mon Sep 17 00:00:00 2001 From: silverpill Date: Fri, 31 Mar 2023 17:36:27 +0000 Subject: [PATCH] Remove activity from queue if handler times out --- CHANGELOG.md | 1 + src/activitypub/queues.rs | 19 ++++++++++++++++++- src/job_queue/periodic_tasks.rs | 10 +--------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02d5e13..1e661a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Process queued background jobs before re-trying stalled. +- Remove activity from queue if handler times out. ## [1.19.0] - 2023-03-30 diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index 68fdabb..be5afaa 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -83,12 +83,29 @@ pub async fn process_queued_incoming_activities( let mut job_data: IncomingActivityJobData = serde_json::from_value(job.job_data) .map_err(|_| DatabaseTypeError)?; - if let Err(error) = handle_activity( + // See also: activitypub::queues::JOB_TIMEOUT + let duration_max = std::time::Duration::from_secs(600); + let handler_future = handle_activity( config, db_client, &job_data.activity, job_data.is_authenticated, + ); + let handler_result = match tokio::time::timeout( + duration_max, + handler_future, ).await { + Ok(result) => result, + Err(_) => { + log::error!( + "failed to process activity (timeout): {}", + job_data.activity, + ); + delete_job_from_queue(db_client, &job.id).await?; + continue; + }, + }; + if let Err(error) = handler_result { job_data.failure_count += 1; log::warn!( "failed to process activity ({}) (attempt #{}): {}", diff --git a/src/job_queue/periodic_tasks.rs b/src/job_queue/periodic_tasks.rs index 9d1e98e..37c613f 100644 --- a/src/job_queue/periodic_tasks.rs +++ b/src/job_queue/periodic_tasks.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use anyhow::Error; use mitra_config::Config; @@ -114,13 +112,7 @@ pub async fn incoming_activity_queue_executor( db_pool: &DbPool, ) -> Result<(), Error> { let db_client = &mut **get_database_client(db_pool).await?; - // See also: activitypub::queues::JOB_TIMEOUT - let duration_max = Duration::from_secs(600); - let completed = process_queued_incoming_activities(config, db_client); - match tokio::time::timeout(duration_max, completed).await { - Ok(result) => result?, - Err(_) => log::error!("incoming activity queue executor timeout"), - }; + process_queued_incoming_activities(config, db_client).await?; Ok(()) }