From 74377ef73df060dffdb123f74808158bc5a2d435 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 9 Feb 2018 16:20:10 -0800 Subject: [PATCH] fix back pressure for h1 import stream --- src/payload.rs | 6 ++- src/server/encoding.rs | 4 ++ src/server/h1.rs | 113 +++++++++++++++++++++++------------------ 3 files changed, 71 insertions(+), 52 deletions(-) diff --git a/src/payload.rs b/src/payload.rs index a7b008132..c5c63e786 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -260,6 +260,7 @@ impl PayloadWriter for PayloadSender { } } + #[inline] fn capacity(&self) -> usize { if let Some(shared) = self.inner.upgrade() { shared.borrow().capacity() @@ -327,10 +328,10 @@ impl Inner { if let Some(data) = self.items.pop_front() { self.len -= data.len(); Ok(Async::Ready(Some(PayloadItem(data)))) - } else if self.eof { - Ok(Async::Ready(None)) } else if let Some(err) = self.err.take() { Err(err) + } else if self.eof { + Ok(Async::Ready(None)) } else { self.task = Some(current_task()); Ok(Async::NotReady) @@ -439,6 +440,7 @@ impl Inner { self.items.push_front(data); } + #[inline] fn capacity(&self) -> usize { if self.len > self.buf_size { 0 diff --git a/src/server/encoding.rs b/src/server/encoding.rs index b5213efd9..bf872a499 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -94,6 +94,7 @@ impl PayloadType { } impl PayloadWriter for PayloadType { + #[inline] fn set_error(&mut self, err: PayloadError) { match *self { PayloadType::Sender(ref mut sender) => sender.set_error(err), @@ -101,6 +102,7 @@ impl PayloadWriter for PayloadType { } } + #[inline] fn feed_eof(&mut self) { match *self { PayloadType::Sender(ref mut sender) => sender.feed_eof(), @@ -108,6 +110,7 @@ impl PayloadWriter for PayloadType { } } + #[inline] fn feed_data(&mut self, data: Bytes) { match *self { PayloadType::Sender(ref mut sender) => sender.feed_data(data), @@ -115,6 +118,7 @@ impl PayloadWriter for PayloadType { } } + #[inline] fn capacity(&self) -> usize { match *self { PayloadType::Sender(ref sender) => sender.capacity(), diff --git a/src/server/h1.rs b/src/server/h1.rs index b039b09ed..f2578b3b0 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -16,7 +16,7 @@ use pipeline::Pipeline; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use error::{ParseError, PayloadError, ResponseError}; -use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; +use payload::{Payload, PayloadWriter}; use super::{utils, Writer}; use super::h1writer::H1Writer; @@ -319,7 +319,6 @@ struct Reader { } enum Decoding { - Paused, Ready, NotReady, } @@ -343,61 +342,76 @@ impl Reader { } } - fn decode(&mut self, buf: &mut BytesMut) -> std::result::Result { - if let Some(ref mut payload) = self.payload { - if payload.tx.capacity() > DEFAULT_BUFFER_SIZE { - return Ok(Decoding::Paused) - } - loop { - match payload.decoder.decode(buf) { - Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes) - }, - Ok(Async::Ready(None)) => { - payload.tx.feed_eof(); - return Ok(Decoding::Ready) - }, - Ok(Async::NotReady) => return Ok(Decoding::NotReady), - Err(err) => { - payload.tx.set_error(err.into()); - return Err(ReaderError::Payload) - } + #[inline] + fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo) + -> std::result::Result + { + loop { + match payload.decoder.decode(buf) { + Ok(Async::Ready(Some(bytes))) => { + payload.tx.feed_data(bytes) + }, + Ok(Async::Ready(None)) => { + payload.tx.feed_eof(); + return Ok(Decoding::Ready) + }, + Ok(Async::NotReady) => return Ok(Decoding::NotReady), + Err(err) => { + payload.tx.set_error(err.into()); + return Err(ReaderError::Payload) } } - } else { - return Ok(Decoding::Ready) } } - + pub fn parse(&mut self, io: &mut T, buf: &mut BytesMut, settings: &WorkerSettings) -> Poll where T: IoStream { // read payload - if self.payload.is_some() { - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - if let Some(ref mut payload) = self.payload { - payload.tx.set_error(PayloadError::Incomplete); - } - // http channel should not deal with payload errors - return Err(ReaderError::Payload) - }, - Err(err) => { - if let Some(ref mut payload) = self.payload { - payload.tx.set_error(err.into()); - } - // http channel should not deal with payload errors - return Err(ReaderError::Payload) + let done = { + if let Some(ref mut payload) = self.payload { + if payload.tx.capacity() == 0 { + return Ok(Async::NotReady) } - _ => (), + match utils::read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + payload.tx.set_error(PayloadError::Incomplete); + + // http channel should not deal with payload errors + return Err(ReaderError::Payload) + }, + Err(err) => { + payload.tx.set_error(err.into()); + + // http channel should not deal with payload errors + return Err(ReaderError::Payload) + } + _ => (), + } + loop { + match payload.decoder.decode(buf) { + Ok(Async::Ready(Some(bytes))) => { + payload.tx.feed_data(bytes) + }, + Ok(Async::Ready(None)) => { + payload.tx.feed_eof(); + break true + }, + Ok(Async::NotReady) => + break false, + Err(err) => { + payload.tx.set_error(err.into()); + return Err(ReaderError::Payload) + } + } + } + } else { + false } - match self.decode(buf)? { - Decoding::Ready => self.payload = None, - Decoding::Paused | Decoding::NotReady => return Ok(Async::NotReady), - } - } + }; + if done { self.payload = None } // if buf is empty parse_message will always return NotReady, let's avoid that let read = if buf.is_empty() { @@ -421,11 +435,10 @@ impl Reader { match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { Async::Ready((msg, decoder)) => { // process payload - if let Some(payload) = decoder { - self.payload = Some(payload); - match self.decode(buf)? { - Decoding::Paused | Decoding::NotReady => (), - Decoding::Ready => self.payload = None, + if let Some(mut payload) = decoder { + match self.decode(buf, &mut payload)? { + Decoding::Ready => (), + Decoding::NotReady => self.payload = Some(payload), } } return Ok(Async::Ready(msg));