1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-13 02:39:32 +00:00
actix-web/actix-http/src/h1/payload.rs

286 lines
6.9 KiB
Rust
Raw Normal View History

2017-12-19 08:18:57 +00:00
//! Payload stream
use std::{
cell::RefCell,
collections::VecDeque,
pin::Pin,
rc::{Rc, Weak},
task::{Context, Poll, Waker},
};
2017-10-09 03:16:48 +00:00
2019-04-03 10:20:20 +00:00
use bytes::Bytes;
2019-12-13 05:24:57 +00:00
use futures_core::Stream;
2019-03-27 04:54:57 +00:00
2018-12-06 22:32:52 +00:00
use crate::error::PayloadError;
2017-10-27 06:14:33 +00:00
/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)]
pub enum PayloadStatus {
Read,
Pause,
Dropped,
}
2017-12-19 08:29:25 +00:00
/// Buffered stream of bytes chunks
2017-10-09 03:16:48 +00:00
///
/// Payload stores chunks in a vector. First chunk can be received with `poll_next`. Payload does
/// not notify current task when new data is available.
2017-12-19 08:29:25 +00:00
///
/// Payload can be used as `Response` body stream.
2017-10-19 06:43:50 +00:00
#[derive(Debug)]
2017-10-09 03:16:48 +00:00
pub struct Payload {
inner: Rc<RefCell<Inner>>,
}
impl Payload {
/// Creates a payload stream.
2017-12-19 05:58:38 +00:00
///
/// This method construct two objects responsible for bytes stream generation:
/// - `PayloadSender` - *Sender* side of the stream
/// - `Payload` - *Receiver* side of the stream
2019-01-29 18:14:00 +00:00
pub fn create(eof: bool) -> (PayloadSender, Payload) {
2017-10-09 03:16:48 +00:00
let shared = Rc::new(RefCell::new(Inner::new(eof)));
2018-04-13 23:02:01 +00:00
(
PayloadSender::new(Rc::downgrade(&shared)),
2018-04-29 16:09:08 +00:00
Payload { inner: shared },
2018-04-13 23:02:01 +00:00
)
2017-10-09 03:16:48 +00:00
}
/// Creates an empty payload.
pub(crate) fn empty() -> Payload {
2018-04-13 23:02:01 +00:00
Payload {
inner: Rc::new(RefCell::new(Inner::new(true))),
}
2017-11-27 03:00:57 +00:00
}
2017-10-09 03:16:48 +00:00
/// Length of the data in this payload
2018-06-25 04:58:04 +00:00
#[cfg(test)]
2017-10-09 03:16:48 +00:00
pub fn len(&self) -> usize {
self.inner.borrow().len()
}
/// Is payload empty
2018-06-25 04:58:04 +00:00
#[cfg(test)]
2017-10-09 03:16:48 +00:00
pub fn is_empty(&self) -> bool {
self.inner.borrow().len() == 0
}
/// Put unused data back to payload
2018-01-11 05:02:28 +00:00
#[inline]
2017-10-13 23:33:23 +00:00
pub fn unread_data(&mut self, data: Bytes) {
2017-10-09 03:16:48 +00:00
self.inner.borrow_mut().unread_data(data);
}
}
impl Stream for Payload {
type Item = Result<Bytes, PayloadError>;
2017-10-09 03:16:48 +00:00
fn poll_next(
self: Pin<&mut Self>,
2019-12-07 18:46:51 +00:00
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
Pin::new(&mut *self.inner.borrow_mut()).poll_next(cx)
2017-12-19 08:18:57 +00:00
}
}
2017-12-19 05:58:38 +00:00
/// Sender part of the payload stream
pub struct PayloadSender {
2017-10-09 03:16:48 +00:00
inner: Weak<RefCell<Inner>>,
}
2019-04-07 17:29:26 +00:00
impl PayloadSender {
fn new(inner: Weak<RefCell<Inner>>) -> Self {
Self { inner }
}
2018-03-09 01:19:50 +00:00
#[inline]
2019-04-07 17:29:26 +00:00
pub fn set_error(&mut self, err: PayloadError) {
2017-10-13 23:33:23 +00:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
}
}
2018-03-09 01:19:50 +00:00
#[inline]
2019-04-07 17:29:26 +00:00
pub fn feed_eof(&mut self) {
2017-10-09 03:16:48 +00:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
2018-03-09 01:19:50 +00:00
#[inline]
2019-04-07 17:29:26 +00:00
pub fn feed_data(&mut self, data: Bytes) {
2017-10-09 03:16:48 +00:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
}
}
2018-02-10 00:20:10 +00:00
#[inline]
2019-12-07 18:46:51 +00:00
pub fn need_read(&self, cx: &mut Context<'_>) -> PayloadStatus {
// we check need_read only if Payload (other side) is alive,
// otherwise always return true (consume payload)
2017-11-06 09:24:49 +00:00
if let Some(shared) = self.inner.upgrade() {
if shared.borrow().need_read {
PayloadStatus::Read
} else {
shared.borrow_mut().register_io(cx);
PayloadStatus::Pause
}
2017-11-06 09:24:49 +00:00
} else {
PayloadStatus::Dropped
2017-11-06 09:24:49 +00:00
}
}
}
2017-10-19 06:43:50 +00:00
#[derive(Debug)]
2017-10-09 03:16:48 +00:00
struct Inner {
len: usize,
eof: bool,
2017-10-13 23:33:23 +00:00
err: Option<PayloadError>,
2018-02-27 04:07:22 +00:00
need_read: bool,
2017-10-09 03:16:48 +00:00
items: VecDeque<Bytes>,
task: Option<Waker>,
io_task: Option<Waker>,
2017-10-09 03:16:48 +00:00
}
impl Inner {
fn new(eof: bool) -> Self {
Inner {
2018-02-26 22:33:56 +00:00
eof,
2017-10-09 03:16:48 +00:00
len: 0,
2017-10-13 23:33:23 +00:00
err: None,
2017-10-09 03:16:48 +00:00
items: VecDeque::new(),
2018-03-03 04:47:23 +00:00
need_read: true,
task: None,
io_task: None,
}
}
/// Wake up future waiting for payload data to be available.
fn wake(&mut self) {
if let Some(waker) = self.task.take() {
waker.wake();
}
}
/// Wake up future feeding data to Payload.
fn wake_io(&mut self) {
if let Some(waker) = self.io_task.take() {
waker.wake();
}
}
/// Register future waiting data from payload.
/// Waker would be used in `Inner::wake`
fn register(&mut self, cx: &mut Context<'_>) {
if self
.task
.as_ref()
.map_or(true, |w| !cx.waker().will_wake(w))
{
self.task = Some(cx.waker().clone());
}
}
// Register future feeding data to payload.
/// Waker would be used in `Inner::wake_io`
fn register_io(&mut self, cx: &mut Context<'_>) {
if self
.io_task
.as_ref()
.map_or(true, |w| !cx.waker().will_wake(w))
{
self.io_task = Some(cx.waker().clone());
2017-10-09 03:16:48 +00:00
}
}
2018-03-09 01:19:50 +00:00
#[inline]
2017-10-13 23:33:23 +00:00
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
}
2018-03-09 01:19:50 +00:00
#[inline]
2017-10-09 03:16:48 +00:00
fn feed_eof(&mut self) {
self.eof = true;
}
2018-03-09 01:19:50 +00:00
#[inline]
2017-10-09 03:16:48 +00:00
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_back(data);
2019-04-07 17:40:45 +00:00
self.need_read = self.len < MAX_BUFFER_SIZE;
self.wake();
2017-10-09 03:16:48 +00:00
}
2018-06-25 04:58:04 +00:00
#[cfg(test)]
2017-10-09 03:16:48 +00:00
fn len(&self) -> usize {
self.len
}
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
2017-10-09 03:16:48 +00:00
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
2019-04-07 17:40:45 +00:00
self.need_read = self.len < MAX_BUFFER_SIZE;
2019-03-27 04:54:57 +00:00
if self.need_read && !self.eof {
self.register(cx);
}
self.wake_io();
Poll::Ready(Some(Ok(data)))
2017-10-13 23:33:23 +00:00
} else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err)))
2018-02-10 00:20:10 +00:00
} else if self.eof {
Poll::Ready(None)
2017-10-09 03:16:48 +00:00
} else {
2018-02-27 04:07:22 +00:00
self.need_read = true;
self.register(cx);
self.wake_io();
Poll::Pending
2017-10-09 03:16:48 +00:00
}
}
2017-11-04 16:07:44 +00:00
fn unread_data(&mut self, data: Bytes) {
2017-10-09 03:16:48 +00:00
self.len += data.len();
2017-12-21 04:30:54 +00:00
self.items.push_front(data);
2017-10-09 03:16:48 +00:00
}
}
2017-10-23 02:58:50 +00:00
#[cfg(test)]
2017-10-23 04:40:41 +00:00
mod tests {
use std::panic::{RefUnwindSafe, UnwindSafe};
2021-04-01 14:26:13 +00:00
use actix_utils::future::poll_fn;
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
assert_impl_all!(Payload: Unpin);
assert_not_impl_any!(Payload: Send, Sync, UnwindSafe, RefUnwindSafe);
assert_impl_all!(Inner: Unpin, Send, Sync);
2022-04-23 11:31:32 +00:00
// assertion not stable wrt rustc versions yet
// assert_impl_all!(Inner: UnwindSafe, RefUnwindSafe);
2017-10-23 02:58:50 +00:00
2019-11-26 05:25:50 +00:00
#[actix_rt::test]
async fn test_unread_data() {
let (_, mut payload) = Payload::create(false);
2019-11-19 12:54:19 +00:00
2019-11-26 05:25:50 +00:00
payload.unread_data(Bytes::from("data"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 4);
2019-11-19 12:54:19 +00:00
2019-11-26 05:25:50 +00:00
assert_eq!(
Bytes::from("data"),
poll_fn(|cx| Pin::new(&mut payload).poll_next(cx))
.await
.unwrap()
.unwrap()
2019-11-26 05:25:50 +00:00
);
2017-10-23 03:19:20 +00:00
}
2017-10-23 02:58:50 +00:00
}