Add configurable blocking spawner for client

This commit is contained in:
asonix 2023-07-26 17:17:21 -05:00
parent 40ace55619
commit cb198f6922
5 changed files with 157 additions and 37 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "http-signature-normalization-actix" name = "http-signature-normalization-actix"
description = "An HTTP Signatures library that leaves the signing to you" description = "An HTTP Signatures library that leaves the signing to you"
version = "0.8.0" version = "0.9.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"

View file

@ -26,7 +26,7 @@ pub trait DigestName {
#[cfg(feature = "client")] #[cfg(feature = "client")]
mod client { mod client {
use crate::{Config, PrepareSignError, Sign}; use crate::{Config, PrepareSignError, Sign, Spawn};
use actix_http::header::InvalidHeaderValue; use actix_http::header::InvalidHeaderValue;
use actix_rt::task::JoinError; use actix_rt::task::JoinError;
use awc::{ClientRequest, SendClientRequest}; use awc::{ClientRequest, SendClientRequest};
@ -47,9 +47,9 @@ mod client {
/// a malicious entity /// a malicious entity
pub trait SignExt: Sign { pub trait SignExt: Sign {
/// Set the Digest and Authorization headers on the request /// Set the Digest and Authorization headers on the request
fn authorization_signature_with_digest<F, E, K, D, V>( fn authorization_signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
digest: D, digest: D,
v: V, v: V,
@ -59,19 +59,21 @@ mod client {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Send + 'static, V: AsRef<[u8]> + Send + 'static,
Self: Sized; Self: Sized;
/// Set the Digest and Signature headers on the request /// Set the Digest and Signature headers on the request
fn signature_with_digest<F, E, K, D, V>( fn signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
digest: D, digest: D,
v: V, v: V,
@ -81,11 +83,13 @@ mod client {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Send + 'static, V: AsRef<[u8]> + Send + 'static,
Self: Sized; Self: Sized;

View file

@ -5,13 +5,13 @@ use std::{fmt::Display, future::Future, pin::Pin};
use crate::{ use crate::{
digest::{DigestClient, DigestCreate, SignExt}, digest::{DigestClient, DigestCreate, SignExt},
Config, PrepareSignError, Sign, Config, PrepareSignError, Sign, Spawn,
}; };
impl SignExt for ClientRequest { impl SignExt for ClientRequest {
fn authorization_signature_with_digest<F, E, K, D, V>( fn authorization_signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -21,21 +21,25 @@ impl SignExt for ClientRequest {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Send + 'static, V: AsRef<[u8]> + Send + 'static,
Self: Sized, Self: Sized,
{ {
Box::pin(async move { Box::pin(async move {
let (d, v) = actix_rt::task::spawn_blocking(move || { let (d, v) = config
let d = digest.compute(v.as_ref()); .spawner
Ok((d, v)) as Result<(String, V), E> .spawn_blocking(move || {
}) let d = digest.compute(v.as_ref());
.await??; Ok((d, v)) as Result<(String, V), E>
})
.await??;
let c = self let c = self
.insert_header(("Digest", format!("{}={}", D::NAME, d))) .insert_header(("Digest", format!("{}={}", D::NAME, d)))
@ -46,9 +50,9 @@ impl SignExt for ClientRequest {
}) })
} }
fn signature_with_digest<F, E, K, D, V>( fn signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -58,21 +62,25 @@ impl SignExt for ClientRequest {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Send + 'static, V: AsRef<[u8]> + Send + 'static,
Self: Sized, Self: Sized,
{ {
Box::pin(async move { Box::pin(async move {
let (d, v) = actix_rt::task::spawn_blocking(move || { let (d, v) = config
let d = digest.compute(v.as_ref()); .spawner
Ok((d, v)) as Result<(String, V), E> .spawn_blocking(move || {
}) let d = digest.compute(v.as_ref());
.await??; Ok((d, v)) as Result<(String, V), E>
})
.await??;
let c = self let c = self
.insert_header(("Digest", format!("{}={}", D::NAME, d))) .insert_header(("Digest", format!("{}={}", D::NAME, d)))

View file

@ -257,7 +257,7 @@ pub mod verify {
} }
#[cfg(feature = "client")] #[cfg(feature = "client")]
pub use self::client::{PrepareSignError, Sign}; pub use self::client::{Canceled, PrepareSignError, Sign, Spawn};
#[cfg(feature = "server")] #[cfg(feature = "server")]
pub use self::server::{PrepareVerifyError, SignatureVerify}; pub use self::server::{PrepareVerifyError, SignatureVerify};
@ -267,7 +267,7 @@ pub use self::server::{PrepareVerifyError, SignatureVerify};
/// ///
/// By default, the config is set up to create and verify signatures that expire after 10 /// By default, the config is set up to create and verify signatures that expire after 10
/// seconds, and use the `(created)` and `(expires)` fields that were introduced in draft 11 /// seconds, and use the `(created)` and `(expires)` fields that were introduced in draft 11
pub struct Config { pub struct Config<Spawner = DefaultSpawner> {
/// The inner config type /// The inner config type
config: http_signature_normalization::Config, config: http_signature_normalization::Config,
@ -276,11 +276,18 @@ pub struct Config {
/// Whether to set the Date header /// Whether to set the Date header
set_date: bool, set_date: bool,
/// The spawner used to create blocking operations
spawner: Spawner,
} }
/// A default implementation of Spawner for spawning blocking operations
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultSpawner;
#[cfg(feature = "client")] #[cfg(feature = "client")]
mod client { mod client {
use super::{Config, RequiredError}; use super::{Config, DefaultSpawner, RequiredError};
use actix_http::header::{InvalidHeaderValue, ToStrError}; use actix_http::header::{InvalidHeaderValue, ToStrError};
use actix_rt::task::JoinError; use actix_rt::task::JoinError;
use std::{fmt::Display, future::Future, pin::Pin}; use std::{fmt::Display, future::Future, pin::Pin};
@ -288,9 +295,9 @@ mod client {
/// A trait implemented by the awc ClientRequest type to add an HTTP signature to the request /// A trait implemented by the awc ClientRequest type to add an HTTP signature to the request
pub trait Sign { pub trait Sign {
/// Add an Authorization Signature to the request /// Add an Authorization Signature to the request
fn authorization_signature<F, E, K>( fn authorization_signature<F, E, K, S>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Pin<Box<dyn Future<Output = Result<Self, E>>>> ) -> Pin<Box<dyn Future<Output = Result<Self, E>>>>
@ -298,17 +305,19 @@ mod client {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
Self: Sized; Self: Sized;
/// Add a Signature to the request /// Add a Signature to the request
fn signature<F, E, K>( fn signature<F, E, K, S>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Pin<Box<dyn Future<Output = Result<Self, E>>>> ) -> Pin<Box<dyn Future<Output = Result<Self, E>>>>
@ -316,11 +325,13 @@ mod client {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
Self: Sized; Self: Sized;
} }
@ -343,6 +354,65 @@ mod client {
/// Invalid Date header /// Invalid Date header
InvalidHeader(#[from] actix_http::header::InvalidHeaderValue), InvalidHeader(#[from] actix_http::header::InvalidHeaderValue),
} }
/// An error that indicates a blocking operation panicked and cannot return a response
#[derive(Debug)]
pub struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Operation was canceled")
}
}
impl std::error::Error for Canceled {}
/// A trait dictating how to spawn a future onto a blocking threadpool. By default,
/// http-signature-normalization-actix will use actix_rt's built-in blocking threadpool, but this
/// can be customized
pub trait Spawn {
/// The future type returned by spawn_blocking
type Future<T>: std::future::Future<Output = Result<T, Canceled>>;
/// Spawn the blocking function onto the threadpool
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static;
}
/// The future returned by DefaultSpawner when spawning blocking operations on the actix_rt
/// blocking threadpool
pub struct DefaultSpawnerFuture<Out> {
inner: actix_rt::task::JoinHandle<Out>,
}
impl Spawn for DefaultSpawner {
type Future<T> = DefaultSpawnerFuture<T>;
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
DefaultSpawnerFuture {
inner: actix_rt::task::spawn_blocking(func),
}
}
}
impl<Out> std::future::Future for DefaultSpawnerFuture<Out> {
type Output = Result<Out, Canceled>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(std::pin::Pin::new(&mut self.inner).poll(cx));
std::task::Poll::Ready(res.map_err(|_| Canceled))
}
}
} }
#[cfg(feature = "server")] #[cfg(feature = "server")]
@ -422,7 +492,9 @@ impl Config {
pub fn new() -> Self { pub fn new() -> Self {
Config::default() Config::default()
} }
}
impl<Spawner> Config<Spawner> {
/// Since manually setting the Host header doesn't work so well in AWC, you can use this method /// Since manually setting the Host header doesn't work so well in AWC, you can use this method
/// to enable setting the Host header for signing requests without breaking client /// to enable setting the Host header for signing requests without breaking client
/// functionality /// functionality
@ -431,6 +503,24 @@ impl Config {
config: self.config, config: self.config,
set_host: true, set_host: true,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
}
}
#[cfg(client)]
/// Set the spawner for spawning blocking tasks
///
/// http-signature-normalization-actix offloads signing messages and generating hashes to a
/// blocking threadpool, which can be configured by providing a custom spawner.
pub fn spawner<S>(self, spawner: S) -> Config<S>
where
S: Spawn,
{
Config {
config: self.config,
set_host: self.set_host,
set_date: self.set_date,
spawner,
} }
} }
@ -443,6 +533,7 @@ impl Config {
config: self.config.mastodon_compat(), config: self.config.mastodon_compat(),
set_host: true, set_host: true,
set_date: true, set_date: true,
spawner: self.spawner,
} }
} }
@ -454,6 +545,7 @@ impl Config {
config: self.config.require_digest(), config: self.config.require_digest(),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -466,6 +558,7 @@ impl Config {
config: self.config.dont_use_created_field(), config: self.config.dont_use_created_field(),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -475,6 +568,7 @@ impl Config {
config: self.config.set_expiration(expires_after), config: self.config.set_expiration(expires_after),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -484,6 +578,7 @@ impl Config {
config: self.config.require_header(header), config: self.config.require_header(header),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }

View file

@ -1,4 +1,4 @@
use crate::{create::Signed, Config, PrepareSignError, Sign}; use crate::{create::Signed, Config, PrepareSignError, Sign, Spawn};
use actix_rt::task::JoinError; use actix_rt::task::JoinError;
use awc::{ use awc::{
http::header::{HttpDate, InvalidHeaderValue, TryIntoHeaderValue}, http::header::{HttpDate, InvalidHeaderValue, TryIntoHeaderValue},
@ -7,9 +7,9 @@ use awc::{
use std::{fmt::Display, future::Future, pin::Pin, time::SystemTime}; use std::{fmt::Display, future::Future, pin::Pin, time::SystemTime};
impl Sign for ClientRequest { impl Sign for ClientRequest {
fn authorization_signature<F, E, K>( fn authorization_signature<F, E, K, S>(
mut self, mut self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Pin<Box<dyn Future<Output = Result<Self, E>>>> ) -> Pin<Box<dyn Future<Output = Result<Self, E>>>>
@ -17,11 +17,13 @@ impl Sign for ClientRequest {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<crate::Canceled>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
Self: Sized, Self: Sized,
{ {
Box::pin(async move { Box::pin(async move {
@ -31,9 +33,9 @@ impl Sign for ClientRequest {
}) })
} }
fn signature<F, E, K>( fn signature<F, E, K, S>(
mut self, mut self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Pin<Box<dyn Future<Output = Result<Self, E>>>> ) -> Pin<Box<dyn Future<Output = Result<Self, E>>>>
@ -42,10 +44,12 @@ impl Sign for ClientRequest {
E: From<JoinError> E: From<JoinError>
+ From<PrepareSignError> + From<PrepareSignError>
+ From<InvalidHeaderValue> + From<InvalidHeaderValue>
+ From<crate::Canceled>
+ std::fmt::Debug + std::fmt::Debug
+ Send + Send
+ 'static, + 'static,
K: Display + 'static, K: Display + 'static,
S: Spawn + 'static,
Self: Sized, Self: Sized,
{ {
Box::pin(async move { Box::pin(async move {
@ -56,16 +60,22 @@ impl Sign for ClientRequest {
} }
} }
async fn prepare<F, E, K>( async fn prepare<F, E, K, S>(
request: &mut ClientRequest, request: &mut ClientRequest,
config: &Config, config: &Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Result<Signed, E> ) -> Result<Signed, E>
where where
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<JoinError> + From<PrepareSignError> + std::fmt::Debug + Send + 'static, E: From<JoinError>
+ From<PrepareSignError>
+ From<crate::Canceled>
+ std::fmt::Debug
+ Send
+ 'static,
K: Display, K: Display,
S: Spawn + 'static,
{ {
if config.set_date && !request.headers().contains_key("date") { if config.set_date && !request.headers().contains_key("date") {
request.headers_mut().insert( request.headers_mut().insert(
@ -103,7 +113,10 @@ where
let key_id = key_id.to_string(); let key_id = key_id.to_string();
let signed = actix_rt::task::spawn_blocking(move || unsigned.sign(key_id, f)).await??; let signed = config
.spawner
.spawn_blocking(move || unsigned.sign(key_id, f))
.await??;
Ok(signed) Ok(signed)
} }