diff --git a/src/lib.rs b/src/lib.rs index 9e66b93..edc74f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,7 @@ impl Default for Jobs { pub struct Processors { inner: HashMap>, jobs: Option, + max_retries: usize, } impl Processors { @@ -95,6 +96,11 @@ impl Processors { Default::default() } + pub fn max_retries(&mut self, max_retries: usize) -> &mut Self { + self.max_retries = max_retries; + self + } + pub fn register_processor

(&mut self, processor: P) where P: Processor + Send + Sync + 'static, @@ -135,6 +141,7 @@ impl Default for Processors { Processors { inner: Default::default(), jobs: Default::default(), + max_retries: 5, } } } @@ -152,14 +159,21 @@ fn process( let fut = processor.process(args).then(move |res| match res { Ok(_) => Ok(info!("Job completed, {}", name)), - Err(e) => Err(error!("Job errored, {}, {}", name, e)), + Err(e) => { + error!("Job errored, {}, {}", name, e); + Err(e) + } }); processors.inner.insert(local_name, processor); fut.then(move |res| { - if res.is_err() { - jobs.requeue(&local_name_2, job); + if let Err(e) = res { + if job.retry_count < processors.max_retries { + jobs.requeue(&local_name_2, job); + } else { + error!("Job failed permanently, {}, {}", &local_name_2, e); + } } processors.jobs = Some(jobs);