mirror of
https://git.asonix.dog/asonix/http-signature-normalization.git
synced 2024-05-09 07:12:41 +00:00
Compare commits
5 commits
ef266eed95
...
6e0a6fa3a2
Author | SHA1 | Date | |
---|---|---|---|
6e0a6fa3a2 | |||
f0dc14d5f1 | |||
92a73f0313 | |||
e8588efda7 | |||
6acd291315 |
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "http-signature-normalization-actix"
|
||||
description = "An HTTP Signatures library that leaves the signing to you"
|
||||
version = "0.10.1"
|
||||
version = "0.10.4"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
license = "AGPL-3.0"
|
||||
readme = "README.md"
|
||||
|
@ -33,11 +33,12 @@ actix-rt = "2.6.0"
|
|||
actix-web = { version = "4.0.0", default-features = false, optional = true }
|
||||
awc = { version = "3.0.0", default-features = false, optional = true }
|
||||
base64 = { version = "0.13", optional = true }
|
||||
futures-util = { version = "0.3", default-features = false }
|
||||
futures-core = "0.3.28"
|
||||
http-signature-normalization = { version = "0.7.0", path = ".." }
|
||||
ring = { version = "0.16.20", optional = true }
|
||||
sha2 = { version = "0.10", optional = true }
|
||||
sha3 = { version = "0.10", optional = true }
|
||||
streem = "0.1.1"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -10,16 +10,14 @@ use actix_web::{
|
|||
http::{header::HeaderValue, StatusCode},
|
||||
web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
|
||||
};
|
||||
use futures_util::{
|
||||
future::LocalBoxFuture,
|
||||
stream::{Stream, StreamExt},
|
||||
};
|
||||
use futures_core::{future::LocalBoxFuture, Stream};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use streem::IntoStreamer;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tracing::{debug, Span};
|
||||
use tracing_error::SpanTrace;
|
||||
|
||||
|
@ -87,10 +85,10 @@ enum VerifyErrorKind {
|
|||
struct RxStream<T>(mpsc::Receiver<T>);
|
||||
|
||||
impl<T> Stream for RxStream<T> {
|
||||
type Item = T;
|
||||
type Item = Result<T, PayloadError>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.0).poll_recv(cx)
|
||||
Pin::new(&mut self.0).poll_recv(cx).map(|opt| opt.map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,22 +125,30 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
struct VerifiedReceiver {
|
||||
rx: Option<oneshot::Receiver<()>>,
|
||||
}
|
||||
|
||||
impl FromRequest for DigestVerified {
|
||||
type Error = VerifyError;
|
||||
type Future = Ready<Result<Self, Self::Error>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
|
||||
|
||||
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
|
||||
let res = req
|
||||
.extensions()
|
||||
.get::<Self>()
|
||||
.copied()
|
||||
.extensions_mut()
|
||||
.get_mut::<VerifiedReceiver>()
|
||||
.and_then(|r| r.rx.take())
|
||||
.ok_or_else(|| VerifyError::new(&Span::current(), VerifyErrorKind::Extension));
|
||||
|
||||
if res.is_err() {
|
||||
debug!("Failed to fetch DigestVerified from request");
|
||||
}
|
||||
|
||||
ready(res)
|
||||
Box::pin(async move {
|
||||
res?.await
|
||||
.map_err(|_| VerifyError::new(&Span::current(), VerifyErrorKind::Dropped))
|
||||
.map(|()| DigestVerified)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,18 +211,25 @@ where
|
|||
let spawner = self.1.clone();
|
||||
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let f1 = span.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx));
|
||||
let (verify_tx, verify_rx) = oneshot::channel();
|
||||
let f1 = span
|
||||
.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx, verify_tx));
|
||||
|
||||
let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> =
|
||||
Box::pin(RxStream(rx).map(Ok));
|
||||
Box::pin(RxStream(rx));
|
||||
req.set_payload(payload.into());
|
||||
req.extensions_mut().insert(DigestVerified);
|
||||
req.extensions_mut().insert(VerifiedReceiver {
|
||||
rx: Some(verify_rx),
|
||||
});
|
||||
|
||||
let f2 = self.0.call(req);
|
||||
|
||||
Box::pin(async move {
|
||||
let (_, res) = futures_util::future::join(f1, f2).await;
|
||||
res
|
||||
let handle1 = actix_web::rt::spawn(f1);
|
||||
let handle2 = actix_web::rt::spawn(f2);
|
||||
|
||||
handle1.await.expect("verify panic")?;
|
||||
handle2.await.expect("inner panic")
|
||||
})
|
||||
} else if self.2 {
|
||||
Box::pin(ready(Err(VerifyError::new(
|
||||
|
@ -235,15 +248,17 @@ async fn verify_payload<T, Spawner>(
|
|||
spawner: Spawner,
|
||||
vec: Vec<DigestPart>,
|
||||
mut verify_digest: T,
|
||||
mut payload: Payload,
|
||||
payload: Payload,
|
||||
tx: mpsc::Sender<web::Bytes>,
|
||||
verify_tx: oneshot::Sender<()>,
|
||||
) -> Result<(), actix_web::Error>
|
||||
where
|
||||
T: DigestVerify + Clone + Send + 'static,
|
||||
Spawner: Spawn,
|
||||
{
|
||||
while let Some(res) = payload.next().await {
|
||||
let bytes = res?;
|
||||
let mut payload = payload.into_streamer();
|
||||
|
||||
while let Some(bytes) = payload.try_next().await? {
|
||||
let bytes2 = bytes.clone();
|
||||
verify_digest = spawner
|
||||
.spawn_blocking(move || {
|
||||
|
@ -262,6 +277,9 @@ where
|
|||
.await??;
|
||||
|
||||
if verified {
|
||||
if verify_tx.send(()).is_err() {
|
||||
debug!("handler dropped");
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err(VerifyError::new(&Span::current(), VerifyErrorKind::Verify).into())
|
||||
|
|
|
@ -7,7 +7,7 @@ use actix_web::{
|
|||
http::StatusCode,
|
||||
Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
|
||||
};
|
||||
use futures_util::future::LocalBoxFuture;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
future::{ready, Ready},
|
||||
|
|
Loading…
Reference in a new issue