diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 3ab902422..707d5551b 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -1,4 +1,3 @@ -use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -8,7 +7,6 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_rt::task::JoinHandle; use bytes::Bytes; use futures_core::future::LocalBoxFuture; -use futures_util::future::{err, Either, FutureExt, Ready}; use h2::client::SendRequest; use pin_project::pin_project; @@ -75,7 +73,6 @@ impl DerefMut for H2Connection { pub trait Connection { type Io: AsyncRead + AsyncWrite + Unpin; - type Future: Future>; fn protocol(&self) -> Protocol; @@ -84,14 +81,16 @@ pub trait Connection { self, head: H, body: B, - ) -> Self::Future; - - type TunnelFuture: Future< - Output = Result<(ResponseHead, Framed), SendRequestError>, - >; + ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; /// Send request, returns Response and Framed - fn open_tunnel>(self, head: H) -> Self::TunnelFuture; + fn open_tunnel + 'static>( + self, + head: H, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, + >; } pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { @@ -154,8 +153,6 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, { type Io = T; - type Future = - LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; fn protocol(&self) -> Protocol { match self.io { @@ -169,33 +166,35 @@ where mut self, head: H, body: B, - ) -> Self::Future { + ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { match self.io.take().unwrap() { - ConnectionType::H1(io) => { - h1proto::send_request(io, head.into(), body, self.created, self.pool) - .boxed_local() - } - ConnectionType::H2(io) => { - h2proto::send_request(io, head.into(), body, self.created, self.pool) - .boxed_local() - } + ConnectionType::H1(io) => Box::pin(h1proto::send_request( + io, + head.into(), + body, + self.created, + self.pool, + )), + ConnectionType::H2(io) => Box::pin(h2proto::send_request( + io, + head.into(), + body, + self.created, + self.pool, + )), } } - type TunnelFuture = Either< - LocalBoxFuture< - 'static, - Result<(ResponseHead, Framed), SendRequestError>, - >, - Ready), SendRequestError>>, - >; - /// Send request, returns Response and Framed - fn open_tunnel>(mut self, head: H) -> Self::TunnelFuture { + fn open_tunnel>( + mut self, + head: H, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, + > { match self.io.take().unwrap() { - ConnectionType::H1(io) => { - Either::Left(h1proto::open_tunnel(io, head.into()).boxed_local()) - } + ConnectionType::H1(io) => Box::pin(h1proto::open_tunnel(io, head.into())), ConnectionType::H2(io) => { if let Some(mut pool) = self.pool.take() { pool.release(IoConnection::new( @@ -204,7 +203,7 @@ where None, )); } - Either::Right(err(SendRequestError::TunnelNotSupported)) + Box::pin(async { Err(SendRequestError::TunnelNotSupported) }) } } } @@ -226,8 +225,6 @@ where B: AsyncRead + AsyncWrite + Unpin + 'static, { type Io = EitherIo; - type Future = - LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; fn protocol(&self) -> Protocol { match self { @@ -240,33 +237,30 @@ where self, head: H, body: RB, - ) -> Self::Future { + ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { match self { EitherConnection::A(con) => con.send_request(head, body), EitherConnection::B(con) => con.send_request(head, body), } } - type TunnelFuture = LocalBoxFuture< + /// Send request, returns Response and Framed + fn open_tunnel + 'static>( + self, + head: H, + ) -> LocalBoxFuture< 'static, Result<(ResponseHead, Framed), SendRequestError>, - >; - - /// Send request, returns Response and Framed - fn open_tunnel>(self, head: H) -> Self::TunnelFuture { + > { match self { - EitherConnection::A(con) => con - .open_tunnel(head) - .map(|res| { - res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A))) - }) - .boxed_local(), - EitherConnection::B(con) => con - .open_tunnel(head) - .map(|res| { - res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B))) - }) - .boxed_local(), + EitherConnection::A(con) => Box::pin(async { + let (head, framed) = con.open_tunnel(head).await?; + Ok((head, framed.into_map_io(EitherIo::A))) + }), + EitherConnection::B(con) => Box::pin(async { + let (head, framed) = con.open_tunnel(head).await?; + Ok((head, framed.into_map_io(EitherIo::B))) + }), } } } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 1ff5c7017..082c4b8e2 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -8,7 +8,7 @@ use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::future::poll_fn; -use futures_util::{pin_mut, SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt}; use crate::error::PayloadError; use crate::h1; @@ -127,7 +127,7 @@ where T: ConnectionLifetime + Unpin, B: MessageBody, { - pin_mut!(body); + actix_rt::pin!(body); let mut eof = false; while !eof { diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 105e04c73..0deb5c014 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -5,7 +5,6 @@ use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures_util::future::poll_fn; -use futures_util::pin_mut; use h2::{ client::{Builder, Connection, SendRequest}, SendStream, @@ -131,7 +130,7 @@ async fn send_body( mut send: SendStream, ) -> Result<(), SendRequestError> { let mut buf = None; - pin_mut!(body); + actix_rt::pin!(body); loop { if buf.is_none() { match poll_fn(|cx| body.as_mut().poll_next(cx)).await { diff --git a/awc/src/builder.rs b/awc/src/builder.rs index 94ffb8a71..3d1613c66 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -52,7 +52,6 @@ impl ClientBuilder { where T: Service + 'static, T::Response: Connection, - ::Future: 'static, T::Future: 'static, { self.connector = Some(Box::new(ConnectorWrapper(connector))); diff --git a/awc/src/connect.rs b/awc/src/connect.rs index a9b8f9f83..9a2ded195 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -41,8 +41,6 @@ where T: Service, T::Response: Connection, ::Io: 'static, - ::Future: 'static, - ::TunnelFuture: 'static, T::Future: 'static, { fn send_request(