diff --git a/CHANGES.md b/CHANGES.md index b8ca7d47d..4bbfe5bd2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -25,6 +25,8 @@ * `HttpRequest::url_for_static()` for a named route with no variables segments +* Propagation of the application's default resource to scopes that haven't set a default resource. + ### Changed @@ -39,7 +41,7 @@ * Added header `User-Agent: Actix-web/` to default headers when building a request -* port `Extensions` type from http create, we dont need `Send + Sync` +* port `Extensions` type from http create, we don't need `Send + Sync` * `HttpRequest::query()` returns `&HashMap` @@ -50,6 +52,16 @@ * Remove `HttpMessage::range()` +## [0.6.14] - 2018-06-21 + +### Added + +* Allow to disable masking for websockets client + +### Fixed + +* SendRequest execution fails with the "internal error: entered unreachable code" #329 + ## [0.6.13] - 2018-06-11 @@ -88,7 +100,7 @@ ### Added -* Allow to use path without traling slashes for scope registration #241 +* Allow to use path without trailing slashes for scope registration #241 * Allow to set encoding for exact NamedFile #239 @@ -449,7 +461,7 @@ * Server multi-threading -* Gracefull shutdown support +* Graceful shutdown support ## 0.2.1 (2017-11-03) diff --git a/Cargo.toml b/Cargo.toml index 61259c79b..f4ca9262d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,8 @@ flate2-rust = ["flate2/rust_backend"] features = ["tls", "alpn", "session", "brotli", "flate2-c"] [dependencies] -actix = "0.6.1" +# actix = "0.6.1" +actix = { git="https://github.com/actix/actix.git" } base64 = "0.9" bitflags = "1.0" diff --git a/MIGRATION.md b/MIGRATION.md index 175d82b3c..73e2d5653 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -23,6 +23,38 @@ * Renamed `client::ClientConnectorError::Connector` to `client::ClientConnectorError::Resolver` +* `Route::with()` does not return `ExtractorConfig`, to configure + extractor use `Route::with_config()` + + instead of + + ```rust + fn main() { + let app = App::new().resource("/index.html", |r| { + r.method(http::Method::GET) + .with(index) + .limit(4096); // <- limit size of the payload + }); + } + ``` + + use + + ```rust + + fn main() { + let app = App::new().resource("/index.html", |r| { + r.method(http::Method::GET) + .with_config(index, |cfg| { // <- register handler + cfg.limit(4096); // <- limit size of the payload + }) + }); + } + ``` + +* `Route::with_async()` does not return `ExtractorConfig`, to configure + extractor use `Route::with_async_config()` + ## 0.6 diff --git a/src/application.rs b/src/application.rs index 93008b3d2..9ce99e5b1 100644 --- a/src/application.rs +++ b/src/application.rs @@ -29,7 +29,7 @@ pub struct HttpApplication { #[doc(hidden)] pub struct Inner { prefix: usize, - default: ResourceHandler, + default: Rc>>, encoding: ContentEncoding, resources: Vec>, handlers: Vec>, @@ -51,7 +51,7 @@ impl PipelineHandler for Inner { match htype { HandlerType::Normal(idx) => match self.resources[idx].handle(req) { Ok(result) => result, - Err(req) => match self.default.handle(req) { + Err(req) => match self.default.borrow_mut().handle(req) { Ok(result) => result, Err(_) => AsyncResult::ok(HttpResponse::new(StatusCode::NOT_FOUND)), }, @@ -60,7 +60,7 @@ impl PipelineHandler for Inner { PrefixHandlerType::Handler(_, ref mut hnd) => hnd.handle(req), PrefixHandlerType::Scope(_, ref mut hnd, _) => hnd.handle(req), }, - HandlerType::Default => match self.default.handle(req) { + HandlerType::Default => match self.default.borrow_mut().handle(req) { Ok(result) => result, Err(_) => AsyncResult::ok(HttpResponse::new(StatusCode::NOT_FOUND)), }, @@ -172,7 +172,7 @@ struct ApplicationParts { state: S, prefix: String, settings: ServerSettings, - default: ResourceHandler, + default: Rc>>, resources: Vec<(Resource, Option>)>, handlers: Vec>, external: HashMap, @@ -223,7 +223,7 @@ where state, prefix: "/".to_owned(), settings: ServerSettings::default(), - default: ResourceHandler::default_not_found(), + default: Rc::new(RefCell::new(ResourceHandler::default_not_found())), resources: Vec::new(), handlers: Vec::new(), external: HashMap::new(), @@ -335,33 +335,34 @@ where T: FromRequest + 'static, { { - let parts = self.parts.as_mut().expect("Use after finish"); + let parts: &mut ApplicationParts = self.parts.as_mut().expect("Use after finish"); - // get resource handler - let mut found = false; - for &mut (ref pattern, ref handler) in &mut parts.resources { - if handler.is_some() && pattern.pattern() == path { - found = true; - break; - } - } + let out = { + // get resource handler + let mut iterator = parts.resources.iter_mut(); - if !found { - let mut handler = ResourceHandler::default(); - handler.method(method).with(f); - let pattern = Resource::new(handler.get_name(), path); - parts.resources.push((pattern, Some(handler))); - } else { - for &mut (ref pattern, ref mut handler) in &mut parts.resources { - if let Some(ref mut handler) = *handler { - if pattern.pattern() == path { - handler.method(method).with(f); - break; + loop { + if let Some(&mut (ref pattern, ref mut handler)) = iterator.next() { + if let Some(ref mut handler) = *handler { + if pattern.pattern() == path { + handler.method(method).with(f); + break None; + } } + } else { + let mut handler = ResourceHandler::default(); + handler.method(method).with(f); + let pattern = Resource::new(handler.get_name(), path); + break Some((pattern, Some(handler))); } } + }; + + if let Some(out) = out { + parts.resources.push(out); } } + self } @@ -473,7 +474,7 @@ where { { let parts = self.parts.as_mut().expect("Use after finish"); - f(&mut parts.default); + f(&mut parts.default.borrow_mut()); } self } @@ -614,7 +615,7 @@ where /// Finish application configuration and create `HttpHandler` object. pub fn finish(&mut self) -> HttpApplication { - let parts = self.parts.take().expect("Use after finish"); + let mut parts = self.parts.take().expect("Use after finish"); let prefix = parts.prefix.trim().trim_right_matches('/'); let (prefix, prefix_len) = if prefix.is_empty() { ("/".to_owned(), 0) @@ -627,11 +628,19 @@ where resources.push((pattern, None)); } + for ref mut handler in parts.handlers.iter_mut() { + if let PrefixHandlerType::Scope(_, ref mut route_handler, _) = handler { + if !route_handler.has_default_resource() { + route_handler.default_resource(Rc::clone(&parts.default)); + } + }; + } + let (router, resources) = Router::new(&prefix, parts.settings, resources); let inner = Rc::new(RefCell::new(Inner { prefix: prefix_len, - default: parts.default, + default: Rc::clone(&parts.default), encoding: parts.encoding, handlers: parts.handlers, resources, diff --git a/src/client/connector.rs b/src/client/connector.rs index e094fd0cf..58b6331db 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -6,7 +6,7 @@ use std::{fmt, io, mem, time}; use actix::resolver::{Connect as ResolveConnect, Resolver, ResolverError}; use actix::{ fut, Actor, ActorContext, ActorFuture, ActorResponse, Addr, AsyncContext, Context, - ContextFutureSpawner, Handler, Message, Recipient, StreamHandler, Supervised, + ContextFutureSpawner, Handler, Message, Recipient, StreamHandler2, Supervised, SystemService, WrapFuture, }; @@ -220,7 +220,7 @@ impl Actor for ClientConnector { self.resolver = Some(Resolver::from_registry()) } self.collect_periodic(ctx); - ctx.add_stream(self.acq_rx.take().unwrap()); + ctx.add_stream2(self.acq_rx.take().unwrap()); ctx.spawn(Maintenance); } } @@ -767,7 +767,7 @@ impl Handler for ClientConnector { } } -impl StreamHandler for ClientConnector { +impl StreamHandler2 for ClientConnector { fn handle( &mut self, msg: Result, ()>, ctx: &mut Context, diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index e77894b24..4173c7d2c 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -392,7 +392,7 @@ impl Pipeline { match self.timeout.as_mut().unwrap().poll() { Ok(Async::Ready(())) => return Err(SendRequestError::Timeout), Ok(Async::NotReady) => (), - Err(_) => return Err(SendRequestError::Timeout), + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e).into()), } } Ok(()) diff --git a/src/error.rs b/src/error.rs index f011733b3..6d8d3b042 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,7 +24,7 @@ pub use cookie::ParseError as CookieParseError; use handler::Responder; use httprequest::HttpRequest; -use httpresponse::{HttpResponse, InnerHttpResponse}; +use httpresponse::{HttpResponse, HttpResponseParts}; /// A specialized [`Result`](https://doc.rust-lang.org/std/result/enum.Result.html) /// for actix web operations @@ -654,7 +654,7 @@ pub struct InternalError { enum InternalErrorType { Status(StatusCode), - Response(Mutex>>), + Response(Box>>), } impl InternalError { @@ -669,12 +669,10 @@ impl InternalError { /// Create `InternalError` with predefined `HttpResponse`. pub fn from_response(cause: T, response: HttpResponse) -> Self { - let mut resp = response.into_inner(); - resp.drop_unsupported_body(); - + let resp = response.into_parts(); InternalError { cause, - status: InternalErrorType::Response(Mutex::new(Some(resp))), + status: InternalErrorType::Response(Box::new(Mutex::new(Some(resp)))), backtrace: Backtrace::new(), } } @@ -716,7 +714,7 @@ where InternalErrorType::Status(st) => HttpResponse::new(st), InternalErrorType::Response(ref resp) => { if let Some(resp) = resp.lock().unwrap().take() { - HttpResponse::from_inner(resp) + HttpResponse::from_parts(resp) } else { HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR) } diff --git a/src/extractor.rs b/src/extractor.rs index 0cdcb3afb..5ace390dc 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -1,7 +1,7 @@ use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; -use std::{fmt, str}; use std::rc::Rc; +use std::{fmt, str}; use bytes::Bytes; use encoding::all::UTF_8; @@ -312,8 +312,10 @@ impl fmt::Display for Form { /// let app = App::new().resource( /// "/index.html", /// |r| { -/// r.method(http::Method::GET).with(index).limit(4096); -/// }, // <- change form extractor configuration +/// r.method(http::Method::GET) +/// // register form handler and change form extractor configuration +/// .with_config(index, |cfg| {cfg.limit(4096);}) +/// }, /// ); /// } /// ``` @@ -328,7 +330,7 @@ impl FormConfig { self.limit = limit; self } - + /// Set custom error handler pub fn error_handler(&mut self, f: F) -> &mut Self where @@ -408,8 +410,9 @@ impl FromRequest for Bytes { /// fn main() { /// let app = App::new().resource("/index.html", |r| { /// r.method(http::Method::GET) -/// .with(index) // <- register handler with extractor params -/// .limit(4096); // <- limit size of the payload +/// .with_config(index, |cfg| { // <- register handler with extractor params +/// cfg.limit(4096); // <- limit size of the payload +/// }) /// }); /// } /// ``` diff --git a/src/handler.rs b/src/handler.rs index d330e0716..4428ce83f 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,5 +1,7 @@ +use std::cell::RefCell; use std::marker::PhantomData; use std::ops::Deref; +use std::rc::Rc; use futures::future::{err, ok, Future}; use futures::{Async, Poll}; @@ -8,6 +10,7 @@ use error::Error; use http::StatusCode; use httprequest::HttpRequest; use httpresponse::HttpResponse; +use resource::ResourceHandler; /// Trait defines object that could be registered as route handler #[allow(unused_variables)] @@ -403,6 +406,14 @@ where // /// Trait defines object that could be registered as resource route pub(crate) trait RouteHandler: 'static { fn handle(&mut self, req: HttpRequest) -> AsyncResult; + + fn has_default_resource(&self) -> bool { + false + } + + fn default_resource(&mut self, default: Rc>>) { + unimplemented!() + } } /// Route handler wrapper for Handler diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 8caf470ce..333e6c4ad 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -299,12 +299,15 @@ impl HttpResponse { self.get_mut().write_capacity = cap; } - pub(crate) fn into_inner(mut self) -> Box { - self.0.take().unwrap() + pub(crate) fn into_parts(mut self) -> HttpResponseParts { + self.0.take().unwrap().into_parts() } - pub(crate) fn from_inner(inner: Box) -> HttpResponse { - HttpResponse(Some(inner), HttpResponsePool::pool()) + pub(crate) fn from_parts(parts: HttpResponseParts) -> HttpResponse { + HttpResponse( + Some(Box::new(InnerHttpResponse::from_parts(parts))), + HttpResponsePool::pool(), + ) } } @@ -880,12 +883,12 @@ impl<'a, S> From<&'a HttpRequest> for HttpResponseBuilder { } #[derive(Debug)] -pub(crate) struct InnerHttpResponse { +struct InnerHttpResponse { version: Option, headers: HeaderMap, status: StatusCode, reason: Option<&'static str>, - pub(crate) body: Body, + body: Body, chunked: Option, encoding: Option, connection_type: Option, @@ -894,10 +897,16 @@ pub(crate) struct InnerHttpResponse { error: Option, } -/// This is here only because `failure::Fail: Send + Sync` which looks insane to me -unsafe impl Sync for InnerHttpResponse {} -/// This is here only because `failure::Fail: Send + Sync` which looks insane to me -unsafe impl Send for InnerHttpResponse {} +pub(crate) struct HttpResponseParts { + version: Option, + headers: HeaderMap, + status: StatusCode, + reason: Option<&'static str>, + body: Option, + encoding: Option, + connection_type: Option, + error: Option, +} impl InnerHttpResponse { #[inline] @@ -918,21 +927,52 @@ impl InnerHttpResponse { } /// This is for failure, we can not have Send + Sync on Streaming and Actor response - pub(crate) fn drop_unsupported_body(&mut self) { - let body = mem::replace(&mut self.body, Body::Empty); - match body { - Body::Empty => (), - Body::Binary(mut bin) => { - self.body = Body::Binary(bin.take().into()); - } + fn into_parts(mut self) -> HttpResponseParts { + let body = match mem::replace(&mut self.body, Body::Empty) { + Body::Empty => None, + Body::Binary(mut bin) => Some(bin.take()), Body::Streaming(_) | Body::Actor(_) => { error!("Streaming or Actor body is not support by error response"); + None } + }; + + HttpResponseParts { + body, + version: self.version, + headers: self.headers, + status: self.status, + reason: self.reason, + encoding: self.encoding, + connection_type: self.connection_type, + error: self.error, + } + } + + fn from_parts(parts: HttpResponseParts) -> InnerHttpResponse { + let body = if let Some(ref body) = parts.body { + Body::Binary(body.clone().into()) + } else { + Body::Empty + }; + + InnerHttpResponse { + body, + status: parts.status, + version: parts.version, + headers: parts.headers, + reason: parts.reason, + chunked: None, + encoding: parts.encoding, + connection_type: parts.connection_type, + response_size: 0, + write_capacity: MAX_WRITE_BUFFER_SIZE, + error: parts.error, } } } -/// Internal use only! unsafe +/// Internal use only! pub(crate) struct HttpResponsePool(VecDeque>); thread_local!(static POOL: Rc> = HttpResponsePool::pool()); diff --git a/src/json.rs b/src/json.rs index d0e12c04d..0b5cb96e4 100644 --- a/src/json.rs +++ b/src/json.rs @@ -171,12 +171,13 @@ where /// fn main() { /// let app = App::new().resource("/index.html", |r| { /// r.method(http::Method::POST) -/// .with(index) -/// .limit(4096) // <- change json extractor configuration -/// .error_handler(|err, req| { // <- create custom error response -/// error::InternalError::from_response( -/// err, HttpResponse::Conflict().finish()).into() -/// }); +/// .with_config(index, |cfg| { +/// cfg.limit(4096) // <- change json extractor configuration +/// .error_handler(|err, req| { // <- create custom error response +/// error::InternalError::from_response( +/// err, HttpResponse::Conflict().finish()).into() +/// }); +/// }) /// }); /// } /// ``` @@ -326,7 +327,7 @@ mod tests { use http::header; use handler::Handler; - use with::{ExtractorConfig, With}; + use with::With; impl PartialEq for JsonPayloadError { fn eq(&self, other: &JsonPayloadError) -> bool { @@ -409,7 +410,7 @@ mod tests { #[test] fn test_with_json() { - let mut cfg = ExtractorConfig::<_, Json>::default(); + let mut cfg = JsonConfig::default(); cfg.limit(4096); let mut handler = With::new(|data: Json| data, cfg); diff --git a/src/lib.rs b/src/lib.rs index 95f5a4ee0..90b743810 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -246,7 +246,6 @@ pub mod dev { pub use resource::ResourceHandler; pub use route::Route; pub use router::{Resource, ResourceType, Router}; - pub use with::ExtractorConfig; } pub mod http { diff --git a/src/multipart.rs b/src/multipart.rs index 9c5c0380c..7c93b5657 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -1,5 +1,5 @@ //! Multipart requests support -use std::cell::RefCell; +use std::cell::{RefCell, UnsafeCell}; use std::marker::PhantomData; use std::rc::Rc; use std::{cmp, fmt}; @@ -590,7 +590,7 @@ where } struct PayloadRef { - payload: Rc>, + payload: Rc>>, } impl PayloadRef @@ -599,7 +599,7 @@ where { fn new(payload: PayloadHelper) -> PayloadRef { PayloadRef { - payload: Rc::new(payload), + payload: Rc::new(payload.into()), } } @@ -609,7 +609,7 @@ where { if s.current() { let payload: &mut PayloadHelper = - unsafe { &mut *(self.payload.as_ref() as *const _ as *mut _) }; + unsafe { &mut *self.payload.get() }; Some(payload) } else { None diff --git a/src/route.rs b/src/route.rs index 524b66ef8..4c82926e8 100644 --- a/src/route.rs +++ b/src/route.rs @@ -17,7 +17,7 @@ use middleware::{ Started as MiddlewareStarted, }; use pred::Predicate; -use with::{ExtractorConfig, With, WithAsync}; +use with::{With, WithAsync}; /// Resource route definition /// @@ -164,15 +164,49 @@ impl Route { /// ); // <- use `with` extractor /// } /// ``` - pub fn with(&mut self, handler: F) -> ExtractorConfig + pub fn with(&mut self, handler: F) where F: Fn(T) -> R + 'static, R: Responder + 'static, T: FromRequest + 'static, { - let cfg = ExtractorConfig::::default(); - self.h(With::new(handler, cfg.clone())); - cfg + self.h(With::new(handler, ::default())); + } + + /// Set handler function. Same as `.with()` but it allows to configure + /// extractor. + /// + /// ```rust + /// # extern crate bytes; + /// # extern crate actix_web; + /// # extern crate futures; + /// #[macro_use] extern crate serde_derive; + /// use actix_web::{http, App, Path, Result}; + /// + /// /// extract text data from request + /// fn index(body: String) -> Result { + /// Ok(format!("Body {}!", body)) + /// } + /// + /// fn main() { + /// let app = App::new().resource("/index.html", |r| { + /// r.method(http::Method::GET) + /// .with_config(index, |cfg| { // <- register handler + /// cfg.limit(4096); // <- limit size of the payload + /// }) + /// }); + /// } + /// ``` + pub fn with_config(&mut self, handler: F, cfg_f: C) + where + F: Fn(T) -> R + 'static, + R: Responder + 'static, + T: FromRequest + 'static, + C: FnOnce(&mut T::Config), + { + let mut cfg = ::default(); + cfg_f(&mut cfg); + self.h(With::new(handler, cfg)); } /// Set async handler function, use request extractor for parameters. @@ -204,7 +238,7 @@ impl Route { /// ); // <- use `with` extractor /// } /// ``` - pub fn with_async(&mut self, handler: F) -> ExtractorConfig + pub fn with_async(&mut self, handler: F) where F: Fn(T) -> R + 'static, R: Future + 'static, @@ -212,9 +246,52 @@ impl Route { E: Into + 'static, T: FromRequest + 'static, { - let cfg = ExtractorConfig::::default(); - self.h(WithAsync::new(handler, cfg.clone())); - cfg + self.h(WithAsync::new(handler, ::default())); + } + + /// Set async handler function, use request extractor for parameters. + /// This method allows to configure extractor. + /// + /// ```rust + /// # extern crate bytes; + /// # extern crate actix_web; + /// # extern crate futures; + /// #[macro_use] extern crate serde_derive; + /// use actix_web::{http, App, Error, Form}; + /// use futures::Future; + /// + /// #[derive(Deserialize)] + /// struct Info { + /// username: String, + /// } + /// + /// /// extract path info using serde + /// fn index(info: Form) -> Box> { + /// unimplemented!() + /// } + /// + /// fn main() { + /// let app = App::new().resource( + /// "/{username}/index.html", // <- define path parameters + /// |r| r.method(http::Method::GET) + /// .with_async_config(index, |cfg| { + /// cfg.limit(4096); + /// }), + /// ); // <- use `with` extractor + /// } + /// ``` + pub fn with_async_config(&mut self, handler: F, cfg: C) + where + F: Fn(T) -> R + 'static, + R: Future + 'static, + I: Responder + 'static, + E: Into + 'static, + T: FromRequest + 'static, + C: FnOnce(&mut T::Config), + { + let mut extractor_cfg = ::default(); + cfg(&mut extractor_cfg); + self.h(WithAsync::new(handler, extractor_cfg)); } } @@ -241,7 +318,7 @@ impl InnerHandler { #[inline] pub fn handle(&self, req: HttpRequest) -> AsyncResult { - // reason: handler is unique per thread, handler get called from async code only + // reason: handler is unique per thread, handler get called from sync code only let h = unsafe { &mut *self.0.as_ref().get() }; h.handle(req) } diff --git a/src/router.rs b/src/router.rs index 0ae178089..e04956e92 100644 --- a/src/router.rs +++ b/src/router.rs @@ -309,7 +309,7 @@ impl Resource { params.set_tail(len as u16); for (idx, segment) in segments.iter().enumerate() { // reason: Router is part of App, which is unique per thread - // app is alive during whole life of tthread + // app is alive during whole life of a thread let name = unsafe { &*(names[idx].as_str() as *const _) }; params.add(name, *segment); } @@ -378,7 +378,7 @@ impl Resource { params.set_tail(tail_len as u16); for (idx, segment) in segments.iter().enumerate() { // reason: Router is part of App, which is unique per thread - // app is alive during whole life of tthread + // app is alive during whole life of a thread let name = unsafe { &*(names[idx].as_str() as *const _) }; params.add(name, *segment); } diff --git a/src/scope.rs b/src/scope.rs index 6651992db..70fb17287 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -405,6 +405,14 @@ impl RouteHandler for Scope { unimplemented!() } } + + fn has_default_resource(&self) -> bool { + self.default.is_some() + } + + fn default_resource(&mut self, default: ScopeResource) { + self.default = Some(default); + } } struct Wrapper { @@ -1188,4 +1196,27 @@ mod tests { let resp = app.run(req); assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND); } + + #[test] + fn test_default_resource_propagation() { + let mut app = App::new() + .scope("/app1", |scope| { + scope.default_resource(|r| r.f(|_| HttpResponse::BadRequest())) + }) + .scope("/app2", |scope| scope) + .default_resource(|r| r.f(|_| HttpResponse::MethodNotAllowed())) + .finish(); + + let req = TestRequest::with_uri("/non-exist").finish(); + let resp = app.run(req); + assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED); + + let req = TestRequest::with_uri("/app1/non-exist").finish(); + let resp = app.run(req); + assert_eq!(resp.as_msg().status(), StatusCode::BAD_REQUEST); + + let req = TestRequest::with_uri("/app2/non-exist").finish(); + let resp = app.run(req); + assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED); + } } diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index d174964b9..ebb0fff32 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -73,12 +73,11 @@ impl H1Writer { self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) } - fn write_data(&mut self, data: &[u8]) -> io::Result { + fn write_data(stream: &mut T, data: &[u8]) -> io::Result { let mut written = 0; while written < data.len() { - match self.stream.write(&data[written..]) { + match stream.write(&data[written..]) { Ok(0) => { - self.disconnected(); return Err(io::Error::new(io::ErrorKind::WriteZero, "")); } Ok(n) => { @@ -243,7 +242,16 @@ impl Writer for H1Writer { if self.flags.contains(Flags::UPGRADE) { if self.buffer.is_empty() { let pl: &[u8] = payload.as_ref(); - let n = self.write_data(pl)?; + let n = match Self::write_data(&mut self.stream, pl) { + Err(err) => { + if err.kind() == io::ErrorKind::WriteZero { + self.disconnected(); + } + + return Err(err); + } + Ok(val) => val, + }; if n < pl.len() { self.buffer.extend_from_slice(&pl[n..]); return Ok(WriterState::Done); @@ -284,9 +292,18 @@ impl Writer for H1Writer { #[inline] fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> { if !self.buffer.is_empty() { - let buf: &[u8] = - unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) }; - let written = self.write_data(buf)?; + let written = { + match Self::write_data(&mut self.stream, self.buffer.as_ref()) { + Err(err) => { + if err.kind() == io::ErrorKind::WriteZero { + self.disconnected(); + } + + return Err(err); + } + Ok(val) => val, + } + }; let _ = self.buffer.split_to(written); if shutdown && !self.buffer.is_empty() || (self.buffer.len() > self.buffer_capacity) diff --git a/src/server/srv.rs b/src/server/srv.rs index cd6703663..d5c94ea8c 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -5,7 +5,7 @@ use std::{io, net, thread}; use actix::{ fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, - Response, StreamHandler, System, WrapFuture, + Response, StreamHandler2, System, WrapFuture, }; use futures::sync::mpsc; @@ -449,7 +449,7 @@ impl HttpServer { // start http server actor let signals = self.subscribe_to_signals(); let addr = Actor::create(move |ctx| { - ctx.add_stream(rx); + ctx.add_stream2(rx); self }); if let Some(signals) = signals { @@ -611,7 +611,7 @@ impl Handler for HttpServer { } /// Commands from accept threads -impl StreamHandler for HttpServer { +impl StreamHandler2 for HttpServer { fn handle(&mut self, msg: Result, ()>, _: &mut Context) { if let Ok(Some(ServerCommand::WorkerDied(idx, socks))) = msg { let mut found = false; diff --git a/src/with.rs b/src/with.rs index 4cb1546a7..126958b50 100644 --- a/src/with.rs +++ b/src/with.rs @@ -1,7 +1,6 @@ use futures::{Async, Future, Poll}; use std::cell::UnsafeCell; use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; use std::rc::Rc; use error::Error; @@ -9,110 +8,24 @@ use handler::{AsyncResult, AsyncResultItem, FromRequest, Handler, Responder}; use httprequest::HttpRequest; use httpresponse::HttpResponse; -/// Extractor configuration -/// -/// `Route::with()` and `Route::with_async()` returns instance -/// of the `ExtractorConfig` type. It could be used for extractor configuration. -/// -/// In this example `Form` configured. -/// -/// ```rust -/// # extern crate actix_web; -/// #[macro_use] extern crate serde_derive; -/// use actix_web::{http, App, Form, Result}; -/// -/// #[derive(Deserialize)] -/// struct FormData { -/// username: String, -/// } -/// -/// fn index(form: Form) -> Result { -/// Ok(format!("Welcome {}!", form.username)) -/// } -/// -/// fn main() { -/// let app = App::new().resource( -/// "/index.html", -/// |r| { -/// r.method(http::Method::GET).with(index).limit(4096); -/// }, // <- change form extractor configuration -/// ); -/// } -/// ``` -/// -/// Same could be donce with multiple extractors -/// -/// ```rust -/// # extern crate actix_web; -/// #[macro_use] extern crate serde_derive; -/// use actix_web::{http, App, Form, Path, Result}; -/// -/// #[derive(Deserialize)] -/// struct FormData { -/// username: String, -/// } -/// -/// fn index(data: (Path<(String,)>, Form)) -> Result { -/// Ok(format!("Welcome {}!", data.1.username)) -/// } -/// -/// fn main() { -/// let app = App::new().resource( -/// "/index.html", -/// |r| { -/// r.method(http::Method::GET).with(index).1.limit(4096); -/// }, // <- change form extractor configuration -/// ); -/// } -/// ``` -pub struct ExtractorConfig> { - cfg: Rc>, +pub(crate) struct With +where + F: Fn(T) -> R, + T: FromRequest, + S: 'static, +{ + hnd: Rc>, + cfg: Rc, } -impl> Default for ExtractorConfig { - fn default() -> Self { - ExtractorConfig { - cfg: Rc::new(UnsafeCell::new(T::Config::default())), - } - } -} - -impl> ExtractorConfig { - pub(crate) fn clone(&self) -> Self { - ExtractorConfig { - cfg: Rc::clone(&self.cfg), - } - } -} - -impl> AsRef for ExtractorConfig { - fn as_ref(&self) -> &T::Config { - unsafe { &*self.cfg.get() } - } -} - -impl> Deref for ExtractorConfig { - type Target = T::Config; - - fn deref(&self) -> &T::Config { - unsafe { &*self.cfg.get() } - } -} - -impl> DerefMut for ExtractorConfig { - fn deref_mut(&mut self) -> &mut T::Config { - unsafe { &mut *self.cfg.get() } - } -} - -pub struct With +pub struct WithHnd where F: Fn(T) -> R, T: FromRequest, S: 'static, { hnd: Rc>, - cfg: ExtractorConfig, + _t: PhantomData, _s: PhantomData, } @@ -122,11 +35,14 @@ where T: FromRequest, S: 'static, { - pub fn new(f: F, cfg: ExtractorConfig) -> Self { + pub fn new(f: F, cfg: T::Config) -> Self { With { - cfg, - hnd: Rc::new(UnsafeCell::new(f)), - _s: PhantomData, + cfg: Rc::new(cfg), + hnd: Rc::new(WithHnd { + hnd: Rc::new(UnsafeCell::new(f)), + _t: PhantomData, + _s: PhantomData, + }), } } } @@ -166,8 +82,8 @@ where S: 'static, { started: bool, - hnd: Rc>, - cfg: ExtractorConfig, + hnd: Rc>, + cfg: Rc, req: HttpRequest, fut1: Option>>, fut2: Option>>, @@ -206,24 +122,32 @@ where } }; - let hnd: &mut F = unsafe { &mut *self.hnd.get() }; - let item = match (*hnd)(item).respond_to(&self.req) { - Ok(item) => item.into(), - Err(e) => return Err(e.into()), - }; - - match item.into() { - AsyncResultItem::Err(err) => Err(err), - AsyncResultItem::Ok(resp) => Ok(Async::Ready(resp)), - AsyncResultItem::Future(fut) => { - self.fut2 = Some(fut); - self.poll() + let fut = { + // clone handler, inicrease ref counter + let h = self.hnd.as_ref().hnd.clone(); + // Enforce invariants before entering unsafe code. + // Only two references could exists With struct owns one, and line above + if Rc::weak_count(&h) != 0 || Rc::strong_count(&h) != 2 { + panic!("Multiple copies of handler are in use") } - } + let hnd: &mut F = unsafe { &mut *h.as_ref().get() }; + let item = match (*hnd)(item).respond_to(&self.req) { + Ok(item) => item.into(), + Err(e) => return Err(e.into()), + }; + + match item.into() { + AsyncResultItem::Err(err) => return Err(err), + AsyncResultItem::Ok(resp) => return Ok(Async::Ready(resp)), + AsyncResultItem::Future(fut) => fut, + } + }; + self.fut2 = Some(fut); + self.poll() } } -pub struct WithAsync +pub(crate) struct WithAsync where F: Fn(T) -> R, R: Future, @@ -232,9 +156,8 @@ where T: FromRequest, S: 'static, { - hnd: Rc>, - cfg: ExtractorConfig, - _s: PhantomData, + hnd: Rc>, + cfg: Rc, } impl WithAsync @@ -246,11 +169,14 @@ where T: FromRequest, S: 'static, { - pub fn new(f: F, cfg: ExtractorConfig) -> Self { + pub fn new(f: F, cfg: T::Config) -> Self { WithAsync { - cfg, - hnd: Rc::new(UnsafeCell::new(f)), - _s: PhantomData, + cfg: Rc::new(cfg), + hnd: Rc::new(WithHnd { + hnd: Rc::new(UnsafeCell::new(f)), + _s: PhantomData, + _t: PhantomData, + }), } } } @@ -271,7 +197,7 @@ where req, started: false, hnd: Rc::clone(&self.hnd), - cfg: self.cfg.clone(), + cfg: Rc::clone(&self.cfg), fut1: None, fut2: None, fut3: None, @@ -295,8 +221,8 @@ where S: 'static, { started: bool, - hnd: Rc>, - cfg: ExtractorConfig, + hnd: Rc>, + cfg: Rc, req: HttpRequest, fut1: Option>>, fut2: Option, @@ -356,8 +282,17 @@ where } }; - let hnd: &mut F = unsafe { &mut *self.hnd.get() }; - self.fut2 = Some((*hnd)(item)); + self.fut2 = { + // clone handler, inicrease ref counter + let h = self.hnd.as_ref().hnd.clone(); + // Enforce invariants before entering unsafe code. + // Only two references could exists With struct owns one, and line above + if Rc::weak_count(&h) != 0 || Rc::strong_count(&h) != 2 { + panic!("Multiple copies of handler are in use") + } + let hnd: &mut F = unsafe { &mut *h.as_ref().get() }; + Some((*hnd)(item)) + }; self.poll() } } diff --git a/src/ws/client.rs b/src/ws/client.rs index e9b7cf827..6a4fcf7c3 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -127,6 +127,7 @@ pub struct Client { protocols: Option, conn: Addr, max_size: usize, + no_masking: bool, } impl Client { @@ -144,6 +145,7 @@ impl Client { origin: None, protocols: None, max_size: 65_536, + no_masking: false, conn, }; cl.request.uri(uri.as_ref()); @@ -198,6 +200,12 @@ impl Client { self } + /// Disable payload masking. By default ws client masks frame payload. + pub fn no_masking(mut self) -> Self { + self.no_masking = true; + self + } + /// Set request header pub fn header(mut self, key: K, value: V) -> Self where @@ -260,7 +268,7 @@ impl Client { } // start handshake - ClientHandshake::new(request, self.max_size) + ClientHandshake::new(request, self.max_size, self.no_masking) } } } @@ -281,10 +289,13 @@ pub struct ClientHandshake { key: String, error: Option, max_size: usize, + no_masking: bool, } impl ClientHandshake { - fn new(mut request: ClientRequest, max_size: usize) -> ClientHandshake { + fn new( + mut request: ClientRequest, max_size: usize, no_masking: bool, + ) -> ClientHandshake { // Generate a random key for the `Sec-WebSocket-Key` header. // a base64-encoded (see Section 4 of [RFC4648]) value that, // when decoded, is 16 bytes in length (RFC 6455) @@ -304,6 +315,7 @@ impl ClientHandshake { ClientHandshake { key, max_size, + no_masking, request: Some(request.send()), tx: Some(tx), error: None, @@ -317,6 +329,7 @@ impl ClientHandshake { tx: None, error: Some(err), max_size: 0, + no_masking: false, } } @@ -427,6 +440,7 @@ impl Future for ClientHandshake { ClientReader { inner: Rc::clone(&inner), max_size: self.max_size, + no_masking: self.no_masking, }, ClientWriter { inner }, ))) @@ -437,6 +451,7 @@ impl Future for ClientHandshake { pub struct ClientReader { inner: Rc>, max_size: usize, + no_masking: bool, } impl fmt::Debug for ClientReader { @@ -451,13 +466,14 @@ impl Stream for ClientReader { fn poll(&mut self) -> Poll, Self::Error> { let max_size = self.max_size; + let no_masking = self.no_masking; let mut inner = self.inner.borrow_mut(); if inner.closed { return Ok(Async::Ready(None)); } // read - match Frame::parse(&mut inner.rx, false, max_size) { + match Frame::parse(&mut inner.rx, no_masking, max_size) { Ok(Async::Ready(Some(frame))) => { let (_finished, opcode, payload) = frame.unpack(); diff --git a/src/ws/mod.rs b/src/ws/mod.rs index c68cf300c..558ecb515 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -25,12 +25,12 @@ //! //! // Handler for ws::Message messages //! impl StreamHandler for Ws { -//! fn handle(&mut self, msg: Result, ws::ProtocolError>, ctx: &mut Self::Context) { +//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { //! match msg { -//! Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), -//! Ok(Some(ws::Message::Text(text))) => ctx.text(text), -//! Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), -//! _ => ctx.stop(), +//! ws::Message::Ping(msg) => ctx.pong(&msg), +//! ws::Message::Text(text) => ctx.text(text), +//! ws::Message::Binary(bin) => ctx.binary(bin), +//! _ => (), //! } //! } //! } diff --git a/tests/test_handlers.rs b/tests/test_handlers.rs index 116112e27..95bd5be2e 100644 --- a/tests/test_handlers.rs +++ b/tests/test_handlers.rs @@ -42,6 +42,28 @@ fn test_path_extractor() { assert_eq!(bytes, Bytes::from_static(b"Welcome test!")); } +#[test] +fn test_async_handler() { + let mut srv = test::TestServer::new(|app| { + app.resource("/{username}/index.html", |r| { + r.route().with(|p: Path| { + Delay::new(Instant::now() + Duration::from_millis(10)) + .and_then(move |_| Ok(format!("Welcome {}!", p.username))) + .responder() + }) + }); + }); + + // client request + let request = srv.get().uri(srv.url("/test/index.html")).finish().unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(b"Welcome test!")); +} + #[test] fn test_query_extractor() { let mut srv = test::TestServer::new(|app| { @@ -130,14 +152,17 @@ fn test_form_extractor() { fn test_form_extractor2() { let mut srv = test::TestServer::new(|app| { app.resource("/{username}/index.html", |r| { - r.route() - .with(|form: Form| format!("{}", form.username)) - .error_handler(|err, _| { - error::InternalError::from_response( - err, - HttpResponse::Conflict().finish(), - ).into() - }); + r.route().with_config( + |form: Form| format!("{}", form.username), + |cfg| { + cfg.error_handler(|err, _| { + error::InternalError::from_response( + err, + HttpResponse::Conflict().finish(), + ).into() + }); + }, + ); }); }); diff --git a/tests/test_ws.rs b/tests/test_ws.rs index eeeffb7aa..dd65d4a58 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -23,16 +23,13 @@ impl Actor for Ws { } impl StreamHandler for Ws { - fn handle( - &mut self, msg: Result, ws::ProtocolError>, - ctx: &mut Self::Context, - ) { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { match msg { - Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), - Ok(Some(ws::Message::Text(text))) => ctx.text(text), - Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), - Ok(Some(ws::Message::Close(reason))) => ctx.close(reason), - _ => ctx.stop(), + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(reason) => ctx.close(reason), + _ => (), } } } @@ -156,16 +153,13 @@ impl Ws2 { } impl StreamHandler for Ws2 { - fn handle( - &mut self, msg: Result, ws::ProtocolError>, - ctx: &mut Self::Context, - ) { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { match msg { - Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), - Ok(Some(ws::Message::Text(text))) => ctx.text(text), - Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), - Ok(Some(ws::Message::Close(reason))) => ctx.close(reason), - _ => ctx.stop(), + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(reason) => ctx.close(reason), + _ => (), } } }