mirror of
https://git.asonix.dog/asonix/http-signature-normalization.git
synced 2025-01-03 08:18:42 +00:00
Enable Spawn for server, allow spawner in digest verify
This commit is contained in:
parent
e526699c87
commit
1f23b72140
6 changed files with 136 additions and 93 deletions
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "http-signature-normalization-actix"
|
||||
description = "An HTTP Signatures library that leaves the signing to you"
|
||||
version = "0.9.1"
|
||||
version = "0.10.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
license = "AGPL-3.0"
|
||||
readme = "README.md"
|
||||
|
|
|
@ -45,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
let config = Config::default().require_header("accept").require_digest();
|
||||
let config = Config::new().require_header("accept").require_digest();
|
||||
|
||||
request(config.clone()).await?;
|
||||
request(config.mastodon_compat()).await?;
|
||||
|
|
|
@ -63,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
let config = Config::default().require_header("accept").require_digest();
|
||||
let config = Config::new().require_header("accept").require_digest();
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
//! Types for setting up Digest middleware verification
|
||||
|
||||
use crate::{DefaultSpawner, Spawn};
|
||||
|
||||
use super::{DigestPart, DigestVerify};
|
||||
use actix_web::{
|
||||
body::MessageBody,
|
||||
|
@ -42,10 +44,10 @@ pub struct DigestVerified;
|
|||
/// .route("/unprotected", web::post().to(|| "No verification required"))
|
||||
/// })
|
||||
/// ```
|
||||
pub struct VerifyDigest<T>(bool, T);
|
||||
pub struct VerifyDigest<T, Spawner = DefaultSpawner>(Spawner, bool, T);
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct VerifyMiddleware<T, S>(S, bool, T);
|
||||
pub struct VerifyMiddleware<T, Spawner, S>(S, Spawner, bool, T);
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Error verifying digest")]
|
||||
|
@ -98,7 +100,22 @@ where
|
|||
{
|
||||
/// Produce a new VerifyDigest with a user-provided [`Digestverify`] type
|
||||
pub fn new(verify_digest: T) -> Self {
|
||||
VerifyDigest(true, verify_digest)
|
||||
VerifyDigest(DefaultSpawner, true, verify_digest)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Spawner> VerifyDigest<T, Spawner>
|
||||
where
|
||||
T: DigestVerify + Clone,
|
||||
{
|
||||
/// Set the spawner used for verifying bytes in the request
|
||||
///
|
||||
/// By default this value uses `actix_web::web::block`
|
||||
pub fn spawner<NewSpawner>(self, spawner: NewSpawner) -> VerifyDigest<T, NewSpawner>
|
||||
where
|
||||
NewSpawner: Spawn,
|
||||
{
|
||||
VerifyDigest(spawner, self.1, self.2)
|
||||
}
|
||||
|
||||
/// Mark verifying the Digest as optional
|
||||
|
@ -106,7 +123,7 @@ where
|
|||
/// If a digest is present in the request, it will be verified, but it is not required to be
|
||||
/// present
|
||||
pub fn optional(self) -> Self {
|
||||
VerifyDigest(false, self.1)
|
||||
VerifyDigest(self.0, false, self.2)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,30 +146,37 @@ impl FromRequest for DigestVerified {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, S, B> Transform<S, ServiceRequest> for VerifyDigest<T>
|
||||
impl<T, Spawner, S, B> Transform<S, ServiceRequest> for VerifyDigest<T, Spawner>
|
||||
where
|
||||
T: DigestVerify + Clone + Send + 'static,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
|
||||
S::Error: 'static,
|
||||
B: MessageBody + 'static,
|
||||
Spawner: Spawn + Clone + 'static,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Error = actix_web::Error;
|
||||
type Transform = VerifyMiddleware<T, S>;
|
||||
type Transform = VerifyMiddleware<T, Spawner, S>;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ready(Ok(VerifyMiddleware(service, self.0, self.1.clone())))
|
||||
ready(Ok(VerifyMiddleware(
|
||||
service,
|
||||
self.0.clone(),
|
||||
self.1,
|
||||
self.2.clone(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S>
|
||||
impl<T, Spawner, S, B> Service<ServiceRequest> for VerifyMiddleware<T, Spawner, S>
|
||||
where
|
||||
T: DigestVerify + Clone + Send + 'static,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
|
||||
S::Error: 'static,
|
||||
B: MessageBody + 'static,
|
||||
Spawner: Spawn + Clone + 'static,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Error = actix_web::Error;
|
||||
|
@ -165,7 +189,7 @@ where
|
|||
fn call(&self, mut req: ServiceRequest) -> Self::Future {
|
||||
let span = tracing::info_span!(
|
||||
"Verify digest",
|
||||
digest.required = tracing::field::display(&self.1),
|
||||
digest.required = tracing::field::display(&self.2),
|
||||
);
|
||||
|
||||
if let Some(digest) = req.headers().get("Digest") {
|
||||
|
@ -178,9 +202,10 @@ where
|
|||
}
|
||||
};
|
||||
let payload = req.take_payload();
|
||||
let spawner = self.1.clone();
|
||||
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let f1 = span.in_scope(|| verify_payload(vec, self.2.clone(), payload, tx));
|
||||
let f1 = span.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx));
|
||||
|
||||
let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> =
|
||||
Box::pin(RxStream(rx).map(Ok));
|
||||
|
@ -193,7 +218,7 @@ where
|
|||
let (_, res) = futures_util::future::join(f1, f2).await;
|
||||
res
|
||||
})
|
||||
} else if self.1 {
|
||||
} else if self.2 {
|
||||
Box::pin(ready(Err(VerifyError::new(
|
||||
&span,
|
||||
VerifyErrorKind::MissingDigest,
|
||||
|
@ -205,8 +230,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Verify Payload", skip(verify_digest, payload, tx))]
|
||||
async fn verify_payload<T>(
|
||||
#[tracing::instrument(name = "Verify Payload", skip(spawner, verify_digest, payload, tx))]
|
||||
async fn verify_payload<T, Spawner>(
|
||||
spawner: Spawner,
|
||||
vec: Vec<DigestPart>,
|
||||
mut verify_digest: T,
|
||||
mut payload: Payload,
|
||||
|
@ -214,23 +240,26 @@ async fn verify_payload<T>(
|
|||
) -> Result<(), actix_web::Error>
|
||||
where
|
||||
T: DigestVerify + Clone + Send + 'static,
|
||||
Spawner: Spawn,
|
||||
{
|
||||
while let Some(res) = payload.next().await {
|
||||
let bytes = res?;
|
||||
let bytes2 = bytes.clone();
|
||||
verify_digest = web::block(move || {
|
||||
verify_digest.update(bytes2.as_ref());
|
||||
Ok(verify_digest) as Result<T, VerifyError>
|
||||
})
|
||||
.await??;
|
||||
verify_digest = spawner
|
||||
.spawn_blocking(move || {
|
||||
verify_digest.update(bytes2.as_ref());
|
||||
Ok(verify_digest) as Result<T, VerifyError>
|
||||
})
|
||||
.await??;
|
||||
|
||||
tx.send(bytes)
|
||||
.await
|
||||
.map_err(|_| VerifyError::new(&Span::current(), VerifyErrorKind::Dropped))?;
|
||||
}
|
||||
|
||||
let verified =
|
||||
web::block(move || Ok(verify_digest.verify(&vec)) as Result<_, VerifyError>).await??;
|
||||
let verified = spawner
|
||||
.spawn_blocking(move || Ok(verify_digest.verify(&vec)) as Result<_, VerifyError>)
|
||||
.await??;
|
||||
|
||||
if verified {
|
||||
Ok(())
|
||||
|
|
132
actix/src/lib.rs
132
actix/src/lib.rs
|
@ -257,7 +257,7 @@ pub mod verify {
|
|||
}
|
||||
|
||||
#[cfg(feature = "client")]
|
||||
pub use self::client::{Canceled, PrepareSignError, Sign, Spawn};
|
||||
pub use self::client::{PrepareSignError, Sign};
|
||||
|
||||
#[cfg(feature = "server")]
|
||||
pub use self::server::{PrepareVerifyError, SignatureVerify};
|
||||
|
@ -285,9 +285,68 @@ pub struct Config<Spawner = DefaultSpawner> {
|
|||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct DefaultSpawner;
|
||||
|
||||
/// An error that indicates a blocking operation panicked and cannot return a response
|
||||
#[derive(Debug)]
|
||||
pub struct Canceled;
|
||||
|
||||
impl std::fmt::Display for Canceled {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Operation was canceled")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Canceled {}
|
||||
|
||||
/// A trait dictating how to spawn a future onto a blocking threadpool. By default,
|
||||
/// http-signature-normalization-actix will use actix_rt's built-in blocking threadpool, but this
|
||||
/// can be customized
|
||||
pub trait Spawn {
|
||||
/// The future type returned by spawn_blocking
|
||||
type Future<T>: std::future::Future<Output = Result<T, Canceled>>;
|
||||
|
||||
/// Spawn the blocking function onto the threadpool
|
||||
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
||||
where
|
||||
Func: FnOnce() -> Out + Send + 'static,
|
||||
Out: Send + 'static;
|
||||
}
|
||||
|
||||
/// The future returned by DefaultSpawner when spawning blocking operations on the actix_rt
|
||||
/// blocking threadpool
|
||||
pub struct DefaultSpawnerFuture<Out> {
|
||||
inner: actix_rt::task::JoinHandle<Out>,
|
||||
}
|
||||
|
||||
impl Spawn for DefaultSpawner {
|
||||
type Future<T> = DefaultSpawnerFuture<T>;
|
||||
|
||||
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
||||
where
|
||||
Func: FnOnce() -> Out + Send + 'static,
|
||||
Out: Send + 'static,
|
||||
{
|
||||
DefaultSpawnerFuture {
|
||||
inner: actix_rt::task::spawn_blocking(func),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Out> std::future::Future for DefaultSpawnerFuture<Out> {
|
||||
type Output = Result<Out, Canceled>;
|
||||
|
||||
fn poll(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
let res = std::task::ready!(std::pin::Pin::new(&mut self.inner).poll(cx));
|
||||
|
||||
std::task::Poll::Ready(res.map_err(|_| Canceled))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "client")]
|
||||
mod client {
|
||||
use super::{Config, DefaultSpawner, RequiredError};
|
||||
use super::{Config, RequiredError, Spawn};
|
||||
use actix_http::header::{InvalidHeaderValue, ToStrError};
|
||||
use actix_rt::task::JoinError;
|
||||
use std::{fmt::Display, future::Future, pin::Pin};
|
||||
|
@ -354,65 +413,6 @@ mod client {
|
|||
/// Invalid Date header
|
||||
InvalidHeader(#[from] actix_http::header::InvalidHeaderValue),
|
||||
}
|
||||
|
||||
/// An error that indicates a blocking operation panicked and cannot return a response
|
||||
#[derive(Debug)]
|
||||
pub struct Canceled;
|
||||
|
||||
impl std::fmt::Display for Canceled {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Operation was canceled")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Canceled {}
|
||||
|
||||
/// A trait dictating how to spawn a future onto a blocking threadpool. By default,
|
||||
/// http-signature-normalization-actix will use actix_rt's built-in blocking threadpool, but this
|
||||
/// can be customized
|
||||
pub trait Spawn {
|
||||
/// The future type returned by spawn_blocking
|
||||
type Future<T>: std::future::Future<Output = Result<T, Canceled>>;
|
||||
|
||||
/// Spawn the blocking function onto the threadpool
|
||||
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
||||
where
|
||||
Func: FnOnce() -> Out + Send + 'static,
|
||||
Out: Send + 'static;
|
||||
}
|
||||
|
||||
/// The future returned by DefaultSpawner when spawning blocking operations on the actix_rt
|
||||
/// blocking threadpool
|
||||
pub struct DefaultSpawnerFuture<Out> {
|
||||
inner: actix_rt::task::JoinHandle<Out>,
|
||||
}
|
||||
|
||||
impl Spawn for DefaultSpawner {
|
||||
type Future<T> = DefaultSpawnerFuture<T>;
|
||||
|
||||
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
||||
where
|
||||
Func: FnOnce() -> Out + Send + 'static,
|
||||
Out: Send + 'static,
|
||||
{
|
||||
DefaultSpawnerFuture {
|
||||
inner: actix_rt::task::spawn_blocking(func),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Out> std::future::Future for DefaultSpawnerFuture<Out> {
|
||||
type Output = Result<Out, Canceled>;
|
||||
|
||||
fn poll(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
let res = std::task::ready!(std::pin::Pin::new(&mut self.inner).poll(cx));
|
||||
|
||||
std::task::Poll::Ready(res.map_err(|_| Canceled))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "server")]
|
||||
|
@ -485,6 +485,16 @@ mod server {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl actix_web::ResponseError for super::Canceled {
|
||||
fn status_code(&self) -> actix_http::StatusCode {
|
||||
actix_http::StatusCode::INTERNAL_SERVER_ERROR
|
||||
}
|
||||
|
||||
fn error_response(&self) -> actix_web::HttpResponse<actix_http::body::BoxBody> {
|
||||
actix_web::HttpResponse::new(self.status_code())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Types for verifying requests with Actix Web
|
||||
|
||||
use crate::{Config, PrepareVerifyError, SignatureVerify};
|
||||
use crate::{Config, PrepareVerifyError, SignatureVerify, Spawn};
|
||||
use actix_web::{
|
||||
body::MessageBody,
|
||||
dev::{Payload, Service, ServiceRequest, ServiceResponse, Transform},
|
||||
|
@ -45,11 +45,11 @@ impl SignatureVerified {
|
|||
/// .route("/unprotected", web::post().to(|| "No verification required"))
|
||||
/// })
|
||||
/// ```
|
||||
pub struct VerifySignature<T>(T, Config, HeaderKind);
|
||||
pub struct VerifySignature<T, Spawner>(T, Config<Spawner>, HeaderKind);
|
||||
|
||||
#[derive(Debug)]
|
||||
#[doc(hidden)]
|
||||
pub struct VerifyMiddleware<T, S>(Rc<S>, Config, HeaderKind, T);
|
||||
pub struct VerifyMiddleware<T, Spawner, S>(Rc<S>, Config<Spawner>, HeaderKind, T);
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
|
||||
enum HeaderKind {
|
||||
|
@ -126,7 +126,7 @@ impl VerifyError {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> VerifySignature<T>
|
||||
impl<T, Spawner> VerifySignature<T, Spawner>
|
||||
where
|
||||
T: SignatureVerify,
|
||||
{
|
||||
|
@ -135,7 +135,10 @@ where
|
|||
///
|
||||
/// By default, this middleware expects to verify Signature headers, and requires the presence
|
||||
/// of the header
|
||||
pub fn new(verify_signature: T, config: Config) -> Self {
|
||||
pub fn new(verify_signature: T, config: Config<Spawner>) -> Self
|
||||
where
|
||||
Spawner: Spawn,
|
||||
{
|
||||
VerifySignature(verify_signature, config, HeaderKind::Signature)
|
||||
}
|
||||
|
||||
|
@ -145,7 +148,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, S, B> VerifyMiddleware<T, S>
|
||||
impl<T, Spawner, S, B> VerifyMiddleware<T, Spawner, S>
|
||||
where
|
||||
T: SignatureVerify + Clone + 'static,
|
||||
T::Future: 'static,
|
||||
|
@ -257,16 +260,17 @@ impl FromRequest for SignatureVerified {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, S, B> Transform<S, ServiceRequest> for VerifySignature<T>
|
||||
impl<T, Spawner, S, B> Transform<S, ServiceRequest> for VerifySignature<T, Spawner>
|
||||
where
|
||||
T: SignatureVerify + Clone + 'static,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
|
||||
S::Error: 'static,
|
||||
B: MessageBody + 'static,
|
||||
Spawner: Clone,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Error = actix_web::Error;
|
||||
type Transform = VerifyMiddleware<T, S>;
|
||||
type Transform = VerifyMiddleware<T, Spawner, S>;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
|
@ -280,7 +284,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S>
|
||||
impl<T, Spawner, S, B> Service<ServiceRequest> for VerifyMiddleware<T, Spawner, S>
|
||||
where
|
||||
T: SignatureVerify + Clone + 'static,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
|
||||
|
|
Loading…
Reference in a new issue