1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-02 14:26:43 +00:00

refactor http client

This commit is contained in:
Nikolay Kim 2018-02-19 03:11:11 -08:00
parent edd114f6e4
commit cb70d5ec3d
9 changed files with 483 additions and 256 deletions

View file

@ -2,10 +2,12 @@ mod connector;
mod parser; mod parser;
mod request; mod request;
mod response; mod response;
mod pipeline;
mod writer; mod writer;
pub(crate) use self::writer::HttpClientWriter; pub use self::pipeline::{SendRequest, SendRequestError};
pub use self::request::{ClientRequest, ClientRequestBuilder}; pub use self::request::{ClientRequest, ClientRequestBuilder};
pub use self::response::{ClientResponse, JsonResponse}; pub use self::response::{ClientResponse, JsonResponse, UrlEncoded};
pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};
pub use self::connector::{Connect, Connection, ClientConnector, ClientConnectorError}; pub use self::connector::{Connect, Connection, ClientConnector, ClientConnectorError};
pub(crate) use self::writer::HttpClientWriter;
pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};

View file

@ -2,15 +2,13 @@ use std::mem;
use httparse; use httparse;
use http::{Version, HttpTryFrom, HeaderMap, StatusCode}; use http::{Version, HttpTryFrom, HeaderMap, StatusCode};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use futures::{Poll, Async}; use futures::{Poll, Async};
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
use server::{utils, IoStream}; use server::{utils, IoStream};
use server::h1::{Decoder, chunked}; use server::h1::{Decoder, chunked};
use server::encoding::PayloadType;
use super::ClientResponse; use super::ClientResponse;
use super::response::ClientMessage; use super::response::ClientMessage;
@ -20,18 +18,7 @@ const MAX_HEADERS: usize = 96;
#[derive(Default)] #[derive(Default)]
pub struct HttpResponseParser { pub struct HttpResponseParser {
payload: Option<PayloadInfo>, decoder: Option<Decoder>,
}
enum Decoding {
Paused,
Ready,
NotReady,
}
struct PayloadInfo {
tx: PayloadType,
decoder: Decoder,
} }
#[derive(Debug)] #[derive(Debug)]
@ -47,31 +34,6 @@ impl HttpResponseParser {
-> Poll<ClientResponse, HttpResponseParserError> -> Poll<ClientResponse, HttpResponseParserError>
where T: IoStream where T: IoStream
{ {
// read payload
if self.payload.is_some() {
match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(PayloadError::Incomplete);
}
// http channel should not deal with payload errors
return Err(HttpResponseParserError::Payload)
},
Err(err) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(err.into());
}
// http channel should not deal with payload errors
return Err(HttpResponseParserError::Payload)
}
_ => (),
}
match self.decode(buf)? {
Decoding::Ready => self.payload = None,
Decoding::Paused | Decoding::NotReady => return Ok(Async::NotReady),
}
}
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { let read = if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
@ -93,32 +55,19 @@ impl HttpResponseParser {
loop { loop {
match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? { match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? {
Async::Ready((msg, decoder)) => { Async::Ready((msg, decoder)) => {
// process payload self.decoder = decoder;
if let Some(payload) = decoder {
self.payload = Some(payload);
match self.decode(buf)? {
Decoding::Paused | Decoding::NotReady => (),
Decoding::Ready => self.payload = None,
}
}
return Ok(Async::Ready(msg)); return Ok(Async::Ready(msg));
}, },
Async::NotReady => { Async::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.capacity() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(HttpResponseParserError::Error(ParseError::TooLarge)); return Err(HttpResponseParserError::Error(ParseError::TooLarge));
} }
if read { if read {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect),
debug!("Ignored premature client disconnection");
return Err(HttpResponseParserError::Disconnect);
},
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => Ok(Async::NotReady) => return Ok(Async::NotReady),
return Ok(Async::NotReady), Err(err) => return Err(HttpResponseParserError::Error(err.into())),
Err(err) =>
return Err(HttpResponseParserError::Error(err.into()))
} }
} else { } else {
return Ok(Async::NotReady) return Ok(Async::NotReady)
@ -128,35 +77,24 @@ impl HttpResponseParser {
} }
} }
fn decode(&mut self, buf: &mut BytesMut) -> Result<Decoding, HttpResponseParserError> { pub fn parse_payload<T>(&mut self, io: &mut T, buf: &mut BytesMut)
if let Some(ref mut payload) = self.payload { -> Poll<Option<Bytes>, PayloadError>
if payload.tx.capacity() > DEFAULT_BUFFER_SIZE { where T: IoStream
return Ok(Decoding::Paused) {
} if let Some(ref mut decoder) = self.decoder {
loop { // read payload
match payload.decoder.decode(buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(0)) => return Err(PayloadError::Incomplete),
payload.tx.feed_data(bytes) Err(err) => return Err(err.into()),
}, _ => (),
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(err) => {
payload.tx.set_error(err.into());
return Err(HttpResponseParserError::Payload)
}
}
} }
decoder.decode(buf).map_err(|e| e.into())
} else { } else {
return Ok(Decoding::Ready) Ok(Async::Ready(None))
} }
} }
fn parse_message(buf: &mut BytesMut) fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option<Decoder>), ParseError> {
-> Poll<(ClientResponse, Option<PayloadInfo>), ParseError>
{
// Parse http message // Parse http message
let bytes_ptr = buf.as_ref().as_ptr() as usize; let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] = let mut headers: [httparse::Header; MAX_HEADERS] =
@ -181,7 +119,6 @@ impl HttpResponseParser {
} }
}; };
let slice = buf.split_to(len).freeze(); let slice = buf.split_to(len).freeze();
// convert headers // convert headers
@ -221,22 +158,19 @@ impl HttpResponseParser {
}; };
if let Some(decoder) = decoder { if let Some(decoder) = decoder {
let (psender, payload) = Payload::new(false); //let info = PayloadInfo {
let info = PayloadInfo { //tx: PayloadType::new(&hdrs, psender),
tx: PayloadType::new(&hdrs, psender), // decoder: decoder,
decoder: decoder, //};
};
Ok(Async::Ready( Ok(Async::Ready(
(ClientResponse::new( (ClientResponse::new(
ClientMessage{ ClientMessage{status: status, version: version,
status: status, version: version, headers: hdrs, cookies: None}), Some(decoder))))
headers: hdrs, cookies: None, payload: Some(payload)}), Some(info))))
} else { } else {
Ok(Async::Ready( Ok(Async::Ready(
(ClientResponse::new( (ClientResponse::new(
ClientMessage{ ClientMessage{status: status, version: version,
status: status, version: version, headers: hdrs, cookies: None}), None)))
headers: hdrs, cookies: None, payload: None}), None)))
} }
} }
} }

126
src/client/pipeline.rs Normal file
View file

@ -0,0 +1,126 @@
use std::{io, mem};
use bytes::{Bytes, BytesMut};
use futures::{Async, Future, Poll};
use actix::prelude::*;
use error::PayloadError;
use server::shared::SharedBytes;
use super::{ClientRequest, ClientResponse};
use super::{Connect, Connection, ClientConnector, ClientConnectorError};
use super::HttpClientWriter;
use super::{HttpResponseParser, HttpResponseParserError};
pub enum SendRequestError {
Connector(ClientConnectorError),
ParseError(HttpResponseParserError),
Io(io::Error),
}
impl From<io::Error> for SendRequestError {
fn from(err: io::Error) -> SendRequestError {
SendRequestError::Io(err)
}
}
enum State {
New,
Connect(actix::dev::Request<Unsync, ClientConnector, Connect>),
Send(Box<Pipeline>),
None,
}
/// `SendRequest` is a `Future` which represents asynchronous request sending process.
#[must_use = "SendRequest do nothing unless polled"]
pub struct SendRequest {
req: ClientRequest,
state: State,
}
impl SendRequest {
pub(crate) fn new(req: ClientRequest) -> SendRequest {
SendRequest{
req: req,
state: State::New,
}
}
}
impl Future for SendRequest {
type Item = ClientResponse;
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let state = mem::replace(&mut self.state, State::None);
match state {
State::New =>
self.state = State::Connect(
ClientConnector::from_registry().send(Connect(self.req.uri().clone()))),
State::Connect(mut conn) => match conn.poll() {
Ok(Async::NotReady) => {
self.state = State::Connect(conn);
return Ok(Async::NotReady);
},
Ok(Async::Ready(result)) => match result {
Ok(stream) => {
let mut pl = Box::new(Pipeline {
conn: stream,
writer: HttpClientWriter::new(SharedBytes::default()),
parser: HttpResponseParser::default(),
parser_buf: BytesMut::new(),
});
pl.writer.start(&mut self.req)?;
self.state = State::Send(pl);
},
Err(err) => return Err(SendRequestError::Connector(err)),
},
Err(_) =>
return Err(SendRequestError::Connector(ClientConnectorError::Disconnected))
},
State::Send(mut pl) => {
pl.poll_write()?;
match pl.parse() {
Ok(Async::Ready(mut resp)) => {
resp.set_pipeline(pl);
return Ok(Async::Ready(resp))
},
Ok(Async::NotReady) => {
self.state = State::Send(pl);
return Ok(Async::NotReady)
},
Err(err) => return Err(SendRequestError::ParseError(err))
}
}
State::None => unreachable!(),
}
}
}
}
pub(crate) struct Pipeline {
conn: Connection,
writer: HttpClientWriter,
parser: HttpResponseParser,
parser_buf: BytesMut,
}
impl Pipeline {
#[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
self.parser.parse(&mut self.conn, &mut self.parser_buf)
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.parser.parse_payload(&mut self.conn, &mut self.parser_buf)
}
#[inline]
pub fn poll_write(&mut self) -> Poll<(), io::Error> {
self.writer.poll_completed(&mut self.conn, false)
}
}

View file

@ -11,6 +11,7 @@ use serde::Serialize;
use body::Body; use body::Body;
use error::Error; use error::Error;
use headers::ContentEncoding; use headers::ContentEncoding;
use super::pipeline::SendRequest;
/// An HTTP Client Request /// An HTTP Client Request
pub struct ClientRequest { pub struct ClientRequest {
@ -19,7 +20,8 @@ pub struct ClientRequest {
version: Version, version: Version,
headers: HeaderMap, headers: HeaderMap,
body: Body, body: Body,
chunked: Option<bool>, chunked: bool,
upgrade: bool,
encoding: ContentEncoding, encoding: ContentEncoding,
} }
@ -32,7 +34,8 @@ impl Default for ClientRequest {
version: Version::HTTP_11, version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16), headers: HeaderMap::with_capacity(16),
body: Body::Empty, body: Body::Empty,
chunked: None, chunked: false,
upgrade: false,
encoding: ContentEncoding::Auto, encoding: ContentEncoding::Auto,
} }
} }
@ -135,6 +138,24 @@ impl ClientRequest {
&mut self.headers &mut self.headers
} }
/// is chunked encoding enabled
#[inline]
pub fn chunked(&self) -> bool {
self.chunked
}
/// is upgrade request
#[inline]
pub fn upgrade(&self) -> bool {
self.upgrade
}
/// Content encoding
#[inline]
pub fn content_encoding(&self) -> ContentEncoding {
self.encoding
}
/// Get body os this response /// Get body os this response
#[inline] #[inline]
pub fn body(&self) -> &Body { pub fn body(&self) -> &Body {
@ -146,9 +167,14 @@ impl ClientRequest {
self.body = body.into(); self.body = body.into();
} }
/// Set a body and return previous body value /// Extract body, replace it with Empty
pub fn replace_body<B: Into<Body>>(&mut self, body: B) -> Body { pub(crate) fn replace_body(&mut self, body: Body) -> Body {
mem::replace(&mut self.body, body.into()) mem::replace(&mut self.body, body)
}
/// Send request
pub fn send(self) -> SendRequest {
SendRequest::new(self)
} }
} }
@ -288,7 +314,16 @@ impl ClientRequestBuilder {
#[inline] #[inline]
pub fn chunked(&mut self) -> &mut Self { pub fn chunked(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) { if let Some(parts) = parts(&mut self.request, &self.err) {
parts.chunked = Some(true); parts.chunked = true;
}
self
}
/// Enable connection upgrade
#[inline]
pub fn upgrade(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.upgrade = true;
} }
self self
} }

View file

@ -1,21 +1,22 @@
use std::{fmt, str}; use std::{fmt, str};
use std::rc::Rc; use std::rc::Rc;
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::collections::HashMap;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use cookie::Cookie; use cookie::Cookie;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use http_range::HttpRange;
use http::{HeaderMap, StatusCode, Version}; use http::{HeaderMap, StatusCode, Version};
use http::header::{self, HeaderValue}; use http::header::{self, HeaderValue};
use mime::Mime; use mime::Mime;
use serde_json; use serde_json;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use url::form_urlencoded;
use payload::{Payload, ReadAny}; // use multipart::Multipart;
use multipart::Multipart; use error::{CookieParseError, ParseError, PayloadError, JsonPayloadError, UrlencodedError};
use httprequest::UrlEncoded;
use error::{CookieParseError, ParseError, PayloadError, JsonPayloadError, HttpRangeError}; use super::pipeline::Pipeline;
pub(crate) struct ClientMessage { pub(crate) struct ClientMessage {
@ -23,7 +24,6 @@ pub(crate) struct ClientMessage {
pub version: Version, pub version: Version,
pub headers: HeaderMap<HeaderValue>, pub headers: HeaderMap<HeaderValue>,
pub cookies: Option<Vec<Cookie<'static>>>, pub cookies: Option<Vec<Cookie<'static>>>,
pub payload: Option<Payload>,
} }
impl Default for ClientMessage { impl Default for ClientMessage {
@ -34,18 +34,21 @@ impl Default for ClientMessage {
version: Version::HTTP_11, version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16), headers: HeaderMap::with_capacity(16),
cookies: None, cookies: None,
payload: None,
} }
} }
} }
/// An HTTP Client response /// An HTTP Client response
pub struct ClientResponse(Rc<UnsafeCell<ClientMessage>>); pub struct ClientResponse(Rc<UnsafeCell<ClientMessage>>, Option<Box<Pipeline>>);
impl ClientResponse { impl ClientResponse {
pub(crate) fn new(msg: ClientMessage) -> ClientResponse { pub(crate) fn new(msg: ClientMessage) -> ClientResponse {
ClientResponse(Rc::new(UnsafeCell::new(msg))) ClientResponse(Rc::new(UnsafeCell::new(msg)), None)
}
pub(crate) fn set_pipeline(&mut self, pl: Box<Pipeline>) {
self.1 = Some(pl);
} }
#[inline] #[inline]
@ -155,53 +158,12 @@ impl ClientResponse {
} }
} }
/// Parses Range HTTP header string as per RFC 2616. // /// Return stream to http payload processes as multipart.
/// `size` is full size of response (file). // ///
pub fn range(&self, size: u64) -> Result<Vec<HttpRange>, HttpRangeError> { // /// Content-type: multipart/form-data;
if let Some(range) = self.headers().get(header::RANGE) { // pub fn multipart(mut self) -> Multipart {
HttpRange::parse(unsafe{str::from_utf8_unchecked(range.as_bytes())}, size) // Multipart::from_response(&mut self)
.map_err(|e| e.into()) // }
} else {
Ok(Vec::new())
}
}
/// Returns reference to the associated http payload.
#[inline]
pub fn payload(&self) -> &Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_ref().unwrap()
}
/// Returns mutable reference to the associated http payload.
#[inline]
pub fn payload_mut(&mut self) -> &mut Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_mut().unwrap()
}
/// Load request body.
///
/// By default only 256Kb payload reads to a memory, then `ResponseBody`
/// resolves to an error. Use `RequestBody::limit()`
/// method to change upper limit.
pub fn body(&self) -> ResponseBody {
ResponseBody::from_response(self)
}
/// Return stream to http payload processes as multipart.
///
/// Content-type: multipart/form-data;
pub fn multipart(&mut self) -> Multipart {
Multipart::from_response(self)
}
/// Parse `application/x-www-form-urlencoded` encoded body. /// Parse `application/x-www-form-urlencoded` encoded body.
/// Return `UrlEncoded` future. It resolves to a `HashMap<String, String>` which /// Return `UrlEncoded` future. It resolves to a `HashMap<String, String>` which
@ -212,10 +174,8 @@ impl ClientResponse {
/// * content type is not `application/x-www-form-urlencoded` /// * content type is not `application/x-www-form-urlencoded`
/// * transfer encoding is `chunked`. /// * transfer encoding is `chunked`.
/// * content-length is greater than 256k /// * content-length is greater than 256k
pub fn urlencoded(&self) -> UrlEncoded { pub fn urlencoded(self) -> UrlEncoded {
UrlEncoded::from(self.payload().clone(), UrlEncoded::new(self)
self.headers(),
self.chunked().unwrap_or(false))
} }
/// Parse `application/json` encoded body. /// Parse `application/json` encoded body.
@ -225,7 +185,7 @@ impl ClientResponse {
/// ///
/// * content type is not `application/json` /// * content type is not `application/json`
/// * content length is greater than 256k /// * content length is greater than 256k
pub fn json<T: DeserializeOwned>(&self) -> JsonResponse<T> { pub fn json<T: DeserializeOwned>(self) -> JsonResponse<T> {
JsonResponse::from_response(self) JsonResponse::from_response(self)
} }
} }
@ -247,77 +207,16 @@ impl fmt::Debug for ClientResponse {
} }
} }
impl Clone for ClientResponse {
fn clone(&self) -> ClientResponse {
ClientResponse(self.0.clone())
}
}
/// Future that resolves to a complete request body. /// Future that resolves to a complete request body.
pub struct ResponseBody { impl Stream for ClientResponse {
pl: ReadAny,
body: BytesMut,
limit: usize,
resp: Option<ClientResponse>,
}
impl ResponseBody {
/// Create `RequestBody` for request.
pub fn from_response(resp: &ClientResponse) -> ResponseBody {
let pl = resp.payload().readany();
ResponseBody {
pl: pl,
body: BytesMut::new(),
limit: 262_144,
resp: Some(resp.clone()),
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl Future for ResponseBody {
type Item = Bytes; type Item = Bytes;
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(resp) = self.resp.take() { if let Some(ref mut pl) = self.1 {
if let Some(len) = resp.headers().get(header::CONTENT_LENGTH) { pl.poll()
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<usize>() {
if len > self.limit {
return Err(PayloadError::Overflow);
}
} else { } else {
return Err(PayloadError::UnknownLength); Ok(Async::Ready(None))
}
} else {
return Err(PayloadError::UnknownLength);
}
}
}
loop {
return match self.pl.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
Ok(Async::Ready(self.body.take().freeze()))
},
Ok(Async::Ready(Some(chunk))) => {
if (self.body.len() + chunk.len()) > self.limit {
Err(PayloadError::Overflow)
} else {
self.body.extend_from_slice(&chunk);
continue
}
},
Err(err) => Err(err),
}
} }
} }
} }
@ -328,6 +227,7 @@ impl Future for ResponseBody {
/// ///
/// * content type is not `application/json` /// * content type is not `application/json`
/// * content length is greater than 256k /// * content length is greater than 256k
#[must_use = "JsonResponse does nothing unless polled"]
pub struct JsonResponse<T: DeserializeOwned>{ pub struct JsonResponse<T: DeserializeOwned>{
limit: usize, limit: usize,
ct: &'static str, ct: &'static str,
@ -338,12 +238,12 @@ pub struct JsonResponse<T: DeserializeOwned>{
impl<T: DeserializeOwned> JsonResponse<T> { impl<T: DeserializeOwned> JsonResponse<T> {
/// Create `JsonBody` for request. /// Create `JsonBody` for request.
pub fn from_response(resp: &ClientResponse) -> Self { pub fn from_response(resp: ClientResponse) -> Self {
JsonResponse{ JsonResponse{
limit: 262_144, limit: 262_144,
resp: Some(resp.clone()), resp: Some(resp),
fut: None,
ct: "application/json", ct: "application/json",
fut: None,
} }
} }
@ -386,8 +286,7 @@ impl<T: DeserializeOwned + 'static> Future for JsonResponse<T> {
} }
let limit = self.limit; let limit = self.limit;
let fut = resp.payload().readany() let fut = resp.from_err()
.from_err()
.fold(BytesMut::new(), move |mut body, chunk| { .fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {
Err(JsonPayloadError::Overflow) Err(JsonPayloadError::Overflow)
@ -397,9 +296,93 @@ impl<T: DeserializeOwned + 'static> Future for JsonResponse<T> {
} }
}) })
.and_then(|body| Ok(serde_json::from_slice::<T>(&body)?)); .and_then(|body| Ok(serde_json::from_slice::<T>(&body)?));
self.fut = Some(Box::new(fut)); self.fut = Some(Box::new(fut));
} }
self.fut.as_mut().expect("JsonResponse could not be used second time").poll() self.fut.as_mut().expect("JsonResponse could not be used second time").poll()
} }
} }
/// Future that resolves to a parsed urlencoded values.
#[must_use = "UrlEncoded does nothing unless polled"]
pub struct UrlEncoded {
resp: Option<ClientResponse>,
limit: usize,
fut: Option<Box<Future<Item=HashMap<String, String>, Error=UrlencodedError>>>,
}
impl UrlEncoded {
pub fn new(resp: ClientResponse) -> UrlEncoded {
UrlEncoded{resp: Some(resp),
limit: 262_144,
fut: None}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl Future for UrlEncoded {
type Item = HashMap<String, String>;
type Error = UrlencodedError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(resp) = self.resp.take() {
if resp.chunked().unwrap_or(false) {
return Err(UrlencodedError::Chunked)
} else if let Some(len) = resp.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len > 262_144 {
return Err(UrlencodedError::Overflow);
}
} else {
return Err(UrlencodedError::UnknownLength);
}
} else {
return Err(UrlencodedError::UnknownLength);
}
}
// check content type
let mut encoding = false;
if let Some(content_type) = resp.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if content_type.to_lowercase() == "application/x-www-form-urlencoded" {
encoding = true;
}
}
}
if !encoding {
return Err(UrlencodedError::ContentType);
}
// urlencoded body
let limit = self.limit;
let fut = resp.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(UrlencodedError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(|body| {
let mut m = HashMap::new();
for (k, v) in form_urlencoded::parse(&body) {
m.insert(k.into(), v.into());
}
Ok(m)
});
self.fut = Some(Box::new(fut));
}
self.fut.as_mut().expect("UrlEncoded could not be used second time").poll()
}
}

View file

@ -1,13 +1,20 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::io; use std::io;
use std::fmt::Write; use std::fmt::Write;
use bytes::BufMut; use bytes::{BytesMut, BufMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use http::{Version, HttpTryFrom};
use http::header::{HeaderValue, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use flate2::Compression;
use flate2::write::{GzEncoder, DeflateEncoder};
use brotli2::write::BrotliEncoder;
use body::Binary; use body::{Body, Binary};
use headers::ContentEncoding;
use server::WriterState; use server::WriterState;
use server::shared::SharedBytes; use server::shared::SharedBytes;
use server::encoding::{ContentEncoder, TransferEncoding};
use client::ClientRequest; use client::ClientRequest;
@ -30,6 +37,7 @@ pub(crate) struct HttpClientWriter {
written: u64, written: u64,
headers_size: u32, headers_size: u32,
buffer: SharedBytes, buffer: SharedBytes,
encoder: ContentEncoder,
low: usize, low: usize,
high: usize, high: usize,
} }
@ -37,11 +45,13 @@ pub(crate) struct HttpClientWriter {
impl HttpClientWriter { impl HttpClientWriter {
pub fn new(buf: SharedBytes) -> HttpClientWriter { pub fn new(buf: SharedBytes) -> HttpClientWriter {
let encoder = ContentEncoder::Identity(TransferEncoding::eof(buf.clone()));
HttpClientWriter { HttpClientWriter {
flags: Flags::empty(), flags: Flags::empty(),
written: 0, written: 0,
headers_size: 0, headers_size: 0,
buffer: buf, buffer: buf,
encoder: encoder,
low: LOW_WATERMARK, low: LOW_WATERMARK,
high: HIGH_WATERMARK, high: HIGH_WATERMARK,
} }
@ -87,14 +97,19 @@ impl HttpClientWriter {
impl HttpClientWriter { impl HttpClientWriter {
pub fn start(&mut self, msg: &mut ClientRequest) { pub fn start(&mut self, msg: &mut ClientRequest) -> io::Result<()> {
// prepare task // prepare task
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg);
// render message // render message
{ {
let buffer = self.buffer.get_mut(); let buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = *msg.body() {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE); buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
// status line // status line
let _ = write!(buffer, "{} {} {:?}\r\n", let _ = write!(buffer, "{} {} {:?}\r\n",
@ -119,8 +134,16 @@ impl HttpClientWriter {
buffer.extend_from_slice(b"\r\n"); buffer.extend_from_slice(b"\r\n");
//} //}
self.headers_size = buffer.len() as u32; self.headers_size = buffer.len() as u32;
if msg.body().is_binary() {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
self.written += bytes.len() as u64;
self.encoder.write(bytes)?;
} }
} }
}
Ok(())
}
pub fn write(&mut self, payload: &Binary) -> io::Result<WriterState> { pub fn write(&mut self, payload: &Binary) -> io::Result<WriterState> {
self.written += payload.len() as u64; self.written += payload.len() as u64;
@ -160,3 +183,127 @@ impl HttpClientWriter {
} }
} }
} }
fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder {
let version = req.version();
let mut body = req.replace_body(Body::Empty);
let mut encoding = req.content_encoding();
let transfer = match body {
Body::Empty => {
req.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::length(0, buf)
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
let tmp = SharedBytes::default();
let transfer = TransferEncoding::eof(tmp.clone());
let mut enc = match encoding {
ContentEncoding::Deflate => ContentEncoder::Deflate(
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
ContentEncoding::Auto => unreachable!()
};
// TODO return error!
let _ = enc.write(bytes.clone());
let _ = enc.write_eof();
*bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity;
}
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
req.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
TransferEncoding::eof(buf)
},
Body::Streaming(_) | Body::Actor(_) => {
if req.upgrade() {
if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2");
} else {
req.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade"));
}
if encoding != ContentEncoding::Identity {
encoding = ContentEncoding::Identity;
req.headers_mut().remove(CONTENT_ENCODING);
}
TransferEncoding::eof(buf)
} else {
streaming_encoding(buf, version, req)
}
}
};
req.replace_body(body);
match encoding {
ContentEncoding::Deflate => ContentEncoder::Deflate(
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),
}
}
fn streaming_encoding(buf: SharedBytes, version: Version, req: &mut ClientRequest)
-> TransferEncoding {
if req.chunked() {
// Enable transfer encoding
req.headers_mut().remove(CONTENT_LENGTH);
if version == Version::HTTP_2 {
req.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
} else {
req.headers_mut().insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf)
}
} else {
// if Content-Length is specified, then use it as length hint
let (len, chunked) =
if let Some(len) = req.headers().get(CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
(Some(len), false)
} else {
error!("illegal Content-Length: {:?}", len);
(None, false)
}
} else {
error!("illegal Content-Length: {:?}", len);
(None, false)
}
} else {
(None, true)
};
if !chunked {
if let Some(len) = len {
TransferEncoding::length(len, buf)
} else {
TransferEncoding::eof(buf)
}
} else {
// Enable transfer encoding
match version {
Version::HTTP_11 => {
req.headers_mut().insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf)
},
_ => {
req.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
}
}
}
}
}

View file

@ -14,7 +14,6 @@ use futures::task::{Task, current as current_task};
use error::{ParseError, PayloadError, MultipartError}; use error::{ParseError, PayloadError, MultipartError};
use payload::Payload; use payload::Payload;
use client::ClientResponse;
use httprequest::HttpRequest; use httprequest::HttpRequest;
const MAX_HEADERS: usize = 32; const MAX_HEADERS: usize = 32;
@ -98,18 +97,18 @@ impl Multipart {
} }
} }
/// Create multipart instance for client response. // /// Create multipart instance for client response.
pub fn from_response(resp: &mut ClientResponse) -> Multipart { // pub fn from_response(resp: &mut ClientResponse) -> Multipart {
match Multipart::boundary(resp.headers()) { // match Multipart::boundary(resp.headers()) {
Ok(boundary) => Multipart::new(boundary, resp.payload().clone()), // Ok(boundary) => Multipart::new(boundary, resp.payload().clone()),
Err(err) => // Err(err) =>
Multipart { // Multipart {
error: Some(err), // error: Some(err),
safety: Safety::new(), // safety: Safety::new(),
inner: None, // inner: None,
} // }
} // }
} // }
/// Extract boundary info from headers. /// Extract boundary info from headers.
pub fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> { pub fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {

View file

@ -26,7 +26,7 @@ use super::shared::SharedBytes;
impl ContentEncoding { impl ContentEncoding {
#[inline] #[inline]
fn is_compression(&self) -> bool { pub fn is_compression(&self) -> bool {
match *self { match *self {
ContentEncoding::Identity | ContentEncoding::Auto => false, ContentEncoding::Identity | ContentEncoding::Auto => false,
_ => true _ => true
@ -546,7 +546,7 @@ impl PayloadEncoder {
} }
} }
enum ContentEncoder { pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>), Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>), Gzip(GzEncoder<TransferEncoding>),
Br(BrotliEncoder<TransferEncoding>), Br(BrotliEncoder<TransferEncoding>),

View file

@ -178,6 +178,7 @@ impl WsClient {
self.request.set_header(header::ORIGIN, origin); self.request.set_header(header::ORIGIN, origin);
} }
self.request.upgrade();
self.request.set_header(header::UPGRADE, "websocket"); self.request.set_header(header::UPGRADE, "websocket");
self.request.set_header(header::CONNECTION, "upgrade"); self.request.set_header(header::CONNECTION, "upgrade");
self.request.set_header("SEC-WEBSOCKET-VERSION", "13"); self.request.set_header("SEC-WEBSOCKET-VERSION", "13");
@ -265,7 +266,7 @@ impl Future for WsHandshake {
if !self.sent { if !self.sent {
self.sent = true; self.sent = true;
inner.writer.start(&mut self.request); inner.writer.start(&mut self.request)?;
} }
if let Err(err) = inner.writer.poll_completed(&mut inner.conn, false) { if let Err(err) = inner.writer.poll_completed(&mut inner.conn, false) {
return Err(err.into()) return Err(err.into())