From eca36974101e7e315048327cbd13831e0cdd400a Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 5 Apr 2024 11:58:55 -0500 Subject: [PATCH] Add panic boundaries for background jobs --- src/queue.rs | 76 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index f20b457..562f221 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -11,9 +11,11 @@ use crate::{ use std::{ ops::Deref, + rc::Rc, sync::Arc, time::{Duration, Instant}, }; +use tokio::task::JoinError; use tracing::Instrument; pub(crate) mod cleanup; @@ -297,54 +299,66 @@ where } } -fn job_result(result: &JobResult) -> crate::repo::JobResult { +fn job_result(result: &Result) -> crate::repo::JobResult { match result { - Ok(()) => crate::repo::JobResult::Success, - Err(JobError::Retry(_)) => crate::repo::JobResult::Failure, - Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted, + Ok(Ok(())) => crate::repo::JobResult::Success, + Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure, + Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted, + Err(_) => crate::repo::JobResult::Aborted, } } async fn process_jobs(state: State, queue: &'static str, callback: F) where - S: Store, - for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy, + S: Store + 'static, + for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy + 'static, { let worker_id = uuid::Uuid::new_v4(); + let state = Rc::new(state); loop { tracing::trace!("process_jobs: looping"); crate::sync::cooperate().await; - let res = job_loop(&state, worker_id, queue, callback) - .with_poll_timer("job-loop") - .await; + // add a panic boundary by spawning a task + let res = crate::sync::spawn( + "job-loop", + job_loop(state.clone(), worker_id, queue, callback), + ) + .await; - if let Err(e) = res { - tracing::warn!("Error processing jobs: {}", format!("{e}")); - tracing::warn!("{}", format!("{e:?}")); + match res { + // clean exit + Ok(Ok(())) => break, - if e.is_disconnected() { - tokio::time::sleep(Duration::from_secs(10)).await; + // job error + Ok(Err(e)) => { + tracing::warn!("Error processing jobs: {}", format!("{e}")); + tracing::warn!("{}", format!("{e:?}")); + + if e.is_disconnected() { + tokio::time::sleep(Duration::from_secs(10)).await; + } } - continue; + // job panic + Err(_) => { + tracing::warn!("Panic while processing jobs"); + } } - - break; } } async fn job_loop( - state: &State, + state: Rc>, worker_id: uuid::Uuid, queue: &'static str, callback: F, ) -> Result<(), Error> where - S: Store, - for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy, + S: Store + 'static, + for<'a> F: Fn(&'a State, serde_json::Value) -> JobFuture<'a> + Copy + 'static, { loop { tracing::trace!("job_loop: looping"); @@ -360,14 +374,18 @@ where let guard = MetricsGuard::guard(worker_id, queue); - let res = heartbeat( - &state.repo, - queue, - worker_id, - job_id, - (callback)(state, job), - ) - .with_poll_timer("job-and-heartbeat") + let state2 = state.clone(); + let res = crate::sync::spawn("job-and-heartbeat", async move { + let state = state2; + heartbeat( + &state.repo, + queue, + worker_id, + job_id, + (callback)(&state, job), + ) + .await + }) .await; state @@ -376,7 +394,7 @@ where .with_poll_timer("job-complete") .await?; - res?; + res.map_err(|_| UploadError::Canceled)??; guard.disarm();