Handle panicking jobs

This commit is contained in:
Aode (Lion) 2021-09-13 17:40:49 -05:00
parent a2c149e406
commit 5ebc1bd674
3 changed files with 90 additions and 18 deletions

View file

@ -17,8 +17,14 @@ pub struct MyJob {
other_usize: usize, other_usize: usize,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct PanickingJob;
#[actix_rt::main] #[actix_rt::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "info");
}
env_logger::init(); env_logger::init();
// Set up our Storage // Set up our Storage
let db = sled::Config::new().temporary(true).open()?; let db = sled::Config::new().temporary(true).open()?;
@ -29,10 +35,16 @@ async fn main() -> Result<(), Error> {
// Configure and start our workers // Configure and start our workers
WorkerConfig::new(move || MyState::new("My App")) WorkerConfig::new(move || MyState::new("My App"))
.register::<PanickingJob>()
.register::<MyJob>() .register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16) .set_worker_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone()); .start(queue_handle.clone());
// Queue some panicking job
for _ in 0..32 {
queue_handle.queue(PanickingJob)?;
}
// Queue our jobs // Queue our jobs
queue_handle.queue(MyJob::new(1, 2))?; queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?; queue_handle.queue(MyJob::new(3, 4))?;
@ -90,3 +102,19 @@ impl Job for MyJob {
ready(Ok(())) ready(Ok(()))
} }
} }
#[async_trait::async_trait]
impl Job for PanickingJob {
type State = MyState;
type Future = Ready<Result<(), Error>>;
const NAME: &'static str = "PanickingJob";
const QUEUE: &'static str = DEFAULT_QUEUE;
const MAX_RETRIES: MaxRetries = MaxRetries::Count(0);
fn run(self, _: MyState) -> Self::Future {
panic!("A panicking job does not stop others from running")
}
}

View file

@ -79,7 +79,7 @@ where
let opt = self let opt = self
.inner .inner
.get(job.name()) .get(job.name())
.map(|name| process(name, (self.state_fn)(), job.clone())); .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone()));
if let Some(fut) = opt { if let Some(fut) = opt {
fut.await fut.await
@ -100,7 +100,7 @@ where
/// intended for internal use. /// intended for internal use.
pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { pub async fn process(&self, job: JobInfo) -> ReturnJobInfo {
if let Some(name) = self.inner.get(job.name()) { if let Some(name) = self.inner.get(job.name()) {
process(name, self.state.clone(), job).await process(Arc::clone(name), self.state.clone(), job).await
} else { } else {
error!("Job {} not registered", job.name()); error!("Job {} not registered", job.name());
ReturnJobInfo::unregistered(job.id()) ReturnJobInfo::unregistered(job.id())
@ -108,13 +108,64 @@ where
} }
} }
async fn process<S>(process_fn: &ProcessFn<S>, state: S, job: JobInfo) -> ReturnJobInfo { struct CatchUnwindFuture<F> {
future: std::sync::Mutex<F>,
}
fn catch_unwind<F>(future: F) -> CatchUnwindFuture<F>
where
F: Future + Unpin,
{
CatchUnwindFuture {
future: std::sync::Mutex::new(future),
}
}
impl<F> std::future::Future for CatchUnwindFuture<F>
where
F: Future + Unpin,
{
type Output = std::thread::Result<F::Output>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let future = &self.future;
let waker = cx.waker().clone();
let res = std::panic::catch_unwind(|| {
let mut context = std::task::Context::from_waker(&waker);
let mut guard = future.lock().unwrap();
Pin::new(&mut *guard).poll(&mut context)
});
match res {
Ok(poll) => poll.map(Ok),
Err(e) => std::task::Poll::Ready(Err(e)),
}
}
}
async fn process<S>(process_fn: ProcessFn<S>, state: S, job: JobInfo) -> ReturnJobInfo
where
S: Clone,
{
let args = job.args(); let args = job.args();
let id = job.id(); let id = job.id();
let name = job.name().to_owned(); let name = job.name().to_owned();
let start = Utc::now(); let start = Utc::now();
let res = process_fn(args, state).await;
let state_mtx = std::sync::Mutex::new(state);
let process_mtx = std::sync::Mutex::new(process_fn);
let res = match std::panic::catch_unwind(|| {
let state = state_mtx.lock().unwrap().clone();
(process_mtx.lock().unwrap())(args, state)
}) {
Ok(fut) => catch_unwind(fut).await,
Err(e) => Err(e),
};
let end = Utc::now(); let end = Utc::now();
let duration = end - start; let duration = end - start;
@ -126,13 +177,17 @@ async fn process<S>(process_fn: &ProcessFn<S>, state: S, job: JobInfo) -> Return
}; };
match res { match res {
Ok(_) => { Ok(Ok(_)) => {
info!("Job {} {} completed {:.6}", id, name, seconds); info!("Job {} {} completed {:.6}", id, name, seconds);
ReturnJobInfo::pass(id) ReturnJobInfo::pass(id)
} }
Err(e) => { Ok(Err(e)) => {
info!("Job {} {} errored {} {:.6}", id, name, e, seconds); info!("Job {} {} errored {} {:.6}", id, name, e, seconds);
ReturnJobInfo::fail(id) ReturnJobInfo::fail(id)
} }
Err(_) => {
info!("Job {} {} panicked {:.6}", id, name, seconds);
ReturnJobInfo::fail(id)
}
} }
} }

View file

@ -1,6 +1,6 @@
use chrono::{offset::Utc, DateTime, Datelike, Timelike}; use chrono::{offset::Utc, DateTime, Datelike, Timelike};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
/// Statistics about the jobs processor /// Statistics about the jobs processor
pub struct Stats { pub struct Stats {
/// How many jobs are pending execution /// How many jobs are pending execution
@ -60,17 +60,6 @@ impl Stats {
} }
} }
impl Default for Stats {
fn default() -> Self {
Stats {
pending: 0,
running: 0,
dead: JobStat::default(),
complete: JobStat::default(),
}
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// A time-based overview of job completion and failures /// A time-based overview of job completion and failures
pub struct JobStat { pub struct JobStat {