1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-05 06:48:44 +00:00

remove localwaker from h1::payload (#2051)

This commit is contained in:
fakeshadow 2021-03-07 13:23:42 -08:00 committed by GitHub
parent ca69b6577e
commit fe0b3f459f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -3,9 +3,8 @@ use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use std::task::{Context, Poll}; use std::task::{Context, Poll, Waker};
use actix_utils::task::LocalWaker;
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
@ -134,7 +133,7 @@ impl PayloadSender {
if shared.borrow().need_read { if shared.borrow().need_read {
PayloadStatus::Read PayloadStatus::Read
} else { } else {
shared.borrow_mut().io_task.register(cx.waker()); shared.borrow_mut().register_io(cx);
PayloadStatus::Pause PayloadStatus::Pause
} }
} else { } else {
@ -150,8 +149,8 @@ struct Inner {
err: Option<PayloadError>, err: Option<PayloadError>,
need_read: bool, need_read: bool,
items: VecDeque<Bytes>, items: VecDeque<Bytes>,
task: LocalWaker, task: Option<Waker>,
io_task: LocalWaker, io_task: Option<Waker>,
} }
impl Inner { impl Inner {
@ -162,8 +161,48 @@ impl Inner {
err: None, err: None,
items: VecDeque::new(), items: VecDeque::new(),
need_read: true, need_read: true,
task: LocalWaker::new(), task: None,
io_task: LocalWaker::new(), io_task: None,
}
}
/// Wake up future waiting for payload data to be available.
fn wake(&mut self) {
if let Some(waker) = self.task.take() {
waker.wake();
}
}
/// Wake up future feeding data to Payload.
fn wake_io(&mut self) {
if let Some(waker) = self.io_task.take() {
waker.wake();
}
}
/// Register future waiting data from payload.
/// Waker would be used in `Inner::wake`
fn register(&mut self, cx: &mut Context<'_>) {
if self
.task
.as_ref()
.map(|w| !cx.waker().will_wake(w))
.unwrap_or(true)
{
self.task = Some(cx.waker().clone());
}
}
// Register future feeding data to payload.
/// Waker would be used in `Inner::wake_io`
fn register_io(&mut self, cx: &mut Context<'_>) {
if self
.io_task
.as_ref()
.map(|w| !cx.waker().will_wake(w))
.unwrap_or(true)
{
self.io_task = Some(cx.waker().clone());
} }
} }
@ -182,7 +221,7 @@ impl Inner {
self.len += data.len(); self.len += data.len();
self.items.push_back(data); self.items.push_back(data);
self.need_read = self.len < MAX_BUFFER_SIZE; self.need_read = self.len < MAX_BUFFER_SIZE;
self.task.wake(); self.wake();
} }
#[cfg(test)] #[cfg(test)]
@ -199,9 +238,9 @@ impl Inner {
self.need_read = self.len < MAX_BUFFER_SIZE; self.need_read = self.len < MAX_BUFFER_SIZE;
if self.need_read && !self.eof { if self.need_read && !self.eof {
self.task.register(cx.waker()); self.register(cx);
} }
self.io_task.wake(); self.wake_io();
Poll::Ready(Some(Ok(data))) Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err))) Poll::Ready(Some(Err(err)))
@ -209,8 +248,8 @@ impl Inner {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
self.need_read = true; self.need_read = true;
self.task.register(cx.waker()); self.register(cx);
self.io_task.wake(); self.wake_io();
Poll::Pending Poll::Pending
} }
} }