From 6822bf2f580a504afcd36bcaa3a95d12e628bc3b Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 26 Mar 2021 09:15:04 -0700 Subject: [PATCH] Refactor actix_http::h1::service (#2117) --- actix-http/src/h1/service.rs | 205 ++++++++++++----------------------- 1 file changed, 70 insertions(+), 135 deletions(-) diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 51303886b..4fe79736b 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,6 +1,4 @@ -use std::future::Future; use std::marker::PhantomData; -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, net}; @@ -8,7 +6,7 @@ use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; -use futures_core::ready; +use futures_core::{future::LocalBoxFuture, ready}; use futures_util::future::ready; use crate::body::MessageBody; @@ -60,14 +58,17 @@ where impl H1Service where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -94,17 +95,21 @@ mod openssl { use super::*; use actix_service::ServiceFactoryExt; - use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, TlsStream}; - use actix_tls::accept::TlsError; + use actix_tls::accept::{ + openssl::{Acceptor, SslAcceptor, SslError, TlsStream}, + TlsError, + }; impl H1Service, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< @@ -112,6 +117,7 @@ mod openssl { Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -143,19 +149,25 @@ mod openssl { #[cfg(feature = "rustls")] mod rustls { use super::*; + + use std::io; + use actix_service::ServiceFactoryExt; - use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream}; - use actix_tls::accept::TlsError; - use std::{fmt, io}; + use actix_tls::accept::{ + rustls::{Acceptor, ServerConfig, TlsStream}, + TlsError, + }; impl H1Service, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< @@ -163,6 +175,7 @@ mod rustls { Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -241,16 +254,19 @@ where impl ServiceFactory<(T, Option)> for H1Service where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -259,103 +275,42 @@ where type Config = (); type Service = H1ServiceHandler; type InitError = (); - type Future = H1ServiceResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - H1ServiceResponse { - fut: self.srv.new_service(()), - fut_ex: Some(self.expect.new_service(())), - fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), - expect: None, - upgrade: None, - on_connect_ext: self.on_connect_ext.clone(), - cfg: Some(self.cfg.clone()), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let expect = self.expect.new_service(()); + let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); + let on_connect_ext = self.on_connect_ext.clone(); + let cfg = self.cfg.clone(); -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H1ServiceResponse -where - S: ServiceFactory, - S::Error: Into, - S::InitError: fmt::Debug, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option, - #[pin] - fut_upg: Option, - expect: Option, - upgrade: Option, - on_connect_ext: Option>>, - cfg: Option, - _phantom: PhantomData, -} + Box::pin(async move { + let expect = expect + .await + .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; -impl Future for H1ServiceResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory, - S::Error: Into, - S::Response: Into>, - S::InitError: fmt::Debug, - B: MessageBody, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, -{ - type Output = Result, ()>; + let upgrade = match upgrade { + Some(upgrade) => { + let upgrade = upgrade.await.map_err(|e| { + log::error!("Init http upgrade service error: {:?}", e) + })?; + Some(upgrade) + } + None => None, + }; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); + let service = service + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; - if let Some(fut) = this.fut_ex.as_pin_mut() { - let expect = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.expect = Some(expect); - this.fut_ex.set(None); - } - - if let Some(fut) = this.fut_upg.as_pin_mut() { - let upgrade = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.upgrade = Some(upgrade); - this.fut_upg.set(None); - } - - let result = ready!(this - .fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e))); - - Poll::Ready(result.map(|service| { - let this = self.as_mut().project(); - - H1ServiceHandler::new( - this.cfg.take().unwrap(), + Ok(H1ServiceHandler::new( + cfg, service, - this.expect.take().unwrap(), - this.upgrade.take(), - this.on_connect_ext.clone(), - ) - })) + expect, + upgrade, + on_connect_ext, + )) + }) } } @@ -417,47 +372,27 @@ where type Future = Dispatcher; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready = self - .flow - .expect - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready(); + ready!(self.flow.expect.poll_ready(cx)).map_err(|e| { + let e = e.into(); + log::error!("Http expect service readiness error: {:?}", e); + DispatchError::Service(e) + })?; - let ready = self - .flow - .service - .poll_ready(cx) - .map_err(|e| { + if let Some(ref upg) = self.flow.upgrade { + ready!(upg.poll_ready(cx)).map_err(|e| { let e = e.into(); - log::error!("Http service readiness error: {:?}", e); + log::error!("Http upgrade service readiness error: {:?}", e); DispatchError::Service(e) - })? - .is_ready() - && ready; - - let ready = if let Some(ref upg) = self.flow.upgrade { - upg.poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready - } else { - ready + })?; }; - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + ready!(self.flow.service.poll_ready(cx)).map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })?; + + Poll::Ready(Ok(())) } fn call(&self, (io, addr): (T, Option)) -> Self::Future {