From 60f9cfbb2a0fc77ba6dc918f199ee3a602c64549 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 26 Mar 2021 11:24:51 -0700 Subject: [PATCH] Refactor actix_http::h2::service module. Reduce loc. (#2118) --- actix-http/src/h2/service.rs | 88 ++++++++++++------------------------ 1 file changed, 30 insertions(+), 58 deletions(-) diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index c64139564..db0b580b3 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -11,8 +11,8 @@ use actix_service::{ ServiceFactory, }; use bytes::Bytes; -use futures_core::ready; -use futures_util::future::ok; +use futures_core::{future::LocalBoxFuture, ready}; +use futures_util::future::ready; use h2::server::{handshake, Handshake}; use log::error; @@ -65,6 +65,7 @@ where impl H2Service where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -80,11 +81,11 @@ where Error = DispatchError, InitError = S::InitError, > { - pipeline_factory(fn_factory(|| async { - Ok::<_, S::InitError>(fn_service(|io: TcpStream| { + pipeline_factory(fn_factory(|| { + ready(Ok::<_, S::InitError>(fn_service(|io: TcpStream| { let peer_addr = io.peer_addr().ok(); - ok::<_, DispatchError>((io, peer_addr)) - })) + ready(Ok::<_, DispatchError>((io, peer_addr))) + }))) })) .and_then(self) } @@ -101,6 +102,7 @@ mod openssl { impl H2Service, S, B> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -123,10 +125,12 @@ mod openssl { .map_init_err(|_| panic!()), ) .and_then(fn_factory(|| { - ok::<_, S::InitError>(fn_service(|io: TlsStream| { - let peer_addr = io.get_ref().peer_addr().ok(); - ok((io, peer_addr)) - })) + ready(Ok::<_, S::InitError>(fn_service( + |io: TlsStream| { + let peer_addr = io.get_ref().peer_addr().ok(); + ready(Ok((io, peer_addr))) + }, + ))) })) .and_then(self.map_err(TlsError::Service)) } @@ -144,6 +148,7 @@ mod rustls { impl H2Service, S, B> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -169,10 +174,12 @@ mod rustls { .map_init_err(|_| panic!()), ) .and_then(fn_factory(|| { - ok::<_, S::InitError>(fn_service(|io: TlsStream| { - let peer_addr = io.get_ref().0.peer_addr().ok(); - ok((io, peer_addr)) - })) + ready(Ok::<_, S::InitError>(fn_service( + |io: TlsStream| { + let peer_addr = io.get_ref().0.peer_addr().ok(); + ready(Ok((io, peer_addr))) + }, + ))) })) .and_then(self.map_err(TlsError::Service)) } @@ -181,8 +188,9 @@ mod rustls { impl ServiceFactory<(T, Option)> for H2Service where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -193,52 +201,16 @@ where type Config = (); type Service = H2ServiceHandler; type InitError = S::InitError; - type Future = H2ServiceResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - H2ServiceResponse { - fut: self.srv.new_service(()), - cfg: Some(self.cfg.clone()), - on_connect_ext: self.on_connect_ext.clone(), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let cfg = self.cfg.clone(); + let on_connect_ext = self.on_connect_ext.clone(); -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H2ServiceResponse -where - S: ServiceFactory, -{ - #[pin] - fut: S::Future, - cfg: Option, - on_connect_ext: Option>>, - _phantom: PhantomData, -} - -impl Future for H2ServiceResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory, - S::Error: Into + 'static, - S::Response: Into> + 'static, - >::Future: 'static, - B: MessageBody + 'static, -{ - type Output = Result, S::InitError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - this.fut.poll(cx).map_ok(|service| { - let this = self.as_mut().project(); - H2ServiceHandler::new( - this.cfg.take().unwrap(), - this.on_connect_ext.clone(), - service, - ) + Box::pin(async move { + let service = service.await?; + Ok(H2ServiceHandler::new(cfg, on_connect_ext, service)) }) } }