From b98ab2eebeab584769db6c2a1b679bc856429965 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 9 Dec 2017 04:33:40 -0800 Subject: [PATCH] use trait instead of pipeline --- src/application.rs | 23 ++--- src/channel.rs | 19 +++- src/encoding.rs | 8 +- src/error.rs | 10 +-- src/h1.rs | 4 +- src/h1writer.rs | 12 +-- src/h2.rs | 4 +- src/h2writer.rs | 4 +- src/httprequest.rs | 48 +++++++--- src/middlewares/defaultheaders.rs | 4 +- src/middlewares/logger.rs | 16 ++-- src/middlewares/mod.rs | 8 +- src/middlewares/session.rs | 25 +++--- src/pipeline.rs | 141 ++++++++++++++++-------------- tests/test_server.rs | 8 +- 15 files changed, 189 insertions(+), 145 deletions(-) diff --git a/src/application.rs b/src/application.rs index 899cd453b..291b33d8a 100644 --- a/src/application.rs +++ b/src/application.rs @@ -5,7 +5,7 @@ use handler::{Reply, RouteHandler}; use router::{Router, Pattern}; use resource::Resource; use httprequest::HttpRequest; -use channel::{HttpHandler, IntoHttpHandler}; +use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask}; use pipeline::Pipeline; use middlewares::Middleware; use server::ServerSettings; @@ -16,14 +16,12 @@ pub struct HttpApplication { prefix: String, default: Resource, router: Router, - middlewares: Rc>>, + middlewares: Rc>>>, } impl HttpApplication { - fn run(&self, req: HttpRequest) -> Reply { - let mut req = req.with_state(Rc::clone(&self.state), self.router.clone()); - + fn run(&self, mut req: HttpRequest) -> Reply { if let Some(h) = self.router.recognize(&mut req) { h.handle(req) } else { @@ -34,10 +32,12 @@ impl HttpApplication { impl HttpHandler for HttpApplication { - fn handle(&self, req: HttpRequest) -> Result { + fn handle(&self, req: HttpRequest) -> Result, HttpRequest> { if req.path().starts_with(&self.prefix) { - Ok(Pipeline::new(req, Rc::clone(&self.middlewares), - &|req: HttpRequest| self.run(req))) + let req = req.with_state(Rc::clone(&self.state), self.router.clone()); + + Ok(Box::new(Pipeline::new(req, Rc::clone(&self.middlewares), + &|req: HttpRequest| self.run(req)))) } else { Err(req) } @@ -54,7 +54,7 @@ struct ApplicationParts { default: Resource, resources: HashMap>>, external: HashMap, - middlewares: Vec>, + middlewares: Vec>>, } /// Structure that follows the builder pattern for building `Application` structs. @@ -204,7 +204,7 @@ impl Application where S: 'static { /// Register a middleware pub fn middleware(&mut self, mw: T) -> &mut Self - where T: Middleware + 'static + where T: Middleware + 'static { self.parts.as_mut().expect("Use after finish") .middlewares.push(Box::new(mw)); @@ -322,7 +322,8 @@ mod tests { let app = Application::with_state("/", 10) .resource("/", |r| r.h(httpcodes::HTTPOk)) .finish(); + let req = HttpRequest::default().with_state(Rc::clone(&app.state), app.router.clone()); assert_eq!( - app.run(HttpRequest::default()).msg().unwrap().status(), StatusCode::OK); + app.run(req).msg().unwrap().status(), StatusCode::OK); } } diff --git a/src/channel.rs b/src/channel.rs index c649d8c46..bf7e24a96 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -8,20 +8,31 @@ use tokio_io::{AsyncRead, AsyncWrite}; use h1; use h2; -use pipeline::Pipeline; +use error::Error; +use h1writer::Writer; use httprequest::HttpRequest; use server::ServerSettings; /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { + /// Handle request - fn handle(&self, req: HttpRequest) -> Result; + fn handle(&self, req: HttpRequest) -> Result, HttpRequest>; /// Set server settings fn server_settings(&mut self, settings: ServerSettings) {} } +pub trait HttpHandlerTask { + + fn poll_io(&mut self, io: &mut Writer) -> Poll; + + fn poll(&mut self) -> Poll<(), Error>; + + fn disconnected(&mut self); +} + /// Conversion helper trait pub trait IntoHttpHandler { /// The associated type which is result of conversion. @@ -40,7 +51,7 @@ impl IntoHttpHandler for T { } enum HttpProtocol - where T: AsyncRead + AsyncWrite + 'static, H: 'static + where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { H1(h1::Http1), H2(h2::Http2), @@ -48,7 +59,7 @@ enum HttpProtocol #[doc(hidden)] pub struct HttpChannel - where T: AsyncRead + AsyncWrite + 'static, H: 'static + where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { proto: Option>, } diff --git a/src/encoding.rs b/src/encoding.rs index 2768dfd18..1c8c88272 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -15,7 +15,7 @@ use bytes::{Bytes, BytesMut, BufMut, Writer}; use body::{Body, Binary}; use error::PayloadError; -use httprequest::HttpRequest; +use httprequest::HttpMessage; use httpresponse::HttpResponse; use payload::{PayloadSender, PayloadWriter}; @@ -336,8 +336,8 @@ impl Default for PayloadEncoder { impl PayloadEncoder { - pub fn new(req: &HttpRequest, resp: &mut HttpResponse) -> PayloadEncoder { - let version = resp.version().unwrap_or_else(|| req.version()); + pub fn new(req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder { + let version = resp.version().unwrap_or_else(|| req.version); let mut body = resp.replace_body(Body::Empty); let has_body = match body { Body::Empty => false, @@ -350,7 +350,7 @@ impl PayloadEncoder { let encoding = match *resp.content_encoding() { ContentEncoding::Auto => { // negotiate content-encoding - if let Some(val) = req.headers().get(ACCEPT_ENCODING) { + if let Some(val) = req.headers.get(ACCEPT_ENCODING) { if let Ok(enc) = val.to_str() { AcceptEncoding::parse(enc) } else { diff --git a/src/error.rs b/src/error.rs index 33cb25bab..e79b12939 100644 --- a/src/error.rs +++ b/src/error.rs @@ -491,12 +491,12 @@ pub struct ErrorNotFound(pub T); ERROR_WRAP!(ErrorNotFound, StatusCode::NOT_FOUND); #[derive(Debug)] -/// Helper type that can wrap any error and generate *METHOD_NOT_ALLOWED* response. +/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response. pub struct ErrorMethodNotAllowed(pub T); ERROR_WRAP!(ErrorMethodNotAllowed, StatusCode::METHOD_NOT_ALLOWED); #[derive(Debug)] -/// Helper type that can wrap any error and generate *REQUEST_TIMEOUT* response. +/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response. pub struct ErrorRequestTimeout(pub T); ERROR_WRAP!(ErrorRequestTimeout, StatusCode::REQUEST_TIMEOUT); @@ -511,17 +511,17 @@ pub struct ErrorGone(pub T); ERROR_WRAP!(ErrorGone, StatusCode::GONE); #[derive(Debug)] -/// Helper type that can wrap any error and generate *PRECONDITION_FAILED* response. +/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response. pub struct ErrorPreconditionFailed(pub T); ERROR_WRAP!(ErrorPreconditionFailed, StatusCode::PRECONDITION_FAILED); #[derive(Debug)] -/// Helper type that can wrap any error and generate *EXPECTATION_FAILED* response. +/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response. pub struct ErrorExpectationFailed(pub T); ERROR_WRAP!(ErrorExpectationFailed, StatusCode::EXPECTATION_FAILED); #[derive(Debug)] -/// Helper type that can wrap any error and generate *INTERNAL_SERVER_ERROR* response. +/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response. pub struct ErrorInternalServerError(pub T); ERROR_WRAP!(ErrorInternalServerError, StatusCode::INTERNAL_SERVER_ERROR); diff --git a/src/h1.rs b/src/h1.rs index 9b4587d87..5a46a3bb4 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -15,7 +15,7 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use encoding::PayloadType; -use channel::HttpHandler; +use channel::{HttpHandler, HttpHandlerTask}; use h1writer::H1Writer; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; @@ -69,7 +69,7 @@ pub(crate) struct Http1 { } struct Entry { - pipe: Pipeline, + pipe: Box, flags: EntryFlags, } diff --git a/src/h1writer.rs b/src/h1writer.rs index 8776b4f4f..58b6da58c 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -8,7 +8,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, DATE}; use date; use body::Body; use encoding::PayloadEncoder; -use httprequest::HttpRequest; +use httprequest::HttpMessage; use httpresponse::HttpResponse; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -16,16 +16,16 @@ const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k #[derive(Debug)] -pub(crate) enum WriterState { +pub enum WriterState { Done, Pause, } /// Send stream -pub(crate) trait Writer { +pub trait Writer { fn written(&self) -> u64; - fn start(&mut self, req: &mut HttpRequest, resp: &mut HttpResponse) + fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> Result; fn write(&mut self, payload: &[u8]) -> Result; @@ -116,7 +116,7 @@ impl Writer for H1Writer { } } - fn start(&mut self, req: &mut HttpRequest, msg: &mut HttpResponse) + fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> Result { trace!("Prepare response with status: {:?}", msg.status()); @@ -129,7 +129,7 @@ impl Writer for H1Writer { } // Connection upgrade - let version = msg.version().unwrap_or_else(|| req.version()); + let version = msg.version().unwrap_or_else(|| req.version); if msg.upgrade() { msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade")); } diff --git a/src/h2.rs b/src/h2.rs index 264fb4629..d3f357aef 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -16,7 +16,7 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use h2writer::H2Writer; -use channel::HttpHandler; +use channel::{HttpHandler, HttpHandlerTask}; use error::PayloadError; use encoding::PayloadType; use httpcodes::HTTPNotFound; @@ -217,7 +217,7 @@ bitflags! { } struct Entry { - task: Pipeline, + task: Box, payload: PayloadType, recv: RecvStream, stream: H2Writer, diff --git a/src/h2writer.rs b/src/h2writer.rs index 062c69e4e..f2c07d651 100644 --- a/src/h2writer.rs +++ b/src/h2writer.rs @@ -9,7 +9,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, TRANSFER_ENCODING, DAT use date; use body::Body; use encoding::PayloadEncoder; -use httprequest::HttpRequest; +use httprequest::HttpMessage; use httpresponse::HttpResponse; use h1writer::{Writer, WriterState}; @@ -108,7 +108,7 @@ impl Writer for H2Writer { self.written } - fn start(&mut self, req: &mut HttpRequest, msg: &mut HttpResponse) + fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> Result { trace!("Prepare response with status: {:?}", msg.status()); diff --git a/src/httprequest.rs b/src/httprequest.rs index d6d0f43e4..092299594 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -19,18 +19,17 @@ use error::{ParseError, PayloadError, UrlGenerationError, MultipartError, CookieParseError, HttpRangeError, UrlencodedError}; -struct HttpMessage { - version: Version, - method: Method, - uri: Uri, - headers: HeaderMap, - extensions: Extensions, - params: Params<'static>, - cookies: Option>>, - addr: Option, - payload: Payload, - info: Option>, - +pub struct HttpMessage { + pub version: Version, + pub method: Method, + pub uri: Uri, + pub headers: HeaderMap, + pub extensions: Extensions, + pub params: Params<'static>, + pub cookies: Option>>, + pub addr: Option, + pub payload: Payload, + pub info: Option>, } impl Default for HttpMessage { @@ -51,6 +50,27 @@ impl Default for HttpMessage { } } +impl HttpMessage { + + /// Checks if a connection should be kept alive. + pub fn keep_alive(&self) -> bool { + if let Some(conn) = self.headers.get(header::CONNECTION) { + if let Ok(conn) = conn.to_str() { + if self.version == Version::HTTP_10 && conn.contains("keep-alive") { + true + } else { + self.version == Version::HTTP_11 && + !(conn.contains("close") || conn.contains("upgrade")) + } + } else { + false + } + } else { + self.version != Version::HTTP_10 + } + } +} + /// An HTTP Request pub struct HttpRequest(Rc, Rc, Option>); @@ -101,6 +121,10 @@ impl HttpRequest { unsafe{mem::transmute(r)} } + pub(crate) fn get_inner(&mut self) -> &mut HttpMessage { + self.as_mut() + } + /// Shared application state #[inline] pub fn state(&self) -> &S { diff --git a/src/middlewares/defaultheaders.rs b/src/middlewares/defaultheaders.rs index 5f4dd8bcd..e1a797a23 100644 --- a/src/middlewares/defaultheaders.rs +++ b/src/middlewares/defaultheaders.rs @@ -35,9 +35,9 @@ impl DefaultHeaders { } } -impl Middleware for DefaultHeaders { +impl Middleware for DefaultHeaders { - fn response(&self, _: &mut HttpRequest, mut resp: HttpResponse) -> Response { + fn response(&self, _: &mut HttpRequest, mut resp: HttpResponse) -> Response { for (key, value) in self.0.iter() { if !resp.headers().contains_key(key) { resp.headers_mut().insert(key, value.clone()); diff --git a/src/middlewares/logger.rs b/src/middlewares/logger.rs index 5e443fce6..67e5c3ffa 100644 --- a/src/middlewares/logger.rs +++ b/src/middlewares/logger.rs @@ -86,7 +86,7 @@ struct StartTime(time::Tm); impl Logger { - fn log(&self, req: &mut HttpRequest, resp: &HttpResponse) { + fn log(&self, req: &mut HttpRequest, resp: &HttpResponse) { let entry_time = req.extensions().get::().unwrap().0; let render = |fmt: &mut Formatter| { @@ -99,14 +99,14 @@ impl Logger { } } -impl Middleware for Logger { +impl Middleware for Logger { - fn start(&self, req: &mut HttpRequest) -> Started { + fn start(&self, req: &mut HttpRequest) -> Started { req.extensions().insert(StartTime(time::now())); Started::Done } - fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { + fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { self.log(req, resp); Finished::Done } @@ -199,10 +199,10 @@ pub enum FormatText { impl FormatText { - fn render(&self, fmt: &mut Formatter, - req: &HttpRequest, - resp: &HttpResponse, - entry_time: time::Tm) -> Result<(), fmt::Error> + fn render(&self, fmt: &mut Formatter, + req: &HttpRequest, + resp: &HttpResponse, + entry_time: time::Tm) -> Result<(), fmt::Error> { match *self { FormatText::Str(ref string) => fmt.write_str(string), diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index ebe405319..b9798c97b 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -46,22 +46,22 @@ pub enum Finished { /// Middleware definition #[allow(unused_variables)] -pub trait Middleware { +pub trait Middleware { /// Method is called when request is ready. It may return /// future, which should resolve before next middleware get called. - fn start(&self, req: &mut HttpRequest) -> Started { + fn start(&self, req: &mut HttpRequest) -> Started { Started::Done } /// Method is called when handler returns response, /// but before sending http message to peer. - fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response { + fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response { Response::Done(resp) } /// Method is called after body stream get sent to peer. - fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { + fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { Finished::Done } } diff --git a/src/middlewares/session.rs b/src/middlewares/session.rs index f962171a7..a807b0c03 100644 --- a/src/middlewares/session.rs +++ b/src/middlewares/session.rs @@ -3,6 +3,7 @@ use std::any::Any; use std::rc::Rc; use std::sync::Arc; +use std::marker::PhantomData; use std::collections::HashMap; use serde_json; @@ -79,18 +80,18 @@ unsafe impl Send for SessionImplBox {} unsafe impl Sync for SessionImplBox {} /// Session storage middleware -pub struct SessionStorage(T); +pub struct SessionStorage(T, PhantomData); -impl SessionStorage { +impl> SessionStorage { /// Create session storage - pub fn new(backend: T) -> SessionStorage { - SessionStorage(backend) + pub fn new(backend: T) -> SessionStorage { + SessionStorage(backend, PhantomData) } } -impl Middleware for SessionStorage { +impl> Middleware for SessionStorage { - fn start(&self, req: &mut HttpRequest) -> Started { + fn start(&self, req: &mut HttpRequest) -> Started { let mut req = req.clone(); let fut = self.0.from_request(&mut req) @@ -106,7 +107,7 @@ impl Middleware for SessionStorage { Started::Future(Box::new(fut)) } - fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response { + fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response { if let Some(s_box) = req.extensions().remove::>() { s_box.0.write(resp) } else { @@ -133,12 +134,12 @@ pub trait SessionImpl: 'static { /// Session's storage backend trait definition. #[doc(hidden)] -pub trait SessionBackend: Sized + 'static { +pub trait SessionBackend: Sized + 'static { type Session: SessionImpl; type ReadFuture: Future; /// Parse the session from request and load data from a storage backend. - fn from_request(&self, request: &mut HttpRequest) -> Self::ReadFuture; + fn from_request(&self, request: &mut HttpRequest) -> Self::ReadFuture; } /// Dummy session impl, does not do anything @@ -258,7 +259,7 @@ impl CookieSessionInner { Ok(()) } - fn load(&self, req: &mut HttpRequest) -> HashMap { + fn load(&self, req: &mut HttpRequest) -> HashMap { if let Ok(cookies) = req.cookies() { for cookie in cookies { if cookie.name() == self.name { @@ -316,12 +317,12 @@ impl CookieSessionBackend { } } -impl SessionBackend for CookieSessionBackend { +impl SessionBackend for CookieSessionBackend { type Session = CookieSession; type ReadFuture = FutureResult; - fn from_request(&self, req: &mut HttpRequest) -> Self::ReadFuture { + fn from_request(&self, req: &mut HttpRequest) -> Self::ReadFuture { let state = self.0.load(req); FutOk( CookieSession { diff --git a/src/pipeline.rs b/src/pipeline.rs index 4c9c79369..06f1e8f01 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -5,6 +5,7 @@ use std::cell::RefCell; use futures::{Async, Poll, Future, Stream}; use futures::task::{Task as FutureTask, current as current_task}; +use channel::HttpHandlerTask; use body::{Body, BodyStream}; use context::{Frame, IoContext}; use error::{Error, UnexpectedTaskFrame}; @@ -14,23 +15,23 @@ use httprequest::HttpRequest; use httpresponse::HttpResponse; use middlewares::{Middleware, Finished, Started, Response}; -type Handler = Fn(HttpRequest) -> Reply; -pub(crate) type PipelineHandler<'a> = &'a Fn(HttpRequest) -> Reply; +type Handler = Fn(HttpRequest) -> Reply; +pub(crate) type PipelineHandler<'a, S> = &'a Fn(HttpRequest) -> Reply; -pub struct Pipeline(PipelineState); +pub struct Pipeline(PipelineState); -enum PipelineState { +enum PipelineState { None, Error, - Starting(StartMiddlewares), - Handler(WaitingResponse), - RunMiddlewares(RunMiddlewares), - Response(ProcessResponse), - Finishing(FinishingMiddlewares), - Completed(Completed), + Starting(StartMiddlewares), + Handler(WaitingResponse), + RunMiddlewares(RunMiddlewares), + Response(ProcessResponse), + Finishing(FinishingMiddlewares), + Completed(Completed), } -impl PipelineState { +impl PipelineState { fn is_done(&self) -> bool { match *self { @@ -71,16 +72,16 @@ impl PipelineState { } } -struct PipelineInfo { - req: HttpRequest, +struct PipelineInfo { + req: HttpRequest, count: usize, - mws: Rc>>, + mws: Rc>>>, context: Option>, error: Option, } -impl PipelineInfo { - fn new(req: HttpRequest) -> PipelineInfo { +impl PipelineInfo { + fn new(req: HttpRequest) -> PipelineInfo { PipelineInfo { req: req, count: 0, @@ -91,7 +92,7 @@ impl PipelineInfo { } #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] - fn req_mut(&self) -> &mut HttpRequest { + fn req_mut(&self) -> &mut HttpRequest { #[allow(mutable_transmutes)] unsafe{mem::transmute(&self.req)} } @@ -158,25 +159,30 @@ impl Future for DrainFut { } -impl Pipeline { +impl Pipeline { - pub fn new(req: HttpRequest, - mw: Rc>>, - handler: PipelineHandler) -> Pipeline + pub fn new(req: HttpRequest, + mw: Rc>>>, + handler: PipelineHandler) -> Pipeline { Pipeline(StartMiddlewares::init(mw, req, handler)) } +} - pub fn error>(err: R) -> Self { - Pipeline(ProcessResponse::init( - Box::new(PipelineInfo::new(HttpRequest::default())), err.into())) +impl Pipeline<()> { + pub fn error>(err: R) -> Box { + Box::new(Pipeline(ProcessResponse::init( + PipelineInfo::new(HttpRequest::default()), err.into()))) } +} - pub(crate) fn disconnected(&mut self) { +impl HttpHandlerTask for Pipeline { + + fn disconnected(&mut self) { self.0.disconnect() } - pub(crate) fn poll_io(&mut self, io: &mut T) -> Poll { + fn poll_io(&mut self, io: &mut Writer) -> Poll { loop { let state = mem::replace(&mut self.0, PipelineState::None); match state { @@ -256,7 +262,7 @@ impl Pipeline { } } - pub(crate) fn poll(&mut self) -> Poll<(), Error> { + fn poll(&mut self) -> Poll<(), Error> { loop { let state = mem::replace(&mut self.0, PipelineState::None); match state { @@ -327,16 +333,16 @@ impl Pipeline { type Fut = Box, Error=Error>>; /// Middlewares start executor -struct StartMiddlewares { - hnd: *mut Handler, +struct StartMiddlewares { + hnd: *mut Handler, fut: Option, - info: Box, + info: PipelineInfo, } -impl StartMiddlewares { +impl StartMiddlewares { - fn init(mws: Rc>>, - req: HttpRequest, handler: PipelineHandler) -> PipelineState { + fn init(mws: Rc>>>, + req: HttpRequest, handler: PipelineHandler) -> PipelineState { let mut info = PipelineInfo { req: req, count: 0, @@ -351,37 +357,37 @@ impl StartMiddlewares { loop { if info.count == len { let reply = (&*handler)(info.req.clone()); - return WaitingResponse::init(Box::new(info), reply) + return WaitingResponse::init(info, reply) } else { match info.mws[info.count].start(&mut info.req) { Started::Done => info.count += 1, Started::Response(resp) => - return RunMiddlewares::init(Box::new(info), resp), + return RunMiddlewares::init(info, resp), Started::Future(mut fut) => match fut.poll() { Ok(Async::NotReady) => return PipelineState::Starting(StartMiddlewares { hnd: handler as *const _ as *mut _, fut: Some(fut), - info: Box::new(info)}), + info: info}), Ok(Async::Ready(resp)) => { if let Some(resp) = resp { - return RunMiddlewares::init(Box::new(info), resp); + return RunMiddlewares::init(info, resp); } info.count += 1; } Err(err) => - return ProcessResponse::init(Box::new(info), err.into()), + return ProcessResponse::init(info, err.into()), }, Started::Err(err) => - return ProcessResponse::init(Box::new(info), err.into()), + return ProcessResponse::init(info, err.into()), } } } } - fn poll(mut self) -> Result { + fn poll(mut self) -> Result, PipelineState> { let len = self.info.mws.len(); 'outer: loop { match self.fut.as_mut().unwrap().poll() { @@ -421,14 +427,14 @@ impl StartMiddlewares { } // waiting for response -struct WaitingResponse { - info: Box, +struct WaitingResponse { + info: PipelineInfo, stream: PipelineResponse, } -impl WaitingResponse { +impl WaitingResponse { - fn init(info: Box, reply: Reply) -> PipelineState + fn init(info: PipelineInfo, reply: Reply) -> PipelineState { let stream = match reply.into() { ReplyItem::Message(resp) => @@ -443,7 +449,7 @@ impl WaitingResponse { WaitingResponse { info: info, stream: stream }) } - fn poll(mut self) -> Result { + fn poll(mut self) -> Result, PipelineState> { let stream = mem::replace(&mut self.stream, PipelineResponse::None); match stream { @@ -494,15 +500,15 @@ impl WaitingResponse { } /// Middlewares response executor -pub(crate) struct RunMiddlewares { - info: Box, +struct RunMiddlewares { + info: PipelineInfo, curr: usize, fut: Option>>, } -impl RunMiddlewares { +impl RunMiddlewares { - fn init(mut info: Box, mut resp: HttpResponse) -> PipelineState + fn init(mut info: PipelineInfo, mut resp: HttpResponse) -> PipelineState { if info.count == 0 { return ProcessResponse::init(info, resp); @@ -532,7 +538,7 @@ impl RunMiddlewares { } } - fn poll(mut self) -> Result { + fn poll(mut self) -> Result, PipelineState> { let len = self.info.mws.len(); loop { @@ -570,12 +576,12 @@ impl RunMiddlewares { } } -struct ProcessResponse { +struct ProcessResponse { resp: HttpResponse, iostate: IOState, running: RunningState, drain: DrainVec, - info: Box, + info: PipelineInfo, } #[derive(PartialEq)] @@ -625,9 +631,9 @@ impl Drop for DrainVec { } } -impl ProcessResponse { +impl ProcessResponse { - fn init(info: Box, resp: HttpResponse) -> PipelineState + fn init(info: PipelineInfo, resp: HttpResponse) -> PipelineState { PipelineState::Response( ProcessResponse{ resp: resp, @@ -637,14 +643,15 @@ impl ProcessResponse { info: info}) } - fn poll_io(mut self, io: &mut T) -> Result { + fn poll_io(mut self, io: &mut Writer) -> Result, PipelineState> { if self.drain.0.is_empty() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { - let result = match io.start(self.info.req_mut(), &mut self.resp) { + let result = match io.start(self.info.req_mut().get_inner(), + &mut self.resp) { Ok(res) => res, Err(err) => { self.info.error = Some(err.into()); @@ -804,15 +811,15 @@ impl ProcessResponse { } /// Middlewares start executor -struct FinishingMiddlewares { - info: Box, +struct FinishingMiddlewares { + info: PipelineInfo, resp: HttpResponse, fut: Option>>, } -impl FinishingMiddlewares { +impl FinishingMiddlewares { - fn init(info: Box, resp: HttpResponse) -> PipelineState { + fn init(info: PipelineInfo, resp: HttpResponse) -> PipelineState { if info.count == 0 { Completed::init(info) } else { @@ -822,7 +829,7 @@ impl FinishingMiddlewares { } } - fn poll(mut self) -> Result { + fn poll(mut self) -> Result, PipelineState> { loop { // poll latest fut let not_ready = if let Some(ref mut fut) = self.fut { @@ -861,11 +868,11 @@ impl FinishingMiddlewares { } } -struct Completed(Box); +struct Completed(PipelineInfo); -impl Completed { +impl Completed { - fn init(info: Box) -> PipelineState { + fn init(info: PipelineInfo) -> PipelineState { if info.context.is_none() { PipelineState::None } else { @@ -873,7 +880,7 @@ impl Completed { } } - fn poll(mut self) -> Result { + fn poll(mut self) -> Result, PipelineState> { match self.0.poll_context() { Ok(Async::NotReady) => Ok(PipelineState::Completed(self)), Ok(Async::Ready(())) => Ok(PipelineState::None), @@ -890,11 +897,11 @@ mod tests { use tokio_core::reactor::Core; use futures::future::{lazy, result}; - impl PipelineState { + impl PipelineState { fn is_none(&self) -> Option { if let PipelineState::None = *self { Some(true) } else { None } } - fn completed(self) -> Option { + fn completed(self) -> Option> { if let PipelineState::Completed(c) = self { Some(c) } else { None } } } diff --git a/tests/test_server.rs b/tests/test_server.rs index b1715f252..1cc955867 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -53,18 +53,18 @@ struct MiddlewareTest { finish: Arc, } -impl middlewares::Middleware for MiddlewareTest { - fn start(&self, _: &mut HttpRequest) -> middlewares::Started { +impl middlewares::Middleware for MiddlewareTest { + fn start(&self, _: &mut HttpRequest) -> middlewares::Started { self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed); middlewares::Started::Done } - fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response { + fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response { self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed); middlewares::Response::Done(resp) } - fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) -> middlewares::Finished { + fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) -> middlewares::Finished { self.finish.store(self.finish.load(Ordering::Relaxed) + 1, Ordering::Relaxed); middlewares::Finished::Done }