diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 2e729b78d..cb15bc1e9 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -728,6 +728,53 @@ where let mut read_some = false; loop { + // Return early when read buf exceed decoder's max buffer size. + if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE { + /* + At this point it's not known IO stream is still scheduled + to be waked up. so force wake up dispatcher just in case. + + Reason: + AsyncRead mostly would only have guarantee wake up + when the poll_read return Poll::Pending. + + Case: + When read_buf is beyond max buffer size the early return + could be successfully be parsed as a new Request. + This case would not generate ParseError::TooLarge + and at this point IO stream is not fully read to Pending + and would result in dispatcher stuck until timeout (KA) + + Note: + This is a perf choice to reduce branch on + ::decode. + + A Request head too large to parse is only checked on + httparse::Status::Partial condition. + */ + if this.payload.is_none() { + /* + When dispatcher has a payload the responsibility of + wake up it would be shift to h1::payload::Payload. + + Reason: + Self wake up when there is payload would waste poll + and/or result in over read. + + Case: + When payload is (partial) dropped by user there is + no need to do read anymore. + At this case read_buf could always remain beyond + MAX_BUFFER_SIZE and self wake up would be busy poll + dispatcher and waste resource. + + */ + cx.waker().wake_by_ref(); + } + + return Ok(false); + } + // grow buffer if necessary. let remaining = this.read_buf.capacity() - this.read_buf.len(); if remaining < LW_BUFFER_SIZE { @@ -735,35 +782,18 @@ where } match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { - Poll::Pending => return Ok(false), Poll::Ready(Ok(n)) => { if n == 0 { return Ok(true); - } else { - // Return early when read buf exceed decoder's max buffer size. - if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE { - // at this point it's not known io is still scheduled to - // be waked up. so force wake up dispatcher just in case. - // TODO: figure out the overhead. - if this.payload.is_none() { - // When dispatcher has a payload. The responsibility of - // wake up stream would be shift to PayloadSender. - // Therefore no self wake up is needed. - cx.waker().wake_by_ref(); - } - return Ok(false); - } - - read_some = true; } + read_some = true; } + Poll::Pending => return Ok(false), Poll::Ready(Err(err)) => { - return if err.kind() == io::ErrorKind::WouldBlock { - Ok(false) - } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { - Ok(true) - } else { - Err(DispatchError::Io(err)) + return match err.kind() { + io::ErrorKind::WouldBlock => Ok(false), + io::ErrorKind::ConnectionReset if read_some => Ok(true), + _ => Err(DispatchError::Io(err)), } } } @@ -985,7 +1015,7 @@ mod tests { None, ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); match h1.as_mut().poll(cx) { Poll::Pending => panic!(), @@ -1025,7 +1055,7 @@ mod tests { None, ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal(_))); @@ -1079,7 +1109,7 @@ mod tests { None, ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal(_))); @@ -1138,7 +1168,7 @@ mod tests { ", ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); assert!(h1.as_mut().poll(cx).is_pending()); assert!(matches!(&h1.inner, DispatcherState::Normal(_))); @@ -1210,7 +1240,7 @@ mod tests { ", ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Normal(_))); @@ -1271,7 +1301,7 @@ mod tests { ", ); - futures_util::pin_mut!(h1); + actix_rt::pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Upgrade(_)));