1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-09-30 15:22:02 +00:00

return poll in poll_flush (#2005)

This commit is contained in:
fakeshadow 2021-02-17 03:18:31 -08:00 committed by GitHub
parent 2cc6b47fcf
commit dfa795ff9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,6 +13,7 @@ use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_service::Service; use actix_service::Service;
use bitflags::bitflags; use bitflags::bitflags;
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use futures_core::ready;
use log::{error, trace}; use log::{error, trace};
use pin_project::pin_project; use pin_project::pin_project;
@ -233,14 +234,10 @@ where
} }
} }
/// Flush stream
///
/// true - got WouldBlock
/// false - didn't get WouldBlock
fn poll_flush( fn poll_flush(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, DispatchError> { ) -> Poll<Result<(), io::Error>> {
let InnerDispatcherProj { io, write_buf, .. } = self.project(); let InnerDispatcherProj { io, write_buf, .. } = self.project();
let mut io = Pin::new(io.as_mut().unwrap()); let mut io = Pin::new(io.as_mut().unwrap());
@ -248,19 +245,18 @@ where
let mut written = 0; let mut written = 0;
while written < len { while written < len {
match io.as_mut().poll_write(cx, &write_buf[written..]) { match io.as_mut().poll_write(cx, &write_buf[written..])? {
Poll::Ready(Ok(0)) => { Poll::Ready(0) => {
return Err(DispatchError::Io(io::Error::new( return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero, io::ErrorKind::WriteZero,
"", "",
))) )))
} }
Poll::Ready(Ok(n)) => written += n, Poll::Ready(n) => written += n,
Poll::Pending => { Poll::Pending => {
write_buf.advance(written); write_buf.advance(written);
return Ok(true); return Poll::Pending;
} }
Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)),
} }
} }
@ -268,9 +264,7 @@ where
write_buf.clear(); write_buf.clear();
// flush the io and check if get blocked. // flush the io and check if get blocked.
let blocked = io.poll_flush(cx)?.is_pending(); io.poll_flush(cx)
Ok(blocked)
} }
fn send_response( fn send_response(
@ -841,15 +835,12 @@ where
if inner.flags.contains(Flags::WRITE_DISCONNECT) { if inner.flags.contains(Flags::WRITE_DISCONNECT) {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {
// flush buffer and wait on block. // flush buffer and wait on blocked.
if inner.as_mut().poll_flush(cx)? { ready!(inner.as_mut().poll_flush(cx))?;
Poll::Pending
} else {
Pin::new(inner.project().io.as_mut().unwrap()) Pin::new(inner.project().io.as_mut().unwrap())
.poll_shutdown(cx) .poll_shutdown(cx)
.map_err(DispatchError::from) .map_err(DispatchError::from)
} }
}
} else { } else {
// read from io stream and fill read buffer. // read from io stream and fill read buffer.
let should_disconnect = inner.as_mut().read_available(cx)?; let should_disconnect = inner.as_mut().read_available(cx)?;
@ -888,7 +879,7 @@ where
// //
// TODO: what? is WouldBlock good or bad? // TODO: what? is WouldBlock good or bad?
// want to find a reference for this macOS behavior // want to find a reference for this macOS behavior
if inner.as_mut().poll_flush(cx)? || !drain { if inner.as_mut().poll_flush(cx)?.is_pending() || !drain {
break; break;
} }
} }