Add max retry count

This commit is contained in:
asonix 2018-06-28 19:58:17 -05:00
parent 60569fc384
commit 01f593de8a

View file

@ -88,6 +88,7 @@ impl Default for Jobs {
pub struct Processors { pub struct Processors {
inner: HashMap<String, Box<Processor + Send + Sync + 'static>>, inner: HashMap<String, Box<Processor + Send + Sync + 'static>>,
jobs: Option<Jobs>, jobs: Option<Jobs>,
max_retries: usize,
} }
impl Processors { impl Processors {
@ -95,6 +96,11 @@ impl Processors {
Default::default() Default::default()
} }
pub fn max_retries(&mut self, max_retries: usize) -> &mut Self {
self.max_retries = max_retries;
self
}
pub fn register_processor<P>(&mut self, processor: P) pub fn register_processor<P>(&mut self, processor: P)
where where
P: Processor + Send + Sync + 'static, P: Processor + Send + Sync + 'static,
@ -135,6 +141,7 @@ impl Default for Processors {
Processors { Processors {
inner: Default::default(), inner: Default::default(),
jobs: Default::default(), jobs: Default::default(),
max_retries: 5,
} }
} }
} }
@ -152,14 +159,21 @@ fn process(
let fut = processor.process(args).then(move |res| match res { let fut = processor.process(args).then(move |res| match res {
Ok(_) => Ok(info!("Job completed, {}", name)), Ok(_) => Ok(info!("Job completed, {}", name)),
Err(e) => Err(error!("Job errored, {}, {}", name, e)), Err(e) => {
error!("Job errored, {}, {}", name, e);
Err(e)
}
}); });
processors.inner.insert(local_name, processor); processors.inner.insert(local_name, processor);
fut.then(move |res| { fut.then(move |res| {
if res.is_err() { if let Err(e) = res {
jobs.requeue(&local_name_2, job); if job.retry_count < processors.max_retries {
jobs.requeue(&local_name_2, job);
} else {
error!("Job failed permanently, {}, {}", &local_name_2, e);
}
} }
processors.jobs = Some(jobs); processors.jobs = Some(jobs);