1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-23 07:28:06 +00:00

fix stuck connection when handler doesn't read payload (#2624)

This commit is contained in:
Rob Ede 2022-02-03 07:03:39 +00:00 committed by GitHub
parent fc5ecdc30b
commit 5ca42df89a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 307 additions and 27 deletions

View file

@ -1,6 +1,14 @@
# Changes
## Unreleased - 2021-xx-xx
### Changed
- `error::DispatcherError` enum is now marked `#[non_exhaustive]`. [#2624]
### Fixed
- Issue where handlers that took payload but then dropped without reading it to EOF it would cause keep-alive connections to become stuck. [#2624]
[#2624]: https://github.com/actix/actix-web/pull/2624
## 3.0.0-rc.1 - 2022-01-31

View file

@ -340,6 +340,7 @@ impl From<PayloadError> for Error {
/// A set of errors that can occur during dispatching HTTP requests.
#[derive(Debug, Display, From)]
#[non_exhaustive]
pub enum DispatchError {
/// Service error.
#[display(fmt = "Service Error")]
@ -373,6 +374,10 @@ pub enum DispatchError {
#[display(fmt = "Connection shutdown timeout")]
DisconnectTimeout,
/// Handler dropped payload before reading EOF.
#[display(fmt = "Handler dropped payload before reading EOF")]
HandlerDroppedPayload,
/// Internal error.
#[display(fmt = "Internal error")]
InternalError,

View file

@ -125,11 +125,13 @@ impl Decoder for Codec {
self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version;
self.conn_type = head.connection_type();
if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{
self.conn_type = ConnectionType::Close
}
match payload {
PayloadType::None => self.payload = None,
PayloadType::Payload(pl) => self.payload = Some(pl),

View file

@ -209,15 +209,16 @@ impl MessageType for Request {
let (len, method, uri, ver, h_len) = {
// SAFETY:
// Create an uninitialized array of `MaybeUninit`. The `assume_init` is
// safe because the type we are claiming to have initialized here is a
// bunch of `MaybeUninit`s, which do not require initialization.
// Create an uninitialized array of `MaybeUninit`. The `assume_init` is safe because the
// type we are claiming to have initialized here is a bunch of `MaybeUninit`s, which
// do not require initialization.
let mut parsed = unsafe {
MaybeUninit::<[MaybeUninit<httparse::Header<'_>>; MAX_HEADERS]>::uninit()
.assume_init()
};
let mut req = httparse::Request::new(&mut []);
match req.parse_with_uninit_headers(src, &mut parsed)? {
httparse::Status::Complete(len) => {
let method = Method::from_bytes(req.method.unwrap().as_bytes())
@ -232,6 +233,7 @@ impl MessageType for Request {
(len, method, uri, version, req.headers.len())
}
httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");

View file

@ -21,7 +21,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
};
use super::{
@ -151,7 +151,8 @@ pin_project! {
error: Option<DispatchError>,
#[pin]
state: State<S, B, X>,
pub(super) state: State<S, B, X>,
// when Some(_) dispatcher is in state of receiving request payload
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,
@ -174,7 +175,7 @@ enum DispatcherMessage {
pin_project! {
#[project = StateProj]
enum State<S, B, X>
pub(super) enum State<S, B, X>
where
S: Service<Request>,
X: Service<Request, Response = Request>,
@ -194,7 +195,7 @@ where
X: Service<Request, Response = Request>,
B: MessageBody,
{
fn is_none(&self) -> bool {
pub(super) fn is_none(&self) -> bool {
matches!(self, State::None)
}
}
@ -686,12 +687,74 @@ where
let can_not_read = !self.can_read(cx);
// limit amount of non-processed requests
if pipeline_queue_full || can_not_read {
if pipeline_queue_full {
return Ok(false);
}
let mut this = self.as_mut().project();
if can_not_read {
log::debug!("cannot read request payload");
if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
log::debug!("handler dropped payload early; attempt to clean connection");
// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
// payload decoded did not yield EOF yet
Message::Chunk(Some(_)) => {
// if non-clean connection, next loop iter will detect empty
// read buffer and close connection
}
// connection is in clean state for next request
Message::Chunk(None) => {
log::debug!("connection successfully cleaned");
// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);
// break out of payload decode loop
break;
}
// Either whole payload is read and loop is broken or more data
// was expected in which case connection is closed. In both
// situations dispatcher cannot get here.
Message::Item(_) => {
unreachable!("dispatcher is in payload receive state")
}
}
}
// not enough info to decide if connection is going to be clean or not
None => {
log::error!(
"handler did not read whole payload and dispatcher could not \
drain read buf; return 500 and close connection"
);
this.flags.insert(Flags::SHUTDOWN);
let mut res = Response::internal_server_error().drop_body();
res.head_mut().set_connection_type(ConnectionType::Close);
this.messages.push_back(DispatcherMessage::Error(res));
*this.error = Some(DispatchError::HandlerDroppedPayload);
return Ok(true);
}
}
}
}
} else {
// can_not_read and no request payload
return Ok(false);
}
}
let mut updated = false;
loop {

View file

@ -1,6 +1,6 @@
use std::{future::Future, str, task::Poll, time::Duration};
use actix_rt::time::sleep;
use actix_rt::{pin, time::sleep};
use actix_service::fn_service;
use actix_utils::future::{ready, Ready};
use bytes::Bytes;
@ -53,6 +53,14 @@ fn echo_path_service(
})
}
fn drop_payload_service(
) -> impl Service<Request, Response = Response<&'static str>, Error = Error> {
fn_service(|mut req: Request| async move {
let _ = req.take_payload();
Ok::<_, Error>(Response::with_body(StatusCode::OK, "payload dropped"))
})
}
fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
fn_service(|mut req: Request| {
Box::pin(async move {
@ -89,7 +97,7 @@ async fn late_request() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -156,7 +164,7 @@ async fn oneshot_connection() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -173,13 +181,16 @@ async fn oneshot_connection() {
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 5\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/abcd\
";
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 5
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
/abcd
",
);
assert_eq!(
res,
@ -188,7 +199,7 @@ async fn oneshot_connection() {
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
String::from_utf8_lossy(&exp)
);
})
.await;
@ -214,7 +225,7 @@ async fn keep_alive_timeout() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -293,7 +304,7 @@ async fn keep_alive_follow_up_req() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -413,7 +424,7 @@ async fn req_parse_err() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
match h1.as_mut().poll(cx) {
Poll::Pending => panic!(),
@ -459,7 +470,7 @@ async fn pipelining_ok_then_ok() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -529,7 +540,7 @@ async fn pipelining_ok_then_bad() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -601,7 +612,7 @@ async fn expect_handling() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_pending());
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -678,7 +689,7 @@ async fn expect_eager() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -761,7 +772,7 @@ async fn upgrade_handling() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. }));
@ -771,3 +782,192 @@ async fn upgrade_handling() {
})
.await;
}
#[actix_rt::test]
async fn handler_drop_payload() {
let _ = env_logger::try_init();
let mut buf = TestBuffer::new(http_msg(
r"
POST /drop-payload HTTP/1.1
Content-Length: 3
abc
",
));
let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);
let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual
assert_eq!(h1.poll_count, 1);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
assert!(inner.state.is_none());
}
})
.await;
lazy(|cx| {
// add message that claims to have payload longer than provided
buf.extend_read_buf(http_msg(
r"
POST /drop-payload HTTP/1.1
Content-Length: 200
abc
",
));
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual => manual
assert_eq!(h1.poll_count, 2);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect response immediately even though request side has not finished reading payload
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_ready());
// polls: manual => manual => manual
assert_eq!(h1.poll_count, 3);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect that unrequested error response is sent back since connection could not be cleaned
let exp = http_msg(
r"
HTTP/1.1 500 Internal Server Error
content-length: 0
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
}
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
let mut msg = msg
.as_ref()
.trim()
.split('\n')
.into_iter()
.map(|line| [line.trim_start(), "\r"].concat())
.collect::<Vec<_>>()
.join("\n");
// remove trailing \r
msg.pop();
if !msg.is_empty() && !msg.contains("\r\n\r\n") {
msg.push_str("\r\n\r\n");
}
BytesMut::from(msg.as_bytes())
}
#[test]
fn http_msg_creates_msg() {
assert_eq!(http_msg(r""), "");
assert_eq!(
http_msg(
r"
POST / HTTP/1.1
Content-Length: 3
abc
"
),
"POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc"
);
assert_eq!(
http_msg(
r"
GET / HTTP/1.1
Content-Length: 3
"
),
"GET / HTTP/1.1\r\nContent-Length: 3\r\n\r\n"
);
}