Remove activity from queue if handler times out
This commit is contained in:
parent
779d4e7287
commit
9a32fb9c80
3 changed files with 20 additions and 10 deletions
|
@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
|||
### Fixed
|
||||
|
||||
- Process queued background jobs before re-trying stalled.
|
||||
- Remove activity from queue if handler times out.
|
||||
|
||||
## [1.19.0] - 2023-03-30
|
||||
|
||||
|
|
|
@ -83,12 +83,29 @@ pub async fn process_queued_incoming_activities(
|
|||
let mut job_data: IncomingActivityJobData =
|
||||
serde_json::from_value(job.job_data)
|
||||
.map_err(|_| DatabaseTypeError)?;
|
||||
if let Err(error) = handle_activity(
|
||||
// See also: activitypub::queues::JOB_TIMEOUT
|
||||
let duration_max = std::time::Duration::from_secs(600);
|
||||
let handler_future = handle_activity(
|
||||
config,
|
||||
db_client,
|
||||
&job_data.activity,
|
||||
job_data.is_authenticated,
|
||||
);
|
||||
let handler_result = match tokio::time::timeout(
|
||||
duration_max,
|
||||
handler_future,
|
||||
).await {
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
log::error!(
|
||||
"failed to process activity (timeout): {}",
|
||||
job_data.activity,
|
||||
);
|
||||
delete_job_from_queue(db_client, &job.id).await?;
|
||||
continue;
|
||||
},
|
||||
};
|
||||
if let Err(error) = handler_result {
|
||||
job_data.failure_count += 1;
|
||||
log::warn!(
|
||||
"failed to process activity ({}) (attempt #{}): {}",
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use anyhow::Error;
|
||||
|
||||
use mitra_config::Config;
|
||||
|
@ -114,13 +112,7 @@ pub async fn incoming_activity_queue_executor(
|
|||
db_pool: &DbPool,
|
||||
) -> Result<(), Error> {
|
||||
let db_client = &mut **get_database_client(db_pool).await?;
|
||||
// See also: activitypub::queues::JOB_TIMEOUT
|
||||
let duration_max = Duration::from_secs(600);
|
||||
let completed = process_queued_incoming_activities(config, db_client);
|
||||
match tokio::time::timeout(duration_max, completed).await {
|
||||
Ok(result) => result?,
|
||||
Err(_) => log::error!("incoming activity queue executor timeout"),
|
||||
};
|
||||
process_queued_incoming_activities(config, db_client).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue