1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-24 07:58:07 +00:00

fix potential over read (#1991)

This commit is contained in:
fakeshadow 2021-02-14 09:36:18 -08:00 committed by GitHub
parent 7fa6333a0c
commit 308b70b039
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -728,6 +728,53 @@ where
let mut read_some = false; let mut read_some = false;
loop { loop {
// Return early when read buf exceed decoder's max buffer size.
if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE {
/*
At this point it's not known IO stream is still scheduled
to be waked up. so force wake up dispatcher just in case.
Reason:
AsyncRead mostly would only have guarantee wake up
when the poll_read return Poll::Pending.
Case:
When read_buf is beyond max buffer size the early return
could be successfully be parsed as a new Request.
This case would not generate ParseError::TooLarge
and at this point IO stream is not fully read to Pending
and would result in dispatcher stuck until timeout (KA)
Note:
This is a perf choice to reduce branch on
<Request as MessageType>::decode.
A Request head too large to parse is only checked on
httparse::Status::Partial condition.
*/
if this.payload.is_none() {
/*
When dispatcher has a payload the responsibility of
wake up it would be shift to h1::payload::Payload.
Reason:
Self wake up when there is payload would waste poll
and/or result in over read.
Case:
When payload is (partial) dropped by user there is
no need to do read anymore.
At this case read_buf could always remain beyond
MAX_BUFFER_SIZE and self wake up would be busy poll
dispatcher and waste resource.
*/
cx.waker().wake_by_ref();
}
return Ok(false);
}
// grow buffer if necessary. // grow buffer if necessary.
let remaining = this.read_buf.capacity() - this.read_buf.len(); let remaining = this.read_buf.capacity() - this.read_buf.len();
if remaining < LW_BUFFER_SIZE { if remaining < LW_BUFFER_SIZE {
@ -735,35 +782,18 @@ where
} }
match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
Poll::Pending => return Ok(false),
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
if n == 0 { if n == 0 {
return Ok(true); return Ok(true);
} else {
// Return early when read buf exceed decoder's max buffer size.
if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// at this point it's not known io is still scheduled to
// be waked up. so force wake up dispatcher just in case.
// TODO: figure out the overhead.
if this.payload.is_none() {
// When dispatcher has a payload. The responsibility of
// wake up stream would be shift to PayloadSender.
// Therefore no self wake up is needed.
cx.waker().wake_by_ref();
}
return Ok(false);
}
read_some = true;
} }
read_some = true;
} }
Poll::Pending => return Ok(false),
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
return if err.kind() == io::ErrorKind::WouldBlock { return match err.kind() {
Ok(false) io::ErrorKind::WouldBlock => Ok(false),
} else if err.kind() == io::ErrorKind::ConnectionReset && read_some { io::ErrorKind::ConnectionReset if read_some => Ok(true),
Ok(true) _ => Err(DispatchError::Io(err)),
} else {
Err(DispatchError::Io(err))
} }
} }
} }
@ -985,7 +1015,7 @@ mod tests {
None, None,
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
match h1.as_mut().poll(cx) { match h1.as_mut().poll(cx) {
Poll::Pending => panic!(), Poll::Pending => panic!(),
@ -1025,7 +1055,7 @@ mod tests {
None, None,
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal(_))); assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
@ -1079,7 +1109,7 @@ mod tests {
None, None,
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal(_))); assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
@ -1138,7 +1168,7 @@ mod tests {
", ",
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_pending()); assert!(h1.as_mut().poll(cx).is_pending());
assert!(matches!(&h1.inner, DispatcherState::Normal(_))); assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
@ -1210,7 +1240,7 @@ mod tests {
", ",
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready()); assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Normal(_))); assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
@ -1271,7 +1301,7 @@ mod tests {
", ",
); );
futures_util::pin_mut!(h1); actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready()); assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Upgrade(_))); assert!(matches!(&h1.inner, DispatcherState::Upgrade(_)));