1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-29 21:11:17 +00:00

add request chain services

This commit is contained in:
Nikolay Kim 2019-03-02 11:53:05 -08:00
parent fdf3011837
commit cc20fee628
6 changed files with 360 additions and 150 deletions

View file

@ -2,7 +2,7 @@ use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use actix_http::body::{Body, MessageBody};
use actix_http::body::MessageBody;
use actix_http::{Extensions, PayloadStream, Request, Response};
use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url};
use actix_service::boxed::{self, BoxedNewService, BoxedService};
@ -30,32 +30,45 @@ pub trait HttpServiceFactory<Request> {
}
/// Application builder
pub struct App<P, B, T> {
services: Vec<(ResourceDef, HttpNewService<P>)>,
default: Option<Rc<HttpNewService<P>>>,
defaults: Vec<Rc<RefCell<Option<Rc<HttpNewService<P>>>>>>,
endpoint: T,
factory_ref: Rc<RefCell<Option<AppFactory<P>>>>,
pub struct App<P, T>
where
T: NewService<Request = ServiceRequest<PayloadStream>, Response = ServiceRequest<P>>,
{
chain: T,
extensions: Extensions,
state: Vec<Box<StateFactory>>,
_t: PhantomData<(P, B)>,
_t: PhantomData<(P,)>,
}
impl App<PayloadStream, Body, AppEntry<PayloadStream>> {
/// Create application with empty state. Application can
impl App<PayloadStream, AppChain> {
/// Create application builder with empty state. Application can
/// be configured with a builder-like pattern.
pub fn new() -> Self {
App::create()
App {
chain: AppChain,
extensions: Extensions::new(),
state: Vec::new(),
_t: PhantomData,
}
}
}
impl Default for App<PayloadStream, Body, AppEntry<PayloadStream>> {
impl Default for App<PayloadStream, AppChain> {
fn default() -> Self {
App::new()
}
}
impl App<PayloadStream, Body, AppEntry<PayloadStream>> {
impl<P, T> App<P, T>
where
P: 'static,
T: NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P>,
Error = (),
InitError = (),
>,
{
/// Create application with specified state. Application can be
/// configured with a builder-like pattern.
///
@ -86,38 +99,172 @@ impl App<PayloadStream, Body, AppEntry<PayloadStream>> {
self
}
fn create() -> Self {
/// Configure resource for a specific path.
///
/// Resources may have variable path segments. For example, a
/// resource with the path `/a/{name}/c` would match all incoming
/// requests with paths such as `/a/b/c`, `/a/1/c`, or `/a/etc/c`.
///
/// A variable segment is specified in the form `{identifier}`,
/// where the identifier can be used later in a request handler to
/// access the matched value for that segment. This is done by
/// looking up the identifier in the `Params` object returned by
/// `HttpRequest.match_info()` method.
///
/// By default, each segment matches the regular expression `[^{}/]+`.
///
/// You can also specify a custom regex in the form `{identifier:regex}`:
///
/// For instance, to route `GET`-requests on any route matching
/// `/users/{userid}/{friend}` and store `userid` and `friend` in
/// the exposed `Params` object:
///
/// ```rust,ignore
/// # extern crate actix_web;
/// use actix_web::{http, App, HttpResponse};
///
/// fn main() {
/// let app = App::new().resource("/users/{userid}/{friend}", |r| {
/// r.get(|r| r.to(|_| HttpResponse::Ok()));
/// r.head(|r| r.to(|_| HttpResponse::MethodNotAllowed()))
/// });
/// }
/// ```
pub fn resource<F, U, B>(self, path: &str, f: F) -> AppRouter<T, P, B, AppEntry<P>>
where
F: FnOnce(Resource<P>) -> Resource<P, U>,
U: NewService<
Request = ServiceRequest<P>,
Response = ServiceResponse,
Error = (),
InitError = (),
> + 'static,
{
let rdef = ResourceDef::new(path);
let resource = f(Resource::new());
let default = resource.get_default();
let fref = Rc::new(RefCell::new(None));
AppRouter {
chain: self.chain,
services: vec![(rdef, boxed::new_service(resource.into_new_service()))],
default: None,
defaults: vec![default],
endpoint: AppEntry::new(fref.clone()),
factory_ref: fref,
extensions: self.extensions,
state: self.state,
_t: PhantomData,
}
}
/// Register a middleware.
pub fn middleware<M, B, F>(
self,
mw: F,
) -> AppRouter<
T,
P,
B,
impl NewService<
Request = ServiceRequest<P>,
Response = ServiceResponse<B>,
Error = (),
InitError = (),
>,
>
where
M: NewTransform<
AppService<P>,
Request = ServiceRequest<P>,
Response = ServiceResponse<B>,
Error = (),
InitError = (),
>,
B: MessageBody,
F: IntoNewTransform<M, AppService<P>>,
{
let fref = Rc::new(RefCell::new(None));
let endpoint = ApplyNewService::new(mw, AppEntry::new(fref.clone()));
AppRouter {
endpoint,
chain: self.chain,
state: self.state,
services: Vec::new(),
default: None,
defaults: Vec::new(),
factory_ref: fref,
extensions: self.extensions,
_t: PhantomData,
}
}
/// Register a request modifier. It can modify any request parameters
/// including payload stream.
pub fn chain<C, F, P1>(
self,
chain: C,
) -> App<
P1,
impl NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P1>,
Error = (),
InitError = (),
>,
>
where
C: NewService<
(),
Request = ServiceRequest<P>,
Response = ServiceRequest<P1>,
Error = (),
InitError = (),
>,
F: IntoNewService<C>,
{
let chain = self.chain.and_then(chain.into_new_service());
App {
chain,
state: self.state,
extensions: self.extensions,
_t: PhantomData,
}
}
/// Complete applicatin chain configuration and start resource
/// configuration.
pub fn router<B>(self) -> AppRouter<T, P, B, AppEntry<P>> {
let fref = Rc::new(RefCell::new(None));
AppRouter {
chain: self.chain,
services: Vec::new(),
default: None,
defaults: Vec::new(),
endpoint: AppEntry::new(fref.clone()),
factory_ref: fref,
extensions: Extensions::new(),
state: Vec::new(),
extensions: self.extensions,
state: self.state,
_t: PhantomData,
}
}
}
// /// Application router builder
// pub struct AppRouter<S, T, P> {
// services: Vec<(
// ResourceDef,
// BoxedHttpNewService<ServiceRequest<P>, Response>,
// )>,
// default: Option<Rc<HttpDefaultNewService<ServiceRequest<P>, Response>>>,
// defaults:
// Vec<Rc<RefCell<Option<Rc<HttpDefaultNewService<ServiceRequest<P>, Response>>>>>>,
// state: AppState<S>,
// endpoint: T,
// factory_ref: Rc<RefCell<Option<AppFactory<P>>>>,
// extensions: Extensions,
// _t: PhantomData<P>,
// }
/// Structure that follows the builder pattern for building application
/// instances.
pub struct AppRouter<C, P, B, T> {
chain: C,
services: Vec<(ResourceDef, HttpNewService<P>)>,
default: Option<Rc<HttpNewService<P>>>,
defaults: Vec<Rc<RefCell<Option<Rc<HttpNewService<P>>>>>>,
endpoint: T,
factory_ref: Rc<RefCell<Option<AppFactory<P>>>>,
extensions: Extensions,
state: Vec<Box<StateFactory>>,
_t: PhantomData<(P, B)>,
}
impl<P, B, T> App<P, B, T>
impl<C, P, B, T> AppRouter<C, P, B, T>
where
P: 'static,
B: MessageBody,
@ -221,7 +368,8 @@ where
pub fn middleware<M, B1, F>(
self,
mw: F,
) -> App<
) -> AppRouter<
C,
P,
B1,
impl NewService<
@ -243,14 +391,15 @@ where
F: IntoNewTransform<M, T::Service>,
{
let endpoint = ApplyNewService::new(mw, self.endpoint);
App {
AppRouter {
endpoint,
chain: self.chain,
state: self.state,
services: self.services,
default: self.default,
defaults: Vec::new(),
defaults: self.defaults,
factory_ref: self.factory_ref,
extensions: Extensions::new(),
extensions: self.extensions,
_t: PhantomData,
}
}
@ -292,8 +441,8 @@ where
}
}
impl<T, P: 'static, B: MessageBody>
IntoNewService<AndThenNewService<AppStateFactory<P>, T, ()>> for App<P, B, T>
impl<C, T, P: 'static, B: MessageBody>
IntoNewService<AndThenNewService<AppInit<C, P>, T, ()>> for AppRouter<C, P, B, T>
where
T: NewService<
Request = ServiceRequest<P>,
@ -301,8 +450,14 @@ where
Error = (),
InitError = (),
>,
C: NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P>,
Error = (),
InitError = (),
>,
{
fn into_new_service(self) -> AndThenNewService<AppStateFactory<P>, T, ()> {
fn into_new_service(self) -> AndThenNewService<AppInit<C, P>, T, ()> {
// update resource default service
if self.default.is_some() {
for default in &self.defaults {
@ -317,99 +472,15 @@ where
services: Rc::new(self.services),
});
AppStateFactory {
AppInit {
chain: self.chain,
state: self.state,
extensions: Rc::new(RefCell::new(Rc::new(self.extensions))),
_t: PhantomData,
}
.and_then(self.endpoint)
}
}
/// Service factory to convert `Request` to a `ServiceRequest<S>`
pub struct AppStateFactory<P> {
state: Vec<Box<StateFactory>>,
extensions: Rc<RefCell<Rc<Extensions>>>,
_t: PhantomData<P>,
}
impl<P: 'static> NewService for AppStateFactory<P> {
type Request = Request<P>;
type Response = ServiceRequest<P>;
type Error = ();
type InitError = ();
type Service = AppStateService<P>;
type Future = AppStateFactoryResult<P>;
fn new_service(&self, _: &()) -> Self::Future {
AppStateFactoryResult {
state: self.state.iter().map(|s| s.construct()).collect(),
extensions: self.extensions.clone(),
_t: PhantomData,
}
}
}
#[doc(hidden)]
pub struct AppStateFactoryResult<P> {
state: Vec<Box<StateFactoryResult>>,
extensions: Rc<RefCell<Rc<Extensions>>>,
_t: PhantomData<P>,
}
impl<P> Future for AppStateFactoryResult<P> {
type Item = AppStateService<P>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(extensions) = Rc::get_mut(&mut *self.extensions.borrow_mut()) {
let mut idx = 0;
while idx < self.state.len() {
if let Async::Ready(_) = self.state[idx].poll_result(extensions)? {
self.state.remove(idx);
} else {
idx += 1;
}
}
if !self.state.is_empty() {
return Ok(Async::NotReady);
}
} else {
log::warn!("Multiple copies of app extensions exists");
}
Ok(Async::Ready(AppStateService {
extensions: self.extensions.borrow().clone(),
_t: PhantomData,
}))
}
}
/// Service to convert `Request` to a `ServiceRequest<S>`
pub struct AppStateService<P> {
extensions: Rc<Extensions>,
_t: PhantomData<P>,
}
impl<P> Service for AppStateService<P> {
type Request = Request<P>;
type Response = ServiceRequest<P>;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Request<P>) -> Self::Future {
ok(ServiceRequest::new(
Path::new(Url::new(req.uri().clone())),
req,
self.extensions.clone(),
))
}
}
pub struct AppFactory<P> {
services: Rc<Vec<(ResourceDef, HttpNewService<P>)>>,
}
@ -530,17 +601,6 @@ impl<P> Service for AppService<P> {
}
}
pub struct AppServiceResponse(Box<Future<Item = ServiceResponse, Error = ()>>);
impl Future for AppServiceResponse {
type Item = ServiceResponse;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(|_| panic!())
}
}
#[doc(hidden)]
pub struct AppEntry<P> {
factory: Rc<RefCell<Option<AppFactory<P>>>>,
@ -564,3 +624,151 @@ impl<P: 'static> NewService for AppEntry<P> {
self.factory.borrow_mut().as_mut().unwrap().new_service(&())
}
}
#[doc(hidden)]
pub struct AppChain;
impl NewService<()> for AppChain {
type Request = ServiceRequest<PayloadStream>;
type Response = ServiceRequest<PayloadStream>;
type Error = ();
type InitError = ();
type Service = AppChain;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(AppChain)
}
}
impl Service for AppChain {
type Request = ServiceRequest<PayloadStream>;
type Response = ServiceRequest<PayloadStream>;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
ok(req)
}
}
/// Service factory to convert `Request` to a `ServiceRequest<S>`
pub struct AppInit<C, P>
where
C: NewService<Request = ServiceRequest<PayloadStream>, Response = ServiceRequest<P>>,
{
chain: C,
state: Vec<Box<StateFactory>>,
extensions: Rc<RefCell<Rc<Extensions>>>,
}
impl<C, P: 'static> NewService for AppInit<C, P>
where
C: NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P>,
InitError = (),
>,
{
type Request = Request<PayloadStream>;
type Response = ServiceRequest<P>;
type Error = C::Error;
type InitError = C::InitError;
type Service = AppInitService<C::Service, P>;
type Future = AppInitResult<C, P>;
fn new_service(&self, _: &()) -> Self::Future {
AppInitResult {
chain: self.chain.new_service(&()),
state: self.state.iter().map(|s| s.construct()).collect(),
extensions: self.extensions.clone(),
}
}
}
#[doc(hidden)]
pub struct AppInitResult<C, P>
where
C: NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P>,
InitError = (),
>,
{
chain: C::Future,
state: Vec<Box<StateFactoryResult>>,
extensions: Rc<RefCell<Rc<Extensions>>>,
}
impl<C, P> Future for AppInitResult<C, P>
where
C: NewService<
Request = ServiceRequest<PayloadStream>,
Response = ServiceRequest<P>,
InitError = (),
>,
{
type Item = AppInitService<C::Service, P>;
type Error = C::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(extensions) = Rc::get_mut(&mut *self.extensions.borrow_mut()) {
let mut idx = 0;
while idx < self.state.len() {
if let Async::Ready(_) = self.state[idx].poll_result(extensions)? {
self.state.remove(idx);
} else {
idx += 1;
}
}
if !self.state.is_empty() {
return Ok(Async::NotReady);
}
} else {
log::warn!("Multiple copies of app extensions exists");
}
let chain = futures::try_ready!(self.chain.poll());
Ok(Async::Ready(AppInitService {
chain,
extensions: self.extensions.borrow().clone(),
}))
}
}
/// Service to convert `Request` to a `ServiceRequest<S>`
pub struct AppInitService<C, P>
where
C: Service<Request = ServiceRequest<PayloadStream>, Response = ServiceRequest<P>>,
{
chain: C,
extensions: Rc<Extensions>,
}
impl<C, P> Service for AppInitService<C, P>
where
C: Service<Request = ServiceRequest<PayloadStream>, Response = ServiceRequest<P>>,
{
type Request = Request<PayloadStream>;
type Response = ServiceRequest<P>;
type Error = C::Error;
type Future = C::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.chain.poll_ready()
}
fn call(&mut self, req: Request<PayloadStream>) -> Self::Future {
let req = ServiceRequest::new(
Path::new(Url::new(req.uri().clone())),
req,
self.extensions.clone(),
);
self.chain.call(req)
}
}

View file

@ -24,7 +24,7 @@ lazy_static::lazy_static! {
Mutex::new(
threadpool::Builder::new()
.thread_name("actix-web".to_owned())
.num_threads(8)
.num_threads(default)
.build(),
)
};
@ -45,11 +45,15 @@ pub enum BlockingError<E> {
/// to result of the function execution.
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
where
F: FnOnce() -> Result<I, E>,
F: FnOnce() -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
let (tx, rx) = oneshot::channel();
POOL.with(move |pool| {
let _ = tx.send(f());
POOL.with(|pool| {
pool.execute(move || {
let _ = tx.send(f());
})
});
CpuFuture { rx }

View file

@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::{fmt, str};

View file

@ -1,4 +1,4 @@
#![allow(clippy::type_complexity, dead_code, unused_variables)]
#![allow(clippy::type_complexity)]
mod app;
mod extractor;
@ -28,7 +28,7 @@ pub use crate::service::{ServiceRequest, ServiceResponse};
pub use crate::state::State;
pub mod dev {
pub use crate::app::AppService;
pub use crate::app::AppRouter;
pub use crate::handler::{AsyncFactory, Extract, Factory, Handle};
pub use crate::route::{Route, RouteBuilder};
// pub use crate::info::ConnectionInfo;

View file

@ -117,7 +117,6 @@ where
enum EncoderBody<B> {
Body(B),
Other(Box<dyn MessageBody>),
None,
}
pub struct Encoder<B> {
@ -131,7 +130,6 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
match self.body {
EncoderBody::Body(ref b) => b.length(),
EncoderBody::Other(ref b) => b.length(),
EncoderBody::None => BodyLength::Empty,
}
} else {
BodyLength::Stream
@ -143,7 +141,6 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
let result = match self.body {
EncoderBody::Body(ref mut b) => b.poll_next()?,
EncoderBody::Other(ref mut b) => b.poll_next()?,
EncoderBody::None => return Ok(Async::Ready(None)),
};
match result {
Async::NotReady => return Ok(Async::NotReady),

View file

@ -320,11 +320,11 @@ impl<P: 'static> RouteBuilder<P> {
}
}
pub struct RouteServiceBuilder<P, T, U1, U2> {
service: T,
filters: Vec<Box<Filter>>,
_t: PhantomData<(P, U1, U2)>,
}
// pub struct RouteServiceBuilder<P, T, U1, U2> {
// service: T,
// filters: Vec<Box<Filter>>,
// _t: PhantomData<(P, U1, U2)>,
// }
// impl<T, S: 'static, U1, U2> RouteServiceBuilder<T, S, U1, U2>
// where