Add more logging around checkdb

This commit is contained in:
asonix 2020-03-21 13:41:15 -05:00
parent 248af5c783
commit 587adfebab

View file

@ -5,7 +5,7 @@ use crate::{
use actix::clock::{interval_at, Duration, Instant}; use actix::clock::{interval_at, Duration, Instant};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{error, trace}; use log::{debug, error};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
sync::Arc, sync::Arc,
@ -54,18 +54,14 @@ impl Server {
} }
async fn check_db(&self) -> Result<(), Error> { async fn check_db(&self) -> Result<(), Error> {
debug!("Checking db for ready jobs");
for queue in self.cache.keys().await { for queue in self.cache.keys().await {
'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await {
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if !self.try_turning(queue.clone(), worker).await? {
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?
}
} else {
self.cache.push(queue.clone(), worker).await;
break 'worker_loop; break 'worker_loop;
} }
} }
debug!("Finished job lookups for queue {}", queue);
} }
Ok(()) Ok(())
@ -77,18 +73,12 @@ impl Server {
self.storage.new_job(job).await?; self.storage.new_job(job).await?;
if !ready { if !ready {
debug!("New job is not ready for processing yet, returning");
return Ok(()); return Ok(());
} }
if let Some(worker) = self.cache.pop(queue.clone()).await { if let Some(worker) = self.cache.pop(queue.clone()).await {
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { self.try_turning(queue, worker).await?;
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?;
}
} else {
self.cache.push(queue, worker).await;
}
} }
Ok(()) Ok(())
@ -98,25 +88,33 @@ impl Server {
&self, &self,
worker: Box<dyn Worker + Send + 'static>, worker: Box<dyn Worker + Send + 'static>,
) -> Result<(), Error> { ) -> Result<(), Error> {
trace!("Worker {} requested job", worker.id()); debug!("Worker {} requested job", worker.id());
if let Ok(Some(job)) = self.storage.request_job(worker.queue(), worker.id()).await { self.try_turning(worker.queue().to_owned(), worker).await?;
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?;
}
} else {
trace!(
"storing worker {} for queue {}",
worker.id(),
worker.queue()
);
self.cache.push(worker.queue().to_owned(), worker).await;
}
Ok(()) Ok(())
} }
async fn try_turning(
&self,
queue: String,
worker: Box<dyn Worker + Send + 'static>,
) -> Result<bool, Error> {
debug!("Trying to find job for worker {}", worker.id());
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?
}
} else {
debug!("No job exists, returning worker {}", worker.id());
self.cache.push(queue.clone(), worker).await;
return Ok(false);
}
Ok(true)
}
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {
Ok(self.storage.return_job(job).await?) Ok(self.storage.return_job(job).await?)
} }