1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-10-05 09:42:08 +00:00

refactor Payload stream

This commit is contained in:
Nikolay Kim 2017-10-26 23:14:33 -07:00
parent 488fb256b4
commit fb0270e27d
9 changed files with 56 additions and 48 deletions

View file

@ -1,6 +1,7 @@
language: rust language: rust
rust: rust:
- 1.20.0
- stable - stable
- beta - beta
- nightly - nightly

View file

@ -9,6 +9,8 @@
* Refactor `HttpContext::write` * Refactor `HttpContext::write`
* Refactor `Payload` stream
* Re-use `BinaryBody` for `Frame::Payload` * Re-use `BinaryBody` for `Frame::Payload`
* Fix disconnection handling. * Fix disconnection handling.

View file

@ -9,6 +9,6 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "*" env_logger = "*"
actix = "0.3" #actix = "0.3"
#actix = { git = "https://github.com/actix/actix.git" } actix = { git = "https://github.com/actix/actix.git" }
actix-web = { path = "../../" } actix-web = { path = "../../" }

View file

@ -24,6 +24,6 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "0.3" #actix = "0.3"
#actix = { git = "https://github.com/actix/actix.git" } actix = { git = "https://github.com/actix/actix.git" }
actix-web = { path = "../../" } actix-web = { path = "../../" }

View file

@ -268,7 +268,6 @@ impl Future for UrlEncoded {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
return match self.pl.poll() { return match self.pl.poll() {
Err(_) => unreachable!(),
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
let mut m = HashMap::new(); let mut m = HashMap::new();
@ -277,16 +276,14 @@ impl Future for UrlEncoded {
} }
Ok(Async::Ready(m)) Ok(Async::Ready(m))
}, },
Ok(Async::Ready(Some(item))) => match item { Ok(Async::Ready(Some(item))) => {
Ok(bytes) => { self.body.extend(item.0);
self.body.extend(bytes);
continue continue
}, },
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
} }
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -512,18 +512,18 @@ impl InnerField {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.readany() { match payload.readany() {
Async::NotReady => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Async::Ready(None) => Ok(Async::Ready(None)), Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Async::Ready(Some(Ok(mut chunk))) => { Ok(Async::Ready(Some(mut chunk))) => {
let len = cmp::min(chunk.len() as u64, *size); let len = cmp::min(chunk.0.len() as u64, *size);
*size -= len; *size -= len;
let ch = chunk.split_to(len as usize); let ch = chunk.0.split_to(len as usize);
if !chunk.is_empty() { if !chunk.0.is_empty() {
payload.unread_data(chunk); payload.unread_data(chunk.0);
} }
Ok(Async::Ready(Some(ch))) Ok(Async::Ready(Some(ch)))
}, },
Async::Ready(Some(Err(err))) => Err(err.into()) Err(err) => Err(err.into())
} }
} }
} }

View file

@ -8,10 +8,17 @@ use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use actix::ResponseType;
const MAX_PAYLOAD_SIZE: usize = 65_536; // max buffer size 64k const MAX_PAYLOAD_SIZE: usize = 65_536; // max buffer size 64k
/// Just Bytes object /// Just Bytes object
pub type PayloadItem = Result<Bytes, PayloadError>; pub struct PayloadItem(pub Bytes);
impl ResponseType for PayloadItem {
type Item = ();
type Error = ();
}
#[derive(Debug)] #[derive(Debug)]
/// A set of error that can occur during payload parsing. /// A set of error that can occur during payload parsing.
@ -92,7 +99,7 @@ impl Payload {
/// Get first available chunk of data. /// Get first available chunk of data.
/// Returns Some(PayloadItem) as chunk, `None` indicates eof. /// Returns Some(PayloadItem) as chunk, `None` indicates eof.
pub fn readany(&mut self) -> Async<Option<PayloadItem>> { pub fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
self.inner.borrow_mut().readany() self.inner.borrow_mut().readany()
} }
@ -128,10 +135,10 @@ impl Payload {
impl Stream for Payload { impl Stream for Payload {
type Item = PayloadItem; type Item = PayloadItem;
type Error = (); type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<PayloadItem>, ()> { fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
Ok(self.readany()) self.readany()
} }
} }
@ -244,17 +251,17 @@ impl Inner {
self.len self.len
} }
fn readany(&mut self) -> Async<Option<PayloadItem>> { fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
Async::Ready(Some(Ok(data))) Ok(Async::Ready(Some(PayloadItem(data))))
} else if self.eof { } else if self.eof {
Async::Ready(None) Ok(Async::Ready(None))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
Async::Ready(Some(Err(err))) Err(err)
} else { } else {
self.task = Some(current_task()); self.task = Some(current_task());
Async::NotReady Ok(Async::NotReady)
} }
} }
@ -391,7 +398,7 @@ mod tests {
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
match payload.readany() { match payload.readany() {
Async::NotReady => (), Ok(Async::NotReady) => (),
_ => panic!("error"), _ => panic!("error"),
} }
@ -406,7 +413,7 @@ mod tests {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
match payload.readany() { match payload.readany() {
Async::NotReady => (), Ok(Async::NotReady) => (),
_ => panic!("error"), _ => panic!("error"),
} }
@ -418,7 +425,7 @@ mod tests {
assert!(!payload.eof()); assert!(!payload.eof());
match payload.readany() { match payload.readany() {
Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "data"), Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"),
_ => panic!("error"), _ => panic!("error"),
} }
assert!(payload.is_empty()); assert!(payload.is_empty());
@ -426,7 +433,7 @@ mod tests {
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
match payload.readany() { match payload.readany() {
Async::Ready(None) => (), Ok(Async::Ready(None)) => (),
_ => panic!("error"), _ => panic!("error"),
} }
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
@ -440,13 +447,13 @@ mod tests {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
match payload.readany() { match payload.readany() {
Async::NotReady => (), Ok(Async::NotReady) => (),
_ => panic!("error"), _ => panic!("error"),
} }
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
match payload.readany() { match payload.readany() {
Async::Ready(Some(data)) => assert!(data.is_err()), Err(_) => (),
_ => panic!("error"), _ => panic!("error"),
} }
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
@ -469,7 +476,7 @@ mod tests {
assert_eq!(payload.len(), 10); assert_eq!(payload.len(), 10);
match payload.readany() { match payload.readany() {
Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "line1"), Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "line1"),
_ => panic!("error"), _ => panic!("error"),
} }
assert!(!payload.is_empty()); assert!(!payload.is_empty());
@ -587,7 +594,7 @@ mod tests {
assert_eq!(payload.len(), 4); assert_eq!(payload.len(), 4);
match payload.readany() { match payload.readany() {
Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "data"), Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"),
_ => panic!("error"), _ => panic!("error"),
} }

View file

@ -86,7 +86,7 @@ impl<H: HttpHandler> HttpServer<TcpStream, net::SocketAddr, H> {
if let Ok(iter) = addr.to_socket_addrs() { if let Ok(iter) = addr.to_socket_addrs() {
for addr in iter { for addr in iter {
match TcpListener::bind(&addr, Arbiter::handle()) { match TcpListener::bind(&addr, Arbiter::handle()) {
Ok(tcp) => addrs.push(tcp), Ok(tcp) => addrs.push((addr, tcp)),
Err(e) => err = Some(e), Err(e) => err = Some(e),
} }
} }
@ -99,7 +99,8 @@ impl<H: HttpHandler> HttpServer<TcpStream, net::SocketAddr, H> {
} }
} else { } else {
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
for tcp in addrs { for (addr, tcp) in addrs {
info!("Starting http server on {}", addr);
ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a))); ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a)));
} }
self self

View file

@ -196,19 +196,19 @@ impl Stream for WsStream {
if !self.closed { if !self.closed {
loop { loop {
match self.rx.readany() { match self.rx.readany() {
Async::Ready(Some(Ok(chunk))) => { Ok(Async::Ready(Some(chunk))) => {
self.buf.extend(chunk) self.buf.extend(chunk.0)
} }
Async::Ready(Some(Err(_))) => { Ok(Async::Ready(None)) => {
self.closed = true;
break;
}
Async::Ready(None) => {
done = true; done = true;
self.closed = true; self.closed = true;
break; break;
} }
Async::NotReady => break, Ok(Async::NotReady) => break,
Err(_) => {
self.closed = true;
break;
}
} }
} }
} }