1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-27 10:30:33 +00:00

refactor client payload processing

This commit is contained in:
Nikolay Kim 2018-02-24 07:29:35 +03:00
parent ea8e8e75a2
commit 4e41e13baf
12 changed files with 468 additions and 149 deletions

View file

@ -16,11 +16,11 @@
* Added http client
* Added basic websocket client
* Added websocket client
* Added TestServer::ws(), test websockets client
* Added TestServer test http client
* Added TestServer http client support
* Allow to override content encoding on application level

View file

@ -77,6 +77,7 @@ tokio-tls = { version="0.1", optional = true }
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.2", optional = true }
backtrace="*"
[dependencies.actix]
version = "0.5"

142
src/client/encoding.rs Normal file
View file

@ -0,0 +1,142 @@
use std::io;
use std::io::{Read, Write};
use bytes::{Bytes, BytesMut, BufMut};
use flate2::read::GzDecoder;
use flate2::write::DeflateDecoder;
use brotli2::write::BrotliDecoder;
use headers::ContentEncoding;
use server::encoding::{Decoder, Wrapper};
/// Payload wrapper with content decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Deflate => Decoder::Deflate(
Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream{ decoder: dec, dst: BytesMut::new() }
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
let b = writer.get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
loop {
self.dst.reserve(8192);
match decoder.read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.take().freeze()))
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(err) => return Err(err),
}
}
} else {
Ok(None)
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err)
}
},
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(
Box::new(GzDecoder::new(
Wrapper{buf: BytesMut::from(data), eof: false})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.split_to(n).freeze()));
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(e) => return Err(e),
}
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(Some(data)),
}
}
}

View file

@ -1,5 +1,6 @@
//! Http client
mod connector;
mod encoding;
mod parser;
mod request;
mod response;

View file

@ -83,20 +83,34 @@ impl HttpResponseParser {
-> Poll<Option<Bytes>, PayloadError>
where T: IoStream
{
if let Some(ref mut decoder) = self.decoder {
if self.decoder.is_some() {
// read payload
match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => return Err(PayloadError::Incomplete),
Ok(Async::Ready(0)) => {
if buf.is_empty() {
return Err(PayloadError::Incomplete)
}
}
Err(err) => return Err(err.into()),
_ => (),
}
decoder.decode(buf).map_err(|e| e.into())
match self.decoder.as_mut().unwrap().decode(buf) {
Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))),
Ok(Async::Ready(None)) => {
self.decoder.take();
Ok(Async::Ready(None))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
} else {
Ok(Async::Ready(None))
}
}
fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option<Decoder>), ParseError> {
fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option<Decoder>), ParseError>
{
// Parse http message
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
@ -160,10 +174,6 @@ impl HttpResponseParser {
};
if let Some(decoder) = decoder {
//let info = PayloadInfo {
//tx: PayloadType::new(&hdrs, psender),
// decoder: decoder,
//};
Ok(Async::Ready(
(ClientResponse::new(
ClientMessage{status: status, version: version,

View file

@ -1,5 +1,6 @@
use std::{io, mem};
use bytes::{Bytes, BytesMut};
use http::header::CONTENT_ENCODING;
use futures::{Async, Future, Poll};
use futures::unsync::oneshot;
@ -8,6 +9,7 @@ use actix::prelude::*;
use error::Error;
use body::{Body, BodyStream};
use context::{Frame, ActorHttpContext};
use headers::ContentEncoding;
use error::PayloadError;
use server::WriterState;
use server::shared::SharedBytes;
@ -15,6 +17,7 @@ use super::{ClientRequest, ClientResponse};
use super::{Connect, Connection, ClientConnector, ClientConnectorError};
use super::HttpClientWriter;
use super::{HttpResponseParser, HttpResponseParserError};
use super::encoding::PayloadStream;
/// A set of errors that can occur during sending request and reading response
#[derive(Fail, Debug)]
@ -114,11 +117,13 @@ impl Future for SendRequest {
body: body,
conn: stream,
writer: writer,
parser: HttpResponseParser::default(),
parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(),
disconnected: false,
running: RunningState::Running,
drain: None,
decompress: None,
should_decompress: self.req.response_decompress(),
write_state: RunningState::Running,
});
self.state = State::Send(pl);
},
@ -150,11 +155,13 @@ pub(crate) struct Pipeline {
body: IoBody,
conn: Connection,
writer: HttpClientWriter,
parser: HttpResponseParser,
parser: Option<HttpResponseParser>,
parser_buf: BytesMut,
disconnected: bool,
running: RunningState,
drain: Option<oneshot::Sender<()>>,
decompress: Option<PayloadStream>,
should_decompress: bool,
write_state: RunningState,
}
enum IoBody {
@ -163,7 +170,7 @@ enum IoBody {
Done,
}
#[derive(PartialEq)]
#[derive(Debug, PartialEq)]
enum RunningState {
Running,
Paused,
@ -189,25 +196,90 @@ impl Pipeline {
#[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
self.parser.parse(&mut self.conn, &mut self.parser_buf)
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
}
}
}
}
Ok(Async::Ready(resp))
}
val => val,
}
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.poll_write()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e).as_str()))?;
Ok(self.parser.parse_payload(&mut self.conn, &mut self.parser_buf)?)
let mut need_run = false;
// need write?
match self.poll_write()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?
{
Async::NotReady => need_run = true,
_ => (),
}
// need read?
if self.parser.is_some() {
loop {
match self.parser.as_mut().unwrap()
.parse_payload(&mut self.conn, &mut self.parser_buf)?
{
Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress {
match decompress.feed_data(b) {
Ok(Some(b)) => return Ok(Async::Ready(Some(b))),
Ok(None) => return Ok(Async::NotReady),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock =>
continue,
Err(err) => return Err(err.into()),
}
} else {
return Ok(Async::Ready(Some(b)))
}
},
Async::Ready(None) => {
let _ = self.parser.take();
break
}
Async::NotReady => return Ok(Async::NotReady),
}
}
}
// eof
if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof();
if let Some(b) = res? {
return Ok(Async::Ready(Some(b)))
}
}
if need_run {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(None))
}
}
#[inline]
pub fn poll_write(&mut self) -> Poll<(), Error> {
if self.running == RunningState::Done {
if self.write_state == RunningState::Done {
return Ok(Async::Ready(()))
}
let mut done = false;
if self.drain.is_none() && self.running != RunningState::Paused {
if self.drain.is_none() && self.write_state != RunningState::Paused {
'outter: loop {
let result = match mem::replace(&mut self.body, IoBody::Done) {
IoBody::Payload(mut body) => {
@ -243,6 +315,7 @@ impl Pipeline {
match frame {
Frame::Chunk(None) => {
// info.context = Some(ctx);
self.disconnected = true;
self.writer.write_eof()?;
break 'outter
},
@ -253,7 +326,7 @@ impl Pipeline {
}
self.body = IoBody::Actor(ctx);
if self.drain.is_some() {
self.running.resume();
self.write_state.resume();
break
}
res.unwrap()
@ -270,6 +343,7 @@ impl Pipeline {
}
},
IoBody::Done => {
self.disconnected = true;
done = true;
break
}
@ -277,11 +351,11 @@ impl Pipeline {
match result {
WriterState::Pause => {
self.running.pause();
self.write_state.pause();
break
}
WriterState::Done => {
self.running.resume()
self.write_state.resume()
},
}
}
@ -290,14 +364,18 @@ impl Pipeline {
// flush io but only if we need to
match self.writer.poll_completed(&mut self.conn, false) {
Ok(Async::Ready(_)) => {
self.running.resume();
if self.disconnected {
self.write_state = RunningState::Done;
} else {
self.write_state.resume();
}
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// restart io processing
if !done {
if !done || self.write_state == RunningState::Done {
self.poll_write()
} else {
Ok(Async::NotReady)

View file

@ -4,7 +4,7 @@ use std::io::Write;
use actix::{Addr, Unsync};
use cookie::{Cookie, CookieJar};
use bytes::{BytesMut, BufMut};
use http::{HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError};
use http::{uri, HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError};
use http::header::{self, HeaderName, HeaderValue};
use serde_json;
use serde::Serialize;
@ -25,6 +25,7 @@ pub struct ClientRequest {
chunked: bool,
upgrade: bool,
encoding: ContentEncoding,
response_decompress: bool,
}
impl Default for ClientRequest {
@ -39,6 +40,7 @@ impl Default for ClientRequest {
chunked: false,
upgrade: false,
encoding: ContentEncoding::Auto,
response_decompress: true,
}
}
}
@ -89,6 +91,7 @@ impl ClientRequest {
request: Some(ClientRequest::default()),
err: None,
cookies: None,
default_headers: true,
}
}
@ -158,6 +161,12 @@ impl ClientRequest {
self.encoding
}
/// Decompress response payload
#[inline]
pub fn response_decompress(&self) -> bool {
self.response_decompress
}
/// Get body os this response
#[inline]
pub fn body(&self) -> &Body {
@ -216,6 +225,7 @@ pub struct ClientRequestBuilder {
request: Option<ClientRequest>,
err: Option<HttpError>,
cookies: Option<CookieJar>,
default_headers: bool,
}
impl ClientRequestBuilder {
@ -409,6 +419,22 @@ impl ClientRequestBuilder {
self
}
/// Do not add default request headers.
/// By default `Accept-Encoding` header is set.
pub fn no_default_headers(&mut self) -> &mut Self {
self.default_headers = false;
self
}
/// Disable automatic decompress response body
pub fn disable_decompress(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.response_decompress = false;
}
self
}
/// This method calls provided closure with builder reference if value is true.
pub fn if_true<F>(&mut self, value: bool, f: F) -> &mut Self
where F: FnOnce(&mut ClientRequestBuilder)
@ -437,6 +463,23 @@ impl ClientRequestBuilder {
return Err(e)
}
if self.default_headers {
// enable br only for https
let https =
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.uri.scheme_part()
.map(|s| s == &uri::Scheme::HTTPS).unwrap_or(true)
} else {
true
};
if https {
self.header(header::ACCEPT_ENCODING, "br, gzip, deflate");
} else {
self.header(header::ACCEPT_ENCODING, "gzip, deflate");
}
}
let mut request = self.request.take().expect("cannot reuse request builder");
// set cookies
@ -482,6 +525,7 @@ impl ClientRequestBuilder {
request: self.request.take(),
err: self.err.take(),
cookies: self.cookies.take(),
default_headers: self.default_headers,
}
}
}

View file

@ -237,6 +237,8 @@ pub enum PayloadError {
impl From<IoError> for PayloadError {
fn from(err: IoError) -> PayloadError {
use backtrace;
println!("IO ERROR {:?}", backtrace::Backtrace::new());
PayloadError::Io(err)
}
}

View file

@ -97,6 +97,8 @@ extern crate openssl;
#[cfg(feature="openssl")]
extern crate tokio_openssl;
extern crate backtrace;
mod application;
mod body;
mod context;

View file

@ -128,7 +128,7 @@ impl PayloadWriter for PayloadType {
}
}
enum Decoder {
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer<BytesMut>>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
Br(Box<BrotliDecoder<Writer<BytesMut>>>),
@ -137,9 +137,9 @@ enum Decoder {
// should go after write::GzDecoder get implemented
#[derive(Debug)]
struct Wrapper {
buf: BytesMut,
eof: bool,
pub(crate) struct Wrapper {
pub buf: BytesMut,
pub eof: bool,
}
impl io::Read for Wrapper {

View file

@ -2,8 +2,14 @@ extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate futures;
extern crate flate2;
use std::io::Read;
use bytes::Bytes;
use futures::Future;
use futures::stream::once;
use flate2::read::GzDecoder;
use actix_web::*;
@ -57,3 +63,145 @@ fn test_simple() {
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_no_decompress() {
let mut srv = test::TestServer::new(
|app| app.handler(|_| httpcodes::HTTPOk.build().body(STR)));
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
let mut e = GzDecoder::new(&bytes[..]);
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
// POST
let request = srv.post().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
let bytes = srv.execute(response.body()).unwrap();
let mut e = GzDecoder::new(&bytes[..]);
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_gzip_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Deflate)
.body(bytes))
}).responder()}
));
// client request
let request = srv.post()
.content_encoding(headers::ContentEncoding::Gzip)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_brotli_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Deflate)
.body(bytes))
}).responder()}
));
// client request
let request = srv.client(Method::POST, "/")
.content_encoding(headers::ContentEncoding::Br)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_deflate_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Br)
.body(bytes))
}).responder()}
));
// client request
let request = srv.post()
.content_encoding(headers::ContentEncoding::Deflate)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_streaming_explicit() {
let mut srv = test::TestServer::new(
|app| app.handler(
|req: HttpRequest| req.body()
.map_err(Error::from)
.and_then(|body| {
Ok(httpcodes::HTTPOk.build()
.chunked()
.content_encoding(headers::ContentEncoding::Identity)
.body(body)?)})
.responder()));
let body = once(Ok(Bytes::from_static(STR.as_ref())));
let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_body_streaming_implicit() {
let mut srv = test::TestServer::new(
|app| app.handler(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref())));
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Gzip)
.body(Body::Streaming(Box::new(body)))}));
let request = srv.get().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}

View file

@ -123,7 +123,7 @@ fn test_body_gzip() {
.content_encoding(headers::ContentEncoding::Gzip)
.body(STR)));
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -138,7 +138,7 @@ fn test_body_gzip() {
}
#[test]
fn test_body_streaming_implicit() {
fn test_body_chunked_implicit() {
let mut srv = test::TestServer::new(
|app| app.handler(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref())));
@ -146,7 +146,7 @@ fn test_body_streaming_implicit() {
.content_encoding(headers::ContentEncoding::Gzip)
.body(Body::Streaming(Box::new(body)))}));
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -169,7 +169,7 @@ fn test_body_br_streaming() {
.content_encoding(headers::ContentEncoding::Br)
.body(Body::Streaming(Box::new(body)))}));
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -252,6 +252,7 @@ fn test_body_length() {
let body = once(Ok(Bytes::from_static(STR.as_ref())));
httpcodes::HTTPOk.build()
.content_length(STR.len() as u64)
.content_encoding(headers::ContentEncoding::Identity)
.body(Body::Streaming(Box::new(body)))}));
let request = srv.get().finish().unwrap();
@ -264,7 +265,7 @@ fn test_body_length() {
}
#[test]
fn test_body_streaming_explicit() {
fn test_body_chunked_explicit() {
let mut srv = test::TestServer::new(
|app| app.handler(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref())));
@ -273,7 +274,7 @@ fn test_body_streaming_explicit() {
.content_encoding(headers::ContentEncoding::Gzip)
.body(Body::Streaming(Box::new(body)))}));
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -297,7 +298,7 @@ fn test_body_deflate() {
.body(STR)));
// client request
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -321,7 +322,7 @@ fn test_body_brotli() {
.body(STR)));
// client request
let request = srv.get().finish().unwrap();
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
@ -363,34 +364,6 @@ fn test_gzip_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_gzip_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Deflate)
.body(bytes))
}).responder()}
));
// client request
let request = srv.post()
.content_encoding(headers::ContentEncoding::Gzip)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
let mut e = DeflateDecoder::new(Vec::new());
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_deflate_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
@ -419,35 +392,6 @@ fn test_deflate_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_deflate_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Br)
.body(bytes))
}).responder()}
));
// client request
let request = srv.post()
.content_encoding(headers::ContentEncoding::Deflate)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
// decode brotli
let mut e = BrotliDecoder::new(Vec::with_capacity(2048));
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_brotli_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
@ -476,35 +420,6 @@ fn test_brotli_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_brotli_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Deflate)
.body(bytes))
}).responder()}
));
// client request
let request = srv.client(Method::POST, "/")
.content_encoding(headers::ContentEncoding::Br)
.body(STR).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
// decode brotli
let mut e = DeflateDecoder::new(Vec::with_capacity(2048));
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_h2() {
let srv = test::TestServer::new(|app| app.handler(|_|{
@ -545,30 +460,6 @@ fn test_h2() {
// assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_client_streaming_explicit() {
let mut srv = test::TestServer::new(
|app| app.handler(
|req: HttpRequest| req.body()
.map_err(Error::from)
.and_then(|body| {
Ok(httpcodes::HTTPOk.build()
.chunked()
.content_encoding(headers::ContentEncoding::Identity)
.body(body)?)})
.responder()));
let body = once(Ok(Bytes::from_static(STR.as_ref())));
let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_application() {
let mut srv = test::TestServer::with_factory(