1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-30 12:00:38 +00:00

refactor payload related futures for HttpRequest

This commit is contained in:
Nikolay Kim 2018-02-25 20:34:26 +03:00
parent ab5ed27bf1
commit a2b98b31e8
7 changed files with 109 additions and 111 deletions

View file

@ -22,7 +22,7 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req); println!("{:?}", req);
// example of ... // example of ...
if let Ok(ch) = req.payload_mut().readany().poll() { if let Ok(ch) = req.poll() {
if let futures::Async::Ready(Some(d)) = ch { if let futures::Async::Ready(Some(d)) = ch {
println!("{}", String::from_utf8_lossy(d.as_ref())); println!("{}", String::from_utf8_lossy(d.as_ref()));
} }

View file

@ -34,9 +34,9 @@ fn index(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
const MAX_SIZE: usize = 262_144; // max payload size is 256k const MAX_SIZE: usize = 262_144; // max payload size is 256k
/// This handler manually load request payload and parse serde json /// This handler manually load request payload and parse serde json
fn index_manual(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index_manual(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
// readany() returns asynchronous stream of Bytes objects // HttpRequest is stream of Bytes objects
req.payload_mut().readany() req
// `Future::from_err` acts like `?` in that it coerces the error type from // `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type // the future into the final error type
.from_err() .from_err()
@ -63,8 +63,8 @@ fn index_manual(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Err
} }
/// This handler manually load request payload and parse json-rust /// This handler manually load request payload and parse json-rust
fn index_mjsonrust(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index_mjsonrust(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.payload_mut().readany().concat2() req.concat2()
.from_err() .from_err()
.and_then(|body| { .and_then(|body| {
// body is loaded, now we can deserialize json-rust // body is loaded, now we can deserialize json-rust

View file

@ -109,7 +109,7 @@ struct MyObj {name: String, number: i32}
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
// `concat2` will asynchronously read each chunk of the request body and // `concat2` will asynchronously read each chunk of the request body and
// return a single, concatenated, chunk // return a single, concatenated, chunk
req.payload_mut().readany().concat2() req.payload_mut().concat2()
// `Future::from_err` acts like `?` in that it coerces the error type from // `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type // the future into the final error type
.from_err() .from_err()
@ -256,13 +256,13 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
## Streaming request ## Streaming request
Actix uses [*Payload*](../actix_web/payload/struct.Payload.html) object as request payload stream. *HttpRequest* is a stream of `Bytes` objects. It could be used to read request
*HttpRequest* provides several methods, which can be used for payload access. body payload. At the same time actix uses
At the same time *Payload* implements *Stream* trait, so it could be used with various [*Payload*](../actix_web/payload/struct.Payload.html) object.
stream combinators. Also *Payload* provides several convenience methods that return *HttpRequest* provides several methods, which can be used for
future object that resolve to Bytes object. payload access.At the same time *Payload* implements *Stream* trait, so it
could be used with various stream combinators. Also *Payload* provides
* *readany()* method returns *Stream* of *Bytes* objects. several convenience methods that return future object that resolve to Bytes object.
* *readexactly()* method returns *Future* that resolves when specified number of bytes * *readexactly()* method returns *Future* that resolves when specified number of bytes
get received. get received.
@ -283,9 +283,7 @@ use futures::{Future, Stream};
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.payload() req.from_err()
.readany()
.from_err()
.fold((), |_, chunk| { .fold((), |_, chunk| {
println!("Chunk: {:?}", chunk); println!("Chunk: {:?}", chunk);
result::<_, error::PayloadError>(Ok(())) result::<_, error::PayloadError>(Ok(()))

View file

@ -5,7 +5,7 @@ use std::net::SocketAddr;
use std::collections::HashMap; use std::collections::HashMap;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use cookie::Cookie; use cookie::Cookie;
use futures::{Async, Future, Stream, Poll}; use futures::{Future, Stream, Poll};
use http_range::HttpRange; use http_range::HttpRange;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use mime::Mime; use mime::Mime;
@ -155,8 +155,8 @@ impl<S> HttpRequest<S> {
HttpRequest(self.0.clone(), None, None) HttpRequest(self.0.clone(), None, None)
} }
// get mutable reference for inner message /// get mutable reference for inner message
// mutable reference should not be returned as result for request's method /// mutable reference should not be returned as result for request's method
#[inline(always)] #[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub(crate) fn as_mut(&self) -> &mut HttpMessage { pub(crate) fn as_mut(&self) -> &mut HttpMessage {
@ -480,8 +480,8 @@ impl<S> HttpRequest<S> {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn body(&self) -> RequestBody { pub fn body(self) -> RequestBody {
RequestBody::from_request(self) RequestBody::from(self)
} }
/// Return stream to http payload processes as multipart. /// Return stream to http payload processes as multipart.
@ -518,7 +518,7 @@ impl<S> HttpRequest<S> {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn multipart(&mut self) -> Multipart { pub fn multipart(self) -> Multipart {
Multipart::from_request(self) Multipart::from_request(self)
} }
@ -549,10 +549,8 @@ impl<S> HttpRequest<S> {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn urlencoded(&self) -> UrlEncoded { pub fn urlencoded(self) -> UrlEncoded {
UrlEncoded::from(self.payload().clone(), UrlEncoded::from(self)
self.headers(),
self.chunked().unwrap_or(false))
} }
/// Parse `application/json` encoded body. /// Parse `application/json` encoded body.
@ -585,7 +583,7 @@ impl<S> HttpRequest<S> {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn json<T: DeserializeOwned>(&self) -> JsonBody<S, T> { pub fn json<T: DeserializeOwned>(self) -> JsonBody<S, T> {
JsonBody::from_request(self) JsonBody::from_request(self)
} }
} }
@ -638,49 +636,24 @@ impl<S> fmt::Debug for HttpRequest<S> {
/// Future that resolves to a parsed urlencoded values. /// Future that resolves to a parsed urlencoded values.
pub struct UrlEncoded { pub struct UrlEncoded {
pl: Payload, req: Option<HttpRequest<()>>,
body: BytesMut, limit: usize,
error: Option<UrlencodedError>, fut: Option<Box<Future<Item=HashMap<String, String>, Error=UrlencodedError>>>,
} }
impl UrlEncoded { impl UrlEncoded {
pub fn from(pl: Payload, headers: &HeaderMap, chunked: bool) -> UrlEncoded { pub fn from<S>(req: HttpRequest<S>) -> UrlEncoded {
let mut encoded = UrlEncoded { UrlEncoded {
pl: pl, req: Some(req.clone_without_state()),
body: BytesMut::new(), limit: 262_144,
error: None fut: None,
};
if chunked {
encoded.error = Some(UrlencodedError::Chunked);
} else if let Some(len) = headers.get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len > 262_144 {
encoded.error = Some(UrlencodedError::Overflow);
}
} else {
encoded.error = Some(UrlencodedError::UnknownLength);
}
} else {
encoded.error = Some(UrlencodedError::UnknownLength);
} }
} }
// check content type /// Change max size of payload. By default max size is 256Kb
if encoded.error.is_none() { pub fn limit(mut self, limit: usize) -> Self {
if let Some(content_type) = headers.get(header::CONTENT_TYPE) { self.limit = limit;
if let Ok(content_type) = content_type.to_str() { self
if content_type.to_lowercase() == "application/x-www-form-urlencoded" {
return encoded
}
}
}
encoded.error = Some(UrlencodedError::ContentType);
return encoded
}
encoded
} }
} }
@ -689,48 +662,76 @@ impl Future for UrlEncoded {
type Error = UrlencodedError; type Error = UrlencodedError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() { if let Some(req) = self.req.take() {
return Err(err) if req.chunked().unwrap_or(false) {
return Err(UrlencodedError::Chunked)
} else if let Some(len) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len > 262_144 {
return Err(UrlencodedError::Overflow);
}
} else {
return Err(UrlencodedError::UnknownLength)
}
} else {
return Err(UrlencodedError::UnknownLength)
}
} }
loop { // check content type
return match self.pl.poll() { let mut err = true;
Ok(Async::NotReady) => Ok(Async::NotReady), if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) {
Ok(Async::Ready(None)) => { if let Ok(content_type) = content_type.to_str() {
if content_type.to_lowercase() == "application/x-www-form-urlencoded" {
err = false;
}
}
}
if err {
return Err(UrlencodedError::ContentType);
}
// future
let limit = self.limit;
let fut = req.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(UrlencodedError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.map(|body| {
let mut m = HashMap::new(); let mut m = HashMap::new();
for (k, v) in form_urlencoded::parse(&self.body) { for (k, v) in form_urlencoded::parse(&body) {
m.insert(k.into(), v.into()); m.insert(k.into(), v.into());
} }
Ok(Async::Ready(m)) m
}, });
Ok(Async::Ready(Some(item))) => { self.fut = Some(Box::new(fut));
self.body.extend_from_slice(&item);
continue
},
Err(err) => Err(err.into()),
}
} }
self.fut.as_mut().expect("UrlEncoded could not be used second time").poll()
} }
} }
/// Future that resolves to a complete request body. /// Future that resolves to a complete request body.
pub struct RequestBody { pub struct RequestBody {
pl: Payload,
body: BytesMut,
limit: usize, limit: usize,
req: Option<HttpRequest<()>>, req: Option<HttpRequest<()>>,
fut: Option<Box<Future<Item=Bytes, Error=PayloadError>>>,
} }
impl RequestBody { impl RequestBody {
/// Create `RequestBody` for request. /// Create `RequestBody` for request.
pub fn from_request<S>(req: &HttpRequest<S>) -> RequestBody { pub fn from<S>(req: HttpRequest<S>) -> RequestBody {
let pl = req.payload().clone();
RequestBody { RequestBody {
pl: pl,
body: BytesMut::new(),
limit: 262_144, limit: 262_144,
req: Some(req.clone_without_state()) req: Some(req.clone_without_state()),
fut: None,
} }
} }
@ -760,25 +761,24 @@ impl Future for RequestBody {
return Err(PayloadError::UnknownLength); return Err(PayloadError::UnknownLength);
} }
} }
}
loop { // future
return match self.pl.poll() { let limit = self.limit;
Ok(Async::NotReady) => Ok(Async::NotReady), self.fut = Some(Box::new(
Ok(Async::Ready(None)) => { req.from_err()
Ok(Async::Ready(self.body.take().freeze())) .fold(BytesMut::new(), move |mut body, chunk| {
}, if (body.len() + chunk.len()) > limit {
Ok(Async::Ready(Some(chunk))) => {
if (self.body.len() + chunk.len()) > self.limit {
Err(PayloadError::Overflow) Err(PayloadError::Overflow)
} else { } else {
self.body.extend_from_slice(&chunk); body.extend_from_slice(&chunk);
continue Ok(body)
}
},
Err(err) => Err(err),
} }
})
.map(|body| body.freeze())
));
} }
self.fut.as_mut().expect("UrlEncoded could not be used second time").poll()
} }
} }

View file

@ -86,10 +86,10 @@ pub struct JsonBody<S, T: DeserializeOwned>{
impl<S, T: DeserializeOwned> JsonBody<S, T> { impl<S, T: DeserializeOwned> JsonBody<S, T> {
/// Create `JsonBody` for request. /// Create `JsonBody` for request.
pub fn from_request(req: &HttpRequest<S>) -> Self { pub fn from_request(req: HttpRequest<S>) -> Self {
JsonBody{ JsonBody{
limit: 262_144, limit: 262_144,
req: Some(req.clone()), req: Some(req),
fut: None, fut: None,
ct: "application/json", ct: "application/json",
} }

View file

@ -32,11 +32,11 @@
//! * Supported *HTTP/1.x* and *HTTP/2.0* protocols //! * Supported *HTTP/1.x* and *HTTP/2.0* protocols
//! * Streaming and pipelining //! * Streaming and pipelining
//! * Keep-alive and slow requests handling //! * Keep-alive and slow requests handling
//! * `WebSockets` //! * WebSockets server/client
//! * Transparent content compression/decompression (br, gzip, deflate) //! * Transparent content compression/decompression (br, gzip, deflate)
//! * Configurable request routing //! * Configurable request routing
//! * Multipart streams //! * Multipart streams
//! * Middlewares (`Logger`, `Session`, `DefaultHeaders`) //! * Middlewares (`Logger`, `Session`, `CORS`, `DefaultHeaders`)
//! * Graceful server shutdown //! * Graceful server shutdown
//! * Built on top of [Actix](https://github.com/actix/actix). //! * Built on top of [Actix](https://github.com/actix/actix).

View file

@ -85,7 +85,7 @@ impl Multipart {
} }
/// Create multipart instance for request. /// Create multipart instance for request.
pub fn from_request<S>(req: &mut HttpRequest<S>) -> Multipart { pub fn from_request<S>(req: HttpRequest<S>) -> Multipart {
match Multipart::boundary(req.headers()) { match Multipart::boundary(req.headers()) {
Ok(boundary) => Multipart::new(boundary, req.payload().clone()), Ok(boundary) => Multipart::new(boundary, req.payload().clone()),
Err(err) => Err(err) =>