1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-18 06:06:36 +00:00
This commit is contained in:
Gary Hai 2018-06-21 19:14:45 +08:00
commit 454e9f8ce1
23 changed files with 466 additions and 265 deletions

View file

@ -25,6 +25,8 @@
* `HttpRequest::url_for_static()` for a named route with no variables segments
* Propagation of the application's default resource to scopes that haven't set a default resource.
### Changed
@ -39,7 +41,7 @@
* Added header `User-Agent: Actix-web/<current_version>` to default headers when building a request
* port `Extensions` type from http create, we dont need `Send + Sync`
* port `Extensions` type from http create, we don't need `Send + Sync`
* `HttpRequest::query()` returns `&HashMap<String, String>`
@ -50,6 +52,16 @@
* Remove `HttpMessage::range()`
## [0.6.14] - 2018-06-21
### Added
* Allow to disable masking for websockets client
### Fixed
* SendRequest execution fails with the "internal error: entered unreachable code" #329
## [0.6.13] - 2018-06-11
@ -88,7 +100,7 @@
### Added
* Allow to use path without traling slashes for scope registration #241
* Allow to use path without trailing slashes for scope registration #241
* Allow to set encoding for exact NamedFile #239
@ -449,7 +461,7 @@
* Server multi-threading
* Gracefull shutdown support
* Graceful shutdown support
## 0.2.1 (2017-11-03)

View file

@ -50,7 +50,8 @@ flate2-rust = ["flate2/rust_backend"]
features = ["tls", "alpn", "session", "brotli", "flate2-c"]
[dependencies]
actix = "0.6.1"
# actix = "0.6.1"
actix = { git="https://github.com/actix/actix.git" }
base64 = "0.9"
bitflags = "1.0"

View file

@ -23,6 +23,38 @@
* Renamed `client::ClientConnectorError::Connector` to
`client::ClientConnectorError::Resolver`
* `Route::with()` does not return `ExtractorConfig`, to configure
extractor use `Route::with_config()`
instead of
```rust
fn main() {
let app = App::new().resource("/index.html", |r| {
r.method(http::Method::GET)
.with(index)
.limit(4096); // <- limit size of the payload
});
}
```
use
```rust
fn main() {
let app = App::new().resource("/index.html", |r| {
r.method(http::Method::GET)
.with_config(index, |cfg| { // <- register handler
cfg.limit(4096); // <- limit size of the payload
})
});
}
```
* `Route::with_async()` does not return `ExtractorConfig`, to configure
extractor use `Route::with_async_config()`
## 0.6

View file

@ -29,7 +29,7 @@ pub struct HttpApplication<S = ()> {
#[doc(hidden)]
pub struct Inner<S> {
prefix: usize,
default: ResourceHandler<S>,
default: Rc<RefCell<ResourceHandler<S>>>,
encoding: ContentEncoding,
resources: Vec<ResourceHandler<S>>,
handlers: Vec<PrefixHandlerType<S>>,
@ -51,7 +51,7 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
match htype {
HandlerType::Normal(idx) => match self.resources[idx].handle(req) {
Ok(result) => result,
Err(req) => match self.default.handle(req) {
Err(req) => match self.default.borrow_mut().handle(req) {
Ok(result) => result,
Err(_) => AsyncResult::ok(HttpResponse::new(StatusCode::NOT_FOUND)),
},
@ -60,7 +60,7 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
PrefixHandlerType::Handler(_, ref mut hnd) => hnd.handle(req),
PrefixHandlerType::Scope(_, ref mut hnd, _) => hnd.handle(req),
},
HandlerType::Default => match self.default.handle(req) {
HandlerType::Default => match self.default.borrow_mut().handle(req) {
Ok(result) => result,
Err(_) => AsyncResult::ok(HttpResponse::new(StatusCode::NOT_FOUND)),
},
@ -172,7 +172,7 @@ struct ApplicationParts<S> {
state: S,
prefix: String,
settings: ServerSettings,
default: ResourceHandler<S>,
default: Rc<RefCell<ResourceHandler<S>>>,
resources: Vec<(Resource, Option<ResourceHandler<S>>)>,
handlers: Vec<PrefixHandlerType<S>>,
external: HashMap<String, Resource>,
@ -223,7 +223,7 @@ where
state,
prefix: "/".to_owned(),
settings: ServerSettings::default(),
default: ResourceHandler::default_not_found(),
default: Rc::new(RefCell::new(ResourceHandler::default_not_found())),
resources: Vec::new(),
handlers: Vec::new(),
external: HashMap::new(),
@ -335,33 +335,34 @@ where
T: FromRequest<S> + 'static,
{
{
let parts = self.parts.as_mut().expect("Use after finish");
let parts: &mut ApplicationParts<S> = self.parts.as_mut().expect("Use after finish");
// get resource handler
let mut found = false;
for &mut (ref pattern, ref handler) in &mut parts.resources {
if handler.is_some() && pattern.pattern() == path {
found = true;
break;
}
}
let out = {
// get resource handler
let mut iterator = parts.resources.iter_mut();
if !found {
let mut handler = ResourceHandler::default();
handler.method(method).with(f);
let pattern = Resource::new(handler.get_name(), path);
parts.resources.push((pattern, Some(handler)));
} else {
for &mut (ref pattern, ref mut handler) in &mut parts.resources {
if let Some(ref mut handler) = *handler {
if pattern.pattern() == path {
handler.method(method).with(f);
break;
loop {
if let Some(&mut (ref pattern, ref mut handler)) = iterator.next() {
if let Some(ref mut handler) = *handler {
if pattern.pattern() == path {
handler.method(method).with(f);
break None;
}
}
} else {
let mut handler = ResourceHandler::default();
handler.method(method).with(f);
let pattern = Resource::new(handler.get_name(), path);
break Some((pattern, Some(handler)));
}
}
};
if let Some(out) = out {
parts.resources.push(out);
}
}
self
}
@ -473,7 +474,7 @@ where
{
{
let parts = self.parts.as_mut().expect("Use after finish");
f(&mut parts.default);
f(&mut parts.default.borrow_mut());
}
self
}
@ -614,7 +615,7 @@ where
/// Finish application configuration and create `HttpHandler` object.
pub fn finish(&mut self) -> HttpApplication<S> {
let parts = self.parts.take().expect("Use after finish");
let mut parts = self.parts.take().expect("Use after finish");
let prefix = parts.prefix.trim().trim_right_matches('/');
let (prefix, prefix_len) = if prefix.is_empty() {
("/".to_owned(), 0)
@ -627,11 +628,19 @@ where
resources.push((pattern, None));
}
for ref mut handler in parts.handlers.iter_mut() {
if let PrefixHandlerType::Scope(_, ref mut route_handler, _) = handler {
if !route_handler.has_default_resource() {
route_handler.default_resource(Rc::clone(&parts.default));
}
};
}
let (router, resources) = Router::new(&prefix, parts.settings, resources);
let inner = Rc::new(RefCell::new(Inner {
prefix: prefix_len,
default: parts.default,
default: Rc::clone(&parts.default),
encoding: parts.encoding,
handlers: parts.handlers,
resources,

View file

@ -6,7 +6,7 @@ use std::{fmt, io, mem, time};
use actix::resolver::{Connect as ResolveConnect, Resolver, ResolverError};
use actix::{
fut, Actor, ActorContext, ActorFuture, ActorResponse, Addr, AsyncContext, Context,
ContextFutureSpawner, Handler, Message, Recipient, StreamHandler, Supervised,
ContextFutureSpawner, Handler, Message, Recipient, StreamHandler2, Supervised,
SystemService, WrapFuture,
};
@ -220,7 +220,7 @@ impl Actor for ClientConnector {
self.resolver = Some(Resolver::from_registry())
}
self.collect_periodic(ctx);
ctx.add_stream(self.acq_rx.take().unwrap());
ctx.add_stream2(self.acq_rx.take().unwrap());
ctx.spawn(Maintenance);
}
}
@ -767,7 +767,7 @@ impl Handler<Connect> for ClientConnector {
}
}
impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
impl StreamHandler2<AcquiredConnOperation, ()> for ClientConnector {
fn handle(
&mut self, msg: Result<Option<AcquiredConnOperation>, ()>,
ctx: &mut Context<Self>,

View file

@ -392,7 +392,7 @@ impl Pipeline {
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => return Err(SendRequestError::Timeout),
Ok(Async::NotReady) => (),
Err(_) => return Err(SendRequestError::Timeout),
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e).into()),
}
}
Ok(())

View file

@ -24,7 +24,7 @@ pub use cookie::ParseError as CookieParseError;
use handler::Responder;
use httprequest::HttpRequest;
use httpresponse::{HttpResponse, InnerHttpResponse};
use httpresponse::{HttpResponse, HttpResponseParts};
/// A specialized [`Result`](https://doc.rust-lang.org/std/result/enum.Result.html)
/// for actix web operations
@ -654,7 +654,7 @@ pub struct InternalError<T> {
enum InternalErrorType {
Status(StatusCode),
Response(Mutex<Option<Box<InnerHttpResponse>>>),
Response(Box<Mutex<Option<HttpResponseParts>>>),
}
impl<T> InternalError<T> {
@ -669,12 +669,10 @@ impl<T> InternalError<T> {
/// Create `InternalError` with predefined `HttpResponse`.
pub fn from_response(cause: T, response: HttpResponse) -> Self {
let mut resp = response.into_inner();
resp.drop_unsupported_body();
let resp = response.into_parts();
InternalError {
cause,
status: InternalErrorType::Response(Mutex::new(Some(resp))),
status: InternalErrorType::Response(Box::new(Mutex::new(Some(resp)))),
backtrace: Backtrace::new(),
}
}
@ -716,7 +714,7 @@ where
InternalErrorType::Status(st) => HttpResponse::new(st),
InternalErrorType::Response(ref resp) => {
if let Some(resp) = resp.lock().unwrap().take() {
HttpResponse::from_inner(resp)
HttpResponse::from_parts(resp)
} else {
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR)
}

View file

@ -1,7 +1,7 @@
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::{fmt, str};
use std::rc::Rc;
use std::{fmt, str};
use bytes::Bytes;
use encoding::all::UTF_8;
@ -312,8 +312,10 @@ impl<T: fmt::Display> fmt::Display for Form<T> {
/// let app = App::new().resource(
/// "/index.html",
/// |r| {
/// r.method(http::Method::GET).with(index).limit(4096);
/// }, // <- change form extractor configuration
/// r.method(http::Method::GET)
/// // register form handler and change form extractor configuration
/// .with_config(index, |cfg| {cfg.limit(4096);})
/// },
/// );
/// }
/// ```
@ -328,7 +330,7 @@ impl<S> FormConfig<S> {
self.limit = limit;
self
}
/// Set custom error handler
pub fn error_handler<F>(&mut self, f: F) -> &mut Self
where
@ -408,8 +410,9 @@ impl<S: 'static> FromRequest<S> for Bytes {
/// fn main() {
/// let app = App::new().resource("/index.html", |r| {
/// r.method(http::Method::GET)
/// .with(index) // <- register handler with extractor params
/// .limit(4096); // <- limit size of the payload
/// .with_config(index, |cfg| { // <- register handler with extractor params
/// cfg.limit(4096); // <- limit size of the payload
/// })
/// });
/// }
/// ```

View file

@ -1,5 +1,7 @@
use std::cell::RefCell;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;
use futures::future::{err, ok, Future};
use futures::{Async, Poll};
@ -8,6 +10,7 @@ use error::Error;
use http::StatusCode;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use resource::ResourceHandler;
/// Trait defines object that could be registered as route handler
#[allow(unused_variables)]
@ -403,6 +406,14 @@ where
// /// Trait defines object that could be registered as resource route
pub(crate) trait RouteHandler<S>: 'static {
fn handle(&mut self, req: HttpRequest<S>) -> AsyncResult<HttpResponse>;
fn has_default_resource(&self) -> bool {
false
}
fn default_resource(&mut self, default: Rc<RefCell<ResourceHandler<S>>>) {
unimplemented!()
}
}
/// Route handler wrapper for Handler

View file

@ -299,12 +299,15 @@ impl HttpResponse {
self.get_mut().write_capacity = cap;
}
pub(crate) fn into_inner(mut self) -> Box<InnerHttpResponse> {
self.0.take().unwrap()
pub(crate) fn into_parts(mut self) -> HttpResponseParts {
self.0.take().unwrap().into_parts()
}
pub(crate) fn from_inner(inner: Box<InnerHttpResponse>) -> HttpResponse {
HttpResponse(Some(inner), HttpResponsePool::pool())
pub(crate) fn from_parts(parts: HttpResponseParts) -> HttpResponse {
HttpResponse(
Some(Box::new(InnerHttpResponse::from_parts(parts))),
HttpResponsePool::pool(),
)
}
}
@ -880,12 +883,12 @@ impl<'a, S> From<&'a HttpRequest<S>> for HttpResponseBuilder {
}
#[derive(Debug)]
pub(crate) struct InnerHttpResponse {
struct InnerHttpResponse {
version: Option<Version>,
headers: HeaderMap,
status: StatusCode,
reason: Option<&'static str>,
pub(crate) body: Body,
body: Body,
chunked: Option<bool>,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>,
@ -894,10 +897,16 @@ pub(crate) struct InnerHttpResponse {
error: Option<Error>,
}
/// This is here only because `failure::Fail: Send + Sync` which looks insane to me
unsafe impl Sync for InnerHttpResponse {}
/// This is here only because `failure::Fail: Send + Sync` which looks insane to me
unsafe impl Send for InnerHttpResponse {}
pub(crate) struct HttpResponseParts {
version: Option<Version>,
headers: HeaderMap,
status: StatusCode,
reason: Option<&'static str>,
body: Option<Bytes>,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>,
error: Option<Error>,
}
impl InnerHttpResponse {
#[inline]
@ -918,21 +927,52 @@ impl InnerHttpResponse {
}
/// This is for failure, we can not have Send + Sync on Streaming and Actor response
pub(crate) fn drop_unsupported_body(&mut self) {
let body = mem::replace(&mut self.body, Body::Empty);
match body {
Body::Empty => (),
Body::Binary(mut bin) => {
self.body = Body::Binary(bin.take().into());
}
fn into_parts(mut self) -> HttpResponseParts {
let body = match mem::replace(&mut self.body, Body::Empty) {
Body::Empty => None,
Body::Binary(mut bin) => Some(bin.take()),
Body::Streaming(_) | Body::Actor(_) => {
error!("Streaming or Actor body is not support by error response");
None
}
};
HttpResponseParts {
body,
version: self.version,
headers: self.headers,
status: self.status,
reason: self.reason,
encoding: self.encoding,
connection_type: self.connection_type,
error: self.error,
}
}
fn from_parts(parts: HttpResponseParts) -> InnerHttpResponse {
let body = if let Some(ref body) = parts.body {
Body::Binary(body.clone().into())
} else {
Body::Empty
};
InnerHttpResponse {
body,
status: parts.status,
version: parts.version,
headers: parts.headers,
reason: parts.reason,
chunked: None,
encoding: parts.encoding,
connection_type: parts.connection_type,
response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE,
error: parts.error,
}
}
}
/// Internal use only! unsafe
/// Internal use only!
pub(crate) struct HttpResponsePool(VecDeque<Box<InnerHttpResponse>>);
thread_local!(static POOL: Rc<UnsafeCell<HttpResponsePool>> = HttpResponsePool::pool());

View file

@ -171,12 +171,13 @@ where
/// fn main() {
/// let app = App::new().resource("/index.html", |r| {
/// r.method(http::Method::POST)
/// .with(index)
/// .limit(4096) // <- change json extractor configuration
/// .error_handler(|err, req| { // <- create custom error response
/// error::InternalError::from_response(
/// err, HttpResponse::Conflict().finish()).into()
/// });
/// .with_config(index, |cfg| {
/// cfg.limit(4096) // <- change json extractor configuration
/// .error_handler(|err, req| { // <- create custom error response
/// error::InternalError::from_response(
/// err, HttpResponse::Conflict().finish()).into()
/// });
/// })
/// });
/// }
/// ```
@ -326,7 +327,7 @@ mod tests {
use http::header;
use handler::Handler;
use with::{ExtractorConfig, With};
use with::With;
impl PartialEq for JsonPayloadError {
fn eq(&self, other: &JsonPayloadError) -> bool {
@ -409,7 +410,7 @@ mod tests {
#[test]
fn test_with_json() {
let mut cfg = ExtractorConfig::<_, Json<MyObject>>::default();
let mut cfg = JsonConfig::default();
cfg.limit(4096);
let mut handler = With::new(|data: Json<MyObject>| data, cfg);

View file

@ -246,7 +246,6 @@ pub mod dev {
pub use resource::ResourceHandler;
pub use route::Route;
pub use router::{Resource, ResourceType, Router};
pub use with::ExtractorConfig;
}
pub mod http {

View file

@ -1,5 +1,5 @@
//! Multipart requests support
use std::cell::RefCell;
use std::cell::{RefCell, UnsafeCell};
use std::marker::PhantomData;
use std::rc::Rc;
use std::{cmp, fmt};
@ -590,7 +590,7 @@ where
}
struct PayloadRef<S> {
payload: Rc<PayloadHelper<S>>,
payload: Rc<UnsafeCell<PayloadHelper<S>>>,
}
impl<S> PayloadRef<S>
@ -599,7 +599,7 @@ where
{
fn new(payload: PayloadHelper<S>) -> PayloadRef<S> {
PayloadRef {
payload: Rc::new(payload),
payload: Rc::new(payload.into()),
}
}
@ -609,7 +609,7 @@ where
{
if s.current() {
let payload: &mut PayloadHelper<S> =
unsafe { &mut *(self.payload.as_ref() as *const _ as *mut _) };
unsafe { &mut *self.payload.get() };
Some(payload)
} else {
None

View file

@ -17,7 +17,7 @@ use middleware::{
Started as MiddlewareStarted,
};
use pred::Predicate;
use with::{ExtractorConfig, With, WithAsync};
use with::{With, WithAsync};
/// Resource route definition
///
@ -164,15 +164,49 @@ impl<S: 'static> Route<S> {
/// ); // <- use `with` extractor
/// }
/// ```
pub fn with<T, F, R>(&mut self, handler: F) -> ExtractorConfig<S, T>
pub fn with<T, F, R>(&mut self, handler: F)
where
F: Fn(T) -> R + 'static,
R: Responder + 'static,
T: FromRequest<S> + 'static,
{
let cfg = ExtractorConfig::<S, T>::default();
self.h(With::new(handler, cfg.clone()));
cfg
self.h(With::new(handler, <T::Config as Default>::default()));
}
/// Set handler function. Same as `.with()` but it allows to configure
/// extractor.
///
/// ```rust
/// # extern crate bytes;
/// # extern crate actix_web;
/// # extern crate futures;
/// #[macro_use] extern crate serde_derive;
/// use actix_web::{http, App, Path, Result};
///
/// /// extract text data from request
/// fn index(body: String) -> Result<String> {
/// Ok(format!("Body {}!", body))
/// }
///
/// fn main() {
/// let app = App::new().resource("/index.html", |r| {
/// r.method(http::Method::GET)
/// .with_config(index, |cfg| { // <- register handler
/// cfg.limit(4096); // <- limit size of the payload
/// })
/// });
/// }
/// ```
pub fn with_config<T, F, R, C>(&mut self, handler: F, cfg_f: C)
where
F: Fn(T) -> R + 'static,
R: Responder + 'static,
T: FromRequest<S> + 'static,
C: FnOnce(&mut T::Config),
{
let mut cfg = <T::Config as Default>::default();
cfg_f(&mut cfg);
self.h(With::new(handler, cfg));
}
/// Set async handler function, use request extractor for parameters.
@ -204,7 +238,7 @@ impl<S: 'static> Route<S> {
/// ); // <- use `with` extractor
/// }
/// ```
pub fn with_async<T, F, R, I, E>(&mut self, handler: F) -> ExtractorConfig<S, T>
pub fn with_async<T, F, R, I, E>(&mut self, handler: F)
where
F: Fn(T) -> R + 'static,
R: Future<Item = I, Error = E> + 'static,
@ -212,9 +246,52 @@ impl<S: 'static> Route<S> {
E: Into<Error> + 'static,
T: FromRequest<S> + 'static,
{
let cfg = ExtractorConfig::<S, T>::default();
self.h(WithAsync::new(handler, cfg.clone()));
cfg
self.h(WithAsync::new(handler, <T::Config as Default>::default()));
}
/// Set async handler function, use request extractor for parameters.
/// This method allows to configure extractor.
///
/// ```rust
/// # extern crate bytes;
/// # extern crate actix_web;
/// # extern crate futures;
/// #[macro_use] extern crate serde_derive;
/// use actix_web::{http, App, Error, Form};
/// use futures::Future;
///
/// #[derive(Deserialize)]
/// struct Info {
/// username: String,
/// }
///
/// /// extract path info using serde
/// fn index(info: Form<Info>) -> Box<Future<Item = &'static str, Error = Error>> {
/// unimplemented!()
/// }
///
/// fn main() {
/// let app = App::new().resource(
/// "/{username}/index.html", // <- define path parameters
/// |r| r.method(http::Method::GET)
/// .with_async_config(index, |cfg| {
/// cfg.limit(4096);
/// }),
/// ); // <- use `with` extractor
/// }
/// ```
pub fn with_async_config<T, F, R, I, E, C>(&mut self, handler: F, cfg: C)
where
F: Fn(T) -> R + 'static,
R: Future<Item = I, Error = E> + 'static,
I: Responder + 'static,
E: Into<Error> + 'static,
T: FromRequest<S> + 'static,
C: FnOnce(&mut T::Config),
{
let mut extractor_cfg = <T::Config as Default>::default();
cfg(&mut extractor_cfg);
self.h(WithAsync::new(handler, extractor_cfg));
}
}
@ -241,7 +318,7 @@ impl<S: 'static> InnerHandler<S> {
#[inline]
pub fn handle(&self, req: HttpRequest<S>) -> AsyncResult<HttpResponse> {
// reason: handler is unique per thread, handler get called from async code only
// reason: handler is unique per thread, handler get called from sync code only
let h = unsafe { &mut *self.0.as_ref().get() };
h.handle(req)
}

View file

@ -309,7 +309,7 @@ impl Resource {
params.set_tail(len as u16);
for (idx, segment) in segments.iter().enumerate() {
// reason: Router is part of App, which is unique per thread
// app is alive during whole life of tthread
// app is alive during whole life of a thread
let name = unsafe { &*(names[idx].as_str() as *const _) };
params.add(name, *segment);
}
@ -378,7 +378,7 @@ impl Resource {
params.set_tail(tail_len as u16);
for (idx, segment) in segments.iter().enumerate() {
// reason: Router is part of App, which is unique per thread
// app is alive during whole life of tthread
// app is alive during whole life of a thread
let name = unsafe { &*(names[idx].as_str() as *const _) };
params.add(name, *segment);
}

View file

@ -405,6 +405,14 @@ impl<S: 'static> RouteHandler<S> for Scope<S> {
unimplemented!()
}
}
fn has_default_resource(&self) -> bool {
self.default.is_some()
}
fn default_resource(&mut self, default: ScopeResource<S>) {
self.default = Some(default);
}
}
struct Wrapper<S: 'static> {
@ -1188,4 +1196,27 @@ mod tests {
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
}
#[test]
fn test_default_resource_propagation() {
let mut app = App::new()
.scope("/app1", |scope| {
scope.default_resource(|r| r.f(|_| HttpResponse::BadRequest()))
})
.scope("/app2", |scope| scope)
.default_resource(|r| r.f(|_| HttpResponse::MethodNotAllowed()))
.finish();
let req = TestRequest::with_uri("/non-exist").finish();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
let req = TestRequest::with_uri("/app1/non-exist").finish();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::BAD_REQUEST);
let req = TestRequest::with_uri("/app2/non-exist").finish();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
}
}

View file

@ -73,12 +73,11 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_data(&mut self, data: &[u8]) -> io::Result<usize> {
fn write_data(stream: &mut T, data: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < data.len() {
match self.stream.write(&data[written..]) {
match stream.write(&data[written..]) {
Ok(0) => {
self.disconnected();
return Err(io::Error::new(io::ErrorKind::WriteZero, ""));
}
Ok(n) => {
@ -243,7 +242,16 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
if self.flags.contains(Flags::UPGRADE) {
if self.buffer.is_empty() {
let pl: &[u8] = payload.as_ref();
let n = self.write_data(pl)?;
let n = match Self::write_data(&mut self.stream, pl) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
return Err(err);
}
Ok(val) => val,
};
if n < pl.len() {
self.buffer.extend_from_slice(&pl[n..]);
return Ok(WriterState::Done);
@ -284,9 +292,18 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
if !self.buffer.is_empty() {
let buf: &[u8] =
unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) };
let written = self.write_data(buf)?;
let written = {
match Self::write_data(&mut self.stream, self.buffer.as_ref()) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
return Err(err);
}
Ok(val) => val,
}
};
let _ = self.buffer.split_to(written);
if shutdown && !self.buffer.is_empty()
|| (self.buffer.len() > self.buffer_capacity)

View file

@ -5,7 +5,7 @@ use std::{io, net, thread};
use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
Response, StreamHandler2, System, WrapFuture,
};
use futures::sync::mpsc;
@ -449,7 +449,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
ctx.add_stream2(rx);
self
});
if let Some(signals) = signals {
@ -611,7 +611,7 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
}
/// Commands from accept threads
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
impl<H: IntoHttpHandler> StreamHandler2<ServerCommand, ()> for HttpServer<H> {
fn handle(&mut self, msg: Result<Option<ServerCommand>, ()>, _: &mut Context<Self>) {
if let Ok(Some(ServerCommand::WorkerDied(idx, socks))) = msg {
let mut found = false;

View file

@ -1,7 +1,6 @@
use futures::{Async, Future, Poll};
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use error::Error;
@ -9,110 +8,24 @@ use handler::{AsyncResult, AsyncResultItem, FromRequest, Handler, Responder};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// Extractor configuration
///
/// `Route::with()` and `Route::with_async()` returns instance
/// of the `ExtractorConfig` type. It could be used for extractor configuration.
///
/// In this example `Form<FormData>` configured.
///
/// ```rust
/// # extern crate actix_web;
/// #[macro_use] extern crate serde_derive;
/// use actix_web::{http, App, Form, Result};
///
/// #[derive(Deserialize)]
/// struct FormData {
/// username: String,
/// }
///
/// fn index(form: Form<FormData>) -> Result<String> {
/// Ok(format!("Welcome {}!", form.username))
/// }
///
/// fn main() {
/// let app = App::new().resource(
/// "/index.html",
/// |r| {
/// r.method(http::Method::GET).with(index).limit(4096);
/// }, // <- change form extractor configuration
/// );
/// }
/// ```
///
/// Same could be donce with multiple extractors
///
/// ```rust
/// # extern crate actix_web;
/// #[macro_use] extern crate serde_derive;
/// use actix_web::{http, App, Form, Path, Result};
///
/// #[derive(Deserialize)]
/// struct FormData {
/// username: String,
/// }
///
/// fn index(data: (Path<(String,)>, Form<FormData>)) -> Result<String> {
/// Ok(format!("Welcome {}!", data.1.username))
/// }
///
/// fn main() {
/// let app = App::new().resource(
/// "/index.html",
/// |r| {
/// r.method(http::Method::GET).with(index).1.limit(4096);
/// }, // <- change form extractor configuration
/// );
/// }
/// ```
pub struct ExtractorConfig<S: 'static, T: FromRequest<S>> {
cfg: Rc<UnsafeCell<T::Config>>,
pub(crate) struct With<T, S, F, R>
where
F: Fn(T) -> R,
T: FromRequest<S>,
S: 'static,
{
hnd: Rc<WithHnd<T, S, F, R>>,
cfg: Rc<T::Config>,
}
impl<S: 'static, T: FromRequest<S>> Default for ExtractorConfig<S, T> {
fn default() -> Self {
ExtractorConfig {
cfg: Rc::new(UnsafeCell::new(T::Config::default())),
}
}
}
impl<S: 'static, T: FromRequest<S>> ExtractorConfig<S, T> {
pub(crate) fn clone(&self) -> Self {
ExtractorConfig {
cfg: Rc::clone(&self.cfg),
}
}
}
impl<S: 'static, T: FromRequest<S>> AsRef<T::Config> for ExtractorConfig<S, T> {
fn as_ref(&self) -> &T::Config {
unsafe { &*self.cfg.get() }
}
}
impl<S: 'static, T: FromRequest<S>> Deref for ExtractorConfig<S, T> {
type Target = T::Config;
fn deref(&self) -> &T::Config {
unsafe { &*self.cfg.get() }
}
}
impl<S: 'static, T: FromRequest<S>> DerefMut for ExtractorConfig<S, T> {
fn deref_mut(&mut self) -> &mut T::Config {
unsafe { &mut *self.cfg.get() }
}
}
pub struct With<T, S, F, R>
pub struct WithHnd<T, S, F, R>
where
F: Fn(T) -> R,
T: FromRequest<S>,
S: 'static,
{
hnd: Rc<UnsafeCell<F>>,
cfg: ExtractorConfig<S, T>,
_t: PhantomData<T>,
_s: PhantomData<S>,
}
@ -122,11 +35,14 @@ where
T: FromRequest<S>,
S: 'static,
{
pub fn new(f: F, cfg: ExtractorConfig<S, T>) -> Self {
pub fn new(f: F, cfg: T::Config) -> Self {
With {
cfg,
hnd: Rc::new(UnsafeCell::new(f)),
_s: PhantomData,
cfg: Rc::new(cfg),
hnd: Rc::new(WithHnd {
hnd: Rc::new(UnsafeCell::new(f)),
_t: PhantomData,
_s: PhantomData,
}),
}
}
}
@ -166,8 +82,8 @@ where
S: 'static,
{
started: bool,
hnd: Rc<UnsafeCell<F>>,
cfg: ExtractorConfig<S, T>,
hnd: Rc<WithHnd<T, S, F, R>>,
cfg: Rc<T::Config>,
req: HttpRequest<S>,
fut1: Option<Box<Future<Item = T, Error = Error>>>,
fut2: Option<Box<Future<Item = HttpResponse, Error = Error>>>,
@ -206,24 +122,32 @@ where
}
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
let item = match (*hnd)(item).respond_to(&self.req) {
Ok(item) => item.into(),
Err(e) => return Err(e.into()),
};
match item.into() {
AsyncResultItem::Err(err) => Err(err),
AsyncResultItem::Ok(resp) => Ok(Async::Ready(resp)),
AsyncResultItem::Future(fut) => {
self.fut2 = Some(fut);
self.poll()
let fut = {
// clone handler, inicrease ref counter
let h = self.hnd.as_ref().hnd.clone();
// Enforce invariants before entering unsafe code.
// Only two references could exists With struct owns one, and line above
if Rc::weak_count(&h) != 0 || Rc::strong_count(&h) != 2 {
panic!("Multiple copies of handler are in use")
}
}
let hnd: &mut F = unsafe { &mut *h.as_ref().get() };
let item = match (*hnd)(item).respond_to(&self.req) {
Ok(item) => item.into(),
Err(e) => return Err(e.into()),
};
match item.into() {
AsyncResultItem::Err(err) => return Err(err),
AsyncResultItem::Ok(resp) => return Ok(Async::Ready(resp)),
AsyncResultItem::Future(fut) => fut,
}
};
self.fut2 = Some(fut);
self.poll()
}
}
pub struct WithAsync<T, S, F, R, I, E>
pub(crate) struct WithAsync<T, S, F, R, I, E>
where
F: Fn(T) -> R,
R: Future<Item = I, Error = E>,
@ -232,9 +156,8 @@ where
T: FromRequest<S>,
S: 'static,
{
hnd: Rc<UnsafeCell<F>>,
cfg: ExtractorConfig<S, T>,
_s: PhantomData<S>,
hnd: Rc<WithHnd<T, S, F, R>>,
cfg: Rc<T::Config>,
}
impl<T, S, F, R, I, E> WithAsync<T, S, F, R, I, E>
@ -246,11 +169,14 @@ where
T: FromRequest<S>,
S: 'static,
{
pub fn new(f: F, cfg: ExtractorConfig<S, T>) -> Self {
pub fn new(f: F, cfg: T::Config) -> Self {
WithAsync {
cfg,
hnd: Rc::new(UnsafeCell::new(f)),
_s: PhantomData,
cfg: Rc::new(cfg),
hnd: Rc::new(WithHnd {
hnd: Rc::new(UnsafeCell::new(f)),
_s: PhantomData,
_t: PhantomData,
}),
}
}
}
@ -271,7 +197,7 @@ where
req,
started: false,
hnd: Rc::clone(&self.hnd),
cfg: self.cfg.clone(),
cfg: Rc::clone(&self.cfg),
fut1: None,
fut2: None,
fut3: None,
@ -295,8 +221,8 @@ where
S: 'static,
{
started: bool,
hnd: Rc<UnsafeCell<F>>,
cfg: ExtractorConfig<S, T>,
hnd: Rc<WithHnd<T, S, F, R>>,
cfg: Rc<T::Config>,
req: HttpRequest<S>,
fut1: Option<Box<Future<Item = T, Error = Error>>>,
fut2: Option<R>,
@ -356,8 +282,17 @@ where
}
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
self.fut2 = Some((*hnd)(item));
self.fut2 = {
// clone handler, inicrease ref counter
let h = self.hnd.as_ref().hnd.clone();
// Enforce invariants before entering unsafe code.
// Only two references could exists With struct owns one, and line above
if Rc::weak_count(&h) != 0 || Rc::strong_count(&h) != 2 {
panic!("Multiple copies of handler are in use")
}
let hnd: &mut F = unsafe { &mut *h.as_ref().get() };
Some((*hnd)(item))
};
self.poll()
}
}

View file

@ -127,6 +127,7 @@ pub struct Client {
protocols: Option<String>,
conn: Addr<ClientConnector>,
max_size: usize,
no_masking: bool,
}
impl Client {
@ -144,6 +145,7 @@ impl Client {
origin: None,
protocols: None,
max_size: 65_536,
no_masking: false,
conn,
};
cl.request.uri(uri.as_ref());
@ -198,6 +200,12 @@ impl Client {
self
}
/// Disable payload masking. By default ws client masks frame payload.
pub fn no_masking(mut self) -> Self {
self.no_masking = true;
self
}
/// Set request header
pub fn header<K, V>(mut self, key: K, value: V) -> Self
where
@ -260,7 +268,7 @@ impl Client {
}
// start handshake
ClientHandshake::new(request, self.max_size)
ClientHandshake::new(request, self.max_size, self.no_masking)
}
}
}
@ -281,10 +289,13 @@ pub struct ClientHandshake {
key: String,
error: Option<ClientError>,
max_size: usize,
no_masking: bool,
}
impl ClientHandshake {
fn new(mut request: ClientRequest, max_size: usize) -> ClientHandshake {
fn new(
mut request: ClientRequest, max_size: usize, no_masking: bool,
) -> ClientHandshake {
// Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that,
// when decoded, is 16 bytes in length (RFC 6455)
@ -304,6 +315,7 @@ impl ClientHandshake {
ClientHandshake {
key,
max_size,
no_masking,
request: Some(request.send()),
tx: Some(tx),
error: None,
@ -317,6 +329,7 @@ impl ClientHandshake {
tx: None,
error: Some(err),
max_size: 0,
no_masking: false,
}
}
@ -427,6 +440,7 @@ impl Future for ClientHandshake {
ClientReader {
inner: Rc::clone(&inner),
max_size: self.max_size,
no_masking: self.no_masking,
},
ClientWriter { inner },
)))
@ -437,6 +451,7 @@ impl Future for ClientHandshake {
pub struct ClientReader {
inner: Rc<RefCell<Inner>>,
max_size: usize,
no_masking: bool,
}
impl fmt::Debug for ClientReader {
@ -451,13 +466,14 @@ impl Stream for ClientReader {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let max_size = self.max_size;
let no_masking = self.no_masking;
let mut inner = self.inner.borrow_mut();
if inner.closed {
return Ok(Async::Ready(None));
}
// read
match Frame::parse(&mut inner.rx, false, max_size) {
match Frame::parse(&mut inner.rx, no_masking, max_size) {
Ok(Async::Ready(Some(frame))) => {
let (_finished, opcode, payload) = frame.unpack();

View file

@ -25,12 +25,12 @@
//!
//! // Handler for ws::Message messages
//! impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
//! fn handle(&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>, ctx: &mut Self::Context) {
//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
//! match msg {
//! Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
//! Ok(Some(ws::Message::Text(text))) => ctx.text(text),
//! Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
//! _ => ctx.stop(),
//! ws::Message::Ping(msg) => ctx.pong(&msg),
//! ws::Message::Text(text) => ctx.text(text),
//! ws::Message::Binary(bin) => ctx.binary(bin),
//! _ => (),
//! }
//! }
//! }

View file

@ -42,6 +42,28 @@ fn test_path_extractor() {
assert_eq!(bytes, Bytes::from_static(b"Welcome test!"));
}
#[test]
fn test_async_handler() {
let mut srv = test::TestServer::new(|app| {
app.resource("/{username}/index.html", |r| {
r.route().with(|p: Path<PParam>| {
Delay::new(Instant::now() + Duration::from_millis(10))
.and_then(move |_| Ok(format!("Welcome {}!", p.username)))
.responder()
})
});
});
// client request
let request = srv.get().uri(srv.url("/test/index.html")).finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(b"Welcome test!"));
}
#[test]
fn test_query_extractor() {
let mut srv = test::TestServer::new(|app| {
@ -130,14 +152,17 @@ fn test_form_extractor() {
fn test_form_extractor2() {
let mut srv = test::TestServer::new(|app| {
app.resource("/{username}/index.html", |r| {
r.route()
.with(|form: Form<FormData>| format!("{}", form.username))
.error_handler(|err, _| {
error::InternalError::from_response(
err,
HttpResponse::Conflict().finish(),
).into()
});
r.route().with_config(
|form: Form<FormData>| format!("{}", form.username),
|cfg| {
cfg.error_handler(|err, _| {
error::InternalError::from_response(
err,
HttpResponse::Conflict().finish(),
).into()
});
},
);
});
});

View file

@ -23,16 +23,13 @@ impl Actor for Ws {
}
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
fn handle(
&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
Ok(Some(ws::Message::Text(text))) => ctx.text(text),
Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
Ok(Some(ws::Message::Close(reason))) => ctx.close(reason),
_ => ctx.stop(),
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
}
}
}
@ -156,16 +153,13 @@ impl Ws2 {
}
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
fn handle(
&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
Ok(Some(ws::Message::Text(text))) => ctx.text(text),
Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
Ok(Some(ws::Message::Close(reason))) => ctx.close(reason),
_ => ctx.stop(),
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
}
}
}