1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-29 21:11:17 +00:00

remove awc::connect::connect trait. (#2004)

This commit is contained in:
fakeshadow 2021-02-17 09:10:46 -08:00 committed by GitHub
parent 5efea652e3
commit dfd9dc40ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 84 deletions

View file

@ -7,7 +7,7 @@ use actix_http::client::{Connect as HttpConnect, ConnectError, Connection, Conne
use actix_http::http::{self, header, Error as HttpError, HeaderMap, HeaderName}; use actix_http::http::{self, header, Error as HttpError, HeaderMap, HeaderName};
use actix_service::Service; use actix_service::Service;
use crate::connect::{Connect, ConnectorWrapper}; use crate::connect::{ConnectService, ConnectorWrapper};
use crate::{Client, ClientConfig}; use crate::{Client, ClientConfig};
/// An HTTP Client builder /// An HTTP Client builder
@ -23,7 +23,7 @@ pub struct ClientBuilder {
conn_window_size: Option<u32>, conn_window_size: Option<u32>,
headers: HeaderMap, headers: HeaderMap,
timeout: Option<Duration>, timeout: Option<Duration>,
connector: Option<Box<dyn Connect>>, connector: Option<ConnectService>,
} }
impl Default for ClientBuilder { impl Default for ClientBuilder {
@ -54,7 +54,7 @@ impl ClientBuilder {
T::Response: Connection, T::Response: Connection,
T::Future: 'static, T::Future: 'static,
{ {
self.connector = Some(Box::new(ConnectorWrapper(connector))); self.connector = Some(Box::new(ConnectorWrapper::new(connector)));
self self
} }
@ -181,7 +181,7 @@ impl ClientBuilder {
if let Some(val) = self.stream_window_size { if let Some(val) = self.stream_window_size {
connector = connector.initial_window_size(val) connector = connector.initial_window_size(val)
}; };
Box::new(ConnectorWrapper(connector.finish())) as _ Box::new(ConnectorWrapper::new(connector.finish())) as _
}; };
let config = ClientConfig { let config = ClientConfig {
headers: self.headers, headers: self.headers,

View file

@ -16,74 +16,100 @@ use futures_core::future::LocalBoxFuture;
use crate::response::ClientResponse; use crate::response::ClientResponse;
pub(crate) struct ConnectorWrapper<T>(pub T); pub(crate) struct ConnectorWrapper<T> {
connector: T,
type TunnelResponse = (ResponseHead, Framed<BoxedSocket, ClientCodec>);
pub(crate) trait Connect {
fn send_request(
&self,
head: RequestHeadType,
body: Body,
addr: Option<net::SocketAddr>,
) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>;
/// Send request, returns Response and Framed
fn open_tunnel(
&self,
head: RequestHead,
addr: Option<net::SocketAddr>,
) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>>;
} }
impl<T> Connect for ConnectorWrapper<T> impl<T> ConnectorWrapper<T> {
pub(crate) fn new(connector: T) -> Self {
Self { connector }
}
}
pub type ConnectService = Box<
dyn Service<
ConnectRequest,
Response = ConnectResponse,
Error = SendRequestError,
Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
>,
>;
pub enum ConnectRequest {
Client(RequestHeadType, Body, Option<net::SocketAddr>),
Tunnel(RequestHead, Option<net::SocketAddr>),
}
pub enum ConnectResponse {
Client(ClientResponse),
Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
}
impl ConnectResponse {
pub fn into_client_response(self) -> ClientResponse {
match self {
ConnectResponse::Client(res) => res,
_ => panic!(
"ClientResponse only reachable with ConnectResponse::ClientResponse variant"
),
}
}
pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
match self {
ConnectResponse::Tunnel(head, framed) => (head, framed),
_ => panic!(
"TunnelResponse only reachable with ConnectResponse::TunnelResponse variant"
),
}
}
}
impl<T> Service<ConnectRequest> for ConnectorWrapper<T>
where where
T: Service<ClientConnect, Error = ConnectError>, T: Service<ClientConnect, Error = ConnectError>,
T::Response: Connection, T::Response: Connection,
<T::Response as Connection>::Io: 'static, <T::Response as Connection>::Io: 'static,
T::Future: 'static, T::Future: 'static,
{ {
fn send_request( type Response = ConnectResponse;
&self, type Error = SendRequestError;
head: RequestHeadType, type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
body: Body,
addr: Option<net::SocketAddr>, actix_service::forward_ready!(connector);
) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>> {
fn call(&self, req: ConnectRequest) -> Self::Future {
// connect to the host // connect to the host
let fut = self.0.call(ClientConnect { let fut = match req {
uri: head.as_ref().uri.clone(), ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
addr, uri: head.as_ref().uri.clone(),
}); addr,
}),
ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
uri: head.uri.clone(),
addr,
}),
};
Box::pin(async move { Box::pin(async move {
let connection = fut.await?; let connection = fut.await?;
// send request match req {
let (head, payload) = connection.send_request(head, body).await?; ConnectRequest::Client(head, body, ..) => {
// send request
let (head, payload) = connection.send_request(head, body).await?;
Ok(ClientResponse::new(head, payload)) Ok(ConnectResponse::Client(ClientResponse::new(head, payload)))
}) }
} ConnectRequest::Tunnel(head, ..) => {
// send request
let (head, framed) =
connection.open_tunnel(RequestHeadType::from(head)).await?;
fn open_tunnel( let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
&self, Ok(ConnectResponse::Tunnel(head, framed))
head: RequestHead, }
addr: Option<net::SocketAddr>, }
) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>> {
// connect to the host
let fut = self.0.call(ClientConnect {
uri: head.uri.clone(),
addr,
});
Box::pin(async move {
let connection = fut.await?;
// send request
let (head, framed) = connection.open_tunnel(RequestHeadType::from(head)).await?;
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
Ok((head, framed))
}) })
} }
} }

View file

@ -115,13 +115,13 @@ pub mod test;
pub mod ws; pub mod ws;
pub use self::builder::ClientBuilder; pub use self::builder::ClientBuilder;
pub use self::connect::BoxedSocket; pub use self::connect::{BoxedSocket, ConnectRequest, ConnectResponse, ConnectService};
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
pub use self::request::ClientRequest; pub use self::request::ClientRequest;
pub use self::response::{ClientResponse, JsonBody, MessageBody}; pub use self::response::{ClientResponse, JsonBody, MessageBody};
pub use self::sender::SendClientRequest; pub use self::sender::SendClientRequest;
use self::connect::{Connect, ConnectorWrapper}; use self::connect::ConnectorWrapper;
/// An asynchronous HTTP and WebSocket client. /// An asynchronous HTTP and WebSocket client.
/// ///
@ -146,7 +146,7 @@ use self::connect::{Connect, ConnectorWrapper};
pub struct Client(Rc<ClientConfig>); pub struct Client(Rc<ClientConfig>);
pub(crate) struct ClientConfig { pub(crate) struct ClientConfig {
pub(crate) connector: Box<dyn Connect>, pub(crate) connector: ConnectService,
pub(crate) headers: HeaderMap, pub(crate) headers: HeaderMap,
pub(crate) timeout: Option<Duration>, pub(crate) timeout: Option<Duration>,
} }
@ -154,7 +154,7 @@ pub(crate) struct ClientConfig {
impl Default for Client { impl Default for Client {
fn default() -> Self { fn default() -> Self {
Client(Rc::new(ClientConfig { Client(Rc::new(ClientConfig {
connector: Box::new(ConnectorWrapper(Connector::new().finish())), connector: Box::new(ConnectorWrapper::new(Connector::new().finish())),
headers: HeaderMap::new(), headers: HeaderMap::new(),
timeout: Some(Duration::from_secs(5)), timeout: Some(Duration::from_secs(5)),
})) }))

View file

@ -18,12 +18,13 @@ use actix_http::{
use actix_rt::time::{sleep, Sleep}; use actix_rt::time::{sleep, Sleep};
use bytes::Bytes; use bytes::Bytes;
use derive_more::From; use derive_more::From;
use futures_core::{ready, Stream}; use futures_core::Stream;
use serde::Serialize; use serde::Serialize;
#[cfg(feature = "compress")] #[cfg(feature = "compress")]
use actix_http::{encoding::Decoder, http::header::ContentEncoding, Payload, PayloadStream}; use actix_http::{encoding::Decoder, http::header::ContentEncoding, Payload, PayloadStream};
use crate::connect::{ConnectRequest, ConnectResponse};
use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError}; use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError};
use crate::response::ClientResponse; use crate::response::ClientResponse;
use crate::ClientConfig; use crate::ClientConfig;
@ -56,7 +57,8 @@ impl From<PrepForSendingError> for SendRequestError {
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub enum SendClientRequest { pub enum SendClientRequest {
Fut( Fut(
Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>, Pin<Box<dyn Future<Output = Result<ConnectResponse, SendRequestError>>>>,
// FIXME: use a pinned Sleep instead of box.
Option<Pin<Box<Sleep>>>, Option<Pin<Box<Sleep>>>,
bool, bool,
), ),
@ -65,7 +67,7 @@ pub enum SendClientRequest {
impl SendClientRequest { impl SendClientRequest {
pub(crate) fn new( pub(crate) fn new(
send: Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>, send: Pin<Box<dyn Future<Output = Result<ConnectResponse, SendRequestError>>>>,
response_decompress: bool, response_decompress: bool,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> SendClientRequest { ) -> SendClientRequest {
@ -89,14 +91,19 @@ impl Future for SendClientRequest {
} }
} }
let res = ready!(send.as_mut().poll(cx)).map(|res| { let res = futures_core::ready!(send.as_mut().poll(cx)).map(|res| {
res._timeout(delay.take()).map_body(|head, payload| { res.into_client_response()._timeout(delay.take()).map_body(
if *response_decompress { |head, payload| {
Payload::Stream(Decoder::from_headers(payload, &head.headers)) if *response_decompress {
} else { Payload::Stream(Decoder::from_headers(payload, &head.headers))
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) } else {
} Payload::Stream(Decoder::new(
}) payload,
ContentEncoding::Identity,
))
}
},
)
}); });
Poll::Ready(res) Poll::Ready(res)
@ -122,10 +129,9 @@ impl Future for SendClientRequest {
return Poll::Ready(Err(SendRequestError::Timeout)); return Poll::Ready(Err(SendRequestError::Timeout));
} }
} }
send.as_mut() send.as_mut()
.poll(cx) .poll(cx)
.map_ok(|res| res._timeout(delay.take())) .map_ok(|res| res.into_client_response()._timeout(delay.take()))
} }
SendClientRequest::Err(ref mut e) => match e.take() { SendClientRequest::Err(ref mut e) => match e.take() {
Some(e) => Poll::Ready(Err(e)), Some(e) => Poll::Ready(Err(e)),
@ -177,19 +183,19 @@ impl RequestSender {
where where
B: Into<Body>, B: Into<Body>,
{ {
let fut = match self { let req = match self {
RequestSender::Owned(head) => { RequestSender::Owned(head) => {
config ConnectRequest::Client(RequestHeadType::Owned(head), body.into(), addr)
.connector
.send_request(RequestHeadType::Owned(head), body.into(), addr)
} }
RequestSender::Rc(head, extra_headers) => config.connector.send_request( RequestSender::Rc(head, extra_headers) => ConnectRequest::Client(
RequestHeadType::Rc(head, extra_headers), RequestHeadType::Rc(head, extra_headers),
body.into(), body.into(),
addr, addr,
), ),
}; };
let fut = config.connector.call(req);
SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout)) SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout))
} }

View file

@ -36,10 +36,11 @@ use actix_codec::Framed;
use actix_http::cookie::{Cookie, CookieJar}; use actix_http::cookie::{Cookie, CookieJar};
use actix_http::{ws, Payload, RequestHead}; use actix_http::{ws, Payload, RequestHead};
use actix_rt::time::timeout; use actix_rt::time::timeout;
use actix_service::Service;
pub use actix_http::ws::{CloseCode, CloseReason, Codec, Frame, Message}; pub use actix_http::ws::{CloseCode, CloseReason, Codec, Frame, Message};
use crate::connect::BoxedSocket; use crate::connect::{BoxedSocket, ConnectRequest};
use crate::error::{InvalidUrl, SendRequestError, WsClientError}; use crate::error::{InvalidUrl, SendRequestError, WsClientError};
use crate::http::header::{self, HeaderName, HeaderValue, IntoHeaderValue, AUTHORIZATION}; use crate::http::header::{self, HeaderName, HeaderValue, IntoHeaderValue, AUTHORIZATION};
use crate::http::{ConnectionType, Error as HttpError, Method, StatusCode, Uri, Version}; use crate::http::{ConnectionType, Error as HttpError, Method, StatusCode, Uri, Version};
@ -327,18 +328,21 @@ impl WebsocketsRequest {
let max_size = self.max_size; let max_size = self.max_size;
let server_mode = self.server_mode; let server_mode = self.server_mode;
let fut = self.config.connector.open_tunnel(head, self.addr); let req = ConnectRequest::Tunnel(head, self.addr);
let fut = self.config.connector.call(req);
// set request timeout // set request timeout
let (head, framed) = if let Some(to) = self.config.timeout { let res = if let Some(to) = self.config.timeout {
timeout(to, fut) timeout(to, fut)
.await .await
.map_err(|_| SendRequestError::Timeout) .map_err(|_| SendRequestError::Timeout)??
.and_then(|res| res)?
} else { } else {
fut.await? fut.await?
}; };
let (head, framed) = res.into_tunnel_response();
// verify response // verify response
if head.status != StatusCode::SWITCHING_PROTOCOLS { if head.status != StatusCode::SWITCHING_PROTOCOLS {
return Err(WsClientError::InvalidResponseStatus(head.status)); return Err(WsClientError::InvalidResponseStatus(head.status));