Improve metrics, add job metrics

This commit is contained in:
asonix 2023-07-22 21:11:28 -05:00
parent 7dc8ede8eb
commit 77cdeab57e
6 changed files with 108 additions and 20 deletions

View file

@ -74,9 +74,11 @@ where
S: Store, S: Store,
{ {
fn drop(&mut self) { fn drop(&mut self) {
if self.identifier.is_some() || self.upload_id.is_some() { let any_items = self.identifier.is_some() || self.upload_id.is_some();
metrics::increment_counter!("pict-rs.background.upload.failure");
metrics::increment_counter!("pict-rs.background.upload", "completed" => (!any_items).to_string());
if any_items {
let cleanup_parent_span = let cleanup_parent_span =
tracing::info_span!(parent: None, "Dropped backgrounded cleanup"); tracing::info_span!(parent: None, "Dropped backgrounded cleanup");
cleanup_parent_span.follows_from(Span::current()); cleanup_parent_span.follows_from(Span::current());
@ -110,8 +112,6 @@ where
) )
}); });
} }
} else {
metrics::increment_counter!("pict-rs.background.upload.success");
} }
} }
} }

View file

@ -34,11 +34,7 @@ impl MetricsGuard {
impl Drop for MetricsGuard { impl Drop for MetricsGuard {
fn drop(&mut self) { fn drop(&mut self) {
metrics::histogram!("pict-rs.generate.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); metrics::histogram!("pict-rs.generate.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string());
if self.armed { metrics::increment_counter!("pict-rs.generate.end", "completed" => (!self.armed).to_string());
metrics::increment_counter!("pict-rs.generate.failure");
} else {
metrics::increment_counter!("pict-rs.generate.success");
}
} }
} }

View file

@ -224,9 +224,11 @@ where
S: Store, S: Store,
{ {
fn drop(&mut self) { fn drop(&mut self) {
if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { let any_items = self.hash.is_some() || self.alias.is_some() || self.identifier.is_some();
metrics::increment_counter!("pict-rs.ingest.failure");
metrics::increment_counter!("pict-rs.ingest.end", "completed" => (!any_items).to_string());
if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() {
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
cleanup_parent_span.follows_from(Span::current()); cleanup_parent_span.follows_from(Span::current());
@ -281,8 +283,6 @@ where
) )
}); });
} }
} else {
metrics::increment_counter!("pict-rs.ingest.success");
} }
} }
} }

View file

@ -22,7 +22,7 @@ struct MetricsGuard {
impl MetricsGuard { impl MetricsGuard {
fn guard(command: String) -> Self { fn guard(command: String) -> Self {
metrics::increment_counter!("pict-rs.process.spawn", "command" => command.clone()); metrics::increment_counter!("pict-rs.process.start", "command" => command.clone());
Self { Self {
start: Instant::now(), start: Instant::now(),
@ -45,11 +45,7 @@ impl Drop for MetricsGuard {
"completed" => (!self.armed).to_string(), "completed" => (!self.armed).to_string(),
); );
if self.armed { metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.clone());
metrics::increment_counter!("pict-rs.process.failure", "command" => self.command.clone());
} else {
metrics::increment_counter!("pict-rs.process.success", "command" => self.command.clone());
}
} }
} }

View file

@ -10,7 +10,7 @@ use crate::{
store::{Identifier, Store}, store::{Identifier, Store},
}; };
use base64::{prelude::BASE64_STANDARD, Engine}; use base64::{prelude::BASE64_STANDARD, Engine};
use std::{future::Future, path::PathBuf, pin::Pin}; use std::{future::Future, path::PathBuf, pin::Pin, time::Instant};
use tracing::Instrument; use tracing::Instrument;
mod cleanup; mod cleanup;
@ -235,6 +235,37 @@ async fn process_jobs<R, S, F>(
} }
} }
struct MetricsGuard {
worker_id: String,
queue: &'static str,
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard(worker_id: String, queue: &'static str) -> Self {
metrics::increment_counter!("pict-rs.job.start", "queue" => queue, "worker-id" => worker_id.clone());
Self {
worker_id,
queue,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::histogram!("pict-rs.job.duration", self.start.elapsed().as_secs_f64(), "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string());
metrics::increment_counter!("pict-rs.job.end", "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string());
}
}
async fn job_loop<R, S, F>( async fn job_loop<R, S, F>(
repo: &R, repo: &R,
store: &S, store: &S,
@ -255,9 +286,13 @@ where
let span = tracing::info_span!("Running Job", worker_id = ?worker_id); let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
let guard = MetricsGuard::guard(worker_id.clone(), queue);
span.in_scope(|| (callback)(repo, store, config, bytes.as_ref())) span.in_scope(|| (callback)(repo, store, config, bytes.as_ref()))
.instrument(span) .instrument(span)
.await?; .await?;
guard.disarm();
} }
} }
@ -331,8 +366,12 @@ where
let span = tracing::info_span!("Running Job", worker_id = ?worker_id); let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
let guard = MetricsGuard::guard(worker_id.clone(), queue);
span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref())) span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref()))
.instrument(span) .instrument(span)
.await?; .await?;
guard.disarm();
} }
} }

View file

@ -19,6 +19,7 @@ use std::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, RwLock, Arc, RwLock,
}, },
time::Instant,
}; };
use tokio::{sync::Notify, task::JoinHandle}; use tokio::{sync::Notify, task::JoinHandle};
@ -468,6 +469,54 @@ impl From<InnerUploadResult> for UploadResult {
} }
} }
struct PushMetricsGuard {
queue: &'static str,
armed: bool,
}
struct PopMetricsGuard {
queue: &'static str,
start: Instant,
armed: bool,
}
impl PushMetricsGuard {
fn guard(queue: &'static str) -> Self {
Self { queue, armed: true }
}
fn disarm(mut self) {
self.armed = false;
}
}
impl PopMetricsGuard {
fn guard(queue: &'static str) -> Self {
Self {
queue,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for PushMetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue);
}
}
impl Drop for PopMetricsGuard {
fn drop(&mut self) {
metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue);
metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue);
}
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo { impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
@ -577,6 +626,8 @@ impl QueueRepo for SledRepo {
#[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> { async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
let metrics_guard = PushMetricsGuard::guard(queue_name);
let id = self.db.generate_id().map_err(SledError::from)?; let id = self.db.generate_id().map_err(SledError::from)?;
let mut key = queue_name.as_bytes().to_vec(); let mut key = queue_name.as_bytes().to_vec();
key.extend(id.to_be_bytes()); key.extend(id.to_be_bytes());
@ -585,6 +636,7 @@ impl QueueRepo for SledRepo {
if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) { if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
notifier.notify_one(); notifier.notify_one();
metrics_guard.disarm();
return Ok(()); return Ok(());
} }
@ -595,6 +647,8 @@ impl QueueRepo for SledRepo {
.or_insert_with(|| Arc::new(Notify::new())) .or_insert_with(|| Arc::new(Notify::new()))
.notify_one(); .notify_one();
metrics_guard.disarm();
Ok(()) Ok(())
} }
@ -604,6 +658,8 @@ impl QueueRepo for SledRepo {
queue_name: &'static str, queue_name: &'static str,
worker_id: Vec<u8>, worker_id: Vec<u8>,
) -> Result<Self::Bytes, RepoError> { ) -> Result<Self::Bytes, RepoError> {
let metrics_guard = PopMetricsGuard::guard(queue_name);
loop { loop {
let in_progress_queue = self.in_progress_queue.clone(); let in_progress_queue = self.in_progress_queue.clone();
@ -633,6 +689,7 @@ impl QueueRepo for SledRepo {
}); });
if let Some(job) = job { if let Some(job) = job {
metrics_guard.disarm();
return Ok(job); return Ok(job);
} }