From 54b0b0fb0e4696071110f376e0b9f3f35cb89807 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 14:38:47 -0600 Subject: [PATCH] jobs-core: Replace Stats with metrics --- jobs-core/Cargo.toml | 3 +- jobs-core/src/job_info.rs | 3 +- jobs-core/src/lib.rs | 2 - jobs-core/src/processor_map.rs | 11 ++- jobs-core/src/stats.rs | 147 --------------------------------- jobs-core/src/storage.rs | 70 +++++++--------- 6 files changed, 39 insertions(+), 197 deletions(-) delete mode 100644 jobs-core/src/stats.rs diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index bd21979..8ab4894 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.13.0" +version = "0.14.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -20,6 +20,7 @@ actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" async-trait = "0.1.24" event-listener = "2" +metrics = "0.20.1" time = { version = "0.3", features = ["serde-human-readable"] } tracing = "0.1" tracing-futures = "0.2.5" diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 80dc658..33e8a81 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -2,7 +2,6 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; use serde_json::Value; use std::time::SystemTime; use time::{Duration, OffsetDateTime}; -use tracing::trace; use uuid::Uuid; #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] @@ -215,7 +214,7 @@ impl JobInfo { self.next_queue = Some(next_queue); - trace!( + tracing::trace!( "Now {}, Next queue {}, ready {}", now, next_queue, diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index a1a0e76..117f2ca 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -14,14 +14,12 @@ mod catch_unwind; mod job; mod job_info; mod processor_map; -mod stats; mod storage; pub use crate::{ job::{new_job, new_scheduled_job, process, Job}, job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, processor_map::{CachedProcessorMap, ProcessorMap}, - stats::{JobStat, Stats}, storage::{memory_storage, Storage}, }; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index e739f5d..fae76f5 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,7 +1,7 @@ use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo}; use serde_json::Value; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Instant}; -use tracing::{error, Span}; +use tracing::Span; use tracing_futures::Instrument; use uuid::Uuid; @@ -98,7 +98,7 @@ where "exception.details", &tracing::field::display("Not registered"), ); - error!("Not registered"); + tracing::error!("Not registered"); ReturnJobInfo::unregistered(job.id()) }; @@ -133,7 +133,7 @@ where "exception.details", &tracing::field::display("Not registered"), ); - error!("Not registered"); + tracing::error!("Not registered"); ReturnJobInfo::unregistered(job.id()) }; @@ -175,13 +175,12 @@ where Ok(fut) => catch_unwind(fut).await, Err(e) => Err(e), }; - let end = Instant::now(); - - let duration = end - start; + let duration = start.elapsed(); let seconds = duration.as_micros() as f64 / 1_000_000_f64; let span = Span::current(); span.record("job.execution_time", &tracing::field::display(&seconds)); + metrics::histogram!("background-jobs.job.execution_time", seconds, "queue" => job.queue().to_string(), "name" => job.name().to_string()); match res { Ok(Ok(_)) => { diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs deleted file mode 100644 index 95cfa71..0000000 --- a/jobs-core/src/stats.rs +++ /dev/null @@ -1,147 +0,0 @@ -use time::OffsetDateTime; - -#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)] -/// Statistics about the jobs processor -pub struct Stats { - /// How many jobs are pending execution - pub pending: usize, - - /// How many jobs are currently executing - pub running: usize, - - /// How many jobs are permanently failed - pub dead: JobStat, - - /// How many jobs have completed successfully - pub complete: JobStat, -} - -impl Stats { - /// A new, empty stats struct - pub fn new() -> Self { - Self::default() - } - - pub(crate) fn new_job(mut self) -> Self { - self.pending += 1; - self - } - - pub(crate) fn run_job(mut self) -> Self { - if self.pending > 0 { - self.pending -= 1; - } - self.running += 1; - self - } - - pub(crate) fn retry_job(mut self) -> Self { - self.pending += 1; - if self.running > 0 { - self.running -= 1; - } - self - } - - pub(crate) fn fail_job(mut self) -> Self { - if self.running > 0 { - self.running -= 1; - } - self.dead.increment(); - self - } - - pub(crate) fn complete_job(mut self) -> Self { - if self.running > 0 { - self.running -= 1; - } - self.complete.increment(); - self - } -} - -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -/// A time-based overview of job completion and failures -pub struct JobStat { - this_hour: usize, - today: usize, - this_month: usize, - all_time: usize, - updated_at: OffsetDateTime, -} - -impl JobStat { - /// A new, empty job statistic - pub fn new() -> Self { - Self::default() - } - - fn increment(&mut self) { - self.tick(); - - self.this_hour += 1; - self.today += 1; - self.this_month += 1; - self.all_time += 1; - } - - fn tick(&mut self) { - let now = OffsetDateTime::now_utc(); - - if now.month() != self.updated_at.month() { - self.next_month(); - } else if now.day() != self.updated_at.day() { - self.next_day(); - } else if now.hour() != self.updated_at.hour() { - self.next_hour(); - } - - self.updated_at = now; - } - - fn next_hour(&mut self) { - self.this_hour = 0; - } - - fn next_day(&mut self) { - self.next_hour(); - self.today = 0; - } - - fn next_month(&mut self) { - self.next_day(); - self.this_month = 0; - } - - /// A count from the last hour - pub fn this_hour(&self) -> usize { - self.this_hour - } - - /// A count from the last day - pub fn today(&self) -> usize { - self.today - } - - /// A count from the last month - pub fn this_month(&self) -> usize { - self.this_month - } - - /// A total count - pub fn all_time(&self) -> usize { - self.all_time - } -} - -impl Default for JobStat { - fn default() -> Self { - JobStat { - this_hour: 0, - today: 0, - this_month: 0, - all_time: 0, - updated_at: OffsetDateTime::now_utc(), - } - } -} diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index bb1d4e6..9e29335 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,6 +1,5 @@ -use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; +use crate::{JobInfo, NewJobInfo, ReturnJobInfo}; use std::{error::Error, time::SystemTime}; -use tracing::warn; use uuid::Uuid; /// Define a storage backend for jobs @@ -44,25 +43,16 @@ pub trait Storage: Clone + Send { /// This happens when a job has been completed or has failed too many times async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error>; - /// This method returns the current statistics, or Stats::default() if none exists. - async fn get_stats(&self) -> Result; - - /// This method fetches the existing statistics or Stats::default(), and stores the result of - /// calling `update_stats` on it. - async fn update_stats(&self, f: F) -> Result<(), Self::Error> - where - F: Fn(Stats) -> Stats + Send + 'static; - /// Generate a new job based on the provided NewJobInfo async fn new_job(&self, job: NewJobInfo) -> Result { let id = self.generate_id().await?; let job = job.with_id(id); + metrics::counter!("background-jobs.job.created", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); let queue = job.queue().to_owned(); self.save_job(job).await?; self.queue_job(&queue, id).await?; - self.update_stats(Stats::new_job).await?; Ok(id) } @@ -77,11 +67,12 @@ pub trait Storage: Clone + Send { job.run(); self.run_job(job.id(), runner_id).await?; self.save_job(job.clone()).await?; - self.update_stats(Stats::run_job).await?; + + metrics::gauge!("background-jobs.job.running", 1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string()); return Ok(job); } else { - warn!( + tracing::warn!( "Not fetching job {}, it is not ready for processing", job.id() ); @@ -98,38 +89,55 @@ pub trait Storage: Clone + Send { if result.is_failure() { if let Some(mut job) = self.fetch_job(id).await? { if job.needs_retry() { + metrics::counter!("background-jobs.job.failed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + self.queue_job(job.queue(), id).await?; - self.save_job(job).await?; - self.update_stats(Stats::retry_job).await + self.save_job(job).await } else { + metrics::counter!("background-jobs.job.dead", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + #[cfg(feature = "error-logging")] tracing::warn!("Job {} failed permanently", id); - self.delete_job(id).await?; - self.update_stats(Stats::fail_job).await + self.delete_job(id).await } } else { + tracing::warn!("Returned non-existant job"); + metrics::counter!("background-jobs.job.missing", 1); Ok(()) } } else if result.is_unregistered() || result.is_unexecuted() { if let Some(mut job) = self.fetch_job(id).await? { + metrics::counter!("background-jobs.job.returned", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + job.pending(); self.queue_job(job.queue(), id).await?; - self.save_job(job).await?; - self.update_stats(Stats::retry_job).await + self.save_job(job).await } else { + tracing::warn!("Returned non-existant job"); + metrics::counter!("background-jobs.job.missing", 1); Ok(()) } } else { - self.delete_job(id).await?; - self.update_stats(Stats::complete_job).await + if let Some(job) = self.fetch_job(id).await? { + metrics::counter!("background-jobs.job.completed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + } else { + tracing::warn!("Returned non-existant job"); + metrics::counter!("background-jobs.job.missing", 1); + } + + self.delete_job(id).await } } } /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { - use super::{JobInfo, Stats}; + use super::JobInfo; use event_listener::Event; use std::{ collections::HashMap, @@ -162,7 +170,6 @@ pub mod memory_storage { job_queues: HashMap, worker_ids: HashMap, worker_ids_inverse: HashMap, - stats: Stats, } impl Storage { @@ -175,7 +182,6 @@ pub mod memory_storage { job_queues: HashMap::new(), worker_ids: HashMap::new(), worker_ids_inverse: HashMap::new(), - stats: Stats::default(), })), timer, } @@ -293,19 +299,5 @@ pub mod memory_storage { } Ok(()) } - - async fn get_stats(&self) -> Result { - Ok(self.inner.lock().unwrap().stats.clone()) - } - - async fn update_stats(&self, f: F) -> Result<(), Self::Error> - where - F: Fn(Stats) -> Stats + Send, - { - let mut inner = self.inner.lock().unwrap(); - - inner.stats = (f)(inner.stats.clone()); - Ok(()) - } } }