From 5ebc1bd67438113494cb724f4cd194687963a342 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Mon, 13 Sep 2021 17:40:49 -0500 Subject: [PATCH] Handle panicking jobs --- examples/actix-example/src/main.rs | 28 +++++++++++++ jobs-core/src/processor_map.rs | 67 +++++++++++++++++++++++++++--- jobs-core/src/stats.rs | 13 +----- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index e33802e..6d3d7cb 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -17,8 +17,14 @@ pub struct MyJob { other_usize: usize, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct PanickingJob; + #[actix_rt::main] async fn main() -> Result<(), Error> { + if std::env::var_os("RUST_LOG").is_none() { + std::env::set_var("RUST_LOG", "info"); + } env_logger::init(); // Set up our Storage let db = sled::Config::new().temporary(true).open()?; @@ -29,10 +35,16 @@ async fn main() -> Result<(), Error> { // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) + .register::() .register::() .set_worker_count(DEFAULT_QUEUE, 16) .start(queue_handle.clone()); + // Queue some panicking job + for _ in 0..32 { + queue_handle.queue(PanickingJob)?; + } + // Queue our jobs queue_handle.queue(MyJob::new(1, 2))?; queue_handle.queue(MyJob::new(3, 4))?; @@ -90,3 +102,19 @@ impl Job for MyJob { ready(Ok(())) } } + +#[async_trait::async_trait] +impl Job for PanickingJob { + type State = MyState; + type Future = Ready>; + + const NAME: &'static str = "PanickingJob"; + + const QUEUE: &'static str = DEFAULT_QUEUE; + + const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); + + fn run(self, _: MyState) -> Self::Future { + panic!("A panicking job does not stop others from running") + } +} diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 33675f8..6f28942 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -79,7 +79,7 @@ where let opt = self .inner .get(job.name()) - .map(|name| process(name, (self.state_fn)(), job.clone())); + .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone())); if let Some(fut) = opt { fut.await @@ -100,7 +100,7 @@ where /// intended for internal use. pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { if let Some(name) = self.inner.get(job.name()) { - process(name, self.state.clone(), job).await + process(Arc::clone(name), self.state.clone(), job).await } else { error!("Job {} not registered", job.name()); ReturnJobInfo::unregistered(job.id()) @@ -108,13 +108,64 @@ where } } -async fn process(process_fn: &ProcessFn, state: S, job: JobInfo) -> ReturnJobInfo { +struct CatchUnwindFuture { + future: std::sync::Mutex, +} + +fn catch_unwind(future: F) -> CatchUnwindFuture +where + F: Future + Unpin, +{ + CatchUnwindFuture { + future: std::sync::Mutex::new(future), + } +} + +impl std::future::Future for CatchUnwindFuture +where + F: Future + Unpin, +{ + type Output = std::thread::Result; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let future = &self.future; + let waker = cx.waker().clone(); + let res = std::panic::catch_unwind(|| { + let mut context = std::task::Context::from_waker(&waker); + let mut guard = future.lock().unwrap(); + Pin::new(&mut *guard).poll(&mut context) + }); + + match res { + Ok(poll) => poll.map(Ok), + Err(e) => std::task::Poll::Ready(Err(e)), + } + } +} + +async fn process(process_fn: ProcessFn, state: S, job: JobInfo) -> ReturnJobInfo +where + S: Clone, +{ let args = job.args(); let id = job.id(); let name = job.name().to_owned(); let start = Utc::now(); - let res = process_fn(args, state).await; + + let state_mtx = std::sync::Mutex::new(state); + let process_mtx = std::sync::Mutex::new(process_fn); + + let res = match std::panic::catch_unwind(|| { + let state = state_mtx.lock().unwrap().clone(); + (process_mtx.lock().unwrap())(args, state) + }) { + Ok(fut) => catch_unwind(fut).await, + Err(e) => Err(e), + }; let end = Utc::now(); let duration = end - start; @@ -126,13 +177,17 @@ async fn process(process_fn: &ProcessFn, state: S, job: JobInfo) -> Return }; match res { - Ok(_) => { + Ok(Ok(_)) => { info!("Job {} {} completed {:.6}", id, name, seconds); ReturnJobInfo::pass(id) } - Err(e) => { + Ok(Err(e)) => { info!("Job {} {} errored {} {:.6}", id, name, e, seconds); ReturnJobInfo::fail(id) } + Err(_) => { + info!("Job {} {} panicked {:.6}", id, name, seconds); + ReturnJobInfo::fail(id) + } } } diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs index 6d5c10f..c0ba240 100644 --- a/jobs-core/src/stats.rs +++ b/jobs-core/src/stats.rs @@ -1,6 +1,6 @@ use chrono::{offset::Utc, DateTime, Datelike, Timelike}; -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)] /// Statistics about the jobs processor pub struct Stats { /// How many jobs are pending execution @@ -60,17 +60,6 @@ impl Stats { } } -impl Default for Stats { - fn default() -> Self { - Stats { - pending: 0, - running: 0, - dead: JobStat::default(), - complete: JobStat::default(), - } - } -} - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] /// A time-based overview of job completion and failures pub struct JobStat {