From 6acd291315c7414ca37508f75b6d992383e78b13 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 9 Sep 2023 17:17:09 -0400 Subject: [PATCH] replace futures-util with streem --- actix/Cargo.toml | 3 ++- actix/src/digest/middleware.rs | 26 +++++++++++++++----------- actix/src/middleware.rs | 2 +- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/actix/Cargo.toml b/actix/Cargo.toml index 05c3b45..5b68137 100644 --- a/actix/Cargo.toml +++ b/actix/Cargo.toml @@ -33,11 +33,12 @@ actix-rt = "2.6.0" actix-web = { version = "4.0.0", default-features = false, optional = true } awc = { version = "3.0.0", default-features = false, optional = true } base64 = { version = "0.13", optional = true } -futures-util = { version = "0.3", default-features = false } +futures-core = "0.3.28" http-signature-normalization = { version = "0.7.0", path = ".." } ring = { version = "0.16.20", optional = true } sha2 = { version = "0.10", optional = true } sha3 = { version = "0.10", optional = true } +streem = "0.1.1" thiserror = "1.0" tokio = { version = "1", default-features = false, features = ["sync"] } tracing = "0.1" diff --git a/actix/src/digest/middleware.rs b/actix/src/digest/middleware.rs index 18c8669..b4795a8 100644 --- a/actix/src/digest/middleware.rs +++ b/actix/src/digest/middleware.rs @@ -10,15 +10,13 @@ use actix_web::{ http::{header::HeaderValue, StatusCode}, web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, }; -use futures_util::{ - future::LocalBoxFuture, - stream::{Stream, StreamExt}, -}; +use futures_core::{future::LocalBoxFuture, Stream}; use std::{ future::{ready, Ready}, pin::Pin, task::{Context, Poll}, }; +use streem::IntoStreamer; use tokio::sync::mpsc; use tracing::{debug, Span}; use tracing_error::SpanTrace; @@ -87,10 +85,10 @@ enum VerifyErrorKind { struct RxStream(mpsc::Receiver); impl Stream for RxStream { - type Item = T; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_recv(cx) + Pin::new(&mut self.0).poll_recv(cx).map(|opt| opt.map(Ok)) } } @@ -208,14 +206,19 @@ where let f1 = span.in_scope(|| verify_payload(spawner, vec, self.3.clone(), payload, tx)); let payload: Pin> + 'static>> = - Box::pin(RxStream(rx).map(Ok)); + Box::pin(RxStream(rx)); req.set_payload(payload.into()); req.extensions_mut().insert(DigestVerified); let f2 = self.0.call(req); Box::pin(async move { - let (_, res) = futures_util::future::join(f1, f2).await; + let handle1 = actix_web::rt::spawn(f1); + let handle2 = actix_web::rt::spawn(f2); + + handle1.await.expect("verify panic")?; + let res = handle2.await.expect("inner panic"); + res }) } else if self.2 { @@ -235,15 +238,16 @@ async fn verify_payload( spawner: Spawner, vec: Vec, mut verify_digest: T, - mut payload: Payload, + payload: Payload, tx: mpsc::Sender, ) -> Result<(), actix_web::Error> where T: DigestVerify + Clone + Send + 'static, Spawner: Spawn, { - while let Some(res) = payload.next().await { - let bytes = res?; + let mut payload = payload.into_streamer(); + + while let Some(bytes) = payload.try_next().await? { let bytes2 = bytes.clone(); verify_digest = spawner .spawn_blocking(move || { diff --git a/actix/src/middleware.rs b/actix/src/middleware.rs index 693ead4..af52689 100644 --- a/actix/src/middleware.rs +++ b/actix/src/middleware.rs @@ -7,7 +7,7 @@ use actix_web::{ http::StatusCode, Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, }; -use futures_util::future::LocalBoxFuture; +use futures_core::future::LocalBoxFuture; use std::{ collections::HashSet, future::{ready, Ready},