From 240eee730c8ffedb6ef52c72ae14c353de7d8520 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 27 Jul 2023 09:26:16 -0500 Subject: [PATCH] Add more metrics around spawn-blocking --- src/requests.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/src/requests.rs b/src/requests.rs index 8883dba..6dfe6f7 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -22,7 +22,7 @@ use std::{ Arc, }, thread::JoinHandle, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use tracing_awc::Tracing; @@ -234,7 +234,7 @@ impl Requests { if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if !s.is_empty() { - tracing::warn!("Response from {parsed_url}, {s}"); + tracing::debug!("Response from {parsed_url}, {s}"); } } } @@ -426,15 +426,22 @@ impl Signer { fn signature_thread( receiver: flume::Receiver>, shutdown: flume::Receiver<()>, + id: usize, ) { + let guard = MetricsGuard::guard(id); let stopping = AtomicBool::new(false); + while !stopping.load(Ordering::Acquire) { flume::Selector::new() .recv(&receiver, |res| match res { Ok(f) => { + let start = Instant::now(); + metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string()); let res = std::panic::catch_unwind(AssertUnwindSafe(move || { (f)(); })); + metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string()); + metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string()); if let Err(e) = res { tracing::warn!("Signature fn panicked: {e:?}"); @@ -445,12 +452,18 @@ fn signature_thread( stopping.store(true, Ordering::Release) } }) - .recv(&shutdown, |_| { - tracing::warn!("Stopping"); + .recv(&shutdown, |res| { + if res.is_ok() { + tracing::warn!("Stopping"); + } else { + tracing::warn!("Shutdown receive error, stopping"); + } stopping.store(true, Ordering::Release) }) .wait(); } + + guard.disarm(); } #[derive(Clone, Debug)] @@ -460,6 +473,35 @@ pub(crate) struct Spawner { shutdown: flume::Sender<()>, } +struct MetricsGuard { + id: usize, + start: Instant, + armed: bool, +} + +impl MetricsGuard { + fn guard(id: usize) -> Self { + metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string()); + + Self { + id, + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + } +} + impl Spawner { pub(crate) fn build(threads: usize) -> std::io::Result { let (sender, receiver) = flume::bounded(8); @@ -474,7 +516,7 @@ impl Spawner { std::thread::Builder::new() .name(format!("signature-thread-{i}")) .spawn(move || { - signature_thread(receiver, shutdown_rx); + signature_thread(receiver, shutdown_rx, i); }) }) .collect::, _>>()?; @@ -501,6 +543,37 @@ impl Drop for Spawner { } } +async fn timer(fut: Fut) -> Fut::Output +where + Fut: std::future::Future, +{ + let id = uuid::Uuid::new_v4(); + + metrics::increment_counter!("relay.spawner.wait-timer.start"); + + let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + + // pass the first tick (instant) + interval.tick().await; + + let mut fut = std::pin::pin!(fut); + + let mut counter = 0; + loop { + tokio::select! { + out = &mut fut => { + metrics::increment_counter!("relay.spawner.wait-timer.end"); + return out; + } + _ = interval.tick() => { + counter += 1; + metrics::increment_counter!("relay.spawner.wait-timer.pending"); + tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5); + } + } + } +} + impl Spawn for Spawner { type Future = std::pin::Pin>>>; @@ -518,11 +591,12 @@ impl Spawn for Spawner { .send_async(Box::new(move || { if tx.send((func)()).is_err() { tracing::warn!("Requestor hung up"); + metrics::increment_counter!("relay.spawner.disconnected"); } })) .await; - rx.recv_async().await.map_err(|_| Canceled) + timer(rx.recv_async()).await.map_err(|_| Canceled) }) } }