Use spawner for CPU-bound operations

This commit is contained in:
asonix 2023-07-26 18:11:44 -05:00
parent 7cfebd927e
commit 73c7150f97
3 changed files with 54 additions and 29 deletions

View file

@ -1,6 +1,6 @@
use actix_web::{ use actix_web::{
dev::Payload, dev::Payload,
error::{BlockingError, ParseError}, error::ParseError,
http::{ http::{
header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue}, header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
StatusCode, StatusCode,
@ -10,11 +10,11 @@ use actix_web::{
}; };
use bcrypt::{BcryptError, DEFAULT_COST}; use bcrypt::{BcryptError, DEFAULT_COST};
use futures_util::future::LocalBoxFuture; use futures_util::future::LocalBoxFuture;
use http_signature_normalization_actix::prelude::InvalidHeaderValue; use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn};
use std::{convert::Infallible, str::FromStr, time::Instant}; use std::{convert::Infallible, str::FromStr, time::Instant};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
use crate::db::Db; use crate::{db::Db, requests::Spawner};
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AdminConfig { pub(crate) struct AdminConfig {
@ -40,7 +40,7 @@ pub(crate) struct Admin {
impl Admin { impl Admin {
fn prepare_verify( fn prepare_verify(
req: &HttpRequest, req: &HttpRequest,
) -> Result<(Data<Db>, Data<AdminConfig>, XApiToken), Error> { ) -> Result<(Data<Db>, Data<AdminConfig>, Data<Spawner>, XApiToken), Error> {
let hashed_api_token = req let hashed_api_token = req
.app_data::<Data<AdminConfig>>() .app_data::<Data<AdminConfig>>()
.ok_or_else(Error::missing_config)? .ok_or_else(Error::missing_config)?
@ -53,16 +53,23 @@ impl Admin {
.ok_or_else(Error::missing_db)? .ok_or_else(Error::missing_db)?
.clone(); .clone();
Ok((db, hashed_api_token, x_api_token)) let spawner = req
.app_data::<Data<Spawner>>()
.ok_or_else(Error::missing_spawner)?
.clone();
Ok((db, hashed_api_token, spawner, x_api_token))
} }
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn verify( async fn verify(
hashed_api_token: Data<AdminConfig>, hashed_api_token: Data<AdminConfig>,
spawner: Data<Spawner>,
x_api_token: XApiToken, x_api_token: XApiToken,
) -> Result<(), Error> { ) -> Result<(), Error> {
let span = tracing::Span::current(); let span = tracing::Span::current();
if actix_web::web::block(move || span.in_scope(|| hashed_api_token.verify(x_api_token))) if spawner
.spawn_blocking(move || span.in_scope(|| hashed_api_token.verify(x_api_token)))
.await .await
.map_err(Error::canceled)?? .map_err(Error::canceled)??
{ {
@ -107,6 +114,13 @@ impl Error {
} }
} }
fn missing_spawner() -> Self {
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingSpawner,
}
}
fn bcrypt_verify(e: BcryptError) -> Self { fn bcrypt_verify(e: BcryptError) -> Self {
Error { Error {
context: SpanTrace::capture().to_string(), context: SpanTrace::capture().to_string(),
@ -128,7 +142,7 @@ impl Error {
} }
} }
fn canceled(_: BlockingError) -> Self { fn canceled(_: Canceled) -> Self {
Error { Error {
context: SpanTrace::capture().to_string(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled, kind: ErrorKind::Canceled,
@ -147,6 +161,9 @@ enum ErrorKind {
#[error("Missing Db")] #[error("Missing Db")]
MissingDb, MissingDb,
#[error("Missing Spawner")]
MissingSpawner,
#[error("Panic in verify")] #[error("Panic in verify")]
Canceled, Canceled,
@ -182,8 +199,8 @@ impl FromRequest for Admin {
let now = Instant::now(); let now = Instant::now();
let res = Self::prepare_verify(req); let res = Self::prepare_verify(req);
Box::pin(async move { Box::pin(async move {
let (db, c, t) = res?; let (db, c, s, t) = res?;
Self::verify(c, t).await?; Self::verify(c, s, t).await?;
metrics::histogram!( metrics::histogram!(
"relay.admin.verify", "relay.admin.verify",
now.elapsed().as_micros() as f64 / 1_000_000_f64 now.elapsed().as_micros() as f64 / 1_000_000_f64

View file

@ -281,7 +281,8 @@ async fn do_server_main(
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(job_server)) .app_data(web::Data::new(job_server))
.app_data(web::Data::new(media.clone())) .app_data(web::Data::new(media.clone()))
.app_data(web::Data::new(collector.clone())); .app_data(web::Data::new(collector.clone()))
.app_data(web::Data::new(spawner.clone()));
let app = if let Some(data) = config.admin_config() { let app = if let Some(data) = config.admin_config() {
app.app_data(data) app.app_data(data)
@ -299,7 +300,7 @@ async fn do_server_main(
web::resource("/inbox") web::resource("/inbox")
.wrap(config.digest_middleware()) .wrap(config.digest_middleware())
.wrap(VerifySignature::new( .wrap(VerifySignature::new(
MyVerify(requests, actors.clone(), state.clone()), MyVerify(requests, actors.clone(), state.clone(), spawner.clone()),
Default::default(), Default::default(),
)) ))
.wrap(DebugPayload(config.debug())) .wrap(DebugPayload(config.debug()))

View file

@ -2,12 +2,11 @@ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
data::{ActorCache, State}, data::{ActorCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
requests::Requests, requests::{Requests, Spawner},
}; };
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use actix_web::web;
use base64::{engine::general_purpose::STANDARD, Engine}; use base64::{engine::general_purpose::STANDARD, Engine};
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm, Spawn};
use rsa::{ use rsa::{
pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256, pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256,
signature::Verifier, RsaPublicKey, signature::Verifier, RsaPublicKey,
@ -15,7 +14,7 @@ use rsa::{
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State); pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State, pub Spawner);
impl MyVerify { impl MyVerify {
#[tracing::instrument("Verify request", skip(self, signature, signing_string))] #[tracing::instrument("Verify request", skip(self, signature, signing_string))]
@ -55,7 +54,13 @@ impl MyVerify {
None => (), None => (),
}; };
let res = do_verify(&actor.public_key, signature.clone(), signing_string.clone()).await; let res = do_verify(
&self.3,
&actor.public_key,
signature.clone(),
signing_string.clone(),
)
.await;
if let Err(e) = res { if let Err(e) = res {
if !was_cached { if !was_cached {
@ -85,7 +90,7 @@ impl MyVerify {
// Now we make sure we fetch an updated actor // Now we make sure we fetch an updated actor
let actor = self.1.get_no_cache(&actor_id, &self.0).await?; let actor = self.1.get_no_cache(&actor_id, &self.0).await?;
do_verify(&actor.public_key, signature, signing_string).await?; do_verify(&self.3, &actor.public_key, signature, signing_string).await?;
Ok(true) Ok(true)
} }
@ -116,6 +121,7 @@ impl PublicKeyResponse {
#[tracing::instrument("Verify signature")] #[tracing::instrument("Verify signature")]
async fn do_verify( async fn do_verify(
spawner: &Spawner,
public_key: &str, public_key: &str,
signature: String, signature: String,
signing_string: String, signing_string: String,
@ -123,7 +129,8 @@ async fn do_verify(
let public_key = RsaPublicKey::from_public_key_pem(public_key.trim())?; let public_key = RsaPublicKey::from_public_key_pem(public_key.trim())?;
let span = tracing::Span::current(); let span = tracing::Span::current();
web::block(move || { spawner
.spawn_blocking(move || {
span.in_scope(|| { span.in_scope(|| {
let decoded = STANDARD.decode(signature)?; let decoded = STANDARD.decode(signature)?;
let signature = let signature =