1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 03:21:08 +00:00

refactor middlewares

This commit is contained in:
Nikolay Kim 2017-11-09 22:08:54 -08:00
parent 51cd08ef57
commit 40c1d3b711
12 changed files with 355 additions and 117 deletions

View file

@ -5,6 +5,8 @@
* HTTP/2 Support * HTTP/2 Support
* Asynchronous middlewares
* Content compression/decompression (br, gzip, deflate) * Content compression/decompression (br, gzip, deflate)

View file

@ -49,7 +49,7 @@ fn main() {
HttpServer::new( HttpServer::new(
Application::default("/") Application::default("/")
// enable logger // enable logger
.middleware(Logger::new(None)) .middleware(middlewares::Logger::new(None))
// register simple handle r, handle all methods // register simple handle r, handle all methods
.handler("/index.html", index) .handler("/index.html", index)
// with path parameters // with path parameters

View file

@ -74,7 +74,7 @@ fn main() {
HttpServer::new( HttpServer::new(
Application::builder("/", AppState{counter: Cell::new(0)}) Application::builder("/", AppState{counter: Cell::new(0)})
// enable logger // enable logger
.middleware(Logger::new(None)) .middleware(middlewares::Logger::new(None))
// websocket route // websocket route
.resource("/ws/", |r| r.get::<MyWebSocket>()) .resource("/ws/", |r| r.get::<MyWebSocket>())
// register simple handler, handle all methods // register simple handler, handle all methods

View file

@ -32,7 +32,7 @@ fn main() {
HttpServer::new( HttpServer::new(
Application::default("/") Application::default("/")
// enable logger // enable logger
.middleware(Logger::new(None)) .middleware(middlewares::Logger::new(None))
// register simple handler, handle all methods // register simple handler, handle all methods
.handler("/index.html", index) .handler("/index.html", index)
// with path parameters // with path parameters

View file

@ -73,7 +73,7 @@ fn main() {
HttpServer::new( HttpServer::new(
Application::default("/") Application::default("/")
// enable logger // enable logger
.middleware(Logger::new(None)) .middleware(middlewares::Logger::new(None))
// websocket route // websocket route
.resource("/ws/", |r| r.get::<MyWebSocket>()) .resource("/ws/", |r| r.get::<MyWebSocket>())
.route_handler("/", StaticFiles::new("examples/static/", true))) .route_handler("/", StaticFiles::new("examples/static/", true)))

View file

@ -10,27 +10,9 @@ use recognizer::{RouteRecognizer, check_pattern};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use channel::HttpHandler; use channel::HttpHandler;
use middlewares::Middleware;
/// Middleware definition
#[allow(unused_variables)]
pub trait Middleware {
/// Method is called when request is ready.
fn start(&self, req: &mut HttpRequest) -> Result<(), HttpResponse> {
Ok(())
}
/// Method is called when handler returns response,
/// but before sending body streams to peer.
fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> HttpResponse {
resp
}
/// Http interation is finished
fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) {}
}
/// Application /// Application
pub struct Application<S> { pub struct Application<S> {
state: Rc<S>, state: Rc<S>,
@ -67,19 +49,13 @@ impl<S: 'static> HttpHandler for Application<S> {
} }
fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task { fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task {
// run middlewares
if !self.middlewares.is_empty() {
for middleware in self.middlewares.iter() {
if let Err(resp) = middleware.start(req) {
return Task::reply(resp)
};
}
let mut task = self.run(req, payload); let mut task = self.run(req, payload);
// init middlewares
if !self.middlewares.is_empty() {
task.set_middlewares(Rc::clone(&self.middlewares)); task.set_middlewares(Rc::clone(&self.middlewares));
task
} else {
self.run(req, payload)
} }
task
} }
} }

View file

@ -18,7 +18,7 @@ use route::{Route, Frame};
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
/// Actor execution context /// Http actor execution context
pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route, pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route,
{ {
act: Option<A>, act: Option<A>,

View file

@ -42,7 +42,6 @@ mod date;
mod encoding; mod encoding;
mod httprequest; mod httprequest;
mod httpresponse; mod httpresponse;
mod logger;
mod payload; mod payload;
mod resource; mod resource;
mod recognizer; mod recognizer;
@ -62,17 +61,17 @@ pub mod ws;
pub mod dev; pub mod dev;
pub mod httpcodes; pub mod httpcodes;
pub mod multipart; pub mod multipart;
pub mod middlewares;
pub use encoding::ContentEncoding; pub use encoding::ContentEncoding;
pub use error::ParseError; pub use error::ParseError;
pub use body::{Body, BinaryBody}; pub use body::{Body, BinaryBody};
pub use application::{Application, ApplicationBuilder, Middleware}; pub use application::{Application, ApplicationBuilder};
pub use httprequest::{HttpRequest, UrlEncoded}; pub use httprequest::{HttpRequest, UrlEncoded};
pub use httpresponse::{HttpResponse, HttpResponseBuilder}; pub use httpresponse::{HttpResponse, HttpResponseBuilder};
pub use payload::{Payload, PayloadItem, PayloadError}; pub use payload::{Payload, PayloadItem, PayloadError};
pub use route::{Frame, Route, RouteFactory, RouteHandler, RouteResult}; pub use route::{Frame, Route, RouteFactory, RouteHandler, RouteResult};
pub use resource::{Reply, Resource, HandlerResult}; pub use resource::{Reply, Resource, HandlerResult};
pub use recognizer::{Params, RouteRecognizer}; pub use recognizer::{Params, RouteRecognizer};
pub use logger::Logger;
pub use server::HttpServer; pub use server::HttpServer;
pub use context::HttpContext; pub use context::HttpContext;
pub use channel::HttpChannel; pub use channel::HttpChannel;

View file

@ -6,9 +6,9 @@ use std::fmt::{Display, Formatter};
use time; use time;
use application::Middleware;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use middlewares::{Middleware, Started, Finished};
/// `Middleware` for logging request and response info to the terminal. /// `Middleware` for logging request and response info to the terminal.
pub struct Logger { pub struct Logger {
@ -37,16 +37,13 @@ impl Logger {
struct StartTime(time::Tm); struct StartTime(time::Tm);
impl Logger { impl Logger {
fn initialise(&self, req: &mut HttpRequest) {
req.extensions().insert(StartTime(time::now()));
}
fn log(&self, req: &mut HttpRequest, resp: &HttpResponse) { fn log(&self, req: &mut HttpRequest, resp: &HttpResponse) {
let entry_time = req.extensions().get::<StartTime>().unwrap().0; let entry_time = req.extensions().get::<StartTime>().unwrap().0;
let response_time = time::now() - entry_time; let response_time = time::now() - entry_time;
let response_time_ms = (response_time.num_seconds() * 1000) as f64 + (response_time.num_nanoseconds().unwrap_or(0) as f64) / 1000000.0; let response_time_ms = (response_time.num_seconds() * 1000) as f64 +
(response_time.num_nanoseconds().unwrap_or(0) as f64) / 1000000000.0;
{ {
let render = |fmt: &mut Formatter, text: &FormatText| { let render = |fmt: &mut Formatter, text: &FormatText| {
match *text { match *text {
@ -61,7 +58,7 @@ impl Logger {
}, },
FormatText::Status => resp.status().fmt(fmt), FormatText::Status => resp.status().fmt(fmt),
FormatText::ResponseTime => FormatText::ResponseTime =>
fmt.write_fmt(format_args!("{} ms", response_time_ms)), fmt.write_fmt(format_args!("{} sec", response_time_ms)),
FormatText::RemoteAddr => Ok(()), //req.remote_addr.fmt(fmt), FormatText::RemoteAddr => Ok(()), //req.remote_addr.fmt(fmt),
FormatText::RequestTime => { FormatText::RequestTime => {
entry_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ%z") entry_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ%z")
@ -77,13 +74,15 @@ impl Logger {
} }
impl Middleware for Logger { impl Middleware for Logger {
fn start(&self, req: &mut HttpRequest) -> Result<(), HttpResponse> {
self.initialise(req); fn start(&self, req: &mut HttpRequest) -> Started {
Ok(()) req.extensions().insert(StartTime(time::now()));
Started::Done
} }
fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) { fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished {
self.log(req, resp); self.log(req, resp);
Finished::Done
} }
} }

247
src/middlewares/mod.rs Normal file
View file

@ -0,0 +1,247 @@
//! Middlewares
use std::rc::Rc;
use std::error::Error;
use futures::{Async, Future, Poll};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
mod logger;
pub use self::logger::Logger;
/// Middleware start result
pub enum Started {
/// Execution completed
Done,
/// New http response got generated. If middleware generates response
/// handler execution halts.
Response(HttpResponse),
/// Execution completed, but run future to completion.
Future(Box<Future<Item=(), Error=HttpResponse>>),
}
/// Middleware execution result
pub enum Response {
/// New http response got generated
Response(HttpResponse),
/// Result is a future that resolves to a new http response
Future(Box<Future<Item=HttpResponse, Error=HttpResponse>>),
}
/// Middleware finish result
pub enum Finished {
/// Execution completed
Done,
/// Execution completed, but run future to completion
Future(Box<Future<Item=(), Error=Box<Error>>>),
}
/// Middleware definition
#[allow(unused_variables)]
pub trait Middleware {
/// Method is called when request is ready. It may return
/// future, which should resolve before next middleware get called.
fn start(&self, req: &mut HttpRequest) -> Started {
Started::Done
}
/// Method is called when handler returns response,
/// but before sending body stream to peer.
fn response(&self, req: &mut HttpRequest, resp: HttpResponse) -> Response {
Response::Response(resp)
}
/// Method is called after http response get sent to peer.
fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished {
Finished::Done
}
}
/// Middlewares executor
pub(crate) struct MiddlewaresExecutor {
state: ExecutorState,
fut: Option<Box<Future<Item=HttpResponse, Error=HttpResponse>>>,
started: Option<Box<Future<Item=(), Error=HttpResponse>>>,
finished: Option<Box<Future<Item=(), Error=Box<Error>>>>,
middlewares: Option<Rc<Vec<Box<Middleware>>>>,
}
enum ExecutorState {
None,
Starting(usize),
Started(usize),
Processing(usize, usize),
Finishing(usize),
}
impl Default for MiddlewaresExecutor {
fn default() -> MiddlewaresExecutor {
MiddlewaresExecutor {
fut: None,
started: None,
finished: None,
state: ExecutorState::None,
middlewares: None,
}
}
}
impl MiddlewaresExecutor {
pub fn start(&mut self, mw: Rc<Vec<Box<Middleware>>>) {
self.state = ExecutorState::Starting(0);
self.middlewares = Some(mw);
}
pub fn starting(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
if let ExecutorState::Starting(mut idx) = *state {
loop {
// poll latest fut
if let Some(ref mut fut) = self.started {
match fut.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => idx += 1,
Err(response) => {
*state = ExecutorState::Started(idx);
return Ok(Async::Ready(Some(response)))
}
}
}
self.started = None;
if idx >= middlewares.len() {
*state = ExecutorState::Started(idx-1);
return Ok(Async::Ready(None))
} else {
match middlewares[idx].start(req) {
Started::Done => idx += 1,
Started::Response(resp) => {
*state = ExecutorState::Started(idx);
return Ok(Async::Ready(Some(resp)))
},
Started::Future(fut) => {
self.started = Some(fut);
},
}
}
}
}
}
Ok(Async::Ready(None))
}
pub fn processing(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
match *state {
ExecutorState::Processing(mut idx, total) => {
loop {
// poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(response)) | Err(response) => {
idx += 1;
response
}
};
self.fut = None;
loop {
if idx == 0 {
*state = ExecutorState::Finishing(total);
return Ok(Async::Ready(Some(resp)))
} else {
match middlewares[idx].response(req, resp) {
Response::Response(r) => {
idx -= 1;
resp = r
},
Response::Future(fut) => {
self.fut = Some(fut);
break
},
}
}
}
}
}
_ => Ok(Async::Ready(None))
}
} else {
Ok(Async::Ready(None))
}
}
pub fn finishing(&mut self, req: &mut HttpRequest, resp: &HttpResponse) -> Poll<(), ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
if let ExecutorState::Finishing(mut idx) = *state {
loop {
// poll latest fut
if let Some(ref mut fut) = self.finished {
match fut.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => idx -= 1,
Err(err) => {
error!("Middleware finish error: {}", err);
}
}
}
self.finished = None;
match middlewares[idx].finish(req, resp) {
Finished::Done => {
if idx == 0 {
return Ok(Async::Ready(()))
} else {
idx -= 1
}
}
Finished::Future(fut) => {
self.finished = Some(fut);
},
}
}
}
}
Ok(Async::Ready(()))
}
pub fn response(&mut self, req: &mut HttpRequest, resp: HttpResponse)
-> Option<HttpResponse>
{
if let Some(ref middlewares) = self.middlewares {
let mut resp = resp;
let state = &mut self.state;
match *state {
ExecutorState::Started(mut idx) => {
let total = idx;
loop {
resp = match middlewares[idx].response(req, resp) {
Response::Response(r) => {
if idx == 0 {
*state = ExecutorState::Finishing(total);
return Some(r)
} else {
idx -= 1;
r
}
},
Response::Future(fut) => {
*state = ExecutorState::Processing(idx, total);
self.fut = Some(fut);
return None
},
};
}
}
_ => Some(resp)
}
} else {
Some(resp)
}
}
}

View file

@ -8,7 +8,7 @@ use futures::task::{Task as FutureTask, current as current_task};
use h1writer::{Writer, WriterState}; use h1writer::{Writer, WriterState};
use route::Frame; use route::Frame;
use application::Middleware; use middlewares::{Middleware, MiddlewaresExecutor};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -109,7 +109,7 @@ pub struct Task {
drain: Vec<Rc<RefCell<DrainFut>>>, drain: Vec<Rc<RefCell<DrainFut>>>,
prepared: Option<HttpResponse>, prepared: Option<HttpResponse>,
disconnected: bool, disconnected: bool,
middlewares: Option<Rc<Vec<Box<Middleware>>>>, middlewares: MiddlewaresExecutor,
} }
impl Task { impl Task {
@ -119,49 +119,42 @@ impl Task {
frames.push_back(Frame::Message(response.into())); frames.push_back(Frame::Message(response.into()));
frames.push_back(Frame::Payload(None)); frames.push_back(Frame::Payload(None));
Task { Task { state: TaskRunningState::Running,
state: TaskRunningState::Running,
iostate: TaskIOState::Done, iostate: TaskIOState::Done,
frames: frames, frames: frames,
drain: Vec::new(), drain: Vec::new(),
stream: TaskStream::None, stream: TaskStream::None,
prepared: None, prepared: None,
disconnected: false, disconnected: false,
middlewares: None, middlewares: MiddlewaresExecutor::default() }
}
} }
pub(crate) fn with_stream<S>(stream: S) -> Self pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self {
where S: Stream<Item=Frame, Error=io::Error> + 'static Task { state: TaskRunningState::Running,
{
Task {
state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
stream: TaskStream::Stream(Box::new(stream)),
drain: Vec::new(),
prepared: None,
disconnected: false,
middlewares: None,
}
}
pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self
{
Task {
state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage, iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(), frames: VecDeque::new(),
stream: TaskStream::Context(Box::new(ctx)), stream: TaskStream::Context(Box::new(ctx)),
drain: Vec::new(), drain: Vec::new(),
prepared: None, prepared: None,
disconnected: false, disconnected: false,
middlewares: None, middlewares: MiddlewaresExecutor::default() }
} }
pub(crate) fn with_stream<S>(stream: S) -> Self
where S: Stream<Item=Frame, Error=io::Error> + 'static
{
Task { state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
stream: TaskStream::Stream(Box::new(stream)),
drain: Vec::new(),
prepared: None,
disconnected: false,
middlewares: MiddlewaresExecutor::default() }
} }
pub(crate) fn set_middlewares(&mut self, middlewares: Rc<Vec<Box<Middleware>>>) { pub(crate) fn set_middlewares(&mut self, middlewares: Rc<Vec<Box<Middleware>>>) {
self.middlewares = Some(middlewares); self.middlewares.start(middlewares)
} }
pub(crate) fn disconnected(&mut self) { pub(crate) fn disconnected(&mut self) {
@ -175,6 +168,17 @@ impl Task {
where T: Writer where T: Writer
{ {
trace!("POLL-IO frames:{:?}", self.frames.len()); trace!("POLL-IO frames:{:?}", self.frames.len());
// start middlewares
match self.middlewares.starting(req) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) | Err(_) => (),
Ok(Async::Ready(Some(response))) => {
self.frames.clear();
self.frames.push_front(Frame::Message(response));
},
}
// response is completed // response is completed
if self.frames.is_empty() && self.iostate.is_done() { if self.frames.is_empty() && self.iostate.is_done() {
return Ok(Async::Ready(self.state.is_done())); return Ok(Async::Ready(self.state.is_done()));
@ -190,29 +194,40 @@ impl Task {
} }
} }
// process middlewares response
match self.middlewares.processing(req) {
Err(_) => return Err(()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => (),
Ok(Async::Ready(Some(mut response))) => {
let result = io.start(req, &mut response);
self.prepared = Some(response);
match result {
Ok(WriterState::Pause) => {
self.state.pause();
}
Ok(WriterState::Done) => self.state.resume(),
Err(_) => return Err(())
}
},
}
// if task is paused, write buffer probably is full // if task is paused, write buffer probably is full
if self.state != TaskRunningState::Paused { if self.state != TaskRunningState::Paused {
// process exiting frames // process exiting frames
while let Some(frame) = self.frames.pop_front() { while let Some(frame) = self.frames.pop_front() {
trace!("IO Frame: {:?}", frame); trace!("IO Frame: {:?}", frame);
let res = match frame { let res = match frame {
Frame::Message(response) => { Frame::Message(resp) => {
// run middlewares // run middlewares
let mut response = if let Some(mut resp) = self.middlewares.response(req, resp) {
if let Some(middlewares) = self.middlewares.take() { let result = io.start(req, &mut resp);
let mut response = response; self.prepared = Some(resp);
for middleware in middlewares.iter() {
response = middleware.response(req, response);
}
self.middlewares = Some(middlewares);
response
} else {
response
};
let result = io.start(req, &mut response);
self.prepared = Some(response);
result result
} else {
// middlewares need to run some futures
return self.poll_io(io, req)
}
} }
Frame::Payload(Some(chunk)) => { Frame::Payload(Some(chunk)) => {
io.write(chunk.as_ref()) io.write(chunk.as_ref())
@ -251,7 +266,7 @@ impl Task {
} }
} }
// drain // drain futures
if !self.drain.is_empty() { if !self.drain.is_empty() {
for fut in &mut self.drain { for fut in &mut self.drain {
fut.borrow_mut().set() fut.borrow_mut().set()
@ -261,12 +276,11 @@ impl Task {
// response is completed // response is completed
if self.iostate.is_done() { if self.iostate.is_done() {
// run middlewares // finish middlewares
if let Some(ref mut resp) = self.prepared { if let Some(ref resp) = self.prepared {
if let Some(middlewares) = self.middlewares.take() { match self.middlewares.finishing(req, resp) {
for middleware in middlewares.iter() { Ok(Async::NotReady) => return Ok(Async::NotReady),
middleware.finish(req, resp); _ => (),
}
} }
} }
Ok(Async::Ready(self.state.is_done())) Ok(Async::Ready(self.state.is_done()))
@ -276,8 +290,7 @@ impl Task {
} }
fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), ()> fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), ()>
where S: Stream<Item=Frame, Error=io::Error> where S: Stream<Item=Frame, Error=io::Error> {
{
loop { loop {
match stream.poll() { match stream.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {

View file

@ -11,6 +11,7 @@ use tokio_core::net::TcpListener;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
fn create_server<T, A>() -> HttpServer<T, A, Application<()>> { fn create_server<T, A>() -> HttpServer<T, A, Application<()>> {
HttpServer::new( HttpServer::new(
vec![Application::default("/") vec![Application::default("/")
@ -59,19 +60,20 @@ struct MiddlewareTest {
finish: Arc<AtomicUsize>, finish: Arc<AtomicUsize>,
} }
impl Middleware for MiddlewareTest { impl middlewares::Middleware for MiddlewareTest {
fn start(&self, _: &mut HttpRequest) -> Result<(), HttpResponse> { fn start(&self, _: &mut HttpRequest) -> middlewares::Started {
self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed); self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
Ok(()) middlewares::Started::Done
} }
fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> HttpResponse { fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response {
self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed); self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
resp middlewares::Response::Response(resp)
} }
fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) { fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) -> middlewares::Finished {
self.finish.store(self.finish.load(Ordering::Relaxed) + 1, Ordering::Relaxed); self.finish.store(self.finish.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
middlewares::Finished::Done
} }
} }