From bb17280f512927cbeed95a6ce99dd1d42e463add Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 29 Apr 2020 07:38:53 +0100 Subject: [PATCH] simplify data factory future polling (#1473) Co-authored-by: Yuki Okushi --- src/app.rs | 15 +++++- src/app_service.rs | 51 ++++++++++++-------- src/test.rs | 118 ++++++++++++++++++++++++++------------------- 3 files changed, 113 insertions(+), 71 deletions(-) diff --git a/src/app.rs b/src/app.rs index ed2aff8e6..c611f2657 100644 --- a/src/app.rs +++ b/src/app.rs @@ -476,13 +476,13 @@ where mod tests { use actix_service::Service; use bytes::Bytes; - use futures::future::ok; + use futures::future::{ok, err}; use super::*; use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::middleware::DefaultHeaders; use crate::service::ServiceRequest; - use crate::test::{call_service, init_service, read_body, TestRequest}; + use crate::test::{call_service, init_service, try_init_service, read_body, TestRequest}; use crate::{web, HttpRequest, HttpResponse}; #[actix_rt::test] @@ -551,6 +551,17 @@ mod tests { assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); } + #[actix_rt::test] + async fn test_data_factory_errors() { + let srv = + try_init_service(App::new().data_factory(|| err::(())).service( + web::resource("/").to(|_: web::Data| HttpResponse::Ok()), + )) + .await; + + assert!(srv.is_err()); + } + #[actix_rt::test] async fn test_extension() { let mut srv = init_service(App::new().app_data(10usize).service( diff --git a/src/app_service.rs b/src/app_service.rs index ccfefbc68..67fa4dc2c 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -9,7 +9,7 @@ use actix_http::{Extensions, Request, Response}; use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url}; use actix_service::boxed::{self, BoxService, BoxServiceFactory}; use actix_service::{fn_service, Service, ServiceFactory}; -use futures::future::{ok, FutureExt, LocalBoxFuture}; +use futures::future::{join_all, ok, FutureExt, LocalBoxFuture}; use crate::config::{AppConfig, AppService}; use crate::data::DataFactory; @@ -109,12 +109,15 @@ where let rmap = Rc::new(rmap); rmap.finish(rmap.clone()); + // start all data factory futures + let factory_futs = join_all(self.data_factories.iter().map(|f| f())); + AppInitResult { endpoint: None, endpoint_fut: self.endpoint.new_service(()), data: self.data.clone(), - data_factories: Vec::new(), - data_factories_fut: self.data_factories.iter().map(|f| f()).collect(), + data_factories: None, + data_factories_fut: factory_futs.boxed_local(), extensions: Some( self.extensions .borrow_mut() @@ -133,15 +136,21 @@ pub struct AppInitResult where T: ServiceFactory, { - endpoint: Option, #[pin] endpoint_fut: T::Future, + // a Some signals completion of endpoint creation + endpoint: Option, + + #[pin] + data_factories_fut: LocalBoxFuture<'static, Vec, ()>>>, + // a Some signals completion of factory futures + data_factories: Option>>, + rmap: Rc, config: AppConfig, data: Rc>>, - data_factories: Vec>, - data_factories_fut: Vec, ()>>>, extensions: Option, + _t: PhantomData, } @@ -161,44 +170,46 @@ where let this = self.project(); // async data factories - let mut idx = 0; - while idx < this.data_factories_fut.len() { - match Pin::new(&mut this.data_factories_fut[idx]).poll(cx)? { - Poll::Ready(f) => { - this.data_factories.push(f); - let _ = this.data_factories_fut.remove(idx); - } - Poll::Pending => idx += 1, + if let Poll::Ready(factories) = this.data_factories_fut.poll(cx) { + let factories: Result, ()> = factories.into_iter().collect(); + + if let Ok(factories) = factories { + this.data_factories.replace(factories); + } else { + return Poll::Ready(Err(())); } } + // app service and middleware if this.endpoint.is_none() { if let Poll::Ready(srv) = this.endpoint_fut.poll(cx)? { *this.endpoint = Some(srv); } } - if this.endpoint.is_some() && this.data_factories_fut.is_empty() { + // not using if let so condition only needs shared ref + if this.endpoint.is_some() && this.data_factories.is_some() { // create app data container let mut data = this.extensions.take().unwrap(); + for f in this.data.iter() { f.create(&mut data); } - for f in this.data_factories.iter() { + for f in this.data_factories.take().unwrap().iter() { f.create(&mut data); } - Poll::Ready(Ok(AppInitService { + return Poll::Ready(Ok(AppInitService { service: this.endpoint.take().unwrap(), rmap: this.rmap.clone(), config: this.config.clone(), data: Rc::new(data), pool: HttpRequestPool::create(), - })) - } else { - Poll::Pending + })); } + + Poll::Pending } } diff --git a/src/test.rs b/src/test.rs index 19ea8bbef..c8a738d83 100644 --- a/src/test.rs +++ b/src/test.rs @@ -78,6 +78,26 @@ pub fn default_service( pub async fn init_service( app: R, ) -> impl Service, Error = E> +where + R: IntoServiceFactory, + S: ServiceFactory< + Config = AppConfig, + Request = Request, + Response = ServiceResponse, + Error = E, + >, + S::InitError: std::fmt::Debug, +{ + try_init_service(app).await.expect("service initilization failed") +} + +/// Fallible version of init_service that allows testing data factory errors. +pub(crate) async fn try_init_service( + app: R, +) -> Result< + impl Service, Error = E>, + S::InitError, +> where R: IntoServiceFactory, S: ServiceFactory< @@ -89,7 +109,7 @@ where S::InitError: std::fmt::Debug, { let srv = app.into_factory(); - srv.new_service(AppConfig::default()).await.unwrap() + srv.new_service(AppConfig::default()).await } /// Calls service and waits for response future completion. @@ -580,7 +600,7 @@ impl TestRequest { pub async fn send_request(self, app: &mut S) -> S::Response where S: Service, Error = E>, - E: std::fmt::Debug + E: std::fmt::Debug, { let req = self.to_request(); call_service(app, req).await @@ -1125,8 +1145,8 @@ mod tests { #[actix_rt::test] async fn test_response_json() { let mut app = init_service(App::new().service(web::resource("/people").route( - web::post().to(|person: web::Json| { - async { HttpResponse::Ok().json(person.into_inner()) } + web::post().to(|person: web::Json| async { + HttpResponse::Ok().json(person.into_inner()) }), ))) .await; @@ -1146,8 +1166,8 @@ mod tests { #[actix_rt::test] async fn test_body_json() { let mut app = init_service(App::new().service(web::resource("/people").route( - web::post().to(|person: web::Json| { - async { HttpResponse::Ok().json(person.into_inner()) } + web::post().to(|person: web::Json| async { + HttpResponse::Ok().json(person.into_inner()) }), ))) .await; @@ -1168,8 +1188,8 @@ mod tests { #[actix_rt::test] async fn test_request_response_form() { let mut app = init_service(App::new().service(web::resource("/people").route( - web::post().to(|person: web::Form| { - async { HttpResponse::Ok().json(person.into_inner()) } + web::post().to(|person: web::Form| async { + HttpResponse::Ok().json(person.into_inner()) }), ))) .await; @@ -1194,8 +1214,8 @@ mod tests { #[actix_rt::test] async fn test_request_response_json() { let mut app = init_service(App::new().service(web::resource("/people").route( - web::post().to(|person: web::Json| { - async { HttpResponse::Ok().json(person.into_inner()) } + web::post().to(|person: web::Json| async { + HttpResponse::Ok().json(person.into_inner()) }), ))) .await; @@ -1259,53 +1279,53 @@ mod tests { assert!(res.status().is_success()); } -/* + /* - Comment out until actix decoupled of actix-http: - https://github.com/actix/actix/issues/321 + Comment out until actix decoupled of actix-http: + https://github.com/actix/actix/issues/321 - use futures::FutureExt; + use futures::FutureExt; - #[actix_rt::test] - async fn test_actor() { - use actix::Actor; + #[actix_rt::test] + async fn test_actor() { + use actix::Actor; - struct MyActor; + struct MyActor; - struct Num(usize); - impl actix::Message for Num { - type Result = usize; - } - impl actix::Actor for MyActor { - type Context = actix::Context; - } - impl actix::Handler for MyActor { - type Result = usize; - fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result { - msg.0 + struct Num(usize); + impl actix::Message for Num { + type Result = usize; + } + impl actix::Actor for MyActor { + type Context = actix::Context; + } + impl actix::Handler for MyActor { + type Result = usize; + fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result { + msg.0 + } } - } - let mut app = init_service(App::new().service(web::resource("/index.html").to( - move || { - addr.send(Num(1)).map(|res| match res { - Ok(res) => { - if res == 1 { - Ok(HttpResponse::Ok()) - } else { - Ok(HttpResponse::BadRequest()) + let mut app = init_service(App::new().service(web::resource("/index.html").to( + move || { + addr.send(Num(1)).map(|res| match res { + Ok(res) => { + if res == 1 { + Ok(HttpResponse::Ok()) + } else { + Ok(HttpResponse::BadRequest()) + } } - } - Err(err) => Err(err), - }) - }, - ))) - .await; + Err(err) => Err(err), + }) + }, + ))) + .await; - let req = TestRequest::post().uri("/index.html").to_request(); - let res = app.call(req).await.unwrap(); - assert!(res.status().is_success()); - } -*/ + let req = TestRequest::post().uri("/index.html").to_request(); + let res = app.call(req).await.unwrap(); + assert!(res.status().is_success()); + } + */ }