From f24685e700bbabfc166fbcdf5c98eac76c0f9515 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 27 Jul 2023 10:53:01 -0500 Subject: [PATCH] Allow naming spawner threads --- src/main.rs | 4 ++-- src/spawner.rs | 39 ++++++++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0133d86..9cd1fd4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -259,8 +259,8 @@ async fn do_server_main( let keys = config.open_keys()?; - let spawner = Spawner::build(config.signature_threads())?; - let verify_spawner = Spawner::build((config.signature_threads() / 2).max(1))?; + let spawner = Spawner::build("signature-thread", config.signature_threads())?; + let verify_spawner = Spawner::build("verify-thread", (config.signature_threads() / 8).max(1))?; let bind_address = config.bind_address(); let server = HttpServer::new(move || { diff --git a/src/spawner.rs b/src/spawner.rs index 5b765cb..a6c4c87 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -6,18 +6,22 @@ use std::{ time::{Duration, Instant}, }; -fn signature_thread(receiver: flume::Receiver>, id: usize) { - let guard = MetricsGuard::guard(id); +fn spawner_thread( + receiver: flume::Receiver>, + name: &'static str, + id: usize, +) { + let guard = MetricsGuard::guard(name, id); while let Ok(f) = receiver.recv() { let start = Instant::now(); - metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string()); + metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string()); let res = std::panic::catch_unwind(AssertUnwindSafe(f)); - metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string()); - metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string()); + metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string()); + metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string()); if let Err(e) = res { - tracing::warn!("Signature fn panicked: {e:?}"); + tracing::warn!("{name} fn panicked: {e:?}"); } } @@ -26,21 +30,24 @@ fn signature_thread(receiver: flume::Receiver>, id: usi #[derive(Clone, Debug)] pub(crate) struct Spawner { + name: &'static str, sender: Option>>, threads: Option>>>, } struct MetricsGuard { + name: &'static str, id: usize, start: Instant, armed: bool, } impl MetricsGuard { - fn guard(id: usize) -> Self { - metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string()); + fn guard(name: &'static str, id: usize) -> Self { + metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string()); Self { + name, id, start: Instant::now(), armed: true, @@ -54,30 +61,31 @@ impl MetricsGuard { impl Drop for MetricsGuard { fn drop(&mut self) { - metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - tracing::warn!("Stopping signature thread"); + metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + tracing::warn!("Stopping {} - {}", self.name, self.id); } } impl Spawner { - pub(crate) fn build(threads: usize) -> std::io::Result { + pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result { let (sender, receiver) = flume::bounded(8); - tracing::warn!("Launching {threads} signature threads"); + tracing::warn!("Launching {threads} {name}s"); let threads = (0..threads) .map(|i| { let receiver = receiver.clone(); std::thread::Builder::new() - .name(format!("signature-thread-{i}")) + .name(format!("{name}-{i}")) .spawn(move || { - signature_thread(receiver, i); + spawner_thread(receiver, name, i); }) }) .collect::, _>>()?; Ok(Spawner { + name, sender: Some(sender), threads: Some(Arc::new(threads)), }) @@ -89,6 +97,7 @@ impl Drop for Spawner { self.sender.take(); if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { + tracing::warn!("Joining {}s", self.name); for thread in threads { let _ = thread.join(); }