replace futures-util with streem

This commit is contained in:
asonix 2023-09-09 17:17:09 -04:00
parent ef266eed95
commit 6acd291315
3 changed files with 18 additions and 13 deletions

View file

@ -33,11 +33,12 @@ actix-rt = "2.6.0"
actix-web = { version = "4.0.0", default-features = false, optional = true } actix-web = { version = "4.0.0", default-features = false, optional = true }
awc = { version = "3.0.0", default-features = false, optional = true } awc = { version = "3.0.0", default-features = false, optional = true }
base64 = { version = "0.13", optional = true } base64 = { version = "0.13", optional = true }
futures-util = { version = "0.3", default-features = false } futures-core = "0.3.28"
http-signature-normalization = { version = "0.7.0", path = ".." } http-signature-normalization = { version = "0.7.0", path = ".." }
ring = { version = "0.16.20", optional = true } ring = { version = "0.16.20", optional = true }
sha2 = { version = "0.10", optional = true } sha2 = { version = "0.10", optional = true }
sha3 = { version = "0.10", optional = true } sha3 = { version = "0.10", optional = true }
streem = "0.1.1"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] } tokio = { version = "1", default-features = false, features = ["sync"] }
tracing = "0.1" tracing = "0.1"

View file

@ -10,15 +10,13 @@ use actix_web::{
http::{header::HeaderValue, StatusCode}, http::{header::HeaderValue, StatusCode},
web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
}; };
use futures_util::{ use futures_core::{future::LocalBoxFuture, Stream};
future::LocalBoxFuture,
stream::{Stream, StreamExt},
};
use std::{ use std::{
future::{ready, Ready}, future::{ready, Ready},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use streem::IntoStreamer;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{debug, Span}; use tracing::{debug, Span};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
@ -87,10 +85,10 @@ enum VerifyErrorKind {
struct RxStream<T>(mpsc::Receiver<T>); struct RxStream<T>(mpsc::Receiver<T>);
impl<T> Stream for RxStream<T> { impl<T> Stream for RxStream<T> {
type Item = T; type Item = Result<T, PayloadError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_recv(cx) Pin::new(&mut self.0).poll_recv(cx).map(|opt| opt.map(Ok))
} }
} }
@ -208,14 +206,19 @@ where
let f1 = span.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx)); let f1 = span.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx));
let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> = let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> =
Box::pin(RxStream(rx).map(Ok)); Box::pin(RxStream(rx));
req.set_payload(payload.into()); req.set_payload(payload.into());
req.extensions_mut().insert(DigestVerified); req.extensions_mut().insert(DigestVerified);
let f2 = self.0.call(req); let f2 = self.0.call(req);
Box::pin(async move { Box::pin(async move {
let (_, res) = futures_util::future::join(f1, f2).await; let handle1 = actix_web::rt::spawn(f1);
let handle2 = actix_web::rt::spawn(f2);
handle1.await.expect("verify panic")?;
let res = handle2.await.expect("inner panic");
res res
}) })
} else if self.2 { } else if self.2 {
@ -235,15 +238,16 @@ async fn verify_payload<T, Spawner>(
spawner: Spawner, spawner: Spawner,
vec: Vec<DigestPart>, vec: Vec<DigestPart>,
mut verify_digest: T, mut verify_digest: T,
mut payload: Payload, payload: Payload,
tx: mpsc::Sender<web::Bytes>, tx: mpsc::Sender<web::Bytes>,
) -> Result<(), actix_web::Error> ) -> Result<(), actix_web::Error>
where where
T: DigestVerify + Clone + Send + 'static, T: DigestVerify + Clone + Send + 'static,
Spawner: Spawn, Spawner: Spawn,
{ {
while let Some(res) = payload.next().await { let mut payload = payload.into_streamer();
let bytes = res?;
while let Some(bytes) = payload.try_next().await? {
let bytes2 = bytes.clone(); let bytes2 = bytes.clone();
verify_digest = spawner verify_digest = spawner
.spawn_blocking(move || { .spawn_blocking(move || {

View file

@ -7,7 +7,7 @@ use actix_web::{
http::StatusCode, http::StatusCode,
Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
}; };
use futures_util::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use std::{ use std::{
collections::HashSet, collections::HashSet,
future::{ready, Ready}, future::{ready, Ready},