From 73c7150f97cf0509e451379f5055fa245721910a Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 26 Jul 2023 18:11:44 -0500 Subject: [PATCH] Use spawner for CPU-bound operations --- src/extractors.rs | 35 +++++++++++++++++++++++-------- src/main.rs | 5 +++-- src/middleware/verifier.rs | 43 ++++++++++++++++++++++---------------- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/extractors.rs b/src/extractors.rs index f56b9af..0311d6b 100644 --- a/src/extractors.rs +++ b/src/extractors.rs @@ -1,6 +1,6 @@ use actix_web::{ dev::Payload, - error::{BlockingError, ParseError}, + error::ParseError, http::{ header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue}, StatusCode, @@ -10,11 +10,11 @@ use actix_web::{ }; use bcrypt::{BcryptError, DEFAULT_COST}; use futures_util::future::LocalBoxFuture; -use http_signature_normalization_actix::prelude::InvalidHeaderValue; +use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn}; use std::{convert::Infallible, str::FromStr, time::Instant}; use tracing_error::SpanTrace; -use crate::db::Db; +use crate::{db::Db, requests::Spawner}; #[derive(Clone)] pub(crate) struct AdminConfig { @@ -40,7 +40,7 @@ pub(crate) struct Admin { impl Admin { fn prepare_verify( req: &HttpRequest, - ) -> Result<(Data, Data, XApiToken), Error> { + ) -> Result<(Data, Data, Data, XApiToken), Error> { let hashed_api_token = req .app_data::>() .ok_or_else(Error::missing_config)? @@ -53,16 +53,23 @@ impl Admin { .ok_or_else(Error::missing_db)? .clone(); - Ok((db, hashed_api_token, x_api_token)) + let spawner = req + .app_data::>() + .ok_or_else(Error::missing_spawner)? + .clone(); + + Ok((db, hashed_api_token, spawner, x_api_token)) } #[tracing::instrument(level = "debug", skip_all)] async fn verify( hashed_api_token: Data, + spawner: Data, x_api_token: XApiToken, ) -> Result<(), Error> { let span = tracing::Span::current(); - if actix_web::web::block(move || span.in_scope(|| hashed_api_token.verify(x_api_token))) + if spawner + .spawn_blocking(move || span.in_scope(|| hashed_api_token.verify(x_api_token))) .await .map_err(Error::canceled)?? { @@ -107,6 +114,13 @@ impl Error { } } + fn missing_spawner() -> Self { + Error { + context: SpanTrace::capture().to_string(), + kind: ErrorKind::MissingSpawner, + } + } + fn bcrypt_verify(e: BcryptError) -> Self { Error { context: SpanTrace::capture().to_string(), @@ -128,7 +142,7 @@ impl Error { } } - fn canceled(_: BlockingError) -> Self { + fn canceled(_: Canceled) -> Self { Error { context: SpanTrace::capture().to_string(), kind: ErrorKind::Canceled, @@ -147,6 +161,9 @@ enum ErrorKind { #[error("Missing Db")] MissingDb, + #[error("Missing Spawner")] + MissingSpawner, + #[error("Panic in verify")] Canceled, @@ -182,8 +199,8 @@ impl FromRequest for Admin { let now = Instant::now(); let res = Self::prepare_verify(req); Box::pin(async move { - let (db, c, t) = res?; - Self::verify(c, t).await?; + let (db, c, s, t) = res?; + Self::verify(c, s, t).await?; metrics::histogram!( "relay.admin.verify", now.elapsed().as_micros() as f64 / 1_000_000_f64 diff --git a/src/main.rs b/src/main.rs index d012102..1e635e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -281,7 +281,8 @@ async fn do_server_main( .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(job_server)) .app_data(web::Data::new(media.clone())) - .app_data(web::Data::new(collector.clone())); + .app_data(web::Data::new(collector.clone())) + .app_data(web::Data::new(spawner.clone())); let app = if let Some(data) = config.admin_config() { app.app_data(data) @@ -299,7 +300,7 @@ async fn do_server_main( web::resource("/inbox") .wrap(config.digest_middleware()) .wrap(VerifySignature::new( - MyVerify(requests, actors.clone(), state.clone()), + MyVerify(requests, actors.clone(), state.clone(), spawner.clone()), Default::default(), )) .wrap(DebugPayload(config.debug())) diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index 6528b83..4845329 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -2,12 +2,11 @@ use crate::{ apub::AcceptedActors, data::{ActorCache, State}, error::{Error, ErrorKind}, - requests::Requests, + requests::{Requests, Spawner}, }; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; -use actix_web::web; use base64::{engine::general_purpose::STANDARD, Engine}; -use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; +use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm, Spawn}; use rsa::{ pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256, signature::Verifier, RsaPublicKey, @@ -15,7 +14,7 @@ use rsa::{ use std::{future::Future, pin::Pin}; #[derive(Clone, Debug)] -pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State); +pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State, pub Spawner); impl MyVerify { #[tracing::instrument("Verify request", skip(self, signature, signing_string))] @@ -55,7 +54,13 @@ impl MyVerify { None => (), }; - let res = do_verify(&actor.public_key, signature.clone(), signing_string.clone()).await; + let res = do_verify( + &self.3, + &actor.public_key, + signature.clone(), + signing_string.clone(), + ) + .await; if let Err(e) = res { if !was_cached { @@ -85,7 +90,7 @@ impl MyVerify { // Now we make sure we fetch an updated actor let actor = self.1.get_no_cache(&actor_id, &self.0).await?; - do_verify(&actor.public_key, signature, signing_string).await?; + do_verify(&self.3, &actor.public_key, signature, signing_string).await?; Ok(true) } @@ -116,6 +121,7 @@ impl PublicKeyResponse { #[tracing::instrument("Verify signature")] async fn do_verify( + spawner: &Spawner, public_key: &str, signature: String, signing_string: String, @@ -123,21 +129,22 @@ async fn do_verify( let public_key = RsaPublicKey::from_public_key_pem(public_key.trim())?; let span = tracing::Span::current(); - web::block(move || { - span.in_scope(|| { - let decoded = STANDARD.decode(signature)?; - let signature = - Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?; + spawner + .spawn_blocking(move || { + span.in_scope(|| { + let decoded = STANDARD.decode(signature)?; + let signature = + Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?; - let verifying_key = VerifyingKey::::new(public_key); - verifying_key - .verify(signing_string.as_bytes(), &signature) - .map_err(ErrorKind::VerifySignature)?; + let verifying_key = VerifyingKey::::new(public_key); + verifying_key + .verify(signing_string.as_bytes(), &signature) + .map_err(ErrorKind::VerifySignature)?; - Ok(()) as Result<(), Error> + Ok(()) as Result<(), Error> + }) }) - }) - .await??; + .await??; Ok(()) }