1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-05-09 03:53:00 +00:00

add send error response state

add changelog

add send error response state

remove comment
This commit is contained in:
Rob Ede 2022-07-24 15:27:54 +01:00
parent e0a88cea8d
commit dd8692e000
No known key found for this signature in database
GPG key ID: 97C636207D3EF933
3 changed files with 157 additions and 72 deletions

View file

@ -1,6 +1,10 @@
# Changes
## Unreleased - 2022-xx-xx
### Fixed
- Dropping the payload early and causing unclean connections no longer causes erroneous 500 responses. [#2745]
[#2745]: https://github.com/actix/actix-web/issues/2745
## 3.2.1 - 2022-07-02

View file

@ -22,7 +22,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
};
use super::{
@ -185,7 +185,9 @@ pin_project! {
None,
ExpectCall { #[pin] fut: X::Future },
ServiceCall { #[pin] fut: S::Future },
SendResponse { res: Option<Response<B>> },
SendPayload { #[pin] body: B },
SendErrorResponse { res: Option<Response<BoxBody>> },
SendErrorPayload { #[pin] body: BoxBody },
}
}
@ -216,9 +218,15 @@ where
Self::ServiceCall { .. } => {
f.debug_struct("State::ServiceCall").finish_non_exhaustive()
}
Self::SendResponse { .. } => f
.debug_struct("State::SendResponse")
.finish_non_exhaustive(),
Self::SendPayload { .. } => {
f.debug_struct("State::SendPayload").finish_non_exhaustive()
}
Self::SendErrorResponse { .. } => f
.debug_struct("State::SendErrorResponse")
.finish_non_exhaustive(),
Self::SendErrorPayload { .. } => f
.debug_struct("State::SendErrorPayload")
.finish_non_exhaustive(),
@ -379,11 +387,8 @@ where
Ok(size)
}
fn send_response(
mut self: Pin<&mut Self>,
res: Response<()>,
body: B,
) -> Result<(), DispatchError> {
fn send_response(mut self: Pin<&mut Self>, res: Response<B>) -> Result<(), DispatchError> {
let (res, body) = res.replace_body(());
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
@ -397,11 +402,17 @@ where
Ok(())
}
fn queue_response(self: Pin<&mut Self>, res: Response<B>) {
self.project()
.state
.set(State::SendResponse { res: Some(res) });
}
fn send_error_response(
mut self: Pin<&mut Self>,
res: Response<()>,
body: BoxBody,
res: Response<BoxBody>,
) -> Result<(), DispatchError> {
let (res, body) = res.replace_body(());
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
@ -415,6 +426,12 @@ where
Ok(())
}
fn queue_error_response(self: Pin<&mut Self>, res: Response<BoxBody>) {
self.project()
.state
.set(State::SendErrorResponse { res: Some(res) });
}
fn send_continue(self: Pin<&mut Self>) {
self.project()
.write_buf
@ -449,7 +466,8 @@ where
// send_response would update InnerDispatcher state to SendPayload or None
// (If response body is empty)
// continue loop to poll it
self.as_mut().send_error_response(res, BoxBody::new(()))?;
self.as_mut()
.queue_error_response(res.set_body(BoxBody::new(())));
}
// return with upgrade request and poll it exclusively
@ -470,15 +488,12 @@ where
match fut.poll(cx) {
// service call resolved. send response.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)?;
self.as_mut().queue_response(res.into());
}
// send service call error as response
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)?;
self.as_mut().queue_error_response(err.into());
}
// service call pending and could be waiting for more chunk messages
@ -486,14 +501,32 @@ where
Poll::Pending => {
// no new message is decoded and no new payload is fed
// nothing to do except waiting for new incoming data from client
if !self.as_mut().poll_request(cx)? {
return Ok(PollResponse::DoNothing);
}
// optimisation disabled so that poll_request is called from only one place
// if !self.as_mut().poll_request(cx)? {
return Ok(PollResponse::DoNothing);
// }
// else loop
}
}
}
StateProj::SendResponse { res } => {
let mut res = res.take().expect("response should be take-able");
if this.flags.contains(Flags::SHUTDOWN) {
trace!("shutdown flag set; assuming dirty read I/O");
// shutdown flags occur when read I/O is not clean so connections should be
// closed to avoid stuck or erroneous errors on next request
res.head_mut().set_connection_type(ConnectionType::Close);
}
self.send_response(res)?;
return Ok(PollResponse::DrainWriteBuf);
}
StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
@ -529,6 +562,23 @@ where
return Ok(PollResponse::DrainWriteBuf);
}
StateProj::SendErrorResponse { res } => {
// TODO: de-dupe impl with SendResponse
let mut res = res.take().expect("response should be take-able");
if this.flags.contains(Flags::SHUTDOWN) {
trace!("shutdown flag set; assuming dirty read I/O");
// shutdown flags occur when read I/O is not clean so connections should be
// closed to avoid stuck or erroneous errors on next request
res.head_mut().set_connection_type(ConnectionType::Close);
}
self.send_error_response(res)?;
return Ok(PollResponse::DrainWriteBuf);
}
StateProj::SendErrorPayload { mut body } => {
// TODO: de-dupe impl with SendPayload
@ -583,9 +633,7 @@ where
// send expect error as response
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)?;
self.as_mut().queue_error_response(err.into());
}
// expect must be solved before progress can be made.
@ -637,9 +685,8 @@ where
// on success to notify the dispatcher a new state is set and the outer loop
// should be continued
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
return self.send_error_response(res, body);
self.queue_error_response(err.into());
return Ok(());
}
// future is pending; return Ok(()) to notify that a new state is
@ -655,8 +702,8 @@ where
// to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)
self.as_mut().queue_response(res.into());
Ok(())
}
// see the comment on ExpectCall state branch's Pending
@ -664,9 +711,8 @@ where
// see the comment on ExpectCall state branch's Ready(Err(_))
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)
self.as_mut().queue_error_response(err.into());
Ok(())
}
};
}
@ -688,15 +734,13 @@ where
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
let can_not_read = !self.can_read(cx);
// limit amount of non-processed requests
if pipeline_queue_full || can_not_read {
if pipeline_queue_full {
return Ok(false);
}
let mut this = self.as_mut().project();
let mut updated = false;
// decode from read buf as many full requests as possible
@ -829,6 +873,72 @@ where
}
}
let can_read = self.can_read(cx);
let mut this = self.as_mut().project();
if !can_read {
// request payload is not readable...
tracing::debug!("cannot read request payload");
if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
tracing::debug!(
"handler dropped payload early; attempt to clean connection"
);
// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
// payload decoded did not yield EOF yet
Message::Chunk(Some(_)) => {
// if non-clean connection, next loop iter will detect empty
// read buffer and close connection
}
// connection is in clean state for next request
Message::Chunk(None) => {
tracing::debug!("connection successfully cleaned");
// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);
// break out of payload decode loop
break;
}
// Either whole payload is read and loop is broken or more data
// was expected in which case connection is closed. In both
// situations dispatcher cannot get here.
Message::Item(_) => {
unreachable!("dispatcher is in payload receive state")
}
}
}
// not enough info to decide if connection is going to be clean or not
None => {
tracing::debug!(
"handler did not read whole payload and dispatcher could not \
drain read buf; close connection"
);
this.flags.insert(Flags::SHUTDOWN);
return Ok(updated);
}
}
}
}
} else {
// can_not_read and no request payload
return Ok(false);
}
}
Ok(updated)
}
@ -844,10 +954,10 @@ where
trace!("timed out on slow request; replying with 408 and closing connection");
let _ = self.as_mut().send_error_response(
Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
BoxBody::new(()),
);
let mut res =
Response::with_body(StatusCode::REQUEST_TIMEOUT, BoxBody::new(()));
res.head_mut().set_connection_type(ConnectionType::Close);
self.as_mut().send_error_response(res)?;
self.project().flags.insert(Flags::SHUTDOWN);
}
@ -1123,6 +1233,7 @@ where
}
}
// process request(s) and queue response
inner.as_mut().poll_request(cx)?;
if should_disconnect {

View file

@ -861,56 +861,26 @@ async fn handler_drop_payload() {
",
));
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual => manual
assert_eq!(h1.poll_count, 2);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect response immediately even though request side has not finished reading payload
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_ready());
// polls: manual => manual => manual
// polls: manual => manual => shutdown
assert_eq!(h1.poll_count, 3);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect that unrequested error response is sent back since connection could not be cleaned
// expect response immediately even though request side has not finished reading payload
// since write buffer was "too short" we should expect a closed connection hint
let exp = http_msg(
// connection: close
r"
HTTP/1.1 500 Internal Server Error
content-length: 0
HTTP/1.1 200 OK
content-length: 15
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);