From 769f7451f984c2f4f4de888d0529b4de64249ee5 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 27 Jul 2023 10:19:20 -0500 Subject: [PATCH] Simplify signature thread --- src/data/state.rs | 3 +- src/extractors.rs | 2 +- src/jobs.rs | 3 +- src/main.rs | 4 +- src/middleware/verifier.rs | 3 +- src/requests.rs | 190 +------------------------------------ src/spawner.rs | 157 ++++++++++++++++++++++++++++++ 7 files changed, 170 insertions(+), 192 deletions(-) create mode 100644 src/spawner.rs diff --git a/src/data/state.rs b/src/data/state.rs index c095efc..7dcc0dd 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -4,6 +4,7 @@ use crate::{ db::Db, error::Error, requests::{Breakers, Requests}, + spawner::Spawner, }; use activitystreams::iri_string::types::IriString; use actix_web::web; @@ -40,7 +41,7 @@ impl State { self.node_cache.clone() } - pub(crate) fn requests(&self, config: &Config, spawner: crate::requests::Spawner) -> Requests { + pub(crate) fn requests(&self, config: &Config, spawner: Spawner) -> Requests { Requests::new( config.generate_url(UrlKind::MainKey).to_string(), self.private_key.clone(), diff --git a/src/extractors.rs b/src/extractors.rs index 1fddee7..2301ee4 100644 --- a/src/extractors.rs +++ b/src/extractors.rs @@ -14,7 +14,7 @@ use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, use std::{convert::Infallible, str::FromStr, time::Instant}; use tracing_error::SpanTrace; -use crate::{db::Db, requests::Spawner}; +use crate::{db::Db, spawner::Spawner}; #[derive(Clone)] pub(crate) struct AdminConfig { diff --git a/src/jobs.rs b/src/jobs.rs index b32c1d2..7635b3c 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -17,7 +17,8 @@ use crate::{ data::{ActorCache, MediaCache, NodeCache, State}, error::{Error, ErrorKind}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, - requests::{Requests, Spawner}, + requests::Requests, + spawner::Spawner, }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, diff --git a/src/main.rs b/src/main.rs index 86aac68..0133d86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,10 +31,9 @@ mod jobs; mod middleware; mod requests; mod routes; +mod spawner; mod telegram; -use crate::requests::Spawner; - use self::{ args::Args, config::Config, @@ -43,6 +42,7 @@ use self::{ jobs::create_workers, middleware::{DebugPayload, MyVerify, RelayResolver, Timings}, routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics}, + spawner::Spawner, }; fn init_subscriber( diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index 4845329..f5cfad7 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -2,7 +2,8 @@ use crate::{ apub::AcceptedActors, data::{ActorCache, State}, error::{Error, ErrorKind}, - requests::{Requests, Spawner}, + requests::Requests, + spawner::Spawner, }; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use base64::{engine::general_purpose::STANDARD, Engine}; diff --git a/src/requests.rs b/src/requests.rs index 6dfe6f7..f1cd1bc 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,13 +1,14 @@ use crate::{ data::LastOnline, error::{Error, ErrorKind}, + spawner::Spawner, }; use activitystreams::iri_string::types::IriString; use actix_web::http::header::Date; use awc::{error::SendRequestError, Client, ClientResponse, Connector}; use base64::{engine::general_purpose::STANDARD, Engine}; use dashmap::DashMap; -use http_signature_normalization_actix::{prelude::*, Canceled, Spawn}; +use http_signature_normalization_actix::prelude::*; use rand::thread_rng; use rsa::{ pkcs1v15::SigningKey, @@ -16,13 +17,8 @@ use rsa::{ RsaPrivateKey, }; use std::{ - panic::AssertUnwindSafe, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread::JoinHandle, - time::{Duration, Instant, SystemTime}, + sync::Arc, + time::{Duration, SystemTime}, }; use tracing_awc::Tracing; @@ -422,181 +418,3 @@ impl Signer { Ok(STANDARD.encode(signature.to_bytes().as_ref())) } } - -fn signature_thread( - receiver: flume::Receiver>, - shutdown: flume::Receiver<()>, - id: usize, -) { - let guard = MetricsGuard::guard(id); - let stopping = AtomicBool::new(false); - - while !stopping.load(Ordering::Acquire) { - flume::Selector::new() - .recv(&receiver, |res| match res { - Ok(f) => { - let start = Instant::now(); - metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string()); - let res = std::panic::catch_unwind(AssertUnwindSafe(move || { - (f)(); - })); - metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string()); - metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string()); - - if let Err(e) = res { - tracing::warn!("Signature fn panicked: {e:?}"); - } - } - Err(_) => { - tracing::warn!("Receive error, stopping"); - stopping.store(true, Ordering::Release) - } - }) - .recv(&shutdown, |res| { - if res.is_ok() { - tracing::warn!("Stopping"); - } else { - tracing::warn!("Shutdown receive error, stopping"); - } - stopping.store(true, Ordering::Release) - }) - .wait(); - } - - guard.disarm(); -} - -#[derive(Clone, Debug)] -pub(crate) struct Spawner { - sender: flume::Sender>, - threads: Option>>>, - shutdown: flume::Sender<()>, -} - -struct MetricsGuard { - id: usize, - start: Instant, - armed: bool, -} - -impl MetricsGuard { - fn guard(id: usize) -> Self { - metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string()); - - Self { - id, - start: Instant::now(), - armed: true, - } - } - - fn disarm(mut self) { - self.armed = false; - } -} - -impl Drop for MetricsGuard { - fn drop(&mut self) { - metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - } -} - -impl Spawner { - pub(crate) fn build(threads: usize) -> std::io::Result { - let (sender, receiver) = flume::bounded(8); - let (shutdown, shutdown_rx) = flume::bounded(threads); - - tracing::warn!("Launching {threads} signature threads"); - - let threads = (0..threads) - .map(|i| { - let receiver = receiver.clone(); - let shutdown_rx = shutdown_rx.clone(); - std::thread::Builder::new() - .name(format!("signature-thread-{i}")) - .spawn(move || { - signature_thread(receiver, shutdown_rx, i); - }) - }) - .collect::, _>>()?; - - Ok(Spawner { - sender, - threads: Some(Arc::new(threads)), - shutdown, - }) - } -} - -impl Drop for Spawner { - fn drop(&mut self) { - if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { - for _ in &threads { - let _ = self.shutdown.send(()); - } - - for thread in threads { - let _ = thread.join(); - } - } - } -} - -async fn timer(fut: Fut) -> Fut::Output -where - Fut: std::future::Future, -{ - let id = uuid::Uuid::new_v4(); - - metrics::increment_counter!("relay.spawner.wait-timer.start"); - - let mut interval = actix_rt::time::interval(Duration::from_secs(5)); - - // pass the first tick (instant) - interval.tick().await; - - let mut fut = std::pin::pin!(fut); - - let mut counter = 0; - loop { - tokio::select! { - out = &mut fut => { - metrics::increment_counter!("relay.spawner.wait-timer.end"); - return out; - } - _ = interval.tick() => { - counter += 1; - metrics::increment_counter!("relay.spawner.wait-timer.pending"); - tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5); - } - } - } -} - -impl Spawn for Spawner { - type Future = std::pin::Pin>>>; - - fn spawn_blocking(&self, func: Func) -> Self::Future - where - Func: FnOnce() -> Out + Send + 'static, - Out: Send + 'static, - { - let sender = self.sender.clone(); - - Box::pin(async move { - let (tx, rx) = flume::bounded(1); - - let _ = sender - .send_async(Box::new(move || { - if tx.send((func)()).is_err() { - tracing::warn!("Requestor hung up"); - metrics::increment_counter!("relay.spawner.disconnected"); - } - })) - .await; - - timer(rx.recv_async()).await.map_err(|_| Canceled) - }) - } -} diff --git a/src/spawner.rs b/src/spawner.rs new file mode 100644 index 0000000..a0aaa5c --- /dev/null +++ b/src/spawner.rs @@ -0,0 +1,157 @@ +use http_signature_normalization_actix::{Canceled, Spawn}; +use std::{ + panic::AssertUnwindSafe, + sync::Arc, + thread::JoinHandle, + time::{Duration, Instant}, +}; + +fn signature_thread(receiver: flume::Receiver>, id: usize) { + let guard = MetricsGuard::guard(id); + + while let Ok(f) = receiver.recv() { + let start = Instant::now(); + metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string()); + let res = std::panic::catch_unwind(AssertUnwindSafe(move || { + (f)(); + })); + metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string()); + metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string()); + + if let Err(e) = res { + tracing::warn!("Signature fn panicked: {e:?}"); + } + } + + guard.disarm(); +} + +#[derive(Clone, Debug)] +pub(crate) struct Spawner { + sender: Option>>, + threads: Option>>>, +} + +struct MetricsGuard { + id: usize, + start: Instant, + armed: bool, +} + +impl MetricsGuard { + fn guard(id: usize) -> Self { + metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string()); + + Self { + id, + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); + tracing::warn!("Stopping signature thread"); + } +} + +impl Spawner { + pub(crate) fn build(threads: usize) -> std::io::Result { + let (sender, receiver) = flume::bounded(8); + + tracing::warn!("Launching {threads} signature threads"); + + let threads = (0..threads) + .map(|i| { + let receiver = receiver.clone(); + std::thread::Builder::new() + .name(format!("signature-thread-{i}")) + .spawn(move || { + signature_thread(receiver, i); + }) + }) + .collect::, _>>()?; + + Ok(Spawner { + sender: Some(sender), + threads: Some(Arc::new(threads)), + }) + } +} + +impl Drop for Spawner { + fn drop(&mut self) { + self.sender.take(); + + if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { + for thread in threads { + let _ = thread.join(); + } + } + } +} + +async fn timer(fut: Fut) -> Fut::Output +where + Fut: std::future::Future, +{ + let id = uuid::Uuid::new_v4(); + + metrics::increment_counter!("relay.spawner.wait-timer.start"); + + let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + + // pass the first tick (instant) + interval.tick().await; + + let mut fut = std::pin::pin!(fut); + + let mut counter = 0; + loop { + tokio::select! { + out = &mut fut => { + metrics::increment_counter!("relay.spawner.wait-timer.end"); + return out; + } + _ = interval.tick() => { + counter += 1; + metrics::increment_counter!("relay.spawner.wait-timer.pending"); + tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5); + } + } + } +} + +impl Spawn for Spawner { + type Future = std::pin::Pin>>>; + + fn spawn_blocking(&self, func: Func) -> Self::Future + where + Func: FnOnce() -> Out + Send + 'static, + Out: Send + 'static, + { + let sender = self.sender.as_ref().expect("Sender exists").clone(); + + Box::pin(async move { + let (tx, rx) = flume::bounded(1); + + let _ = sender + .send_async(Box::new(move || { + if tx.try_send((func)()).is_err() { + tracing::warn!("Requestor hung up"); + metrics::increment_counter!("relay.spawner.disconnected"); + } + })) + .await; + + timer(rx.recv_async()).await.map_err(|_| Canceled) + }) + } +}