From 60569fc38494b9a272017ecff1f48eabd7016e37 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 28 Jun 2018 19:36:41 -0500 Subject: [PATCH] Clean a bit --- src/lib.rs | 82 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 65b4d17..9e66b93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,12 +77,24 @@ impl Jobs { } } +impl Default for Jobs { + fn default() -> Self { + Jobs { + inner: Default::default(), + } + } +} + pub struct Processors { inner: HashMap>, jobs: Option, } impl Processors { + pub fn new() -> Self { + Default::default() + } + pub fn register_processor

(&mut self, processor: P) where P: Processor + Send + Sync + 'static, @@ -107,41 +119,51 @@ impl Processors { name: String, job: JobInfo, ) -> impl Future { - let mut jobs = self.jobs.take().unwrap(); - let args = job.args.clone(); + let jobs = self.jobs.take().unwrap(); let processor = self.inner.remove(&name); processor .ok_or(JobError::MissingProcessor) .into_future() - .and_then(move |processor| { - let local_name = name.clone(); - let local_name_2 = name.clone(); - - let fut = processor.process(args).then(move |res| match res { - Ok(_) => Ok(info!("Job completed, {}", name)), - Err(e) => { - error!("Job errored, {}, {}", name, e); - - Err(JobError::Processing(e)) - } - }); - - self.inner.insert(local_name, processor); - - fut.then(move |res| { - match res { - Ok(_) => (), - Err(_) => { - jobs.requeue(&local_name_2, job); - } - } - - self.jobs = Some(jobs); - - Ok(self) - }) - }) + .and_then(move |processor| process(self, jobs, processor, job, name)) } } + +impl Default for Processors { + fn default() -> Self { + Processors { + inner: Default::default(), + jobs: Default::default(), + } + } +} + +fn process( + mut processors: Processors, + mut jobs: Jobs, + processor: Box, + job: JobInfo, + name: String, +) -> impl Future { + let args = job.args.clone(); + let local_name = name.clone(); + let local_name_2 = name.clone(); + + let fut = processor.process(args).then(move |res| match res { + Ok(_) => Ok(info!("Job completed, {}", name)), + Err(e) => Err(error!("Job errored, {}, {}", name, e)), + }); + + processors.inner.insert(local_name, processor); + + fut.then(move |res| { + if res.is_err() { + jobs.requeue(&local_name_2, job); + } + + processors.jobs = Some(jobs); + + Ok(processors) + }) +}