diff --git a/actix-http/src/body/boxed.rs b/actix-http/src/body/boxed.rs index 7581bec88..a2d7540c4 100644 --- a/actix-http/src/body/boxed.rs +++ b/actix-http/src/body/boxed.rs @@ -8,10 +8,16 @@ use std::{ use bytes::Bytes; use super::{BodySize, MessageBody, MessageBodyMapErr}; -use crate::Error; +use crate::body; /// A boxed message body with boxed errors. -pub struct BoxBody(Pin>>>); +pub struct BoxBody(BoxBodyInner); + +enum BoxBodyInner { + None(body::None), + Bytes(Bytes), + Stream(Pin>>>), +} impl BoxBody { /// Same as `MessageBody::boxed`. @@ -23,29 +29,42 @@ impl BoxBody { where B: MessageBody + 'static, { - let body = MessageBodyMapErr::new(body, Into::into); - Self(Box::pin(body)) + match body.size() { + BodySize::None => Self(BoxBodyInner::None(body::None)), + _ => match body.try_into_bytes() { + Ok(bytes) => Self(BoxBodyInner::Bytes(bytes)), + Err(body) => { + let body = MessageBodyMapErr::new(body, Into::into); + Self(BoxBodyInner::Stream(Box::pin(body))) + } + }, + } } /// Returns a mutable pinned reference to the inner message body type. #[inline] - pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody>)> { - self.0.as_mut() + pub fn as_pin_mut(&mut self) -> Pin<&mut Self> { + Pin::new(self) } } impl fmt::Debug for BoxBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO show BoxBodyInner f.write_str("BoxBody(dyn MessageBody)") } } impl MessageBody for BoxBody { - type Error = Error; + type Error = Box; #[inline] fn size(&self) -> BodySize { - self.0.size() + match &self.0 { + BoxBodyInner::None(none) => none.size(), + BoxBodyInner::Bytes(bytes) => bytes.size(), + BoxBodyInner::Stream(stream) => stream.size(), + } } #[inline] @@ -53,20 +72,20 @@ impl MessageBody for BoxBody { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.0 - .as_mut() - .poll_next(cx) - .map_err(|err| Error::new_body().with_cause(err)) + match &mut self.0 { + BoxBodyInner::None(_) => Poll::Ready(None), + BoxBodyInner::Bytes(bytes) => Pin::new(bytes).poll_next(cx).map_err(Into::into), + BoxBodyInner::Stream(stream) => Pin::new(stream).poll_next(cx), + } } #[inline] - fn is_complete_body(&self) -> bool { - self.0.is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.0.take_complete_body() + fn try_into_bytes(self) -> Result { + match self.0 { + BoxBodyInner::None(none) => Ok(none.try_into_bytes().unwrap()), + BoxBodyInner::Bytes(bytes) => Ok(bytes.try_into_bytes().unwrap()), + _ => Err(self), + } } #[inline] diff --git a/actix-http/src/body/either.rs b/actix-http/src/body/either.rs index 3a4082dc9..add1eab7c 100644 --- a/actix-http/src/body/either.rs +++ b/actix-http/src/body/either.rs @@ -74,18 +74,14 @@ where } #[inline] - fn is_complete_body(&self) -> bool { + fn try_into_bytes(self) -> Result { match self { - EitherBody::Left { body } => body.is_complete_body(), - EitherBody::Right { body } => body.is_complete_body(), - } - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - match self { - EitherBody::Left { body } => body.take_complete_body(), - EitherBody::Right { body } => body.take_complete_body(), + EitherBody::Left { body } => body + .try_into_bytes() + .map_err(|body| EitherBody::Left { body }), + EitherBody::Right { body } => body + .try_into_bytes() + .map_err(|body| EitherBody::Right { body }), } } diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 075ae7220..bd13e75ec 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -31,51 +31,14 @@ pub trait MessageBody { cx: &mut Context<'_>, ) -> Poll>>; - /// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`. + /// Convert this body into `Bytes`. /// - /// This method's implementation should agree with [`take_complete_body`] and should always be - /// checked before taking the body. - /// - /// The default implementation returns `false. - /// - /// [`take_complete_body`]: MessageBody::take_complete_body - fn is_complete_body(&self) -> bool { - false - } - - /// Returns the complete chunk of body bytes. - /// - /// Implementors of this method should note the following: - /// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of - /// performing this check is delegated to the caller. - /// - If the result of [`is_complete_body`] is conditional, that condition should be given - /// equivalent attention here. - /// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic. - /// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless - /// the chunk is guaranteed to be empty. - /// - /// The default implementation panics unconditionally, indicating a control flow bug in the - /// calling code. - /// - /// # Panics - /// With a correct implementation, panics if called without first checking [`is_complete_body`]. - /// - /// [`is_complete_body`]: MessageBody::is_complete_body - /// [`take_complete_body`]: MessageBody::take_complete_body - /// [`poll_next`]: MessageBody::poll_next - fn take_complete_body(&mut self) -> Bytes { - assert!( - self.is_complete_body(), - "type ({}) allows taking complete body but did not provide an implementation \ - of `take_complete_body`", - std::any::type_name::() - ); - - unimplemented!( - "type ({}) does not allow taking complete body; caller should make sure to \ - check `is_complete_body` first", - std::any::type_name::() - ); + /// Bodies with `BodySize::None` are allowed to return empty `Bytes`. + fn try_into_bytes(self) -> Result + where + Self: Sized, + { + Err(self) } /// Converts this body into `BoxBody`. @@ -104,14 +67,6 @@ mod foreign_impls { ) -> Poll>> { match *self {} } - - fn is_complete_body(&self) -> bool { - true - } - - fn take_complete_body(&mut self) -> Bytes { - match *self {} - } } impl MessageBody for () { @@ -131,13 +86,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::new() + fn try_into_bytes(self) -> Result { + Ok(Bytes::new()) } } @@ -159,16 +109,6 @@ mod foreign_impls { ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } - - #[inline] - fn is_complete_body(&self) -> bool { - self.as_ref().is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.as_mut().take_complete_body() - } } impl MessageBody for Pin> @@ -189,38 +129,6 @@ mod foreign_impls { ) -> Poll>> { self.get_mut().as_mut().poll_next(cx) } - - #[inline] - fn is_complete_body(&self) -> bool { - self.as_ref().is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - debug_assert!( - self.is_complete_body(), - "inner type \"{}\" does not allow taking complete body; caller should make sure to \ - call `is_complete_body` first", - std::any::type_name::(), - ); - - // we do not have DerefMut access to call take_complete_body directly but since - // is_complete_body is true we should expect the entire bytes chunk in one poll_next - - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - match self.as_mut().poll_next(&mut cx) { - Poll::Ready(Some(Ok(data))) => data, - _ => { - panic!( - "inner type \"{}\" indicated it allows taking complete body but failed to \ - return Bytes when polled", - std::any::type_name::() - ); - } - } - } } impl MessageBody for &'static [u8] { @@ -232,24 +140,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut()))))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from_static(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from_static(self)) } } @@ -262,24 +165,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut())))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self) + fn try_into_bytes(self) -> Result { + Ok(self) } } @@ -292,24 +190,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze()))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self).freeze() + fn try_into_bytes(self) -> Result { + Ok(self.freeze()) } } @@ -322,24 +215,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut()).into()))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from(self)) } } @@ -365,13 +253,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from_static(mem::take(self).as_bytes()) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from_static(self.as_bytes())) } } @@ -396,13 +279,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from(self)) } } @@ -423,13 +301,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self).into_bytes() + fn try_into_bytes(self) -> Result { + Ok(self.into_bytes()) } } } @@ -486,13 +359,9 @@ where } #[inline] - fn is_complete_body(&self) -> bool { - self.body.is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.body.take_complete_body() + fn try_into_bytes(self) -> Result { + let Self { body, mapper } = self; + body.try_into_bytes().map_err(|body| Self { body, mapper }) } } @@ -503,6 +372,7 @@ mod tests { use bytes::{Bytes, BytesMut}; use super::*; + use crate::body::{self, EitherBody}; macro_rules! assert_poll_next { ($pin:expr, $exp:expr) => { @@ -604,70 +474,45 @@ mod tests { assert_poll_next!(pl, Bytes::from("test")); } - #[test] - fn take_string() { - let mut data = "test".repeat(2); - let data_bytes = Bytes::from(data.clone()); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), data_bytes); - - let mut big_data = "test".repeat(64 * 1024); - let data_bytes = Bytes::from(big_data.clone()); - assert!(big_data.is_complete_body()); - assert_eq!(big_data.take_complete_body(), data_bytes); - } - - #[test] - fn take_boxed_equivalence() { - let mut data = Bytes::from_static(b"test"); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - - let mut data = Box::new(Bytes::from_static(b"test")); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - - let mut data = Box::pin(Bytes::from_static(b"test")); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - } - - #[test] - fn take_policy() { - let mut data = Bytes::from_static(b"test"); - // first call returns chunk - assert_eq!(data.take_complete_body(), b"test".as_ref()); - // second call returns empty - assert_eq!(data.take_complete_body(), b"".as_ref()); - - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - let mut data = Bytes::from_static(b"test"); - // take returns whole chunk - assert_eq!(data.take_complete_body(), b"test".as_ref()); - // subsequent poll_next returns None - assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None)); - } - - #[test] - fn complete_body_combinators() { - use crate::body::{BoxBody, EitherBody}; - + #[actix_rt::test] + async fn complete_body_combinators() { + let body = Bytes::from_static(b"test"); + let body = BoxBody::new(body); + let body = EitherBody::<_, ()>::left(body); + let body = EitherBody::<(), _>::right(body); + // Do not support try_into_bytes: + // let body = Box::new(body); + // let body = Box::pin(body); + + assert_eq!(body.try_into_bytes().unwrap(), Bytes::from("test")); + } + + #[actix_rt::test] + async fn complete_body_combinators_poll() { let body = Bytes::from_static(b"test"); let body = BoxBody::new(body); let body = EitherBody::<_, ()>::left(body); let body = EitherBody::<(), _>::right(body); - let body = Box::new(body); - let body = Box::pin(body); let mut body = body; - assert!(body.is_complete_body()); - assert_eq!(body.take_complete_body(), b"test".as_ref()); + assert_eq!(body.size(), BodySize::Sized(4)); + assert_poll_next!(Pin::new(&mut body), Bytes::from("test")); + assert_poll_next_none!(Pin::new(&mut body)); + } - // subsequent poll_next returns None - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - assert!(Pin::new(&mut body).poll_next(&mut cx).map_err(drop) == Poll::Ready(None)); + #[actix_rt::test] + async fn none_body_combinators() { + fn none_body() -> BoxBody { + let body = body::None; + let body = BoxBody::new(body); + let body = EitherBody::<_, ()>::left(body); + let body = EitherBody::<(), _>::right(body); + body.boxed() + } + + assert_eq!(none_body().size(), BodySize::None); + assert_eq!(none_body().try_into_bytes().unwrap(), Bytes::new()); + assert_poll_next_none!(Pin::new(&mut none_body())); } // down-casting used to be done with a method on MessageBody trait diff --git a/actix-http/src/body/none.rs b/actix-http/src/body/none.rs index bb494078f..0e7bbe5a9 100644 --- a/actix-http/src/body/none.rs +++ b/actix-http/src/body/none.rs @@ -42,12 +42,7 @@ impl MessageBody for None { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::new() + fn try_into_bytes(self) -> Result { + Ok(Bytes::new()) } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index fa294ab0d..70448a115 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -53,7 +53,7 @@ impl Encoder { } } - pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self { + pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self { let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) || head.status == StatusCode::SWITCHING_PROTOCOLS || head.status == StatusCode::NO_CONTENT @@ -65,11 +65,9 @@ impl Encoder { return Self::none(); } - let body = if body.is_complete_body() { - let body = body.take_complete_body(); - EncoderBody::Full { body } - } else { - EncoderBody::Stream { body } + let body = match body.try_into_bytes() { + Ok(body) => EncoderBody::Full { body }, + Err(body) => EncoderBody::Stream { body }, }; if can_encode { @@ -133,21 +131,14 @@ where } } - fn is_complete_body(&self) -> bool { + fn try_into_bytes(self) -> Result + where + Self: Sized, + { match self { - EncoderBody::None => true, - EncoderBody::Full { .. } => true, - EncoderBody::Stream { .. } => false, - } - } - - fn take_complete_body(&mut self) -> Bytes { - match self { - EncoderBody::None => Bytes::new(), - EncoderBody::Full { body } => body.take_complete_body(), - EncoderBody::Stream { .. } => { - panic!("EncoderBody::Stream variant cannot be taken") - } + EncoderBody::None => Ok(Bytes::new()), + EncoderBody::Full { body } => Ok(body), + _ => Err(self), } } } @@ -234,19 +225,20 @@ where } } - fn is_complete_body(&self) -> bool { + fn try_into_bytes(mut self) -> Result + where + Self: Sized, + { if self.encoder.is_some() { - false + Err(self) } else { - self.body.is_complete_body() - } - } - - fn take_complete_body(&mut self) -> Bytes { - if self.encoder.is_some() { - panic!("compressed body stream cannot be taken") - } else { - self.body.take_complete_body() + match self.body.try_into_bytes() { + Ok(body) => Ok(body), + Err(body) => { + self.body = body; + Err(self) + } + } } } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 472845e65..5c0cb64af 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -22,7 +22,7 @@ use crate::{ config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - Extensions, OnConnectData, Request, Response, StatusCode, + Error, Extensions, OnConnectData, Request, Response, StatusCode, }; use super::{ @@ -494,7 +494,9 @@ where } Poll::Ready(Some(Err(err))) => { - return Err(DispatchError::Body(err.into())) + return Err(DispatchError::Body( + Error::new_body().with_cause(err).into(), + )) } Poll::Pending => return Ok(PollResponse::DoNothing), diff --git a/src/dev.rs b/src/dev.rs index d4a64985c..edcc158f8 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -139,4 +139,12 @@ impl crate::body::MessageBody for AnyBody { AnyBody::Boxed { body } => body.as_pin_mut().poll_next(cx), } } + + fn try_into_bytes(self) -> Result { + match self { + AnyBody::None => Ok(crate::web::Bytes::new()), + AnyBody::Full { body } => Ok(body), + _ => Err(self), + } + } }