diff --git a/src/client/parser.rs b/src/client/parser.rs index 0d4da4c4d..f81aed119 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -7,18 +7,18 @@ use std::mem; use error::{ParseError, PayloadError}; -use server::h1::{chunked, Decoder}; +use server::h1decoder::EncodingDecoder; use server::{utils, IoStream}; -use super::ClientResponse; use super::response::ClientMessage; +use super::ClientResponse; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; #[derive(Default)] pub struct HttpResponseParser { - decoder: Option, + decoder: Option, } #[derive(Debug, Fail)] @@ -32,7 +32,7 @@ pub enum HttpResponseParserError { impl HttpResponseParser { pub fn parse( - &mut self, io: &mut T, buf: &mut BytesMut + &mut self, io: &mut T, buf: &mut BytesMut, ) -> Poll where T: IoStream, @@ -75,7 +75,7 @@ impl HttpResponseParser { } pub fn parse_payload( - &mut self, io: &mut T, buf: &mut BytesMut + &mut self, io: &mut T, buf: &mut BytesMut, ) -> Poll, PayloadError> where T: IoStream, @@ -113,8 +113,8 @@ impl HttpResponseParser { } fn parse_message( - buf: &mut BytesMut - ) -> Poll<(ClientResponse, Option), ParseError> { + buf: &mut BytesMut, + ) -> Poll<(ClientResponse, Option), ParseError> { // Parse http message let bytes_ptr = buf.as_ref().as_ptr() as usize; let mut headers: [httparse::Header; MAX_HEADERS] = @@ -160,12 +160,12 @@ impl HttpResponseParser { } let decoder = if status == StatusCode::SWITCHING_PROTOCOLS { - Some(Decoder::eof()) + Some(EncodingDecoder::eof()) } else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) { // Content-Length if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { - Some(Decoder::length(len)) + Some(EncodingDecoder::length(len)) } else { debug!("illegal Content-Length: {:?}", len); return Err(ParseError::Header); @@ -176,7 +176,7 @@ impl HttpResponseParser { } } else if chunked(&hdrs)? { // Chunked encoding - Some(Decoder::chunked()) + Some(EncodingDecoder::chunked()) } else { None }; @@ -204,3 +204,16 @@ impl HttpResponseParser { } } } + +/// Check if request has chunked transfer encoding +pub fn chunked(headers: &HeaderMap) -> Result { + if let Some(encodings) = headers.get(header::TRANSFER_ENCODING) { + if let Ok(s) = encodings.to_str() { + Ok(s.to_lowercase().contains("chunked")) + } else { + Err(ParseError::Header) + } + } else { + Ok(false) + } +} diff --git a/src/server/h1.rs b/src/server/h1.rs index ec0b1938a..e411a7889 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -4,32 +4,29 @@ use std::collections::VecDeque; use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; -use std::{self, io}; +use std::{io, mem}; use actix::Arbiter; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use futures::{Async, Future, Poll}; -use http::header::{self, HeaderName, HeaderValue}; -use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; -use httparse; use tokio_core::reactor::Timeout; -use error::{ParseError, PayloadError, ResponseError}; +use error::PayloadError; use httprequest::HttpRequest; use httpresponse::HttpResponse; use payload::{Payload, PayloadStatus, PayloadWriter}; use pipeline::Pipeline; -use uri::Url; use super::encoding::PayloadType; +use super::h1decoder::{DecoderError, H1Decoder, Message}; use super::h1writer::H1Writer; use super::settings::WorkerSettings; +use super::Writer; use super::{HttpHandler, HttpHandlerTask, IoStream}; -use super::{utils, Writer}; -const MAX_BUFFER_SIZE: usize = 131_072; -const MAX_HEADERS: usize = 96; const MAX_PIPELINED_MESSAGES: usize = 16; +const LW_BUFFER_SIZE: usize = 4096; +const HW_BUFFER_SIZE: usize = 32_768; bitflags! { struct Flags: u8 { @@ -37,6 +34,7 @@ bitflags! { const ERROR = 0b0000_0010; const KEEPALIVE = 0b0000_0100; const SHUTDOWN = 0b0000_1000; + const DISCONNECTED = 0b0001_0000; } } @@ -53,8 +51,9 @@ pub(crate) struct Http1 { settings: Rc>, addr: Option, stream: H1Writer, - reader: Reader, - read_buf: BytesMut, + decoder: H1Decoder, + payload: Option, + buf: BytesMut, tasks: VecDeque, keepalive_timer: Option, } @@ -71,29 +70,42 @@ where { pub fn new( settings: Rc>, stream: T, addr: Option, - read_buf: BytesMut, + buf: BytesMut, ) -> Self { let bytes = settings.get_shared_bytes(); Http1 { flags: Flags::KEEPALIVE, stream: H1Writer::new(stream, bytes, Rc::clone(&settings)), - reader: Reader::new(), + decoder: H1Decoder::new(), + payload: None, tasks: VecDeque::new(), keepalive_timer: None, addr, - read_buf, + buf, settings, } } + #[inline] pub fn settings(&self) -> &WorkerSettings { self.settings.as_ref() } + #[inline] pub(crate) fn io(&mut self) -> &mut T { self.stream.get_mut() } + #[inline] + fn can_read(&self) -> bool { + if let Some(ref info) = self.payload { + info.need_read() == PayloadStatus::Read + } else { + true + } + } + + #[inline] pub fn poll(&mut self) -> Poll<(), ()> { // keep-alive timer if let Some(ref mut timer) = self.keepalive_timer { @@ -119,9 +131,13 @@ where } } + self.poll_io(); + loop { - match self.poll_io()? { - Async::Ready(true) => (), + match self.poll_handler()? { + Async::Ready(true) => { + self.poll_io(); + } Async::Ready(false) => { self.flags.insert(Flags::SHUTDOWN); return self.poll(); @@ -131,93 +147,48 @@ where } } - // TODO: refactor - pub fn poll_io(&mut self) -> Poll { - // read incoming data - let need_read = if !self.flags.intersects(Flags::ERROR) - && self.tasks.len() < MAX_PIPELINED_MESSAGES + #[inline] + pub fn poll_io(&mut self) { + // read io from socket + if !self.flags.intersects(Flags::ERROR) + && self.tasks.len() < MAX_PIPELINED_MESSAGES && self.can_read() { - 'outer: loop { - match self.reader.parse( - self.stream.get_mut(), - &mut self.read_buf, - &self.settings, - ) { - Ok(Async::Ready(mut req)) => { - self.flags.insert(Flags::STARTED); - - // set remote addr - req.set_peer_addr(self.addr); - - // stop keepalive timer - self.keepalive_timer.take(); - - // start request processing - for h in self.settings.handlers().iter_mut() { - req = match h.handle(req) { - Ok(pipe) => { - self.tasks.push_back(Entry { - pipe, - flags: EntryFlags::empty(), - }); - continue 'outer; - } - Err(req) => req, - } - } - - self.tasks.push_back(Entry { - pipe: Pipeline::error(HttpResponse::NotFound()), - flags: EntryFlags::empty(), - }); - continue; + match self.read() { + Ok(true) | Err(_) => { + // notify all tasks + self.stream.disconnected(); + for entry in &mut self.tasks { + entry.pipe.disconnected() } - Ok(Async::NotReady) => (), - Err(err) => { - trace!("Parse error: {:?}", err); + // kill keepalive + self.flags.remove(Flags::KEEPALIVE); + self.keepalive_timer.take(); - // notify all tasks - self.stream.disconnected(); - for entry in &mut self.tasks { - entry.pipe.disconnected() - } + // on parse error, stop reading stream but tasks need to be + // completed + self.flags.insert(Flags::ERROR); - // kill keepalive - self.flags.remove(Flags::KEEPALIVE); - self.keepalive_timer.take(); - - // on parse error, stop reading stream but tasks need to be - // completed - self.flags.insert(Flags::ERROR); - - match err { - ReaderError::Disconnect => (), - _ => if self.tasks.is_empty() { - if let ReaderError::Error(err) = err { - self.tasks.push_back(Entry { - pipe: Pipeline::error(err.error_response()), - flags: EntryFlags::empty(), - }); - } - }, - } + if let Some(ref mut payload) = self.payload { + payload.set_error(PayloadError::Incomplete); } } - break; + Ok(false) => { + self.parse(); + } } - false - } else { - true - }; + } + } - let retry = self.reader.need_read() == PayloadStatus::Read; + pub fn poll_handler(&mut self) -> Poll { + let retry = self.can_read(); // check in-flight messages let mut io = false; let mut idx = 0; while idx < self.tasks.len() { - let item = &mut self.tasks[idx]; + let item: &mut Entry = unsafe { mem::transmute(&mut self.tasks[idx]) }; + // only one task can do io operation in http/1 if !io && !item.flags.contains(EntryFlags::EOF) { // io is corrupted, send buffer if item.flags.contains(EntryFlags::ERROR) { @@ -247,7 +218,8 @@ where } // no more IO for this iteration Ok(Async::NotReady) => { - if self.reader.need_read() == PayloadStatus::Read && !retry { + // check if previously read backpressure was enabled + if self.can_read() && !retry { return Ok(Async::Ready(true)); } io = true; @@ -279,20 +251,20 @@ where } // cleanup finished tasks - let mut popped = false; + let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES; while !self.tasks.is_empty() { if self.tasks[0] .flags .contains(EntryFlags::EOF | EntryFlags::FINISHED) { - popped = true; self.tasks.pop_front(); } else { break; } } - if need_read && popped { - return self.poll_io(); + // read more message + if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES { + return Ok(Async::Ready(true)); } // check stream state @@ -332,736 +304,167 @@ where } Ok(Async::NotReady) } -} -struct Reader { - payload: Option, -} + pub fn parse(&mut self) { + 'outer: loop { + match self.decoder.decode(&mut self.buf, &self.settings) { + Ok(Some(Message::Message { msg, payload })) => { + self.flags.insert(Flags::STARTED); -enum Decoding { - Ready, - NotReady, -} + if payload { + let (ps, pl) = Payload::new(false); + msg.get_mut().payload = Some(pl); + self.payload = + Some(PayloadType::new(&msg.get_ref().headers, ps)); + } -struct PayloadInfo { - tx: PayloadType, - decoder: Decoder, -} + let mut req = HttpRequest::from_message(msg); -#[derive(Debug)] -enum ReaderError { - Disconnect, - Payload, - PayloadDropped, - Error(ParseError), -} + // set remote addr + req.set_peer_addr(self.addr); -impl Reader { - pub fn new() -> Reader { - Reader { payload: None } - } + // stop keepalive timer + self.keepalive_timer.take(); - #[inline] - fn need_read(&self) -> PayloadStatus { - if let Some(ref info) = self.payload { - info.tx.need_read() - } else { - PayloadStatus::Read + // search handler for request + for h in self.settings.handlers().iter_mut() { + req = match h.handle(req) { + Ok(pipe) => { + self.tasks.push_back(Entry { + pipe, + flags: EntryFlags::empty(), + }); + continue 'outer; + } + Err(req) => req, + } + } + + // handler is not found + self.tasks.push_back(Entry { + pipe: Pipeline::error(HttpResponse::NotFound()), + flags: EntryFlags::empty(), + }); + } + Ok(Some(Message::Chunk(chunk))) => { + if let Some(ref mut payload) = self.payload { + payload.feed_data(chunk); + } else { + error!("Internal server error: unexpected payload chunk"); + self.flags.insert(Flags::ERROR); + } + } + Ok(Some(Message::Eof)) => { + if let Some(ref mut payload) = self.payload { + payload.feed_eof(); + } else { + error!("Internal server error: unexpected eof"); + self.flags.insert(Flags::ERROR); + } + } + Ok(None) => break, + Err(e) => { + self.flags.insert(Flags::ERROR); + if let Some(ref mut payload) = self.payload { + let e = match e { + DecoderError::Io(e) => PayloadError::Io(e), + DecoderError::Error(_) => PayloadError::EncodingCorrupted, + }; + payload.set_error(e); + } + } + } } } #[inline] - fn decode( - &mut self, buf: &mut BytesMut, payload: &mut PayloadInfo - ) -> Result { - while !buf.is_empty() { - match payload.decoder.decode(buf) { - Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes); - if payload.decoder.is_eof() { - payload.tx.feed_eof(); - return Ok(Decoding::Ready); - } - } - Ok(Async::Ready(None)) => { - payload.tx.feed_eof(); - return Ok(Decoding::Ready); - } - Ok(Async::NotReady) => return Ok(Decoding::NotReady), - Err(err) => { - payload.tx.set_error(err.into()); - return Err(ReaderError::Payload); - } - } - } - Ok(Decoding::NotReady) - } - - pub fn parse( - &mut self, io: &mut T, buf: &mut BytesMut, settings: &WorkerSettings - ) -> Poll - where - T: IoStream, - { - match self.need_read() { - PayloadStatus::Read => (), - PayloadStatus::Pause => return Ok(Async::NotReady), - PayloadStatus::Dropped => return Err(ReaderError::PayloadDropped), - } - - // read payload - let done = { - if let Some(ref mut payload) = self.payload { - 'buf: loop { - let not_ready = match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - payload.tx.set_error(PayloadError::Incomplete); - - // http channel should not deal with payload errors - return Err(ReaderError::Payload); - } - Ok(Async::NotReady) => true, - Err(err) => { - payload.tx.set_error(err.into()); - - // http channel should not deal with payload errors - return Err(ReaderError::Payload); - } - _ => false, - }; - loop { - match payload.decoder.decode(buf) { - Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes); - if payload.decoder.is_eof() { - payload.tx.feed_eof(); - break 'buf true; - } - } - Ok(Async::Ready(None)) => { - payload.tx.feed_eof(); - break 'buf true; - } - Ok(Async::NotReady) => { - // if buffer is full then - // socket still can contain more data - if not_ready { - return Ok(Async::NotReady); - } - continue 'buf; - } - Err(err) => { - payload.tx.set_error(err.into()); - return Err(ReaderError::Payload); - } - } - } - } - } else { - false - } - }; - if done { - self.payload = None - } - - // if buf is empty parse_message will always return NotReady, let's avoid that - if buf.is_empty() { - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect), - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(ReaderError::Error(err.into())), - } - }; - + fn read(&mut self) -> io::Result { loop { - match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { - Async::Ready((msg, decoder)) => { - // process payload - if let Some(mut payload) = decoder { - match self.decode(buf, &mut payload)? { - Decoding::Ready => (), - Decoding::NotReady => self.payload = Some(payload), - } - } - return Ok(Async::Ready(msg)); + unsafe { + if self.buf.remaining_mut() < LW_BUFFER_SIZE { + self.buf.reserve(HW_BUFFER_SIZE); } - Async::NotReady => { - if buf.len() >= MAX_BUFFER_SIZE { - error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - return Err(ReaderError::Error(ParseError::TooLarge)); - } - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - debug!("Ignored premature client disconnection"); - return Err(ReaderError::Disconnect); - } - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(ReaderError::Error(err.into())), - } - } - } - } - } - - fn parse_message( - buf: &mut BytesMut, settings: &WorkerSettings - ) -> Poll<(HttpRequest, Option), ParseError> { - // Parse http message - let mut has_upgrade = false; - let mut chunked = false; - let mut content_length = None; - - let msg = { - let bytes_ptr = buf.as_ref().as_ptr() as usize; - let mut headers: [httparse::Header; MAX_HEADERS] = - unsafe { std::mem::uninitialized() }; - - let (len, method, path, version, headers_len) = { - let b = unsafe { - let b: &[u8] = buf; - std::mem::transmute(b) - }; - let mut req = httparse::Request::new(&mut headers); - match req.parse(b)? { - httparse::Status::Complete(len) => { - let method = Method::from_bytes(req.method.unwrap().as_bytes()) - .map_err(|_| ParseError::Method)?; - let path = Url::new(Uri::try_from(req.path.unwrap())?); - let version = if req.version.unwrap() == 1 { - Version::HTTP_11 + match self.stream.get_mut().read(self.buf.bytes_mut()) { + Ok(n) => { + if n == 0 { + return Ok(true); } else { - Version::HTTP_10 - }; - (len, method, path, version, req.headers.len()) - } - httparse::Status::Partial => return Ok(Async::NotReady), - } - }; - - let slice = buf.split_to(len).freeze(); - - // convert headers - let msg = settings.get_http_message(); - { - let msg_mut = msg.get_mut(); - msg_mut.keep_alive = version != Version::HTTP_10; - - for header in headers[..headers_len].iter() { - if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) { - has_upgrade = has_upgrade || name == header::UPGRADE; - let v_start = header.value.as_ptr() as usize - bytes_ptr; - let v_end = v_start + header.value.len(); - let value = unsafe { - HeaderValue::from_shared_unchecked( - slice.slice(v_start, v_end), - ) - }; - match name { - header::CONTENT_LENGTH => { - if let Ok(s) = value.to_str() { - if let Ok(len) = s.parse::() { - content_length = Some(len) - } else { - debug!("illegal Content-Length: {:?}", len); - return Err(ParseError::Header); - } - } else { - debug!("illegal Content-Length: {:?}", len); - return Err(ParseError::Header); - } - }, - // transfer-encoding - header::TRANSFER_ENCODING => { - if let Ok(s) = value.to_str() { - chunked = s.to_lowercase().contains("chunked"); - } else { - return Err(ParseError::Header) - } - }, - // connection keep-alive state - header::CONNECTION => { - msg_mut.keep_alive = if let Ok(conn) = value.to_str() { - if version == Version::HTTP_10 - && conn.contains("keep-alive") - { - true - } else { - version == Version::HTTP_11 - && !(conn.contains("close") - || conn.contains("upgrade")) - } - } else { - false - }; - }, - _ => (), + self.buf.advance_mut(n); } - - msg_mut.headers.append(name, value); - } else { - return Err(ParseError::Header); } - } - - msg_mut.url = path; - msg_mut.method = method; - msg_mut.version = version; - } - msg - }; - - // https://tools.ietf.org/html/rfc7230#section-3.3.3 - let decoder = if chunked { - // Chunked encoding - Some(Decoder::chunked()) - } else if let Some(len) = content_length { - // Content-Length - Some(Decoder::length(len)) - } else if has_upgrade || msg.get_ref().method == Method::CONNECT { - // upgrade(websocket) or connect - Some(Decoder::eof()) - } else { - None - }; - - if let Some(decoder) = decoder { - let (psender, payload) = Payload::new(false); - let info = PayloadInfo { - tx: PayloadType::new(&msg.get_ref().headers, psender), - decoder, - }; - msg.get_mut().payload = Some(payload); - Ok(Async::Ready(( - HttpRequest::from_message(msg), - Some(info), - ))) - } else { - Ok(Async::Ready((HttpRequest::from_message(msg), None))) - } - } -} - -/// Check if request has chunked transfer encoding -pub fn chunked(headers: &HeaderMap) -> Result { - if let Some(encodings) = headers.get(header::TRANSFER_ENCODING) { - if let Ok(s) = encodings.to_str() { - Ok(s.to_lowercase().contains("chunked")) - } else { - Err(ParseError::Header) - } - } else { - Ok(false) - } -} - -/// Decoders to handle different Transfer-Encodings. -/// -/// If a message body does not include a Transfer-Encoding, it *should* -/// include a Content-Length header. -#[derive(Debug, Clone, PartialEq)] -pub struct Decoder { - kind: Kind, -} - -impl Decoder { - pub fn length(x: u64) -> Decoder { - Decoder { - kind: Kind::Length(x), - } - } - - pub fn chunked() -> Decoder { - Decoder { - kind: Kind::Chunked(ChunkedState::Size, 0), - } - } - - pub fn eof() -> Decoder { - Decoder { - kind: Kind::Eof(false), - } - } -} - -#[derive(Debug, Clone, PartialEq)] -enum Kind { - /// A Reader used when a Content-Length header is passed with a positive - /// integer. - Length(u64), - /// A Reader used when Transfer-Encoding is `chunked`. - Chunked(ChunkedState, u64), - /// A Reader used for responses that don't indicate a length or chunked. - /// - /// Note: This should only used for `Response`s. It is illegal for a - /// `Request` to be made with both `Content-Length` and - /// `Transfer-Encoding: chunked` missing, as explained from the spec: - /// - /// > If a Transfer-Encoding header field is present in a response and - /// > the chunked transfer coding is not the final encoding, the - /// > message body length is determined by reading the connection until - /// > it is closed by the server. If a Transfer-Encoding header field - /// > is present in a request and the chunked transfer coding is not - /// > the final encoding, the message body length cannot be determined - /// > reliably; the server MUST respond with the 400 (Bad Request) - /// > status code and then close the connection. - Eof(bool), -} - -#[derive(Debug, PartialEq, Clone)] -enum ChunkedState { - Size, - SizeLws, - Extension, - SizeLf, - Body, - BodyCr, - BodyLf, - EndCr, - EndLf, - End, -} - -impl Decoder { - pub fn is_eof(&self) -> bool { - match self.kind { - Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => { - true - } - _ => false, - } - } - - pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { - match self.kind { - Kind::Length(ref mut remaining) => { - if *remaining == 0 { - Ok(Async::Ready(None)) - } else { - if body.is_empty() { - return Ok(Async::NotReady); - } - let len = body.len() as u64; - let buf; - if *remaining > len { - buf = body.take().freeze(); - *remaining -= len; - } else { - buf = body.split_to(*remaining as usize).freeze(); - *remaining = 0; - } - trace!("Length read: {}", buf.len()); - Ok(Async::Ready(Some(buf))) - } - } - Kind::Chunked(ref mut state, ref mut size) => { - loop { - let mut buf = None; - // advances the chunked state - *state = try_ready!(state.step(body, size, &mut buf)); - if *state == ChunkedState::End { - trace!("End of chunked stream"); - return Ok(Async::Ready(None)); - } - if let Some(buf) = buf { - return Ok(Async::Ready(Some(buf))); - } - if body.is_empty() { - return Ok(Async::NotReady); + Err(e) => { + return if e.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else { + Err(e) + }; } } } - Kind::Eof(ref mut is_eof) => { - if *is_eof { - Ok(Async::Ready(None)) - } else if !body.is_empty() { - Ok(Async::Ready(Some(body.take().freeze()))) - } else { - Ok(Async::NotReady) - } - } - } - } -} - -macro_rules! byte ( - ($rdr:ident) => ({ - if $rdr.len() > 0 { - let b = $rdr[0]; - $rdr.split_to(1); - b - } else { - return Ok(Async::NotReady) - } - }) -); - -impl ChunkedState { - fn step( - &self, body: &mut BytesMut, size: &mut u64, buf: &mut Option - ) -> Poll { - use self::ChunkedState::*; - match *self { - Size => ChunkedState::read_size(body, size), - SizeLws => ChunkedState::read_size_lws(body), - Extension => ChunkedState::read_extension(body), - SizeLf => ChunkedState::read_size_lf(body, size), - Body => ChunkedState::read_body(body, size, buf), - BodyCr => ChunkedState::read_body_cr(body), - BodyLf => ChunkedState::read_body_lf(body), - EndCr => ChunkedState::read_end_cr(body), - EndLf => ChunkedState::read_end_lf(body), - End => Ok(Async::Ready(ChunkedState::End)), - } - } - fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { - let radix = 16; - match byte!(rdr) { - b @ b'0'...b'9' => { - *size *= radix; - *size += u64::from(b - b'0'); - } - b @ b'a'...b'f' => { - *size *= radix; - *size += u64::from(b + 10 - b'a'); - } - b @ b'A'...b'F' => { - *size *= radix; - *size += u64::from(b + 10 - b'A'); - } - b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => return Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), - _ => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size line: Invalid Size", - )); - } - } - Ok(Async::Ready(ChunkedState::Size)) - } - fn read_size_lws(rdr: &mut BytesMut) -> Poll { - trace!("read_size_lws"); - match byte!(rdr) { - // LWS can follow the chunk size, but no more digits can come - b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size linear white space", - )), - } - } - fn read_extension(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions - } - } - fn read_size_lf( - rdr: &mut BytesMut, size: &mut u64 - ) -> Poll { - match byte!(rdr) { - b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), - b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size LF", - )), - } - } - - fn read_body( - rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option - ) -> Poll { - trace!("Chunked read, remaining={:?}", rem); - - let len = rdr.len() as u64; - if len == 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - let slice; - if *rem > len { - slice = rdr.take().freeze(); - *rem -= len; - } else { - slice = rdr.split_to(*rem as usize).freeze(); - *rem = 0; - } - *buf = Some(slice); - if *rem > 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - Ok(Async::Ready(ChunkedState::BodyCr)) - } - } - } - - fn read_body_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk body CR", - )), - } - } - fn read_body_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::Size)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk body LF", - )), - } - } - fn read_end_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk end CR", - )), - } - } - fn read_end_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::End)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk end LF", - )), } } } #[cfg(test)] mod tests { - use bytes::{Buf, Bytes, BytesMut}; - use futures::{Async, Stream}; + use bytes::{Bytes, BytesMut}; use http::{Method, Version}; - use std::net::Shutdown; - use std::{cmp, io, time}; - use tokio_io::{AsyncRead, AsyncWrite}; use super::*; use application::HttpApplication; use httpmessage::HttpMessage; + use server::h1decoder::Message; + use server::helpers::SharedHttpInnerMessage; use server::settings::WorkerSettings; - use server::{IoStream, KeepAlive}; + use server::KeepAlive; - struct Buffer { - buf: Bytes, - err: Option, - } - - impl Buffer { - fn new(data: &'static str) -> Buffer { - Buffer { - buf: Bytes::from(data), - err: None, + impl Message { + fn message(self) -> SharedHttpInnerMessage { + match self { + Message::Message { msg, payload: _ } => msg, + _ => panic!("error"), } } - fn feed_data(&mut self, data: &'static str) { - let mut b = BytesMut::from(self.buf.as_ref()); - b.extend(data.as_bytes()); - self.buf = b.take().freeze(); - } - } - - impl AsyncRead for Buffer {} - impl io::Read for Buffer { - fn read(&mut self, dst: &mut [u8]) -> Result { - if self.buf.is_empty() { - if self.err.is_some() { - Err(self.err.take().unwrap()) - } else { - Err(io::Error::new(io::ErrorKind::WouldBlock, "")) - } - } else { - let size = cmp::min(self.buf.len(), dst.len()); - let b = self.buf.split_to(size); - dst[..size].copy_from_slice(&b); - Ok(size) + fn is_payload(&self) -> bool { + match *self { + Message::Message { msg: _, payload } => payload, + _ => panic!("error"), } } - } - - impl IoStream for Buffer { - fn shutdown(&mut self, _: Shutdown) -> io::Result<()> { - Ok(()) - } - fn set_nodelay(&mut self, _: bool) -> io::Result<()> { - Ok(()) - } - fn set_linger(&mut self, _: Option) -> io::Result<()> { - Ok(()) - } - } - impl io::Write for Buffer { - fn write(&mut self, buf: &[u8]) -> io::Result { - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - } - impl AsyncWrite for Buffer { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(Async::Ready(())) - } - fn write_buf(&mut self, _: &mut B) -> Poll { - Ok(Async::NotReady) - } - } - - macro_rules! not_ready { - ($e:expr) => { - match $e { - Ok(Async::NotReady) => (), - Err(err) => unreachable!("Unexpected error: {:?}", err), - _ => unreachable!("Should not be ready"), + fn chunk(self) -> Bytes { + match self { + Message::Chunk(chunk) => chunk, + _ => panic!("error"), } - }; + } + fn eof(&self) -> bool { + match *self { + Message::Eof => true, + _ => false, + } + } } macro_rules! parse_ready { ($e:expr) => {{ let settings: WorkerSettings = WorkerSettings::new(Vec::new(), KeepAlive::Os); - match Reader::new().parse($e, &mut BytesMut::new(), &settings) { - Ok(Async::Ready(req)) => req, + match H1Decoder::new().decode($e, &settings) { + Ok(Some(msg)) => HttpRequest::from_message(msg.message()), Ok(_) => unreachable!("Eof during parsing http request"), Err(err) => unreachable!("Error during parsing http request: {:?}", err), } }}; } - macro_rules! reader_parse_ready { - ($e:expr) => { - match $e { - Ok(Async::Ready(req)) => req, - Ok(_) => unreachable!("Eof during parsing http request"), - Err(err) => { - unreachable!("Error during parsing http request: {:?}", err) - } - } - }; - } - macro_rules! expect_parse_err { ($e:expr) => {{ - let mut buf = BytesMut::new(); let settings: WorkerSettings = WorkerSettings::new(Vec::new(), KeepAlive::Os); - match Reader::new().parse($e, &mut buf, &settings) { + match H1Decoder::new().decode($e, &settings) { Err(err) => match err { - ReaderError::Error(_) => (), + DecoderError::Error(_) => (), _ => unreachable!("Parse error expected"), }, _ => unreachable!("Error expected"), @@ -1071,13 +474,13 @@ mod tests { #[test] fn test_parse() { - let mut buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n"); - let mut readbuf = BytesMut::new(); + let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { + let mut reader = H1Decoder::new(); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); @@ -1088,19 +491,19 @@ mod tests { #[test] fn test_parse_partial() { - let mut buf = Buffer::new("PUT /test HTTP/1"); - let mut readbuf = BytesMut::new(); + let mut buf = BytesMut::from("PUT /test HTTP/1"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::NotReady) => (), + let mut reader = H1Decoder::new(); + match reader.decode(&mut buf, &settings) { + Ok(None) => (), _ => unreachable!("Error"), } - buf.feed_data(".1\r\n\r\n"); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { + buf.extend(b".1\r\n\r\n"); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let mut req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::PUT); assert_eq!(req.path(), "/test"); @@ -1111,13 +514,13 @@ mod tests { #[test] fn test_parse_post() { - let mut buf = Buffer::new("POST /test2 HTTP/1.0\r\n\r\n"); - let mut readbuf = BytesMut::new(); + let mut buf = BytesMut::from("POST /test2 HTTP/1.0\r\n\r\n"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { + let mut reader = H1Decoder::new(); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let mut req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_10); assert_eq!(*req.method(), Method::POST); assert_eq!(req.path(), "/test2"); @@ -1128,17 +531,26 @@ mod tests { #[test] fn test_parse_body() { - let mut buf = Buffer::new("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let mut readbuf = BytesMut::new(); + let mut buf = + BytesMut::from("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(mut req)) => { + let mut reader = H1Decoder::new(); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let mut req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body"); + assert_eq!( + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk() + .as_ref(), + b"body" + ); } Ok(_) | Err(_) => unreachable!("Error during parsing http request"), } @@ -1147,17 +559,25 @@ mod tests { #[test] fn test_parse_body_crlf() { let mut buf = - Buffer::new("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let mut readbuf = BytesMut::new(); + BytesMut::from("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(mut req)) => { + let mut reader = H1Decoder::new(); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let mut req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body"); + assert_eq!( + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk() + .as_ref(), + b"body" + ); } Ok(_) | Err(_) => unreachable!("Error during parsing http request"), } @@ -1165,16 +585,15 @@ mod tests { #[test] fn test_parse_partial_eof() { - let mut buf = Buffer::new("GET /test HTTP/1.1\r\n"); - let mut readbuf = BytesMut::new(); + let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); + let mut reader = H1Decoder::new(); + assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); - let mut reader = Reader::new(); - not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) } - - buf.feed_data("\r\n"); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { + buf.extend(b"\r\n"); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); @@ -1185,22 +604,22 @@ mod tests { #[test] fn test_headers_split_field() { - let mut buf = Buffer::new("GET /test HTTP/1.1\r\n"); - let mut readbuf = BytesMut::new(); + let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) } + let mut reader = H1Decoder::new(); + assert!{ reader.decode(&mut buf, &settings).unwrap().is_none() } - buf.feed_data("t"); - not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) } + buf.extend(b"t"); + assert!{ reader.decode(&mut buf, &settings).unwrap().is_none() } - buf.feed_data("es"); - not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) } + buf.extend(b"es"); + assert!{ reader.decode(&mut buf, &settings).unwrap().is_none() } - buf.feed_data("t: value\r\n\r\n"); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { + buf.extend(b"t: value\r\n\r\n"); + match reader.decode(&mut buf, &settings) { + Ok(Some(msg)) => { + let req = HttpRequest::from_message(msg.message()); assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); @@ -1215,32 +634,28 @@ mod tests { #[test] fn test_headers_multi_value() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ Set-Cookie: c1=cookie1\r\n\ Set-Cookie: c2=cookie2\r\n\r\n", ); - let mut readbuf = BytesMut::new(); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + let req = HttpRequest::from_message(msg.message()); - let mut reader = Reader::new(); - match reader.parse(&mut buf, &mut readbuf, &settings) { - Ok(Async::Ready(req)) => { - let val: Vec<_> = req.headers() - .get_all("Set-Cookie") - .iter() - .map(|v| v.to_str().unwrap().to_owned()) - .collect(); - assert_eq!(val[0], "c1=cookie1"); - assert_eq!(val[1], "c2=cookie2"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let val: Vec<_> = req.headers() + .get_all("Set-Cookie") + .iter() + .map(|v| v.to_str().unwrap().to_owned()) + .collect(); + assert_eq!(val[0], "c1=cookie1"); + assert_eq!(val[1], "c2=cookie2"); } #[test] fn test_conn_default_1_0() { - let mut buf = Buffer::new("GET /test HTTP/1.0\r\n\r\n"); + let mut buf = BytesMut::from("GET /test HTTP/1.0\r\n\r\n"); let req = parse_ready!(&mut buf); assert!(!req.keep_alive()); @@ -1248,7 +663,7 @@ mod tests { #[test] fn test_conn_default_1_1() { - let mut buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n"); + let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); let req = parse_ready!(&mut buf); assert!(req.keep_alive()); @@ -1256,7 +671,7 @@ mod tests { #[test] fn test_conn_close() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ connection: close\r\n\r\n", ); @@ -1267,7 +682,7 @@ mod tests { #[test] fn test_conn_close_1_0() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.0\r\n\ connection: close\r\n\r\n", ); @@ -1278,7 +693,7 @@ mod tests { #[test] fn test_conn_keep_alive_1_0() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.0\r\n\ connection: keep-alive\r\n\r\n", ); @@ -1289,7 +704,7 @@ mod tests { #[test] fn test_conn_keep_alive_1_1() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ connection: keep-alive\r\n\r\n", ); @@ -1300,7 +715,7 @@ mod tests { #[test] fn test_conn_other_1_0() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.0\r\n\ connection: other\r\n\r\n", ); @@ -1311,7 +726,7 @@ mod tests { #[test] fn test_conn_other_1_1() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ connection: other\r\n\r\n", ); @@ -1322,32 +737,30 @@ mod tests { #[test] fn test_conn_upgrade() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ upgrade: websockets\r\n\ connection: upgrade\r\n\r\n", ); let req = parse_ready!(&mut buf); - assert!(!req.payload().eof()); assert!(req.upgrade()); } #[test] fn test_conn_upgrade_connect_method() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "CONNECT /test HTTP/1.1\r\n\ content-type: text/plain\r\n\r\n", ); let req = parse_ready!(&mut buf); assert!(req.upgrade()); - assert!(!req.payload().eof()); } #[test] fn test_request_chunked() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); @@ -1360,7 +773,7 @@ mod tests { } // type in chunked - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ transfer-encoding: chnked\r\n\r\n", ); @@ -1375,7 +788,7 @@ mod tests { #[test] fn test_headers_content_length_err_1() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ content-length: line\r\n\r\n", ); @@ -1385,7 +798,7 @@ mod tests { #[test] fn test_headers_content_length_err_2() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ content-length: -1\r\n\r\n", ); @@ -1395,7 +808,7 @@ mod tests { #[test] fn test_invalid_header() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ test line\r\n\r\n", ); @@ -1405,7 +818,7 @@ mod tests { #[test] fn test_invalid_name() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ test[]: line\r\n\r\n", ); @@ -1415,30 +828,39 @@ mod tests { #[test] fn test_http_request_bad_status_line() { - let mut buf = Buffer::new("getpath \r\n\r\n"); + let mut buf = BytesMut::from("getpath \r\n\r\n"); expect_parse_err!(&mut buf); } #[test] fn test_http_request_upgrade() { - let mut buf = Buffer::new( + let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ connection: upgrade\r\n\ upgrade: websocket\r\n\r\n\ some raw data", ); - let mut req = parse_ready!(&mut buf); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req = HttpRequest::from_message(msg.message()); assert!(!req.keep_alive()); assert!(req.upgrade()); assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk() + .as_ref(), b"some raw data" ); } #[test] fn test_http_request_parser_utf8() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ x-test: ั‚ะตัั‚\r\n\r\n", ); @@ -1452,7 +874,7 @@ mod tests { #[test] fn test_http_request_parser_two_slashes() { - let mut buf = Buffer::new("GET //path HTTP/1.1\r\n\r\n"); + let mut buf = BytesMut::from("GET //path HTTP/1.1\r\n\r\n"); let req = parse_ready!(&mut buf); assert_eq!(req.path(), "//path"); @@ -1460,175 +882,175 @@ mod tests { #[test] fn test_http_request_parser_bad_method() { - let mut buf = Buffer::new("!12%()+=~$ /get HTTP/1.1\r\n\r\n"); + let mut buf = BytesMut::from("!12%()+=~$ /get HTTP/1.1\r\n\r\n"); expect_parse_err!(&mut buf); } #[test] fn test_http_request_parser_bad_version() { - let mut buf = Buffer::new("GET //get HT/11\r\n\r\n"); + let mut buf = BytesMut::from("GET //get HT/11\r\n\r\n"); expect_parse_err!(&mut buf); } #[test] fn test_http_request_chunked_payload() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let mut readbuf = BytesMut::new(); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - let mut req = - reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req = HttpRequest::from_message(msg.message()); assert!(req.chunked().unwrap()); - assert!(!req.payload().eof()); - buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); - let _ = req.payload_mut().poll(); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(!req.payload().eof()); + buf.extend(b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk() + .as_ref(), + b"data" + ); + assert_eq!( + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk() + .as_ref(), + b"line" + ); + assert!( + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .eof() ); - assert!(req.payload().eof()); } #[test] fn test_http_request_chunked_payload_and_next_message() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let mut readbuf = BytesMut::new(); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - - let mut reader = Reader::new(); - - let mut req = - reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req = HttpRequest::from_message(msg.message()); assert!(req.chunked().unwrap()); - assert!(!req.payload().eof()); - buf.feed_data( - "4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ - POST /test2 HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", + buf.extend( + b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ + POST /test2 HTTP/1.1\r\n\ + transfer-encoding: chunked\r\n\r\n" + .iter(), ); - let _ = req.payload_mut().poll(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"data"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"line"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.eof()); - let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req2 = HttpRequest::from_message(msg.message()); + assert!(req2.chunked().unwrap()); assert_eq!(*req2.method(), Method::POST); assert!(req2.chunked().unwrap()); - assert!(!req2.payload().eof()); - - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); - assert!(req.payload().eof()); } #[test] fn test_http_request_chunked_payload_chunks() { - let mut buf = Buffer::new( + let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let mut readbuf = BytesMut::new(); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - let mut req = - reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - req.payload_mut().set_read_buffer_capacity(0); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req = HttpRequest::from_message(msg.message()); assert!(req.chunked().unwrap()); - assert!(!req.payload().eof()); - buf.feed_data("4\r\n1111\r\n"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"1111"); + buf.extend(b"4\r\n1111\r\n"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"1111"); - buf.feed_data("4\r\ndata\r"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + buf.extend(b"4\r\ndata\r"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"data"); - buf.feed_data("\n4"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + buf.extend(b"\n4"); + assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); - buf.feed_data("\r"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - buf.feed_data("\n"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + buf.extend(b"\r"); + assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); + buf.extend(b"\n"); + assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); - buf.feed_data("li"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - - buf.feed_data("ne\r\n0\r\n"); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + buf.extend(b"li"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"li"); //trailers //buf.feed_data("test: test\r\n"); //not_ready!(reader.parse(&mut buf, &mut readbuf)); - let _ = req.payload_mut().poll(); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + buf.extend(b"ne\r\n0\r\n"); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert_eq!(msg.chunk().as_ref(), b"ne"); + assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" + buf.extend(b"\r\n"); + assert!( + reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .eof() ); - assert!(!req.payload().eof()); - - buf.feed_data("\r\n"); - let _ = req.payload_mut().poll(); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(req.payload().eof()); } #[test] fn test_parse_chunked_payload_chunk_extension() { - let mut buf = Buffer::new( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", + let mut buf = BytesMut::from( + &"GET /test HTTP/1.1\r\n\ + transfer-encoding: chunked\r\n\r\n"[..], ); - let mut readbuf = BytesMut::new(); let settings = WorkerSettings::::new(Vec::new(), KeepAlive::Os); - let mut reader = Reader::new(); - let mut req = - reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + let mut reader = H1Decoder::new(); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.is_payload()); + let req = HttpRequest::from_message(msg.message()); assert!(req.chunked().unwrap()); - assert!(!req.payload().eof()); - buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") - let _ = req.payload_mut().poll(); - not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(!req.payload().eof()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); - assert!(req.payload().eof()); + buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") + let chunk = reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk(); + assert_eq!(chunk, Bytes::from_static(b"data")); + let chunk = reader + .decode(&mut buf, &settings) + .unwrap() + .unwrap() + .chunk(); + assert_eq!(chunk, Bytes::from_static(b"line")); + let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); + assert!(msg.eof()); } - - /*#[test] - #[should_panic] - fn test_parse_multiline() { - let mut buf = Buffer::new( - "GET /test HTTP/1.1\r\n\ - test: line\r\n \ - continue\r\n\ - test2: data\r\n\ - \r\n", false); - - let mut reader = Reader::new(); - match reader.parse(&mut buf) { - Ok(res) => (), - Err(err) => unreachable!("{:?}", err), - } - }*/ } diff --git a/src/server/h1decoder.rs b/src/server/h1decoder.rs new file mode 100644 index 000000000..d610afc61 --- /dev/null +++ b/src/server/h1decoder.rs @@ -0,0 +1,487 @@ +use std::{io, mem}; + +use bytes::{Bytes, BytesMut}; +use futures::{Async, Poll}; +use httparse; + +use super::helpers::SharedHttpInnerMessage; +use super::settings::WorkerSettings; +use error::ParseError; +use http::header::{HeaderName, HeaderValue}; +use http::{header, HttpTryFrom, Method, Uri, Version}; +use uri::Url; + +const MAX_BUFFER_SIZE: usize = 131_072; +const MAX_HEADERS: usize = 96; + +pub(crate) struct H1Decoder { + decoder: Option, +} + +pub(crate) enum Message { + Message { + msg: SharedHttpInnerMessage, + payload: bool, + }, + Chunk(Bytes), + Eof, +} + +#[derive(Debug)] +pub(crate) enum DecoderError { + Io(io::Error), + Error(ParseError), +} + +impl From for DecoderError { + fn from(err: io::Error) -> DecoderError { + DecoderError::Io(err) + } +} + +impl H1Decoder { + pub fn new() -> H1Decoder { + H1Decoder { decoder: None } + } + + pub fn decode( + &mut self, src: &mut BytesMut, settings: &WorkerSettings, + ) -> Result, DecoderError> { + // read payload + if self.decoder.is_some() { + match self.decoder.as_mut().unwrap().decode(src)? { + Async::Ready(Some(bytes)) => return Ok(Some(Message::Chunk(bytes))), + Async::Ready(None) => { + self.decoder.take(); + return Ok(Some(Message::Eof)); + } + Async::NotReady => return Ok(None), + } + } + + match self.parse_message(src, settings) + .map_err(DecoderError::Error)? + { + Async::Ready((msg, decoder)) => { + if let Some(decoder) = decoder { + self.decoder = Some(decoder); + Ok(Some(Message::Message { + msg, + payload: true, + })) + } else { + Ok(Some(Message::Message { + msg, + payload: false, + })) + } + } + Async::NotReady => { + if src.len() >= MAX_BUFFER_SIZE { + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + Err(DecoderError::Error(ParseError::TooLarge)) + } else { + Ok(None) + } + } + } + } + + fn parse_message( + &self, buf: &mut BytesMut, settings: &WorkerSettings, + ) -> Poll<(SharedHttpInnerMessage, Option), ParseError> { + // Parse http message + let mut has_upgrade = false; + let mut chunked = false; + let mut content_length = None; + + let msg = { + let bytes_ptr = buf.as_ref().as_ptr() as usize; + let mut headers: [httparse::Header; MAX_HEADERS] = + unsafe { mem::uninitialized() }; + + let (len, method, path, version, headers_len) = { + let b = unsafe { + let b: &[u8] = buf; + mem::transmute(b) + }; + let mut req = httparse::Request::new(&mut headers); + match req.parse(b)? { + httparse::Status::Complete(len) => { + let method = Method::from_bytes(req.method.unwrap().as_bytes()) + .map_err(|_| ParseError::Method)?; + let path = Url::new(Uri::try_from(req.path.unwrap())?); + let version = if req.version.unwrap() == 1 { + Version::HTTP_11 + } else { + Version::HTTP_10 + }; + (len, method, path, version, req.headers.len()) + } + httparse::Status::Partial => return Ok(Async::NotReady), + } + }; + + let slice = buf.split_to(len).freeze(); + + // convert headers + let msg = settings.get_http_message(); + { + let msg_mut = msg.get_mut(); + msg_mut.keep_alive = version != Version::HTTP_10; + + for header in headers[..headers_len].iter() { + if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) { + has_upgrade = has_upgrade || name == header::UPGRADE; + + let v_start = header.value.as_ptr() as usize - bytes_ptr; + let v_end = v_start + header.value.len(); + let value = unsafe { + HeaderValue::from_shared_unchecked( + slice.slice(v_start, v_end), + ) + }; + match name { + header::CONTENT_LENGTH => { + if let Ok(s) = value.to_str() { + if let Ok(len) = s.parse::() { + content_length = Some(len) + } else { + debug!("illegal Content-Length: {:?}", len); + return Err(ParseError::Header); + } + } else { + debug!("illegal Content-Length: {:?}", len); + return Err(ParseError::Header); + } + } + // transfer-encoding + header::TRANSFER_ENCODING => { + if let Ok(s) = value.to_str() { + chunked = s.to_lowercase().contains("chunked"); + } else { + return Err(ParseError::Header); + } + } + // connection keep-alive state + header::CONNECTION => { + msg_mut.keep_alive = if let Ok(conn) = value.to_str() { + if version == Version::HTTP_10 + && conn.contains("keep-alive") + { + true + } else { + version == Version::HTTP_11 + && !(conn.contains("close") + || conn.contains("upgrade")) + } + } else { + false + }; + } + _ => (), + } + + msg_mut.headers.append(name, value); + } else { + return Err(ParseError::Header); + } + } + + msg_mut.url = path; + msg_mut.method = method; + msg_mut.version = version; + } + msg + }; + + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + let decoder = if chunked { + // Chunked encoding + Some(EncodingDecoder::chunked()) + } else if let Some(len) = content_length { + // Content-Length + Some(EncodingDecoder::length(len)) + } else if has_upgrade || msg.get_ref().method == Method::CONNECT { + // upgrade(websocket) or connect + Some(EncodingDecoder::eof()) + } else { + None + }; + + Ok(Async::Ready((msg, decoder))) + } +} + +/// Decoders to handle different Transfer-Encodings. +/// +/// If a message body does not include a Transfer-Encoding, it *should* +/// include a Content-Length header. +#[derive(Debug, Clone, PartialEq)] +pub struct EncodingDecoder { + kind: Kind, +} + +impl EncodingDecoder { + pub fn length(x: u64) -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Length(x), + } + } + + pub fn chunked() -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Chunked(ChunkedState::Size, 0), + } + } + + pub fn eof() -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Eof(false), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +enum Kind { + /// A Reader used when a Content-Length header is passed with a positive + /// integer. + Length(u64), + /// A Reader used when Transfer-Encoding is `chunked`. + Chunked(ChunkedState, u64), + /// A Reader used for responses that don't indicate a length or chunked. + /// + /// Note: This should only used for `Response`s. It is illegal for a + /// `Request` to be made with both `Content-Length` and + /// `Transfer-Encoding: chunked` missing, as explained from the spec: + /// + /// > If a Transfer-Encoding header field is present in a response and + /// > the chunked transfer coding is not the final encoding, the + /// > message body length is determined by reading the connection until + /// > it is closed by the server. If a Transfer-Encoding header field + /// > is present in a request and the chunked transfer coding is not + /// > the final encoding, the message body length cannot be determined + /// > reliably; the server MUST respond with the 400 (Bad Request) + /// > status code and then close the connection. + Eof(bool), +} + +#[derive(Debug, PartialEq, Clone)] +enum ChunkedState { + Size, + SizeLws, + Extension, + SizeLf, + Body, + BodyCr, + BodyLf, + EndCr, + EndLf, + End, +} + +impl EncodingDecoder { + pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { + match self.kind { + Kind::Length(ref mut remaining) => { + if *remaining == 0 { + Ok(Async::Ready(None)) + } else { + if body.is_empty() { + return Ok(Async::NotReady); + } + let len = body.len() as u64; + let buf; + if *remaining > len { + buf = body.take().freeze(); + *remaining -= len; + } else { + buf = body.split_to(*remaining as usize).freeze(); + *remaining = 0; + } + trace!("Length read: {}", buf.len()); + Ok(Async::Ready(Some(buf))) + } + } + Kind::Chunked(ref mut state, ref mut size) => { + loop { + let mut buf = None; + // advances the chunked state + *state = try_ready!(state.step(body, size, &mut buf)); + if *state == ChunkedState::End { + trace!("End of chunked stream"); + return Ok(Async::Ready(None)); + } + if let Some(buf) = buf { + return Ok(Async::Ready(Some(buf))); + } + if body.is_empty() { + return Ok(Async::NotReady); + } + } + } + Kind::Eof(ref mut is_eof) => { + if *is_eof { + Ok(Async::Ready(None)) + } else if !body.is_empty() { + Ok(Async::Ready(Some(body.take().freeze()))) + } else { + Ok(Async::NotReady) + } + } + } + } +} + +macro_rules! byte ( + ($rdr:ident) => ({ + if $rdr.len() > 0 { + let b = $rdr[0]; + $rdr.split_to(1); + b + } else { + return Ok(Async::NotReady) + } + }) +); + +impl ChunkedState { + fn step( + &self, body: &mut BytesMut, size: &mut u64, buf: &mut Option, + ) -> Poll { + use self::ChunkedState::*; + match *self { + Size => ChunkedState::read_size(body, size), + SizeLws => ChunkedState::read_size_lws(body), + Extension => ChunkedState::read_extension(body), + SizeLf => ChunkedState::read_size_lf(body, size), + Body => ChunkedState::read_body(body, size, buf), + BodyCr => ChunkedState::read_body_cr(body), + BodyLf => ChunkedState::read_body_lf(body), + EndCr => ChunkedState::read_end_cr(body), + EndLf => ChunkedState::read_end_lf(body), + End => Ok(Async::Ready(ChunkedState::End)), + } + } + fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { + let radix = 16; + match byte!(rdr) { + b @ b'0'...b'9' => { + *size *= radix; + *size += u64::from(b - b'0'); + } + b @ b'a'...b'f' => { + *size *= radix; + *size += u64::from(b + 10 - b'a'); + } + b @ b'A'...b'F' => { + *size *= radix; + *size += u64::from(b + 10 - b'A'); + } + b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => return Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size line: Invalid Size", + )); + } + } + Ok(Async::Ready(ChunkedState::Size)) + } + fn read_size_lws(rdr: &mut BytesMut) -> Poll { + trace!("read_size_lws"); + match byte!(rdr) { + // LWS can follow the chunk size, but no more digits can come + b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size linear white space", + )), + } + } + fn read_extension(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions + } + } + fn read_size_lf( + rdr: &mut BytesMut, size: &mut u64, + ) -> Poll { + match byte!(rdr) { + b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), + b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size LF", + )), + } + } + + fn read_body( + rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option, + ) -> Poll { + trace!("Chunked read, remaining={:?}", rem); + + let len = rdr.len() as u64; + if len == 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + let slice; + if *rem > len { + slice = rdr.take().freeze(); + *rem -= len; + } else { + slice = rdr.split_to(*rem as usize).freeze(); + *rem = 0; + } + *buf = Some(slice); + if *rem > 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + Ok(Async::Ready(ChunkedState::BodyCr)) + } + } + } + + fn read_body_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk body CR", + )), + } + } + fn read_body_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::Size)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk body LF", + )), + } + } + fn read_end_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk end CR", + )), + } + } + fn read_end_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::End)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk end LF", + )), + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 85faf77b3..36d80e2de 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -10,6 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; mod channel; pub(crate) mod encoding; pub(crate) mod h1; +pub(crate) mod h1decoder; mod h1writer; mod h2; mod h2writer;