From d97cc4e5a4a1a305bba383f19e859217026911f4 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 26 Jul 2023 18:03:21 -0500 Subject: [PATCH] Use custom threadpool for client signatures --- Cargo.lock | 58 +++++++++++++++++----- Cargo.toml | 3 +- src/data/state.rs | 3 +- src/error.rs | 6 +++ src/jobs.rs | 7 ++- src/main.rs | 15 ++++-- src/requests.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 189 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e898b8a..616689f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -409,6 +409,7 @@ dependencies = [ "console-subscriber", "dashmap", "dotenv", + "flume", "futures-util", "http-signature-normalization-actix", "lru", @@ -1165,6 +1166,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1315,8 +1329,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1444,9 +1460,9 @@ dependencies = [ [[package]] name = "http-signature-normalization-actix" -version = "0.8.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dc95d9ca3b4e2f93a97e5ccf9f26992c69a272e0abad8807180f0a9e9b59e31" +checksum = "218124b6b0c6ef27526493f50faf00b7cf8a3840bb1d5268f6ee8eef753b8225" dependencies = [ "actix-http", "actix-rt", @@ -1677,7 +1693,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" dependencies = [ - "spin", + "spin 0.5.2", ] [[package]] @@ -1943,6 +1959,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "never" version = "0.1.0" @@ -2685,7 +2710,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2848,9 +2873,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.1" +version = "0.101.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" dependencies = [ "ring", "untrusted", @@ -2892,18 +2917,18 @@ checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" [[package]] name = "serde" -version = "1.0.175" +version = "1.0.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d25439cd7397d044e2748a6fe2432b5e85db703d6d097bd014b3c0ad1ebff0b" +checksum = "76dc28c9523c5d70816e393136b86d48909cfb27cecaa902d338c19ed47164dc" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.175" +version = "1.0.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4" +checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f" dependencies = [ "proc-macro2", "quote", @@ -2912,9 +2937,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" +checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" dependencies = [ "itoa", "ryu", @@ -3063,6 +3088,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.2" diff --git a/Cargo.toml b/Cargo.toml index 4aaeb51..9b965a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ config = "0.13.0" console-subscriber = { version = "0.1", optional = true } dashmap = "5.1.0" dotenv = "0.15.0" +flume = "0.10.14" futures-util = "0.3.17" lru = "0.11.0" metrics = "0.21.0" @@ -88,7 +89,7 @@ default-features = false features = ["background-jobs-actix", "error-logging"] [dependencies.http-signature-normalization-actix] -version = "0.8.0" +version = "0.9.1" default-features = false features = ["client", "server", "sha-2"] diff --git a/src/data/state.rs b/src/data/state.rs index e673668..c095efc 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -40,7 +40,7 @@ impl State { self.node_cache.clone() } - pub(crate) fn requests(&self, config: &Config) -> Requests { + pub(crate) fn requests(&self, config: &Config, spawner: crate::requests::Spawner) -> Requests { Requests::new( config.generate_url(UrlKind::MainKey).to_string(), self.private_key.clone(), @@ -49,6 +49,7 @@ impl State { self.last_online.clone(), config.client_pool_size(), config.client_timeout(), + spawner, ) } diff --git a/src/error.rs b/src/error.rs index c72b8b5..d03c995 100644 --- a/src/error.rs +++ b/src/error.rs @@ -242,3 +242,9 @@ impl From for ErrorKind { ErrorKind::Rsa(e) } } + +impl From for ErrorKind { + fn from(_: http_signature_normalization_actix::Canceled) -> Self { + Self::Canceled + } +} diff --git a/src/jobs.rs b/src/jobs.rs index ceae891..b32c1d2 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -17,7 +17,7 @@ use crate::{ data::{ActorCache, MediaCache, NodeCache, State}, error::{Error, ErrorKind}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, - requests::Requests, + requests::{Requests, Spawner}, }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, @@ -44,6 +44,7 @@ pub(crate) fn create_workers( actors: ActorCache, media: MediaCache, config: Config, + spawner: Spawner, ) -> JobServer { let deliver_concurrency = config.deliver_concurrency(); @@ -54,6 +55,7 @@ pub(crate) fn create_workers( JobServer::new(queue_handle), media.clone(), config.clone(), + spawner.clone(), ) }) .register::() @@ -110,9 +112,10 @@ impl JobState { job_server: JobServer, media: MediaCache, config: Config, + spawner: Spawner, ) -> Self { JobState { - requests: state.requests(&config), + requests: state.requests(&config, spawner), node_cache: state.node_cache(), actors, config, diff --git a/src/main.rs b/src/main.rs index e837c4d..d012102 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,6 +33,8 @@ mod requests; mod routes; mod telegram; +use crate::requests::Spawner; + use self::{ args::Args, config::Config, @@ -257,12 +259,19 @@ async fn do_server_main( let keys = config.open_keys()?; + let spawner = Spawner::build()?; + let bind_address = config.bind_address(); let server = HttpServer::new(move || { - let requests = state.requests(&config); + let requests = state.requests(&config, spawner.clone()); - let job_server = - create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); + let job_server = create_workers( + state.clone(), + actors.clone(), + media.clone(), + config.clone(), + spawner.clone(), + ); let app = App::new() .app_data(web::Data::new(db.clone())) diff --git a/src/requests.rs b/src/requests.rs index f7d2169..ba95b53 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -7,7 +7,7 @@ use actix_web::http::header::Date; use awc::{error::SendRequestError, Client, ClientResponse, Connector}; use base64::{engine::general_purpose::STANDARD, Engine}; use dashmap::DashMap; -use http_signature_normalization_actix::prelude::*; +use http_signature_normalization_actix::{prelude::*, Canceled, Spawn}; use rand::thread_rng; use rsa::{ pkcs1v15::SigningKey, @@ -16,7 +16,12 @@ use rsa::{ RsaPrivateKey, }; use std::{ - sync::Arc, + panic::AssertUnwindSafe, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::JoinHandle, time::{Duration, SystemTime}, }; use tracing_awc::Tracing; @@ -145,7 +150,7 @@ pub(crate) struct Requests { key_id: String, user_agent: String, private_key: RsaPrivateKey, - config: Config, + config: Config, breakers: Breakers, last_online: Arc, } @@ -192,6 +197,7 @@ impl Requests { last_online: Arc, pool_size: usize, timeout_seconds: u64, + spawner: Spawner, ) -> Self { Requests { pool_size, @@ -199,7 +205,7 @@ impl Requests { key_id, user_agent, private_key, - config: Config::default().mastodon_compat(), + config: Config::new().mastodon_compat().spawner(spawner), breakers, last_online, } @@ -415,3 +421,109 @@ impl Signer { Ok(STANDARD.encode(signature.to_bytes().as_ref())) } } + +fn signature_thread( + receiver: flume::Receiver>, + shutdown: flume::Receiver<()>, +) { + let stopping = AtomicBool::new(false); + while !stopping.load(Ordering::Acquire) { + flume::Selector::new() + .recv(&receiver, |res| match res { + Ok(f) => { + let res = std::panic::catch_unwind(AssertUnwindSafe(move || { + (f)(); + })); + + if let Err(e) = res { + tracing::warn!("Signature fn panicked: {e:?}"); + } + } + Err(_) => { + tracing::warn!("Receive error, stopping"); + stopping.store(true, Ordering::Release) + } + }) + .recv(&shutdown, |_| { + tracing::warn!("Stopping"); + stopping.store(true, Ordering::Release) + }) + .wait(); + } +} + +#[derive(Clone, Debug)] +pub(crate) struct Spawner { + sender: flume::Sender>, + threads: Option>>>, + shutdown: flume::Sender<()>, +} + +impl Spawner { + pub(crate) fn build() -> std::io::Result { + let threads = std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1); + + let (sender, receiver) = flume::bounded(8); + let (shutdown, shutdown_rx) = flume::bounded(threads); + + let threads = (0..threads) + .map(|i| { + let receiver = receiver.clone(); + let shutdown_rx = shutdown_rx.clone(); + std::thread::Builder::new() + .name(format!("signature-thread-{i}")) + .spawn(move || { + signature_thread(receiver, shutdown_rx); + }) + }) + .collect::, _>>()?; + + Ok(Spawner { + sender, + threads: Some(Arc::new(threads)), + shutdown, + }) + } +} + +impl Drop for Spawner { + fn drop(&mut self) { + if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { + for _ in &threads { + let _ = self.shutdown.send(()); + } + + for thread in threads { + let _ = thread.join(); + } + } + } +} + +impl Spawn for Spawner { + type Future = std::pin::Pin>>>; + + fn spawn_blocking(&self, func: Func) -> Self::Future + where + Func: FnOnce() -> Out + Send + 'static, + Out: Send + 'static, + { + let sender = self.sender.clone(); + + Box::pin(async move { + let (tx, rx) = flume::bounded(1); + + let _ = sender + .send_async(Box::new(move || { + if tx.send((func)()).is_err() { + tracing::warn!("Requestor hung up"); + } + })) + .await; + + rx.recv_async().await.map_err(|_| Canceled) + }) + } +}