1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-20 14:08:07 +00:00

clean up poll_response. add comments (#1978)

This commit is contained in:
fakeshadow 2021-02-11 06:54:42 -08:00 committed by GitHub
parent d9d0d1d1a2
commit 75a9a72e78
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -139,27 +139,14 @@ where
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
matches!(self, State::None) matches!(self, State::None)
} }
}
fn is_call(&self) -> bool {
matches!(self, State::ServiceCall(_))
}
}
enum PollResponse { enum PollResponse {
Upgrade(Request), Upgrade(Request),
DoNothing, DoNothing,
DrainWriteBuf, DrainWriteBuf,
} }
impl PartialEq for PollResponse {
fn eq(&self, other: &PollResponse) -> bool {
match self {
PollResponse::DrainWriteBuf => matches!(other, PollResponse::DrainWriteBuf),
PollResponse::DoNothing => matches!(other, PollResponse::DoNothing),
_ => false,
}
}
}
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
@ -323,9 +310,10 @@ where
message: Response<()>, message: Response<()>,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let size = body.size();
let mut this = self.project(); let mut this = self.project();
this.codec this.codec
.encode(Message::Item((message, body.size())), &mut this.write_buf) .encode(Message::Item((message, size)), &mut this.write_buf)
.map_err(|err| { .map_err(|err| {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
@ -334,7 +322,7 @@ where
})?; })?;
this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
match body.size() { match size {
BodySize::None | BodySize::Empty => this.state.set(State::None), BodySize::None | BodySize::Empty => this.state.set(State::None),
_ => this.state.set(State::SendPayload(body)), _ => this.state.set(State::SendPayload(body)),
}; };
@ -351,109 +339,111 @@ where
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<PollResponse, DispatchError> { ) -> Result<PollResponse, DispatchError> {
loop { 'res: loop {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
// state is not changed on Poll::Pending. match this.state.as_mut().project() {
// other variant and conditions always trigger a state change(or an error). // no future is in InnerDispatcher state. pop next message.
let state_change = match this.state.project() {
StateProj::None => match this.messages.pop_front() { StateProj::None => match this.messages.pop_front() {
// handle request message.
Some(DispatcherMessage::Item(req)) => { Some(DispatcherMessage::Item(req)) => {
self.as_mut().handle_request(req, cx)?; // Handle `EXPECT: 100-Continue` header
true if req.head().expect() {
// set InnerDispatcher state and continue loop to poll it.
let task = this.flow.expect.call(req);
this.state.set(State::ExpectCall(task));
} else {
// the same as expect call.
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
};
} }
// handle error message.
Some(DispatcherMessage::Error(res)) => { Some(DispatcherMessage::Error(res)) => {
// send_response would update InnerDispatcher state to SendPayload or
// None(If response body is empty).
// continue loop to poll it.
self.as_mut() self.as_mut()
.send_response(res, ResponseBody::Other(Body::Empty))?; .send_response(res, ResponseBody::Other(Body::Empty))?;
true
} }
// return with upgrade request and poll it exclusively.
Some(DispatcherMessage::Upgrade(req)) => { Some(DispatcherMessage::Upgrade(req)) => {
return Ok(PollResponse::Upgrade(req)); return Ok(PollResponse::Upgrade(req));
} }
None => false, // all messages are dealt with.
}, None => return Ok(PollResponse::DoNothing),
StateProj::ExpectCall(fut) => match fut.poll(cx) {
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
this = self.as_mut().project();
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall(fut));
continue;
}
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())?;
true
}
Poll::Pending => false,
}, },
StateProj::ServiceCall(fut) => match fut.poll(cx) { StateProj::ServiceCall(fut) => match fut.poll(cx) {
// service call resolved. send response.
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)?; self.as_mut().send_response(res, body)?;
continue;
} }
// send service call error as response
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())?; self.as_mut().send_response(res, body.into_body())?;
true
} }
Poll::Pending => false, // service call pending and could be waiting for more chunk messages.
// (pipeline message limit and/or payload can_read limit)
Poll::Pending => {
// no new message is decoded and no new payload is feed.
// nothing to do except waiting for new incoming data from client.
if !self.as_mut().poll_request(cx)? {
return Ok(PollResponse::DoNothing);
}
// otherwise keep loop.
}
}, },
StateProj::SendPayload(mut stream) => { StateProj::SendPayload(mut stream) => {
loop { // keep populate writer buffer until buffer size limit hit,
if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { // get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) { match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
this.codec.encode( this.codec.encode(
Message::Chunk(Some(item)), Message::Chunk(Some(item)),
&mut this.write_buf, &mut this.write_buf,
)?; )?;
continue;
} }
Poll::Ready(None) => { Poll::Ready(None) => {
this.codec.encode( this.codec
Message::Chunk(None), .encode(Message::Chunk(None), &mut this.write_buf)?;
&mut this.write_buf, // payload stream finished.
)?; // set state to None and handle next message
this = self.as_mut().project();
this.state.set(State::None); this.state.set(State::None);
continue 'res;
} }
Poll::Ready(Some(Err(_))) => { Poll::Ready(Some(Err(e))) => {
return Err(DispatchError::Unknown) return Err(DispatchError::Service(e))
} }
Poll::Pending => return Ok(PollResponse::DoNothing), Poll::Pending => return Ok(PollResponse::DoNothing),
} }
} else { }
// buffer is beyond max size.
// return and try to write the whole buffer to io stream.
return Ok(PollResponse::DrainWriteBuf); return Ok(PollResponse::DrainWriteBuf);
} }
break; StateProj::ExpectCall(fut) => match fut.poll(cx) {
// expect resolved. write continue to buffer and set InnerDispatcher state
// to service call.
Poll::Ready(Ok(req)) => {
this.write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall(fut));
} }
continue; // send expect error as response
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())?;
} }
}; // expect must be solved before progress can be made.
Poll::Pending => return Ok(PollResponse::DoNothing),
// state is changed and continue when the state is not Empty },
if state_change {
if !self.state.is_empty() {
continue;
}
} else {
// if read-backpressure is enabled and we consumed some data.
// we may read more data and retry
if self.state.is_call() {
if self.as_mut().poll_request(cx)? {
continue;
}
} else if !self.messages.is_empty() {
continue;
} }
} }
break;
}
Ok(PollResponse::DoNothing)
} }
fn handle_request( fn handle_request(