jobs-core: Replace Stats with metrics

asonix 2022-11-19 14:38:47 -06:00
parent 9c77c20883
commit 54b0b0fb0e
6 changed files with 39 additions and 197 deletions
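In short: the serialized Stats and JobStat bookkeeping is deleted, and the storage and processor code now records counters, gauges, and a histogram through the metrics crate. For orientation, the three macro shapes used throughout the hunks below (metrics 0.20 syntax, with queue, name, and seconds standing in for the values computed at each call site):

// Counter incremented by 1, labeled per queue and per job name:
metrics::counter!("background-jobs.job.created", 1, "queue" => queue, "name" => name);

// Gauge recorded as 1.0 when a job starts running and -1.0 when it finishes or is returned:
metrics::gauge!("background-jobs.job.running", 1.0, "queue" => queue, "name" => name);

// Histogram of execution time in fractional seconds:
metrics::histogram!("background-jobs.job.execution_time", seconds, "queue" => queue, "name" => name);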

View file

@@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor"
version = "0.13.0"
version = "0.14.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@@ -20,6 +20,7 @@ actix-rt = { version = "2.3.0", optional = true }
anyhow = "1.0"
async-trait = "0.1.24"
event-listener = "2"
metrics = "0.20.1"
time = { version = "0.3", features = ["serde-human-readable"] }
tracing = "0.1"
tracing-futures = "0.2.5"
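The new metrics dependency only provides the recording macros; they are no-ops until the embedding application installs a global recorder. A minimal sketch of doing so with metrics-exporter-prometheus, which is an assumption here since this commit does not choose an exporter (any recorder compatible with metrics 0.20 works):

use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install a global recorder so the counters, gauges, and histograms
    // emitted by background-jobs-core are actually collected.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // ... start the job server and workers here ...

    // The handle renders the Prometheus text exposition on demand.
    println!("{}", handle.render());
}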

View file

@@ -2,7 +2,6 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
use serde_json::Value;
use std::time::SystemTime;
use time::{Duration, OffsetDateTime};
use tracing::trace;
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
@@ -215,7 +214,7 @@ impl JobInfo {
self.next_queue = Some(next_queue);
trace!(
tracing::trace!(
"Now {}, Next queue {}, ready {}",
now,
next_queue,

View file

@@ -14,14 +14,12 @@ mod catch_unwind;
mod job;
mod job_info;
mod processor_map;
mod stats;
mod storage;
pub use crate::{
job::{new_job, new_scheduled_job, process, Job},
job_info::{JobInfo, NewJobInfo, ReturnJobInfo},
processor_map::{CachedProcessorMap, ProcessorMap},
stats::{JobStat, Stats},
storage::{memory_storage, Storage},
};

View file

@@ -1,7 +1,7 @@
use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo};
use serde_json::Value;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Instant};
use tracing::{error, Span};
use tracing::Span;
use tracing_futures::Instrument;
use uuid::Uuid;
@@ -98,7 +98,7 @@ where
"exception.details",
&tracing::field::display("Not registered"),
);
error!("Not registered");
tracing::error!("Not registered");
ReturnJobInfo::unregistered(job.id())
};
@@ -133,7 +133,7 @@ where
"exception.details",
&tracing::field::display("Not registered"),
);
error!("Not registered");
tracing::error!("Not registered");
ReturnJobInfo::unregistered(job.id())
};
@@ -175,13 +175,12 @@ where
Ok(fut) => catch_unwind(fut).await,
Err(e) => Err(e),
};
let end = Instant::now();
let duration = end - start;
let duration = start.elapsed();
let seconds = duration.as_micros() as f64 / 1_000_000_f64;
let span = Span::current();
span.record("job.execution_time", &tracing::field::display(&seconds));
metrics::histogram!("background-jobs.job.execution_time", seconds, "queue" => job.queue().to_string(), "name" => job.name().to_string());
match res {
Ok(Ok(_)) => {
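The manual end - start subtraction above becomes Instant::elapsed, and the duration is converted to fractional seconds before being recorded. A stripped-down sketch of the same pattern outside the job-running machinery (timed is a hypothetical helper, not part of the crate):

use std::time::Instant;

fn timed<T>(queue: &str, name: &str, work: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = work();

    // Microseconds -> fractional seconds, matching the conversion above.
    let seconds = start.elapsed().as_micros() as f64 / 1_000_000_f64;
    metrics::histogram!("background-jobs.job.execution_time", seconds,
        "queue" => queue.to_string(), "name" => name.to_string());

    out
}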

View file

@@ -1,147 +0,0 @@
use time::OffsetDateTime;
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
/// Statistics about the jobs processor
pub struct Stats {
/// How many jobs are pending execution
pub pending: usize,
/// How many jobs are currently executing
pub running: usize,
/// How many jobs are permanently failed
pub dead: JobStat,
/// How many jobs have completed successfully
pub complete: JobStat,
}
impl Stats {
/// A new, empty stats struct
pub fn new() -> Self {
Self::default()
}
pub(crate) fn new_job(mut self) -> Self {
self.pending += 1;
self
}
pub(crate) fn run_job(mut self) -> Self {
if self.pending > 0 {
self.pending -= 1;
}
self.running += 1;
self
}
pub(crate) fn retry_job(mut self) -> Self {
self.pending += 1;
if self.running > 0 {
self.running -= 1;
}
self
}
pub(crate) fn fail_job(mut self) -> Self {
if self.running > 0 {
self.running -= 1;
}
self.dead.increment();
self
}
pub(crate) fn complete_job(mut self) -> Self {
if self.running > 0 {
self.running -= 1;
}
self.complete.increment();
self
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// A time-based overview of job completion and failures
pub struct JobStat {
this_hour: usize,
today: usize,
this_month: usize,
all_time: usize,
updated_at: OffsetDateTime,
}
impl JobStat {
/// A new, empty job statistic
pub fn new() -> Self {
Self::default()
}
fn increment(&mut self) {
self.tick();
self.this_hour += 1;
self.today += 1;
self.this_month += 1;
self.all_time += 1;
}
fn tick(&mut self) {
let now = OffsetDateTime::now_utc();
if now.month() != self.updated_at.month() {
self.next_month();
} else if now.day() != self.updated_at.day() {
self.next_day();
} else if now.hour() != self.updated_at.hour() {
self.next_hour();
}
self.updated_at = now;
}
fn next_hour(&mut self) {
self.this_hour = 0;
}
fn next_day(&mut self) {
self.next_hour();
self.today = 0;
}
fn next_month(&mut self) {
self.next_day();
self.this_month = 0;
}
/// A count from the last hour
pub fn this_hour(&self) -> usize {
self.this_hour
}
/// A count from the last day
pub fn today(&self) -> usize {
self.today
}
/// A count from the last month
pub fn this_month(&self) -> usize {
self.this_month
}
/// A total count
pub fn all_time(&self) -> usize {
self.all_time
}
}
impl Default for JobStat {
fn default() -> Self {
JobStat {
this_hour: 0,
today: 0,
this_month: 0,
all_time: 0,
updated_at: OffsetDateTime::now_utc(),
}
}
}

View file

@@ -1,6 +1,5 @@
use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
use crate::{JobInfo, NewJobInfo, ReturnJobInfo};
use std::{error::Error, time::SystemTime};
use tracing::warn;
use uuid::Uuid;
/// Define a storage backend for jobs
@@ -44,25 +43,16 @@ pub trait Storage: Clone + Send {
/// This happens when a job has been completed or has failed too many times
async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error>;
/// This method returns the current statistics, or Stats::default() if none exists.
async fn get_stats(&self) -> Result<Stats, Self::Error>;
/// This method fetches the existing statistics or Stats::default(), and stores the result of
/// calling `update_stats` on it.
async fn update_stats<F>(&self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats + Send + 'static;
/// Generate a new job based on the provided NewJobInfo
async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
let id = self.generate_id().await?;
let job = job.with_id(id);
metrics::counter!("background-jobs.job.created", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
let queue = job.queue().to_owned();
self.save_job(job).await?;
self.queue_job(&queue, id).await?;
self.update_stats(Stats::new_job).await?;
Ok(id)
}
@@ -77,11 +67,12 @@ pub trait Storage: Clone + Send {
job.run();
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
self.update_stats(Stats::run_job).await?;
metrics::gauge!("background-jobs.job.running", 1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string());
return Ok(job);
} else {
warn!(
tracing::warn!(
"Not fetching job {}, it is not ready for processing",
job.id()
);
@@ -98,38 +89,55 @@ pub trait Storage: Clone + Send {
if result.is_failure() {
if let Some(mut job) = self.fetch_job(id).await? {
if job.needs_retry() {
metrics::counter!("background-jobs.job.failed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string());
self.queue_job(job.queue(), id).await?;
self.save_job(job).await?;
self.update_stats(Stats::retry_job).await
self.save_job(job).await
} else {
metrics::counter!("background-jobs.job.dead", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string());
#[cfg(feature = "error-logging")]
tracing::warn!("Job {} failed permanently", id);
self.delete_job(id).await?;
self.update_stats(Stats::fail_job).await
self.delete_job(id).await
}
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
Ok(())
}
} else if result.is_unregistered() || result.is_unexecuted() {
if let Some(mut job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.returned", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string());
job.pending();
self.queue_job(job.queue(), id).await?;
self.save_job(job).await?;
self.update_stats(Stats::retry_job).await
self.save_job(job).await
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
Ok(())
}
} else {
self.delete_job(id).await?;
self.update_stats(Stats::complete_job).await
if let Some(job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.completed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::gauge!("background-jobs.job.running", -1.0, "queue" => job.queue().to_string(), "name" => job.name().to_string());
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
}
self.delete_job(id).await
}
}
}
/// A default, in-memory implementation of a storage mechanism
pub mod memory_storage {
use super::{JobInfo, Stats};
use super::JobInfo;
use event_listener::Event;
use std::{
collections::HashMap,
@@ -162,7 +170,6 @@ pub mod memory_storage {
job_queues: HashMap<Uuid, String>,
worker_ids: HashMap<Uuid, Uuid>,
worker_ids_inverse: HashMap<Uuid, Uuid>,
stats: Stats,
}
impl<T: Timer> Storage<T> {
@@ -175,7 +182,6 @@ pub mod memory_storage {
job_queues: HashMap::new(),
worker_ids: HashMap::new(),
worker_ids_inverse: HashMap::new(),
stats: Stats::default(),
})),
timer,
}
@@ -293,19 +299,5 @@ pub mod memory_storage {
}
Ok(())
}
async fn get_stats(&self) -> Result<Stats, Self::Error> {
Ok(self.inner.lock().unwrap().stats.clone())
}
async fn update_stats<F>(&self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats + Send,
{
let mut inner = self.inner.lock().unwrap();
inner.stats = (f)(inner.stats.clone());
Ok(())
}
}
}
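For reference, the full set of metric names introduced by this change, collected from the hunks above; everything except the missing counter is labeled with queue and name. These constants are illustrative only and are not exported by the crate:

// Counters (incremented by 1 per event)
const CREATED: &str = "background-jobs.job.created";
const FAILED: &str = "background-jobs.job.failed";
const DEAD: &str = "background-jobs.job.dead";
const RETURNED: &str = "background-jobs.job.returned";
const COMPLETED: &str = "background-jobs.job.completed";
const MISSING: &str = "background-jobs.job.missing"; // no labels

// Gauge (recorded as 1.0 when a job is handed to a worker, -1.0 when it comes back)
const RUNNING: &str = "background-jobs.job.running";

// Histogram (execution time in fractional seconds)
const EXECUTION_TIME: &str = "background-jobs.job.execution_time";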