From 9e7a6fe57bfffc36d199c3e729f3470d10a381aa Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 13 Mar 2023 13:31:48 +0000 Subject: [PATCH] add `body::to_bytes_limited` (#3000 * add body::to_body_limit * rename to_bytes_limited --- actix-http/CHANGES.md | 5 ++ actix-http/src/body/mod.rs | 2 +- actix-http/src/body/utils.rs | 141 ++++++++++++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 13 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 831a0bcd0..aaf84d765 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,6 +2,11 @@ ## Unreleased - 2023-xx-xx +### Added + +- Add `body::to_body_limit()` function. +- Add `body::BodyLimitExceeded` error type. + ## 3.3.1 - 2023-03-02 ### Fixed diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index 0fb090eb5..d1708b9d5 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -22,4 +22,4 @@ pub(crate) use self::message_body::MessageBodyMapErr; pub use self::none::None; pub use self::size::BodySize; pub use self::sized_stream::SizedStream; -pub use self::utils::to_bytes; +pub use self::utils::{to_bytes, to_bytes_limited, BodyLimitExceeded}; diff --git a/actix-http/src/body/utils.rs b/actix-http/src/body/utils.rs index 0a6fb0c15..c741eefd2 100644 --- a/actix-http/src/body/utils.rs +++ b/actix-http/src/body/utils.rs @@ -7,71 +7,188 @@ use futures_core::ready; use super::{BodySize, MessageBody}; -/// Collects the body produced by a `MessageBody` implementation into `Bytes`. +/// Collects all the bytes produced by `body`. /// /// Any errors produced by the body stream are returned immediately. /// +/// Consider using [`to_bytes_limited`] instead to protect against memory exhaustion. +/// /// # Examples +/// /// ``` /// use actix_http::body::{self, to_bytes}; /// use bytes::Bytes; /// -/// # async fn test_to_bytes() { +/// # actix_rt::System::new().block_on(async { /// let body = body::None::new(); /// let bytes = to_bytes(body).await.unwrap(); /// assert!(bytes.is_empty()); /// /// let body = Bytes::from_static(b"123"); /// let bytes = to_bytes(body).await.unwrap(); -/// assert_eq!(bytes, b"123"[..]); -/// # } +/// assert_eq!(bytes, "123"); +/// # }); /// ``` pub async fn to_bytes(body: B) -> Result { + to_bytes_limited(body, usize::MAX) + .await + .expect("body should never overflow usize::MAX") +} + +/// Error type returned from [`to_bytes_limited`] when body produced exceeds limit. +#[derive(Debug)] +#[non_exhaustive] +pub struct BodyLimitExceeded; + +/// Collects the bytes produced by `body`, up to `limit` bytes. +/// +/// If a chunk read from `poll_next` causes the total number of bytes read to exceed `limit`, an +/// `Err(BodyLimitExceeded)` is returned. +/// +/// Any errors produced by the body stream are returned immediately as `Ok(Err(B::Error))`. +/// +/// # Examples +/// +/// ``` +/// use actix_http::body::{self, to_bytes_limited}; +/// use bytes::Bytes; +/// +/// # actix_rt::System::new().block_on(async { +/// let body = body::None::new(); +/// let bytes = to_bytes_limited(body, 10).await.unwrap().unwrap(); +/// assert!(bytes.is_empty()); +/// +/// let body = Bytes::from_static(b"123"); +/// let bytes = to_bytes_limited(body, 10).await.unwrap().unwrap(); +/// assert_eq!(bytes, "123"); +/// +/// let body = Bytes::from_static(b"123"); +/// assert!(to_bytes_limited(body, 2).await.is_err()); +/// # }); +/// ``` +pub async fn to_bytes_limited( + body: B, + limit: usize, +) -> Result, BodyLimitExceeded> { let cap = match body.size() { - BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()), + BodySize::None | BodySize::Sized(0) => return Ok(Ok(Bytes::new())), + BodySize::Sized(size) if size as usize > limit => return Err(BodyLimitExceeded), BodySize::Sized(size) => size as usize, // good enough first guess for chunk size BodySize::Stream => 32_768, }; + let mut exceeded_limit = false; let mut buf = BytesMut::with_capacity(cap); pin!(body); - poll_fn(|cx| loop { + match poll_fn(|cx| loop { let body = body.as_mut(); match ready!(body.poll_next(cx)) { - Some(Ok(bytes)) => buf.extend_from_slice(&bytes), + Some(Ok(bytes)) => { + // if limit is exceeded... + if buf.len() + bytes.len() > limit { + // ...set flag to true and break out of poll_fn + exceeded_limit = true; + return Poll::Ready(Ok(())); + } + + buf.extend_from_slice(&bytes) + } None => return Poll::Ready(Ok(())), Some(Err(err)) => return Poll::Ready(Err(err)), } }) - .await?; + .await + { + // propagate error returned from body poll + Err(err) => Ok(Err(err)), - Ok(buf.freeze()) + // limit was exceeded while reading body + Ok(()) if exceeded_limit => Err(BodyLimitExceeded), + + // otherwise return body buffer + Ok(()) => Ok(Ok(buf.freeze())), + } } #[cfg(test)] -mod test { +mod tests { + use std::io; + use futures_util::{stream, StreamExt as _}; use super::*; - use crate::{body::BodyStream, Error}; + use crate::{ + body::{BodyStream, SizedStream}, + Error, + }; #[actix_rt::test] - async fn test_to_bytes() { + async fn to_bytes_complete() { let bytes = to_bytes(()).await.unwrap(); assert!(bytes.is_empty()); let body = Bytes::from_static(b"123"); let bytes = to_bytes(body).await.unwrap(); assert_eq!(bytes, b"123"[..]); + } + #[actix_rt::test] + async fn to_bytes_streams() { let stream = stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")]) .map(Ok::<_, Error>); let body = BodyStream::new(stream); let bytes = to_bytes(body).await.unwrap(); assert_eq!(bytes, b"123abc"[..]); } + + #[actix_rt::test] + async fn to_bytes_limited_complete() { + let bytes = to_bytes_limited((), 0).await.unwrap().unwrap(); + assert!(bytes.is_empty()); + + let bytes = to_bytes_limited((), 1).await.unwrap().unwrap(); + assert!(bytes.is_empty()); + + assert!(to_bytes_limited(Bytes::from_static(b"12"), 0) + .await + .is_err()); + assert!(to_bytes_limited(Bytes::from_static(b"12"), 1) + .await + .is_err()); + assert!(to_bytes_limited(Bytes::from_static(b"12"), 2).await.is_ok()); + assert!(to_bytes_limited(Bytes::from_static(b"12"), 3).await.is_ok()); + } + + #[actix_rt::test] + async fn to_bytes_limited_streams() { + // hinting a larger body fails + let body = SizedStream::new(8, stream::empty().map(Ok::<_, Error>)); + assert!(to_bytes_limited(body, 3).await.is_err()); + + // hinting a smaller body is okay + let body = SizedStream::new(3, stream::empty().map(Ok::<_, Error>)); + assert!(to_bytes_limited(body, 3).await.unwrap().unwrap().is_empty()); + + // hinting a smaller body then returning a larger one fails + let stream = stream::iter(vec![Bytes::from_static(b"1234")]).map(Ok::<_, Error>); + let body = SizedStream::new(3, stream); + assert!(to_bytes_limited(body, 3).await.is_err()); + + let stream = stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")]) + .map(Ok::<_, Error>); + let body = BodyStream::new(stream); + assert!(to_bytes_limited(body, 3).await.is_err()); + } + + #[actix_rt::test] + async fn to_body_limit_error() { + let err_stream = stream::once(async { Err(io::Error::new(io::ErrorKind::Other, "")) }); + let body = SizedStream::new(8, err_stream); + // not too big, but propagates error from body stream + assert!(to_bytes_limited(body, 10).await.unwrap().is_err()); + } }