1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-25 19:11:10 +00:00

refactor reply handling

This commit is contained in:
Nikolay Kim 2017-11-28 19:49:17 -08:00
parent 6f5b58b691
commit afeecea05f
19 changed files with 167 additions and 144 deletions

View file

@ -84,7 +84,7 @@ impl Actor for MyWebSocket {
impl Route for MyWebSocket {
type State = ();
fn request(mut req: HttpRequest, ctx: &mut HttpContext<Self>) -> RouteResult<Self>
fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
{
// websocket handshake
let resp = ws::handshake(&req)?;
@ -92,7 +92,7 @@ impl Route for MyWebSocket {
ctx.start(resp);
// convert bytes stream to a stream of `ws::Message` and handle stream
ctx.add_stream(ws::WsStream::new(&mut req));
Reply::async(MyWebSocket)
ctx.reply(MyWebSocket)
}
}

View file

@ -7,7 +7,6 @@ extern crate env_logger;
extern crate futures;
use actix_web::*;
use actix_web::error::{Error, Result};
use actix_web::middlewares::RequestSession;
use futures::stream::{once, Once};

View file

@ -37,12 +37,12 @@ impl Route for MyWebSocket {
/// Shared application state
type State = AppState;
fn request(mut req: HttpRequest<AppState>, ctx: &mut HttpContext<Self>) -> RouteResult<Self>
fn request(mut req: HttpRequest<AppState>, mut ctx: HttpContext<Self>) -> Result<Reply>
{
let resp = ws::handshake(&req)?;
ctx.start(resp);
ctx.add_stream(ws::WsStream::new(&mut req));
Reply::async(MyWebSocket{counter: 0})
ctx.reply(MyWebSocket{counter: 0})
}
}

View file

@ -22,7 +22,7 @@ impl Actor for MyWebSocket {
impl Route for MyWebSocket {
type State = ();
fn request(mut req: HttpRequest, ctx: &mut HttpContext<Self>) -> RouteResult<Self>
fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
{
// websocket handshake
let resp = ws::handshake(&req)?;
@ -30,7 +30,7 @@ impl Route for MyWebSocket {
ctx.start(resp);
// convert bytes stream to a stream of `ws::Message` and register it
ctx.add_stream(ws::WsStream::new(&mut req));
Reply::async(MyWebSocket)
ctx.reply(MyWebSocket)
}
}

View file

@ -31,7 +31,7 @@ and returns a type that can be converted into `HttpResponse`:
```rust,ignore
extern crate actix_web;
use actix_web::prelude::*;
use actix_web::*;
fn index(req: HttpRequest) -> &'static str {
"Hello world!"
@ -62,7 +62,7 @@ Here is full source of main.rs file:
```rust
extern crate actix;
extern crate actix_web;
use actix_web::prelude::*;
use actix_web::*;
fn index(req: HttpRequest) -> &'static str {
"Hello world!"

View file

@ -40,7 +40,7 @@ extern crate actix;
extern crate actix_web;
use std::cell::Cell;
use actix_web::prelude::*;
use actix_web::*;
// This struct represents state
struct AppState {

View file

@ -24,21 +24,22 @@ pub struct Application<S> {
impl<S: 'static> Application<S> {
fn run(&self, req: HttpRequest) -> Task {
fn run(&self, req: HttpRequest, task: &mut Task) {
let mut req = req.with_state(Rc::clone(&self.state));
if let Some((params, h)) = self.router.recognize(req.path()) {
if let Some(params) = params {
req.set_match_info(params);
}
h.handle(req)
h.handle(req, task)
} else {
for (prefix, handler) in &self.handlers {
if req.path().starts_with(prefix) {
return handler.handle(req)
handler.handle(req, task);
return
}
}
self.default.handle(req)
self.default.handle(req, task)
}
}
}
@ -50,7 +51,8 @@ impl<S: 'static> HttpHandler for Application<S> {
}
fn handle(&self, req: HttpRequest) -> Pipeline {
Pipeline::new(req, Rc::clone(&self.middlewares), &|req: HttpRequest| {self.run(req)})
Pipeline::new(req, Rc::clone(&self.middlewares),
&|req: HttpRequest, task: &mut Task| {self.run(req, task)})
}
}
@ -140,7 +142,7 @@ impl<S> ApplicationBuilder<S> where S: 'static {
/// impl Route for MyRoute {
/// type State = ();
///
/// fn request(req: HttpRequest, ctx: &mut HttpContext<Self>) -> RouteResult<Self> {
/// fn request(req: HttpRequest, ctx: HttpContext<Self>) -> Result<Reply> {
/// Reply::reply(httpcodes::HTTPOk)
/// }
/// }

View file

@ -14,8 +14,8 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
use task::{IoContext, DrainFut};
use body::Binary;
use error::Error;
use route::{Route, Frame};
use error::{Error, Result as ActixResult};
use route::{Route, Frame, Reply};
use httpresponse::HttpResponse;
@ -158,6 +158,11 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
pub fn connected(&self) -> bool {
!self.disconnected
}
pub fn reply(mut self, actor: A) -> ActixResult<Reply> {
self.set_actor(actor);
Reply::async(self)
}
}
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {

View file

@ -8,11 +8,13 @@
//! use actix_web::dev::*;
//! ```
pub use super::*;
// dev specific
pub use task::Task;
pub use pipeline::Pipeline;
pub use route::RouteFactory;
pub use recognizer::RouteRecognizer;
pub use channel::HttpChannel;
pub use application::ApplicationBuilder;
pub use httpresponse::HttpResponseBuilder;
pub use cookie::CookieBuilder;

View file

@ -69,8 +69,8 @@ impl StaticResponse {
}
impl<S> RouteHandler<S> for StaticResponse {
fn handle(&self, _: HttpRequest<S>) -> Task {
Task::reply(HttpResponse::new(self.0, Body::Empty))
fn handle(&self, _: HttpRequest<S>, task: &mut Task) {
task.reply(HttpResponse::new(self.0, Body::Empty))
}
}

View file

@ -71,19 +71,19 @@ mod h2writer;
pub mod ws;
pub mod dev;
pub mod prelude;
pub mod error;
pub mod httpcodes;
pub mod multipart;
pub mod middlewares;
pub use error::{Error, Result};
pub use encoding::ContentEncoding;
pub use body::{Body, Binary};
pub use application::Application;
pub use httprequest::{HttpRequest, UrlEncoded};
pub use httpresponse::HttpResponse;
pub use payload::{Payload, PayloadItem};
pub use route::{Frame, Route, RouteFactory, RouteHandler, RouteResult};
pub use resource::{Reply, Resource};
pub use route::{Frame, Route, RouteFactory, RouteHandler, Reply};
pub use resource::Resource;
pub use recognizer::Params;
pub use server::HttpServer;
pub use context::HttpContext;

View file

@ -10,8 +10,8 @@ use h1writer::Writer;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
type Handler = Fn(HttpRequest) -> Task;
pub(crate) type PipelineHandler<'a> = &'a Fn(HttpRequest) -> Task;
type Handler = Fn(HttpRequest, &mut Task);
pub(crate) type PipelineHandler<'a> = &'a Fn(HttpRequest, &mut Task);
pub struct Pipeline(PipelineState);
@ -29,7 +29,8 @@ impl Pipeline {
pub fn new(req: HttpRequest, mw: Rc<Vec<Box<Middleware>>>, handler: PipelineHandler) -> Pipeline
{
if mw.is_empty() {
let task = (handler)(req.clone());
let mut task = Task::default();
(handler)(req.clone(), &mut task);
Pipeline(PipelineState::Task(Box::new((task, req))))
} else {
match Start::init(mw, req, handler) {
@ -39,13 +40,14 @@ impl Pipeline {
Pipeline(PipelineState::Starting(res)),
Err(err) =>
Pipeline(PipelineState::Error(
Box::new((Task::reply(err), HttpRequest::default()))))
Box::new((Task::from_error(err), HttpRequest::default()))))
}
}
}
pub fn error<R: Into<HttpResponse>>(resp: R) -> Self {
Pipeline(PipelineState::Error(Box::new((Task::reply(resp), HttpRequest::default()))))
Pipeline(PipelineState::Error(
Box::new((Task::from_response(resp), HttpRequest::default()))))
}
pub(crate) fn disconnected(&mut self) {
@ -79,7 +81,7 @@ impl Pipeline {
self.0 = PipelineState::Handle(h),
Err(err) =>
self.0 = PipelineState::Error(
Box::new((Task::reply(err), HttpRequest::default())))
Box::new((Task::from_error(err), HttpRequest::default())))
}
}
PipelineState::Handle(mut st) => {
@ -193,7 +195,8 @@ impl Start {
let len = self.middlewares.len();
loop {
if self.idx == len {
let task = (unsafe{&*self.hnd})(self.req.clone());
let mut task = Task::default();
(unsafe{&*self.hnd})(self.req.clone(), &mut task);
return Ok(StartResult::Ready(
Box::new(Handle::new(self.idx-1, self.req.clone(),
self.prepare(task), self.middlewares))))
@ -205,7 +208,7 @@ impl Start {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, self.req.clone(),
self.prepare(Task::reply(resp)), self.middlewares)))),
self.prepare(Task::from_response(resp)), self.middlewares)))),
Started::Future(mut fut) =>
match fut.poll() {
Ok(Async::NotReady) => {
@ -217,7 +220,8 @@ impl Start {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, self.req.clone(),
self.prepare(Task::reply(resp)), self.middlewares))))
self.prepare(Task::from_response(resp)),
self.middlewares))))
}
self.idx += 1;
}
@ -239,10 +243,12 @@ impl Start {
if let Some(resp) = resp {
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, self.req.clone(),
self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares)))))
self.prepare(Task::from_response(resp)),
Rc::clone(&self.middlewares)))))
}
if self.idx == len {
let task = (unsafe{&*self.hnd})(self.req.clone());
let mut task = Task::default();
(unsafe{&*self.hnd})(self.req.clone(), &mut task);
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, self.req.clone(),
self.prepare(task), Rc::clone(&self.middlewares)))))
@ -255,7 +261,7 @@ impl Start {
self.idx += 1;
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, self.req.clone(),
self.prepare(Task::reply(resp)),
self.prepare(Task::from_response(resp)),
Rc::clone(&self.middlewares)))))
},
Started::Future(fut) => {

View file

@ -1,7 +0,0 @@
//! The `actix-web` prelude
pub use super::*;
pub use error::*;
pub use application::ApplicationBuilder;
pub use httpresponse::HttpResponseBuilder;
pub use cookie::CookieBuilder;

View file

@ -7,7 +7,7 @@ use futures::Stream;
use task::Task;
use error::Error;
use route::{Route, RouteHandler, RouteResult, Frame, FnHandler, StreamHandler};
use route::{Route, RouteHandler, Frame, FnHandler, StreamHandler};
use context::HttpContext;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -124,54 +124,11 @@ impl<S> Resource<S> where S: 'static {
impl<S: 'static> RouteHandler<S> for Resource<S> {
fn handle(&self, req: HttpRequest<S>) -> Task {
fn handle(&self, req: HttpRequest<S>, task: &mut Task) {
if let Some(handler) = self.routes.get(req.method()) {
handler.handle(req)
handler.handle(req, task)
} else {
self.default.handle(req)
self.default.handle(req, task)
}
}
}
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
enum ReplyItem<A> where A: Actor + Route {
Message(HttpResponse),
Actor(A),
}
/// Represents response process.
pub struct Reply<A: Actor + Route> (ReplyItem<A>);
impl<A> Reply<A> where A: Actor + Route
{
/// Create async response
pub fn async(act: A) -> RouteResult<A> {
Ok(Reply(ReplyItem::Actor(act)))
}
/// Send response
pub fn reply<R: Into<HttpResponse>>(response: R) -> RouteResult<A> {
Ok(Reply(ReplyItem::Message(response.into())))
}
pub fn into(self, mut ctx: HttpContext<A>) -> Task where A: Actor<Context=HttpContext<A>>
{
match self.0 {
ReplyItem::Message(msg) => {
Task::reply(msg)
},
ReplyItem::Actor(act) => {
ctx.set_actor(act);
Task::with_context(ctx)
}
}
}
}
impl<A, T> From<T> for Reply<A>
where T: Into<HttpResponse>, A: Actor + Route
{
fn from(item: T) -> Self {
Reply(ReplyItem::Message(item.into()))
}
}

View file

@ -6,11 +6,10 @@ use actix::Actor;
use http::{header, Version};
use futures::Stream;
use task::{Task, DrainFut};
use task::{Task, DrainFut, IoContext};
use body::Binary;
use error::{Error, ExpectError};
use error::{Error, ExpectError, Result};
use context::HttpContext;
use resource::Reply;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -32,15 +31,12 @@ impl Frame {
#[allow(unused_variables)]
pub trait RouteHandler<S>: 'static {
/// Handle request
fn handle(&self, req: HttpRequest<S>) -> Task;
fn handle(&self, req: HttpRequest<S>, task: &mut Task);
/// Set route prefix
fn set_prefix(&mut self, prefix: String) {}
}
/// Request handling result.
pub type RouteResult<T> = Result<Reply<T>, Error>;
/// Actors with ability to handle http requests.
#[allow(unused_variables)]
pub trait Route: Actor {
@ -49,7 +45,7 @@ pub trait Route: Actor {
type State;
/// Handle `EXPECT` header. By default respones with `HTTP/1.1 100 Continue`
fn expect(req: &mut HttpRequest<Self::State>, ctx: &mut Self::Context) -> Result<(), Error>
fn expect(req: &mut HttpRequest<Self::State>, ctx: &mut Self::Context) -> Result<()>
where Self: Actor<Context=HttpContext<Self>>
{
// handle expect header only for HTTP/1.1
@ -79,7 +75,7 @@ pub trait Route: Actor {
/// request/response or websocket connection.
/// In that case `HttpContext::start` and `HttpContext::write` has to be used
/// for writing response.
fn request(req: HttpRequest<Self::State>, ctx: &mut Self::Context) -> RouteResult<Self>;
fn request(req: HttpRequest<Self::State>, ctx: Self::Context) -> Result<Reply>;
/// This method creates `RouteFactory` for this actor.
fn factory() -> RouteFactory<Self, Self::State> {
@ -94,18 +90,18 @@ impl<A, S> RouteHandler<S> for RouteFactory<A, S>
where A: Actor<Context=HttpContext<A>> + Route<State=S>,
S: 'static
{
fn handle(&self, mut req: HttpRequest<A::State>) -> Task {
fn handle(&self, mut req: HttpRequest<A::State>, task: &mut Task) {
let mut ctx = HttpContext::new(req.clone_state());
// handle EXPECT header
if req.headers().contains_key(header::EXPECT) {
if let Err(resp) = A::expect(&mut req, &mut ctx) {
return Task::reply(resp)
task.reply(resp)
}
}
match A::request(req, &mut ctx) {
Ok(reply) => reply.into(ctx),
Err(err) => Task::reply(err),
match A::request(req, ctx) {
Ok(reply) => reply.into(task),
Err(err) => task.reply(err),
}
}
}
@ -136,8 +132,8 @@ impl<S, R, F> RouteHandler<S> for FnHandler<S, R, F>
R: Into<HttpResponse> + 'static,
S: 'static,
{
fn handle(&self, req: HttpRequest<S>) -> Task {
Task::reply((self.f)(req).into())
fn handle(&self, req: HttpRequest<S>, task: &mut Task) {
task.reply((self.f)(req).into())
}
}
@ -167,7 +163,59 @@ impl<S, R, F> RouteHandler<S> for StreamHandler<S, R, F>
R: Stream<Item=Frame, Error=Error> + 'static,
S: 'static,
{
fn handle(&self, req: HttpRequest<S>) -> Task {
Task::with_stream((self.f)(req))
fn handle(&self, req: HttpRequest<S>, task: &mut Task) {
task.stream((self.f)(req))
}
}
enum ReplyItem {
Message(HttpResponse),
Actor(Box<IoContext<Item=Frame, Error=Error>>),
Stream(Box<Stream<Item=Frame, Error=Error>>),
}
/// Represents response process.
pub struct Reply(ReplyItem);
impl Reply
{
/// Create actor response
pub(crate) fn async<C: IoContext>(ctx: C) -> Result<Reply> {
Ok(Reply(ReplyItem::Actor(Box::new(ctx))))
}
/// Create async response
pub fn stream<S>(stream: S) -> Result<Reply>
where S: Stream<Item=Frame, Error=Error> + 'static
{
Ok(Reply(ReplyItem::Stream(Box::new(stream))))
}
/// Send response
pub fn reply<R: Into<HttpResponse>>(response: R) -> Result<Reply> {
Ok(Reply(ReplyItem::Message(response.into())))
}
pub fn into(self, task: &mut Task)
{
match self.0 {
ReplyItem::Message(msg) => {
task.reply(msg)
},
ReplyItem::Actor(ctx) => {
task.context(ctx)
}
ReplyItem::Stream(stream) => {
task.stream(stream)
}
}
}
}
impl<T> From<T> for Reply
where T: Into<HttpResponse>
{
fn from(item: T) -> Self {
Reply(ReplyItem::Message(item.into()))
}
}

View file

@ -136,9 +136,9 @@ impl<S: 'static> RouteHandler<S> for StaticFiles {
}
}
fn handle(&self, req: HttpRequest<S>) -> Task {
fn handle(&self, req: HttpRequest<S>, task: &mut Task) {
if !self.accessible {
Task::reply(HTTPNotFound)
task.reply(HTTPNotFound)
} else {
let mut hidden = false;
let filepath = req.path()[self.prefix.len()..]
@ -152,7 +152,7 @@ impl<S: 'static> RouteHandler<S> for StaticFiles {
// hidden file
if hidden {
return Task::reply(HTTPNotFound)
task.reply(HTTPNotFound)
}
// full filepath
@ -160,19 +160,19 @@ impl<S: 'static> RouteHandler<S> for StaticFiles {
let filename = match self.directory.join(&filepath[idx..]).canonicalize() {
Ok(fname) => fname,
Err(err) => return match err.kind() {
io::ErrorKind::NotFound => Task::reply(HTTPNotFound),
io::ErrorKind::PermissionDenied => Task::reply(HTTPForbidden),
_ => Task::error(err),
io::ErrorKind::NotFound => task.reply(HTTPNotFound),
io::ErrorKind::PermissionDenied => task.reply(HTTPForbidden),
_ => task.error(err),
}
};
if filename.is_dir() {
match self.index(&filepath[idx..], &filename) {
Ok(resp) => Task::reply(resp),
Ok(resp) => task.reply(resp),
Err(err) => match err.kind() {
io::ErrorKind::NotFound => Task::reply(HTTPNotFound),
io::ErrorKind::PermissionDenied => Task::reply(HTTPForbidden),
_ => Task::error(err),
io::ErrorKind::NotFound => task.reply(HTTPNotFound),
io::ErrorKind::PermissionDenied => task.reply(HTTPForbidden),
_ => task.error(err),
}
}
} else {
@ -185,9 +185,9 @@ impl<S: 'static> RouteHandler<S> for StaticFiles {
Ok(mut file) => {
let mut data = Vec::new();
let _ = file.read_to_end(&mut data);
Task::reply(resp.body(data).unwrap())
task.reply(resp.body(data).unwrap())
},
Err(err) => Task::error(err),
Err(err) => task.error(err),
}
}
}

View file

@ -114,9 +114,23 @@ pub struct Task {
middlewares: Option<MiddlewaresResponse>,
}
impl Default for Task {
fn default() -> Task {
Task { state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
drain: Vec::new(),
stream: TaskStream::None,
prepared: None,
disconnected: false,
middlewares: None }
}
}
impl Task {
pub fn reply<R: Into<HttpResponse>>(response: R) -> Self {
pub fn from_response<R: Into<HttpResponse>>(response: R) -> Task {
let mut frames = VecDeque::new();
frames.push_back(Frame::Message(response.into()));
frames.push_back(Frame::Payload(None));
@ -131,32 +145,28 @@ impl Task {
middlewares: None }
}
pub fn error<E: Into<Error>>(err: E) -> Self {
Task::reply(err.into())
pub fn from_error<E: Into<Error>>(err: E) -> Task {
Task::from_response(err.into())
}
pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self {
Task { state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
stream: TaskStream::Context(Box::new(ctx)),
drain: Vec::new(),
prepared: None,
disconnected: false,
middlewares: None }
pub fn reply<R: Into<HttpResponse>>(&mut self, response: R) {
self.frames.push_back(Frame::Message(response.into()));
self.frames.push_back(Frame::Payload(None));
self.iostate = TaskIOState::Done;
}
pub(crate) fn with_stream<S>(stream: S) -> Self
pub fn error<E: Into<Error>>(&mut self, err: E) {
self.reply(err.into())
}
pub(crate) fn context(&mut self, ctx: Box<IoContext<Item=Frame, Error=Error>>) {
self.stream = TaskStream::Context(ctx);
}
pub(crate) fn stream<S>(&mut self, stream: S)
where S: Stream<Item=Frame, Error=Error> + 'static
{
Task { state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
stream: TaskStream::Stream(Box::new(stream)),
drain: Vec::new(),
prepared: None,
disconnected: false,
middlewares: None }
self.stream = TaskStream::Stream(Box::new(stream));
}
pub(crate) fn response(&mut self) -> HttpResponse {

View file

@ -22,7 +22,7 @@
//! impl Route for WsRoute {
//! type State = ();
//!
//! fn request(mut req: HttpRequest, ctx: &mut HttpContext<Self>) -> RouteResult<Self>
//! fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
//! {
//! // WebSocket handshake
//! let resp = ws::handshake(&req)?;
@ -31,7 +31,7 @@
//! // Map Payload into WsStream
//! ctx.add_stream(ws::WsStream::new(&mut req));
//! // Start ws messages processing
//! Reply::async(WsRoute)
//! ctx.reply(WsRoute)
//! }
//! }
//!

View file

@ -3,6 +3,7 @@ extern crate http;
extern crate time;
use std::str;
use actix_web::*;
use actix_web::dev::*;
use http::{header, Method, Version, HeaderMap};