1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 19:41:12 +00:00

Read client response until eof if connection header set to close #464

This commit is contained in:
Nikolay Kim 2018-09-03 21:35:11 -07:00
parent 24d1228943
commit f0f67072ae
3 changed files with 67 additions and 8 deletions

View file

@ -15,6 +15,10 @@
* Handling scoped paths without leading slashes #460 * Handling scoped paths without leading slashes #460
### Changed
* Read client response until eof if connection header set to close #464
## [0.7.4] - 2018-08-23 ## [0.7.4] - 2018-08-23

View file

@ -20,6 +20,7 @@ const MAX_HEADERS: usize = 96;
#[derive(Default)] #[derive(Default)]
pub struct HttpResponseParser { pub struct HttpResponseParser {
decoder: Option<EncodingDecoder>, decoder: Option<EncodingDecoder>,
eof: bool, // indicate that we read payload until stream eof
} }
#[derive(Debug, Fail)] #[derive(Debug, Fail)]
@ -44,8 +45,14 @@ impl HttpResponseParser {
match HttpResponseParser::parse_message(buf) match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)? .map_err(HttpResponseParserError::Error)?
{ {
Async::Ready((msg, decoder)) => { Async::Ready((msg, info)) => {
self.decoder = decoder; if let Some((decoder, eof)) = info {
self.eof = eof;
self.decoder = Some(decoder);
} else {
self.eof = false;
self.decoder = None;
}
return Ok(Async::Ready(msg)); return Ok(Async::Ready(msg));
} }
Async::NotReady => { Async::NotReady => {
@ -97,9 +104,14 @@ impl HttpResponseParser {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
if stream_finished { if stream_finished {
// read untile eof?
if self.eof {
return Ok(Async::Ready(None));
} else {
return Err(PayloadError::Incomplete); return Err(PayloadError::Incomplete);
} }
} }
}
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),
} }
} }
@ -110,7 +122,7 @@ impl HttpResponseParser {
fn parse_message( fn parse_message(
buf: &mut BytesMut, buf: &mut BytesMut,
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> { ) -> Poll<(ClientResponse, Option<(EncodingDecoder, bool)>), ParseError> {
// Unsafe: we read only this data only after httparse parses headers into. // Unsafe: we read only this data only after httparse parses headers into.
// performance bump for pipeline benchmarks. // performance bump for pipeline benchmarks.
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() }; let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
@ -156,12 +168,12 @@ impl HttpResponseParser {
} }
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS { let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
Some(EncodingDecoder::eof()) Some((EncodingDecoder::eof(), true))
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) { } else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
// Content-Length // Content-Length
if let Ok(s) = len.to_str() { if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() { if let Ok(len) = s.parse::<u64>() {
Some(EncodingDecoder::length(len)) Some((EncodingDecoder::length(len), false))
} else { } else {
debug!("illegal Content-Length: {:?}", len); debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header); return Err(ParseError::Header);
@ -172,7 +184,18 @@ impl HttpResponseParser {
} }
} else if chunked(&hdrs)? { } else if chunked(&hdrs)? {
// Chunked encoding // Chunked encoding
Some(EncodingDecoder::chunked()) Some((EncodingDecoder::chunked(), false))
} else if let Some(value) = hdrs.get(header::CONNECTION) {
let close = if let Ok(s) = value.to_str() {
s == "close"
} else {
false
};
if close {
Some((EncodingDecoder::eof(), true))
} else {
None
}
} else { } else {
None None
}; };

View file

@ -8,7 +8,8 @@ extern crate rand;
#[cfg(all(unix, feature = "uds"))] #[cfg(all(unix, feature = "uds"))]
extern crate tokio_uds; extern crate tokio_uds;
use std::io::Read; use std::io::{Read, Write};
use std::{net, thread};
use bytes::Bytes; use bytes::Bytes;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
@ -470,3 +471,34 @@ fn test_default_headers() {
"\"" "\""
))); )));
} }
#[test]
fn client_read_until_eof() {
let addr = test::TestServer::unused_addr();
thread::spawn(move || {
let lst = net::TcpListener::bind(addr).unwrap();
for stream in lst.incoming() {
let mut stream = stream.unwrap();
let mut b = [0; 1000];
let _ = stream.read(&mut b).unwrap();
let _ = stream
.write_all(b"HTTP/1.1 200 OK\r\nconnection: close\r\n\r\nwelcome!");
}
});
let mut sys = actix::System::new("test");
// client request
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
println!("TEST: {:?}", req);
let response = sys.block_on(req.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = sys.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(b"welcome!"));
}