From e2dc775e2105d88d1776ac2bfa909dacbea94231 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 8 Oct 2017 20:16:48 -0700 Subject: [PATCH] refactor payload --- README.md | 3 +- src/application.rs | 5 +- src/httpcodes.rs | 6 +- src/lib.rs | 4 +- src/main.rs | 37 ++++------ src/payload.rs | 180 +++++++++++++++++++++++++++++++++++++++++++++ src/reader.rs | 66 +++++------------ src/resource.rs | 5 +- src/route.rs | 37 +--------- src/router.rs | 7 +- src/task.rs | 5 +- src/ws.rs | 55 ++++++-------- 12 files changed, 257 insertions(+), 153 deletions(-) create mode 100644 src/payload.rs diff --git a/README.md b/README.md index 23d8203c5..dae6b4f43 100644 --- a/README.md +++ b/README.md @@ -48,8 +48,7 @@ impl Actor for MyRoute { impl Route for MyRoute { type State = (); - fn request(req: HttpRequest, payload: Option, - ctx: &mut HttpContext) -> Reply + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { Reply::with(req, httpcodes::HTTPOk) } diff --git a/src/application.rs b/src/application.rs index 61628ec30..797be7ccc 100644 --- a/src/application.rs +++ b/src/application.rs @@ -5,9 +5,10 @@ use std::collections::HashMap; use route_recognizer::Router; use task::Task; -use route::{Payload, RouteHandler}; +use route::RouteHandler; use router::Handler; use resource::Resource; +use payload::Payload; use httpmessage::HttpRequest; @@ -93,7 +94,7 @@ struct InnerApplication { impl Handler for InnerApplication { - fn handle(&self, req: HttpRequest, payload: Option) -> Task { + fn handle(&self, req: HttpRequest, payload: Payload) -> Task { if let Ok(h) = self.router.recognize(req.path()) { h.handler.handle(req.with_params(h.params), payload, Rc::clone(&self.state)) } else { diff --git a/src/httpcodes.rs b/src/httpcodes.rs index e3b332180..b7b753405 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -4,7 +4,8 @@ use std::rc::Rc; use http::StatusCode; use task::Task; -use route::{Payload, RouteHandler}; +use route::RouteHandler; +use payload::Payload; use httpmessage::{Body, HttpRequest, HttpResponse, IntoHttpResponse}; pub const HTTPOk: StaticResponse = StaticResponse(StatusCode::OK); @@ -25,8 +26,7 @@ impl StaticResponse { } impl RouteHandler for StaticResponse { - fn handle(&self, req: HttpRequest, _: Option, _: Rc) -> Task - { + fn handle(&self, req: HttpRequest, _: Payload, _: Rc) -> Task { Task::reply(HttpResponse::new(req, self.0, Body::Empty)) } } diff --git a/src/lib.rs b/src/lib.rs index 9222bfcac..f48ea5351 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ mod error; mod date; mod decode; mod httpmessage; +mod payload; mod resource; mod route; mod router; @@ -40,9 +41,10 @@ mod wsproto; pub mod httpcodes; pub use application::Application; pub use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse}; +pub use payload::{Payload, PayloadItem}; pub use router::RoutingMap; pub use resource::{Reply, Resource}; -pub use route::{Route, RouteFactory, RouteHandler, Payload, PayloadItem}; +pub use route::{Route, RouteFactory, RouteHandler}; pub use server::HttpServer; pub use context::HttpContext; pub use route_recognizer::Params; diff --git a/src/main.rs b/src/main.rs index 417cf9e45..24e6d2a52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,12 +19,9 @@ impl Actor for MyRoute { impl Route for MyRoute { type State = (); - fn request(req: HttpRequest, - payload: Option, - ctx: &mut HttpContext) -> Reply - { - if let Some(pl) = payload { - ctx.add_stream(pl); + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { + if !payload.eof() { + ctx.add_stream(payload); Reply::stream(MyRoute{req: Some(req)}) } else { Reply::with(req, httpcodes::HTTPOk) @@ -37,7 +34,7 @@ impl ResponseType for MyRoute { type Error = (); } -impl StreamHandler for MyRoute {} +impl StreamHandler for MyRoute {} impl Handler for MyRoute { fn handle(&mut self, msg: PayloadItem, ctx: &mut HttpContext) @@ -48,7 +45,6 @@ impl Handler for MyRoute { ctx.start(httpcodes::HTTPOk.response(req)); ctx.write_eof(); } - Self::empty() } } @@ -62,22 +58,15 @@ impl Actor for MyWS { impl Route for MyWS { type State = (); - fn request(req: HttpRequest, - payload: Option, - ctx: &mut HttpContext) -> Reply - { - if let Some(payload) = payload { - match ws::handshake(req) { - Ok(resp) => { - ctx.start(resp); - ctx.add_stream(ws::WsStream::new(payload)); - Reply::stream(MyWS{}) - }, - Err(err) => - Reply::reply(err) - } - } else { - Reply::with(req, httpcodes::HTTPBadRequest) + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { + match ws::handshake(req) { + Ok(resp) => { + ctx.start(resp); + ctx.add_stream(ws::WsStream::new(payload)); + Reply::stream(MyWS{}) + }, + Err(err) => + Reply::reply(err) } } } diff --git a/src/payload.rs b/src/payload.rs new file mode 100644 index 000000000..3b54ba3d6 --- /dev/null +++ b/src/payload.rs @@ -0,0 +1,180 @@ +use std::rc::{Rc, Weak}; +use std::cell::RefCell; +use std::collections::VecDeque; +use bytes::Bytes; +use futures::{Async, Poll, Stream}; +use futures::task::{Task, current as current_task}; + +pub type PayloadItem = Bytes; + +const MAX_PAYLOAD_SIZE: usize = 65_536; // max buffer size 64k + + +/// Stream of byte chunks +/// +/// Payload stores chunks in vector. First chunk can be received with `.readany()` method. +pub struct Payload { + inner: Rc>, +} + +impl Payload { + + pub(crate) fn new(eof: bool) -> (PayloadSender, Payload) { + let shared = Rc::new(RefCell::new(Inner::new(eof))); + + (PayloadSender{inner: Rc::downgrade(&shared)}, + Payload{inner: shared}) + } + + /// Indicates paused state of the payload. If payload data is not consumed + /// it get paused. Max size of not consumed data is 64k + pub fn paused(&self) -> bool { + self.inner.borrow().paused() + } + + /// Indicates EOF of payload + pub fn eof(&self) -> bool { + self.inner.borrow().eof() + } + + /// Length of the data in this payload + pub fn len(&self) -> usize { + self.inner.borrow().len() + } + + /// Is payload empty + pub fn is_empty(&self) -> bool { + self.inner.borrow().len() == 0 + } + + /// Get any chunk of data + pub fn readany(&mut self) -> Async> { + self.inner.borrow_mut().readany() + } + + /// Put unused data back to payload + pub fn unread_data(&mut self, data: PayloadItem) { + self.inner.borrow_mut().unread_data(data); + } +} + + +impl Stream for Payload { + type Item = PayloadItem; + type Error = (); + + fn poll(&mut self) -> Poll, ()> { + Ok(self.readany()) + } +} + +pub(crate) struct PayloadSender { + inner: Weak>, +} + +impl PayloadSender { + pub(crate) fn feed_eof(&mut self) { + if let Some(shared) = self.inner.upgrade() { + shared.borrow_mut().feed_eof() + } + } + + pub(crate) fn feed_data(&mut self, data: Bytes) { + if let Some(shared) = self.inner.upgrade() { + shared.borrow_mut().feed_data(data) + } + } + + pub(crate) fn maybe_paused(&self) -> bool { + match self.inner.upgrade() { + Some(shared) => { + let inner = shared.borrow(); + if inner.paused() && inner.len() < MAX_PAYLOAD_SIZE { + drop(inner); + shared.borrow_mut().resume(); + false + } else if !inner.paused() && inner.len() > MAX_PAYLOAD_SIZE { + drop(inner); + shared.borrow_mut().pause(); + true + } else { + inner.paused() + } + } + None => false, + } + } +} + +struct Inner { + len: usize, + eof: bool, + paused: bool, + task: Option, + items: VecDeque, +} + +impl Inner { + + fn new(eof: bool) -> Self { + Inner { + len: 0, + eof: eof, + paused: false, + task: None, + items: VecDeque::new(), + } + } + + fn paused(&self) -> bool { + self.paused + } + + fn pause(&mut self) { + self.paused = true; + } + + fn resume(&mut self) { + self.paused = false; + } + + fn feed_eof(&mut self) { + self.eof = true; + if let Some(task) = self.task.take() { + task.notify() + } + } + + fn feed_data(&mut self, data: Bytes) { + self.len += data.len(); + self.items.push_back(data); + if let Some(task) = self.task.take() { + task.notify() + } + } + + fn eof(&self) -> bool { + self.eof + } + + fn len(&self) -> usize { + self.len + } + + fn readany(&mut self) -> Async> { + if let Some(data) = self.items.pop_front() { + self.len -= data.len(); + Async::Ready(Some(data)) + } else if self.eof { + Async::Ready(None) + } else { + self.task = Some(current_task()); + Async::NotReady + } + } + + pub fn unread_data(&mut self, data: Bytes) { + self.len += data.len(); + self.items.push_front(data) + } +} diff --git a/src/reader.rs b/src/reader.rs index f79a9ca7b..9fb5e457b 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -3,26 +3,23 @@ use std::{self, fmt, io, ptr}; use httparse; use http::{Method, Version, Uri, HttpTryFrom}; use bytes::{Bytes, BytesMut, BufMut}; -use futures::{Async, AsyncSink, Poll, Sink}; -use futures::unsync::mpsc::{channel, Sender}; +use futures::{Async, Poll}; use tokio_io::AsyncRead; use hyper::header::{Headers, ContentLength}; -use {Payload, PayloadItem}; use error::{Error, Result}; use decode::Decoder; +use payload::{Payload, PayloadSender}; use httpmessage::{Message, HttpRequest}; - const MAX_HEADERS: usize = 100; const INIT_BUFFER_SIZE: usize = 8192; -pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; +const MAX_BUFFER_SIZE: usize = 131_072; struct PayloadInfo { - tx: Sender, + tx: PayloadSender, decoder: Decoder, - tmp_item: Option, } pub struct Reader { @@ -61,48 +58,17 @@ impl Reader { fn decode(&mut self) -> std::result::Result { if let Some(ref mut payload) = self.payload { + if payload.tx.maybe_paused() { + return Ok(Decoding::Paused) + } loop { - if let Some(item) = payload.tmp_item.take() { - let eof = item.is_eof(); - - match payload.tx.start_send(item) { - Ok(AsyncSink::NotReady(item)) => { - payload.tmp_item = Some(item); - return Ok(Decoding::Paused) - } - Ok(AsyncSink::Ready) => { - if eof { - return Ok(Decoding::Ready) - } - }, - Err(_) => return Err(Error::Incomplete), - } - } - match payload.decoder.decode(&mut self.read_buf) { Ok(Async::Ready(Some(bytes))) => { - match payload.tx.start_send(PayloadItem::Chunk(bytes)) { - Ok(AsyncSink::NotReady(item)) => { - payload.tmp_item = Some(item); - return Ok(Decoding::Paused) - } - Ok(AsyncSink::Ready) => { - continue - } - Err(_) => return Err(Error::Incomplete), - } + payload.tx.feed_data(bytes) }, Ok(Async::Ready(None)) => { - match payload.tx.start_send(PayloadItem::Eof) { - Ok(AsyncSink::NotReady(item)) => { - payload.tmp_item = Some(item); - return Ok(Decoding::Paused) - } - Ok(AsyncSink::Ready) => { - return Ok(Decoding::Ready) - } - Err(_) => return Err(Error::Incomplete), - } + payload.tx.feed_eof(); + return Ok(Decoding::Ready) }, Ok(Async::NotReady) => return Ok(Decoding::NotReady), Err(_) => return Err(Error::Incomplete), @@ -113,9 +79,11 @@ impl Reader { } } - pub fn parse(&mut self, io: &mut T) -> Poll<(HttpRequest, Option), Error> + pub fn parse(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), Error> where T: AsyncRead { + + loop { match self.decode()? { Decoding::Paused => return Ok(Async::NotReady), @@ -137,11 +105,10 @@ impl Reader { match try!(parse(&mut self.read_buf)) { Some((msg, decoder)) => { let payload = if let Some(decoder) = decoder { - let (tx, rx) = channel(32); + let (tx, rx) = Payload::new(false); let payload = PayloadInfo { tx: tx, decoder: decoder, - tmp_item: None, }; self.payload = Some(payload); @@ -170,9 +137,10 @@ impl Reader { } } } - Some(rx) + rx } else { - None + let (_, rx) = Payload::new(true); + rx }; return Ok(Async::Ready((msg, payload))); }, diff --git a/src/resource.rs b/src/resource.rs index 4c5802924..b7cf6e4f8 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -6,7 +6,8 @@ use actix::Actor; use http::Method; use task::Task; -use route::{Route, Payload, RouteHandler}; +use route::{Route, RouteHandler}; +use payload::Payload; use context::HttpContext; use httpcodes::HTTPMethodNotAllowed; use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse}; @@ -92,7 +93,7 @@ impl Resource where S: 'static { impl RouteHandler for Resource { - fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task { + fn handle(&self, req: HttpRequest, payload: Payload, state: Rc) -> Task { if let Some(handler) = self.routes.get(req.method()) { handler.handle(req, payload, state) } else { diff --git a/src/route.rs b/src/route.rs index 25667055c..e6836415b 100644 --- a/src/route.rs +++ b/src/route.rs @@ -3,40 +3,13 @@ use std::marker::PhantomData; use actix::Actor; use bytes::Bytes; -use futures::unsync::mpsc::Receiver; use task::Task; use context::HttpContext; use resource::Reply; +use payload::Payload; use httpmessage::{HttpRequest, HttpResponse}; -/// Stream of `PayloadItem`'s -pub type Payload = Receiver; - -/// `PayloadItem` represents one payload item -#[derive(Debug)] -pub enum PayloadItem { - /// Indicates end of payload stream - Eof, - /// Chunk of bytes - Chunk(Bytes) -} - -impl PayloadItem { - /// Is item an eof - pub fn is_eof(&self) -> bool { - match *self { - PayloadItem::Eof => true, - _ => false, - } - } - /// Is item a chunk - pub fn is_chunk(&self) -> bool { - !self.is_eof() - } -} - - #[doc(hidden)] #[derive(Debug)] #[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] @@ -47,7 +20,7 @@ pub enum Frame { /// Trait defines object that could be regestered as resource route pub trait RouteHandler: 'static { - fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task; + fn handle(&self, req: HttpRequest, payload: Payload, state: Rc) -> Task; } /// Actors with ability to handle http requests @@ -60,9 +33,7 @@ pub trait Route: Actor> { /// result immediately with `Reply::reply` or `Reply::with`. /// Actor itself could be returned for handling streaming request/response. /// In that case `HttpContext::start` and `HttpContext::write` has to be used. - fn request(req: HttpRequest, - payload: Option, - ctx: &mut HttpContext) -> Reply; + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply; /// This method creates `RouteFactory` for this actor. fn factory() -> RouteFactory { @@ -77,7 +48,7 @@ impl RouteHandler for RouteFactory where A: Route, S: 'static { - fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task + fn handle(&self, req: HttpRequest, payload: Payload, state: Rc) -> Task { let mut ctx = HttpContext::new(state); A::request(req, payload, &mut ctx).into(ctx) diff --git a/src/router.rs b/src/router.rs index 1bf38744e..74083940d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -4,14 +4,15 @@ use std::collections::HashMap; use route_recognizer::{Router as Recognizer}; use task::Task; -use route::{Payload, RouteHandler}; +use payload::Payload; +use route::RouteHandler; use resource::Resource; use application::Application; use httpcodes::HTTPNotFound; use httpmessage::{HttpRequest, IntoHttpResponse}; pub(crate) trait Handler: 'static { - fn handle(&self, req: HttpRequest, payload: Option) -> Task; + fn handle(&self, req: HttpRequest, payload: Payload) -> Task; } /// Request routing map @@ -127,7 +128,7 @@ struct Router { impl Router { - pub fn call(&self, req: HttpRequest, payload: Option) -> Task + pub fn call(&self, req: HttpRequest, payload: Payload) -> Task { if let Ok(h) = self.resources.recognize(req.path()) { h.handler.handle(req.with_params(h.params), payload, Rc::new(())) diff --git a/src/task.rs b/src/task.rs index 1744fd421..0360b19d2 100644 --- a/src/task.rs +++ b/src/task.rs @@ -18,8 +18,7 @@ use httpmessage::{Body, HttpResponse}; type FrameStream = Stream; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific -const DEFAULT_LIMIT: usize = 65_536; // max buffer size 64k - +const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k #[derive(PartialEq, Debug)] enum TaskRunningState { @@ -239,7 +238,7 @@ impl Task { // should pause task if self.state != TaskRunningState::Done { - if self.buffer.len() > DEFAULT_LIMIT { + if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { self.state = TaskRunningState::Paused; } else if self.state == TaskRunningState::Paused { self.state = TaskRunningState::Running; diff --git a/src/ws.rs b/src/ws.rs index 2ac58ef09..59442c301 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -21,25 +21,20 @@ //! impl Route for WsRoute { //! type State = (); //! -//! fn request(req: HttpRequest, payload: Option, -//! ctx: &mut HttpContext) -> Reply +//! fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply //! { -//! if let Some(payload) = payload { -//! // WebSocket handshake -//! match ws::handshake(req) { -//! Ok(resp) => { -//! // Send handshake response to peer -//! ctx.start(resp); -//! // Map Payload into WsStream -//! ctx.add_stream(ws::WsStream::new(payload)); -//! // Start ws messages processing -//! Reply::stream(WsRoute) -//! }, -//! Err(err) => -//! Reply::reply(err) -//! } -//! } else { -//! Reply::with(req, httpcodes::HTTPBadRequest) +//! // WebSocket handshake +//! match ws::handshake(req) { +//! Ok(resp) => { +//! // Send handshake response to peer +//! ctx.start(resp); +//! // Map Payload into WsStream +//! ctx.add_stream(ws::WsStream::new(payload)); +//! // Start ws messages processing +//! Reply::stream(WsRoute) +//! }, +//! Err(err) => +//! Reply::reply(err) //! } //! } //! } @@ -77,7 +72,8 @@ use hyper::header; use actix::Actor; use context::HttpContext; -use route::{Route, Payload, PayloadItem}; +use route::Route; +use payload::Payload; use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed}; use httpmessage::{Body, ConnectionType, HttpRequest, HttpResponse, IntoHttpResponse}; @@ -204,21 +200,18 @@ impl Stream for WsStream { let mut done = false; loop { - match self.rx.poll() { - Ok(Async::Ready(Some(item))) => { - match item { - PayloadItem::Eof => - return Ok(Async::Ready(None)), - PayloadItem::Chunk(chunk) => { - self.buf.extend(chunk) - } - } + match self.rx.readany() { + Async::Ready(Some(chunk)) => { + self.buf.extend(chunk) } - Ok(Async::Ready(None)) => done = true, - Ok(Async::NotReady) => {}, - Err(err) => return Err(err), + Async::Ready(None) => { + done = true; + } + Async::NotReady => break, } + } + loop { match wsframe::Frame::parse(&mut self.buf) { Ok(Some(frame)) => { trace!("Frame {}", frame);