From c3d5e4301ac53badd70a35becbe9dd5fb9f2c713 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 15 Dec 2017 13:10:12 -0800 Subject: [PATCH] cleanup h1 parse --- README.md | 4 +- src/h1.rs | 164 +++++++++++++++++++++++++----------------------------- 2 files changed, 78 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index c6ceee9c3..4026830f7 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Each result is best of five runs. All measurements are req/sec. Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline ---- | -------- | ---------- | -------- | ---------- | -------- | ---------- -Actix | 87.200 | 813.200 | 122.100 | 1.877.000 | 107.400 | 2.390.000 +Actix | 89.100 | 815.200 | 122.100 | 1.877.000 | 107.400 | 2.350.000 Gotham | 61.000 | 178.000 | | | | Iron | | | | | 94.500 | 78.000 Rocket | | | | | 95.500 | failed @@ -65,7 +65,7 @@ Some notes on results. Iron and Rocket got tested with 8 threads, which showed best results. Gothan and tokio-minihttp seem does not support multithreading, or at least i couldn't figured out. I manually enabled pipelining for *Shio* and Gotham*. While shio seems support multithreading, but it showed -absolutly same results for any how number of threads (maybe macos?) +absolutly same results for any how number of threads (maybe macos problem?) Rocket completely failed in pipelined tests. ## Examples diff --git a/src/h1.rs b/src/h1.rs index 7687810c3..91801e836 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -215,10 +215,10 @@ impl Http1 // read incoming data while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) && - self.tasks.len() < MAX_PIPELINED_MESSAGES { - match self.reader.parse(self.stream.get_mut(), - &mut self.read_buf, &self.settings) - { + self.tasks.len() < MAX_PIPELINED_MESSAGES + { + match self.reader.parse(self.stream.get_mut(), + &mut self.read_buf, &self.settings) { Ok(Async::Ready(Item::Http1(mut req))) => { not_ready = false; @@ -405,77 +405,58 @@ impl Reader { settings: &WorkerSettings) -> Poll where T: AsyncRead { - loop { - match self.decode(buf)? { - Decoding::Paused => return Ok(Async::NotReady), - Decoding::Ready => { - self.payload = None; - break - }, - Decoding::NotReady => { - match self.read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - if let Some(ref mut payload) = self.payload { - payload.tx.set_error(PayloadError::Incomplete); - } - // http channel should not deal with payload errors - return Err(ReaderError::Payload) - } - Ok(Async::Ready(_)) => { - continue - } - Ok(Async::NotReady) => break, - Err(err) => { - if let Some(ref mut payload) = self.payload { - payload.tx.set_error(err.into()); - } - // http channel should not deal with payload errors - return Err(ReaderError::Payload) - } + // read payload + if self.payload.is_some() { + match self.read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + if let Some(ref mut payload) = self.payload { + payload.tx.set_error(PayloadError::Incomplete); } + // http channel should not deal with payload errors + return Err(ReaderError::Payload) + }, + Err(err) => { + if let Some(ref mut payload) = self.payload { + payload.tx.set_error(err.into()); + } + // http channel should not deal with payload errors + return Err(ReaderError::Payload) } + _ => (), + } + match self.decode(buf)? { + Decoding::Ready => self.payload = None, + Decoding::Paused | Decoding::NotReady => return Ok(Async::NotReady), } } + // if buf is empty parse_message will always return NotReady, let's avoid that + let read = if buf.is_empty() { + match self.read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + debug!("Ignored premature client disconnection"); + return Err(ReaderError::Disconnect); + }, + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(err) => + return Err(ReaderError::Error(err.into())) + } + false + } else { + true + }; + loop { match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { Message::Http1(msg, decoder) => { + // process payload if let Some(payload) = decoder { self.payload = Some(payload); - - loop { - match self.decode(buf)? { - Decoding::Paused => - break, - Decoding::Ready => { - self.payload = None; - break - }, - Decoding::NotReady => { - match self.read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - trace!("parse eof"); - if let Some(ref mut payload) = self.payload { - payload.tx.set_error( - PayloadError::Incomplete); - } - // http channel should deal with payload errors - return Err(ReaderError::Payload) - } - Ok(Async::Ready(_)) => { - continue - } - Ok(Async::NotReady) => break, - Err(err) => { - if let Some(ref mut payload) = self.payload { - payload.tx.set_error(err.into()); - } - // http channel should deal with payload errors - return Err(ReaderError::Payload) - } - } - } - } + match self.decode(buf)? { + Decoding::Paused | Decoding::NotReady => (), + Decoding::Ready => self.payload = None, } } self.h1 = true; @@ -489,42 +470,49 @@ impl Reader { }, Message::NotReady => { if buf.capacity() >= MAX_BUFFER_SIZE { - debug!("MAX_BUFFER_SIZE reached, closing"); + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); return Err(ReaderError::Error(ParseError::TooLarge)); } + if read { + match self.read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + debug!("Ignored premature client disconnection"); + return Err(ReaderError::Disconnect); + }, + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(err) => + return Err(ReaderError::Error(err.into())) + } + } else { + return Ok(Async::NotReady) + } }, } - match self.read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - debug!("Ignored premature client disconnection"); - return Err(ReaderError::Disconnect); - }, - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => - return Ok(Async::NotReady), - Err(err) => - return Err(ReaderError::Error(err.into())) - } } } fn read_from_io(&mut self, io: &mut T, buf: &mut BytesMut) - -> Poll { - if buf.remaining_mut() < LW_BUFFER_SIZE { - buf.reserve(HW_BUFFER_SIZE); - } + -> Poll + { unsafe { - let n = match io.read(buf.bytes_mut()) { - Ok(n) => n, + if buf.remaining_mut() < LW_BUFFER_SIZE { + buf.reserve(HW_BUFFER_SIZE); + } + match io.read(buf.bytes_mut()) { + Ok(n) => { + buf.advance_mut(n); + Ok(Async::Ready(n)) + }, Err(e) => { if e.kind() == io::ErrorKind::WouldBlock { - return Ok(Async::NotReady); + Ok(Async::NotReady) + } else { + Err(e) } - return Err(e) } - }; - buf.advance_mut(n); - Ok(Async::Ready(n)) + } } }