mirror of
https://github.com/actix/actix-web.git
synced 2024-11-25 19:11:10 +00:00
ClientRequest::send_body
takes impl MessageBody
(#2546)
This commit is contained in:
parent
1296e07c48
commit
d2590fd46c
18 changed files with 853 additions and 687 deletions
66
.github/workflows/ci-master.yml
vendored
Normal file
66
.github/workflows/ci-master.yml
vendored
Normal file
|
@ -0,0 +1,66 @@
|
|||
name: CI (master only)
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master]
|
||||
|
||||
jobs:
|
||||
ci_feature_powerset_check:
|
||||
name: Verify Feature Combinations
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: generate-lockfile }
|
||||
- name: Cache Dependencies
|
||||
uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
- name: Install cargo-hack
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: install
|
||||
args: cargo-hack
|
||||
|
||||
- name: check feature combinations
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: ci-check-all-feature-powerset }
|
||||
|
||||
- name: check feature combinations
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: ci-check-all-feature-powerset-linux }
|
||||
|
||||
coverage:
|
||||
name: coverage
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: generate-lockfile }
|
||||
- name: Cache Dependencies
|
||||
uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
- name: Generate coverage file
|
||||
run: |
|
||||
cargo install cargo-tarpaulin --vers "^0.13"
|
||||
cargo tarpaulin --workspace --features=rustls,openssl --out Xml --verbose
|
||||
- name: Upload to Codecov
|
||||
uses: codecov/codecov-action@v1
|
||||
with: { file: cobertura.xml }
|
62
.github/workflows/ci.yml
vendored
62
.github/workflows/ci.yml
vendored
|
@ -96,68 +96,6 @@ jobs:
|
|||
cargo install cargo-cache --version 0.6.3 --no-default-features --features ci-autoclean
|
||||
cargo-cache
|
||||
|
||||
ci_feature_powerset_check:
|
||||
name: Verify Feature Combinations
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: generate-lockfile }
|
||||
- name: Cache Dependencies
|
||||
uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
- name: Install cargo-hack
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: install
|
||||
args: cargo-hack
|
||||
|
||||
- name: check feature combinations
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: ci-check-all-feature-powerset }
|
||||
|
||||
- name: check feature combinations
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: ci-check-all-feature-powerset-linux }
|
||||
|
||||
coverage:
|
||||
name: coverage
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: generate-lockfile }
|
||||
- name: Cache Dependencies
|
||||
uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
- name: Generate coverage file
|
||||
if: github.ref == 'refs/heads/master'
|
||||
run: |
|
||||
cargo install cargo-tarpaulin --vers "^0.13"
|
||||
cargo tarpaulin --workspace --features=rustls,openssl --out Xml --verbose
|
||||
- name: Upload to Codecov
|
||||
if: github.ref == 'refs/heads/master'
|
||||
uses: codecov/codecov-action@v1
|
||||
with: { file: cobertura.xml }
|
||||
|
||||
rustdoc:
|
||||
name: doc tests
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
@ -3,8 +3,14 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
- Rename `Connector::{ssl => openssl}`. [#2503]
|
||||
- Improve `Client` instantiation efficiency when using `openssl` by only building connectors once. [#2503]
|
||||
- `ClientRequest::send_body` now takes an `impl MessageBody`. [#2546]
|
||||
- Rename `MessageBody => ResponseBody` to avoid conflicts with `MessageBody` trait. [#2546]
|
||||
- `impl Future` for `ResponseBody` no longer requires the body type be `Unpin`. [#2546]
|
||||
- `impl Future` for `JsonBody` no longer requires the body type be `Unpin`. [#2546]
|
||||
- `impl Stream` for `ClientResponse` no longer requires the body type be `Unpin`. [#2546]
|
||||
|
||||
[#2503]: https://github.com/actix/actix-web/pull/2503
|
||||
[#2546]: https://github.com/actix/actix-web/pull/2546
|
||||
|
||||
|
||||
## 3.0.0-beta.14 - 2021-12-17
|
||||
|
|
|
@ -77,10 +77,27 @@ impl<B> AnyBody<B>
|
|||
where
|
||||
B: MessageBody + 'static,
|
||||
{
|
||||
/// Converts a [`MessageBody`] type into the best possible representation.
|
||||
///
|
||||
/// Checks size for `None` and tries to convert to `Bytes`. Otherwise, uses the `Body` variant.
|
||||
pub fn from_message_body(body: B) -> Self
|
||||
where
|
||||
B: MessageBody,
|
||||
{
|
||||
if matches!(body.size(), BodySize::None) {
|
||||
return Self::None;
|
||||
}
|
||||
|
||||
match body.try_into_bytes() {
|
||||
Ok(body) => Self::Bytes { body },
|
||||
Err(body) => Self::new(body),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_boxed(self) -> AnyBody {
|
||||
match self {
|
||||
Self::None => AnyBody::None,
|
||||
Self::Bytes { body: bytes } => AnyBody::Bytes { body: bytes },
|
||||
Self::Bytes { body } => AnyBody::Bytes { body },
|
||||
Self::Body { body } => AnyBody::new_boxed(body),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ use crate::{
|
|||
client::{
|
||||
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
|
||||
},
|
||||
response::ClientResponse,
|
||||
ClientResponse,
|
||||
};
|
||||
|
||||
pub type BoxConnectorService = Rc<
|
||||
|
|
|
@ -5,13 +5,13 @@ use futures_core::Stream;
|
|||
use serde::Serialize;
|
||||
|
||||
use actix_http::{
|
||||
body::MessageBody,
|
||||
error::HttpError,
|
||||
header::{HeaderMap, HeaderName, TryIntoHeaderValue},
|
||||
Method, RequestHead, Uri,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
any_body::AnyBody,
|
||||
sender::{RequestSender, SendClientRequest},
|
||||
BoxError, ClientConfig,
|
||||
};
|
||||
|
@ -46,7 +46,7 @@ impl FrozenClientRequest {
|
|||
/// Send a body.
|
||||
pub fn send_body<B>(&self, body: B) -> SendClientRequest
|
||||
where
|
||||
B: Into<AnyBody>,
|
||||
B: MessageBody + 'static,
|
||||
{
|
||||
RequestSender::Rc(self.head.clone(), None).send_body(
|
||||
self.addr,
|
||||
|
@ -159,7 +159,7 @@ impl FrozenSendBuilder {
|
|||
/// Complete request construction and send a body.
|
||||
pub fn send_body<B>(self, body: B) -> SendClientRequest
|
||||
where
|
||||
B: Into<AnyBody>,
|
||||
B: MessageBody + 'static,
|
||||
{
|
||||
if let Some(e) = self.err {
|
||||
return e.into();
|
||||
|
|
|
@ -113,7 +113,7 @@ pub mod error;
|
|||
mod frozen;
|
||||
pub mod middleware;
|
||||
mod request;
|
||||
mod response;
|
||||
mod responses;
|
||||
mod sender;
|
||||
pub mod test;
|
||||
pub mod ws;
|
||||
|
@ -128,7 +128,8 @@ pub use self::client::Connector;
|
|||
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
|
||||
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
|
||||
pub use self::request::ClientRequest;
|
||||
pub use self::response::{ClientResponse, JsonBody, MessageBody};
|
||||
#[allow(deprecated)]
|
||||
pub use self::responses::{ClientResponse, JsonBody, MessageBody, ResponseBody};
|
||||
pub use self::sender::SendClientRequest;
|
||||
|
||||
use std::{convert::TryFrom, rc::Rc, time::Duration};
|
||||
|
|
|
@ -190,9 +190,7 @@ where
|
|||
let body_new = if is_redirect {
|
||||
// try to reuse body
|
||||
match body {
|
||||
Some(ref bytes) => AnyBody::Bytes {
|
||||
body: bytes.clone(),
|
||||
},
|
||||
Some(ref bytes) => AnyBody::from(bytes.clone()),
|
||||
// TODO: should this be AnyBody::Empty or AnyBody::None.
|
||||
_ => AnyBody::empty(),
|
||||
}
|
||||
|
|
|
@ -5,13 +5,13 @@ use futures_core::Stream;
|
|||
use serde::Serialize;
|
||||
|
||||
use actix_http::{
|
||||
body::MessageBody,
|
||||
error::HttpError,
|
||||
header::{self, HeaderMap, HeaderValue, TryIntoHeaderPair},
|
||||
ConnectionType, Method, RequestHead, Uri, Version,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
any_body::AnyBody,
|
||||
error::{FreezeRequestError, InvalidUrl},
|
||||
frozen::FrozenClientRequest,
|
||||
sender::{PrepForSendingError, RequestSender, SendClientRequest},
|
||||
|
@ -26,20 +26,20 @@ use crate::cookie::{Cookie, CookieJar};
|
|||
/// This type can be used to construct an instance of `ClientRequest` through a
|
||||
/// builder-like pattern.
|
||||
///
|
||||
/// ```
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() {
|
||||
/// let response = awc::Client::new()
|
||||
/// .get("http://www.rust-lang.org") // <- Create request builder
|
||||
/// .insert_header(("User-Agent", "Actix-web"))
|
||||
/// .send() // <- Send HTTP request
|
||||
/// .await;
|
||||
/// ```no_run
|
||||
/// # #[actix_rt::main]
|
||||
/// # async fn main() {
|
||||
/// let response = awc::Client::new()
|
||||
/// .get("http://www.rust-lang.org") // <- Create request builder
|
||||
/// .insert_header(("User-Agent", "Actix-web"))
|
||||
/// .send() // <- Send HTTP request
|
||||
/// .await;
|
||||
///
|
||||
/// response.and_then(|response| { // <- server HTTP response
|
||||
/// println!("Response: {:?}", response);
|
||||
/// Ok(())
|
||||
/// });
|
||||
/// }
|
||||
/// response.and_then(|response| { // <- server HTTP response
|
||||
/// println!("Response: {:?}", response);
|
||||
/// Ok(())
|
||||
/// });
|
||||
/// # }
|
||||
/// ```
|
||||
pub struct ClientRequest {
|
||||
pub(crate) head: RequestHead,
|
||||
|
@ -174,17 +174,13 @@ impl ClientRequest {
|
|||
|
||||
/// Append a header, keeping any that were set with an equivalent field name.
|
||||
///
|
||||
/// ```
|
||||
/// # #[actix_rt::main]
|
||||
/// # async fn main() {
|
||||
/// # use awc::Client;
|
||||
/// use awc::http::header::CONTENT_TYPE;
|
||||
/// ```no_run
|
||||
/// use awc::{http::header, Client};
|
||||
///
|
||||
/// Client::new()
|
||||
/// .get("http://www.rust-lang.org")
|
||||
/// .insert_header(("X-TEST", "value"))
|
||||
/// .insert_header((CONTENT_TYPE, mime::APPLICATION_JSON));
|
||||
/// # }
|
||||
/// .insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON));
|
||||
/// ```
|
||||
pub fn append_header(mut self, header: impl TryIntoHeaderPair) -> Self {
|
||||
match header.try_into_pair() {
|
||||
|
@ -252,23 +248,25 @@ impl ClientRequest {
|
|||
|
||||
/// Set a cookie
|
||||
///
|
||||
/// ```
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() {
|
||||
/// let resp = awc::Client::new().get("https://www.rust-lang.org")
|
||||
/// .cookie(
|
||||
/// awc::cookie::Cookie::build("name", "value")
|
||||
/// .domain("www.rust-lang.org")
|
||||
/// .path("/")
|
||||
/// .secure(true)
|
||||
/// .http_only(true)
|
||||
/// .finish(),
|
||||
/// )
|
||||
/// .send()
|
||||
/// .await;
|
||||
/// ```no_run
|
||||
/// use awc::{cookie, Client};
|
||||
///
|
||||
/// println!("Response: {:?}", resp);
|
||||
/// }
|
||||
/// # #[actix_rt::main]
|
||||
/// # async fn main() {
|
||||
/// let resp = Client::new().get("https://www.rust-lang.org")
|
||||
/// .cookie(
|
||||
/// awc::cookie::Cookie::build("name", "value")
|
||||
/// .domain("www.rust-lang.org")
|
||||
/// .path("/")
|
||||
/// .secure(true)
|
||||
/// .http_only(true)
|
||||
/// .finish(),
|
||||
/// )
|
||||
/// .send()
|
||||
/// .await;
|
||||
///
|
||||
/// println!("Response: {:?}", resp);
|
||||
/// # }
|
||||
/// ```
|
||||
#[cfg(feature = "cookies")]
|
||||
pub fn cookie(mut self, cookie: Cookie<'_>) -> Self {
|
||||
|
@ -340,7 +338,7 @@ impl ClientRequest {
|
|||
/// Complete request construction and send body.
|
||||
pub fn send_body<B>(self, body: B) -> SendClientRequest
|
||||
where
|
||||
B: Into<AnyBody>,
|
||||
B: MessageBody + 'static,
|
||||
{
|
||||
let slf = match self.prep_for_sending() {
|
||||
Ok(slf) => slf,
|
||||
|
|
|
@ -1,556 +0,0 @@
|
|||
use std::{
|
||||
cell::{Ref, RefMut},
|
||||
fmt,
|
||||
future::Future,
|
||||
io,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_http::{
|
||||
error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions,
|
||||
HttpMessage, Payload, ResponseHead, StatusCode, Version,
|
||||
};
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_core::{ready, Stream};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
#[cfg(feature = "cookies")]
|
||||
use crate::cookie::{Cookie, ParseError as CookieParseError};
|
||||
use crate::error::JsonPayloadError;
|
||||
|
||||
/// Client Response
|
||||
pub struct ClientResponse<S = BoxedPayloadStream> {
|
||||
pub(crate) head: ResponseHead,
|
||||
pub(crate) payload: Payload<S>,
|
||||
pub(crate) timeout: ResponseTimeout,
|
||||
}
|
||||
|
||||
/// helper enum with reusable sleep passed from `SendClientResponse`.
|
||||
/// See `ClientResponse::_timeout` for reason.
|
||||
pub(crate) enum ResponseTimeout {
|
||||
Disabled(Option<Pin<Box<Sleep>>>),
|
||||
Enabled(Pin<Box<Sleep>>),
|
||||
}
|
||||
|
||||
impl Default for ResponseTimeout {
|
||||
fn default() -> Self {
|
||||
Self::Disabled(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseTimeout {
|
||||
fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
|
||||
match *self {
|
||||
Self::Enabled(ref mut timeout) => {
|
||||
if timeout.as_mut().poll(cx).is_ready() {
|
||||
Err(PayloadError::Io(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"Response Payload IO timed out",
|
||||
)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Self::Disabled(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> HttpMessage for ClientResponse<S> {
|
||||
type Stream = S;
|
||||
|
||||
fn headers(&self) -> &HeaderMap {
|
||||
&self.head.headers
|
||||
}
|
||||
|
||||
fn take_payload(&mut self) -> Payload<S> {
|
||||
std::mem::replace(&mut self.payload, Payload::None)
|
||||
}
|
||||
|
||||
fn extensions(&self) -> Ref<'_, Extensions> {
|
||||
self.head.extensions()
|
||||
}
|
||||
|
||||
fn extensions_mut(&self) -> RefMut<'_, Extensions> {
|
||||
self.head.extensions_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ClientResponse<S> {
|
||||
/// Create new Request instance
|
||||
pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
|
||||
ClientResponse {
|
||||
head,
|
||||
payload,
|
||||
timeout: ResponseTimeout::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn head(&self) -> &ResponseHead {
|
||||
&self.head
|
||||
}
|
||||
|
||||
/// Read the Request Version.
|
||||
#[inline]
|
||||
pub fn version(&self) -> Version {
|
||||
self.head().version
|
||||
}
|
||||
|
||||
/// Get the status from the server.
|
||||
#[inline]
|
||||
pub fn status(&self) -> StatusCode {
|
||||
self.head().status
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns request's headers.
|
||||
pub fn headers(&self) -> &HeaderMap {
|
||||
&self.head().headers
|
||||
}
|
||||
|
||||
/// Set a body and return previous body value
|
||||
pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
|
||||
where
|
||||
F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
|
||||
{
|
||||
let payload = f(&mut self.head, self.payload);
|
||||
|
||||
ClientResponse {
|
||||
payload,
|
||||
head: self.head,
|
||||
timeout: self.timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set a timeout duration for [`ClientResponse`](self::ClientResponse).
|
||||
///
|
||||
/// This duration covers the duration of processing the response body stream
|
||||
/// and would end it as timeout error when deadline met.
|
||||
///
|
||||
/// Disabled by default.
|
||||
pub fn timeout(self, dur: Duration) -> Self {
|
||||
let timeout = match self.timeout {
|
||||
ResponseTimeout::Disabled(Some(mut timeout))
|
||||
| ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
|
||||
Some(deadline) => {
|
||||
timeout.as_mut().reset(deadline.into());
|
||||
ResponseTimeout::Enabled(timeout)
|
||||
}
|
||||
None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
|
||||
},
|
||||
_ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
|
||||
};
|
||||
|
||||
Self {
|
||||
payload: self.payload,
|
||||
head: self.head,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// This method does not enable timeout. It's used to pass the boxed `Sleep` from
|
||||
/// `SendClientRequest` and reuse it's heap allocation together with it's slot in
|
||||
/// timer wheel.
|
||||
pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
|
||||
self.timeout = ResponseTimeout::Disabled(timeout);
|
||||
self
|
||||
}
|
||||
|
||||
/// Load request cookies.
|
||||
#[cfg(feature = "cookies")]
|
||||
pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
|
||||
struct Cookies(Vec<Cookie<'static>>);
|
||||
|
||||
if self.extensions().get::<Cookies>().is_none() {
|
||||
let mut cookies = Vec::new();
|
||||
for hdr in self.headers().get_all(&header::SET_COOKIE) {
|
||||
let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
|
||||
cookies.push(Cookie::parse_encoded(s)?.into_owned());
|
||||
}
|
||||
self.extensions_mut().insert(Cookies(cookies));
|
||||
}
|
||||
Ok(Ref::map(self.extensions(), |ext| {
|
||||
&ext.get::<Cookies>().unwrap().0
|
||||
}))
|
||||
}
|
||||
|
||||
/// Return request cookie.
|
||||
#[cfg(feature = "cookies")]
|
||||
pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
|
||||
if let Ok(cookies) = self.cookies() {
|
||||
for cookie in cookies.iter() {
|
||||
if cookie.name() == name {
|
||||
return Some(cookie.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
/// Loads HTTP response's body.
|
||||
pub fn body(&mut self) -> MessageBody<S> {
|
||||
MessageBody::new(self)
|
||||
}
|
||||
|
||||
/// Loads and parse `application/json` encoded body.
|
||||
/// Return `JsonBody<T>` future. It resolves to a `T` value.
|
||||
///
|
||||
/// Returns error:
|
||||
///
|
||||
/// * content type is not `application/json`
|
||||
/// * content length is greater than 256k
|
||||
pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
|
||||
JsonBody::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
{
|
||||
type Item = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
this.timeout.poll_timeout(cx)?;
|
||||
|
||||
Pin::new(&mut this.payload).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> fmt::Debug for ClientResponse<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers().iter() {
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024;
|
||||
|
||||
/// Future that resolves to a complete HTTP message body.
|
||||
pub struct MessageBody<S> {
|
||||
length: Option<usize>,
|
||||
timeout: ResponseTimeout,
|
||||
body: Result<ReadBody<S>, Option<PayloadError>>,
|
||||
}
|
||||
|
||||
impl<S> MessageBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
/// Create `MessageBody` for request.
|
||||
pub fn new(res: &mut ClientResponse<S>) -> MessageBody<S> {
|
||||
let length = match res.headers().get(&header::CONTENT_LENGTH) {
|
||||
Some(value) => {
|
||||
let len = value.to_str().ok().and_then(|s| s.parse::<usize>().ok());
|
||||
|
||||
match len {
|
||||
None => return Self::err(PayloadError::UnknownLength),
|
||||
len => len,
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
MessageBody {
|
||||
length,
|
||||
timeout: std::mem::take(&mut res.timeout),
|
||||
body: Ok(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size of payload. By default max size is 2048kB
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
if let Ok(ref mut body) = self.body {
|
||||
body.limit = limit;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
fn err(e: PayloadError) -> Self {
|
||||
MessageBody {
|
||||
length: None,
|
||||
timeout: ResponseTimeout::default(),
|
||||
body: Err(Some(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for MessageBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
{
|
||||
type Output = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
match this.body {
|
||||
Err(ref mut err) => Poll::Ready(Err(err.take().unwrap())),
|
||||
Ok(ref mut body) => {
|
||||
if let Some(len) = this.length.take() {
|
||||
if len > body.limit {
|
||||
return Poll::Ready(Err(PayloadError::Overflow));
|
||||
}
|
||||
}
|
||||
|
||||
this.timeout.poll_timeout(cx)?;
|
||||
|
||||
Pin::new(body).poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Response's payload json parser, it resolves to a deserialized `T` value.
|
||||
///
|
||||
/// Returns error:
|
||||
///
|
||||
/// * content type is not `application/json`
|
||||
/// * content length is greater than 64k
|
||||
pub struct JsonBody<S, U> {
|
||||
length: Option<usize>,
|
||||
err: Option<JsonPayloadError>,
|
||||
timeout: ResponseTimeout,
|
||||
fut: Option<ReadBody<S>>,
|
||||
_phantom: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl<S, U> JsonBody<S, U>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
U: DeserializeOwned,
|
||||
{
|
||||
/// Create `JsonBody` for request.
|
||||
pub fn new(res: &mut ClientResponse<S>) -> Self {
|
||||
// check content-type
|
||||
let json = if let Ok(Some(mime)) = res.mime_type() {
|
||||
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if !json {
|
||||
return JsonBody {
|
||||
length: None,
|
||||
fut: None,
|
||||
timeout: ResponseTimeout::default(),
|
||||
err: Some(JsonPayloadError::ContentType),
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
}
|
||||
|
||||
let mut len = None;
|
||||
|
||||
if let Some(l) = res.headers().get(&header::CONTENT_LENGTH) {
|
||||
if let Ok(s) = l.to_str() {
|
||||
if let Ok(l) = s.parse::<usize>() {
|
||||
len = Some(l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JsonBody {
|
||||
length: len,
|
||||
err: None,
|
||||
timeout: std::mem::take(&mut res.timeout),
|
||||
fut: Some(ReadBody::new(res.take_payload(), 65536)),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size of payload. By default max size is 64kB
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
fut.limit = limit;
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Unpin for JsonBody<T, U>
|
||||
where
|
||||
T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
U: DeserializeOwned,
|
||||
{
|
||||
}
|
||||
|
||||
impl<T, U> Future for JsonBody<T, U>
|
||||
where
|
||||
T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
U: DeserializeOwned,
|
||||
{
|
||||
type Output = Result<U, JsonPayloadError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
if let Some(err) = self.err.take() {
|
||||
return Poll::Ready(Err(err));
|
||||
}
|
||||
|
||||
if let Some(len) = self.length.take() {
|
||||
if len > self.fut.as_ref().unwrap().limit {
|
||||
return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow)));
|
||||
}
|
||||
}
|
||||
|
||||
self.timeout
|
||||
.poll_timeout(cx)
|
||||
.map_err(JsonPayloadError::Payload)?;
|
||||
|
||||
let body = ready!(Pin::new(&mut self.get_mut().fut.as_mut().unwrap()).poll(cx))?;
|
||||
Poll::Ready(serde_json::from_slice::<U>(&body).map_err(JsonPayloadError::from))
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadBody<S> {
|
||||
stream: Payload<S>,
|
||||
buf: BytesMut,
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
impl<S> ReadBody<S> {
|
||||
fn new(stream: Payload<S>, limit: usize) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
buf: BytesMut::new(),
|
||||
limit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for ReadBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
{
|
||||
type Output = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
while let Some(chunk) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) {
|
||||
if (this.buf.len() + chunk.len()) > this.limit {
|
||||
return Poll::Ready(Err(PayloadError::Overflow));
|
||||
}
|
||||
this.buf.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(this.buf.split().freeze()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{http::header, test::TestResponse};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_body() {
|
||||
let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish();
|
||||
match req.body().await.err().unwrap() {
|
||||
PayloadError::UnknownLength => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
|
||||
let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "10000000")).finish();
|
||||
match req.body().await.err().unwrap() {
|
||||
PayloadError::Overflow => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.set_payload(Bytes::from_static(b"test"))
|
||||
.finish();
|
||||
assert_eq!(req.body().await.ok().unwrap(), Bytes::from_static(b"test"));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.set_payload(Bytes::from_static(b"11111111111111"))
|
||||
.finish();
|
||||
match req.body().limit(5).await.err().unwrap() {
|
||||
PayloadError::Overflow => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||
struct MyObject {
|
||||
name: String,
|
||||
}
|
||||
|
||||
fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool {
|
||||
match err {
|
||||
JsonPayloadError::Payload(PayloadError::Overflow) => {
|
||||
matches!(other, JsonPayloadError::Payload(PayloadError::Overflow))
|
||||
}
|
||||
JsonPayloadError::ContentType => matches!(other, JsonPayloadError::ContentType),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_json_body() {
|
||||
let mut req = TestResponse::default().finish();
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/text"),
|
||||
))
|
||||
.finish();
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/json"),
|
||||
))
|
||||
.insert_header((
|
||||
header::CONTENT_LENGTH,
|
||||
header::HeaderValue::from_static("10000"),
|
||||
))
|
||||
.finish();
|
||||
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).limit(100).await;
|
||||
assert!(json_eq(
|
||||
json.err().unwrap(),
|
||||
JsonPayloadError::Payload(PayloadError::Overflow)
|
||||
));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/json"),
|
||||
))
|
||||
.insert_header((
|
||||
header::CONTENT_LENGTH,
|
||||
header::HeaderValue::from_static("16"),
|
||||
))
|
||||
.set_payload(Bytes::from_static(b"{\"name\": \"test\"}"))
|
||||
.finish();
|
||||
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert_eq!(
|
||||
json.ok().unwrap(),
|
||||
MyObject {
|
||||
name: "test".to_owned()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
192
awc/src/responses/json_body.rs
Normal file
192
awc/src/responses/json_body.rs
Normal file
|
@ -0,0 +1,192 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_http::{error::PayloadError, header, HttpMessage};
|
||||
use bytes::Bytes;
|
||||
use futures_core::{ready, Stream};
|
||||
use pin_project_lite::pin_project;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT};
|
||||
use crate::{error::JsonPayloadError, ClientResponse};
|
||||
|
||||
pin_project! {
|
||||
/// A `Future` that reads a body stream, parses JSON, resolving to a deserialized `T`.
|
||||
///
|
||||
/// # Errors
|
||||
/// `Future` implementation returns error if:
|
||||
/// - content type is not `application/json`;
|
||||
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
|
||||
pub struct JsonBody<S, T> {
|
||||
#[pin]
|
||||
body: Option<ReadBody<S>>,
|
||||
length: Option<usize>,
|
||||
timeout: ResponseTimeout,
|
||||
err: Option<JsonPayloadError>,
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> JsonBody<S, T>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
/// Creates a JSON body stream reader from a response by taking its payload.
|
||||
pub fn new(res: &mut ClientResponse<S>) -> Self {
|
||||
// check content-type
|
||||
let json = if let Ok(Some(mime)) = res.mime_type() {
|
||||
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if !json {
|
||||
return JsonBody {
|
||||
length: None,
|
||||
body: None,
|
||||
timeout: ResponseTimeout::default(),
|
||||
err: Some(JsonPayloadError::ContentType),
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
}
|
||||
|
||||
let length = res
|
||||
.headers()
|
||||
.get(&header::CONTENT_LENGTH)
|
||||
.and_then(|len_hdr| len_hdr.to_str().ok())
|
||||
.and_then(|len_str| len_str.parse::<usize>().ok());
|
||||
|
||||
JsonBody {
|
||||
body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)),
|
||||
length,
|
||||
timeout: mem::take(&mut res.timeout),
|
||||
err: None,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size of payload. Default limit is 2 MiB.
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
if let Some(ref mut fut) = self.body {
|
||||
fut.limit = limit;
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> Future for JsonBody<S, T>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
type Output = Result<T, JsonPayloadError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
if let Some(err) = this.err.take() {
|
||||
return Poll::Ready(Err(err));
|
||||
}
|
||||
|
||||
if let Some(len) = this.length.take() {
|
||||
let body = Option::as_ref(&this.body).unwrap();
|
||||
if len > body.limit {
|
||||
return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow)));
|
||||
}
|
||||
}
|
||||
|
||||
this.timeout
|
||||
.poll_timeout(cx)
|
||||
.map_err(JsonPayloadError::Payload)?;
|
||||
|
||||
let body = ready!(this.body.as_pin_mut().unwrap().poll(cx))?;
|
||||
Poll::Ready(serde_json::from_slice::<T>(&body).map_err(JsonPayloadError::from))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use actix_http::BoxedPayloadStream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use static_assertions::assert_impl_all;
|
||||
|
||||
use super::*;
|
||||
use crate::{http::header, test::TestResponse};
|
||||
|
||||
assert_impl_all!(JsonBody<BoxedPayloadStream, String>: Unpin);
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||
struct MyObject {
|
||||
name: String,
|
||||
}
|
||||
|
||||
fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool {
|
||||
match err {
|
||||
JsonPayloadError::Payload(PayloadError::Overflow) => {
|
||||
matches!(other, JsonPayloadError::Payload(PayloadError::Overflow))
|
||||
}
|
||||
JsonPayloadError::ContentType => matches!(other, JsonPayloadError::ContentType),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn read_json_body() {
|
||||
let mut req = TestResponse::default().finish();
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/text"),
|
||||
))
|
||||
.finish();
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/json"),
|
||||
))
|
||||
.insert_header((
|
||||
header::CONTENT_LENGTH,
|
||||
header::HeaderValue::from_static("10000"),
|
||||
))
|
||||
.finish();
|
||||
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).limit(100).await;
|
||||
assert!(json_eq(
|
||||
json.err().unwrap(),
|
||||
JsonPayloadError::Payload(PayloadError::Overflow)
|
||||
));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.insert_header((
|
||||
header::CONTENT_TYPE,
|
||||
header::HeaderValue::from_static("application/json"),
|
||||
))
|
||||
.insert_header((
|
||||
header::CONTENT_LENGTH,
|
||||
header::HeaderValue::from_static("16"),
|
||||
))
|
||||
.set_payload(Bytes::from_static(b"{\"name\": \"test\"}"))
|
||||
.finish();
|
||||
|
||||
let json = JsonBody::<_, MyObject>::new(&mut req).await;
|
||||
assert_eq!(
|
||||
json.ok().unwrap(),
|
||||
MyObject {
|
||||
name: "test".to_owned()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
49
awc/src/responses/mod.rs
Normal file
49
awc/src/responses/mod.rs
Normal file
|
@ -0,0 +1,49 @@
|
|||
use std::{future::Future, io, pin::Pin, task::Context};
|
||||
|
||||
use actix_http::error::PayloadError;
|
||||
use actix_rt::time::Sleep;
|
||||
|
||||
mod json_body;
|
||||
mod read_body;
|
||||
mod response;
|
||||
mod response_body;
|
||||
|
||||
pub use self::json_body::JsonBody;
|
||||
pub use self::response::ClientResponse;
|
||||
#[allow(deprecated)]
|
||||
pub use self::response_body::{MessageBody, ResponseBody};
|
||||
|
||||
/// Default body size limit: 2 MiB
|
||||
const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024;
|
||||
|
||||
/// Helper enum with reusable sleep passed from `SendClientResponse`.
|
||||
///
|
||||
/// See [`ClientResponse::_timeout`] for reason.
|
||||
pub(crate) enum ResponseTimeout {
|
||||
Disabled(Option<Pin<Box<Sleep>>>),
|
||||
Enabled(Pin<Box<Sleep>>),
|
||||
}
|
||||
|
||||
impl Default for ResponseTimeout {
|
||||
fn default() -> Self {
|
||||
Self::Disabled(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseTimeout {
|
||||
fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
|
||||
match *self {
|
||||
Self::Enabled(ref mut timeout) => {
|
||||
if timeout.as_mut().poll(cx).is_ready() {
|
||||
Err(PayloadError::Io(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"Response Payload IO timed out",
|
||||
)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Self::Disabled(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
61
awc/src/responses/read_body.rs
Normal file
61
awc/src/responses/read_body.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_http::{error::PayloadError, Payload};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_core::{ready, Stream};
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
pin_project! {
|
||||
pub(crate) struct ReadBody<S> {
|
||||
#[pin]
|
||||
pub(crate) stream: Payload<S>,
|
||||
pub(crate) buf: BytesMut,
|
||||
pub(crate) limit: usize,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ReadBody<S> {
|
||||
pub(crate) fn new(stream: Payload<S>, limit: usize) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
buf: BytesMut::new(),
|
||||
limit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for ReadBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
type Output = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
|
||||
while let Some(chunk) = ready!(this.stream.as_mut().poll_next(cx)?) {
|
||||
if (this.buf.len() + chunk.len()) > *this.limit {
|
||||
return Poll::Ready(Err(PayloadError::Overflow));
|
||||
}
|
||||
|
||||
this.buf.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(this.buf.split().freeze()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use static_assertions::assert_impl_all;
|
||||
|
||||
use super::*;
|
||||
use crate::any_body::AnyBody;
|
||||
|
||||
assert_impl_all!(ReadBody<()>: Unpin);
|
||||
assert_impl_all!(ReadBody<AnyBody>: Unpin);
|
||||
}
|
257
awc/src/responses/response.rs
Normal file
257
awc/src/responses/response.rs
Normal file
|
@ -0,0 +1,257 @@
|
|||
use std::{
|
||||
cell::{Ref, RefMut},
|
||||
fmt, mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_http::{
|
||||
error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions,
|
||||
HttpMessage, Payload, ResponseHead, StatusCode, Version,
|
||||
};
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use bytes::Bytes;
|
||||
use futures_core::Stream;
|
||||
use pin_project_lite::pin_project;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
#[cfg(feature = "cookies")]
|
||||
use crate::cookie::{Cookie, ParseError as CookieParseError};
|
||||
|
||||
use super::{JsonBody, ResponseBody, ResponseTimeout};
|
||||
|
||||
pin_project! {
|
||||
/// Client Response
|
||||
pub struct ClientResponse<S = BoxedPayloadStream> {
|
||||
pub(crate) head: ResponseHead,
|
||||
#[pin]
|
||||
pub(crate) payload: Payload<S>,
|
||||
pub(crate) timeout: ResponseTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ClientResponse<S> {
|
||||
/// Create new Request instance
|
||||
pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
|
||||
ClientResponse {
|
||||
head,
|
||||
payload,
|
||||
timeout: ResponseTimeout::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn head(&self) -> &ResponseHead {
|
||||
&self.head
|
||||
}
|
||||
|
||||
/// Read the Request Version.
|
||||
#[inline]
|
||||
pub fn version(&self) -> Version {
|
||||
self.head().version
|
||||
}
|
||||
|
||||
/// Get the status from the server.
|
||||
#[inline]
|
||||
pub fn status(&self) -> StatusCode {
|
||||
self.head().status
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Returns request's headers.
|
||||
pub fn headers(&self) -> &HeaderMap {
|
||||
&self.head().headers
|
||||
}
|
||||
|
||||
/// Set a body and return previous body value
|
||||
pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
|
||||
where
|
||||
F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
|
||||
{
|
||||
let payload = f(&mut self.head, self.payload);
|
||||
|
||||
ClientResponse {
|
||||
payload,
|
||||
head: self.head,
|
||||
timeout: self.timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set a timeout duration for [`ClientResponse`](self::ClientResponse).
|
||||
///
|
||||
/// This duration covers the duration of processing the response body stream
|
||||
/// and would end it as timeout error when deadline met.
|
||||
///
|
||||
/// Disabled by default.
|
||||
pub fn timeout(self, dur: Duration) -> Self {
|
||||
let timeout = match self.timeout {
|
||||
ResponseTimeout::Disabled(Some(mut timeout))
|
||||
| ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
|
||||
Some(deadline) => {
|
||||
timeout.as_mut().reset(deadline.into());
|
||||
ResponseTimeout::Enabled(timeout)
|
||||
}
|
||||
None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
|
||||
},
|
||||
_ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
|
||||
};
|
||||
|
||||
Self {
|
||||
payload: self.payload,
|
||||
head: self.head,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// This method does not enable timeout. It's used to pass the boxed `Sleep` from
|
||||
/// `SendClientRequest` and reuse it's heap allocation together with it's slot in
|
||||
/// timer wheel.
|
||||
pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
|
||||
self.timeout = ResponseTimeout::Disabled(timeout);
|
||||
self
|
||||
}
|
||||
|
||||
/// Load request cookies.
|
||||
#[cfg(feature = "cookies")]
|
||||
pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
|
||||
struct Cookies(Vec<Cookie<'static>>);
|
||||
|
||||
if self.extensions().get::<Cookies>().is_none() {
|
||||
let mut cookies = Vec::new();
|
||||
for hdr in self.headers().get_all(&header::SET_COOKIE) {
|
||||
let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
|
||||
cookies.push(Cookie::parse_encoded(s)?.into_owned());
|
||||
}
|
||||
self.extensions_mut().insert(Cookies(cookies));
|
||||
}
|
||||
Ok(Ref::map(self.extensions(), |ext| {
|
||||
&ext.get::<Cookies>().unwrap().0
|
||||
}))
|
||||
}
|
||||
|
||||
/// Return request cookie.
|
||||
#[cfg(feature = "cookies")]
|
||||
pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
|
||||
if let Ok(cookies) = self.cookies() {
|
||||
for cookie in cookies.iter() {
|
||||
if cookie.name() == name {
|
||||
return Some(cookie.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
/// Returns a [`Future`] that consumes the body stream and resolves to [`Bytes`].
|
||||
///
|
||||
/// # Errors
|
||||
/// `Future` implementation returns error if:
|
||||
/// - content type is not `application/json`
|
||||
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB)
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// # use awc::Client;
|
||||
/// # use bytes::Bytes;
|
||||
/// # #[actix_rt::main]
|
||||
/// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let client = Client::default();
|
||||
/// let mut res = client.get("https://httpbin.org/robots.txt").send().await?;
|
||||
/// let body: Bytes = res.body().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// [`Future`]: std::future::Future
|
||||
pub fn body(&mut self) -> ResponseBody<S> {
|
||||
ResponseBody::new(self)
|
||||
}
|
||||
|
||||
/// Returns a [`Future`] consumes the body stream, parses JSON, and resolves to a deserialized
|
||||
/// `T` value.
|
||||
///
|
||||
/// # Errors
|
||||
/// Future returns error if:
|
||||
/// - content type is not `application/json`;
|
||||
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// # use awc::Client;
|
||||
/// # #[actix_rt::main]
|
||||
/// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let client = Client::default();
|
||||
/// let mut res = client.get("https://httpbin.org/json").send().await?;
|
||||
/// let val = res.json::<serde_json::Value>().await?;
|
||||
/// assert!(val.is_object());
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// [`Future`]: std::future::Future
|
||||
pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
|
||||
JsonBody::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> fmt::Debug for ClientResponse<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers().iter() {
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> HttpMessage for ClientResponse<S> {
|
||||
type Stream = S;
|
||||
|
||||
fn headers(&self) -> &HeaderMap {
|
||||
&self.head.headers
|
||||
}
|
||||
|
||||
fn take_payload(&mut self) -> Payload<S> {
|
||||
mem::replace(&mut self.payload, Payload::None)
|
||||
}
|
||||
|
||||
fn extensions(&self) -> Ref<'_, Extensions> {
|
||||
self.head.extensions()
|
||||
}
|
||||
|
||||
fn extensions_mut(&self) -> RefMut<'_, Extensions> {
|
||||
self.head.extensions_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
||||
{
|
||||
type Item = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
this.timeout.poll_timeout(cx)?;
|
||||
this.payload.poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use static_assertions::assert_impl_all;
|
||||
|
||||
use super::*;
|
||||
use crate::any_body::AnyBody;
|
||||
|
||||
assert_impl_all!(ClientResponse: Unpin);
|
||||
assert_impl_all!(ClientResponse<()>: Unpin);
|
||||
assert_impl_all!(ClientResponse<AnyBody>: Unpin);
|
||||
}
|
144
awc/src/responses/response_body.rs
Normal file
144
awc/src/responses/response_body.rs
Normal file
|
@ -0,0 +1,144 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_http::{error::PayloadError, header, HttpMessage};
|
||||
use bytes::Bytes;
|
||||
use futures_core::Stream;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT};
|
||||
use crate::ClientResponse;
|
||||
|
||||
pin_project! {
|
||||
/// A `Future` that reads a body stream, resolving as [`Bytes`].
|
||||
///
|
||||
/// # Errors
|
||||
/// `Future` implementation returns error if:
|
||||
/// - content type is not `application/json`;
|
||||
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
|
||||
pub struct ResponseBody<S> {
|
||||
#[pin]
|
||||
body: Option<ReadBody<S>>,
|
||||
length: Option<usize>,
|
||||
timeout: ResponseTimeout,
|
||||
err: Option<PayloadError>,
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated(since = "3.0.0", note = "Renamed to `ResponseBody`.")]
|
||||
pub type MessageBody<B> = ResponseBody<B>;
|
||||
|
||||
impl<S> ResponseBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
/// Creates a body stream reader from a response by taking its payload.
|
||||
pub fn new(res: &mut ClientResponse<S>) -> ResponseBody<S> {
|
||||
let length = match res.headers().get(&header::CONTENT_LENGTH) {
|
||||
Some(value) => {
|
||||
let len = value.to_str().ok().and_then(|s| s.parse::<usize>().ok());
|
||||
|
||||
match len {
|
||||
None => return Self::err(PayloadError::UnknownLength),
|
||||
len => len,
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
ResponseBody {
|
||||
body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)),
|
||||
length,
|
||||
timeout: mem::take(&mut res.timeout),
|
||||
err: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size limit of payload.
|
||||
///
|
||||
/// The default limit is 2 MiB.
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
if let Some(ref mut body) = self.body {
|
||||
body.limit = limit;
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
fn err(err: PayloadError) -> Self {
|
||||
ResponseBody {
|
||||
body: None,
|
||||
length: None,
|
||||
timeout: ResponseTimeout::default(),
|
||||
err: Some(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for ResponseBody<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||
{
|
||||
type Output = Result<Bytes, PayloadError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
if let Some(err) = this.err.take() {
|
||||
return Poll::Ready(Err(err));
|
||||
}
|
||||
|
||||
if let Some(len) = this.length.take() {
|
||||
let body = Option::as_ref(&this.body).unwrap();
|
||||
if len > body.limit {
|
||||
return Poll::Ready(Err(PayloadError::Overflow));
|
||||
}
|
||||
}
|
||||
|
||||
this.timeout.poll_timeout(cx)?;
|
||||
|
||||
this.body.as_pin_mut().unwrap().poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use static_assertions::assert_impl_all;
|
||||
|
||||
use super::*;
|
||||
use crate::{http::header, test::TestResponse};
|
||||
|
||||
assert_impl_all!(ResponseBody<()>: Unpin);
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn read_body() {
|
||||
let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish();
|
||||
match req.body().await.err().unwrap() {
|
||||
PayloadError::UnknownLength => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
|
||||
let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "10000000")).finish();
|
||||
match req.body().await.err().unwrap() {
|
||||
PayloadError::Overflow => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.set_payload(Bytes::from_static(b"test"))
|
||||
.finish();
|
||||
assert_eq!(req.body().await.ok().unwrap(), Bytes::from_static(b"test"));
|
||||
|
||||
let mut req = TestResponse::default()
|
||||
.set_payload(Bytes::from_static(b"11111111111111"))
|
||||
.finish();
|
||||
match req.body().limit(5).await.err().unwrap() {
|
||||
PayloadError::Overflow => {}
|
||||
_ => unreachable!("error"),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,7 +8,7 @@ use std::{
|
|||
};
|
||||
|
||||
use actix_http::{
|
||||
body::BodyStream,
|
||||
body::{BodyStream, MessageBody},
|
||||
error::HttpError,
|
||||
header::{self, HeaderMap, HeaderName, TryIntoHeaderValue},
|
||||
RequestHead, RequestHeadType,
|
||||
|
@ -189,15 +189,17 @@ impl RequestSender {
|
|||
body: B,
|
||||
) -> SendClientRequest
|
||||
where
|
||||
B: Into<AnyBody>,
|
||||
B: MessageBody + 'static,
|
||||
{
|
||||
let req = match self {
|
||||
RequestSender::Owned(head) => {
|
||||
ConnectRequest::Client(RequestHeadType::Owned(head), body.into(), addr)
|
||||
}
|
||||
RequestSender::Owned(head) => ConnectRequest::Client(
|
||||
RequestHeadType::Owned(head),
|
||||
AnyBody::from_message_body(body).into_boxed(),
|
||||
addr,
|
||||
),
|
||||
RequestSender::Rc(head, extra_headers) => ConnectRequest::Client(
|
||||
RequestHeadType::Rc(head, extra_headers),
|
||||
body.into(),
|
||||
AnyBody::from_message_body(body).into_boxed(),
|
||||
addr,
|
||||
),
|
||||
};
|
||||
|
@ -229,9 +231,7 @@ impl RequestSender {
|
|||
response_decompress,
|
||||
timeout,
|
||||
config,
|
||||
AnyBody::Bytes {
|
||||
body: Bytes::from(body),
|
||||
},
|
||||
AnyBody::from_message_body(body.into_bytes()),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -260,9 +260,7 @@ impl RequestSender {
|
|||
response_decompress,
|
||||
timeout,
|
||||
config,
|
||||
AnyBody::Bytes {
|
||||
body: Bytes::from(body),
|
||||
},
|
||||
AnyBody::from_message_body(body.into_bytes()),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -42,8 +42,7 @@ use crate::{
|
|||
header::{self, HeaderName, HeaderValue, TryIntoHeaderValue, AUTHORIZATION},
|
||||
ConnectionType, Method, StatusCode, Uri, Version,
|
||||
},
|
||||
response::ClientResponse,
|
||||
ClientConfig,
|
||||
ClientConfig, ClientResponse,
|
||||
};
|
||||
|
||||
#[cfg(feature = "cookies")]
|
||||
|
|
12
src/guard.rs
12
src/guard.rs
|
@ -270,13 +270,11 @@ impl Guard for HeaderGuard {
|
|||
/// ```
|
||||
/// use actix_web::{web, guard::Host, App, HttpResponse};
|
||||
///
|
||||
/// fn main() {
|
||||
/// App::new().service(
|
||||
/// web::resource("/index.html")
|
||||
/// .guard(Host("www.rust-lang.org"))
|
||||
/// .to(|| HttpResponse::MethodNotAllowed())
|
||||
/// );
|
||||
/// }
|
||||
/// App::new().service(
|
||||
/// web::resource("/index.html")
|
||||
/// .guard(Host("www.rust-lang.org"))
|
||||
/// .to(|| HttpResponse::MethodNotAllowed())
|
||||
/// );
|
||||
/// ```
|
||||
pub fn Host<H: AsRef<str>>(host: H) -> HostGuard {
|
||||
HostGuard(host.as_ref().to_string(), None)
|
||||
|
|
Loading…
Reference in a new issue