Allow providing custom spawners for reqwest

This commit is contained in:
asonix 2023-08-17 11:56:09 -05:00
parent 8750783667
commit bbafc22d08
3 changed files with 224 additions and 62 deletions

View file

@ -11,15 +11,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["sha-2", "sha-3"] default = ["sha-2", "default-spawner"]
middleware = ["reqwest-middleware"] middleware = ["dep:reqwest-middleware"]
digest = ["base64", "tokio"] default-spawner = ["dep:tokio"]
sha-2 = ["digest", "sha2"] digest = ["dep:base64", "dep:tokio"]
sha-3 = ["digest", "sha3"] sha-2 = ["digest", "dep:sha2"]
sha-3 = ["digest", "dep:sha3"]
[[example]] [[example]]
name = "client" name = "client"
required-features = ["sha-2"] required-features = ["default-spawner", "sha-2"]
[dependencies] [dependencies]
async-trait = "0.1.71" async-trait = "0.1.71"

View file

@ -1,4 +1,4 @@
use crate::{Config, Sign, SignError}; use crate::{Config, Sign, SignError, Spawn};
use reqwest::{Body, Request, RequestBuilder}; use reqwest::{Body, Request, RequestBuilder};
use std::fmt::Display; use std::fmt::Display;
@ -23,9 +23,9 @@ pub trait DigestCreate {
/// a malicious entity /// a malicious entity
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait SignExt: Sign { pub trait SignExt: Sign {
async fn authorization_signature_with_digest<F, E, K, D, V>( async fn authorization_signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
digest: D, digest: D,
v: V, v: V,
@ -37,11 +37,12 @@ pub trait SignExt: Sign {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
Self: Sized; Self: Sized;
async fn signature_with_digest<F, E, K, D, V>( async fn signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
digest: D, digest: D,
v: V, v: V,
@ -53,14 +54,15 @@ pub trait SignExt: Sign {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
Self: Sized; Self: Sized;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl SignExt for RequestBuilder { impl SignExt for RequestBuilder {
async fn authorization_signature_with_digest<F, E, K, D, V>( async fn authorization_signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -72,13 +74,16 @@ impl SignExt for RequestBuilder {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
{ {
let (v, digest) = tokio::task::spawn_blocking(move || { let (v, digest) = config
let digest = digest.compute(v.as_ref()); .spawner
(v, digest) .spawn_blocking(move || {
}) let digest = digest.compute(v.as_ref());
.await (v, digest)
.map_err(|_| SignError::Canceled)?; })
.await
.map_err(|_| SignError::Canceled)?;
let mut req = self let mut req = self
.header("Digest", format!("{}={}", D::NAME, digest)) .header("Digest", format!("{}={}", D::NAME, digest))
@ -90,9 +95,9 @@ impl SignExt for RequestBuilder {
Ok(req) Ok(req)
} }
async fn signature_with_digest<F, E, K, D, V>( async fn signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -104,13 +109,16 @@ impl SignExt for RequestBuilder {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
{ {
let (v, digest) = tokio::task::spawn_blocking(move || { let (v, digest) = config
let digest = digest.compute(v.as_ref()); .spawner
(v, digest) .spawn_blocking(move || {
}) let digest = digest.compute(v.as_ref());
.await (v, digest)
.map_err(|_| SignError::Canceled)?; })
.await
.map_err(|_| SignError::Canceled)?;
let mut req = self let mut req = self
.header("Digest", format!("{}={}", D::NAME, digest)) .header("Digest", format!("{}={}", D::NAME, digest))
@ -125,16 +133,16 @@ impl SignExt for RequestBuilder {
#[cfg(feature = "middleware")] #[cfg(feature = "middleware")]
mod middleware { mod middleware {
use super::{Config, DigestCreate, Sign, SignError, SignExt}; use super::{Config, DigestCreate, Sign, SignError, SignExt, Spawn};
use reqwest::{Body, Request}; use reqwest::{Body, Request};
use reqwest_middleware::RequestBuilder; use reqwest_middleware::RequestBuilder;
use std::fmt::Display; use std::fmt::Display;
#[async_trait::async_trait] #[async_trait::async_trait]
impl SignExt for RequestBuilder { impl SignExt for RequestBuilder {
async fn authorization_signature_with_digest<F, E, K, D, V>( async fn authorization_signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -146,14 +154,17 @@ mod middleware {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
Self: Sized, Self: Sized,
{ {
let (v, digest) = tokio::task::spawn_blocking(move || { let (v, digest) = config
let digest = digest.compute(v.as_ref()); .spawner
(v, digest) .spawn_blocking(move || {
}) let digest = digest.compute(v.as_ref());
.await (v, digest)
.map_err(|_| SignError::Canceled)?; })
.await
.map_err(|_| SignError::Canceled)?;
let mut req = self let mut req = self
.header("Digest", format!("{}={}", D::NAME, digest)) .header("Digest", format!("{}={}", D::NAME, digest))
@ -165,9 +176,9 @@ mod middleware {
Ok(req) Ok(req)
} }
async fn signature_with_digest<F, E, K, D, V>( async fn signature_with_digest<F, E, K, S, D, V>(
self, self,
config: Config, config: Config<S>,
key_id: K, key_id: K,
mut digest: D, mut digest: D,
v: V, v: V,
@ -179,14 +190,17 @@ mod middleware {
K: Display + Send + 'static, K: Display + Send + 'static,
D: DigestCreate + Send + 'static, D: DigestCreate + Send + 'static,
V: AsRef<[u8]> + Into<Body> + Send + 'static, V: AsRef<[u8]> + Into<Body> + Send + 'static,
S: Spawn + Send + Sync,
Self: Sized, Self: Sized,
{ {
let (v, digest) = tokio::task::spawn_blocking(move || { let (v, digest) = config
let digest = digest.compute(v.as_ref()); .spawner
(v, digest) .spawn_blocking(move || {
}) let digest = digest.compute(v.as_ref());
.await (v, digest)
.map_err(|_| SignError::Canceled)?; })
.await
.map_err(|_| SignError::Canceled)?;
let mut req = self let mut req = self
.header("Digest", format!("{}={}", D::NAME, digest)) .header("Digest", format!("{}={}", D::NAME, digest))

View file

@ -18,16 +18,23 @@ pub mod digest;
pub mod prelude { pub mod prelude {
pub use crate::{Config, Sign, SignError}; pub use crate::{Config, Sign, SignError};
#[cfg(feature = "default-spawner")]
pub use crate::default_spawner::DefaultSpawner;
#[cfg(feature = "digest")] #[cfg(feature = "digest")]
pub use crate::digest::{DigestCreate, SignExt}; pub use crate::digest::{DigestCreate, SignExt};
} }
#[cfg(feature = "default-spawner")]
pub use default_spawner::DefaultSpawner;
#[cfg(feature = "default-spawner")]
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
/// Configuration for signing and verifying signatures /// Configuration for signing and verifying signatures
/// ///
/// By default, the config is set up to create and verify signatures that expire after 10 seconds, /// By default, the config is set up to create and verify signatures that expire after 10 seconds,
/// and use the `(created)` and `(expires)` fields that were introduced in draft 11 /// and use the `(created)` and `(expires)` fields that were introduced in draft 11
pub struct Config { pub struct Config<Spawner = DefaultSpawner> {
/// The inner config type /// The inner config type
config: http_signature_normalization::Config, config: http_signature_normalization::Config,
@ -36,15 +43,115 @@ pub struct Config {
/// Whether to set the Date header /// Whether to set the Date header
set_date: bool, set_date: bool,
/// How to spawn blocking tasks
spawner: Spawner,
}
#[cfg(not(feature = "default-spawner"))]
#[derive(Clone, Debug, Default)]
/// Configuration for signing and verifying signatures
///
/// By default, the config is set up to create and verify signatures that expire after 10 seconds,
/// and use the `(created)` and `(expires)` fields that were introduced in draft 11
pub struct Config<Spawner> {
/// The inner config type
config: http_signature_normalization::Config,
/// Whether to set the Host header
set_host: bool,
/// Whether to set the Date header
set_date: bool,
/// How to spawn blocking tasks
spawner: Spawner,
}
#[cfg(feature = "default-spawner")]
mod default_spawner {
use super::{Canceled, Config, Spawn};
impl Config<DefaultSpawner> {
/// Create a new config with the default spawner
pub fn new() -> Self {
Default::default()
}
}
/// A default implementation of Spawner for spawning blocking operations
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultSpawner;
/// The future returned by DefaultSpawner when spawning blocking operations on the tokio
/// blocking threadpool
pub struct DefaultSpawnerFuture<Out> {
inner: tokio::task::JoinHandle<Out>,
}
impl Spawn for DefaultSpawner {
type Future<T> = DefaultSpawnerFuture<T> where T: Send;
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
DefaultSpawnerFuture {
inner: tokio::task::spawn_blocking(func),
}
}
}
impl<Out> std::future::Future for DefaultSpawnerFuture<Out> {
type Output = Result<Out, Canceled>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(std::pin::Pin::new(&mut self.inner).poll(cx));
std::task::Poll::Ready(res.map_err(|_| Canceled))
}
}
}
/// An error that indicates a blocking operation panicked and cannot return a response
#[derive(Debug)]
pub struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Operation was canceled")
}
}
impl std::error::Error for Canceled {}
/// A trait dictating how to spawn a future onto a blocking threadpool. By default,
/// http-signature-normalization-actix will use tokio's built-in blocking threadpool, but this
/// can be customized
pub trait Spawn {
/// The future type returned by spawn_blocking
type Future<T>: std::future::Future<Output = Result<T, Canceled>> + Send
where
T: Send;
/// Spawn the blocking function onto the threadpool
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static;
} }
/// A trait implemented by the reqwest RequestBuilder type to add an HTTP Signature to the request /// A trait implemented by the reqwest RequestBuilder type to add an HTTP Signature to the request
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Sign { pub trait Sign {
/// Add an Authorization Signature to the request /// Add an Authorization Signature to the request
async fn authorization_signature<F, E, K>( async fn authorization_signature<F, E, K, S>(
self, self,
config: &Config, config: &Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Result<Request, E> ) -> Result<Request, E>
@ -52,15 +159,17 @@ pub trait Sign {
Self: Sized, Self: Sized,
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send; K: Display + Send,
S: Spawn + Send + Sync;
/// Add a Signature to the request /// Add a Signature to the request
async fn signature<F, E, K>(self, config: &Config, key_id: K, f: F) -> Result<Request, E> async fn signature<F, E, K, S>(self, config: &Config<S>, key_id: K, f: F) -> Result<Request, E>
where where
Self: Sized, Self: Sized,
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send; K: Display + Send,
S: Spawn + Send + Sync;
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -88,9 +197,15 @@ pub enum SignError {
Canceled, Canceled,
} }
impl Config { impl<Spawner> Config<Spawner> {
pub fn new() -> Self { /// Create a new config with the provided spawner
Default::default() pub fn new_with_spawner(spawner: Spawner) -> Self {
Config {
config: Default::default(),
set_host: Default::default(),
set_date: Default::default(),
spawner,
}
} }
/// This method can be used to include the Host header in the HTTP Signature without /// This method can be used to include the Host header in the HTTP Signature without
@ -100,6 +215,7 @@ impl Config {
config: self.config, config: self.config,
set_host: true, set_host: true,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -112,6 +228,7 @@ impl Config {
config: self.config.mastodon_compat(), config: self.config.mastodon_compat(),
set_host: true, set_host: true,
set_date: true, set_date: true,
spawner: self.spawner,
} }
} }
@ -123,6 +240,7 @@ impl Config {
config: self.config.require_digest(), config: self.config.require_digest(),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -135,6 +253,7 @@ impl Config {
config: self.config.dont_use_created_field(), config: self.config.dont_use_created_field(),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -144,6 +263,7 @@ impl Config {
config: self.config.set_expiration(expiries_after), config: self.config.set_expiration(expiries_after),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
} }
} }
@ -153,15 +273,25 @@ impl Config {
config: self.config.require_header(header), config: self.config.require_header(header),
set_host: self.set_host, set_host: self.set_host,
set_date: self.set_date, set_date: self.set_date,
spawner: self.spawner,
}
}
pub fn set_spawner<NewSpawner: Spawn>(self, spawner: NewSpawner) -> Config<NewSpawner> {
Config {
config: self.config,
set_host: self.set_host,
set_date: self.set_date,
spawner,
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Sign for RequestBuilder { impl Sign for RequestBuilder {
async fn authorization_signature<F, E, K>( async fn authorization_signature<F, E, K, S>(
self, self,
config: &Config, config: &Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Result<Request, E> ) -> Result<Request, E>
@ -169,6 +299,7 @@ impl Sign for RequestBuilder {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send, K: Display + Send,
S: Spawn + Send + Sync,
{ {
let mut request = self.build()?; let mut request = self.build()?;
let signed = prepare(&mut request, config, key_id, f).await?; let signed = prepare(&mut request, config, key_id, f).await?;
@ -182,11 +313,12 @@ impl Sign for RequestBuilder {
Ok(request) Ok(request)
} }
async fn signature<F, E, K>(self, config: &Config, key_id: K, f: F) -> Result<Request, E> async fn signature<F, E, K, S>(self, config: &Config<S>, key_id: K, f: F) -> Result<Request, E>
where where
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send, K: Display + Send,
S: Spawn + Send + Sync,
{ {
let mut request = self.build()?; let mut request = self.build()?;
let signed = prepare(&mut request, config, key_id, f).await?; let signed = prepare(&mut request, config, key_id, f).await?;
@ -202,11 +334,17 @@ impl Sign for RequestBuilder {
} }
} }
async fn prepare<F, E, K>(req: &mut Request, config: &Config, key_id: K, f: F) -> Result<Signed, E> async fn prepare<F, E, K, S>(
req: &mut Request,
config: &Config<S>,
key_id: K,
f: F,
) -> Result<Signed, E>
where where
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + Send + 'static, E: From<SignError> + Send + 'static,
K: Display + Send, K: Display + Send,
S: Spawn,
{ {
if config.set_date && !req.headers().contains_key("date") { if config.set_date && !req.headers().contains_key("date") {
req.headers_mut().insert( req.headers_mut().insert(
@ -249,7 +387,9 @@ where
.map_err(SignError::from)?; .map_err(SignError::from)?;
let key_string = key_id.to_string(); let key_string = key_id.to_string();
let signed = tokio::task::spawn_blocking(move || unsigned.sign(key_string, f)) let signed = config
.spawner
.spawn_blocking(move || unsigned.sign(key_string, f))
.await .await
.map_err(|_| SignError::Canceled)??; .map_err(|_| SignError::Canceled)??;
Ok(signed) Ok(signed)
@ -257,16 +397,16 @@ where
#[cfg(feature = "middleware")] #[cfg(feature = "middleware")]
mod middleware { mod middleware {
use super::{prepare, Config, Sign, SignError}; use super::{prepare, Config, Sign, SignError, Spawn};
use reqwest::Request; use reqwest::Request;
use reqwest_middleware::RequestBuilder; use reqwest_middleware::RequestBuilder;
use std::fmt::Display; use std::fmt::Display;
#[async_trait::async_trait] #[async_trait::async_trait]
impl Sign for RequestBuilder { impl Sign for RequestBuilder {
async fn authorization_signature<F, E, K>( async fn authorization_signature<F, E, K, S>(
self, self,
config: &Config, config: &Config<S>,
key_id: K, key_id: K,
f: F, f: F,
) -> Result<Request, E> ) -> Result<Request, E>
@ -274,6 +414,7 @@ mod middleware {
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send, K: Display + Send,
S: Spawn + Send + Sync,
{ {
let mut request = self.build()?; let mut request = self.build()?;
let signed = prepare(&mut request, config, key_id, f).await?; let signed = prepare(&mut request, config, key_id, f).await?;
@ -287,11 +428,17 @@ mod middleware {
Ok(request) Ok(request)
} }
async fn signature<F, E, K>(self, config: &Config, key_id: K, f: F) -> Result<Request, E> async fn signature<F, E, K, S>(
self,
config: &Config<S>,
key_id: K,
f: F,
) -> Result<Request, E>
where where
F: FnOnce(&str) -> Result<String, E> + Send + 'static, F: FnOnce(&str) -> Result<String, E> + Send + 'static,
E: From<SignError> + From<reqwest::Error> + Send + 'static, E: From<SignError> + From<reqwest::Error> + Send + 'static,
K: Display + Send, K: Display + Send,
S: Spawn + Send + Sync,
{ {
let mut request = self.build()?; let mut request = self.build()?;
let signed = prepare(&mut request, config, key_id, f).await?; let signed = prepare(&mut request, config, key_id, f).await?;