mirror of
https://github.com/actix/actix-web.git
synced 2024-11-26 11:31:09 +00:00
make AnyBody
generic on Body
type (#2448)
This commit is contained in:
parent
13cf5a9e44
commit
d8cbb879dd
18 changed files with 283 additions and 210 deletions
10
CHANGES.md
10
CHANGES.md
|
@ -1,6 +1,16 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Changed
|
||||||
|
* Compress middleware's response type is now `AnyBody<Encoder<B>>`. [#2448]
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
* Relax `Unpin` bound on `S` (stream) parameter of `HttpResponseBuilder::streaming`. [#2448]
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
* `dev::ResponseBody` re-export; is function is replaced by the new `dev::AnyBody` enum. [#2446]
|
||||||
|
|
||||||
|
[#2423]: https://github.com/actix/actix-web/pull/2423
|
||||||
|
|
||||||
|
|
||||||
## 4.0.0-beta.11 - 2021-11-15
|
## 4.0.0-beta.11 - 2021-11-15
|
||||||
|
|
|
@ -2,14 +2,23 @@
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
### Added
|
### Added
|
||||||
* `AnyBody::empty` for quickly creating an empty body. [#2446]
|
* `body::AnyBody::empty` for quickly creating an empty body. [#2446]
|
||||||
|
* `impl Clone` for `body::AnyBody<S> where S: Clone`. [#2448]
|
||||||
|
* `body::AnyBody::into_boxed` for quickly converting to a type-erased, boxed body type. [#2448]
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
* Rename `AnyBody::{Message => Stream}`. [#2446]
|
* Rename `body::AnyBody::{Message => Body}`. [#2446]
|
||||||
|
* Rename `body::AnyBody::{from_message => new_boxed}`. [#2448]
|
||||||
|
* Rename `body::AnyBody::{from_slice => copy_from_slice}`. [#2448]
|
||||||
|
* Rename `body::{BoxAnyBody => BoxBody}`. [#2448]
|
||||||
|
* Change representation of `AnyBody` to include a type parameter in `Body` variant. Defaults to `BoxBody`. [#2448]
|
||||||
|
* `Encoder::response` now returns `AnyBody<Encoder<B>>`. [#2448]
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
* `AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446]
|
* `body::AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446]
|
||||||
* `BodySize::Empty`; an empty body can now only be represented as a `Sized(0)` variant. [#2446]
|
* `body::BodySize::Empty`; an empty body can now only be represented as a `Sized(0)` variant. [#2446]
|
||||||
|
* `EncoderError::Boxed`; it is no longer required. [#2446]
|
||||||
|
* `body::ResponseBody`; is function is replaced by the new `body::AnyBody` enum. [#2446]
|
||||||
|
|
||||||
[#2446]: https://github.com/actix/actix-web/pull/2446
|
[#2446]: https://github.com/actix/actix-web/pull/2446
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,7 @@ regex = "1.3"
|
||||||
rustls-pemfile = "0.2"
|
rustls-pemfile = "0.2"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
static_assertions = "1"
|
||||||
tls-openssl = { package = "openssl", version = "0.10.9" }
|
tls-openssl = { package = "openssl", version = "0.10.9" }
|
||||||
tls-rustls = { package = "rustls", version = "0.20.0" }
|
tls-rustls = { package = "rustls", version = "0.20.0" }
|
||||||
tokio = { version = "1.2", features = ["net", "rt"] }
|
tokio = { version = "1.2", features = ["net", "rt"] }
|
||||||
|
|
|
@ -8,6 +8,7 @@ use std::{
|
||||||
|
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
|
use pin_project::pin_project;
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
|
||||||
|
@ -16,15 +17,17 @@ use super::{BodySize, BodyStream, MessageBody, MessageBodyMapErr, SizedStream};
|
||||||
pub type Body = AnyBody;
|
pub type Body = AnyBody;
|
||||||
|
|
||||||
/// Represents various types of HTTP message body.
|
/// Represents various types of HTTP message body.
|
||||||
pub enum AnyBody {
|
#[pin_project(project = AnyBodyProj)]
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum AnyBody<B = BoxBody> {
|
||||||
/// Empty response. `Content-Length` header is not set.
|
/// Empty response. `Content-Length` header is not set.
|
||||||
None,
|
None,
|
||||||
|
|
||||||
/// Specific response body.
|
/// Complete, in-memory response body.
|
||||||
Bytes(Bytes),
|
Bytes(Bytes),
|
||||||
|
|
||||||
/// Generic message body.
|
/// Generic / Other message body.
|
||||||
Stream(BoxAnyBody),
|
Body(#[pin] B),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AnyBody {
|
impl AnyBody {
|
||||||
|
@ -33,29 +36,60 @@ impl AnyBody {
|
||||||
Self::Bytes(Bytes::new())
|
Self::Bytes(Bytes::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create body from slice (copy)
|
/// Create boxed body from generic message body.
|
||||||
pub fn from_slice(s: &[u8]) -> Self {
|
pub fn new_boxed<B>(body: B) -> Self
|
||||||
Self::Bytes(Bytes::copy_from_slice(s))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create body from generic message body.
|
|
||||||
pub fn from_message<B>(body: B) -> Self
|
|
||||||
where
|
where
|
||||||
B: MessageBody + 'static,
|
B: MessageBody + 'static,
|
||||||
B::Error: Into<Box<dyn StdError + 'static>>,
|
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||||
{
|
{
|
||||||
Self::Stream(BoxAnyBody::from_body(body))
|
Self::Body(BoxBody::from_body(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Constructs new `AnyBody` instance from a slice of bytes by copying it.
|
||||||
|
///
|
||||||
|
/// If your bytes container is owned, it may be cheaper to use a `From` impl.
|
||||||
|
pub fn copy_from_slice(s: &[u8]) -> Self {
|
||||||
|
Self::Bytes(Bytes::copy_from_slice(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[deprecated(since = "4.0.0", note = "Renamed to `copy_from_slice`.")]
|
||||||
|
pub fn from_slice(s: &[u8]) -> Self {
|
||||||
|
Self::Bytes(Bytes::copy_from_slice(s))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for AnyBody {
|
impl<B> AnyBody<B>
|
||||||
|
where
|
||||||
|
B: MessageBody + 'static,
|
||||||
|
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||||
|
{
|
||||||
|
/// Create body from generic message body.
|
||||||
|
pub fn new(body: B) -> Self {
|
||||||
|
Self::Body(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_boxed(self) -> AnyBody {
|
||||||
|
match self {
|
||||||
|
Self::None => AnyBody::None,
|
||||||
|
Self::Bytes(bytes) => AnyBody::Bytes(bytes),
|
||||||
|
Self::Body(body) => AnyBody::new_boxed(body),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> MessageBody for AnyBody<B>
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
B::Error: Into<Box<dyn StdError>> + 'static,
|
||||||
|
{
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn size(&self) -> BodySize {
|
fn size(&self) -> BodySize {
|
||||||
match self {
|
match self {
|
||||||
AnyBody::None => BodySize::None,
|
AnyBody::None => BodySize::None,
|
||||||
AnyBody::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
|
AnyBody::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
|
||||||
AnyBody::Stream(ref body) => body.size(),
|
AnyBody::Body(ref body) => body.size(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,9 +97,9 @@ impl MessageBody for AnyBody {
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
match self.get_mut() {
|
match self.project() {
|
||||||
AnyBody::None => Poll::Ready(None),
|
AnyBodyProj::None => Poll::Ready(None),
|
||||||
AnyBody::Bytes(ref mut bin) => {
|
AnyBodyProj::Bytes(bin) => {
|
||||||
let len = bin.len();
|
let len = bin.len();
|
||||||
if len == 0 {
|
if len == 0 {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
|
@ -74,8 +108,7 @@ impl MessageBody for AnyBody {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AnyBody::Stream(body) => body
|
AnyBodyProj::Body(body) => body
|
||||||
.as_pin_mut()
|
|
||||||
.poll_next(cx)
|
.poll_next(cx)
|
||||||
.map_err(|err| Error::new_body().with_cause(err)),
|
.map_err(|err| Error::new_body().with_cause(err)),
|
||||||
}
|
}
|
||||||
|
@ -90,30 +123,30 @@ impl PartialEq for AnyBody {
|
||||||
AnyBody::Bytes(ref b2) => b == b2,
|
AnyBody::Bytes(ref b2) => b == b2,
|
||||||
_ => false,
|
_ => false,
|
||||||
},
|
},
|
||||||
AnyBody::Stream(_) => false,
|
AnyBody::Body(_) => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for AnyBody {
|
impl<S: fmt::Debug> fmt::Debug for AnyBody<S> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match *self {
|
match *self {
|
||||||
AnyBody::None => write!(f, "AnyBody::None"),
|
AnyBody::None => write!(f, "AnyBody::None"),
|
||||||
AnyBody::Bytes(ref b) => write!(f, "AnyBody::Bytes({:?})", b),
|
AnyBody::Bytes(ref bytes) => write!(f, "AnyBody::Bytes({:?})", bytes),
|
||||||
AnyBody::Stream(_) => write!(f, "AnyBody::Message(_)"),
|
AnyBody::Body(ref stream) => write!(f, "AnyBody::Message({:?})", stream),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&'static str> for AnyBody {
|
impl From<&'static str> for AnyBody {
|
||||||
fn from(s: &'static str) -> Body {
|
fn from(string: &'static str) -> Body {
|
||||||
AnyBody::Bytes(Bytes::from_static(s.as_ref()))
|
AnyBody::Bytes(Bytes::from_static(string.as_ref()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&'static [u8]> for AnyBody {
|
impl From<&'static [u8]> for AnyBody {
|
||||||
fn from(s: &'static [u8]) -> Body {
|
fn from(bytes: &'static [u8]) -> Body {
|
||||||
AnyBody::Bytes(Bytes::from_static(s))
|
AnyBody::Bytes(Bytes::from_static(bytes))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,20 +157,20 @@ impl From<Vec<u8>> for AnyBody {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<String> for AnyBody {
|
impl From<String> for AnyBody {
|
||||||
fn from(s: String) -> Body {
|
fn from(string: String) -> Body {
|
||||||
s.into_bytes().into()
|
string.into_bytes().into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&'_ String> for AnyBody {
|
impl From<&'_ String> for AnyBody {
|
||||||
fn from(s: &String) -> Body {
|
fn from(string: &String) -> Body {
|
||||||
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
|
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&string)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Cow<'_, str>> for AnyBody {
|
impl From<Cow<'_, str>> for AnyBody {
|
||||||
fn from(s: Cow<'_, str>) -> Body {
|
fn from(string: Cow<'_, str>) -> Body {
|
||||||
match s {
|
match string {
|
||||||
Cow::Owned(s) => AnyBody::from(s),
|
Cow::Owned(s) => AnyBody::from(s),
|
||||||
Cow::Borrowed(s) => {
|
Cow::Borrowed(s) => {
|
||||||
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(s)))
|
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(s)))
|
||||||
|
@ -147,14 +180,14 @@ impl From<Cow<'_, str>> for AnyBody {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Bytes> for AnyBody {
|
impl From<Bytes> for AnyBody {
|
||||||
fn from(s: Bytes) -> Body {
|
fn from(bytes: Bytes) -> Body {
|
||||||
AnyBody::Bytes(s)
|
AnyBody::Bytes(bytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<BytesMut> for AnyBody {
|
impl From<BytesMut> for AnyBody {
|
||||||
fn from(s: BytesMut) -> Body {
|
fn from(bytes: BytesMut) -> Body {
|
||||||
AnyBody::Bytes(s.freeze())
|
AnyBody::Bytes(bytes.freeze())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,8 +196,8 @@ where
|
||||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||||
E: Into<Box<dyn StdError>> + 'static,
|
E: Into<Box<dyn StdError>> + 'static,
|
||||||
{
|
{
|
||||||
fn from(s: SizedStream<S>) -> Body {
|
fn from(stream: SizedStream<S>) -> Body {
|
||||||
AnyBody::from_message(s)
|
AnyBody::new_boxed(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,15 +206,15 @@ where
|
||||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||||
E: Into<Box<dyn StdError>> + 'static,
|
E: Into<Box<dyn StdError>> + 'static,
|
||||||
{
|
{
|
||||||
fn from(s: BodyStream<S>) -> Body {
|
fn from(stream: BodyStream<S>) -> Body {
|
||||||
AnyBody::from_message(s)
|
AnyBody::new_boxed(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A boxed message body with boxed errors.
|
/// A boxed message body with boxed errors.
|
||||||
pub struct BoxAnyBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError + 'static>>>>);
|
pub struct BoxBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>);
|
||||||
|
|
||||||
impl BoxAnyBody {
|
impl BoxBody {
|
||||||
/// Boxes a `MessageBody` and any errors it generates.
|
/// Boxes a `MessageBody` and any errors it generates.
|
||||||
pub fn from_body<B>(body: B) -> Self
|
pub fn from_body<B>(body: B) -> Self
|
||||||
where
|
where
|
||||||
|
@ -195,18 +228,18 @@ impl BoxAnyBody {
|
||||||
/// Returns a mutable pinned reference to the inner message body type.
|
/// Returns a mutable pinned reference to the inner message body type.
|
||||||
pub fn as_pin_mut(
|
pub fn as_pin_mut(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError + 'static>>)> {
|
) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError>>)> {
|
||||||
self.0.as_mut()
|
self.0.as_mut()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for BoxAnyBody {
|
impl fmt::Debug for BoxBody {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.write_str("BoxAnyBody(dyn MessageBody)")
|
f.write_str("BoxAnyBody(dyn MessageBody)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBody for BoxAnyBody {
|
impl MessageBody for BoxBody {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn size(&self) -> BodySize {
|
fn size(&self) -> BodySize {
|
||||||
|
@ -223,3 +256,52 @@ impl MessageBody for BoxAnyBody {
|
||||||
.map_err(|err| Error::new_body().with_cause(err))
|
.map_err(|err| Error::new_body().with_cause(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::marker::PhantomPinned;
|
||||||
|
|
||||||
|
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::body::to_bytes;
|
||||||
|
|
||||||
|
struct PinType(PhantomPinned);
|
||||||
|
|
||||||
|
impl MessageBody for PinType {
|
||||||
|
type Error = crate::Error;
|
||||||
|
|
||||||
|
fn size(&self) -> BodySize {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_impl_all!(AnyBody<()>: MessageBody, fmt::Debug, Send, Sync, Unpin);
|
||||||
|
assert_impl_all!(AnyBody<AnyBody<()>>: MessageBody, fmt::Debug, Send, Sync, Unpin);
|
||||||
|
assert_impl_all!(AnyBody<Bytes>: MessageBody, fmt::Debug, Send, Sync, Unpin);
|
||||||
|
assert_impl_all!(AnyBody: MessageBody, fmt::Debug, Unpin);
|
||||||
|
assert_impl_all!(BoxBody: MessageBody, fmt::Debug, Unpin);
|
||||||
|
assert_impl_all!(AnyBody<PinType>: MessageBody);
|
||||||
|
|
||||||
|
assert_not_impl_all!(AnyBody: Send, Sync, Unpin);
|
||||||
|
assert_not_impl_all!(BoxBody: Send, Sync, Unpin);
|
||||||
|
assert_not_impl_all!(AnyBody<PinType>: Send, Sync, Unpin);
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn nested_boxed_body() {
|
||||||
|
let body = AnyBody::copy_from_slice(&[1, 2, 3]);
|
||||||
|
let boxed_body = BoxBody::from_body(BoxBody::from_body(body));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
to_bytes(boxed_body).await.unwrap(),
|
||||||
|
Bytes::from(vec![1, 2, 3]),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -75,10 +75,22 @@ mod tests {
|
||||||
use derive_more::{Display, Error};
|
use derive_more::{Display, Error};
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use futures_util::{stream, FutureExt as _};
|
use futures_util::{stream, FutureExt as _};
|
||||||
|
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::body::to_bytes;
|
use crate::body::to_bytes;
|
||||||
|
|
||||||
|
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
|
||||||
|
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
|
||||||
|
assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
|
||||||
|
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
|
||||||
|
assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
|
||||||
|
|
||||||
|
assert_not_impl_all!(BodyStream<stream::Empty<Bytes>>: MessageBody);
|
||||||
|
assert_not_impl_all!(BodyStream<stream::Repeat<Bytes>>: MessageBody);
|
||||||
|
// crate::Error is not Clone
|
||||||
|
assert_not_impl_all!(BodyStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn skips_empty_chunks() {
|
async fn skips_empty_chunks() {
|
||||||
let body = BodyStream::new(stream::iter(
|
let body = BodyStream::new(stream::iter(
|
||||||
|
@ -124,6 +136,30 @@ mod tests {
|
||||||
assert!(matches!(to_bytes(body).await, Err(StreamErr)));
|
assert!(matches!(to_bytes(body).await, Err(StreamErr)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stream_string_error() {
|
||||||
|
// `&'static str` does not impl `Error`
|
||||||
|
// but it does impl `Into<Box<dyn Error>>`
|
||||||
|
|
||||||
|
let body = BodyStream::new(stream::once(async { Err("stringy error") }));
|
||||||
|
assert!(matches!(to_bytes(body).await, Err("stringy error")));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stream_boxed_error() {
|
||||||
|
// `Box<dyn Error>` does not impl `Error`
|
||||||
|
// but it does impl `Into<Box<dyn Error>>`
|
||||||
|
|
||||||
|
let body = BodyStream::new(stream::once(async {
|
||||||
|
Err(Box::<dyn StdError>::from("stringy error"))
|
||||||
|
}));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
to_bytes(body).await.unwrap_err().to_string(),
|
||||||
|
"stringy error"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn stream_delayed_error() {
|
async fn stream_delayed_error() {
|
||||||
let body =
|
let body =
|
||||||
|
|
|
@ -11,15 +11,13 @@ use futures_core::ready;
|
||||||
mod body;
|
mod body;
|
||||||
mod body_stream;
|
mod body_stream;
|
||||||
mod message_body;
|
mod message_body;
|
||||||
mod response_body;
|
|
||||||
mod size;
|
mod size;
|
||||||
mod sized_stream;
|
mod sized_stream;
|
||||||
|
|
||||||
pub use self::body::{AnyBody, Body, BoxAnyBody};
|
pub use self::body::{AnyBody, Body, BoxBody};
|
||||||
pub use self::body_stream::BodyStream;
|
pub use self::body_stream::BodyStream;
|
||||||
pub use self::message_body::MessageBody;
|
pub use self::message_body::MessageBody;
|
||||||
pub(crate) use self::message_body::MessageBodyMapErr;
|
pub(crate) use self::message_body::MessageBodyMapErr;
|
||||||
pub use self::response_body::ResponseBody;
|
|
||||||
pub use self::size::BodySize;
|
pub use self::size::BodySize;
|
||||||
pub use self::sized_stream::SizedStream;
|
pub use self::sized_stream::SizedStream;
|
||||||
|
|
||||||
|
@ -108,10 +106,10 @@ mod tests {
|
||||||
assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
|
assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
|
||||||
assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
|
assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
Body::from_slice(b"test".as_ref()).size(),
|
Body::copy_from_slice(b"test".as_ref()).size(),
|
||||||
BodySize::Sized(4)
|
BodySize::Sized(4)
|
||||||
);
|
);
|
||||||
assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
|
assert_eq!(Body::copy_from_slice(b"test".as_ref()).get_ref(), b"test");
|
||||||
let sb = Bytes::from(&b"test"[..]);
|
let sb = Bytes::from(&b"test"[..]);
|
||||||
pin!(sb);
|
pin!(sb);
|
||||||
|
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
use std::{
|
|
||||||
mem,
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use futures_core::Stream;
|
|
||||||
use pin_project::pin_project;
|
|
||||||
|
|
||||||
use crate::error::Error;
|
|
||||||
|
|
||||||
use super::{Body, BodySize, MessageBody};
|
|
||||||
|
|
||||||
#[pin_project(project = ResponseBodyProj)]
|
|
||||||
pub enum ResponseBody<B> {
|
|
||||||
Body(#[pin] B),
|
|
||||||
Other(Body),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ResponseBody<Body> {
|
|
||||||
pub fn into_body<B>(self) -> ResponseBody<B> {
|
|
||||||
match self {
|
|
||||||
ResponseBody::Body(b) => ResponseBody::Other(b),
|
|
||||||
ResponseBody::Other(b) => ResponseBody::Other(b),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> ResponseBody<B> {
|
|
||||||
pub fn take_body(&mut self) -> ResponseBody<B> {
|
|
||||||
mem::replace(self, ResponseBody::Other(Body::None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B: MessageBody> ResponseBody<B> {
|
|
||||||
pub fn as_ref(&self) -> Option<&B> {
|
|
||||||
if let ResponseBody::Body(ref b) = self {
|
|
||||||
Some(b)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> MessageBody for ResponseBody<B>
|
|
||||||
where
|
|
||||||
B: MessageBody,
|
|
||||||
B::Error: Into<Error>,
|
|
||||||
{
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn size(&self) -> BodySize {
|
|
||||||
match self {
|
|
||||||
ResponseBody::Body(ref body) => body.size(),
|
|
||||||
ResponseBody::Other(ref body) => body.size(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
Stream::poll_next(self, cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> Stream for ResponseBody<B>
|
|
||||||
where
|
|
||||||
B: MessageBody,
|
|
||||||
B::Error: Into<Error>,
|
|
||||||
{
|
|
||||||
type Item = Result<Bytes, Error>;
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
) -> Poll<Option<Self::Item>> {
|
|
||||||
match self.project() {
|
|
||||||
ResponseBodyProj::Body(body) => body.poll_next(cx).map_err(Into::into),
|
|
||||||
ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -72,10 +72,22 @@ mod tests {
|
||||||
use actix_rt::pin;
|
use actix_rt::pin;
|
||||||
use actix_utils::future::poll_fn;
|
use actix_utils::future::poll_fn;
|
||||||
use futures_util::stream;
|
use futures_util::stream;
|
||||||
|
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::body::to_bytes;
|
use crate::body::to_bytes;
|
||||||
|
|
||||||
|
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
|
||||||
|
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
|
||||||
|
assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
|
||||||
|
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
|
||||||
|
assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
|
||||||
|
|
||||||
|
assert_not_impl_all!(SizedStream<stream::Empty<Bytes>>: MessageBody);
|
||||||
|
assert_not_impl_all!(SizedStream<stream::Repeat<Bytes>>: MessageBody);
|
||||||
|
// crate::Error is not Clone
|
||||||
|
assert_not_impl_all!(SizedStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn skips_empty_chunks() {
|
async fn skips_empty_chunks() {
|
||||||
let body = SizedStream::new(
|
let body = SizedStream::new(
|
||||||
|
@ -119,4 +131,37 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
|
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stream_string_error() {
|
||||||
|
// `&'static str` does not impl `Error`
|
||||||
|
// but it does impl `Into<Box<dyn Error>>`
|
||||||
|
|
||||||
|
let body = SizedStream::new(0, stream::once(async { Err("stringy error") }));
|
||||||
|
assert_eq!(to_bytes(body).await, Ok(Bytes::new()));
|
||||||
|
|
||||||
|
let body = SizedStream::new(1, stream::once(async { Err("stringy error") }));
|
||||||
|
assert!(matches!(to_bytes(body).await, Err("stringy error")));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stream_boxed_error() {
|
||||||
|
// `Box<dyn Error>` does not impl `Error`
|
||||||
|
// but it does impl `Into<Box<dyn Error>>`
|
||||||
|
|
||||||
|
let body = SizedStream::new(
|
||||||
|
0,
|
||||||
|
stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
|
||||||
|
);
|
||||||
|
assert_eq!(to_bytes(body).await.unwrap(), Bytes::new());
|
||||||
|
|
||||||
|
let body = SizedStream::new(
|
||||||
|
1,
|
||||||
|
stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
to_bytes(body).await.unwrap_err().to_string(),
|
||||||
|
"stringy error"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ use flate2::write::{GzEncoder, ZlibEncoder};
|
||||||
use zstd::stream::write::Encoder as ZstdEncoder;
|
use zstd::stream::write::Encoder as ZstdEncoder;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody},
|
body::{AnyBody, BodySize, MessageBody},
|
||||||
http::{
|
http::{
|
||||||
header::{ContentEncoding, CONTENT_ENCODING},
|
header::{ContentEncoding, CONTENT_ENCODING},
|
||||||
HeaderValue, StatusCode,
|
HeaderValue, StatusCode,
|
||||||
|
@ -50,8 +50,8 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
pub fn response(
|
pub fn response(
|
||||||
encoding: ContentEncoding,
|
encoding: ContentEncoding,
|
||||||
head: &mut ResponseHead,
|
head: &mut ResponseHead,
|
||||||
body: ResponseBody<B>,
|
body: AnyBody<B>,
|
||||||
) -> ResponseBody<Encoder<B>> {
|
) -> AnyBody<Encoder<B>> {
|
||||||
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|
||||||
|| head.status == StatusCode::SWITCHING_PROTOCOLS
|
|| head.status == StatusCode::SWITCHING_PROTOCOLS
|
||||||
|| head.status == StatusCode::NO_CONTENT
|
|| head.status == StatusCode::NO_CONTENT
|
||||||
|
@ -59,18 +59,15 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
|| encoding == ContentEncoding::Auto);
|
|| encoding == ContentEncoding::Auto);
|
||||||
|
|
||||||
let body = match body {
|
let body = match body {
|
||||||
ResponseBody::Other(b) => match b {
|
AnyBody::None => return AnyBody::None,
|
||||||
Body::None => return ResponseBody::Other(Body::None),
|
AnyBody::Bytes(buf) => {
|
||||||
Body::Bytes(buf) => {
|
|
||||||
if can_encode {
|
if can_encode {
|
||||||
EncoderBody::Bytes(buf)
|
EncoderBody::Bytes(buf)
|
||||||
} else {
|
} else {
|
||||||
return ResponseBody::Other(Body::Bytes(buf));
|
return AnyBody::Bytes(buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Body::Stream(stream) => EncoderBody::BoxedStream(stream),
|
AnyBody::Body(body) => EncoderBody::Stream(body),
|
||||||
},
|
|
||||||
ResponseBody::Body(stream) => EncoderBody::Stream(stream),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if can_encode {
|
if can_encode {
|
||||||
|
@ -78,7 +75,8 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
if let Some(enc) = ContentEncoder::encoder(encoding) {
|
if let Some(enc) = ContentEncoder::encoder(encoding) {
|
||||||
update_head(encoding, head);
|
update_head(encoding, head);
|
||||||
head.no_chunking(false);
|
head.no_chunking(false);
|
||||||
return ResponseBody::Body(Encoder {
|
|
||||||
|
return AnyBody::Body(Encoder {
|
||||||
body,
|
body,
|
||||||
eof: false,
|
eof: false,
|
||||||
fut: None,
|
fut: None,
|
||||||
|
@ -87,7 +85,7 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ResponseBody::Body(Encoder {
|
AnyBody::Body(Encoder {
|
||||||
body,
|
body,
|
||||||
eof: false,
|
eof: false,
|
||||||
fut: None,
|
fut: None,
|
||||||
|
@ -100,7 +98,6 @@ impl<B: MessageBody> Encoder<B> {
|
||||||
enum EncoderBody<B> {
|
enum EncoderBody<B> {
|
||||||
Bytes(Bytes),
|
Bytes(Bytes),
|
||||||
Stream(#[pin] B),
|
Stream(#[pin] B),
|
||||||
BoxedStream(BoxAnyBody),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B> MessageBody for EncoderBody<B>
|
impl<B> MessageBody for EncoderBody<B>
|
||||||
|
@ -113,7 +110,6 @@ where
|
||||||
match self {
|
match self {
|
||||||
EncoderBody::Bytes(ref b) => b.size(),
|
EncoderBody::Bytes(ref b) => b.size(),
|
||||||
EncoderBody::Stream(ref b) => b.size(),
|
EncoderBody::Stream(ref b) => b.size(),
|
||||||
EncoderBody::BoxedStream(ref b) => b.size(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,9 +126,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EncoderBodyProj::Stream(b) => b.poll_next(cx).map_err(EncoderError::Body),
|
EncoderBodyProj::Stream(b) => b.poll_next(cx).map_err(EncoderError::Body),
|
||||||
EncoderBodyProj::BoxedStream(ref mut b) => {
|
|
||||||
b.as_pin_mut().poll_next(cx).map_err(EncoderError::Boxed)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -348,9 +341,6 @@ pub enum EncoderError<E> {
|
||||||
#[display(fmt = "body")]
|
#[display(fmt = "body")]
|
||||||
Body(E),
|
Body(E),
|
||||||
|
|
||||||
#[display(fmt = "boxed")]
|
|
||||||
Boxed(Box<dyn StdError>),
|
|
||||||
|
|
||||||
#[display(fmt = "blocking")]
|
#[display(fmt = "blocking")]
|
||||||
Blocking(BlockingError),
|
Blocking(BlockingError),
|
||||||
|
|
||||||
|
@ -362,7 +352,6 @@ impl<E: StdError + 'static> StdError for EncoderError<E> {
|
||||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
EncoderError::Body(err) => Some(err),
|
EncoderError::Body(err) => Some(err),
|
||||||
EncoderError::Boxed(err) => Some(&**err),
|
|
||||||
EncoderError::Blocking(err) => Some(err),
|
EncoderError::Blocking(err) => Some(err),
|
||||||
EncoderError::Io(err) => Some(err),
|
EncoderError::Io(err) => Some(err),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1077,7 +1077,7 @@ mod tests {
|
||||||
fn_service(|req: Request| {
|
fn_service(|req: Request| {
|
||||||
let path = req.path().as_bytes();
|
let path = req.path().as_bytes();
|
||||||
ready(Ok::<_, Error>(
|
ready(Ok::<_, Error>(
|
||||||
Response::ok().set_body(AnyBody::from_slice(path)),
|
Response::ok().set_body(AnyBody::copy_from_slice(path)),
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -262,7 +262,7 @@ impl ResponseBuilder {
|
||||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||||
E: Into<Box<dyn StdError>> + 'static,
|
E: Into<Box<dyn StdError>> + 'static,
|
||||||
{
|
{
|
||||||
self.body(AnyBody::from_message(BodyStream::new(stream)))
|
self.body(AnyBody::new_boxed(BodyStream::new(stream)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate response with an empty body.
|
/// Generate response with an empty body.
|
||||||
|
|
|
@ -9,7 +9,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
body::{Body, BodyStream},
|
body::{AnyBody, Body, BodyStream},
|
||||||
http::{
|
http::{
|
||||||
header::{self, HeaderMap, HeaderName, IntoHeaderValue},
|
header::{self, HeaderMap, HeaderName, IntoHeaderValue},
|
||||||
Error as HttpError,
|
Error as HttpError,
|
||||||
|
@ -286,7 +286,7 @@ impl RequestSender {
|
||||||
response_decompress,
|
response_decompress,
|
||||||
timeout,
|
timeout,
|
||||||
config,
|
config,
|
||||||
Body::from_message(BodyStream::new(stream)),
|
AnyBody::new_boxed(BodyStream::new(stream)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ pub use crate::types::form::UrlEncoded;
|
||||||
pub use crate::types::json::JsonBody;
|
pub use crate::types::json::JsonBody;
|
||||||
pub use crate::types::readlines::Readlines;
|
pub use crate::types::readlines::Readlines;
|
||||||
|
|
||||||
pub use actix_http::body::{AnyBody, Body, BodySize, MessageBody, ResponseBody, SizedStream};
|
pub use actix_http::body::{AnyBody, Body, BodySize, MessageBody, SizedStream};
|
||||||
|
|
||||||
#[cfg(feature = "__compress")]
|
#[cfg(feature = "__compress")]
|
||||||
pub use actix_http::encoding::Decoder as Decompress;
|
pub use actix_http::encoding::Decoder as Decompress;
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::body::{Body, MessageBody};
|
use actix_http::body::{AnyBody, MessageBody};
|
||||||
use actix_service::{Service, Transform};
|
use actix_service::{Service, Transform};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ where
|
||||||
B::Error: Into<Box<dyn StdError + 'static>>,
|
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||||
{
|
{
|
||||||
fn map_body(self) -> ServiceResponse {
|
fn map_body(self) -> ServiceResponse {
|
||||||
self.map_body(|_, body| Body::from_message(body))
|
self.map_body(|_, body| AnyBody::new_boxed(body))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,13 +10,14 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
body::{MessageBody, ResponseBody},
|
body::{AnyBody, MessageBody},
|
||||||
encoding::Encoder,
|
encoding::Encoder,
|
||||||
http::header::{ContentEncoding, ACCEPT_ENCODING},
|
http::header::{ContentEncoding, ACCEPT_ENCODING},
|
||||||
StatusCode,
|
StatusCode,
|
||||||
};
|
};
|
||||||
use actix_service::{Service, Transform};
|
use actix_service::{Service, Transform};
|
||||||
use actix_utils::future::{ok, Either, Ready};
|
use actix_utils::future::{ok, Either, Ready};
|
||||||
|
use bytes::Bytes;
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
@ -61,7 +62,7 @@ where
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||||
{
|
{
|
||||||
type Response = ServiceResponse<ResponseBody<Encoder<B>>>;
|
type Response = ServiceResponse<AnyBody<Encoder<B>>>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Transform = CompressMiddleware<S>;
|
type Transform = CompressMiddleware<S>;
|
||||||
type InitError = ();
|
type InitError = ();
|
||||||
|
@ -110,7 +111,7 @@ where
|
||||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
type Response = ServiceResponse<ResponseBody<Encoder<B>>>;
|
type Response = ServiceResponse<AnyBody<Encoder<B>>>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = Either<CompressResponse<S, B>, Ready<Result<Self::Response, Self::Error>>>;
|
type Future = Either<CompressResponse<S, B>, Ready<Result<Self::Response, Self::Error>>>;
|
||||||
|
|
||||||
|
@ -142,15 +143,19 @@ where
|
||||||
|
|
||||||
// There is an HTTP header but we cannot match what client as asked for
|
// There is an HTTP header but we cannot match what client as asked for
|
||||||
Some(Err(_)) => {
|
Some(Err(_)) => {
|
||||||
let res = HttpResponse::with_body(
|
let res = HttpResponse::new(StatusCode::NOT_ACCEPTABLE);
|
||||||
StatusCode::NOT_ACCEPTABLE,
|
|
||||||
SUPPORTED_ALGORITHM_NAMES.as_str(),
|
|
||||||
);
|
|
||||||
let enc = ContentEncoding::Identity;
|
|
||||||
|
|
||||||
Either::right(ok(req.into_response(res.map_body(move |head, body| {
|
let res: HttpResponse<AnyBody<Encoder<B>>> = res.map_body(move |head, _| {
|
||||||
Encoder::response(enc, head, ResponseBody::Other(body.into()))
|
let body_bytes = Bytes::from(SUPPORTED_ALGORITHM_NAMES.as_bytes());
|
||||||
}))))
|
|
||||||
|
Encoder::response(
|
||||||
|
ContentEncoding::Identity,
|
||||||
|
head,
|
||||||
|
AnyBody::Bytes(body_bytes),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
Either::right(ok(req.into_response(res)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,7 +177,7 @@ where
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||||
{
|
{
|
||||||
type Output = Result<ServiceResponse<ResponseBody<Encoder<B>>>, Error>;
|
type Output = Result<ServiceResponse<AnyBody<Encoder<B>>>, Error>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.project();
|
let this = self.project();
|
||||||
|
@ -186,7 +191,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
Poll::Ready(Ok(resp.map_body(move |head, body| {
|
Poll::Ready(Ok(resp.map_body(move |head, body| {
|
||||||
Encoder::response(enc, head, ResponseBody::Body(body))
|
Encoder::response(enc, head, AnyBody::Body(body))
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Err(e) => Poll::Ready(Err(e)),
|
Err(e) => Poll::Ready(Err(e)),
|
||||||
|
|
|
@ -232,7 +232,7 @@ pub(crate) mod tests {
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::dev::{Body, ResponseBody};
|
use crate::dev::AnyBody;
|
||||||
use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode};
|
use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode};
|
||||||
use crate::test::{init_service, TestRequest};
|
use crate::test::{init_service, TestRequest};
|
||||||
use crate::{error, web, App};
|
use crate::{error, web, App};
|
||||||
|
@ -264,13 +264,13 @@ pub(crate) mod tests {
|
||||||
|
|
||||||
pub(crate) trait BodyTest {
|
pub(crate) trait BodyTest {
|
||||||
fn bin_ref(&self) -> &[u8];
|
fn bin_ref(&self) -> &[u8];
|
||||||
fn body(&self) -> &Body;
|
fn body(&self) -> &AnyBody;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodyTest for Body {
|
impl BodyTest for Body {
|
||||||
fn bin_ref(&self) -> &[u8] {
|
fn bin_ref(&self) -> &[u8] {
|
||||||
match self {
|
match self {
|
||||||
Body::Bytes(ref bin) => bin,
|
AnyBody::Bytes(ref bin) => bin,
|
||||||
_ => unreachable!("bug in test impl"),
|
_ => unreachable!("bug in test impl"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,27 +279,6 @@ pub(crate) mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodyTest for ResponseBody<Body> {
|
|
||||||
fn bin_ref(&self) -> &[u8] {
|
|
||||||
match self {
|
|
||||||
ResponseBody::Body(ref b) => match b {
|
|
||||||
Body::Bytes(ref bin) => bin,
|
|
||||||
_ => unreachable!("bug in test impl"),
|
|
||||||
},
|
|
||||||
ResponseBody::Other(ref b) => match b {
|
|
||||||
Body::Bytes(ref bin) => bin,
|
|
||||||
_ => unreachable!("bug in test impl"),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn body(&self) -> &Body {
|
|
||||||
match self {
|
|
||||||
ResponseBody::Body(ref b) => b,
|
|
||||||
ResponseBody::Other(ref b) => b,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_responder() {
|
async fn test_responder() {
|
||||||
let req = TestRequest::default().to_http_request();
|
let req = TestRequest::default().to_http_request();
|
||||||
|
|
|
@ -354,10 +354,10 @@ impl HttpResponseBuilder {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse
|
pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||||
E: Into<Box<dyn StdError>> + 'static,
|
E: Into<Box<dyn StdError>> + 'static,
|
||||||
{
|
{
|
||||||
self.body(AnyBody::from_message(BodyStream::new(stream)))
|
self.body(AnyBody::new_boxed(BodyStream::new(stream)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set a json body and generate `Response`
|
/// Set a json body and generate `Response`
|
||||||
|
|
|
@ -227,6 +227,9 @@ impl<B> HttpResponse<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: into_body equivalent
|
||||||
|
// TODO: into_boxed_body
|
||||||
|
|
||||||
/// Extract response body
|
/// Extract response body
|
||||||
pub fn into_body(self) -> B {
|
pub fn into_body(self) -> B {
|
||||||
self.res.into_body()
|
self.res.into_body()
|
||||||
|
|
Loading…
Reference in a new issue