From 587adfebab817cfdac68d02b59b697224732764d Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 21 Mar 2020 13:41:15 -0500 Subject: [PATCH] Add more logging around checkdb --- jobs-actix/src/server.rs | 58 +++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 4f6fa4f..eee99eb 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -5,7 +5,7 @@ use crate::{ use actix::clock::{interval_at, Duration, Instant}; use anyhow::Error; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; -use log::{error, trace}; +use log::{debug, error}; use std::{ collections::{HashMap, VecDeque}, sync::Arc, @@ -54,18 +54,14 @@ impl Server { } async fn check_db(&self) -> Result<(), Error> { + debug!("Checking db for ready jobs"); for queue in self.cache.keys().await { 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { - if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { - if let Err(job) = worker.process_job(job).await { - error!("Worker has hung up"); - self.storage.return_job(job.unexecuted()).await? - } - } else { - self.cache.push(queue.clone(), worker).await; + if !self.try_turning(queue.clone(), worker).await? { break 'worker_loop; } } + debug!("Finished job lookups for queue {}", queue); } Ok(()) @@ -77,18 +73,12 @@ impl Server { self.storage.new_job(job).await?; if !ready { + debug!("New job is not ready for processing yet, returning"); return Ok(()); } if let Some(worker) = self.cache.pop(queue.clone()).await { - if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { - if let Err(job) = worker.process_job(job).await { - error!("Worker has hung up"); - self.storage.return_job(job.unexecuted()).await?; - } - } else { - self.cache.push(queue, worker).await; - } + self.try_turning(queue, worker).await?; } Ok(()) @@ -98,25 +88,33 @@ impl Server { &self, worker: Box, ) -> Result<(), Error> { - trace!("Worker {} requested job", worker.id()); + debug!("Worker {} requested job", worker.id()); - if let Ok(Some(job)) = self.storage.request_job(worker.queue(), worker.id()).await { - if let Err(job) = worker.process_job(job).await { - error!("Worker has hung up"); - self.storage.return_job(job.unexecuted()).await?; - } - } else { - trace!( - "storing worker {} for queue {}", - worker.id(), - worker.queue() - ); - self.cache.push(worker.queue().to_owned(), worker).await; - } + self.try_turning(worker.queue().to_owned(), worker).await?; Ok(()) } + async fn try_turning( + &self, + queue: String, + worker: Box, + ) -> Result { + debug!("Trying to find job for worker {}", worker.id()); + if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { + if let Err(job) = worker.process_job(job).await { + error!("Worker has hung up"); + self.storage.return_job(job.unexecuted()).await? + } + } else { + debug!("No job exists, returning worker {}", worker.id()); + self.cache.push(queue.clone(), worker).await; + return Ok(false); + } + + Ok(true) + } + pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { Ok(self.storage.return_job(job).await?) }