diff --git a/src/application.rs b/src/application.rs index 1e4d8273c..8cf5db269 100644 --- a/src/application.rs +++ b/src/application.rs @@ -43,8 +43,8 @@ impl PipelineHandler for Inner { path.split_at(prefix.len()).1.starts_with('/')) }; if m { - let path: &'static str = unsafe{ - mem::transmute(&req.path()[self.prefix+prefix.len()..])}; + let path: &'static str = unsafe { + mem::transmute(&req.path()[self.prefix+prefix.len()..]) }; if path.is_empty() { req.match_info_mut().add("tail", ""); } else { @@ -321,9 +321,7 @@ impl Application where S: 'static { } /// Register a middleware - pub fn middleware(mut self, mw: T) -> Application - where T: Middleware + 'static - { + pub fn middleware>(mut self, mw: M) -> Application { self.parts.as_mut().expect("Use after finish") .middlewares.push(Box::new(mw)); self diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index b9798c97b..70f5712e8 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -46,7 +46,7 @@ pub enum Finished { /// Middleware definition #[allow(unused_variables)] -pub trait Middleware { +pub trait Middleware: 'static { /// Method is called when request is ready. It may return /// future, which should resolve before next middleware get called. diff --git a/src/pipeline.rs b/src/pipeline.rs index 44c503104..9873958b1 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -74,7 +74,7 @@ impl PipelineInfo { } } -impl> Pipeline { +impl> Pipeline { pub fn new(req: HttpRequest, mws: Rc>>>, @@ -101,7 +101,7 @@ impl Pipeline<(), Inner<()>> { } } -impl Pipeline { +impl Pipeline { fn is_done(&self) -> bool { match self.1 { @@ -114,7 +114,7 @@ impl Pipeline { } } -impl> HttpHandlerTask for Pipeline { +impl> HttpHandlerTask for Pipeline { fn disconnected(&mut self) { self.0.disconnected = Some(true); @@ -277,7 +277,7 @@ struct StartMiddlewares { _s: PhantomData, } -impl> StartMiddlewares { +impl> StartMiddlewares { fn init(info: &mut PipelineInfo, handler: Rc>) -> PipelineState { @@ -364,7 +364,7 @@ struct WaitingResponse { _h: PhantomData, } -impl WaitingResponse { +impl WaitingResponse { #[inline] fn init(info: &mut PipelineInfo, reply: Reply) -> PipelineState @@ -399,7 +399,7 @@ struct RunMiddlewares { _h: PhantomData, } -impl RunMiddlewares { +impl RunMiddlewares { fn init(info: &mut PipelineInfo, mut resp: HttpResponse) -> PipelineState { @@ -510,7 +510,7 @@ enum IOState { Done, } -impl ProcessResponse { +impl ProcessResponse { #[inline] fn init(resp: HttpResponse) -> PipelineState @@ -550,19 +550,6 @@ impl ProcessResponse { result }, IOState::Payload(mut body) => { - // always poll context - if self.running == RunningState::Running { - match info.poll_context() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(_)) => - self.running = RunningState::Done, - Err(err) => { - info.error = Some(err); - return Ok(FinishingMiddlewares::init(info, self.resp)) - } - } - } - match body.poll() { Ok(Async::Ready(None)) => { self.iostate = IOState::Done; @@ -706,7 +693,7 @@ struct FinishingMiddlewares { _h: PhantomData, } -impl FinishingMiddlewares { +impl FinishingMiddlewares { fn init(info: &mut PipelineInfo, resp: HttpResponse) -> PipelineState { if info.count == 0 { diff --git a/src/resource.rs b/src/resource.rs index ee6d682e5..c9e1251c0 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -1,3 +1,4 @@ +use std::rc::Rc; use std::marker::PhantomData; use http::{Method, StatusCode}; @@ -6,6 +7,7 @@ use pred; use body::Body; use route::Route; use handler::{Reply, Handler, Responder}; +use middleware::Middleware; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -33,6 +35,7 @@ pub struct Resource { name: String, state: PhantomData, routes: Vec>, + middlewares: Rc>>>, } impl Default for Resource { @@ -40,7 +43,8 @@ impl Default for Resource { Resource { name: String::new(), state: PhantomData, - routes: Vec::new() } + routes: Vec::new(), + middlewares: Rc::new(Vec::new()) } } } @@ -50,7 +54,8 @@ impl Resource { Resource { name: String::new(), state: PhantomData, - routes: Vec::new() } + routes: Vec::new(), + middlewares: Rc::new(Vec::new()) } } /// Set resource name @@ -126,12 +131,25 @@ impl Resource { self.routes.last_mut().unwrap().f(handler) } - pub(crate) fn handle(&mut self, mut req: HttpRequest, default: Option<&mut Resource>) - -> Reply + /// Register a middleware + /// + /// This is similar to `Application's` middlewares, but + /// middlewares get invoked on resource level. + pub fn middleware>(&mut self, mw: M) { + Rc::get_mut(&mut self.middlewares).unwrap().push(Box::new(mw)); + } + + pub(crate) fn handle(&mut self, + mut req: HttpRequest, + default: Option<&mut Resource>) -> Reply { for route in &mut self.routes { if route.check(&mut req) { - return route.handle(req) + return if self.middlewares.is_empty() { + route.handle(req) + } else { + route.compose(req, Rc::clone(&self.middlewares)) + }; } } if let Some(resource) = default { diff --git a/src/route.rs b/src/route.rs index 64b60603d..acef0fd44 100644 --- a/src/route.rs +++ b/src/route.rs @@ -1,10 +1,16 @@ -use futures::Future; +use std::mem; +use std::rc::Rc; +use std::marker::PhantomData; +use futures::{Async, Future, Poll}; use error::Error; use pred::Predicate; -use handler::{Reply, Handler, Responder, RouteHandler, AsyncHandler, WrapHandler}; +use handler::{Reply, ReplyItem, Handler, + Responder, RouteHandler, AsyncHandler, WrapHandler}; +use middleware::{Middleware, Response as MiddlewareResponse, Started as MiddlewareStarted}; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; +use httpresponse::HttpResponse; /// Resource route definition /// @@ -12,7 +18,7 @@ use httprequest::HttpRequest; /// If handler is not explicitly set, default *404 Not Found* handler is used. pub struct Route { preds: Vec>>, - handler: Box>, + handler: InnerHandler, } impl Default for Route { @@ -20,13 +26,14 @@ impl Default for Route { fn default() -> Route { Route { preds: Vec::new(), - handler: Box::new(WrapHandler::new(|_| HTTPNotFound)), + handler: InnerHandler::new(|_| HTTPNotFound), } } } impl Route { + #[inline] pub(crate) fn check(&self, req: &mut HttpRequest) -> bool { for pred in &self.preds { if !pred.check(req) { @@ -36,10 +43,18 @@ impl Route { true } + #[inline] pub(crate) fn handle(&mut self, req: HttpRequest) -> Reply { self.handler.handle(req) } + #[inline] + pub(crate) fn compose(&mut self, + req: HttpRequest, + mws: Rc>>>) -> Reply { + Reply::async(Compose::new(req, mws, self.handler.clone())) + } + /// Add match predicate to route. /// /// ```rust @@ -65,7 +80,7 @@ impl Route { /// Set handler object. Usually call to this method is last call /// during route configuration, because it does not return reference to self. pub fn h>(&mut self, handler: H) { - self.handler = Box::new(WrapHandler::new(handler)); + self.handler = InnerHandler::new(handler); } /// Set handler function. Usually call to this method is last call @@ -74,7 +89,7 @@ impl Route { where F: Fn(HttpRequest) -> R + 'static, R: Responder + 'static, { - self.handler = Box::new(WrapHandler::new(handler)); + self.handler = InnerHandler::new(handler); } /// Set async handler function. @@ -84,6 +99,315 @@ impl Route { R: Responder + 'static, E: Into + 'static { - self.handler = Box::new(AsyncHandler::new(handler)); + self.handler = InnerHandler::async(handler); + } +} + +/// RouteHandler wrapper. This struct is required because it needs to be shared +/// for resource level middlewares. +struct InnerHandler(Rc>>); + +impl InnerHandler { + + #[inline] + fn new>(h: H) -> Self { + InnerHandler(Rc::new(Box::new(WrapHandler::new(h)))) + } + + #[inline] + fn async(h: H) -> Self + where H: Fn(HttpRequest) -> F + 'static, + F: Future + 'static, + R: Responder + 'static, + E: Into + 'static + { + InnerHandler(Rc::new(Box::new(AsyncHandler::new(h)))) + } + + #[inline] + pub fn handle(&self, req: HttpRequest) -> Reply { + // reason: handler is unique per thread, + // handler get called from async code, and handler doesnt have side effects + #[allow(mutable_transmutes)] + #[cfg_attr(feature = "cargo-clippy", allow(borrowed_box))] + let h: &mut Box> = unsafe { mem::transmute(self.0.as_ref()) }; + h.handle(req) + } +} + +impl Clone for InnerHandler { + #[inline] + fn clone(&self) -> Self { + InnerHandler(Rc::clone(&self.0)) + } +} + + +/// Compose resource level middlewares with route handler. +struct Compose { + info: ComposeInfo, + state: ComposeState, +} + +struct ComposeInfo { + count: usize, + req: HttpRequest, + mws: Rc>>>, + handler: InnerHandler, +} + +enum ComposeState { + Starting(StartMiddlewares), + Handler(WaitingResponse), + RunMiddlewares(RunMiddlewares), + Response(Response), +} + +impl ComposeState { + fn poll(&mut self, info: &mut ComposeInfo) -> Option> { + match *self { + ComposeState::Starting(ref mut state) => state.poll(info), + ComposeState::Handler(ref mut state) => state.poll(info), + ComposeState::RunMiddlewares(ref mut state) => state.poll(info), + ComposeState::Response(_) => None, + } + } +} + +impl Compose { + fn new(req: HttpRequest, + mws: Rc>>>, + handler: InnerHandler) -> Self + { + let mut info = ComposeInfo { + count: 0, + req: req, + mws: mws, + handler: handler }; + let state = StartMiddlewares::init(&mut info); + + Compose {state: state, info: info} + } +} + +impl Future for Compose { + type Item = HttpResponse; + type Error = Error; + + fn poll(&mut self) -> Poll { + loop { + if let ComposeState::Response(ref mut resp) = self.state { + let resp = resp.resp.take().unwrap(); + return Ok(Async::Ready(resp)) + } + if let Some(state) = self.state.poll(&mut self.info) { + self.state = state; + } else { + return Ok(Async::NotReady) + } + } + } +} + +/// Middlewares start executor +struct StartMiddlewares { + fut: Option, + _s: PhantomData, +} + +type Fut = Box, Error=Error>>; + +impl StartMiddlewares { + + fn init(info: &mut ComposeInfo) -> ComposeState { + let len = info.mws.len(); + loop { + if info.count == len { + let reply = info.handler.handle(info.req.clone()); + return WaitingResponse::init(info, reply) + } else { + match info.mws[info.count].start(&mut info.req) { + MiddlewareStarted::Done => + info.count += 1, + MiddlewareStarted::Response(resp) => + return RunMiddlewares::init(info, resp), + MiddlewareStarted::Future(mut fut) => + match fut.poll() { + Ok(Async::NotReady) => + return ComposeState::Starting(StartMiddlewares { + fut: Some(fut), + _s: PhantomData}), + Ok(Async::Ready(resp)) => { + if let Some(resp) = resp { + return RunMiddlewares::init(info, resp); + } + info.count += 1; + } + Err(err) => + return Response::init(err.into()), + }, + MiddlewareStarted::Err(err) => + return Response::init(err.into()), + } + } + } + } + + fn poll(&mut self, info: &mut ComposeInfo) -> Option> + { + let len = info.mws.len(); + 'outer: loop { + match self.fut.as_mut().unwrap().poll() { + Ok(Async::NotReady) => + return None, + Ok(Async::Ready(resp)) => { + info.count += 1; + if let Some(resp) = resp { + return Some(RunMiddlewares::init(info, resp)); + } + if info.count == len { + let reply = info.handler.handle(info.req.clone()); + return Some(WaitingResponse::init(info, reply)); + } else { + loop { + match info.mws[info.count].start(&mut info.req) { + MiddlewareStarted::Done => + info.count += 1, + MiddlewareStarted::Response(resp) => { + return Some(RunMiddlewares::init(info, resp)); + }, + MiddlewareStarted::Future(fut) => { + self.fut = Some(fut); + continue 'outer + }, + MiddlewareStarted::Err(err) => + return Some(Response::init(err.into())) + } + } + } + } + Err(err) => + return Some(Response::init(err.into())) + } + } + } +} + +// waiting for response +struct WaitingResponse { + fut: Box>, + _s: PhantomData, +} + +impl WaitingResponse { + + #[inline] + fn init(info: &mut ComposeInfo, reply: Reply) -> ComposeState { + match reply.into() { + ReplyItem::Message(resp) => + RunMiddlewares::init(info, resp), + ReplyItem::Future(fut) => + ComposeState::Handler( + WaitingResponse { fut: fut, _s: PhantomData }), + } + } + + fn poll(&mut self, info: &mut ComposeInfo) -> Option> { + match self.fut.poll() { + Ok(Async::NotReady) => None, + Ok(Async::Ready(response)) => + Some(RunMiddlewares::init(info, response)), + Err(err) => + Some(Response::init(err.into())), + } + } +} + + +/// Middlewares response executor +struct RunMiddlewares { + curr: usize, + fut: Option>>, + _s: PhantomData, +} + +impl RunMiddlewares { + + fn init(info: &mut ComposeInfo, mut resp: HttpResponse) -> ComposeState { + let mut curr = 0; + let len = info.mws.len(); + + loop { + resp = match info.mws[curr].response(&mut info.req, resp) { + MiddlewareResponse::Err(err) => { + info.count = curr + 1; + return Response::init(err.into()) + }, + MiddlewareResponse::Done(r) => { + curr += 1; + if curr == len { + return Response::init(r) + } else { + r + } + }, + MiddlewareResponse::Future(fut) => { + return ComposeState::RunMiddlewares( + RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData }) + }, + }; + } + } + + fn poll(&mut self, info: &mut ComposeInfo) -> Option> + { + let len = info.mws.len(); + + loop { + // poll latest fut + let mut resp = match self.fut.as_mut().unwrap().poll() { + Ok(Async::NotReady) => { + return None + } + Ok(Async::Ready(resp)) => { + self.curr += 1; + resp + } + Err(err) => + return Some(Response::init(err.into())), + }; + + loop { + if self.curr == len { + return Some(Response::init(resp)); + } else { + match info.mws[self.curr].response(&mut info.req, resp) { + MiddlewareResponse::Err(err) => + return Some(Response::init(err.into())), + MiddlewareResponse::Done(r) => { + self.curr += 1; + resp = r + }, + MiddlewareResponse::Future(fut) => { + self.fut = Some(fut); + break + }, + } + } + } + } + } +} + +struct Response { + resp: Option, + _s: PhantomData, +} + +impl Response { + + fn init(resp: HttpResponse) -> ComposeState { + ComposeState::Response( + Response{resp: Some(resp), _s: PhantomData}) } } diff --git a/src/test.rs b/src/test.rs index 4f2433a9f..22b09b29e 100644 --- a/src/test.rs +++ b/src/test.rs @@ -192,6 +192,16 @@ impl TestApp { self.app = Some(self.app.take().unwrap().resource("/", |r| r.h(handler))); } + /// Register handler for "/" with resource middleware + pub fn handler2(&mut self, handler: H, mw: M) + where H: Handler, M: Middleware + { + self.app = Some(self.app.take().unwrap() + .resource("/", |r| { + r.middleware(mw); + r.h(handler)})); + } + /// Register middleware pub fn middleware(&mut self, mw: T) -> &mut TestApp where T: Middleware + 'static diff --git a/tests/test_server.rs b/tests/test_server.rs index 51919cd5d..1399879ba 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -135,3 +135,28 @@ fn test_middlewares() { assert_eq!(num2.load(Ordering::Relaxed), 1); assert_eq!(num3.load(Ordering::Relaxed), 1); } + + +#[test] +fn test_resource_middlewares() { + let num1 = Arc::new(AtomicUsize::new(0)); + let num2 = Arc::new(AtomicUsize::new(0)); + let num3 = Arc::new(AtomicUsize::new(0)); + + let act_num1 = Arc::clone(&num1); + let act_num2 = Arc::clone(&num2); + let act_num3 = Arc::clone(&num3); + + let srv = test::TestServer::new( + move |app| app.handler2( + httpcodes::HTTPOk, + MiddlewareTest{start: Arc::clone(&act_num1), + response: Arc::clone(&act_num2), + finish: Arc::clone(&act_num3)}) + ); + + assert!(reqwest::get(&srv.url("/")).unwrap().status().is_success()); + assert_eq!(num1.load(Ordering::Relaxed), 1); + assert_eq!(num2.load(Ordering::Relaxed), 1); + // assert_eq!(num3.load(Ordering::Relaxed), 1); +}