mirror of
https://github.com/actix/actix-web.git
synced 2024-11-29 21:11:17 +00:00
simplify MessageBody::complete_body interface (#2522)
This commit is contained in:
parent
2cf27863cb
commit
57ea322ce5
7 changed files with 149 additions and 292 deletions
|
@ -8,10 +8,16 @@ use std::{
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
|
||||||
use super::{BodySize, MessageBody, MessageBodyMapErr};
|
use super::{BodySize, MessageBody, MessageBodyMapErr};
|
||||||
use crate::Error;
|
use crate::body;
|
||||||
|
|
||||||
/// A boxed message body with boxed errors.
|
/// A boxed message body with boxed errors.
|
||||||
pub struct BoxBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>);
|
pub struct BoxBody(BoxBodyInner);
|
||||||
|
|
||||||
|
enum BoxBodyInner {
|
||||||
|
None(body::None),
|
||||||
|
Bytes(Bytes),
|
||||||
|
Stream(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>),
|
||||||
|
}
|
||||||
|
|
||||||
impl BoxBody {
|
impl BoxBody {
|
||||||
/// Same as `MessageBody::boxed`.
|
/// Same as `MessageBody::boxed`.
|
||||||
|
@ -23,29 +29,42 @@ impl BoxBody {
|
||||||
where
|
where
|
||||||
B: MessageBody + 'static,
|
B: MessageBody + 'static,
|
||||||
{
|
{
|
||||||
let body = MessageBodyMapErr::new(body, Into::into);
|
match body.size() {
|
||||||
Self(Box::pin(body))
|
BodySize::None => Self(BoxBodyInner::None(body::None)),
|
||||||
|
_ => match body.try_into_bytes() {
|
||||||
|
Ok(bytes) => Self(BoxBodyInner::Bytes(bytes)),
|
||||||
|
Err(body) => {
|
||||||
|
let body = MessageBodyMapErr::new(body, Into::into);
|
||||||
|
Self(BoxBodyInner::Stream(Box::pin(body)))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a mutable pinned reference to the inner message body type.
|
/// Returns a mutable pinned reference to the inner message body type.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError>>)> {
|
pub fn as_pin_mut(&mut self) -> Pin<&mut Self> {
|
||||||
self.0.as_mut()
|
Pin::new(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for BoxBody {
|
impl fmt::Debug for BoxBody {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
// TODO show BoxBodyInner
|
||||||
f.write_str("BoxBody(dyn MessageBody)")
|
f.write_str("BoxBody(dyn MessageBody)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for BoxBody {
|
impl MessageBody for BoxBody {
|
||||||
type Error = Error;
|
type Error = Box<dyn StdError>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn size(&self) -> BodySize {
|
fn size(&self) -> BodySize {
|
||||||
self.0.size()
|
match &self.0 {
|
||||||
|
BoxBodyInner::None(none) => none.size(),
|
||||||
|
BoxBodyInner::Bytes(bytes) => bytes.size(),
|
||||||
|
BoxBodyInner::Stream(stream) => stream.size(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -53,20 +72,20 @@ impl MessageBody for BoxBody {
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
self.0
|
match &mut self.0 {
|
||||||
.as_mut()
|
BoxBodyInner::None(_) => Poll::Ready(None),
|
||||||
.poll_next(cx)
|
BoxBodyInner::Bytes(bytes) => Pin::new(bytes).poll_next(cx).map_err(Into::into),
|
||||||
.map_err(|err| Error::new_body().with_cause(err))
|
BoxBodyInner::Stream(stream) => Pin::new(stream).poll_next(cx),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
self.0.is_complete_body()
|
match self.0 {
|
||||||
}
|
BoxBodyInner::None(none) => Ok(none.try_into_bytes().unwrap()),
|
||||||
|
BoxBodyInner::Bytes(bytes) => Ok(bytes.try_into_bytes().unwrap()),
|
||||||
#[inline]
|
_ => Err(self),
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
}
|
||||||
self.0.take_complete_body()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|
|
@ -74,18 +74,14 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
match self {
|
match self {
|
||||||
EitherBody::Left { body } => body.is_complete_body(),
|
EitherBody::Left { body } => body
|
||||||
EitherBody::Right { body } => body.is_complete_body(),
|
.try_into_bytes()
|
||||||
}
|
.map_err(|body| EitherBody::Left { body }),
|
||||||
}
|
EitherBody::Right { body } => body
|
||||||
|
.try_into_bytes()
|
||||||
#[inline]
|
.map_err(|body| EitherBody::Right { body }),
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
match self {
|
|
||||||
EitherBody::Left { body } => body.take_complete_body(),
|
|
||||||
EitherBody::Right { body } => body.take_complete_body(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,51 +31,14 @@ pub trait MessageBody {
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>>;
|
) -> Poll<Option<Result<Bytes, Self::Error>>>;
|
||||||
|
|
||||||
/// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`.
|
/// Convert this body into `Bytes`.
|
||||||
///
|
///
|
||||||
/// This method's implementation should agree with [`take_complete_body`] and should always be
|
/// Bodies with `BodySize::None` are allowed to return empty `Bytes`.
|
||||||
/// checked before taking the body.
|
fn try_into_bytes(self) -> Result<Bytes, Self>
|
||||||
///
|
where
|
||||||
/// The default implementation returns `false.
|
Self: Sized,
|
||||||
///
|
{
|
||||||
/// [`take_complete_body`]: MessageBody::take_complete_body
|
Err(self)
|
||||||
fn is_complete_body(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the complete chunk of body bytes.
|
|
||||||
///
|
|
||||||
/// Implementors of this method should note the following:
|
|
||||||
/// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of
|
|
||||||
/// performing this check is delegated to the caller.
|
|
||||||
/// - If the result of [`is_complete_body`] is conditional, that condition should be given
|
|
||||||
/// equivalent attention here.
|
|
||||||
/// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic.
|
|
||||||
/// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless
|
|
||||||
/// the chunk is guaranteed to be empty.
|
|
||||||
///
|
|
||||||
/// The default implementation panics unconditionally, indicating a control flow bug in the
|
|
||||||
/// calling code.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
/// With a correct implementation, panics if called without first checking [`is_complete_body`].
|
|
||||||
///
|
|
||||||
/// [`is_complete_body`]: MessageBody::is_complete_body
|
|
||||||
/// [`take_complete_body`]: MessageBody::take_complete_body
|
|
||||||
/// [`poll_next`]: MessageBody::poll_next
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
assert!(
|
|
||||||
self.is_complete_body(),
|
|
||||||
"type ({}) allows taking complete body but did not provide an implementation \
|
|
||||||
of `take_complete_body`",
|
|
||||||
std::any::type_name::<Self>()
|
|
||||||
);
|
|
||||||
|
|
||||||
unimplemented!(
|
|
||||||
"type ({}) does not allow taking complete body; caller should make sure to \
|
|
||||||
check `is_complete_body` first",
|
|
||||||
std::any::type_name::<Self>()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Converts this body into `BoxBody`.
|
/// Converts this body into `BoxBody`.
|
||||||
|
@ -104,14 +67,6 @@ mod foreign_impls {
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
match *self {}
|
match *self {}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_complete_body(&self) -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
match *self {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for () {
|
impl MessageBody for () {
|
||||||
|
@ -131,13 +86,8 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::new())
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::new()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,16 +109,6 @@ mod foreign_impls {
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
Pin::new(self.get_mut().as_mut()).poll_next(cx)
|
Pin::new(self.get_mut().as_mut()).poll_next(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn is_complete_body(&self) -> bool {
|
|
||||||
self.as_ref().is_complete_body()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
self.as_mut().take_complete_body()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B> MessageBody for Pin<Box<B>>
|
impl<B> MessageBody for Pin<Box<B>>
|
||||||
|
@ -189,38 +129,6 @@ mod foreign_impls {
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
self.get_mut().as_mut().poll_next(cx)
|
self.get_mut().as_mut().poll_next(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn is_complete_body(&self) -> bool {
|
|
||||||
self.as_ref().is_complete_body()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
debug_assert!(
|
|
||||||
self.is_complete_body(),
|
|
||||||
"inner type \"{}\" does not allow taking complete body; caller should make sure to \
|
|
||||||
call `is_complete_body` first",
|
|
||||||
std::any::type_name::<B>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// we do not have DerefMut access to call take_complete_body directly but since
|
|
||||||
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
|
|
||||||
|
|
||||||
let waker = futures_task::noop_waker();
|
|
||||||
let mut cx = Context::from_waker(&waker);
|
|
||||||
|
|
||||||
match self.as_mut().poll_next(&mut cx) {
|
|
||||||
Poll::Ready(Some(Ok(data))) => data,
|
|
||||||
_ => {
|
|
||||||
panic!(
|
|
||||||
"inner type \"{}\" indicated it allows taking complete body but failed to \
|
|
||||||
return Bytes when polled",
|
|
||||||
std::any::type_name::<B>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for &'static [u8] {
|
impl MessageBody for &'static [u8] {
|
||||||
|
@ -232,24 +140,19 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
mut self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Some(Ok(self.take_complete_body())))
|
Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut())))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::from_static(self))
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::from_static(mem::take(self))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,24 +165,19 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
mut self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Some(Ok(self.take_complete_body())))
|
Poll::Ready(Some(Ok(mem::take(self.get_mut()))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(self)
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
mem::take(self)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,24 +190,19 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
mut self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Some(Ok(self.take_complete_body())))
|
Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(self.freeze())
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
mem::take(self).freeze()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,24 +215,19 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
mut self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_cx: &mut Context<'_>,
|
_cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(Some(Ok(self.take_complete_body())))
|
Poll::Ready(Some(Ok(mem::take(self.get_mut()).into())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::from(self))
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::from(mem::take(self))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,13 +253,8 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::from_static(self.as_bytes()))
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::from_static(mem::take(self).as_bytes())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,13 +279,8 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::from(self))
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::from(mem::take(self))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,13 +301,8 @@ mod foreign_impls {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(self.into_bytes())
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
mem::take(self).into_bytes()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -486,13 +359,9 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
self.body.is_complete_body()
|
let Self { body, mapper } = self;
|
||||||
}
|
body.try_into_bytes().map_err(|body| Self { body, mapper })
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
self.body.take_complete_body()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,6 +372,7 @@ mod tests {
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::body::{self, EitherBody};
|
||||||
|
|
||||||
macro_rules! assert_poll_next {
|
macro_rules! assert_poll_next {
|
||||||
($pin:expr, $exp:expr) => {
|
($pin:expr, $exp:expr) => {
|
||||||
|
@ -604,70 +474,45 @@ mod tests {
|
||||||
assert_poll_next!(pl, Bytes::from("test"));
|
assert_poll_next!(pl, Bytes::from("test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[actix_rt::test]
|
||||||
fn take_string() {
|
async fn complete_body_combinators() {
|
||||||
let mut data = "test".repeat(2);
|
let body = Bytes::from_static(b"test");
|
||||||
let data_bytes = Bytes::from(data.clone());
|
let body = BoxBody::new(body);
|
||||||
assert!(data.is_complete_body());
|
let body = EitherBody::<_, ()>::left(body);
|
||||||
assert_eq!(data.take_complete_body(), data_bytes);
|
let body = EitherBody::<(), _>::right(body);
|
||||||
|
// Do not support try_into_bytes:
|
||||||
let mut big_data = "test".repeat(64 * 1024);
|
// let body = Box::new(body);
|
||||||
let data_bytes = Bytes::from(big_data.clone());
|
// let body = Box::pin(body);
|
||||||
assert!(big_data.is_complete_body());
|
|
||||||
assert_eq!(big_data.take_complete_body(), data_bytes);
|
assert_eq!(body.try_into_bytes().unwrap(), Bytes::from("test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[actix_rt::test]
|
||||||
fn take_boxed_equivalence() {
|
async fn complete_body_combinators_poll() {
|
||||||
let mut data = Bytes::from_static(b"test");
|
|
||||||
assert!(data.is_complete_body());
|
|
||||||
assert_eq!(data.take_complete_body(), b"test".as_ref());
|
|
||||||
|
|
||||||
let mut data = Box::new(Bytes::from_static(b"test"));
|
|
||||||
assert!(data.is_complete_body());
|
|
||||||
assert_eq!(data.take_complete_body(), b"test".as_ref());
|
|
||||||
|
|
||||||
let mut data = Box::pin(Bytes::from_static(b"test"));
|
|
||||||
assert!(data.is_complete_body());
|
|
||||||
assert_eq!(data.take_complete_body(), b"test".as_ref());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn take_policy() {
|
|
||||||
let mut data = Bytes::from_static(b"test");
|
|
||||||
// first call returns chunk
|
|
||||||
assert_eq!(data.take_complete_body(), b"test".as_ref());
|
|
||||||
// second call returns empty
|
|
||||||
assert_eq!(data.take_complete_body(), b"".as_ref());
|
|
||||||
|
|
||||||
let waker = futures_task::noop_waker();
|
|
||||||
let mut cx = Context::from_waker(&waker);
|
|
||||||
let mut data = Bytes::from_static(b"test");
|
|
||||||
// take returns whole chunk
|
|
||||||
assert_eq!(data.take_complete_body(), b"test".as_ref());
|
|
||||||
// subsequent poll_next returns None
|
|
||||||
assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn complete_body_combinators() {
|
|
||||||
use crate::body::{BoxBody, EitherBody};
|
|
||||||
|
|
||||||
let body = Bytes::from_static(b"test");
|
let body = Bytes::from_static(b"test");
|
||||||
let body = BoxBody::new(body);
|
let body = BoxBody::new(body);
|
||||||
let body = EitherBody::<_, ()>::left(body);
|
let body = EitherBody::<_, ()>::left(body);
|
||||||
let body = EitherBody::<(), _>::right(body);
|
let body = EitherBody::<(), _>::right(body);
|
||||||
let body = Box::new(body);
|
|
||||||
let body = Box::pin(body);
|
|
||||||
let mut body = body;
|
let mut body = body;
|
||||||
|
|
||||||
assert!(body.is_complete_body());
|
assert_eq!(body.size(), BodySize::Sized(4));
|
||||||
assert_eq!(body.take_complete_body(), b"test".as_ref());
|
assert_poll_next!(Pin::new(&mut body), Bytes::from("test"));
|
||||||
|
assert_poll_next_none!(Pin::new(&mut body));
|
||||||
|
}
|
||||||
|
|
||||||
// subsequent poll_next returns None
|
#[actix_rt::test]
|
||||||
let waker = futures_task::noop_waker();
|
async fn none_body_combinators() {
|
||||||
let mut cx = Context::from_waker(&waker);
|
fn none_body() -> BoxBody {
|
||||||
assert!(Pin::new(&mut body).poll_next(&mut cx).map_err(drop) == Poll::Ready(None));
|
let body = body::None;
|
||||||
|
let body = BoxBody::new(body);
|
||||||
|
let body = EitherBody::<_, ()>::left(body);
|
||||||
|
let body = EitherBody::<(), _>::right(body);
|
||||||
|
body.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(none_body().size(), BodySize::None);
|
||||||
|
assert_eq!(none_body().try_into_bytes().unwrap(), Bytes::new());
|
||||||
|
assert_poll_next_none!(Pin::new(&mut none_body()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// down-casting used to be done with a method on MessageBody trait
|
// down-casting used to be done with a method on MessageBody trait
|
||||||
|
|
|
@ -42,12 +42,7 @@ impl MessageBody for None {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self> {
|
||||||
true
|
Ok(Bytes::new())
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
Bytes::new()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self {
|
pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
|
||||||
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|
||||||
|| head.status == StatusCode::SWITCHING_PROTOCOLS
|
|| head.status == StatusCode::SWITCHING_PROTOCOLS
|
||||||
|| head.status == StatusCode::NO_CONTENT
|
|| head.status == StatusCode::NO_CONTENT
|
||||||
|
@ -65,11 +65,9 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
return Self::none();
|
return Self::none();
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = if body.is_complete_body() {
|
let body = match body.try_into_bytes() {
|
||||||
let body = body.take_complete_body();
|
Ok(body) => EncoderBody::Full { body },
|
||||||
EncoderBody::Full { body }
|
Err(body) => EncoderBody::Stream { body },
|
||||||
} else {
|
|
||||||
EncoderBody::Stream { body }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if can_encode {
|
if can_encode {
|
||||||
|
@ -133,21 +131,14 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(self) -> Result<Bytes, Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
match self {
|
match self {
|
||||||
EncoderBody::None => true,
|
EncoderBody::None => Ok(Bytes::new()),
|
||||||
EncoderBody::Full { .. } => true,
|
EncoderBody::Full { body } => Ok(body),
|
||||||
EncoderBody::Stream { .. } => false,
|
_ => Err(self),
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
|
||||||
match self {
|
|
||||||
EncoderBody::None => Bytes::new(),
|
|
||||||
EncoderBody::Full { body } => body.take_complete_body(),
|
|
||||||
EncoderBody::Stream { .. } => {
|
|
||||||
panic!("EncoderBody::Stream variant cannot be taken")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -234,19 +225,20 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_complete_body(&self) -> bool {
|
fn try_into_bytes(mut self) -> Result<Bytes, Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
if self.encoder.is_some() {
|
if self.encoder.is_some() {
|
||||||
false
|
Err(self)
|
||||||
} else {
|
} else {
|
||||||
self.body.is_complete_body()
|
match self.body.try_into_bytes() {
|
||||||
}
|
Ok(body) => Ok(body),
|
||||||
}
|
Err(body) => {
|
||||||
|
self.body = body;
|
||||||
fn take_complete_body(&mut self) -> Bytes {
|
Err(self)
|
||||||
if self.encoder.is_some() {
|
}
|
||||||
panic!("compressed body stream cannot be taken")
|
}
|
||||||
} else {
|
|
||||||
self.body.take_complete_body()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ use crate::{
|
||||||
config::ServiceConfig,
|
config::ServiceConfig,
|
||||||
error::{DispatchError, ParseError, PayloadError},
|
error::{DispatchError, ParseError, PayloadError},
|
||||||
service::HttpFlow,
|
service::HttpFlow,
|
||||||
Extensions, OnConnectData, Request, Response, StatusCode,
|
Error, Extensions, OnConnectData, Request, Response, StatusCode,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
|
@ -494,7 +494,9 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Ready(Some(Err(err))) => {
|
Poll::Ready(Some(Err(err))) => {
|
||||||
return Err(DispatchError::Body(err.into()))
|
return Err(DispatchError::Body(
|
||||||
|
Error::new_body().with_cause(err).into(),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending => return Ok(PollResponse::DoNothing),
|
Poll::Pending => return Ok(PollResponse::DoNothing),
|
||||||
|
|
|
@ -139,4 +139,12 @@ impl crate::body::MessageBody for AnyBody {
|
||||||
AnyBody::Boxed { body } => body.as_pin_mut().poll_next(cx),
|
AnyBody::Boxed { body } => body.as_pin_mut().poll_next(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn try_into_bytes(self) -> Result<crate::web::Bytes, Self> {
|
||||||
|
match self {
|
||||||
|
AnyBody::None => Ok(crate::web::Bytes::new()),
|
||||||
|
AnyBody::Full { body } => Ok(body),
|
||||||
|
_ => Err(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue