From 1296e07c4830f0ab2e2864a6fc9faa93972e5935 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 24 Dec 2021 17:47:47 +0000 Subject: [PATCH] relax unpin bounds on payload types (#2545) --- actix-http/CHANGES.md | 9 ++++ actix-http/src/encoding/decoder.rs | 39 ++++++++------- actix-http/src/h1/dispatcher.rs | 7 +-- actix-http/src/h1/payload.rs | 76 +++++++++++++++++------------- actix-http/src/h1/utils.rs | 4 +- actix-http/src/h2/dispatcher.rs | 4 +- actix-http/src/h2/mod.rs | 11 +++++ actix-http/src/lib.rs | 3 +- actix-http/src/payload.rs | 73 +++++++++++++++++----------- actix-http/src/requests/request.rs | 11 +++-- actix-http/src/test.rs | 2 +- actix-http/tests/test_rustls.rs | 33 +++++++++---- actix-multipart/src/server.rs | 2 +- awc/src/client/connection.rs | 4 +- awc/src/client/h1proto.rs | 16 +++++-- awc/src/response.rs | 6 +-- awc/src/sender.rs | 15 +++--- awc/src/test.rs | 5 +- src/dev.rs | 2 +- src/response/builder.rs | 36 ++++++-------- src/service.rs | 4 +- src/test/test_request.rs | 21 +++++---- 22 files changed, 229 insertions(+), 154 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 3b45e934f..adc4c35c7 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,8 +3,17 @@ ## Unreleased - 2021-xx-xx ### Changes - `HeaderMap::get_all` now returns a `std::slice::Iter`. [#2527] +- `Payload` inner fields are now named. [#2545] +- `impl Stream` for `Payload` no longer requires the `Stream` variant be `Unpin`. [#2545] +- `impl Future` for `h1::SendResponse` no longer requires the body type be `Unpin`. [#2545] +- `impl Stream` for `encoding::Decoder` no longer requires the stream type be `Unpin`. [#2545] +- Rename `PayloadStream` to `BoxedPayloadStream`. [#2545] + +### Removed +- `h1::Payload::readany`. [#2545] [#2527]: https://github.com/actix/actix-web/pull/2527 +[#2545]: https://github.com/actix/actix-web/pull/2545 ## 3.0.0-beta.16 - 2021-12-17 diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index a46e330c9..0f519637a 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -28,11 +28,14 @@ use crate::{ const MAX_CHUNK_SIZE_DECODE_IN_PLACE: usize = 2049; -pub struct Decoder { - decoder: Option, - stream: S, - eof: bool, - fut: Option, ContentDecoder), io::Error>>>, +pin_project_lite::pin_project! { + pub struct Decoder { + decoder: Option, + #[pin] + stream: S, + eof: bool, + fut: Option, ContentDecoder), io::Error>>>, + } } impl Decoder @@ -89,42 +92,44 @@ where impl Stream for Decoder where - S: Stream> + Unpin, + S: Stream>, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { - if let Some(ref mut fut) = self.fut { + if let Some(ref mut fut) = this.fut { let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; - self.decoder = Some(decoder); - self.fut.take(); + *this.decoder = Some(decoder); + this.fut.take(); if let Some(chunk) = chunk { return Poll::Ready(Some(Ok(chunk))); } } - if self.eof { + if *this.eof { return Poll::Ready(None); } - match ready!(Pin::new(&mut self.stream).poll_next(cx)) { + match ready!(this.stream.as_mut().poll_next(cx)) { Some(Err(err)) => return Poll::Ready(Some(Err(err))), Some(Ok(chunk)) => { - if let Some(mut decoder) = self.decoder.take() { + if let Some(mut decoder) = this.decoder.take() { if chunk.len() < MAX_CHUNK_SIZE_DECODE_IN_PLACE { let chunk = decoder.feed_data(chunk)?; - self.decoder = Some(decoder); + *this.decoder = Some(decoder); if let Some(chunk) = chunk { return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(spawn_blocking(move || { + *this.fut = Some(spawn_blocking(move || { let chunk = decoder.feed_data(chunk)?; Ok((chunk, decoder)) })); @@ -137,9 +142,9 @@ where } None => { - self.eof = true; + *this.eof = true; - return if let Some(mut decoder) = self.decoder.take() { + return if let Some(mut decoder) = this.decoder.take() { match decoder.feed_eof() { Ok(Some(res)) => Poll::Ready(Some(Ok(res))), Ok(None) => Poll::Ready(None), diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5c0cb64af..13055f08a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -646,10 +646,11 @@ where Payload is attached to Request and passed to Service::call where the state can be collected and consumed. */ - let (ps, pl) = Payload::create(false); - let (req1, _) = req.replace_payload(crate::Payload::H1(pl)); + let (sender, payload) = Payload::create(false); + let (req1, _) = + req.replace_payload(crate::Payload::H1 { payload }); req = req1; - *this.payload = Some(ps); + *this.payload = Some(sender); } // Request has no payload. diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index f912e0ba3..4d031c15a 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -1,9 +1,12 @@ //! Payload stream -use std::cell::RefCell; -use std::collections::VecDeque; -use std::pin::Pin; -use std::rc::{Rc, Weak}; -use std::task::{Context, Poll, Waker}; + +use std::{ + cell::RefCell, + collections::VecDeque, + pin::Pin, + rc::{Rc, Weak}, + task::{Context, Poll, Waker}, +}; use bytes::Bytes; use futures_core::Stream; @@ -22,39 +25,32 @@ pub enum PayloadStatus { /// Buffered stream of bytes chunks /// -/// Payload stores chunks in a vector. First chunk can be received with -/// `.readany()` method. Payload stream is not thread safe. Payload does not -/// notify current task when new data is available. +/// Payload stores chunks in a vector. First chunk can be received with `poll_next`. Payload does +/// not notify current task when new data is available. /// -/// Payload stream can be used as `Response` body stream. +/// Payload can be used as `Response` body stream. #[derive(Debug)] pub struct Payload { inner: Rc>, } impl Payload { - /// Create payload stream. + /// Creates a payload stream. /// - /// This method construct two objects responsible for bytes stream - /// generation. - /// - /// * `PayloadSender` - *Sender* side of the stream - /// - /// * `Payload` - *Receiver* side of the stream + /// This method construct two objects responsible for bytes stream generation: + /// - `PayloadSender` - *Sender* side of the stream + /// - `Payload` - *Receiver* side of the stream pub fn create(eof: bool) -> (PayloadSender, Payload) { let shared = Rc::new(RefCell::new(Inner::new(eof))); ( - PayloadSender { - inner: Rc::downgrade(&shared), - }, + PayloadSender::new(Rc::downgrade(&shared)), Payload { inner: shared }, ) } - /// Create empty payload - #[doc(hidden)] - pub fn empty() -> Payload { + /// Creates an empty payload. + pub(crate) fn empty() -> Payload { Payload { inner: Rc::new(RefCell::new(Inner::new(true))), } @@ -77,14 +73,6 @@ impl Payload { pub fn unread_data(&mut self, data: Bytes) { self.inner.borrow_mut().unread_data(data); } - - #[inline] - pub fn readany( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - self.inner.borrow_mut().readany(cx) - } } impl Stream for Payload { @@ -94,7 +82,7 @@ impl Stream for Payload { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.inner.borrow_mut().readany(cx) + Pin::new(&mut *self.inner.borrow_mut()).poll_next(cx) } } @@ -104,6 +92,10 @@ pub struct PayloadSender { } impl PayloadSender { + fn new(inner: Weak>) -> Self { + Self { inner } + } + #[inline] pub fn set_error(&mut self, err: PayloadError) { if let Some(shared) = self.inner.upgrade() { @@ -227,7 +219,10 @@ impl Inner { self.len } - fn readany(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); self.need_read = self.len < MAX_BUFFER_SIZE; @@ -257,8 +252,18 @@ impl Inner { #[cfg(test)] mod tests { - use super::*; + use std::panic::{RefUnwindSafe, UnwindSafe}; + use actix_utils::future::poll_fn; + use static_assertions::{assert_impl_all, assert_not_impl_any}; + + use super::*; + + assert_impl_all!(Payload: Unpin); + assert_not_impl_any!(Payload: Send, Sync, UnwindSafe, RefUnwindSafe); + + assert_impl_all!(Inner: Unpin, Send, Sync); + assert_not_impl_any!(Inner: UnwindSafe, RefUnwindSafe); #[actix_rt::test] async fn test_unread_data() { @@ -270,7 +275,10 @@ mod tests { assert_eq!( Bytes::from("data"), - poll_fn(|cx| payload.readany(cx)).await.unwrap().unwrap() + poll_fn(|cx| Pin::new(&mut payload).poll_next(cx)) + .await + .unwrap() + .unwrap() ); } } diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 131c7f1ed..5c11b1dab 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -45,7 +45,7 @@ where impl Future for SendResponse where T: AsyncRead + AsyncWrite + Unpin, - B: MessageBody + Unpin, + B: MessageBody, B::Error: Into, { type Output = Result, Error>; @@ -81,7 +81,7 @@ where // body is done when item is None body_done = item.is_none(); if body_done { - let _ = this.body.take(); + this.body.set(None); } let framed = this.framed.as_mut().as_pin_mut().unwrap(); framed diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 8fbefe6de..7f0f15ee6 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -108,8 +108,8 @@ where match Pin::new(&mut this.connection).poll_accept(cx)? { Poll::Ready(Some((req, tx))) => { let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::H2(pl); + let payload = crate::h2::Payload::new(body); + let pl = Payload::H2 { payload }; let mut req = Request::with_payload(pl); let head = req.head_mut(); diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index cbcb6d0fc..47d51b420 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -98,3 +98,14 @@ where } } } + +#[cfg(test)] +mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; + + use static_assertions::assert_impl_all; + + use super::*; + + assert_impl_all!(Payload: Unpin, Send, Sync, UnwindSafe, RefUnwindSafe); +} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 2b7bc730b..f2b415790 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -58,7 +58,8 @@ pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; pub use self::message::ConnectionType; pub use self::message::Message; -pub use self::payload::{Payload, PayloadStream}; +#[allow(deprecated)] +pub use self::payload::{BoxedPayloadStream, Payload, PayloadStream}; pub use self::requests::{Request, RequestHead, RequestHeadType}; pub use self::responses::{Response, ResponseBuilder, ResponseHead}; pub use self::service::HttpService; diff --git a/actix-http/src/payload.rs b/actix-http/src/payload.rs index 69840e7c1..c9f338c7d 100644 --- a/actix-http/src/payload.rs +++ b/actix-http/src/payload.rs @@ -1,70 +1,89 @@ use std::{ + mem, pin::Pin, task::{Context, Poll}, }; use bytes::Bytes; use futures_core::Stream; -use h2::RecvStream; use crate::error::PayloadError; -// TODO: rename to boxed payload -/// A boxed payload. -pub type PayloadStream = Pin>>>; +/// A boxed payload stream. +pub type BoxedPayloadStream = Pin>>>; -/// A streaming payload. -pub enum Payload { - None, - H1(crate::h1::Payload), - H2(crate::h2::Payload), - Stream(S), +#[deprecated(since = "4.0.0", note = "Renamed to `BoxedPayloadStream`.")] +pub type PayloadStream = BoxedPayloadStream; + +pin_project_lite::pin_project! { + /// A streaming payload. + #[project = PayloadProj] + pub enum Payload { + None, + H1 { payload: crate::h1::Payload }, + H2 { payload: crate::h2::Payload }, + Stream { #[pin] payload: S }, + } } impl From for Payload { - fn from(v: crate::h1::Payload) -> Self { - Payload::H1(v) + fn from(payload: crate::h1::Payload) -> Self { + Payload::H1 { payload } } } impl From for Payload { - fn from(v: crate::h2::Payload) -> Self { - Payload::H2(v) + fn from(payload: crate::h2::Payload) -> Self { + Payload::H2 { payload } } } -impl From for Payload { - fn from(v: RecvStream) -> Self { - Payload::H2(crate::h2::Payload::new(v)) +impl From for Payload { + fn from(stream: h2::RecvStream) -> Self { + Payload::H2 { + payload: crate::h2::Payload::new(stream), + } } } -impl From for Payload { - fn from(pl: PayloadStream) -> Self { - Payload::Stream(pl) +impl From for Payload { + fn from(payload: BoxedPayloadStream) -> Self { + Payload::Stream { payload } } } impl Payload { /// Takes current payload and replaces it with `None` value pub fn take(&mut self) -> Payload { - std::mem::replace(self, Payload::None) + mem::replace(self, Payload::None) } } impl Stream for Payload where - S: Stream> + Unpin, + S: Stream>, { type Item = Result; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.get_mut() { - Payload::None => Poll::Ready(None), - Payload::H1(ref mut pl) => pl.readany(cx), - Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx), - Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx), + match self.project() { + PayloadProj::None => Poll::Ready(None), + PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx), + PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx), + PayloadProj::Stream { payload } => payload.poll_next(cx), } } } + +#[cfg(test)] +mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; + + use static_assertions::{assert_impl_all, assert_not_impl_any}; + + use super::*; + + assert_impl_all!(Payload: Unpin); + assert_not_impl_any!(Payload: Send, Sync, UnwindSafe, RefUnwindSafe); +} diff --git a/actix-http/src/requests/request.rs b/actix-http/src/requests/request.rs index 0254a8f11..4eaaba8e1 100644 --- a/actix-http/src/requests/request.rs +++ b/actix-http/src/requests/request.rs @@ -10,11 +10,12 @@ use std::{ use http::{header, Method, Uri, Version}; use crate::{ - header::HeaderMap, Extensions, HttpMessage, Message, Payload, PayloadStream, RequestHead, + header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Message, Payload, + RequestHead, }; /// An HTTP request. -pub struct Request

{ +pub struct Request

{ pub(crate) payload: Payload

, pub(crate) head: Message, pub(crate) conn_data: Option>, @@ -46,7 +47,7 @@ impl

HttpMessage for Request

{ } } -impl From> for Request { +impl From> for Request { fn from(head: Message) -> Self { Request { head, @@ -57,10 +58,10 @@ impl From> for Request { } } -impl Request { +impl Request { /// Create new Request instance #[allow(clippy::new_without_default)] - pub fn new() -> Request { + pub fn new() -> Request { Request { head: Message::new(), payload: Payload::None, diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index ea80345fe..1f76498ef 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -120,7 +120,7 @@ impl TestRequest { } /// Set request payload. - pub fn set_payload>(&mut self, data: B) -> &mut Self { + pub fn set_payload(&mut self, data: impl Into) -> &mut Self { let mut payload = crate::h1::Payload::empty(); payload.unread_data(data.into()); parts(&mut self.0).payload = Some(payload.into()); diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index 42ff0dba1..51fefae72 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -7,6 +7,7 @@ use std::{ io::{self, BufReader, Write}, net::{SocketAddr, TcpStream as StdTcpStream}, sync::Arc, + task::Poll, }; use actix_http::{ @@ -16,25 +17,37 @@ use actix_http::{ Error, HttpService, Method, Request, Response, StatusCode, Version, }; use actix_http_test::test_server; +use actix_rt::pin; use actix_service::{fn_factory_with_config, fn_service}; use actix_tls::connect::rustls::webpki_roots_cert_store; -use actix_utils::future::{err, ok}; +use actix_utils::future::{err, ok, poll_fn}; use bytes::{Bytes, BytesMut}; use derive_more::{Display, Error}; -use futures_core::Stream; -use futures_util::stream::{once, StreamExt as _}; +use futures_core::{ready, Stream}; +use futures_util::stream::once; use rustls::{Certificate, PrivateKey, ServerConfig as RustlsServerConfig, ServerName}; use rustls_pemfile::{certs, pkcs8_private_keys}; -async fn load_body(mut stream: S) -> Result +async fn load_body(stream: S) -> Result where - S: Stream> + Unpin, + S: Stream>, { - let mut body = BytesMut::new(); - while let Some(item) = stream.next().await { - body.extend_from_slice(&item?) - } - Ok(body) + let mut buf = BytesMut::new(); + + pin!(stream); + + poll_fn(|cx| loop { + let body = stream.as_mut(); + + match ready!(body.poll_next(cx)) { + Some(Ok(bytes)) => buf.extend_from_slice(&*bytes), + None => return Poll::Ready(Ok(())), + Some(Err(err)) => return Poll::Ready(Err(err)), + } + }) + .await?; + + Ok(buf) } fn tls_config() -> RustlsServerConfig { diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 8eabcee10..239f7f905 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -1233,7 +1233,7 @@ mod tests { // and should not consume the payload match payload { - actix_web::dev::Payload::H1(_) => {} //expected + actix_web::dev::Payload::H1 { .. } => {} //expected _ => unreachable!(), } } diff --git a/awc/src/client/connection.rs b/awc/src/client/connection.rs index 0e1f0bfec..456f119aa 100644 --- a/awc/src/client/connection.rs +++ b/awc/src/client/connection.rs @@ -267,7 +267,9 @@ where Connection::Tls(ConnectionType::H2(conn)) => { h2proto::send_request(conn, head.into(), body).await } - _ => unreachable!("Plain Tcp connection can be used only in Http1 protocol"), + _ => { + unreachable!("Plain TCP connection can be used only with HTTP/1.1 protocol") + } } }) } diff --git a/awc/src/client/h1proto.rs b/awc/src/client/h1proto.rs index 1028a2178..cf716db72 100644 --- a/awc/src/client/h1proto.rs +++ b/awc/src/client/h1proto.rs @@ -13,16 +13,17 @@ use actix_http::{ Payload, RequestHeadType, ResponseHead, StatusCode, }; use actix_utils::future::poll_fn; -use bytes::buf::BufMut; -use bytes::{Bytes, BytesMut}; +use bytes::{buf::BufMut, Bytes, BytesMut}; use futures_core::{ready, Stream}; use futures_util::SinkExt as _; use pin_project_lite::pin_project; use crate::BoxError; -use super::connection::{ConnectionIo, H1Connection}; -use super::error::{ConnectError, SendRequestError}; +use super::{ + connection::{ConnectionIo, H1Connection}, + error::{ConnectError, SendRequestError}, +}; pub(crate) async fn send_request( io: H1Connection, @@ -123,7 +124,12 @@ where Ok((head, Payload::None)) } - _ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))), + _ => Ok(( + head, + Payload::Stream { + payload: Box::pin(PlStream::new(framed)), + }, + )), } } diff --git a/awc/src/response.rs b/awc/src/response.rs index fefebd0a0..78cc339b4 100644 --- a/awc/src/response.rs +++ b/awc/src/response.rs @@ -10,8 +10,8 @@ use std::{ }; use actix_http::{ - error::PayloadError, header, header::HeaderMap, Extensions, HttpMessage, Payload, - PayloadStream, ResponseHead, StatusCode, Version, + error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, + HttpMessage, Payload, ResponseHead, StatusCode, Version, }; use actix_rt::time::{sleep, Sleep}; use bytes::{Bytes, BytesMut}; @@ -23,7 +23,7 @@ use crate::cookie::{Cookie, ParseError as CookieParseError}; use crate::error::JsonPayloadError; /// Client Response -pub struct ClientResponse { +pub struct ClientResponse { pub(crate) head: ResponseHead, pub(crate) payload: Payload, pub(crate) timeout: ResponseTimeout, diff --git a/awc/src/sender.rs b/awc/src/sender.rs index f83a70a9b..29c814531 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -20,7 +20,7 @@ use futures_core::Stream; use serde::Serialize; #[cfg(feature = "__compress")] -use actix_http::{encoding::Decoder, header::ContentEncoding, Payload, PayloadStream}; +use actix_http::{encoding::Decoder, header::ContentEncoding, Payload}; use crate::{ any_body::AnyBody, @@ -91,7 +91,7 @@ impl SendClientRequest { #[cfg(feature = "__compress")] impl Future for SendClientRequest { - type Output = Result>>, SendRequestError>; + type Output = Result>, SendRequestError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -108,12 +108,13 @@ impl Future for SendClientRequest { res.into_client_response()._timeout(delay.take()).map_body( |head, payload| { if *response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) + Payload::Stream { + payload: Decoder::from_headers(payload, &head.headers), + } } else { - Payload::Stream(Decoder::new( - payload, - ContentEncoding::Identity, - )) + Payload::Stream { + payload: Decoder::new(payload, ContentEncoding::Identity), + } } }, ) diff --git a/awc/src/test.rs b/awc/src/test.rs index 1b41efc93..96ae1f0a1 100644 --- a/awc/src/test.rs +++ b/awc/src/test.rs @@ -65,7 +65,7 @@ impl TestResponse { /// Set response's payload pub fn set_payload>(mut self, data: B) -> Self { - let mut payload = h1::Payload::empty(); + let (_, mut payload) = h1::Payload::create(true); payload.unread_data(data.into()); self.payload = Some(payload.into()); self @@ -90,7 +90,8 @@ impl TestResponse { if let Some(pl) = self.payload { ClientResponse::new(head, pl) } else { - ClientResponse::new(head, h1::Payload::empty().into()) + let (_, payload) = h1::Payload::create(true); + ClientResponse::new(head, payload.into()) } } } diff --git a/src/dev.rs b/src/dev.rs index 23a40f292..6e1970467 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -14,7 +14,7 @@ pub use crate::types::form::UrlEncoded; pub use crate::types::json::JsonBody; pub use crate::types::readlines::Readlines; -pub use actix_http::{Extensions, Payload, PayloadStream, RequestHead, Response, ResponseHead}; +pub use actix_http::{Extensions, Payload, RequestHead, Response, ResponseHead}; pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; pub use actix_server::{Server, ServerHandle}; pub use actix_service::{ diff --git a/src/response/builder.rs b/src/response/builder.rs index b500ab331..93d8ab567 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -429,9 +429,12 @@ mod tests { use actix_http::body; use super::*; - use crate::http::{ - header::{self, HeaderValue, CONTENT_TYPE}, - StatusCode, + use crate::{ + http::{ + header::{self, HeaderValue, CONTENT_TYPE}, + StatusCode, + }, + test::assert_body_eq, }; #[test] @@ -472,32 +475,23 @@ mod tests { #[actix_rt::test] async fn test_json() { - let resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let res = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); - let resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let res = HttpResponse::Ok().json(&["v1", "v2", "v3"]); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); // content type override - let resp = HttpResponse::Ok() + let res = HttpResponse::Ok() .insert_header((CONTENT_TYPE, "text/json")) .json(&vec!["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("text/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); } #[actix_rt::test] diff --git a/src/service.rs b/src/service.rs index 9ccf5274d..d5c381fa4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -7,7 +7,7 @@ use std::{ use actix_http::{ body::{BoxBody, EitherBody, MessageBody}, header::HeaderMap, - Extensions, HttpMessage, Method, Payload, PayloadStream, RequestHead, Response, + BoxedPayloadStream, Extensions, HttpMessage, Method, Payload, RequestHead, Response, ResponseHead, StatusCode, Uri, Version, }; use actix_router::{IntoPatterns, Path, Patterns, Resource, ResourceDef, Url}; @@ -293,7 +293,7 @@ impl Resource for ServiceRequest { } impl HttpMessage for ServiceRequest { - type Stream = PayloadStream; + type Stream = BoxedPayloadStream; #[inline] /// Returns Request's headers. diff --git a/src/test/test_request.rs b/src/test/test_request.rs index fd3355ef3..5c4de9084 100644 --- a/src/test/test_request.rs +++ b/src/test/test_request.rs @@ -174,25 +174,28 @@ impl TestRequest { } /// Set request payload. - pub fn set_payload>(mut self, data: B) -> Self { + pub fn set_payload(mut self, data: impl Into) -> Self { self.req.set_payload(data); self } - /// Serialize `data` to a URL encoded form and set it as the request payload. The `Content-Type` - /// header is set to `application/x-www-form-urlencoded`. - pub fn set_form(mut self, data: &T) -> Self { - let bytes = serde_urlencoded::to_string(data) + /// Serialize `data` to a URL encoded form and set it as the request payload. + /// + /// The `Content-Type` header is set to `application/x-www-form-urlencoded`. + pub fn set_form(mut self, data: impl Serialize) -> Self { + let bytes = serde_urlencoded::to_string(&data) .expect("Failed to serialize test data as a urlencoded form"); self.req.set_payload(bytes); self.req.insert_header(ContentType::form_url_encoded()); self } - /// Serialize `data` to JSON and set it as the request payload. The `Content-Type` header is - /// set to `application/json`. - pub fn set_json(mut self, data: &T) -> Self { - let bytes = serde_json::to_string(data).expect("Failed to serialize test data to json"); + /// Serialize `data` to JSON and set it as the request payload. + /// + /// The `Content-Type` header is set to `application/json`. + pub fn set_json(mut self, data: impl Serialize) -> Self { + let bytes = + serde_json::to_string(&data).expect("Failed to serialize test data to json"); self.req.set_payload(bytes); self.req.insert_header(ContentType::json()); self