diff --git a/examples/server-jobs-example/.env b/examples/server-jobs-example/.env index ee391cb..a6ece83 100644 --- a/examples/server-jobs-example/.env +++ b/examples/server-jobs-example/.env @@ -1 +1 @@ -RUST_LOG=server_jobs_example=info +RUST_LOG=info diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 769b3c9..abbfe6f 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -184,10 +184,6 @@ impl JobInfo { } } - pub(crate) fn is_failed(&self) -> bool { - self.status == JobStatus::Failed - } - pub fn needs_retry(&mut self) -> bool { let should_retry = self.is_failed() && self.increment().should_requeue(); @@ -207,6 +203,14 @@ impl JobInfo { self.status == JobStatus::Pending } + pub fn is_failed(&self) -> bool { + self.status == JobStatus::Failed + } + + pub fn is_finished(&self) -> bool { + self.status == JobStatus::Finished + } + pub(crate) fn is_in_queue(&self, queue: &str) -> bool { self.queue == queue } diff --git a/jobs-server/src/server/pull.rs b/jobs-server/src/server/pull.rs index 11431a6..78dd2af 100644 --- a/jobs-server/src/server/pull.rs +++ b/jobs-server/src/server/pull.rs @@ -36,8 +36,8 @@ use crate::server::{coerce, Config}; #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] enum EitherJob { - New(NewJobInfo), Existing(JobInfo), + New(NewJobInfo), } pub(crate) struct PullConfig { @@ -135,6 +135,16 @@ fn store_job( EitherJob::Existing(job) => job, }; + if job.is_pending() { + info!("Storing pending job, {}", job.id()); + } + if job.is_finished() { + info!("Finished job {}", job.id()); + } + if job.is_failed() { + info!("Job failed {}", job.id()); + } + storage.store_job(job, server_id).map_err(Error::from) }) .map_err(Error::from) diff --git a/jobs-server/src/server/push.rs b/jobs-server/src/server/push.rs index 9a5ea2e..65a58ad 100644 --- a/jobs-server/src/server/push.rs +++ b/jobs-server/src/server/push.rs @@ -151,7 +151,7 @@ fn fetch_queue( server_id: usize, ) -> Result, Error> { storage - .stage_jobs(100, queue, server_id) + .stage_jobs(10, queue, server_id) .map_err(Error::from) } diff --git a/jobs-server/src/worker/config.rs b/jobs-server/src/worker/config.rs index f4cf138..dfa2be1 100644 --- a/jobs-server/src/worker/config.rs +++ b/jobs-server/src/worker/config.rs @@ -39,7 +39,6 @@ where { pull: Pull, push: Push, - push2: Push, push_address: String, pull_address: String, queue: String, @@ -75,7 +74,6 @@ where let Worker { push, - push2, pull, push_address: _, pull_address: _, @@ -85,12 +83,13 @@ where } = self; let (tx, rx) = channel(5); + let tx2 = tx.clone(); tokio::spawn( rx.map_err(|_| RecvError) .from_err::() .and_then(serialize_request) - .forward(push2.sink(1)) + .forward(push.sink(1)) .map(|_| ()) .or_else(|_| Ok(())), ); @@ -101,8 +100,7 @@ where .and_then(parse_multipart) .and_then(move |job| report_running(job, tx.clone())) .and_then(move |job| process_job(job, &processors)) - .and_then(serialize_request) - .forward(push.sink(1)) + .forward(tx2) .map(move |_| info!("worker for queue {} is shutting down", queue)) .map_err(|e| { error!("Error processing job, {}", e); @@ -152,20 +150,14 @@ where Push::builder(self.context.clone()) .connect(&self.push_address) .build() - .join( - Push::builder(self.context.clone()) - .connect(&self.push_address) - .build(), - ) .join( Pull::builder(self.context.clone()) .connect(&self.pull_address) .build(), ) - .map(|((push, push2), pull)| { + .map(|(push, pull)| { let config = Worker { push, - push2, pull, push_address: self.push_address, pull_address: self.pull_address,