diff --git a/CHANGES.md b/CHANGES.md index d1ab317ea..f4cec6a66 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,8 @@ * HTTP/2 Support +* Asynchronous middlewares + * Content compression/decompression (br, gzip, deflate) diff --git a/examples/basic.rs b/examples/basic.rs index 191ea683b..19eee567a 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -49,8 +49,8 @@ fn main() { HttpServer::new( Application::default("/") // enable logger - .middleware(Logger::new(None)) - // register simple handler, handle all methods + .middleware(middlewares::Logger::new(None)) + // register simple handle r, handle all methods .handler("/index.html", index) // with path parameters .resource("/user/{name}/", |r| r.handler(Method::GET, with_param)) diff --git a/examples/state.rs b/examples/state.rs index 620f67826..27b45696c 100644 --- a/examples/state.rs +++ b/examples/state.rs @@ -74,7 +74,7 @@ fn main() { HttpServer::new( Application::builder("/", AppState{counter: Cell::new(0)}) // enable logger - .middleware(Logger::new(None)) + .middleware(middlewares::Logger::new(None)) // websocket route .resource("/ws/", |r| r.get::()) // register simple handler, handle all methods diff --git a/examples/tls/src/main.rs b/examples/tls/src/main.rs index 062f6334d..742a9f69a 100644 --- a/examples/tls/src/main.rs +++ b/examples/tls/src/main.rs @@ -32,7 +32,7 @@ fn main() { HttpServer::new( Application::default("/") // enable logger - .middleware(Logger::new(None)) + .middleware(middlewares::Logger::new(None)) // register simple handler, handle all methods .handler("/index.html", index) // with path parameters diff --git a/examples/websocket.rs b/examples/websocket.rs index b5f03f9cc..a70ac71a1 100644 --- a/examples/websocket.rs +++ b/examples/websocket.rs @@ -73,7 +73,7 @@ fn main() { HttpServer::new( Application::default("/") // enable logger - .middleware(Logger::new(None)) + .middleware(middlewares::Logger::new(None)) // websocket route .resource("/ws/", |r| r.get::()) .route_handler("/", StaticFiles::new("examples/static/", true))) diff --git a/src/application.rs b/src/application.rs index a2badbd50..c710142d4 100644 --- a/src/application.rs +++ b/src/application.rs @@ -10,27 +10,9 @@ use recognizer::{RouteRecognizer, check_pattern}; use httprequest::HttpRequest; use httpresponse::HttpResponse; use channel::HttpHandler; +use middlewares::Middleware; -/// Middleware definition -#[allow(unused_variables)] -pub trait Middleware { - - /// Method is called when request is ready. - fn start(&self, req: &mut HttpRequest) -> Result<(), HttpResponse> { - Ok(()) - } - - /// Method is called when handler returns response, - /// but before sending body streams to peer. - fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> HttpResponse { - resp - } - - /// Http interation is finished - fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) {} -} - /// Application pub struct Application { state: Rc, @@ -67,19 +49,13 @@ impl HttpHandler for Application { } fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task { - // run middlewares + let mut task = self.run(req, payload); + + // init middlewares if !self.middlewares.is_empty() { - for middleware in self.middlewares.iter() { - if let Err(resp) = middleware.start(req) { - return Task::reply(resp) - }; - } - let mut task = self.run(req, payload); task.set_middlewares(Rc::clone(&self.middlewares)); - task - } else { - self.run(req, payload) } + task } } diff --git a/src/context.rs b/src/context.rs index b6fd4425c..4b201184a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -18,7 +18,7 @@ use route::{Route, Frame}; use httpresponse::HttpResponse; -/// Actor execution context +/// Http actor execution context pub struct HttpContext where A: Actor> + Route, { act: Option, diff --git a/src/lib.rs b/src/lib.rs index 85a20aa2f..32380a65a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,6 @@ mod date; mod encoding; mod httprequest; mod httpresponse; -mod logger; mod payload; mod resource; mod recognizer; @@ -62,17 +61,17 @@ pub mod ws; pub mod dev; pub mod httpcodes; pub mod multipart; +pub mod middlewares; pub use encoding::ContentEncoding; pub use error::ParseError; pub use body::{Body, BinaryBody}; -pub use application::{Application, ApplicationBuilder, Middleware}; +pub use application::{Application, ApplicationBuilder}; pub use httprequest::{HttpRequest, UrlEncoded}; pub use httpresponse::{HttpResponse, HttpResponseBuilder}; pub use payload::{Payload, PayloadItem, PayloadError}; pub use route::{Frame, Route, RouteFactory, RouteHandler, RouteResult}; pub use resource::{Reply, Resource, HandlerResult}; pub use recognizer::{Params, RouteRecognizer}; -pub use logger::Logger; pub use server::HttpServer; pub use context::HttpContext; pub use channel::HttpChannel; diff --git a/src/logger.rs b/src/middlewares/logger.rs similarity index 95% rename from src/logger.rs rename to src/middlewares/logger.rs index d600a99c4..52c43e348 100644 --- a/src/logger.rs +++ b/src/middlewares/logger.rs @@ -6,9 +6,9 @@ use std::fmt::{Display, Formatter}; use time; -use application::Middleware; use httprequest::HttpRequest; use httpresponse::HttpResponse; +use middlewares::{Middleware, Started, Finished}; /// `Middleware` for logging request and response info to the terminal. pub struct Logger { @@ -37,16 +37,13 @@ impl Logger { struct StartTime(time::Tm); impl Logger { - fn initialise(&self, req: &mut HttpRequest) { - req.extensions().insert(StartTime(time::now())); - } fn log(&self, req: &mut HttpRequest, resp: &HttpResponse) { let entry_time = req.extensions().get::().unwrap().0; let response_time = time::now() - entry_time; - let response_time_ms = (response_time.num_seconds() * 1000) as f64 + (response_time.num_nanoseconds().unwrap_or(0) as f64) / 1000000.0; - + let response_time_ms = (response_time.num_seconds() * 1000) as f64 + + (response_time.num_nanoseconds().unwrap_or(0) as f64) / 1000000000.0; { let render = |fmt: &mut Formatter, text: &FormatText| { match *text { @@ -61,7 +58,7 @@ impl Logger { }, FormatText::Status => resp.status().fmt(fmt), FormatText::ResponseTime => - fmt.write_fmt(format_args!("{} ms", response_time_ms)), + fmt.write_fmt(format_args!("{} sec", response_time_ms)), FormatText::RemoteAddr => Ok(()), //req.remote_addr.fmt(fmt), FormatText::RequestTime => { entry_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ%z") @@ -77,13 +74,15 @@ impl Logger { } impl Middleware for Logger { - fn start(&self, req: &mut HttpRequest) -> Result<(), HttpResponse> { - self.initialise(req); - Ok(()) + + fn start(&self, req: &mut HttpRequest) -> Started { + req.extensions().insert(StartTime(time::now())); + Started::Done } - fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) { + fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { self.log(req, resp); + Finished::Done } } diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs new file mode 100644 index 000000000..04790bc85 --- /dev/null +++ b/src/middlewares/mod.rs @@ -0,0 +1,247 @@ +//! Middlewares +use std::rc::Rc; +use std::error::Error; +use futures::{Async, Future, Poll}; +use httprequest::HttpRequest; +use httpresponse::HttpResponse; + +mod logger; +pub use self::logger::Logger; + +/// Middleware start result +pub enum Started { + /// Execution completed + Done, + /// New http response got generated. If middleware generates response + /// handler execution halts. + Response(HttpResponse), + /// Execution completed, but run future to completion. + Future(Box>), +} + +/// Middleware execution result +pub enum Response { + /// New http response got generated + Response(HttpResponse), + /// Result is a future that resolves to a new http response + Future(Box>), +} + +/// Middleware finish result +pub enum Finished { + /// Execution completed + Done, + /// Execution completed, but run future to completion + Future(Box>>), +} + +/// Middleware definition +#[allow(unused_variables)] +pub trait Middleware { + + /// Method is called when request is ready. It may return + /// future, which should resolve before next middleware get called. + fn start(&self, req: &mut HttpRequest) -> Started { + Started::Done + } + + /// Method is called when handler returns response, + /// but before sending body stream to peer. + fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response { + Response::Response(resp) + } + + /// Method is called after http response get sent to peer. + fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { + Finished::Done + } +} + +/// Middlewares executor +pub(crate) struct MiddlewaresExecutor { + state: ExecutorState, + fut: Option>>, + started: Option>>, + finished: Option>>>, + middlewares: Option>>>, +} + +enum ExecutorState { + None, + Starting(usize), + Started(usize), + Processing(usize, usize), + Finishing(usize), +} + +impl Default for MiddlewaresExecutor { + + fn default() -> MiddlewaresExecutor { + MiddlewaresExecutor { + fut: None, + started: None, + finished: None, + state: ExecutorState::None, + middlewares: None, + } + } +} + +impl MiddlewaresExecutor { + + pub fn start(&mut self, mw: Rc>>) { + self.state = ExecutorState::Starting(0); + self.middlewares = Some(mw); + } + + pub fn starting(&mut self, req: &mut HttpRequest) -> Poll, ()> { + if let Some(ref middlewares) = self.middlewares { + let state = &mut self.state; + if let ExecutorState::Starting(mut idx) = *state { + loop { + // poll latest fut + if let Some(ref mut fut) = self.started { + match fut.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => idx += 1, + Err(response) => { + *state = ExecutorState::Started(idx); + return Ok(Async::Ready(Some(response))) + } + } + } + self.started = None; + + if idx >= middlewares.len() { + *state = ExecutorState::Started(idx-1); + return Ok(Async::Ready(None)) + } else { + match middlewares[idx].start(req) { + Started::Done => idx += 1, + Started::Response(resp) => { + *state = ExecutorState::Started(idx); + return Ok(Async::Ready(Some(resp))) + }, + Started::Future(fut) => { + self.started = Some(fut); + }, + } + } + } + } + } + Ok(Async::Ready(None)) + } + + pub fn processing(&mut self, req: &mut HttpRequest) -> Poll, ()> { + if let Some(ref middlewares) = self.middlewares { + let state = &mut self.state; + match *state { + ExecutorState::Processing(mut idx, total) => { + loop { + // poll latest fut + let mut resp = match self.fut.as_mut().unwrap().poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(response)) | Err(response) => { + idx += 1; + response + } + }; + self.fut = None; + + loop { + if idx == 0 { + *state = ExecutorState::Finishing(total); + return Ok(Async::Ready(Some(resp))) + } else { + match middlewares[idx].response(req, resp) { + Response::Response(r) => { + idx -= 1; + resp = r + }, + Response::Future(fut) => { + self.fut = Some(fut); + break + }, + } + } + } + } + } + _ => Ok(Async::Ready(None)) + } + } else { + Ok(Async::Ready(None)) + } + } + + pub fn finishing(&mut self, req: &mut HttpRequest, resp: &HttpResponse) -> Poll<(), ()> { + if let Some(ref middlewares) = self.middlewares { + let state = &mut self.state; + if let ExecutorState::Finishing(mut idx) = *state { + loop { + // poll latest fut + if let Some(ref mut fut) = self.finished { + match fut.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => idx -= 1, + Err(err) => { + error!("Middleware finish error: {}", err); + } + } + } + self.finished = None; + + match middlewares[idx].finish(req, resp) { + Finished::Done => { + if idx == 0 { + return Ok(Async::Ready(())) + } else { + idx -= 1 + } + } + Finished::Future(fut) => { + self.finished = Some(fut); + }, + } + } + } + } + Ok(Async::Ready(())) + } + + pub fn response(&mut self, req: &mut HttpRequest, resp: HttpResponse) + -> Option + { + if let Some(ref middlewares) = self.middlewares { + let mut resp = resp; + let state = &mut self.state; + match *state { + ExecutorState::Started(mut idx) => { + let total = idx; + loop { + resp = match middlewares[idx].response(req, resp) { + Response::Response(r) => { + if idx == 0 { + *state = ExecutorState::Finishing(total); + return Some(r) + } else { + idx -= 1; + r + } + }, + Response::Future(fut) => { + *state = ExecutorState::Processing(idx, total); + self.fut = Some(fut); + return None + }, + }; + } + } + _ => Some(resp) + } + } else { + Some(resp) + } + } +} diff --git a/src/task.rs b/src/task.rs index a9c9c6276..ce1447c78 100644 --- a/src/task.rs +++ b/src/task.rs @@ -8,7 +8,7 @@ use futures::task::{Task as FutureTask, current as current_task}; use h1writer::{Writer, WriterState}; use route::Frame; -use application::Middleware; +use middlewares::{Middleware, MiddlewaresExecutor}; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -109,7 +109,7 @@ pub struct Task { drain: Vec>>, prepared: Option, disconnected: bool, - middlewares: Option>>>, + middlewares: MiddlewaresExecutor, } impl Task { @@ -119,49 +119,42 @@ impl Task { frames.push_back(Frame::Message(response.into())); frames.push_back(Frame::Payload(None)); - Task { - state: TaskRunningState::Running, - iostate: TaskIOState::Done, - frames: frames, - drain: Vec::new(), - stream: TaskStream::None, - prepared: None, - disconnected: false, - middlewares: None, - } + Task { state: TaskRunningState::Running, + iostate: TaskIOState::Done, + frames: frames, + drain: Vec::new(), + stream: TaskStream::None, + prepared: None, + disconnected: false, + middlewares: MiddlewaresExecutor::default() } + } + + pub(crate) fn with_context(ctx: C) -> Self { + Task { state: TaskRunningState::Running, + iostate: TaskIOState::ReadingMessage, + frames: VecDeque::new(), + stream: TaskStream::Context(Box::new(ctx)), + drain: Vec::new(), + prepared: None, + disconnected: false, + middlewares: MiddlewaresExecutor::default() } } pub(crate) fn with_stream(stream: S) -> Self where S: Stream + 'static { - Task { - state: TaskRunningState::Running, - iostate: TaskIOState::ReadingMessage, - frames: VecDeque::new(), - stream: TaskStream::Stream(Box::new(stream)), - drain: Vec::new(), - prepared: None, - disconnected: false, - middlewares: None, - } - } - - pub(crate) fn with_context(ctx: C) -> Self - { - Task { - state: TaskRunningState::Running, - iostate: TaskIOState::ReadingMessage, - frames: VecDeque::new(), - stream: TaskStream::Context(Box::new(ctx)), - drain: Vec::new(), - prepared: None, - disconnected: false, - middlewares: None, - } + Task { state: TaskRunningState::Running, + iostate: TaskIOState::ReadingMessage, + frames: VecDeque::new(), + stream: TaskStream::Stream(Box::new(stream)), + drain: Vec::new(), + prepared: None, + disconnected: false, + middlewares: MiddlewaresExecutor::default() } } pub(crate) fn set_middlewares(&mut self, middlewares: Rc>>) { - self.middlewares = Some(middlewares); + self.middlewares.start(middlewares) } pub(crate) fn disconnected(&mut self) { @@ -175,6 +168,17 @@ impl Task { where T: Writer { trace!("POLL-IO frames:{:?}", self.frames.len()); + + // start middlewares + match self.middlewares.starting(req) { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) | Err(_) => (), + Ok(Async::Ready(Some(response))) => { + self.frames.clear(); + self.frames.push_front(Frame::Message(response)); + }, + } + // response is completed if self.frames.is_empty() && self.iostate.is_done() { return Ok(Async::Ready(self.state.is_done())); @@ -190,29 +194,40 @@ impl Task { } } + // process middlewares response + match self.middlewares.processing(req) { + Err(_) => return Err(()), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => (), + Ok(Async::Ready(Some(mut response))) => { + let result = io.start(req, &mut response); + self.prepared = Some(response); + match result { + Ok(WriterState::Pause) => { + self.state.pause(); + } + Ok(WriterState::Done) => self.state.resume(), + Err(_) => return Err(()) + } + }, + } + // if task is paused, write buffer probably is full if self.state != TaskRunningState::Paused { // process exiting frames while let Some(frame) = self.frames.pop_front() { trace!("IO Frame: {:?}", frame); let res = match frame { - Frame::Message(response) => { + Frame::Message(resp) => { // run middlewares - let mut response = - if let Some(middlewares) = self.middlewares.take() { - let mut response = response; - for middleware in middlewares.iter() { - response = middleware.response(req, response); - } - self.middlewares = Some(middlewares); - response - } else { - response - }; - - let result = io.start(req, &mut response); - self.prepared = Some(response); - result + if let Some(mut resp) = self.middlewares.response(req, resp) { + let result = io.start(req, &mut resp); + self.prepared = Some(resp); + result + } else { + // middlewares need to run some futures + return self.poll_io(io, req) + } } Frame::Payload(Some(chunk)) => { io.write(chunk.as_ref()) @@ -251,7 +266,7 @@ impl Task { } } - // drain + // drain futures if !self.drain.is_empty() { for fut in &mut self.drain { fut.borrow_mut().set() @@ -261,12 +276,11 @@ impl Task { // response is completed if self.iostate.is_done() { - // run middlewares - if let Some(ref mut resp) = self.prepared { - if let Some(middlewares) = self.middlewares.take() { - for middleware in middlewares.iter() { - middleware.finish(req, resp); - } + // finish middlewares + if let Some(ref resp) = self.prepared { + match self.middlewares.finishing(req, resp) { + Ok(Async::NotReady) => return Ok(Async::NotReady), + _ => (), } } Ok(Async::Ready(self.state.is_done())) @@ -276,8 +290,7 @@ impl Task { } fn poll_stream(&mut self, stream: &mut S) -> Poll<(), ()> - where S: Stream - { + where S: Stream { loop { match stream.poll() { Ok(Async::Ready(Some(frame))) => { diff --git a/tests/test_server.rs b/tests/test_server.rs index b489fd8c4..eec293033 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -11,6 +11,7 @@ use tokio_core::net::TcpListener; use actix::*; use actix_web::*; + fn create_server() -> HttpServer> { HttpServer::new( vec![Application::default("/") @@ -59,19 +60,20 @@ struct MiddlewareTest { finish: Arc, } -impl Middleware for MiddlewareTest { - fn start(&self, _: &mut HttpRequest) -> Result<(), HttpResponse> { +impl middlewares::Middleware for MiddlewareTest { + fn start(&self, _: &mut HttpRequest) -> middlewares::Started { self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed); - Ok(()) + middlewares::Started::Done } - fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> HttpResponse { + fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response { self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed); - resp + middlewares::Response::Response(resp) } - fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) { + fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) -> middlewares::Finished { self.finish.store(self.finish.load(Ordering::Relaxed) + 1, Ordering::Relaxed); + middlewares::Finished::Done } }