1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-11 09:49:29 +00:00

refactor actix_http connection types and connector services (#2081)

This commit is contained in:
fakeshadow 2021-03-18 10:53:22 -07:00 committed by GitHub
parent abcb444dd9
commit b75b5114c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 867 additions and 611 deletions

View file

@ -1,10 +1,16 @@
# Changes
## Unreleased - 2021-xx-xx
### Added
* `client::Connector::handshake_timeout` method for customize tls connection handshake timeout. [#2081]
* `client::ConnectorService` as `client::Connector::finish` method's return type [#2081]
* `client::ConnectionIo` trait alias [#2081]
### Chaged
* `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063]
[#2063]: https://github.com/actix/actix-web/pull/2063
[#2081]: https://github.com/actix/actix-web/pull/2081
## 3.0.0-beta.4 - 2021-03-08

View file

@ -8,6 +8,7 @@ const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB
#[derive(Clone)]
pub(crate) struct ConnectorConfig {
pub(crate) timeout: Duration,
pub(crate) handshake_timeout: Duration,
pub(crate) conn_lifetime: Duration,
pub(crate) conn_keep_alive: Duration,
pub(crate) disconnect_timeout: Option<Duration>,
@ -21,6 +22,7 @@ impl Default for ConnectorConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(5),
handshake_timeout: Duration::from_secs(5),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Some(Duration::from_millis(3000)),

View file

@ -1,14 +1,16 @@
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io, time};
use std::{
io,
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
time,
};
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use actix_rt::task::JoinHandle;
use bytes::Bytes;
use futures_core::future::LocalBoxFuture;
use h2::client::SendRequest;
use pin_project::pin_project;
use crate::body::MessageBody;
use crate::h1::ClientCodec;
@ -19,32 +21,148 @@ use super::error::SendRequestError;
use super::pool::Acquired;
use super::{h1proto, h2proto};
/// Trait alias for types impl [tokio::io::AsyncRead] and [tokio::io::AsyncWrite].
pub trait ConnectionIo: AsyncRead + AsyncWrite + Unpin + 'static {}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ConnectionIo for T {}
pub(crate) enum ConnectionType<Io> {
H1(Io),
H2(H2Connection),
/// HTTP client connection
pub struct H1Connection<Io: ConnectionIo> {
io: Option<Io>,
created: time::Instant,
acquired: Acquired<Io>,
}
/// `H2Connection` has two parts: `SendRequest` and `Connection`.
impl<Io: ConnectionIo> H1Connection<Io> {
/// close or release the connection to pool based on flag input
pub(super) fn on_release(&mut self, keep_alive: bool) {
if keep_alive {
self.release();
} else {
self.close();
}
}
/// Close connection
fn close(&mut self) {
let io = self.io.take().unwrap();
self.acquired.close(ConnectionInnerType::H1(io));
}
/// Release this connection to the connection pool
fn release(&mut self) {
let io = self.io.take().unwrap();
self.acquired
.release(ConnectionInnerType::H1(io), self.created);
}
fn io_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Io> {
Pin::new(self.get_mut().io.as_mut().unwrap())
}
}
impl<Io: ConnectionIo> AsyncRead for H1Connection<Io> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.io_pin_mut().poll_read(cx, buf)
}
}
impl<Io: ConnectionIo> AsyncWrite for H1Connection<Io> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.io_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io_pin_mut().poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
self.io_pin_mut().poll_shutdown(cx)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.io_pin_mut().poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
self.io.as_ref().unwrap().is_write_vectored()
}
}
/// HTTP2 client connection
pub struct H2Connection<Io: ConnectionIo> {
io: Option<H2ConnectionInner>,
created: time::Instant,
acquired: Acquired<Io>,
}
impl<Io: ConnectionIo> Deref for H2Connection<Io> {
type Target = SendRequest<Bytes>;
fn deref(&self) -> &Self::Target {
&self.io.as_ref().unwrap().sender
}
}
impl<Io: ConnectionIo> DerefMut for H2Connection<Io> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.io.as_mut().unwrap().sender
}
}
impl<Io: ConnectionIo> H2Connection<Io> {
/// close or release the connection to pool based on flag input
pub(super) fn on_release(&mut self, close: bool) {
if close {
self.close();
} else {
self.release();
}
}
/// Close connection
fn close(&mut self) {
let io = self.io.take().unwrap();
self.acquired.close(ConnectionInnerType::H2(io));
}
/// Release this connection to the connection pool
fn release(&mut self) {
let io = self.io.take().unwrap();
self.acquired
.release(ConnectionInnerType::H2(io), self.created);
}
}
/// `H2ConnectionInner` has two parts: `SendRequest` and `Connection`.
///
/// `Connection` is spawned as an async task on runtime and `H2Connection` holds a handle for
/// this task. Therefore, it can wake up and quit the task when SendRequest is dropped.
pub(crate) struct H2Connection {
/// `Connection` is spawned as an async task on runtime and `H2ConnectionInner` holds a handle
/// for this task. Therefore, it can wake up and quit the task when SendRequest is dropped.
pub(super) struct H2ConnectionInner {
handle: JoinHandle<()>,
sender: SendRequest<Bytes>,
}
impl H2Connection {
pub(crate) fn new<Io>(
impl H2ConnectionInner {
pub(super) fn new<Io: ConnectionIo>(
sender: SendRequest<Bytes>,
connection: h2::client::Connection<Io>,
) -> Self
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
) -> Self {
let handle = actix_rt::spawn(async move {
let _ = connection.await;
});
@ -53,143 +171,80 @@ impl H2Connection {
}
}
// cancel spawned connection task on drop.
impl Drop for H2Connection {
/// Cancel spawned connection task on drop.
impl Drop for H2ConnectionInner {
fn drop(&mut self) {
self.handle.abort();
}
}
// only expose sender type to public.
impl Deref for H2Connection {
type Target = SendRequest<Bytes>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for H2Connection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin;
/// Send request and body
fn send_request<B, H>(
self,
head: H,
body: B,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
where
B: MessageBody + 'static,
H: Into<RequestHeadType> + 'static;
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType> + 'static>(
self,
head: H,
) -> LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
>;
}
#[doc(hidden)]
/// HTTP client connection
pub struct IoConnection<T>
where
T: AsyncWrite + Unpin + 'static,
{
io: Option<ConnectionType<T>>,
created: time::Instant,
pool: Acquired<T>,
}
impl<T> fmt::Debug for IoConnection<T>
where
T: AsyncWrite + Unpin + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.io {
Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io),
Some(ConnectionType::H2(_)) => write!(f, "H2Connection"),
None => write!(f, "Connection(Empty)"),
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
pub(crate) fn new(
io: ConnectionType<T>,
created: time::Instant,
pool: Acquired<T>,
) -> Self {
IoConnection {
pool,
created,
io: Some(io),
}
}
#[cfg(test)]
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
(self.io.unwrap(), self.created, self.pool)
}
async fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
mut self,
head: H,
body: B,
) -> Result<(ResponseHead, Payload), SendRequestError> {
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
h1proto::send_request(io, head.into(), body, self.created, self.pool)
.await
}
ConnectionType::H2(io) => {
h2proto::send_request(io, head.into(), body, self.created, self.pool)
.await
}
}
}
/// Send request, returns Response and Framed
async fn open_tunnel<H: Into<RequestHeadType>>(
mut self,
head: H,
) -> Result<(ResponseHead, Framed<T, ClientCodec>), SendRequestError> {
match self.io.take().unwrap() {
ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await,
ConnectionType::H2(io) => {
self.pool.release(ConnectionType::H2(io), self.created);
Err(SendRequestError::TunnelNotSupported)
}
}
}
}
#[allow(dead_code)]
pub(crate) enum EitherIoConnection<A, B>
/// Unified connection type cover Http1 Plain/Tls and Http2 protocols
pub enum Connection<A, B = Box<dyn ConnectionIo>>
where
A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static,
A: ConnectionIo,
B: ConnectionIo,
{
A(IoConnection<A>),
B(IoConnection<B>),
Tcp(ConnectionType<A>),
Tls(ConnectionType<B>),
}
impl<A, B> Connection for EitherIoConnection<A, B>
where
A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Io = EitherIo<A, B>;
/// Unified connection type cover Http1/2 protocols
pub enum ConnectionType<Io: ConnectionIo> {
H1(H1Connection<Io>),
H2(H2Connection<Io>),
}
fn send_request<RB, H>(
/// Helper type for storing connection types in pool.
pub(super) enum ConnectionInnerType<Io> {
H1(Io),
H2(H2ConnectionInner),
}
impl<Io: ConnectionIo> ConnectionType<Io> {
pub(super) fn from_pool(
inner: ConnectionInnerType<Io>,
created: time::Instant,
acquired: Acquired<Io>,
) -> Self {
match inner {
ConnectionInnerType::H1(io) => Self::from_h1(io, created, acquired),
ConnectionInnerType::H2(io) => Self::from_h2(io, created, acquired),
}
}
pub(super) fn from_h1(
io: Io,
created: time::Instant,
acquired: Acquired<Io>,
) -> Self {
Self::H1(H1Connection {
io: Some(io),
created,
acquired,
})
}
pub(super) fn from_h2(
io: H2ConnectionInner,
created: time::Instant,
acquired: Acquired<Io>,
) -> Self {
Self::H2(H2Connection {
io: Some(io),
created,
acquired,
})
}
}
impl<A, B> Connection<A, B>
where
A: ConnectionIo,
B: ConnectionIo,
{
/// Send a request through connection.
pub fn send_request<RB, H>(
self,
head: H,
body: RB,
@ -198,76 +253,106 @@ where
RB: MessageBody + 'static,
H: Into<RequestHeadType> + 'static,
{
match self {
EitherIoConnection::A(con) => Box::pin(con.send_request(head, body)),
EitherIoConnection::B(con) => Box::pin(con.send_request(head, body)),
}
Box::pin(async move {
match self {
Connection::Tcp(ConnectionType::H1(conn)) => {
h1proto::send_request(conn, head.into(), body).await
}
Connection::Tls(ConnectionType::H1(conn)) => {
h1proto::send_request(conn, head.into(), body).await
}
Connection::Tls(ConnectionType::H2(conn)) => {
h2proto::send_request(conn, head.into(), body).await
}
_ => unreachable!(
"Plain Tcp connection can be used only in Http1 protocol"
),
}
})
}
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType> + 'static>(
/// Send request, returns Response and Framed tunnel.
pub fn open_tunnel<H: Into<RequestHeadType> + 'static>(
self,
head: H,
) -> LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
Result<(ResponseHead, Framed<Connection<A, B>, ClientCodec>), SendRequestError>,
> {
match self {
EitherIoConnection::A(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::A)))
}),
EitherIoConnection::B(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::B)))
}),
}
Box::pin(async move {
match self {
Connection::Tcp(ConnectionType::H1(ref _conn)) => {
let (head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, framed))
}
Connection::Tls(ConnectionType::H1(ref _conn)) => {
let (head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, framed))
}
Connection::Tls(ConnectionType::H2(mut conn)) => {
conn.release();
Err(SendRequestError::TunnelNotSupported)
}
Connection::Tcp(ConnectionType::H2(_)) => {
unreachable!(
"Plain Tcp connection can be used only in Http1 protocol"
)
}
}
})
}
}
#[pin_project(project = EitherIoProj)]
pub enum EitherIo<A, B> {
A(#[pin] A),
B(#[pin] B),
}
impl<A, B> AsyncRead for EitherIo<A, B>
impl<A, B> AsyncRead for Connection<A, B>
where
A: AsyncRead,
B: AsyncRead,
A: ConnectionIo,
B: ConnectionIo,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.project() {
EitherIoProj::A(val) => val.poll_read(cx, buf),
EitherIoProj::B(val) => val.poll_read(cx, buf),
match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_read(cx, buf)
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_read(cx, buf)
}
_ => unreachable!("H2Connection can not impl AsyncRead trait"),
}
}
}
impl<A, B> AsyncWrite for EitherIo<A, B>
const H2_UNREACHABLE_WRITE: &'static str = "H2Connection can not impl AsyncWrite trait";
impl<A, B> AsyncWrite for Connection<A, B>
where
A: AsyncWrite,
B: AsyncWrite,
A: ConnectionIo,
B: ConnectionIo,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match self.project() {
EitherIoProj::A(val) => val.poll_write(cx, buf),
EitherIoProj::B(val) => val.poll_write(cx, buf),
match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_write(cx, buf)
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_write(cx, buf)
}
_ => unreachable!(H2_UNREACHABLE_WRITE),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.project() {
EitherIoProj::A(val) => val.poll_flush(cx),
EitherIoProj::B(val) => val.poll_flush(cx),
match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_flush(cx),
Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_flush(cx),
_ => unreachable!(H2_UNREACHABLE_WRITE),
}
}
@ -275,9 +360,38 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
match self.project() {
EitherIoProj::A(val) => val.poll_shutdown(cx),
EitherIoProj::B(val) => val.poll_shutdown(cx),
match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_shutdown(cx)
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_shutdown(cx)
}
_ => unreachable!(H2_UNREACHABLE_WRITE),
}
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_write_vectored(cx, bufs)
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_write_vectored(cx, bufs)
}
_ => unreachable!(H2_UNREACHABLE_WRITE),
}
}
fn is_write_vectored(&self) -> bool {
match *self {
Connection::Tcp(ConnectionType::H1(ref conn)) => conn.is_write_vectored(),
Connection::Tls(ConnectionType::H1(ref conn)) => conn.is_write_vectored(),
_ => unreachable!(H2_UNREACHABLE_WRITE),
}
}
}
@ -300,10 +414,13 @@ mod test {
let tcp = TcpStream::connect(local).await.unwrap();
let (sender, connection) = h2::client::handshake(tcp).await.unwrap();
let conn = H2Connection::new(sender.clone(), connection);
let conn = H2ConnectionInner::new(sender.clone(), connection);
assert!(sender.clone().ready().await.is_ok());
assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok());
assert!(h2::client::SendRequest::clone(&conn.sender)
.ready()
.await
.is_ok());
drop(conn);

View file

@ -3,22 +3,26 @@ use std::{
future::Future,
net::IpAddr,
pin::Pin,
rc::Rc,
task::{Context, Poll},
time::Duration,
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream;
use actix_service::{apply_fn, Service, ServiceExt};
use actix_tls::connect::{
new_connector, Connect as TcpConnect, Connection as TcpConnection, Resolver,
use actix_rt::{
net::TcpStream,
time::{sleep, Sleep},
};
use actix_utils::timeout::{TimeoutError, TimeoutService};
use futures_core::ready;
use actix_service::Service;
use actix_tls::connect::{
new_connector, Connect as TcpConnect, ConnectError as TcpConnectError,
Connection as TcpConnection, Resolver,
};
use futures_core::{future::LocalBoxFuture, ready};
use http::Uri;
use pin_project::pin_project;
use super::config::ConnectorConfig;
use super::connection::{Connection, ConnectionIo, EitherIoConnection};
use super::connection::{Connection, ConnectionIo};
use super::error::ConnectError;
use super::pool::ConnectionPool;
use super::Connect;
@ -28,18 +32,15 @@ use super::Protocol;
use actix_tls::connect::ssl::openssl::SslConnector as OpensslConnector;
#[cfg(feature = "rustls")]
use actix_tls::connect::ssl::rustls::ClientConfig;
#[cfg(feature = "rustls")]
use std::sync::Arc;
#[cfg(any(feature = "openssl", feature = "rustls"))]
enum SslConnector {
#[allow(dead_code)]
None,
#[cfg(feature = "openssl")]
Openssl(OpensslConnector),
#[cfg(feature = "rustls")]
Rustls(Arc<ClientConfig>),
Rustls(std::sync::Arc<ClientConfig>),
}
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
type SslConnector = ();
/// Manages HTTP client network connectivity.
///
@ -104,23 +105,25 @@ impl Connector<()> {
config.root_store.add_server_trust_anchors(
&actix_tls::connect::ssl::rustls::TLS_SERVER_ROOTS,
);
SslConnector::Rustls(Arc::new(config))
SslConnector::Rustls(std::sync::Arc::new(config))
}
// ssl turned off, provides empty ssl connector
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
fn build_ssl(_: Vec<Vec<u8>>) -> SslConnector {}
fn build_ssl(_: Vec<Vec<u8>>) -> SslConnector {
SslConnector::None
}
}
impl<T> Connector<T> {
impl<S> Connector<S> {
/// Use custom connector.
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1>
pub fn connector<S1, Io1>(self, connector: S1) -> Connector<S1>
where
U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
T1: Service<
Io1: ConnectionIo + fmt::Debug,
S1: Service<
TcpConnect<Uri>,
Response = TcpConnection<Uri, U1>,
Error = actix_tls::connect::ConnectError,
Response = TcpConnection<Uri, Io1>,
Error = TcpConnectError,
> + Clone,
{
Connector {
@ -131,23 +134,30 @@ impl<T> Connector<T> {
}
}
impl<T, U> Connector<T>
impl<S, Io> Connector<S>
where
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
T: Service<
Io: ConnectionIo + fmt::Debug,
S: Service<
TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_tls::connect::ConnectError,
Response = TcpConnection<Uri, Io>,
Error = TcpConnectError,
> + Clone
+ 'static,
{
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Set to 5 second by default.
/// Tcp connection timeout, i.e. max time to connect to remote host including dns name
/// resolution. Set to 5 second by default.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.config.timeout = timeout;
self
}
/// Tls handshake timeout, i.e. max time to do tls handshake with remote host after tcp
/// connection established. Set to 5 second by default.
pub fn handshake_timeout(mut self, timeout: Duration) -> Self {
self.config.handshake_timeout = timeout;
self
}
#[cfg(feature = "openssl")]
/// Use custom `SslConnector` instance.
pub fn ssl(mut self, connector: OpensslConnector) -> Self {
@ -157,7 +167,7 @@ where
#[cfg(feature = "rustls")]
/// Use custom `SslConnector` instance.
pub fn rustls(mut self, connector: Arc<ClientConfig>) -> Self {
pub fn rustls(mut self, connector: std::sync::Arc<ClientConfig>) -> Self {
self.ssl = SslConnector::Rustls(connector);
self
}
@ -247,158 +257,369 @@ where
/// Finish configuration process and create connector service.
/// The Connector builder always concludes by calling `finish()` last in
/// its combinator chain.
pub fn finish(
self,
) -> impl Service<Connect, Response = impl Connection, Error = ConnectError> {
pub fn finish(self) -> ConnectorService<S, Io> {
let local_address = self.config.local_address;
let timeout = self.config.timeout;
let tcp_service = TimeoutService::new(
timeout,
apply_fn(self.connector.clone(), move |msg: Connect, srv| {
let mut req = TcpConnect::new(msg.uri).set_addr(msg.addr);
let tcp_service_inner =
TcpConnectorInnerService::new(self.connector, timeout, local_address);
if let Some(local_addr) = local_address {
req = req.set_local_addr(local_addr);
#[allow(clippy::redundant_clone)]
let tcp_service = TcpConnectorService {
service: tcp_service_inner.clone(),
};
let tls_service = match self.ssl {
SslConnector::None => None,
#[cfg(feature = "openssl")]
SslConnector::Openssl(tls) => {
const H2: &[u8] = b"h2";
use actix_tls::connect::ssl::openssl::{OpensslConnector, SslStream};
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, SslStream<Io>> {
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
let sock = self.into_parts().0;
let h2 = sock
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(Box::new(sock), Protocol::Http2)
} else {
(Box::new(sock), Protocol::Http1)
}
}
}
srv.call(req)
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectError::Timeout,
});
let handshake_timeout = self.config.handshake_timeout;
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
{
// A dummy service for annotate tls pool's type signature.
pub type DummyService = Box<
dyn Service<
Connect,
Response = (Box<dyn ConnectionIo>, Protocol),
Error = ConnectError,
Future = futures_core::future::LocalBoxFuture<
'static,
Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
>,
>,
>;
let tls_service = TlsConnectorService {
tcp_service: tcp_service_inner,
tls_service: OpensslConnector::service(tls),
timeout: handshake_timeout,
};
InnerConnector::<_, DummyService, _, Box<dyn ConnectionIo>> {
tcp_pool: ConnectionPool::new(
tcp_service,
self.config.no_disconnect_timeout(),
),
tls_pool: None,
Some(actix_service::boxed::rc_service(tls_service))
}
}
#[cfg(any(feature = "openssl", feature = "rustls"))]
{
const H2: &[u8] = b"h2";
use actix_service::{boxed::service, pipeline};
#[cfg(feature = "openssl")]
use actix_tls::connect::ssl::openssl::OpensslConnector;
#[cfg(feature = "rustls")]
use actix_tls::connect::ssl::rustls::{RustlsConnector, Session};
SslConnector::Rustls(tls) => {
const H2: &[u8] = b"h2";
let ssl_service = TimeoutService::new(
timeout,
pipeline(
apply_fn(self.connector.clone(), move |msg: Connect, srv| {
let mut req = TcpConnect::new(msg.uri).set_addr(msg.addr);
use actix_tls::connect::ssl::rustls::{
RustlsConnector, Session, TlsStream,
};
if let Some(local_addr) = local_address {
req = req.set_local_addr(local_addr);
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> {
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
let sock = self.into_parts().0;
let h2 = sock
.get_ref()
.1
.get_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(Box::new(sock), Protocol::Http2)
} else {
(Box::new(sock), Protocol::Http1)
}
}
}
srv.call(req)
})
.map_err(ConnectError::from),
)
.and_then(match self.ssl {
#[cfg(feature = "openssl")]
SslConnector::Openssl(ssl) => service(
OpensslConnector::service(ssl)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(
Box::new(sock) as Box<dyn ConnectionIo>,
Protocol::Http2,
)
} else {
(Box::new(sock) as _, Protocol::Http1)
}
})
.map_err(ConnectError::from),
),
#[cfg(feature = "rustls")]
SslConnector::Rustls(ssl) => service(
RustlsConnector::service(ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.1
.get_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(Box::new(sock) as _, Protocol::Http2)
} else {
(Box::new(sock) as _, Protocol::Http1)
}
}),
),
}),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectError::Timeout,
});
let handshake_timeout = self.config.handshake_timeout;
InnerConnector {
tcp_pool: ConnectionPool::new(
tcp_service,
self.config.no_disconnect_timeout(),
),
tls_pool: Some(ConnectionPool::new(ssl_service, self.config)),
let tls_service = TlsConnectorService {
tcp_service: tcp_service_inner,
tls_service: RustlsConnector::service(tls),
timeout: handshake_timeout,
};
Some(actix_service::boxed::rc_service(tls_service))
}
};
let tcp_config = self.config.no_disconnect_timeout();
let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
let tls_config = self.config;
let tls_pool = tls_service
.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
ConnectorServicePriv { tcp_pool, tls_pool }
}
}
/// tcp service for map `TcpConnection<Uri, Io>` type to `(Io, Protocol)`
#[derive(Clone)]
pub struct TcpConnectorService<S: Clone> {
service: S,
}
impl<S, Io> Service<Connect> for TcpConnectorService<S>
where
S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError>
+ Clone
+ 'static,
{
type Response = (Io, Protocol);
type Error = ConnectError;
type Future = TcpConnectorFuture<S::Future>;
actix_service::forward_ready!(service);
fn call(&self, req: Connect) -> Self::Future {
TcpConnectorFuture {
fut: self.service.call(req),
}
}
}
struct InnerConnector<S1, S2, Io1, Io2>
#[pin_project]
pub struct TcpConnectorFuture<Fut> {
#[pin]
fut: Fut,
}
impl<Fut, Io> Future for TcpConnectorFuture<Fut>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
Fut: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
{
type Output = Result<(Io, Protocol), ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project()
.fut
.poll(cx)
.map_ok(|res| (res.into_parts().0, Protocol::Http1))
}
}
/// service for establish tcp connection and do client tls handshake.
/// operation is canceled when timeout limit reached.
struct TlsConnectorService<S, St> {
/// tcp connection is canceled on `TcpConnectorInnerService`'s timeout setting.
tcp_service: S,
/// tls connection is canceled on `TlsConnectorService`'s timeout setting.
tls_service: St,
timeout: Duration,
}
impl<S, St, Io, Res> Service<Connect> for TlsConnectorService<S, St>
where
S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError>
+ Clone
+ 'static,
St: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error>
+ Clone
+ 'static,
Io: ConnectionIo,
Res: IntoConnectionIo,
{
type Response = (Box<dyn ConnectionIo>, Protocol);
type Error = ConnectError;
type Future = TlsConnectorFuture<St, S::Future, St::Future>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.tcp_service.poll_ready(cx))?;
ready!(self.tls_service.poll_ready(cx))?;
Poll::Ready(Ok(()))
}
fn call(&self, req: Connect) -> Self::Future {
let fut = self.tcp_service.call(req);
let tls_service = self.tls_service.clone();
let timeout = self.timeout;
TlsConnectorFuture::TcpConnect {
fut,
tls_service: Some(tls_service),
timeout,
}
}
}
#[pin_project(project = TlsConnectorProj)]
#[allow(clippy::large_enum_variant)]
enum TlsConnectorFuture<S, Fut1, Fut2> {
TcpConnect {
#[pin]
fut: Fut1,
tls_service: Option<S>,
timeout: Duration,
},
TlsConnect {
#[pin]
fut: Fut2,
#[pin]
timeout: Sleep,
},
}
/// helper trait for generic over different TlsStream types between tls crates.
trait IntoConnectionIo {
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
}
impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
where
S: Service<
TcpConnection<Uri, Io>,
Response = Res,
Error = std::io::Error,
Future = Fut2,
>,
Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
Fut2: Future<Output = Result<S::Response, S::Error>>,
Io: ConnectionIo,
Res: IntoConnectionIo,
{
type Output = Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
TlsConnectorProj::TcpConnect {
fut,
tls_service,
timeout,
} => {
let res = ready!(fut.poll(cx))?;
let fut = tls_service
.take()
.expect("TlsConnectorFuture polled after complete")
.call(res);
let timeout = sleep(*timeout);
self.set(TlsConnectorFuture::TlsConnect { fut, timeout });
self.poll(cx)
}
TlsConnectorProj::TlsConnect { fut, timeout } => match fut.poll(cx)? {
Poll::Ready(res) => Poll::Ready(Ok(res.into_connection_io())),
Poll::Pending => timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
},
}
}
}
/// service for establish tcp connection.
/// operation is canceled when timeout limit reached.
#[derive(Clone)]
pub struct TcpConnectorInnerService<S: Clone> {
service: S,
timeout: Duration,
local_address: Option<std::net::IpAddr>,
}
impl<S: Clone> TcpConnectorInnerService<S> {
fn new(
service: S,
timeout: Duration,
local_address: Option<std::net::IpAddr>,
) -> Self {
Self {
service,
timeout,
local_address,
}
}
}
impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
where
S: Service<
TcpConnect<Uri>,
Response = TcpConnection<Uri, Io>,
Error = TcpConnectError,
> + Clone
+ 'static,
{
type Response = S::Response;
type Error = ConnectError;
type Future = TcpConnectorInnerFuture<S::Future>;
actix_service::forward_ready!(service);
fn call(&self, req: Connect) -> Self::Future {
let mut req = TcpConnect::new(req.uri).set_addr(req.addr);
if let Some(local_addr) = self.local_address {
req = req.set_local_addr(local_addr);
}
TcpConnectorInnerFuture {
fut: self.service.call(req),
timeout: sleep(self.timeout),
}
}
}
#[pin_project]
pub struct TcpConnectorInnerFuture<Fut> {
#[pin]
fut: Fut,
#[pin]
timeout: Sleep,
}
impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
where
Fut: Future<Output = Result<TcpConnection<Uri, Io>, TcpConnectError>>,
{
type Output = Result<TcpConnection<Uri, Io>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.fut.poll(cx) {
Poll::Ready(res) => Poll::Ready(res.map_err(ConnectError::from)),
Poll::Pending => this.timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
}
}
}
/// Connector service for pooled Plain/Tls Tcp connections.
pub type ConnectorService<S, Io> = ConnectorServicePriv<
TcpConnectorService<TcpConnectorInnerService<S>>,
Rc<
dyn Service<
Connect,
Response = (Box<dyn ConnectionIo>, Protocol),
Error = ConnectError,
Future = LocalBoxFuture<
'static,
Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
>,
>,
>,
Io,
Box<dyn ConnectionIo>,
>;
pub struct ConnectorServicePriv<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
Io1: ConnectionIo,
Io2: ConnectionIo,
{
tcp_pool: ConnectionPool<S1, Io1>,
tls_pool: Option<ConnectionPool<S2, Io2>>,
}
impl<S1, S2, Io1, Io2> Service<Connect> for InnerConnector<S1, S2, Io1, Io2>
impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: ConnectionIo,
Io2: ConnectionIo,
{
type Response = EitherIoConnection<Io1, Io2>;
type Response = Connection<Io1, Io2>;
type Error = ConnectError;
type Future = InnerConnectorResponse<S1, S2, Io1, Io2>;
type Future = ConnectorServiceFuture<S1, S2, Io1, Io2>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.tcp_pool.poll_ready(cx))?;
@ -411,41 +632,49 @@ where
fn call(&self, req: Connect) -> Self::Future {
match req.uri.scheme_str() {
Some("https") | Some("wss") => match self.tls_pool {
None => InnerConnectorResponse::SslIsNotSupported,
Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)),
None => ConnectorServiceFuture::SslIsNotSupported,
Some(ref pool) => ConnectorServiceFuture::Tls(pool.call(req)),
},
_ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)),
_ => ConnectorServiceFuture::Tcp(self.tcp_pool.call(req)),
}
}
}
#[pin_project::pin_project(project = InnerConnectorProj)]
enum InnerConnectorResponse<S1, S2, Io1, Io2>
#[pin_project(project = ConnectorServiceProj)]
pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: ConnectionIo,
Io2: ConnectionIo,
{
Io1(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
Io2(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
Tcp(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
Tls(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
SslIsNotSupported,
}
impl<S1, S2, Io1, Io2> Future for InnerConnectorResponse<S1, S2, Io1, Io2>
impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: ConnectionIo,
Io2: ConnectionIo,
{
type Output = Result<EitherIoConnection<Io1, Io2>, ConnectError>;
type Output = Result<Connection<Io1, Io2>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
InnerConnectorProj::Io1(fut) => fut.poll(cx).map_ok(EitherIoConnection::A),
InnerConnectorProj::Io2(fut) => fut.poll(cx).map_ok(EitherIoConnection::B),
InnerConnectorProj::SslIsNotSupported => {
ConnectorServiceProj::Tcp(fut) => fut.poll(cx).map_ok(Connection::Tcp),
ConnectorServiceProj::Tls(fut) => fut.poll(cx).map_ok(Connection::Tls),
ConnectorServiceProj::SslIsNotSupported => {
Poll::Ready(Err(ConnectError::SslIsNotSupported))
}
}

View file

@ -1,9 +1,10 @@
use std::io::Write;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{io, time};
use std::{
io::Write,
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use actix_codec::Framed;
use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
@ -11,28 +12,24 @@ use futures_util::{future::poll_fn, SinkExt as _};
use crate::error::PayloadError;
use crate::h1;
use crate::header::HeaderMap;
use crate::http::{
header::{IntoHeaderValue, EXPECT, HOST},
header::{HeaderMap, IntoHeaderValue, EXPECT, HOST},
StatusCode,
};
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream};
use super::connection::ConnectionType;
use super::connection::{ConnectionIo, H1Connection};
use super::error::{ConnectError, SendRequestError};
use super::pool::Acquired;
use crate::body::{BodySize, MessageBody};
pub(crate) async fn send_request<T, B>(
io: T,
pub(crate) async fn send_request<Io, B>(
io: H1Connection<Io>,
mut head: RequestHeadType,
body: B,
created: time::Instant,
acquired: Acquired<T>,
) -> Result<(ResponseHead, Payload), SendRequestError>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
Io: ConnectionIo,
B: MessageBody,
{
// set request host header
@ -62,12 +59,6 @@ where
}
}
let io = H1Connection {
created,
acquired,
io: Some(io),
};
// create Framed and prepare sending request
let mut framed = Framed::new(io, h1::ClientCodec::default());
@ -138,18 +129,18 @@ where
}
}
pub(crate) async fn open_tunnel<T>(
io: T,
pub(crate) async fn open_tunnel<Io>(
io: Io,
head: RequestHeadType,
) -> Result<(ResponseHead, Framed<T, h1::ClientCodec>), SendRequestError>
) -> Result<(ResponseHead, Framed<Io, h1::ClientCodec>), SendRequestError>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
Io: ConnectionIo,
{
// create Framed and send request
// create Framed and send request.
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, BodySize::None).into()).await?;
// read response
// read response head.
let head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx))
.await
.ok_or(ConnectError::Disconnected)??;
@ -158,12 +149,12 @@ where
}
/// send request body to the peer
pub(crate) async fn send_body<T, B>(
pub(crate) async fn send_body<Io, B>(
body: B,
mut framed: Pin<&mut Framed<T, h1::ClientCodec>>,
mut framed: Pin<&mut Framed<Io, h1::ClientCodec>>,
) -> Result<(), SendRequestError>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
Io: ConnectionIo,
B: MessageBody,
{
actix_rt::pin!(body);
@ -202,92 +193,16 @@ where
Ok(())
}
#[doc(hidden)]
/// HTTP client connection
pub struct H1Connection<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
/// T should be `Unpin`
io: Option<T>,
created: time::Instant,
acquired: Acquired<T>,
}
impl<T> H1Connection<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
fn on_release(&mut self, keep_alive: bool) {
if keep_alive {
self.release();
} else {
self.close();
}
}
/// Close connection
fn close(&mut self) {
if let Some(io) = self.io.take() {
self.acquired.close(ConnectionType::H1(io));
}
}
/// Release this connection to the connection pool
fn release(&mut self) {
if let Some(io) = self.io.take() {
self.acquired.release(ConnectionType::H1(io), self.created);
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncRead for H1Connection<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io.as_mut().unwrap()).poll_read(cx, buf)
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io.as_mut().unwrap()).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.io.as_mut().unwrap()).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(self.io.as_mut().unwrap()).poll_shutdown(cx)
}
}
#[pin_project::pin_project]
pub(crate) struct PlStream<Io>
pub(crate) struct PlStream<Io: ConnectionIo>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
Io: ConnectionIo,
{
#[pin]
framed: Option<Framed<H1Connection<Io>, h1::ClientPayloadCodec>>,
}
impl<Io> PlStream<Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
impl<Io: ConnectionIo> PlStream<Io> {
fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
@ -297,10 +212,7 @@ where
}
}
impl<Io> Stream for PlStream<Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
impl<Io: ConnectionIo> Stream for PlStream<Io> {
type Item = Result<Bytes, PayloadError>;
fn poll_next(

View file

@ -1,7 +1,5 @@
use std::future::Future;
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use futures_util::future::poll_fn;
use h2::{
@ -17,20 +15,16 @@ use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload;
use super::config::ConnectorConfig;
use super::connection::ConnectionType;
use super::connection::{ConnectionIo, H2Connection};
use super::error::SendRequestError;
use super::pool::Acquired;
use crate::client::connection::H2Connection;
pub(crate) async fn send_request<T, B>(
mut io: H2Connection,
pub(crate) async fn send_request<Io, B>(
mut io: H2Connection<Io>,
head: RequestHeadType,
body: B,
created: time::Instant,
acquired: Acquired<T>,
) -> Result<(ResponseHead, Payload), SendRequestError>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
Io: ConnectionIo,
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.size());
@ -103,13 +97,13 @@ where
let res = poll_fn(|cx| io.poll_ready(cx)).await;
if let Err(e) = res {
release(io, acquired, created, e.is_io());
io.on_release(e.is_io());
return Err(SendRequestError::from(e));
}
let resp = match io.send_request(req, eof) {
Ok((fut, send)) => {
release(io, acquired, created, false);
io.on_release(false);
if !eof {
send_body(body, send).await?;
@ -117,7 +111,7 @@ where
fut.await.map_err(SendRequestError::from)?
}
Err(e) => {
release(io, acquired, created, e.is_io());
io.on_release(e.is_io());
return Err(e.into());
}
};
@ -178,26 +172,10 @@ async fn send_body<B: MessageBody>(
}
}
/// release SendRequest object
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
io: H2Connection,
acquired: Acquired<T>,
created: time::Instant,
close: bool,
) {
if close {
acquired.close(ConnectionType::H2(io));
} else {
acquired.release(ConnectionType::H2(io), created);
}
}
pub(crate) fn handshake<Io>(
pub(crate) fn handshake<Io: ConnectionIo>(
io: Io,
config: &ConnectorConfig,
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
let mut builder = Builder::new();
builder

View file

@ -15,7 +15,7 @@ pub use actix_tls::connect::{
};
pub use self::connection::{Connection, ConnectionIo};
pub use self::connector::Connector;
pub use self::connector::{Connector, ConnectorService};
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
pub use crate::Protocol;

View file

@ -1,34 +1,38 @@
//! Client connection pooling keyed on the authority part of the connection URI.
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{cell::RefCell, io};
use std::{
cell::RefCell,
collections::VecDeque,
future::Future,
io,
ops::Deref,
pin::Pin,
rc::Rc,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::time::{sleep, Sleep};
use actix_service::Service;
use ahash::AHashMap;
use futures_core::future::LocalBoxFuture;
use http::uri::Authority;
use pin_project::pin_project;
use tokio::io::ReadBuf;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use super::config::ConnectorConfig;
use super::connection::{ConnectionType, H2Connection, IoConnection};
use super::connection::{
ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner,
};
use super::error::ConnectError;
use super::h2proto::handshake;
use super::Connect;
use super::Protocol;
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(crate) struct Key {
pub struct Key {
authority: Authority,
}
@ -38,17 +42,18 @@ impl From<Authority> for Key {
}
}
#[doc(hidden)]
/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key.
pub(crate) struct ConnectionPool<S, Io>
pub struct ConnectionPool<S, Io>
where
Io: AsyncWrite + Unpin + 'static,
{
connector: Rc<S>,
connector: S,
inner: ConnectionPoolInner<Io>,
}
/// wrapper type for check the ref count of Rc.
struct ConnectionPoolInner<Io>(Rc<ConnectionPoolInnerPriv<Io>>)
pub struct ConnectionPoolInner<Io>(Rc<ConnectionPoolInnerPriv<Io>>)
where
Io: AsyncWrite + Unpin + 'static;
@ -56,10 +61,21 @@ impl<Io> ConnectionPoolInner<Io>
where
Io: AsyncWrite + Unpin + 'static,
{
fn new(config: ConnectorConfig) -> Self {
let permits = Arc::new(Semaphore::new(config.limit));
let available = RefCell::new(AHashMap::default());
Self(Rc::new(ConnectionPoolInnerPriv {
config,
available,
permits,
}))
}
/// spawn a async for graceful shutdown h1 Io type with a timeout.
fn close(&self, conn: ConnectionType<Io>) {
fn close(&self, conn: ConnectionInnerType<Io>) {
if let Some(timeout) = self.config.disconnect_timeout {
if let ConnectionType::H1(io) = conn {
if let ConnectionInnerType::H1(io) = conn {
actix_rt::spawn(CloseConnection::new(io, timeout));
}
}
@ -104,7 +120,7 @@ where
}
}
struct ConnectionPoolInnerPriv<Io>
pub struct ConnectionPoolInnerPriv<Io>
where
Io: AsyncWrite + Unpin + 'static,
{
@ -128,15 +144,7 @@ where
/// Any requests beyond limit would be wait in fifo order and get notified in async manner
/// by [`tokio::sync::Semaphore`]
pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self {
let permits = Arc::new(Semaphore::new(config.limit));
let available = RefCell::new(AHashMap::default());
let connector = Rc::new(connector);
let inner = ConnectionPoolInner(Rc::new(ConnectionPoolInnerPriv {
config,
available,
permits,
}));
let inner = ConnectionPoolInner::new(config);
Self { connector, inner }
}
@ -144,12 +152,14 @@ where
impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
where
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
Io: AsyncRead + AsyncWrite + Unpin + 'static,
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io: ConnectionIo,
{
type Response = IoConnection<Io>;
type Response = ConnectionType<Io>;
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::forward_ready!(connector);
@ -193,7 +203,7 @@ where
inner.close(c.conn);
} else {
// check if the connection is still usable
if let ConnectionType::H1(ref mut io) = c.conn {
if let ConnectionInnerType::H1(ref mut io) = c.conn {
let check = ConnectionCheckFuture { io };
match check.await {
ConnectionState::Tainted => {
@ -221,7 +231,9 @@ where
// match the connection and spawn new one if did not get anything.
match conn {
Some(conn) => Ok(IoConnection::new(conn.conn, conn.created, acquired)),
Some(conn) => {
Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired))
}
None => {
let (io, proto) = connector.call(req).await?;
@ -229,19 +241,12 @@ where
assert!(proto != Protocol::Http3);
if proto == Protocol::Http1 {
Ok(IoConnection::new(
ConnectionType::H1(io),
Instant::now(),
acquired,
))
Ok(ConnectionType::from_h1(io, Instant::now(), acquired))
} else {
let config = &acquired.inner.config;
let (sender, connection) = handshake(io, config).await?;
Ok(IoConnection::new(
ConnectionType::H2(H2Connection::new(sender, connection)),
Instant::now(),
acquired,
))
let inner = H2ConnectionInner::new(sender, connection);
Ok(ConnectionType::from_h2(inner, Instant::now(), acquired))
}
}
}
@ -292,7 +297,7 @@ where
}
struct PooledConnection<Io> {
conn: ConnectionType<Io>,
conn: ConnectionInnerType<Io>,
used: Instant,
created: Instant,
}
@ -332,26 +337,26 @@ where
}
}
pub(crate) struct Acquired<Io>
pub struct Acquired<Io>
where
Io: AsyncWrite + Unpin + 'static,
{
/// authority key for identify connection.
key: Key,
/// handle to connection pool.
inner: ConnectionPoolInner<Io>,
/// permit for limit concurrent in-flight connection for a Client object.
permit: OwnedSemaphorePermit,
}
impl<Io> Acquired<Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
impl<Io: ConnectionIo> Acquired<Io> {
/// Close the IO.
pub(crate) fn close(&self, conn: ConnectionType<Io>) {
pub(super) fn close(&self, conn: ConnectionInnerType<Io>) {
self.inner.close(conn);
}
/// Release IO back into pool.
pub(crate) fn release(&self, conn: ConnectionType<Io>, created: Instant) {
pub(super) fn release(&self, conn: ConnectionInnerType<Io>, created: Instant) {
let Acquired { key, inner, .. } = self;
inner
@ -376,7 +381,7 @@ mod test {
use http::Uri;
use super::*;
use crate::client::connection::IoConnection;
use crate::client::connection::ConnectionType;
/// A stream type that always returns pending on async read.
///
@ -423,6 +428,7 @@ mod test {
}
}
#[derive(Clone)]
struct TestPoolConnector {
generated: Rc<Cell<usize>>,
}
@ -441,12 +447,14 @@ mod test {
}
}
fn release<T>(conn: IoConnection<T>)
fn release<T>(conn: ConnectionType<T>)
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let (conn, created, acquired) = conn.into_parts();
acquired.release(conn, created);
match conn {
ConnectionType::H1(mut conn) => conn.on_release(true),
ConnectionType::H2(mut conn) => conn.on_release(false),
}
}
#[actix_rt::test]

View file

@ -1,6 +1,10 @@
# Changes
## Unreleased - 2021-xx-xx
### Changed
* `ConnectorService` type is renamed to `BoxConnectorService` [#2081]
[#2081]: https://github.com/actix/actix-web/pull/2081
## 3.0.0-beta.3 - 2021-03-08

View file

@ -6,7 +6,7 @@ use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::{
client::{Connector, TcpConnect, TcpConnectError, TcpConnection},
client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection},
http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri},
};
use actix_rt::net::TcpStream;
@ -15,7 +15,7 @@ use actix_service::{boxed, Service};
use crate::connect::DefaultConnector;
use crate::error::SendRequestError;
use crate::middleware::{NestTransform, Redirect, Transform};
use crate::{Client, ClientConfig, ConnectRequest, ConnectResponse, ConnectorService};
use crate::{Client, ClientConfig, ConnectRequest, ConnectResponse};
/// An HTTP Client builder
///
@ -234,7 +234,7 @@ where
/// Finish build process and create `Client` instance.
pub fn finish(self) -> Client
where
M: Transform<ConnectorService, ConnectRequest> + 'static,
M: Transform<DefaultConnector<ConnectorService<S, Io>>, ConnectRequest> + 'static,
M::Transform:
Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
{
@ -250,7 +250,7 @@ where
fn _finish(self) -> Client
where
M: Transform<ConnectorService, ConnectRequest> + 'static,
M: Transform<DefaultConnector<ConnectorService<S, Io>>, ConnectRequest> + 'static,
M::Transform:
Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
{
@ -269,16 +269,14 @@ where
connector = connector.local_address(val);
}
let connector = boxed::service(DefaultConnector::new(connector.finish()));
let connector = boxed::service(self.middleware.new_transform(connector));
let connector = DefaultConnector::new(connector.finish());
let connector = boxed::rc_service(self.middleware.new_transform(connector));
let config = ClientConfig {
headers: self.headers,
Client(ClientConfig {
headers: Rc::new(self.headers),
timeout: self.timeout,
connector,
};
Client(Rc::new(config))
})
}
}

View file

@ -2,6 +2,7 @@ use std::{
future::Future,
net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
@ -19,7 +20,7 @@ use futures_core::{future::LocalBoxFuture, ready};
use crate::response::ClientResponse;
pub type ConnectorService = Box<
pub type BoxConnectorService = Rc<
dyn Service<
ConnectRequest,
Response = ConnectResponse,
@ -28,6 +29,8 @@ pub type ConnectorService = Box<
>,
>;
pub type BoxedSocket = Box<dyn ConnectionIo>;
pub enum ConnectRequest {
Client(RequestHeadType, Body, Option<net::SocketAddr>),
Tunnel(RequestHead, Option<net::SocketAddr>),
@ -58,7 +61,7 @@ impl ConnectResponse {
}
}
pub(crate) struct DefaultConnector<S> {
pub struct DefaultConnector<S> {
connector: S,
}
@ -68,15 +71,14 @@ impl<S> DefaultConnector<S> {
}
}
impl<S> Service<ConnectRequest> for DefaultConnector<S>
impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
where
S: Service<ClientConnect, Error = ConnectError>,
S::Response: Connection,
<S::Response as Connection>::Io: 'static,
S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
Io: ConnectionIo,
{
type Response = ConnectResponse;
type Error = SendRequestError;
type Future = ConnectRequestFuture<S::Future, <S::Response as Connection>::Io>;
type Future = ConnectRequestFuture<S::Future, Io>;
actix_service::forward_ready!(connector);
@ -102,7 +104,10 @@ where
pin_project_lite::pin_project! {
#[project = ConnectRequestProj]
pub(crate) enum ConnectRequestFuture<Fut, Io> {
pub enum ConnectRequestFuture<Fut, Io>
where
Io: ConnectionIo
{
Connection {
#[pin]
fut: Fut,
@ -114,16 +119,15 @@ pin_project_lite::pin_project! {
Tunnel {
fut: LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Io, ClientCodec>), SendRequestError>,
Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
>,
}
}
}
impl<Fut, C, Io> Future for ConnectRequestFuture<Fut, Io>
impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
where
Fut: Future<Output = Result<C, ConnectError>>,
C: Connection<Io = Io>,
Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
Io: ConnectionIo,
{
type Output = Result<ConnectResponse, SendRequestError>;
@ -165,5 +169,3 @@ where
}
}
}
pub type BoxedSocket = Box<dyn ConnectionIo>;

View file

@ -23,7 +23,7 @@ pub struct FrozenClientRequest {
pub(crate) addr: Option<net::SocketAddr>,
pub(crate) response_decompress: bool,
pub(crate) timeout: Option<Duration>,
pub(crate) config: Rc<ClientConfig>,
pub(crate) config: ClientConfig,
}
impl FrozenClientRequest {
@ -51,7 +51,7 @@ impl FrozenClientRequest {
self.addr,
self.response_decompress,
self.timeout,
self.config.as_ref(),
&self.config,
body,
)
}
@ -62,7 +62,7 @@ impl FrozenClientRequest {
self.addr,
self.response_decompress,
self.timeout,
self.config.as_ref(),
&self.config,
value,
)
}
@ -73,7 +73,7 @@ impl FrozenClientRequest {
self.addr,
self.response_decompress,
self.timeout,
self.config.as_ref(),
&self.config,
value,
)
}
@ -88,7 +88,7 @@ impl FrozenClientRequest {
self.addr,
self.response_decompress,
self.timeout,
self.config.as_ref(),
&self.config,
stream,
)
}
@ -99,7 +99,7 @@ impl FrozenClientRequest {
self.addr,
self.response_decompress,
self.timeout,
self.config.as_ref(),
&self.config,
)
}
@ -168,7 +168,7 @@ impl FrozenSendBuilder {
self.req.addr,
self.req.response_decompress,
self.req.timeout,
self.req.config.as_ref(),
&self.req.config,
body,
)
}
@ -183,7 +183,7 @@ impl FrozenSendBuilder {
self.req.addr,
self.req.response_decompress,
self.req.timeout,
self.req.config.as_ref(),
&self.req.config,
value,
)
}
@ -198,7 +198,7 @@ impl FrozenSendBuilder {
self.req.addr,
self.req.response_decompress,
self.req.timeout,
self.req.config.as_ref(),
&self.req.config,
value,
)
}
@ -217,7 +217,7 @@ impl FrozenSendBuilder {
self.req.addr,
self.req.response_decompress,
self.req.timeout,
self.req.config.as_ref(),
&self.req.config,
stream,
)
}
@ -232,7 +232,7 @@ impl FrozenSendBuilder {
self.req.addr,
self.req.response_decompress,
self.req.timeout,
self.req.config.as_ref(),
&self.req.config,
)
}
}

View file

@ -121,7 +121,7 @@ pub mod test;
pub mod ws;
pub use self::builder::ClientBuilder;
pub use self::connect::{BoxedSocket, ConnectRequest, ConnectResponse, ConnectorService};
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
pub use self::request::ClientRequest;
pub use self::response::{ClientResponse, JsonBody, MessageBody};
@ -147,11 +147,12 @@ pub use self::sender::SendClientRequest;
/// }
/// ```
#[derive(Clone)]
pub struct Client(Rc<ClientConfig>);
pub struct Client(ClientConfig);
#[derive(Clone)]
pub(crate) struct ClientConfig {
pub(crate) connector: ConnectorService,
pub(crate) headers: HeaderMap,
pub(crate) connector: BoxConnectorService,
pub(crate) headers: Rc<HeaderMap>,
pub(crate) timeout: Option<Duration>,
}

View file

@ -189,7 +189,7 @@ where
// remove body
.call(ConnectRequest::Client(head, Body::None, addr));
self.as_mut().set(RedirectServiceFuture::Client {
self.set(RedirectServiceFuture::Client {
fut,
max_redirect_times,
uri: Some(uri),
@ -236,7 +236,7 @@ where
.unwrap()
.call(ConnectRequest::Client(head, body_new, addr));
self.as_mut().set(RedirectServiceFuture::Client {
self.set(RedirectServiceFuture::Client {
fut,
max_redirect_times,
uri: Some(uri),

View file

@ -57,7 +57,7 @@ pub struct ClientRequest {
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: Rc<ClientConfig>,
config: ClientConfig,
#[cfg(feature = "cookies")]
cookies: Option<CookieJar>,
@ -65,7 +65,7 @@ pub struct ClientRequest {
impl ClientRequest {
/// Create new client request builder.
pub(crate) fn new<U>(method: Method, uri: U, config: Rc<ClientConfig>) -> Self
pub(crate) fn new<U>(method: Method, uri: U, config: ClientConfig) -> Self
where
Uri: TryFrom<U>,
<Uri as TryFrom<U>>::Error: Into<HttpError>,
@ -398,7 +398,7 @@ impl ClientRequest {
slf.addr,
slf.response_decompress,
slf.timeout,
slf.config.as_ref(),
&slf.config,
body,
)
}
@ -414,7 +414,7 @@ impl ClientRequest {
slf.addr,
slf.response_decompress,
slf.timeout,
slf.config.as_ref(),
&slf.config,
value,
)
}
@ -432,7 +432,7 @@ impl ClientRequest {
slf.addr,
slf.response_decompress,
slf.timeout,
slf.config.as_ref(),
&slf.config,
value,
)
}
@ -452,7 +452,7 @@ impl ClientRequest {
slf.addr,
slf.response_decompress,
slf.timeout,
slf.config.as_ref(),
&slf.config,
stream,
)
}
@ -468,7 +468,7 @@ impl ClientRequest {
slf.addr,
slf.response_decompress,
slf.timeout,
slf.config.as_ref(),
&slf.config,
)
}

View file

@ -28,7 +28,6 @@
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::rc::Rc;
use std::{fmt, str};
use actix_codec::Framed;
@ -56,7 +55,7 @@ pub struct WebsocketsRequest {
addr: Option<SocketAddr>,
max_size: usize,
server_mode: bool,
config: Rc<ClientConfig>,
config: ClientConfig,
#[cfg(feature = "cookies")]
cookies: Option<CookieJar>,
@ -64,7 +63,7 @@ pub struct WebsocketsRequest {
impl WebsocketsRequest {
/// Create new WebSocket connection
pub(crate) fn new<U>(uri: U, config: Rc<ClientConfig>) -> Self
pub(crate) fn new<U>(uri: U, config: ClientConfig) -> Self
where
Uri: TryFrom<U>,
<Uri as TryFrom<U>>::Error: Into<HttpError>,