1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-09-27 22:01:56 +00:00

move actix_http::client module to awc (#2425)

This commit is contained in:
fakeshadow 2021-10-26 07:37:40 +08:00 committed by GitHub
parent d40b6748bc
commit d13854505f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 197 additions and 241 deletions

View file

@ -1,6 +1,11 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
### Removed
* `client` module. [#2425]
* `trust-dns` feature. [#2425]
[#2425]: https://github.com/actix/actix-web/pull/2425
## 3.0.0-beta.11 - 2021-10-20 ## 3.0.0-beta.11 - 2021-10-20

View file

@ -37,9 +37,6 @@ compress-brotli = ["brotli2", "__compress"]
compress-gzip = ["flate2", "__compress"] compress-gzip = ["flate2", "__compress"]
compress-zstd = ["zstd", "__compress"] compress-zstd = ["zstd", "__compress"]
# trust-dns as client dns resolver
trust-dns = ["trust-dns-resolver"]
# Internal (PRIVATE!) features used to aid testing and cheking feature status. # Internal (PRIVATE!) features used to aid testing and cheking feature status.
# Don't rely on these whatsoever. They may disappear at anytime. # Don't rely on these whatsoever. They may disappear at anytime.
__compress = [] __compress = []
@ -49,7 +46,7 @@ actix-service = "2.0.0"
actix-codec = "0.4.0" actix-codec = "0.4.0"
actix-utils = "3.0.0" actix-utils = "3.0.0"
actix-rt = "2.2" actix-rt = "2.2"
actix-tls = { version = "3.0.0-beta.7", features = ["accept", "connect"] } actix-tls = { version = "3.0.0-beta.7", features = ["accept"] }
ahash = "0.7" ahash = "0.7"
base64 = "0.13" base64 = "0.13"
@ -82,8 +79,6 @@ brotli2 = { version="0.3.2", optional = true }
flate2 = { version = "1.0.13", optional = true } flate2 = { version = "1.0.13", optional = true }
zstd = { version = "0.7", optional = true } zstd = { version = "0.7", optional = true }
trust-dns-resolver = { version = "0.20.0", optional = true }
[dev-dependencies] [dev-dependencies]
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] } actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] }

View file

@ -29,7 +29,6 @@ extern crate log;
pub mod body; pub mod body;
mod builder; mod builder;
pub mod client;
mod config; mod config;
#[cfg(feature = "__compress")] #[cfg(feature = "__compress")]

View file

@ -22,10 +22,10 @@ edition = "2018"
default = [] default = []
# rustls # rustls
rustls = ["tls-rustls", "actix-http/rustls"] rustls = ["tls-rustls", "actix-http/rustls", "awc/rustls"]
# openssl # openssl
openssl = ["tls-openssl", "actix-http/openssl"] openssl = ["tls-openssl", "actix-http/openssl", "awc/openssl"]
[dependencies] [dependencies]
actix-codec = "0.4.0" actix-codec = "0.4.0"

View file

@ -30,10 +30,10 @@ features = ["openssl", "rustls", "compress-brotli", "compress-gzip", "compress-z
default = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"] default = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"]
# openssl # openssl
openssl = ["tls-openssl", "actix-http/openssl"] openssl = ["tls-openssl", "actix-tls/openssl"]
# rustls # rustls
rustls = ["tls-rustls", "actix-http/rustls"] rustls = ["tls-rustls", "actix-tls/rustls"]
# Brotli algorithm content-encoding support # Brotli algorithm content-encoding support
compress-brotli = ["actix-http/compress-brotli", "__compress"] compress-brotli = ["actix-http/compress-brotli", "__compress"]
@ -46,7 +46,7 @@ compress-zstd = ["actix-http/compress-zstd", "__compress"]
cookies = ["cookie"] cookies = ["cookie"]
# trust-dns as dns resolver # trust-dns as dns resolver
trust-dns = ["actix-http/trust-dns"] trust-dns = ["trust-dns-resolver"]
# Internal (PRIVATE!) features used to aid testing and cheking feature status. # Internal (PRIVATE!) features used to aid testing and cheking feature status.
# Don't rely on these whatsoever. They may disappear at anytime. # Don't rely on these whatsoever. They may disappear at anytime.
@ -57,13 +57,18 @@ actix-codec = "0.4.0"
actix-service = "2.0.0" actix-service = "2.0.0"
actix-http = "3.0.0-beta.11" actix-http = "3.0.0-beta.11"
actix-rt = { version = "2.1", default-features = false } actix-rt = { version = "2.1", default-features = false }
actix-tls = { version = "3.0.0-beta.7", features = ["connect"] }
actix-utils = "3.0.0"
ahash = "0.7"
base64 = "0.13" base64 = "0.13"
bytes = "1" bytes = "1"
cfg-if = "1" cfg-if = "1"
cookie = { version = "0.15", features = ["percent-encode"], optional = true }
derive_more = "0.99.5" derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
futures-util = { version = "0.3.7", default-features = false }
h2 = "0.3"
http = "0.2"
itoa = "0.4" itoa = "0.4"
log =" 0.4" log =" 0.4"
mime = "0.3" mime = "0.3"
@ -73,9 +78,15 @@ rand = "0.8"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_urlencoded = "0.7" serde_urlencoded = "0.7"
tokio = { version = "1", features = ["sync"] }
cookie = { version = "0.15", features = ["percent-encode"], optional = true }
tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tls-rustls = { package = "rustls", version = "0.20.0", optional = true, features = ["dangerous_configuration"] } tls-rustls = { package = "rustls", version = "0.20.0", optional = true, features = ["dangerous_configuration"] }
trust-dns-resolver = { version = "0.20.0", optional = true }
[dev-dependencies] [dev-dependencies]
actix-web = { version = "4.0.0-beta.10", features = ["openssl"] } actix-web = { version = "4.0.0-beta.10", features = ["openssl"] }
actix-http = { version = "3.0.0-beta.11", features = ["openssl"] } actix-http = { version = "3.0.0-beta.11", features = ["openssl"] }

View file

@ -4,13 +4,11 @@ use std::net::IpAddr;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use actix_http::{ use actix_http::http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri};
client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection},
http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri},
};
use actix_rt::net::{ActixStream, TcpStream}; use actix_rt::net::{ActixStream, TcpStream};
use actix_service::{boxed, Service}; use actix_service::{boxed, Service};
use crate::client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection};
use crate::connect::DefaultConnector; use crate::connect::DefaultConnector;
use crate::error::SendRequestError; use crate::error::SendRequestError;
use crate::middleware::{NestTransform, Redirect, Transform}; use crate::middleware::{NestTransform, Redirect, Transform};

View file

@ -1,5 +1,4 @@
use std::net::IpAddr; use std::{net::IpAddr, time::Duration};
use std::time::Duration;
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2MB const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2MB
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB

View file

@ -12,10 +12,9 @@ use bytes::Bytes;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use h2::client::SendRequest; use h2::client::SendRequest;
use crate::h1::ClientCodec; use actix_http::{
use crate::message::{RequestHeadType, ResponseHead}; body::MessageBody, h1::ClientCodec, Error, Payload, RequestHeadType, ResponseHead,
use crate::payload::Payload; };
use crate::{body::MessageBody, Error};
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
@ -219,11 +218,7 @@ impl<Io: ConnectionIo> ConnectionType<Io> {
} }
} }
pub(super) fn from_h1( pub(super) fn from_h1(io: Io, created: time::Instant, acquired: Acquired<Io>) -> Self {
io: Io,
created: time::Instant,
acquired: Acquired<Io>,
) -> Self {
Self::H1(H1Connection { Self::H1(H1Connection {
io: Some(io), io: Some(io),
created, created,
@ -271,9 +266,7 @@ where
Connection::Tls(ConnectionType::H2(conn)) => { Connection::Tls(ConnectionType::H2(conn)) => {
h2proto::send_request(conn, head.into(), body).await h2proto::send_request(conn, head.into(), body).await
} }
_ => unreachable!( _ => unreachable!("Plain Tcp connection can be used only in Http1 protocol"),
"Plain Tcp connection can be used only in Http1 protocol"
),
} }
}) })
} }
@ -301,9 +294,7 @@ where
Err(SendRequestError::TunnelNotSupported) Err(SendRequestError::TunnelNotSupported)
} }
Connection::Tcp(ConnectionType::H2(_)) => { Connection::Tcp(ConnectionType::H2(_)) => {
unreachable!( unreachable!("Plain Tcp connection can be used only in Http1 protocol")
"Plain Tcp connection can be used only in Http1 protocol"
)
} }
} }
}) })
@ -321,12 +312,8 @@ where
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<()>> {
match self.get_mut() { match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => { Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf),
Pin::new(conn).poll_read(cx, buf) Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf),
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_read(cx, buf)
}
_ => unreachable!("H2Connection can not impl AsyncRead trait"), _ => unreachable!("H2Connection can not impl AsyncRead trait"),
} }
} }
@ -345,12 +332,8 @@ where
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
match self.get_mut() { match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => { Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf),
Pin::new(conn).poll_write(cx, buf) Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf),
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_write(cx, buf)
}
_ => unreachable!(H2_UNREACHABLE_WRITE), _ => unreachable!(H2_UNREACHABLE_WRITE),
} }
} }
@ -363,17 +346,10 @@ where
} }
} }
fn poll_shutdown( fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
match self.get_mut() { match self.get_mut() {
Connection::Tcp(ConnectionType::H1(conn)) => { Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx),
Pin::new(conn).poll_shutdown(cx) Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx),
}
Connection::Tls(ConnectionType::H1(conn)) => {
Pin::new(conn).poll_shutdown(cx)
}
_ => unreachable!(H2_UNREACHABLE_WRITE), _ => unreachable!(H2_UNREACHABLE_WRITE),
} }
} }

View file

@ -8,6 +8,7 @@ use std::{
time::Duration, time::Duration,
}; };
use actix_http::Protocol;
use actix_rt::{ use actix_rt::{
net::{ActixStream, TcpStream}, net::{ActixStream, TcpStream},
time::{sleep, Sleep}, time::{sleep, Sleep},
@ -19,14 +20,13 @@ use actix_tls::connect::{
}; };
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use http::Uri; use http::Uri;
use pin_project::pin_project; use pin_project_lite::pin_project;
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::{Connection, ConnectionIo}; use super::connection::{Connection, ConnectionIo};
use super::error::ConnectError; use super::error::ConnectError;
use super::pool::ConnectionPool; use super::pool::ConnectionPool;
use super::Connect; use super::Connect;
use super::Protocol;
enum SslConnector { enum SslConnector {
#[allow(dead_code)] #[allow(dead_code)]
@ -99,9 +99,7 @@ impl Connector<()> {
/// Build TLS connector with openssl, based on supplied ALPN protocols /// Build TLS connector with openssl, based on supplied ALPN protocols
#[cfg(all(feature = "openssl", not(feature = "rustls")))] #[cfg(all(feature = "openssl", not(feature = "rustls")))]
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector { fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
use actix_tls::connect::tls::openssl::{ use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
SslConnector as OpensslConnector, SslMethod,
};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
let mut alpn = BytesMut::with_capacity(20); let mut alpn = BytesMut::with_capacity(20);
@ -112,7 +110,7 @@ impl Connector<()> {
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
if let Err(err) = ssl.set_alpn_protos(&alpn) { if let Err(err) = ssl.set_alpn_protos(&alpn) {
error!("Can not set ALPN protocol: {:?}", err); log::error!("Can not set ALPN protocol: {:?}", err);
} }
SslConnector::Openssl(ssl.build()) SslConnector::Openssl(ssl.build())
@ -148,11 +146,8 @@ where
// This remap is to hide ActixStream's trait methods. They are not meant to be called // This remap is to hide ActixStream's trait methods. They are not meant to be called
// from user code. // from user code.
Io: ActixStream + fmt::Debug + 'static, Io: ActixStream + fmt::Debug + 'static,
S: Service< S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
TcpConnect<Uri>, + Clone
Response = TcpConnection<Uri, Io>,
Error = TcpConnectError,
> + Clone
+ 'static, + 'static,
{ {
/// Tcp connection timeout, i.e. max time to connect to remote host including dns name /// Tcp connection timeout, i.e. max time to connect to remote host including dns name
@ -171,10 +166,7 @@ where
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
/// Use custom `SslConnector` instance. /// Use custom `SslConnector` instance.
pub fn ssl( pub fn ssl(mut self, connector: actix_tls::connect::ssl::openssl::SslConnector) -> Self {
mut self,
connector: actix_tls::connect::ssl::openssl::SslConnector,
) -> Self {
self.ssl = SslConnector::Openssl(connector); self.ssl = SslConnector::Openssl(connector);
self self
} }
@ -328,10 +320,11 @@ where
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> { impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> {
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) { fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
let sock = self.into_parts().0; let sock = self.into_parts().0;
let h2 = let h2 = sock
sock.get_ref().1.alpn_protocol().map_or(false, |protos| { .get_ref()
protos.windows(2).any(|w| w == H2) .1
}); .alpn_protocol()
.map_or(false, |protos| protos.windows(2).any(|w| w == H2));
if h2 { if h2 {
(Box::new(sock), Protocol::Http2) (Box::new(sock), Protocol::Http2)
} else { } else {
@ -357,8 +350,8 @@ where
let tcp_pool = ConnectionPool::new(tcp_service, tcp_config); let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
let tls_config = self.config; let tls_config = self.config;
let tls_pool = tls_service let tls_pool =
.map(move |tls_service| ConnectionPool::new(tls_service, tls_config)); tls_service.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
ConnectorServicePriv { tcp_pool, tls_pool } ConnectorServicePriv { tcp_pool, tls_pool }
} }
@ -389,10 +382,12 @@ where
} }
} }
#[pin_project] pin_project! {
pub struct TcpConnectorFuture<Fut> { #[project = TcpConnectorFutureProj]
#[pin] pub struct TcpConnectorFuture<Fut> {
fut: Fut, #[pin]
fut: Fut,
}
} }
impl<Fut, Io> Future for TcpConnectorFuture<Fut> impl<Fut, Io> Future for TcpConnectorFuture<Fut>
@ -451,23 +446,25 @@ where
} }
} }
#[pin_project(project = TlsConnectorProj)] pin_project! {
#[allow(clippy::large_enum_variant)] #[project = TlsConnectorProj]
enum TlsConnectorFuture<S, Fut1, Fut2> { #[allow(clippy::large_enum_variant)]
TcpConnect { enum TlsConnectorFuture<S, Fut1, Fut2> {
#[pin] TcpConnect {
fut: Fut1, #[pin]
tls_service: Option<S>, fut: Fut1,
timeout: Duration, tls_service: Option<S>,
}, timeout: Duration,
TlsConnect { },
#[pin] TlsConnect {
fut: Fut2, #[pin]
#[pin] fut: Fut2,
timeout: Sleep, #[pin]
}, timeout: Sleep,
} },
}
}
/// helper trait for generic over different TlsStream types between tls crates. /// helper trait for generic over different TlsStream types between tls crates.
trait IntoConnectionIo { trait IntoConnectionIo {
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol); fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
@ -475,12 +472,7 @@ trait IntoConnectionIo {
impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2> impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
where where
S: Service< S: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error, Future = Fut2>,
TcpConnection<Uri, Io>,
Response = Res,
Error = std::io::Error,
Future = Fut2,
>,
S::Response: IntoConnectionIo, S::Response: IntoConnectionIo,
Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>, Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
Fut2: Future<Output = Result<S::Response, S::Error>>, Fut2: Future<Output = Result<S::Response, S::Error>>,
@ -522,11 +514,7 @@ pub struct TcpConnectorInnerService<S: Clone> {
} }
impl<S: Clone> TcpConnectorInnerService<S> { impl<S: Clone> TcpConnectorInnerService<S> {
fn new( fn new(service: S, timeout: Duration, local_address: Option<std::net::IpAddr>) -> Self {
service: S,
timeout: Duration,
local_address: Option<std::net::IpAddr>,
) -> Self {
Self { Self {
service, service,
timeout, timeout,
@ -537,11 +525,8 @@ impl<S: Clone> TcpConnectorInnerService<S> {
impl<S, Io> Service<Connect> for TcpConnectorInnerService<S> impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
where where
S: Service< S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
TcpConnect<Uri>, + Clone
Response = TcpConnection<Uri, Io>,
Error = TcpConnectError,
> + Clone
+ 'static, + 'static,
{ {
type Response = S::Response; type Response = S::Response;
@ -564,12 +549,14 @@ where
} }
} }
#[pin_project] pin_project! {
pub struct TcpConnectorInnerFuture<Fut> { #[project = TcpConnectorInnerFutureProj]
#[pin] pub struct TcpConnectorInnerFuture<Fut> {
fut: Fut, #[pin]
#[pin] fut: Fut,
timeout: Sleep, #[pin]
timeout: Sleep,
}
} }
impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut> impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
@ -618,12 +605,8 @@ where
impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2> impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
where where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
+ Clone S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
+ 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: ConnectionIo, Io1: ConnectionIo,
Io2: ConnectionIo, Io2: ConnectionIo,
{ {
@ -643,38 +626,46 @@ where
match req.uri.scheme_str() { match req.uri.scheme_str() {
Some("https") | Some("wss") => match self.tls_pool { Some("https") | Some("wss") => match self.tls_pool {
None => ConnectorServiceFuture::SslIsNotSupported, None => ConnectorServiceFuture::SslIsNotSupported,
Some(ref pool) => ConnectorServiceFuture::Tls(pool.call(req)), Some(ref pool) => ConnectorServiceFuture::Tls {
fut: pool.call(req),
},
},
_ => ConnectorServiceFuture::Tcp {
fut: self.tcp_pool.call(req),
}, },
_ => ConnectorServiceFuture::Tcp(self.tcp_pool.call(req)),
} }
} }
} }
#[pin_project(project = ConnectorServiceProj)] pin_project! {
pub enum ConnectorServiceFuture<S1, S2, Io1, Io2> #[project = ConnectorServiceFutureProj]
where pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> where
+ Clone S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
+ 'static, S1: Clone,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> S1: 'static,
+ Clone S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
+ 'static, S2: Clone,
Io1: ConnectionIo, S2: 'static,
Io2: ConnectionIo, Io1: ConnectionIo,
{ Io2: ConnectionIo,
Tcp(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future), {
Tls(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future), Tcp {
SslIsNotSupported, #[pin]
fut: <ConnectionPool<S1, Io1> as Service<Connect>>::Future
},
Tls {
#[pin]
fut: <ConnectionPool<S2, Io2> as Service<Connect>>::Future
},
SslIsNotSupported
}
} }
impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2> impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
where where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
+ Clone S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
+ 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: ConnectionIo, Io1: ConnectionIo,
Io2: ConnectionIo, Io2: ConnectionIo,
{ {
@ -682,9 +673,9 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() { match self.project() {
ConnectorServiceProj::Tcp(fut) => fut.poll(cx).map_ok(Connection::Tcp), ConnectorServiceFutureProj::Tcp { fut } => fut.poll(cx).map_ok(Connection::Tcp),
ConnectorServiceProj::Tls(fut) => fut.poll(cx).map_ok(Connection::Tls), ConnectorServiceFutureProj::Tls { fut } => fut.poll(cx).map_ok(Connection::Tls),
ConnectorServiceProj::SslIsNotSupported => { ConnectorServiceFutureProj::SslIsNotSupported => {
Poll::Ready(Err(ConnectError::SslIsNotSupported)) Poll::Ready(Err(ConnectError::SslIsNotSupported))
} }
} }

View file

@ -2,12 +2,13 @@ use std::{error::Error as StdError, fmt, io};
use derive_more::{Display, From}; use derive_more::{Display, From};
use actix_http::{
error::{Error, ParseError},
http::Error as HttpError,
};
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
use actix_tls::accept::openssl::SslError; use actix_tls::accept::openssl::SslError;
use crate::error::{Error, ParseError};
use crate::http::Error as HttpError;
/// A set of errors that can occur while connecting to an HTTP host /// A set of errors that can occur while connecting to an HTTP host
#[derive(Debug, Display, From)] #[derive(Debug, Display, From)]
#[non_exhaustive] #[non_exhaustive]

View file

@ -5,24 +5,25 @@ use std::{
}; };
use actix_codec::Framed; use actix_codec::Framed;
use actix_http::{
body::{BodySize, MessageBody},
error::PayloadError,
h1,
http::{
header::{HeaderMap, IntoHeaderValue, EXPECT, HOST},
StatusCode,
},
Error, Payload, RequestHeadType, ResponseHead,
};
use actix_utils::future::poll_fn; use actix_utils::future::poll_fn;
use bytes::buf::BufMut; use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use futures_util::SinkExt as _; use futures_util::SinkExt as _;
use pin_project_lite::pin_project;
use crate::h1;
use crate::http::{
header::{HeaderMap, IntoHeaderValue, EXPECT, HOST},
StatusCode,
};
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload;
use crate::{error::PayloadError, Error};
use super::connection::{ConnectionIo, H1Connection}; use super::connection::{ConnectionIo, H1Connection};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
use crate::body::{BodySize, MessageBody};
pub(crate) async fn send_request<Io, B>( pub(crate) async fn send_request<Io, B>(
io: H1Connection<Io>, io: H1Connection<Io>,
@ -194,10 +195,11 @@ where
Ok(()) Ok(())
} }
#[pin_project::pin_project] pin_project! {
pub(crate) struct PlStream<Io: ConnectionIo> { pub(crate) struct PlStream<Io: ConnectionIo> {
#[pin] #[pin]
framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>, framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
}
} }
impl<Io: ConnectionIo> PlStream<Io> { impl<Io: ConnectionIo> PlStream<Io> {
@ -211,10 +213,7 @@ impl<Io: ConnectionIo> PlStream<Io> {
impl<Io: ConnectionIo> Stream for PlStream<Io> { impl<Io: ConnectionIo> Stream for PlStream<Io> {
type Item = Result<Bytes, PayloadError>; type Item = Result<Bytes, PayloadError>;
fn poll_next( fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.project();
match ready!(this.framed.as_mut().next_item(cx)?) { match ready!(this.framed.as_mut().next_item(cx)?) {

View file

@ -8,13 +8,12 @@ use h2::{
}; };
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, Method, Version}; use http::{request::Request, Method, Version};
use log::trace;
use crate::{ use actix_http::{
body::{BodySize, MessageBody}, body::{BodySize, MessageBody},
header::HeaderMap, header::HeaderMap,
message::{RequestHeadType, ResponseHead}, Error, Payload, RequestHeadType, ResponseHead,
payload::Payload,
Error,
}; };
use super::{ use super::{
@ -131,10 +130,7 @@ where
Ok((head, payload)) Ok((head, payload))
} }
async fn send_body<B>( async fn send_body<B>(body: B, mut send: SendStream<Bytes>) -> Result<(), SendRequestError>
body: B,
mut send: SendStream<Bytes>,
) -> Result<(), SendRequestError>
where where
B: MessageBody, B: MessageBody,
B::Error: Into<Error>, B::Error: Into<Error>,
@ -184,8 +180,7 @@ where
pub(crate) fn handshake<Io: ConnectionIo>( pub(crate) fn handshake<Io: ConnectionIo>(
io: Io, io: Io,
config: &ConnectorConfig, config: &ConnectorConfig,
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>> ) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>> {
{
let mut builder = Builder::new(); let mut builder = Builder::new();
builder builder
.initial_window_size(config.stream_window_size) .initial_window_size(config.stream_window_size)

View file

@ -17,7 +17,6 @@ pub use actix_tls::connect::{
pub use self::connection::{Connection, ConnectionIo}; pub use self::connection::{Connection, ConnectionIo};
pub use self::connector::{Connector, ConnectorService}; pub use self::connector::{Connector, ConnectorService};
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
pub use crate::Protocol;
#[derive(Clone)] #[derive(Clone)]
pub struct Connect { pub struct Connect {

View file

@ -14,22 +14,20 @@ use std::{
}; };
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_http::Protocol;
use actix_rt::time::{sleep, Sleep}; use actix_rt::time::{sleep, Sleep};
use actix_service::Service; use actix_service::Service;
use ahash::AHashMap; use ahash::AHashMap;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use http::uri::Authority; use http::uri::Authority;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::{ use super::connection::{ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner};
ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner,
};
use super::error::ConnectError; use super::error::ConnectError;
use super::h2proto::handshake; use super::h2proto::handshake;
use super::Connect; use super::Connect;
use super::Protocol;
#[derive(Hash, Eq, PartialEq, Clone, Debug)] #[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct Key { pub struct Key {
@ -152,9 +150,7 @@ where
impl<S, Io> Service<Connect> for ConnectionPool<S, Io> impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
where where
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + Clone + 'static,
+ Clone
+ 'static,
Io: ConnectionIo, Io: ConnectionIo,
{ {
type Response = ConnectionType<Io>; type Response = ConnectionType<Io>;
@ -195,8 +191,8 @@ where
let config = &inner.config; let config = &inner.config;
let idle_dur = now - c.used; let idle_dur = now - c.used;
let age = now - c.created; let age = now - c.created;
let conn_ineligible = idle_dur > config.conn_keep_alive let conn_ineligible =
|| age > config.conn_lifetime; idle_dur > config.conn_keep_alive || age > config.conn_lifetime;
if conn_ineligible { if conn_ineligible {
// drop connections that are too old // drop connections that are too old
@ -231,9 +227,7 @@ where
// match the connection and spawn new one if did not get anything. // match the connection and spawn new one if did not get anything.
match conn { match conn {
Some(conn) => { Some(conn) => Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired)),
Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired))
}
None => { None => {
let (io, proto) = connector.call(req).await?; let (io, proto) = connector.call(req).await?;
@ -284,9 +278,7 @@ where
let mut read_buf = ReadBuf::new(&mut buf); let mut read_buf = ReadBuf::new(&mut buf);
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) { let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => ConnectionState::Tainted,
ConnectionState::Tainted
}
Poll::Pending => ConnectionState::Live, Poll::Pending => ConnectionState::Live,
_ => ConnectionState::Skip, _ => ConnectionState::Skip,
@ -302,11 +294,13 @@ struct PooledConnection<Io> {
created: Instant, created: Instant,
} }
#[pin_project] pin_project! {
struct CloseConnection<Io> { #[project = CloseConnectionProj]
io: Io, struct CloseConnection<Io> {
#[pin] io: Io,
timeout: Sleep, #[pin]
timeout: Sleep,
}
} }
impl<Io> CloseConnection<Io> impl<Io> CloseConnection<Io>
@ -413,17 +407,11 @@ mod test {
unimplemented!() unimplemented!()
} }
fn poll_flush( fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
unimplemented!() unimplemented!()
} }
fn poll_shutdown( fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }

View file

@ -8,16 +8,14 @@ use std::{
use actix_codec::Framed; use actix_codec::Framed;
use actix_http::{ use actix_http::{
body::Body, body::Body, h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead,
client::{
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
},
h1::ClientCodec,
Payload, RequestHead, RequestHeadType, ResponseHead,
}; };
use actix_service::Service; use actix_service::Service;
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use crate::client::{
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
};
use crate::response::ClientResponse; use crate::response::ClientResponse;
pub type BoxConnectorService = Rc< pub type BoxConnectorService = Rc<

View file

@ -1,15 +1,15 @@
//! HTTP client errors //! HTTP client errors
pub use actix_http::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use actix_http::{
pub use actix_http::error::PayloadError; error::PayloadError,
pub use actix_http::http::Error as HttpError; http::{header::HeaderValue, Error as HttpError, StatusCode},
pub use actix_http::ws::HandshakeError as WsHandshakeError; ws::{HandshakeError as WsHandshakeError, ProtocolError as WsProtocolError},
pub use actix_http::ws::ProtocolError as WsProtocolError; };
use derive_more::{Display, From};
use serde_json::error::Error as JsonError; use serde_json::error::Error as JsonError;
use actix_http::http::{header::HeaderValue, StatusCode}; pub use crate::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
use derive_more::{Display, From};
/// Websocket client error /// Websocket client error
#[derive(Debug, Display, From)] #[derive(Debug, Display, From)]

View file

@ -104,22 +104,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::{convert::TryFrom, rc::Rc, time::Duration};
#[cfg(feature = "cookies")]
pub use cookie;
pub use actix_http::{client::Connector, http};
use actix_http::{
client::{TcpConnect, TcpConnectError, TcpConnection},
http::{Error as HttpError, HeaderMap, Method, Uri},
RequestHead,
};
use actix_rt::net::TcpStream;
use actix_service::Service;
mod builder; mod builder;
mod client;
mod connect; mod connect;
pub mod error; pub mod error;
mod frozen; mod frozen;
@ -130,13 +116,29 @@ mod sender;
pub mod test; pub mod test;
pub mod ws; pub mod ws;
pub use actix_http::http;
#[cfg(feature = "cookies")]
pub use cookie;
pub use self::builder::ClientBuilder; pub use self::builder::ClientBuilder;
pub use self::client::Connector;
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
pub use self::request::ClientRequest; pub use self::request::ClientRequest;
pub use self::response::{ClientResponse, JsonBody, MessageBody}; pub use self::response::{ClientResponse, JsonBody, MessageBody};
pub use self::sender::SendClientRequest; pub use self::sender::SendClientRequest;
use std::{convert::TryFrom, rc::Rc, time::Duration};
use actix_http::{
http::{Error as HttpError, HeaderMap, Method, Uri},
RequestHead,
};
use actix_rt::net::TcpStream;
use actix_service::Service;
use self::client::{TcpConnect, TcpConnectError, TcpConnection};
/// An asynchronous HTTP and WebSocket client. /// An asynchronous HTTP and WebSocket client.
/// ///
/// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU /// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU

View file

@ -9,7 +9,6 @@ use std::{
use actix_http::{ use actix_http::{
body::Body, body::Body,
client::{InvalidUrl, SendRequestError},
http::{header, Method, StatusCode, Uri}, http::{header, Method, StatusCode, Uri},
RequestHead, RequestHeadType, RequestHead, RequestHeadType,
}; };
@ -19,6 +18,7 @@ use futures_core::ready;
use super::Transform; use super::Transform;
use crate::client::{InvalidUrl, SendRequestError};
use crate::connect::{ConnectRequest, ConnectResponse}; use crate::connect::{ConnectRequest, ConnectResponse};
use crate::ClientResponse; use crate::ClientResponse;