Add more metrics around spawn-blocking

This commit is contained in:
asonix 2023-07-27 09:26:16 -05:00
parent 8071c6ce3f
commit 240eee730c

View file

@ -22,7 +22,7 @@ use std::{
Arc, Arc,
}, },
thread::JoinHandle, thread::JoinHandle,
time::{Duration, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use tracing_awc::Tracing; use tracing_awc::Tracing;
@ -234,7 +234,7 @@ impl Requests {
if let Ok(bytes) = res.body().await { if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() { if !s.is_empty() {
tracing::warn!("Response from {parsed_url}, {s}"); tracing::debug!("Response from {parsed_url}, {s}");
} }
} }
} }
@ -426,15 +426,22 @@ impl Signer {
fn signature_thread( fn signature_thread(
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>, receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
shutdown: flume::Receiver<()>, shutdown: flume::Receiver<()>,
id: usize,
) { ) {
let guard = MetricsGuard::guard(id);
let stopping = AtomicBool::new(false); let stopping = AtomicBool::new(false);
while !stopping.load(Ordering::Acquire) { while !stopping.load(Ordering::Acquire) {
flume::Selector::new() flume::Selector::new()
.recv(&receiver, |res| match res { .recv(&receiver, |res| match res {
Ok(f) => { Ok(f) => {
let start = Instant::now();
metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(move || { let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
(f)(); (f)();
})); }));
metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string());
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Signature fn panicked: {e:?}"); tracing::warn!("Signature fn panicked: {e:?}");
@ -445,12 +452,18 @@ fn signature_thread(
stopping.store(true, Ordering::Release) stopping.store(true, Ordering::Release)
} }
}) })
.recv(&shutdown, |_| { .recv(&shutdown, |res| {
if res.is_ok() {
tracing::warn!("Stopping"); tracing::warn!("Stopping");
} else {
tracing::warn!("Shutdown receive error, stopping");
}
stopping.store(true, Ordering::Release) stopping.store(true, Ordering::Release)
}) })
.wait(); .wait();
} }
guard.disarm();
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -460,6 +473,35 @@ pub(crate) struct Spawner {
shutdown: flume::Sender<()>, shutdown: flume::Sender<()>,
} }
/// Lifetime tracker for a signature thread's metrics.
///
/// Constructing the guard records a launch; dropping it records the close together with how
/// long the guard was alive. The `clean` label emitted on drop is `true` only when
/// [`MetricsGuard::disarm`] consumed the guard, and `false` for any other drop.
struct MetricsGuard {
    /// Value reported as the `id` label on every metric this guard emits.
    id: usize,
    /// Creation time of the guard, used for the duration histogram on drop.
    start: Instant,
    /// `true` from construction until `disarm` is called.
    armed: bool,
}

impl MetricsGuard {
    /// Records a launch for `id` and returns an armed guard whose clock starts now.
    fn guard(id: usize) -> Self {
        metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string());

        let start = Instant::now();

        Self {
            id,
            start,
            armed: true,
        }
    }

    /// Consumes the guard after clearing the armed flag, so the drop that follows immediately
    /// reports `clean = true`.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for MetricsGuard {
    /// Emits the close counter and the lifetime histogram, both labelled with `clean` and `id`.
    fn drop(&mut self) {
        // Both metrics carry the same label values, so render them once.
        let clean = (!self.armed).to_string();
        let id = self.id.to_string();

        metrics::increment_counter!(
            "relay.signature-thread.closed",
            "clean" => clean.clone(),
            "id" => id.clone()
        );
        metrics::histogram!(
            "relay.signature-thread.duration",
            self.start.elapsed().as_secs_f64(),
            "clean" => clean,
            "id" => id
        );
    }
}
impl Spawner { impl Spawner {
pub(crate) fn build(threads: usize) -> std::io::Result<Self> { pub(crate) fn build(threads: usize) -> std::io::Result<Self> {
let (sender, receiver) = flume::bounded(8); let (sender, receiver) = flume::bounded(8);
@ -474,7 +516,7 @@ impl Spawner {
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("signature-thread-{i}")) .name(format!("signature-thread-{i}"))
.spawn(move || { .spawn(move || {
signature_thread(receiver, shutdown_rx); signature_thread(receiver, shutdown_rx, i);
}) })
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
@ -501,6 +543,37 @@ impl Drop for Spawner {
} }
} }
/// Awaits `fut` to completion while reporting how long the caller has been waiting.
///
/// Increments `relay.spawner.wait-timer.start` on entry and `relay.spawner.wait-timer.end`
/// once `fut` resolves. For every reporting period that elapses before then,
/// `relay.spawner.wait-timer.pending` is incremented and a warning is logged carrying a
/// per-call identifier and the approximate number of seconds waited so far.
///
/// Returns the output of `fut` unchanged.
async fn timer<Fut>(fut: Fut) -> Fut::Output
where
    Fut: std::future::Future,
{
    // Single source of truth for the reporting period: it drives both the interval and the
    // seconds figure in the warning, so the two can never drift apart.
    const REPORT_PERIOD_SECS: u64 = 5;

    // Fresh identifier per call so repeated warnings about one slow operation can be correlated.
    let id = uuid::Uuid::new_v4();

    metrics::increment_counter!("relay.spawner.wait-timer.start");

    let mut interval = actix_rt::time::interval(Duration::from_secs(REPORT_PERIOD_SECS));

    // pass the first tick (instant)
    interval.tick().await;

    // Pin on the stack so the same future can be polled again on each loop iteration.
    let mut fut = std::pin::pin!(fut);

    let mut elapsed_periods: u64 = 0;

    loop {
        tokio::select! {
            out = &mut fut => {
                metrics::increment_counter!("relay.spawner.wait-timer.end");
                return out;
            }
            _ = interval.tick() => {
                elapsed_periods += 1;
                metrics::increment_counter!("relay.spawner.wait-timer.pending");
                tracing::warn!(
                    "Blocking operation {id} is taking a long time, {} seconds",
                    elapsed_periods * REPORT_PERIOD_SECS
                );
            }
        }
    }
}
impl Spawn for Spawner { impl Spawn for Spawner {
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>; type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
@ -518,11 +591,12 @@ impl Spawn for Spawner {
.send_async(Box::new(move || { .send_async(Box::new(move || {
if tx.send((func)()).is_err() { if tx.send((func)()).is_err() {
tracing::warn!("Requestor hung up"); tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
} }
})) }))
.await; .await;
rx.recv_async().await.map_err(|_| Canceled) timer(rx.recv_async()).await.map_err(|_| Canceled)
}) })
} }
} }