From 21c98d607fc02f7bfd6c5d931b7b85e747e2ebfe Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 7 Jan 2024 12:30:04 -0600 Subject: [PATCH] Update metrics --- Cargo.toml | 8 ++++---- examples/basic-example/Cargo.toml | 2 +- examples/long-example/Cargo.toml | 2 +- examples/managed-example/Cargo.toml | 2 +- examples/metrics-example/Cargo.toml | 2 +- examples/panic-example/Cargo.toml | 2 +- flake.lock | 30 +++++++++++++++++++++++------ jobs-actix/Cargo.toml | 6 +++--- jobs-actix/src/lib.rs | 2 +- jobs-actix/src/worker.rs | 9 +++++---- jobs-core/Cargo.toml | 4 ++-- jobs-core/src/processor_map.rs | 2 +- jobs-core/src/storage.rs | 26 ++++++++++++------------- jobs-metrics/Cargo.toml | 6 +++--- jobs-metrics/src/lib.rs | 2 +- jobs-metrics/src/recorder.rs | 12 ++++++------ jobs-sled/Cargo.toml | 2 +- 17 files changed, 69 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 57b7ada..54301f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with actix and futures" -version = "0.15.0" +version = "0.16.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -33,15 +33,15 @@ error-logging = ["background-jobs-core/error-logging"] [dependencies] [dependencies.background-jobs-core] -version = "0.15.0" +version = "0.16.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.15.0" +version = "0.16.0" path = "jobs-actix" optional = true [dependencies.background-jobs-metrics] -version = "0.15.0" +version = "0.16.0" path = "jobs-metrics" optional = true diff --git a/examples/basic-example/Cargo.toml b/examples/basic-example/Cargo.toml index ff61ea9..ea3fe0b 100644 --- a/examples/basic-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.15.0", path = "../..", features = [ +background-jobs = { version = "0.16.0", path = "../..", features = [ "error-logging", ] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml index a76c857..ea091f4 100644 --- a/examples/long-example/Cargo.toml +++ b/examples/long-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.15.0", path = "../..", features = [ +background-jobs = { version = "0.16.0", path = "../..", features = [ "error-logging", ] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml index 8e49e52..ef7d54a 100644 --- a/examples/managed-example/Cargo.toml +++ b/examples/managed-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.15.0", path = "../..", features = [ +background-jobs = { version = "0.16.0", path = "../..", features = [ "error-logging", ] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } diff --git a/examples/metrics-example/Cargo.toml b/examples/metrics-example/Cargo.toml index e115eaf..d2e1fa2 100644 --- a/examples/metrics-example/Cargo.toml +++ b/examples/metrics-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.15.0", path = "../..", features = [ +background-jobs = { version = "0.16.0", path = "../..", features = [ "error-logging", ] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml index 0e36565..2f34eb1 100644 --- a/examples/panic-example/Cargo.toml +++ b/examples/panic-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.15.0", path = "../..", features = [ +background-jobs = { version = "0.16.0", path = "../..", features = [ "error-logging", ] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } diff --git a/flake.lock b/flake.lock index 55f41fb..383e071 100644 --- a/flake.lock +++ b/flake.lock @@ -1,12 +1,15 @@ { "nodes": { "flake-utils": { + "inputs": { + "systems": "systems" + }, "locked": { - "lastModified": 1678901627, - "narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=", + "lastModified": 1701680307, + "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", "owner": "numtide", "repo": "flake-utils", - "rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6", + "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", "type": "github" }, "original": { @@ -17,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1679437018, - "narHash": "sha256-vOuiDPLHSEo/7NkiWtxpHpHgoXoNmrm+wkXZ6a072Fc=", + "lastModified": 1704194953, + "narHash": "sha256-RtDKd8Mynhe5CFnVT8s0/0yqtWFMM9LmCzXv/YKxnq4=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "19cf008bb18e47b6e3b4e16e32a9a4bdd4b45f7e", + "rev": "bd645e8668ec6612439a9ee7e71f7eac4099d4f6", "type": "github" }, "original": { @@ -36,6 +39,21 @@ "flake-utils": "flake-utils", "nixpkgs": "nixpkgs" } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } } }, "root": "root", diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 85749f6..890b548 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.15.0" +version = "0.16.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -13,10 +13,10 @@ edition = "2021" actix-rt = "2.5.1" anyhow = "1.0" async-trait = "0.1.24" -background-jobs-core = { version = "0.15.0", path = "../jobs-core", features = [ +background-jobs-core = { version = "0.16.0", path = "../jobs-core", features = [ "with-actix", ] } -metrics = "0.21.0" +metrics = "0.22.0" tracing = "0.1" tracing-futures = "0.2" serde = { version = "1.0", features = ["derive"] } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 4c0b11c..60614c0 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -197,7 +197,7 @@ impl Manager { notified.await; - metrics::counter!("background-jobs.worker-arbiter.restart", 1, "number" => i.to_string()); + metrics::counter!("background-jobs.worker-arbiter.restart", "number" => i.to_string()).increment(1); tracing::warn!("Recovering from dead worker arbiter"); drop(worker_arbiter); diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index b86e82a..10e4005 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -14,7 +14,8 @@ struct LocalWorkerStarter { impl Drop for LocalWorkerStarter { fn drop(&mut self) { - metrics::counter!("background-jobs.worker.finished", 1, "queue" => self.queue.clone()); + metrics::counter!("background-jobs.worker.finished", "queue" => self.queue.clone()) + .increment(1); let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {})); @@ -83,7 +84,7 @@ pub(crate) async fn local_worker( State: Clone + 'static, Extras: 'static, { - metrics::counter!("background-jobs.worker.started", 1, "queue" => queue.clone()); + metrics::counter!("background-jobs.worker.started", "queue" => queue.clone()).increment(1); let starter = LocalWorkerStarter { queue: queue.clone(), @@ -108,7 +109,7 @@ pub(crate) async fn local_worker( { Ok(job) => job, Err(e) => { - metrics::counter!("background-jobs.worker.failed-request", 1); + metrics::counter!("background-jobs.worker.failed-request").increment(1); let display_val = format!("{}", e); let debug = format!("{:?}", e); @@ -134,7 +135,7 @@ pub(crate) async fn local_worker( .instrument(return_span.clone()) .await { - metrics::counter!("background-jobs.worker.failed-return", 1); + metrics::counter!("background-jobs.worker.failed-return").increment(1); let display_val = format!("{}", e); let debug = format!("{:?}", e); diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index fa2a3e5..f1abf5a 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.15.0" +version = "0.16.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -20,7 +20,7 @@ actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" async-trait = "0.1.24" event-listener = "2" -metrics = "0.21.0" +metrics = "0.22.0" time = { version = "0.3", features = ["serde-human-readable"] } tracing = "0.1" tracing-futures = "0.2.5" diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 55caa4d..92e55d4 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -177,7 +177,7 @@ where let span = Span::current(); span.record("job.execution_time", &tracing::field::display(&seconds)); - metrics::histogram!("background-jobs.job.execution_time", seconds, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue().to_string(), "name" => job.name().to_string()).record(seconds); match res { Ok(Ok(_)) => { diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index c42bb40..aa29c5e 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -48,7 +48,7 @@ pub trait Storage: Clone + Send { let id = self.generate_id().await?; let job = job.with_id(id); - metrics::counter!("background-jobs.job.created", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.created", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); let queue = job.queue().to_owned(); self.save_job(job).await?; @@ -68,7 +68,7 @@ pub trait Storage: Clone + Send { self.run_job(job.id(), runner_id).await?; self.save_job(job.clone()).await?; - metrics::counter!("background-jobs.job.started", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.started", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); return Ok(job); } else { @@ -89,14 +89,14 @@ pub trait Storage: Clone + Send { if result.is_failure() { if let Some(mut job) = self.fetch_job(id).await? { if job.needs_retry() { - metrics::counter!("background-jobs.job.failed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); - metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.failed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); + metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); self.queue_job(job.queue(), id).await?; self.save_job(job).await } else { - metrics::counter!("background-jobs.job.dead", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); - metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.dead", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); + metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); #[cfg(feature = "error-logging")] tracing::warn!("Job {} failed permanently", id); @@ -105,29 +105,29 @@ pub trait Storage: Clone + Send { } } else { tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing", 1); + metrics::counter!("background-jobs.job.missing").increment(1); Ok(()) } } else if result.is_unregistered() || result.is_unexecuted() { if let Some(mut job) = self.fetch_job(id).await? { - metrics::counter!("background-jobs.job.returned", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); - metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.returned", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); + metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); job.pending(); self.queue_job(job.queue(), id).await?; self.save_job(job).await } else { tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing", 1); + metrics::counter!("background-jobs.job.missing").increment(1); Ok(()) } } else { if let Some(job) = self.fetch_job(id).await? { - metrics::counter!("background-jobs.job.completed", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); - metrics::counter!("background-jobs.job.finished", 1, "queue" => job.queue().to_string(), "name" => job.name().to_string()); + metrics::counter!("background-jobs.job.completed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); + metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); } else { tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing", 1); + metrics::counter!("background-jobs.job.missing").increment(1); } self.delete_job(id).await diff --git a/jobs-metrics/Cargo.toml b/jobs-metrics/Cargo.toml index 6383a18..973f27e 100644 --- a/jobs-metrics/Cargo.toml +++ b/jobs-metrics/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-metrics" description = "Background Jobs implemented with actix and futures - metrics subscriber" -version = "0.15.0" +version = "0.16.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -12,5 +12,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -metrics = "0.21.0" -metrics-util = "0.15.0" +metrics = "0.22.0" +metrics-util = "0.16.0" diff --git a/jobs-metrics/src/lib.rs b/jobs-metrics/src/lib.rs index 8d0805b..a9e3e25 100644 --- a/jobs-metrics/src/lib.rs +++ b/jobs-metrics/src/lib.rs @@ -30,7 +30,7 @@ pub use recorder::{JobStat, Stats, StatsHandle, StatsRecorder}; /// ```rust /// background_jobs_metrics::install().expect("Failed to install recorder"); /// ``` -pub fn install() -> Result { +pub fn install() -> Result> { StatsRecorder::install() } diff --git a/jobs-metrics/src/recorder.rs b/jobs-metrics/src/recorder.rs index 2a76568..7035a74 100644 --- a/jobs-metrics/src/recorder.rs +++ b/jobs-metrics/src/recorder.rs @@ -1,7 +1,7 @@ mod bucket; use self::bucket::Buckets; -use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError}; +use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Metadata, Recorder, SetRecorderError}; use metrics_util::registry::{Registry, Storage}; use std::{ sync::{ @@ -104,10 +104,10 @@ impl StatsRecorder { /// # use background_jobs_metrics::StatsRecorder; /// StatsRecorder::install().expect("Failed to install recorder"); /// ``` - pub fn install() -> Result { + pub fn install() -> Result> { let (recorder, handle) = Self::build(); - metrics::set_boxed_recorder(Box::new(recorder))?; + metrics::set_global_recorder(recorder)?; Ok(handle) } @@ -207,16 +207,16 @@ impl Recorder for StatsRecorder { ) { } - fn register_counter(&self, key: &Key) -> metrics::Counter { + fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> metrics::Counter { self.registry .get_or_create_counter(key, |c| c.clone().into()) } - fn register_gauge(&self, key: &Key) -> metrics::Gauge { + fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> metrics::Gauge { self.registry.get_or_create_gauge(key, |c| c.clone().into()) } - fn register_histogram(&self, key: &Key) -> metrics::Histogram { + fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram { self.registry .get_or_create_histogram(key, |c| c.clone().into()) } diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 6425d04..e260374 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -13,7 +13,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.15.0", path = "../jobs-core" } +background-jobs-core = { version = "0.16.0", path = "../jobs-core" } bincode = "1.2" sled = "0.34" serde_cbor = "0.11"