Update metrics

This commit is contained in:
asonix 2024-01-07 12:30:04 -06:00
parent c275bc7ef6
commit 21c98d607f
17 changed files with 69 additions and 50 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs"
description = "Background Jobs implemented with actix and futures"
version = "0.15.0"
version = "0.16.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -33,15 +33,15 @@ error-logging = ["background-jobs-core/error-logging"]
[dependencies]
[dependencies.background-jobs-core]
version = "0.15.0"
version = "0.16.0"
path = "jobs-core"
[dependencies.background-jobs-actix]
version = "0.15.0"
version = "0.16.0"
path = "jobs-actix"
optional = true
[dependencies.background-jobs-metrics]
version = "0.15.0"
version = "0.16.0"
path = "jobs-metrics"
optional = true

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.15.0", path = "../..", features = [
background-jobs = { version = "0.16.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.15.0", path = "../..", features = [
background-jobs = { version = "0.16.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.15.0", path = "../..", features = [
background-jobs = { version = "0.16.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.15.0", path = "../..", features = [
background-jobs = { version = "0.16.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.15.0", path = "../..", features = [
background-jobs = { version = "0.16.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }

View file

@ -1,12 +1,15 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1678901627,
"narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=",
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
@ -17,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1679437018,
"narHash": "sha256-vOuiDPLHSEo/7NkiWtxpHpHgoXoNmrm+wkXZ6a072Fc=",
"lastModified": 1704194953,
"narHash": "sha256-RtDKd8Mynhe5CFnVT8s0/0yqtWFMM9LmCzXv/YKxnq4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "19cf008bb18e47b6e3b4e16e32a9a4bdd4b45f7e",
"rev": "bd645e8668ec6612439a9ee7e71f7eac4099d4f6",
"type": "github"
},
"original": {
@ -36,6 +39,21 @@
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-actix"
description = "in-process jobs processor based on Actix"
version = "0.15.0"
version = "0.16.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -13,10 +13,10 @@ edition = "2021"
actix-rt = "2.5.1"
anyhow = "1.0"
async-trait = "0.1.24"
background-jobs-core = { version = "0.15.0", path = "../jobs-core", features = [
background-jobs-core = { version = "0.16.0", path = "../jobs-core", features = [
"with-actix",
] }
metrics = "0.21.0"
metrics = "0.22.0"
tracing = "0.1"
tracing-futures = "0.2"
serde = { version = "1.0", features = ["derive"] }

View file

@ -197,7 +197,7 @@ impl Manager {
notified.await;
metrics::counter!("background-jobs.worker-arbiter.restart", 1, "number" => i.to_string());
metrics::counter!("background-jobs.worker-arbiter.restart", "number" => i.to_string()).increment(1);
tracing::warn!("Recovering from dead worker arbiter");
drop(worker_arbiter);

View file

@ -14,7 +14,8 @@ struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> {
impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State, Extras> {
fn drop(&mut self) {
metrics::counter!("background-jobs.worker.finished", 1, "queue" => self.queue.clone());
metrics::counter!("background-jobs.worker.finished", "queue" => self.queue.clone())
.increment(1);
let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {}));
@ -83,7 +84,7 @@ pub(crate) async fn local_worker<State, Extras>(
State: Clone + 'static,
Extras: 'static,
{
metrics::counter!("background-jobs.worker.started", 1, "queue" => queue.clone());
metrics::counter!("background-jobs.worker.started", "queue" => queue.clone()).increment(1);
let starter = LocalWorkerStarter {
queue: queue.clone(),
@ -108,7 +109,7 @@ pub(crate) async fn local_worker<State, Extras>(
{
Ok(job) => job,
Err(e) => {
metrics::counter!("background-jobs.worker.failed-request", 1);
metrics::counter!("background-jobs.worker.failed-request").increment(1);
let display_val = format!("{}", e);
let debug = format!("{:?}", e);
@ -134,7 +135,7 @@ pub(crate) async fn local_worker<State, Extras>(
.instrument(return_span.clone())
.await
{
metrics::counter!("background-jobs.worker.failed-return", 1);
metrics::counter!("background-jobs.worker.failed-return").increment(1);
let display_val = format!("{}", e);
let debug = format!("{:?}", e);

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor"
version = "0.15.0"
version = "0.16.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -20,7 +20,7 @@ actix-rt = { version = "2.3.0", optional = true }
anyhow = "1.0"
async-trait = "0.1.24"
event-listener = "2"
metrics = "0.21.0"
metrics = "0.22.0"
time = { version = "0.3", features = ["serde-human-readable"] }
tracing = "0.1"
tracing-futures = "0.2.5"

View file

@ -177,7 +177,7 @@ where
let span = Span::current();
span.record("job.execution_time", &tracing::field::display(&seconds));
metrics::histogram!("background-jobs.job.execution_time", seconds, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue().to_string(), "name" => job.name().to_string()).record(seconds);
match res {
Ok(Ok(_)) => {

View file

@ -48,7 +48,7 @@ pub trait Storage: Clone + Send {
let id = self.generate_id().await?;
let job = job.with_id(id);
metrics::counter!("background-jobs.job.created", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.created", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
let queue = job.queue().to_owned();
self.save_job(job).await?;
@ -68,7 +68,7 @@ pub trait Storage: Clone + Send {
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
metrics::counter!("background-jobs.job.started", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.started", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
return Ok(job);
} else {
@ -89,14 +89,14 @@ pub trait Storage: Clone + Send {
if result.is_failure() {
if let Some(mut job) = self.fetch_job(id).await? {
if job.needs_retry() {
metrics::counter!("background-jobs.job.failed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.failed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
self.queue_job(job.queue(), id).await?;
self.save_job(job).await
} else {
metrics::counter!("background-jobs.job.dead", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.dead", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
#[cfg(feature = "error-logging")]
tracing::warn!("Job {} failed permanently", id);
@ -105,29 +105,29 @@ pub trait Storage: Clone + Send {
}
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
metrics::counter!("background-jobs.job.missing").increment(1);
Ok(())
}
} else if result.is_unregistered() || result.is_unexecuted() {
if let Some(mut job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.returned", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.returned", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
job.pending();
self.queue_job(job.queue(), id).await?;
self.save_job(job).await
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
metrics::counter!("background-jobs.job.missing").increment(1);
Ok(())
}
} else {
if let Some(job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.completed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string());
metrics::counter!("background-jobs.job.completed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing", 1);
metrics::counter!("background-jobs.job.missing").increment(1);
}
self.delete_job(id).await

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-metrics"
description = "Background Jobs implemented with actix and futures - metrics subscriber"
version = "0.15.0"
version = "0.16.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -12,5 +12,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
metrics = "0.21.0"
metrics-util = "0.15.0"
metrics = "0.22.0"
metrics-util = "0.16.0"

View file

@ -30,7 +30,7 @@ pub use recorder::{JobStat, Stats, StatsHandle, StatsRecorder};
/// ```rust
/// background_jobs_metrics::install().expect("Failed to install recorder");
/// ```
pub fn install() -> Result<StatsHandle, SetRecorderError> {
pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
StatsRecorder::install()
}

View file

@ -1,7 +1,7 @@
mod bucket;
use self::bucket::Buckets;
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Metadata, Recorder, SetRecorderError};
use metrics_util::registry::{Registry, Storage};
use std::{
sync::{
@ -104,10 +104,10 @@ impl StatsRecorder {
/// # use background_jobs_metrics::StatsRecorder;
/// StatsRecorder::install().expect("Failed to install recorder");
/// ```
pub fn install() -> Result<StatsHandle, SetRecorderError> {
pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
let (recorder, handle) = Self::build();
metrics::set_boxed_recorder(Box::new(recorder))?;
metrics::set_global_recorder(recorder)?;
Ok(handle)
}
@ -207,16 +207,16 @@ impl Recorder for StatsRecorder {
) {
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> metrics::Counter {
self.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> metrics::Gauge {
self.registry.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram {
self.registry
.get_or_create_histogram(key, |c| c.clone().into())
}

View file

@ -13,7 +13,7 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.15.0", path = "../jobs-core" }
background-jobs-core = { version = "0.16.0", path = "../jobs-core" }
bincode = "1.2"
sled = "0.34"
serde_cbor = "0.11"