1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-20 08:31:09 +00:00

reuse PayloadBuffer from actix-http

This commit is contained in:
Nikolay Kim 2019-03-28 05:34:33 -07:00
parent 6e0fe7db2d
commit e84c95968f
3 changed files with 12 additions and 297 deletions

View file

@ -254,7 +254,10 @@ impl From<httparse::Error> for ParseError {
/// A set of errors that can occur during payload parsing
pub enum PayloadError {
/// A payload reached EOF, but is not complete.
#[display(fmt = "A payload reached EOF, but is not complete.")]
#[display(
fmt = "A payload reached EOF, but is not complete. With error: {:?}",
_0
)]
Incomplete(Option<io::Error>),
/// Content encoding stream corruption
#[display(fmt = "Can not decode content-encoding.")]
@ -909,13 +912,12 @@ mod tests {
fn test_payload_error() {
let err: PayloadError =
io::Error::new(io::ErrorKind::Other, "ParseError").into();
assert_eq!(format!("{}", err), "ParseError");
assert_eq!(format!("{}", err.cause().unwrap()), "ParseError");
assert!(format!("{}", err).contains("ParseError"));
let err = PayloadError::Incomplete;
let err = PayloadError::Incomplete(None);
assert_eq!(
format!("{}", err),
"A payload reached EOF, but is not complete."
"A payload reached EOF, but is not complete. With error: None"
);
}

View file

@ -502,15 +502,6 @@ mod tests {
use actix_rt::Runtime;
use futures::future::{lazy, result};
#[test]
fn test_error() {
let err = PayloadError::Incomplete(None);
assert_eq!(
format!("{}", err),
"A payload reached EOF, but is not complete."
);
}
#[test]
fn test_basic() {
Runtime::new()

View file

@ -1,11 +1,10 @@
//! Multipart payload support
use std::cell::{RefCell, UnsafeCell};
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::Rc;
use std::{cmp, fmt};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::task::{current as current_task, Task};
use futures::{Async, Poll, Stream};
use httparse;
@ -22,6 +21,9 @@ use crate::HttpMessage;
const MAX_HEADERS: usize = 32;
type PayloadBuffer =
actix_http::h1::PayloadBuffer<Box<dyn Stream<Item = Bytes, Error = PayloadError>>>;
/// The server-side implementation of `multipart/form-data` requests.
///
/// This will parse the incoming stream into `MultipartItem` instances via its
@ -125,7 +127,7 @@ impl Multipart {
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart {
boundary,
payload: PayloadRef::new(PayloadBuffer::new(stream)),
payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))),
@ -712,157 +714,6 @@ impl Drop for Safety {
}
}
/// Payload buffer
pub struct PayloadBuffer {
len: usize,
items: VecDeque<Bytes>,
stream: Box<dyn Stream<Item = Bytes, Error = PayloadError>>,
}
impl PayloadBuffer {
/// Create new `PayloadBuffer` instance
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
PayloadBuffer {
len: 0,
items: VecDeque::new(),
stream: Box::new(stream),
}
}
#[inline]
fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| match res {
Async::Ready(Some(data)) => {
self.len += data.len();
self.items.push_back(data);
Async::Ready(true)
}
Async::Ready(None) => Async::Ready(false),
Async::NotReady => Async::NotReady,
})
}
/// Read first available chunk of bytes
#[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Ok(Async::Ready(Some(data)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.readany(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read exact number of bytes
#[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
self.len -= size;
let mut chunk = self.items.pop_front().unwrap();
if size < chunk.len() {
let buf = chunk.split_to(size);
self.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
} else if size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
} else {
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&chunk);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.poll_stream()? {
Async::Ready(true) => self.read_exact(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
let mut found = false;
let mut length = 0;
for no in 0..self.items.len() {
{
let chunk = &self.items[no];
for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] {
idx += 1;
if idx == line.len() {
num = no;
offset = pos + 1;
length += pos + 1;
found = true;
break;
}
} else {
idx = 0
}
}
if !found {
length += chunk.len()
}
}
if found {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend_from_slice(&self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
}
self.len -= length;
return Ok(Async::Ready(Some(buf.freeze())));
}
}
match self.poll_stream()? {
Async::Ready(true) => self.read_until(line),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.read_until(b"\n")
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
@ -1005,133 +856,4 @@ mod tests {
}
});
}
#[test]
fn test_basic() {
run_on(|| {
let (_sender, payload) = create_stream();
{
let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.len, 0);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
}
});
}
#[test]
fn test_eof() {
run_on(|| {
let (sender, payload) = create_stream();
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.unbounded_send(Ok(Bytes::from("data"))).unwrap();
drop(sender);
assert_eq!(
Async::Ready(Some(Bytes::from("data"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
assert_eq!(Async::Ready(None), payload.readany().ok().unwrap());
});
}
#[test]
fn test_err() {
run_on(|| {
let (sender, payload) = create_stream();
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender
.unbounded_send(Err(PayloadError::Incomplete(None)))
.unwrap();
payload.readany().err().unwrap();
});
}
#[test]
fn test_readany() {
run_on(|| {
let (sender, payload) = create_stream();
let mut payload = PayloadBuffer::new(payload);
sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap();
sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap();
assert_eq!(
Async::Ready(Some(Bytes::from("line1"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
assert_eq!(
Async::Ready(Some(Bytes::from("line2"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
});
}
#[test]
fn test_readexactly() {
run_on(|| {
let (sender, payload) = create_stream();
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap());
sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap();
sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap();
assert_eq!(
Async::Ready(Some(Bytes::from_static(b"li"))),
payload.read_exact(2).ok().unwrap()
);
assert_eq!(payload.len, 3);
assert_eq!(
Async::Ready(Some(Bytes::from_static(b"ne1l"))),
payload.read_exact(4).ok().unwrap()
);
assert_eq!(payload.len, 4);
sender
.unbounded_send(Err(PayloadError::Incomplete(None)))
.unwrap();
payload.read_exact(10).err().unwrap();
});
}
#[test]
fn test_readuntil() {
run_on(|| {
let (sender, payload) = create_stream();
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap());
sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap();
sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap();
assert_eq!(
Async::Ready(Some(Bytes::from("line"))),
payload.read_until(b"ne").ok().unwrap()
);
assert_eq!(payload.len, 1);
assert_eq!(
Async::Ready(Some(Bytes::from("1line2"))),
payload.read_until(b"2").ok().unwrap()
);
assert_eq!(payload.len, 0);
sender
.unbounded_send(Err(PayloadError::Incomplete(None)))
.unwrap();
payload.read_until(b"b").err().unwrap();
});
}
}