mirror of
https://github.com/actix/actix-web.git
synced 2024-11-26 03:21:08 +00:00
refactor app_service (#1877)
This commit is contained in:
parent
68117543ea
commit
57da1d3c0f
2 changed files with 60 additions and 123 deletions
|
@ -458,11 +458,12 @@ where
|
||||||
Error = Error,
|
Error = Error,
|
||||||
InitError = (),
|
InitError = (),
|
||||||
>,
|
>,
|
||||||
|
T::Future: 'static,
|
||||||
{
|
{
|
||||||
fn into_factory(self) -> AppInit<T, B> {
|
fn into_factory(self) -> AppInit<T, B> {
|
||||||
AppInit {
|
AppInit {
|
||||||
data: self.data.into_boxed_slice().into(),
|
data_factories: self.data.into_boxed_slice().into(),
|
||||||
data_factories: self.data_factories.into_boxed_slice().into(),
|
async_data_factories: self.data_factories.into_boxed_slice().into(),
|
||||||
endpoint: self.endpoint,
|
endpoint: self.endpoint,
|
||||||
services: Rc::new(RefCell::new(self.services)),
|
services: Rc::new(RefCell::new(self.services)),
|
||||||
external: RefCell::new(self.external),
|
external: RefCell::new(self.external),
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::marker::PhantomData;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_http::{Extensions, Request, Response};
|
use actix_http::{Extensions, Request, Response};
|
||||||
use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url};
|
use actix_router::{Path, ResourceDef, Router, Url};
|
||||||
use actix_service::boxed::{self, BoxService, BoxServiceFactory};
|
use actix_service::boxed::{self, BoxService, BoxServiceFactory};
|
||||||
use actix_service::{fn_service, Service, ServiceFactory};
|
use actix_service::{fn_service, Service, ServiceFactory};
|
||||||
use futures_util::future::{join_all, ok, FutureExt, LocalBoxFuture};
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use futures_util::future::join_all;
|
||||||
|
|
||||||
use crate::config::{AppConfig, AppService};
|
use crate::config::{AppConfig, AppService};
|
||||||
use crate::data::{DataFactory, FnDataFactory};
|
use crate::data::{DataFactory, FnDataFactory};
|
||||||
|
@ -22,7 +22,6 @@ use crate::service::{AppServiceFactory, ServiceRequest, ServiceResponse};
|
||||||
type Guards = Vec<Box<dyn Guard>>;
|
type Guards = Vec<Box<dyn Guard>>;
|
||||||
type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>;
|
type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>;
|
||||||
type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
|
type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
|
||||||
type BoxResponse = LocalBoxFuture<'static, Result<ServiceResponse, Error>>;
|
|
||||||
|
|
||||||
/// Service factory to convert `Request` to a `ServiceRequest<S>`.
|
/// Service factory to convert `Request` to a `ServiceRequest<S>`.
|
||||||
/// It also executes data factories.
|
/// It also executes data factories.
|
||||||
|
@ -38,8 +37,8 @@ where
|
||||||
{
|
{
|
||||||
pub(crate) endpoint: T,
|
pub(crate) endpoint: T,
|
||||||
pub(crate) extensions: RefCell<Option<Extensions>>,
|
pub(crate) extensions: RefCell<Option<Extensions>>,
|
||||||
pub(crate) data: Rc<[Box<dyn DataFactory>]>,
|
pub(crate) data_factories: Rc<[Box<dyn DataFactory>]>,
|
||||||
pub(crate) data_factories: Rc<[FnDataFactory]>,
|
pub(crate) async_data_factories: Rc<[FnDataFactory]>,
|
||||||
pub(crate) services: Rc<RefCell<Vec<Box<dyn AppServiceFactory>>>>,
|
pub(crate) services: Rc<RefCell<Vec<Box<dyn AppServiceFactory>>>>,
|
||||||
pub(crate) default: Option<Rc<HttpNewService>>,
|
pub(crate) default: Option<Rc<HttpNewService>>,
|
||||||
pub(crate) factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>,
|
pub(crate) factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>,
|
||||||
|
@ -55,24 +54,26 @@ where
|
||||||
Error = Error,
|
Error = Error,
|
||||||
InitError = (),
|
InitError = (),
|
||||||
>,
|
>,
|
||||||
|
T::Future: 'static,
|
||||||
{
|
{
|
||||||
type Response = ServiceResponse<B>;
|
type Response = ServiceResponse<B>;
|
||||||
type Error = T::Error;
|
type Error = T::Error;
|
||||||
type Config = AppConfig;
|
type Config = AppConfig;
|
||||||
type Service = AppInitService<T::Service, B>;
|
type Service = AppInitService<T::Service, B>;
|
||||||
type InitError = T::InitError;
|
type InitError = T::InitError;
|
||||||
type Future = AppInitResult<T, B>;
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
|
|
||||||
fn new_service(&self, config: AppConfig) -> Self::Future {
|
fn new_service(&self, config: AppConfig) -> Self::Future {
|
||||||
// update resource default service
|
// update resource default service
|
||||||
let default = self.default.clone().unwrap_or_else(|| {
|
let default = self.default.clone().unwrap_or_else(|| {
|
||||||
Rc::new(boxed::factory(fn_service(|req: ServiceRequest| {
|
Rc::new(boxed::factory(fn_service(|req: ServiceRequest| async {
|
||||||
ok(req.into_response(Response::NotFound().finish()))
|
Ok(req.into_response(Response::NotFound().finish()))
|
||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
|
|
||||||
// App config
|
// App config
|
||||||
let mut config = AppService::new(config, default.clone(), self.data.clone());
|
let mut config =
|
||||||
|
AppService::new(config, default.clone(), self.data_factories.clone());
|
||||||
|
|
||||||
// register services
|
// register services
|
||||||
std::mem::take(&mut *self.services.borrow_mut())
|
std::mem::take(&mut *self.services.borrow_mut())
|
||||||
|
@ -83,7 +84,7 @@ where
|
||||||
|
|
||||||
let (config, services) = config.into_services();
|
let (config, services) = config.into_services();
|
||||||
|
|
||||||
// complete pipeline creation
|
// complete pipeline creation.
|
||||||
*self.factory_ref.borrow_mut() = Some(AppRoutingFactory {
|
*self.factory_ref.borrow_mut() = Some(AppRoutingFactory {
|
||||||
default,
|
default,
|
||||||
services: services
|
services: services
|
||||||
|
@ -106,107 +107,48 @@ where
|
||||||
let rmap = Rc::new(rmap);
|
let rmap = Rc::new(rmap);
|
||||||
rmap.finish(rmap.clone());
|
rmap.finish(rmap.clone());
|
||||||
|
|
||||||
// start all data factory futures
|
// construct all async data factory futures
|
||||||
let factory_futs = join_all(self.data_factories.iter().map(|f| f()));
|
let factory_futs = join_all(self.async_data_factories.iter().map(|f| f()));
|
||||||
|
|
||||||
AppInitResult {
|
// construct app service and middleware service factory future.
|
||||||
endpoint: None,
|
let endpoint_fut = self.endpoint.new_service(());
|
||||||
endpoint_fut: self.endpoint.new_service(()),
|
|
||||||
data: self.data.clone(),
|
|
||||||
data_factories: None,
|
|
||||||
data_factories_fut: factory_futs.boxed_local(),
|
|
||||||
extensions: Some(
|
|
||||||
self.extensions
|
|
||||||
.borrow_mut()
|
|
||||||
.take()
|
|
||||||
.unwrap_or_else(Extensions::new),
|
|
||||||
),
|
|
||||||
config,
|
|
||||||
rmap,
|
|
||||||
_phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[pin_project::pin_project]
|
// take extensions or create new one as app data container.
|
||||||
pub struct AppInitResult<T, B>
|
let mut app_data = self
|
||||||
where
|
.extensions
|
||||||
T: ServiceFactory<ServiceRequest>,
|
.borrow_mut()
|
||||||
{
|
.take()
|
||||||
#[pin]
|
.unwrap_or_else(Extensions::new);
|
||||||
endpoint_fut: T::Future,
|
|
||||||
// a Some signals completion of endpoint creation
|
|
||||||
endpoint: Option<T::Service>,
|
|
||||||
|
|
||||||
#[pin]
|
let data_factories = self.data_factories.clone();
|
||||||
data_factories_fut: LocalBoxFuture<'static, Vec<Result<Box<dyn DataFactory>, ()>>>,
|
|
||||||
// a Some signals completion of factory futures
|
|
||||||
data_factories: Option<Vec<Box<dyn DataFactory>>>,
|
|
||||||
|
|
||||||
rmap: Rc<ResourceMap>,
|
Box::pin(async move {
|
||||||
config: AppConfig,
|
// async data factories
|
||||||
data: Rc<[Box<dyn DataFactory>]>,
|
let async_data_factories = factory_futs
|
||||||
extensions: Option<Extensions>,
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.collect::<Result<Vec<_>, _>>()
|
||||||
|
.map_err(|_| ())?;
|
||||||
|
|
||||||
_phantom: PhantomData<B>,
|
// app service and middleware
|
||||||
}
|
let service = endpoint_fut.await?;
|
||||||
|
|
||||||
impl<T, B> Future for AppInitResult<T, B>
|
// populate app data container from (async) data factories.
|
||||||
where
|
data_factories
|
||||||
T: ServiceFactory<
|
.iter()
|
||||||
ServiceRequest,
|
.chain(&async_data_factories)
|
||||||
Config = (),
|
.for_each(|factory| {
|
||||||
Response = ServiceResponse<B>,
|
factory.create(&mut app_data);
|
||||||
Error = Error,
|
});
|
||||||
InitError = (),
|
|
||||||
>,
|
|
||||||
{
|
|
||||||
type Output = Result<AppInitService<T::Service, B>, ()>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
Ok(AppInitService {
|
||||||
let this = self.project();
|
service,
|
||||||
|
rmap,
|
||||||
// async data factories
|
config,
|
||||||
if let Poll::Ready(factories) = this.data_factories_fut.poll(cx) {
|
app_data: Rc::new(app_data),
|
||||||
let factories: Result<Vec<_>, ()> = factories.into_iter().collect();
|
|
||||||
|
|
||||||
if let Ok(factories) = factories {
|
|
||||||
this.data_factories.replace(factories);
|
|
||||||
} else {
|
|
||||||
return Poll::Ready(Err(()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// app service and middleware
|
|
||||||
if this.endpoint.is_none() {
|
|
||||||
if let Poll::Ready(srv) = this.endpoint_fut.poll(cx)? {
|
|
||||||
*this.endpoint = Some(srv);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// not using if let so condition only needs shared ref
|
|
||||||
if this.endpoint.is_some() && this.data_factories.is_some() {
|
|
||||||
// create app data container
|
|
||||||
let mut data = this.extensions.take().unwrap();
|
|
||||||
|
|
||||||
for f in this.data.iter() {
|
|
||||||
f.create(&mut data);
|
|
||||||
}
|
|
||||||
|
|
||||||
for f in this.data_factories.take().unwrap().iter() {
|
|
||||||
f.create(&mut data);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Poll::Ready(Ok(AppInitService {
|
|
||||||
service: this.endpoint.take().unwrap(),
|
|
||||||
rmap: this.rmap.clone(),
|
|
||||||
config: this.config.clone(),
|
|
||||||
data: Rc::new(data),
|
|
||||||
pool: HttpRequestPool::create(),
|
pool: HttpRequestPool::create(),
|
||||||
}));
|
})
|
||||||
}
|
})
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,7 +160,7 @@ where
|
||||||
service: T,
|
service: T,
|
||||||
rmap: Rc<ResourceMap>,
|
rmap: Rc<ResourceMap>,
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
data: Rc<Extensions>,
|
app_data: Rc<Extensions>,
|
||||||
pool: &'static HttpRequestPool,
|
pool: &'static HttpRequestPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,7 +193,7 @@ where
|
||||||
payload,
|
payload,
|
||||||
self.rmap.clone(),
|
self.rmap.clone(),
|
||||||
self.config.clone(),
|
self.config.clone(),
|
||||||
self.data.clone(),
|
self.app_data.clone(),
|
||||||
self.pool,
|
self.pool,
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
@ -290,7 +232,7 @@ impl ServiceFactory<ServiceRequest> for AppRoutingFactory {
|
||||||
CreateAppRoutingItem::Future(
|
CreateAppRoutingItem::Future(
|
||||||
Some(path.clone()),
|
Some(path.clone()),
|
||||||
guards.borrow_mut().take(),
|
guards.borrow_mut().take(),
|
||||||
service.new_service(()).boxed_local(),
|
Box::pin(service.new_service(())),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
|
@ -307,7 +249,7 @@ type HttpServiceFut = LocalBoxFuture<'static, Result<HttpService, ()>>;
|
||||||
pub struct AppRoutingFactoryResponse {
|
pub struct AppRoutingFactoryResponse {
|
||||||
fut: Vec<CreateAppRoutingItem>,
|
fut: Vec<CreateAppRoutingItem>,
|
||||||
default: Option<HttpService>,
|
default: Option<HttpService>,
|
||||||
default_fut: Option<LocalBoxFuture<'static, Result<HttpService, ()>>>,
|
default_fut: Option<HttpServiceFut>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum CreateAppRoutingItem {
|
enum CreateAppRoutingItem {
|
||||||
|
@ -367,7 +309,6 @@ impl Future for AppRoutingFactoryResponse {
|
||||||
router
|
router
|
||||||
});
|
});
|
||||||
Poll::Ready(Ok(AppRouting {
|
Poll::Ready(Ok(AppRouting {
|
||||||
ready: None,
|
|
||||||
router: router.finish(),
|
router: router.finish(),
|
||||||
default: self.default.take(),
|
default: self.default.take(),
|
||||||
}))
|
}))
|
||||||
|
@ -379,22 +320,15 @@ impl Future for AppRoutingFactoryResponse {
|
||||||
|
|
||||||
pub struct AppRouting {
|
pub struct AppRouting {
|
||||||
router: Router<HttpService, Guards>,
|
router: Router<HttpService, Guards>,
|
||||||
ready: Option<(ServiceRequest, ResourceInfo)>,
|
|
||||||
default: Option<HttpService>,
|
default: Option<HttpService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<ServiceRequest> for AppRouting {
|
impl Service<ServiceRequest> for AppRouting {
|
||||||
type Response = ServiceResponse;
|
type Response = ServiceResponse;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = BoxResponse;
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
actix_service::always_ready!();
|
||||||
if self.ready.is_none() {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
} else {
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
|
fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
|
||||||
let res = self.router.recognize_mut_checked(&mut req, |req, guards| {
|
let res = self.router.recognize_mut_checked(&mut req, |req, guards| {
|
||||||
|
@ -414,7 +348,9 @@ impl Service<ServiceRequest> for AppRouting {
|
||||||
default.call(req)
|
default.call(req)
|
||||||
} else {
|
} else {
|
||||||
let req = req.into_parts().0;
|
let req = req.into_parts().0;
|
||||||
ok(ServiceResponse::new(req, Response::NotFound().finish())).boxed_local()
|
Box::pin(async {
|
||||||
|
Ok(ServiceResponse::new(req, Response::NotFound().finish()))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -431,11 +367,11 @@ impl AppEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceFactory<ServiceRequest> for AppEntry {
|
impl ServiceFactory<ServiceRequest> for AppEntry {
|
||||||
type Config = ();
|
|
||||||
type Response = ServiceResponse;
|
type Response = ServiceResponse;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type InitError = ();
|
type Config = ();
|
||||||
type Service = AppRouting;
|
type Service = AppRouting;
|
||||||
|
type InitError = ();
|
||||||
type Future = AppRoutingFactoryResponse;
|
type Future = AppRoutingFactoryResponse;
|
||||||
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
fn new_service(&self, _: ()) -> Self::Future {
|
||||||
|
|
Loading…
Reference in a new issue