mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2025-01-21 00:38:14 +00:00
jobs-metrics: Add Storage type to provide metrics for any storage backend
This commit is contained in:
parent
f73712c098
commit
3ab2bef826
2 changed files with 104 additions and 0 deletions
|
@ -12,5 +12,9 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1.24"
|
||||
background-jobs-core = { version = "0.16.0", path = "../jobs-core" }
|
||||
metrics = "0.22.0"
|
||||
metrics-util = "0.16.0"
|
||||
tracing = "0.1"
|
||||
uuid = { version = "1.6", features = ["serde", "v7"] }
|
||||
|
|
|
@ -53,3 +53,103 @@ pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
|
|||
pub fn build() -> (StatsRecorder, StatsHandle) {
|
||||
StatsRecorder::build()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// A wrapper for any Storage type adding metrics
|
||||
pub struct MetricsStorage<S>(S);
|
||||
|
||||
impl<S> MetricsStorage<S> {
|
||||
/// Add metrics to a provided Storage
|
||||
pub const fn wrap(storage: S) -> MetricsStorage<S>
|
||||
where
|
||||
S: background_jobs_core::Storage,
|
||||
{
|
||||
Self(storage)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<S> background_jobs_core::Storage for MetricsStorage<S>
|
||||
where
|
||||
S: background_jobs_core::Storage + Sync,
|
||||
{
|
||||
type Error = S::Error;
|
||||
|
||||
async fn info(
|
||||
&self,
|
||||
job_id: uuid::Uuid,
|
||||
) -> Result<Option<background_jobs_core::JobInfo>, Self::Error> {
|
||||
self.0.info(job_id).await
|
||||
}
|
||||
|
||||
async fn push(&self, job: background_jobs_core::NewJobInfo) -> Result<uuid::Uuid, Self::Error> {
|
||||
let queue = job.queue().to_string();
|
||||
let name = job.name().to_string();
|
||||
|
||||
let uuid = self.0.push(job).await?;
|
||||
|
||||
metrics::counter!("background-jobs.job.created", "queue" => queue, "name" => name)
|
||||
.increment(1);
|
||||
|
||||
Ok(uuid)
|
||||
}
|
||||
|
||||
async fn pop(
|
||||
&self,
|
||||
queue: &str,
|
||||
runner_id: uuid::Uuid,
|
||||
) -> Result<background_jobs_core::JobInfo, Self::Error> {
|
||||
let job_info = self.0.pop(queue, runner_id).await?;
|
||||
|
||||
metrics::counter!("background-jobs.job.started", "queue" => job_info.queue.clone(), "name" => job_info.name.clone()).increment(1);
|
||||
|
||||
Ok(job_info)
|
||||
}
|
||||
|
||||
async fn heartbeat(
|
||||
&self,
|
||||
job_id: uuid::Uuid,
|
||||
runner_id: uuid::Uuid,
|
||||
) -> Result<(), Self::Error> {
|
||||
self.0.heartbeat(job_id, runner_id).await
|
||||
}
|
||||
|
||||
async fn complete(
|
||||
&self,
|
||||
return_job_info: background_jobs_core::ReturnJobInfo,
|
||||
) -> Result<bool, Self::Error> {
|
||||
let info = if let Some(info) = self.0.info(return_job_info.id).await? {
|
||||
Some(info)
|
||||
} else {
|
||||
tracing::warn!("Returned non-existant job");
|
||||
metrics::counter!("background-jobs.job.missing").increment(1);
|
||||
None
|
||||
};
|
||||
|
||||
let result = return_job_info.result;
|
||||
|
||||
let completed = self.0.complete(return_job_info).await?;
|
||||
|
||||
if let Some(info) = info {
|
||||
metrics::counter!("background-jobs.job.finished", "queue" => info.queue.clone(), "name" => info.name.clone()).increment(1);
|
||||
|
||||
match result {
|
||||
background_jobs_core::JobResult::Success => {
|
||||
metrics::counter!("background-jobs.job.completed", "queue" => info.queue, "name" => info.name).increment(1);
|
||||
}
|
||||
background_jobs_core::JobResult::Failure if completed => {
|
||||
metrics::counter!("background-jobs.job.dead", "queue" => info.queue, "name" => info.name).increment(1);
|
||||
}
|
||||
background_jobs_core::JobResult::Failure => {
|
||||
metrics::counter!("background-jobs.job.failed", "queue" => info.queue, "name" => info.name).increment(1);
|
||||
}
|
||||
background_jobs_core::JobResult::Unexecuted
|
||||
| background_jobs_core::JobResult::Unregistered => {
|
||||
metrics::counter!("background-jobs.job.returned", "queue" => info.queue, "name" => info.name).increment(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(completed)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue