From d9b31b80ac4c3e58444248b5e5b531fae557af14 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sun, 4 Feb 2024 03:11:48 +0000 Subject: [PATCH] fix: standardize body stream error reporting --- actix-http/examples/h2c-detect.rs | 9 +++++++-- actix-http/src/h1/dispatcher.rs | 5 ++++- actix-http/src/h2/dispatcher.rs | 15 ++++++++------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/actix-http/examples/h2c-detect.rs b/actix-http/examples/h2c-detect.rs index aa3dd5d31..b0bde3fe6 100644 --- a/actix-http/examples/h2c-detect.rs +++ b/actix-http/examples/h2c-detect.rs @@ -8,7 +8,7 @@ use std::{convert::Infallible, io}; -use actix_http::{HttpService, Request, Response, StatusCode}; +use actix_http::{body::BodyStream, HttpService, Request, Response, StatusCode}; use actix_server::Server; #[tokio::main(flavor = "current_thread")] @@ -19,7 +19,12 @@ async fn main() -> io::Result<()> { .bind("h2c-detect", ("127.0.0.1", 8080), || { HttpService::build() .finish(|_req: Request| async move { - Ok::<_, Infallible>(Response::build(StatusCode::OK).body("Hello!")) + Ok::<_, Infallible>(Response::build(StatusCode::OK).body(BodyStream::new( + futures_util::stream::iter([ + Ok::<_, String>("123".into()), + Err("wertyuikmnbvcxdfty6t".to_owned()), + ]), + ))) }) .tcp_auto_h2c() })? diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 270707807..bfbcaf24c 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -512,8 +512,10 @@ where } Poll::Ready(Some(Err(err))) => { + let err = err.into(); + tracing::error!("Response payload stream error: {err:?}"); this.flags.insert(Flags::FINISHED); - return Err(DispatchError::Body(err.into())); + return Err(DispatchError::Body(err)); } Poll::Pending => return Ok(PollResponse::DoNothing), @@ -549,6 +551,7 @@ where } Poll::Ready(Some(Err(err))) => { + tracing::error!("Response payload stream error: {err:?}"); this.flags.insert(Flags::FINISHED); return Err(DispatchError::Body( Error::new_body().with_cause(err).into(), diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 3e618820e..022239c2d 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -4,7 +4,7 @@ use std::{ future::Future, marker::PhantomData, net, - pin::Pin, + pin::{pin, Pin}, rc::Rc, task::{Context, Poll}, }; @@ -20,7 +20,6 @@ use h2::{ Ping, PingPong, }; use pin_project_lite::pin_project; -use tracing::{error, trace, warn}; use crate::{ body::{BodySize, BoxBody, MessageBody}, @@ -147,11 +146,13 @@ where if let Err(err) = res { match err { DispatchError::SendResponse(err) => { - trace!("Error sending HTTP/2 response: {:?}", err) + tracing::trace!("Error sending response: {err:?}"); + } + DispatchError::SendData(err) => { + tracing::warn!("Send data error: {err:?}"); } - DispatchError::SendData(err) => warn!("{:?}", err), DispatchError::ResponseBody(err) => { - error!("Response payload stream error: {:?}", err) + tracing::error!("Response payload stream error: {err:?}"); } } } @@ -228,9 +229,9 @@ where return Ok(()); } - // poll response body and send chunks to client - actix_rt::pin!(body); + let mut body = pin!(body); + // poll response body and send chunks to client while let Some(res) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { let mut chunk = res.map_err(|err| DispatchError::ResponseBody(err.into()))?;