1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 11:31:09 +00:00

Http2 client configuration to improve performance (#1394)

* add defaults for http2 client configuration

* fix spaces

* Add changes text for extended H2 defaults buffers

* client: configurable H2 window sizes and max_http_version

* add H2 window size configuration and max_http_version to awc::ClientBuilder

* add awc::ClientBuilder H2 window sizes and max_http_version

* add test for H2 window size settings

* cleanup comment

* Apply code review fixes

* Code review fix for awc ClientBuilder

* Remove unnecessary comments on code review

* pin quote version to resolve build issue

* max_http_version to accept http::Version

* revert fix for quote broken build
This commit is contained in:
Maxim Vorobjov 2020-03-07 04:09:31 +02:00 committed by GitHub
parent a7d805aab7
commit 10e3e72595
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 302 additions and 110 deletions

View file

@ -6,6 +6,11 @@
* Update `actix-connect` and `actix-tls` dependency to 2.0.0-alpha.1 * Update `actix-connect` and `actix-tls` dependency to 2.0.0-alpha.1
* Change default initial window size and connection window size for HTTP2 to 2MB and 1MB respectively to improve download speed for awc when downloading large objects.
* client::Connector accepts initial_window_size and initial_connection_window_size HTTP2 configuration
* client::Connector allowing to set max_http_version to limit HTTP version to be used
## [2.0.0-alpha.1] - 2020-02-27 ## [2.0.0-alpha.1] - 2020-02-27

View file

@ -0,0 +1,39 @@
use std::time::Duration;
// These values are taken from hyper/src/proto/h2/client.rs
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
/// Connector configuration
#[derive(Clone)]
pub(crate) struct ConnectorConfig {
pub(crate) timeout: Duration,
pub(crate) conn_lifetime: Duration,
pub(crate) conn_keep_alive: Duration,
pub(crate) disconnect_timeout: Option<Duration>,
pub(crate) limit: usize,
pub(crate) conn_window_size: u32,
pub(crate) stream_window_size: u32,
}
impl Default for ConnectorConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(1),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Some(Duration::from_millis(3000)),
limit: 100,
conn_window_size: DEFAULT_H2_CONN_WINDOW,
stream_window_size: DEFAULT_H2_STREAM_WINDOW,
}
}
}
impl ConnectorConfig {
pub(crate) fn no_disconnect_timeout(&self) -> Self {
let mut res = self.clone();
res.disconnect_timeout = None;
res
}
}

View file

@ -11,6 +11,7 @@ use actix_service::{apply_fn, Service};
use actix_utils::timeout::{TimeoutError, TimeoutService}; use actix_utils::timeout::{TimeoutError, TimeoutService};
use http::Uri; use http::Uri;
use super::config::ConnectorConfig;
use super::connection::Connection; use super::connection::Connection;
use super::error::ConnectError; use super::error::ConnectError;
use super::pool::{ConnectionPool, Protocol}; use super::pool::{ConnectionPool, Protocol};
@ -48,11 +49,7 @@ type SslConnector = ();
/// ``` /// ```
pub struct Connector<T, U> { pub struct Connector<T, U> {
connector: T, connector: T,
timeout: Duration, config: ConnectorConfig,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Duration,
limit: usize,
#[allow(dead_code)] #[allow(dead_code)]
ssl: SslConnector, ssl: SslConnector,
_t: PhantomData<U>, _t: PhantomData<U>,
@ -71,42 +68,49 @@ impl Connector<(), ()> {
> + Clone, > + Clone,
TcpStream, TcpStream,
> { > {
let ssl = { Connector {
ssl: Self::build_ssl(vec![b"h2".to_vec(), b"http/1.1".to_vec()]),
connector: default_connector(),
config: ConnectorConfig::default(),
_t: PhantomData,
}
}
// Build Ssl connector with openssl, based on supplied alpn protocols
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector
{ {
use actix_connect::ssl::openssl::SslMethod; use actix_connect::ssl::openssl::SslMethod;
use bytes::{BufMut, BytesMut};
let mut alpn = BytesMut::with_capacity(20);
for proto in protocols.iter() {
alpn.put_u8(proto.len() as u8);
alpn.put(proto.as_slice());
}
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
let _ = ssl let _ = ssl
.set_alpn_protos(b"\x02h2\x08http/1.1") .set_alpn_protos(&alpn)
.map_err(|e| error!("Can not set alpn protocol: {:?}", e)); .map_err(|e| error!("Can not set alpn protocol: {:?}", e));
SslConnector::Openssl(ssl.build()) SslConnector::Openssl(ssl.build())
} }
// Build Ssl connector with rustls, based on supplied alpn protocols
#[cfg(all(not(feature = "openssl"), feature = "rustls"))] #[cfg(all(not(feature = "openssl"), feature = "rustls"))]
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector
{ {
let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let mut config = ClientConfig::new(); let mut config = ClientConfig::new();
config.set_protocols(&protos); config.set_protocols(&protocols);
config config
.root_store .root_store
.add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS); .add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS);
SslConnector::Rustls(Arc::new(config)) SslConnector::Rustls(Arc::new(config))
} }
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
{}
};
Connector { // ssl turned off, provides empty ssl connector
ssl, #[cfg(not(any(feature = "openssl", feature = "rustls")))]
connector: default_connector(), fn build_ssl(_: Vec<Vec<u8>>) -> SslConnector {}
timeout: Duration::from_secs(1),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Duration::from_millis(3000),
limit: 100,
_t: PhantomData,
}
}
} }
impl<T, U> Connector<T, U> { impl<T, U> Connector<T, U> {
@ -122,11 +126,7 @@ impl<T, U> Connector<T, U> {
{ {
Connector { Connector {
connector, connector,
timeout: self.timeout, config: self.config,
conn_lifetime: self.conn_lifetime,
conn_keep_alive: self.conn_keep_alive,
disconnect_timeout: self.disconnect_timeout,
limit: self.limit,
ssl: self.ssl, ssl: self.ssl,
_t: PhantomData, _t: PhantomData,
} }
@ -146,7 +146,7 @@ where
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default. /// Set to 1 second by default.
pub fn timeout(mut self, timeout: Duration) -> Self { pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout; self.config.timeout = timeout;
self self
} }
@ -163,12 +163,42 @@ where
self self
} }
/// Maximum supported http major version
/// Supported versions http/1.1, http/2
pub fn max_http_version(mut self, val: http::Version) -> Self {
let versions = match val {
http::Version::HTTP_11 => vec![b"http/1.1".to_vec()],
http::Version::HTTP_2 => vec![b"h2".to_vec(), b"http/1.1".to_vec()],
_ => unimplemented!("actix-http:client: supported versions http/1.1, http/2"),
};
self.ssl = Connector::build_ssl(versions);
self
}
/// Indicates the initial window size (in octets) for
/// HTTP2 stream-level flow control for received data.
///
/// The default value is 65,535 and is good for APIs, but not for big objects.
pub fn initial_window_size(mut self, size: u32) -> Self {
self.config.stream_window_size = size;
self
}
/// Indicates the initial window size (in octets) for
/// HTTP2 connection-level flow control for received data.
///
/// The default value is 65,535 and is good for APIs, but not for big objects.
pub fn initial_connection_window_size(mut self, size: u32) -> Self {
self.config.conn_window_size = size;
self
}
/// Set total number of simultaneous connections per type of scheme. /// Set total number of simultaneous connections per type of scheme.
/// ///
/// If limit is 0, the connector has no limit. /// If limit is 0, the connector has no limit.
/// The default limit size is 100. /// The default limit size is 100.
pub fn limit(mut self, limit: usize) -> Self { pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit; self.config.limit = limit;
self self
} }
@ -179,7 +209,7 @@ where
/// exceeds this period, the connection is closed. /// exceeds this period, the connection is closed.
/// Default keep-alive period is 15 seconds. /// Default keep-alive period is 15 seconds.
pub fn conn_keep_alive(mut self, dur: Duration) -> Self { pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
self.conn_keep_alive = dur; self.config.conn_keep_alive = dur;
self self
} }
@ -189,7 +219,7 @@ where
/// until it is closed regardless of keep-alive period. /// until it is closed regardless of keep-alive period.
/// Default lifetime period is 75 seconds. /// Default lifetime period is 75 seconds.
pub fn conn_lifetime(mut self, dur: Duration) -> Self { pub fn conn_lifetime(mut self, dur: Duration) -> Self {
self.conn_lifetime = dur; self.config.conn_lifetime = dur;
self self
} }
@ -202,7 +232,7 @@ where
/// ///
/// By default disconnect timeout is set to 3000 milliseconds. /// By default disconnect timeout is set to 3000 milliseconds.
pub fn disconnect_timeout(mut self, dur: Duration) -> Self { pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
self.disconnect_timeout = dur; self.config.disconnect_timeout = Some(dur);
self self
} }
@ -216,7 +246,7 @@ where
#[cfg(not(any(feature = "openssl", feature = "rustls")))] #[cfg(not(any(feature = "openssl", feature = "rustls")))]
{ {
let connector = TimeoutService::new( let connector = TimeoutService::new(
self.timeout, self.config.timeout,
apply_fn(self.connector, |msg: Connect, srv| { apply_fn(self.connector, |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
}) })
@ -231,10 +261,7 @@ where
connect_impl::InnerConnector { connect_impl::InnerConnector {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
connector, connector,
self.conn_lifetime, self.config.no_disconnect_timeout(),
self.conn_keep_alive,
None,
self.limit,
), ),
} }
} }
@ -248,7 +275,7 @@ where
use actix_service::{boxed::service, pipeline}; use actix_service::{boxed::service, pipeline};
let ssl_service = TimeoutService::new( let ssl_service = TimeoutService::new(
self.timeout, self.config.timeout,
pipeline( pipeline(
apply_fn(self.connector.clone(), |msg: Connect, srv| { apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
@ -301,7 +328,7 @@ where
}); });
let tcp_service = TimeoutService::new( let tcp_service = TimeoutService::new(
self.timeout, self.config.timeout,
apply_fn(self.connector, |msg: Connect, srv| { apply_fn(self.connector, |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
}) })
@ -316,18 +343,9 @@ where
connect_impl::InnerConnector { connect_impl::InnerConnector {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
tcp_service, tcp_service,
self.conn_lifetime, self.config.no_disconnect_timeout(),
self.conn_keep_alive,
None,
self.limit,
),
ssl_pool: ConnectionPool::new(
ssl_service,
self.conn_lifetime,
self.conn_keep_alive,
Some(self.disconnect_timeout),
self.limit,
), ),
ssl_pool: ConnectionPool::new(ssl_service, self.config),
} }
} }
} }

View file

@ -1,11 +1,15 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::future::Future;
use std::time; use std::time;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::poll_fn; use futures_util::future::poll_fn;
use futures_util::pin_mut; use futures_util::pin_mut;
use h2::{client::SendRequest, SendStream}; use h2::{
client::{Builder, Connection, SendRequest},
SendStream,
};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, Method, Version}; use http::{request::Request, Method, Version};
@ -14,6 +18,7 @@ use crate::header::HeaderMap;
use crate::message::{RequestHeadType, ResponseHead}; use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use super::config::ConnectorConfig;
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
@ -185,3 +190,18 @@ fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
} }
} }
} }
pub(crate) fn handshake<Io>(
io: Io,
config: &ConnectorConfig,
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
let mut builder = Builder::new();
builder
.initial_window_size(config.stream_window_size)
.initial_connection_window_size(config.conn_window_size)
.enable_push(false);
builder.handshake(io)
}

View file

@ -1,6 +1,7 @@
//! Http client api //! Http client api
use http::Uri; use http::Uri;
mod config;
mod connection; mod connection;
mod connector; mod connector;
mod error; mod error;

View file

@ -13,14 +13,16 @@ use actix_utils::{oneshot, task::LocalWaker};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture}; use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture};
use fxhash::FxHashMap; use fxhash::FxHashMap;
use h2::client::{handshake, Connection, SendRequest}; use h2::client::{Connection, SendRequest};
use http::uri::Authority; use http::uri::Authority;
use indexmap::IndexSet; use indexmap::IndexSet;
use pin_project::pin_project; use pin_project::pin_project;
use slab::Slab; use slab::Slab;
use super::config::ConnectorConfig;
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError; use super::error::ConnectError;
use super::h2proto::handshake;
use super::Connect; use super::Connect;
#[derive(Clone, Copy, PartialEq)] #[derive(Clone, Copy, PartialEq)]
@ -50,20 +52,11 @@ where
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError> T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ 'static, + 'static,
{ {
pub(crate) fn new( pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self {
connector: T,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Option<Duration>,
limit: usize,
) -> Self {
ConnectionPool( ConnectionPool(
Rc::new(RefCell::new(connector)), Rc::new(RefCell::new(connector)),
Rc::new(RefCell::new(Inner { Rc::new(RefCell::new(Inner {
conn_lifetime, config,
conn_keep_alive,
disconnect_timeout,
limit,
acquired: 0, acquired: 0,
waiters: Slab::new(), waiters: Slab::new(),
waiters_queue: IndexSet::new(), waiters_queue: IndexSet::new(),
@ -129,6 +122,8 @@ where
// open tcp connection // open tcp connection
let (io, proto) = connector.call(req).await?; let (io, proto) = connector.call(req).await?;
let config = inner.borrow().config.clone();
let guard = OpenGuard::new(key, inner); let guard = OpenGuard::new(key, inner);
if proto == Protocol::Http1 { if proto == Protocol::Http1 {
@ -138,7 +133,7 @@ where
Some(guard.consume()), Some(guard.consume()),
)) ))
} else { } else {
let (snd, connection) = handshake(io).await?; let (snd, connection) = handshake(io, &config).await?;
actix_rt::spawn(connection.map(|_| ())); actix_rt::spawn(connection.map(|_| ()));
Ok(IoConnection::new( Ok(IoConnection::new(
ConnectionType::H2(snd), ConnectionType::H2(snd),
@ -255,10 +250,7 @@ struct AvailableConnection<Io> {
} }
pub(crate) struct Inner<Io> { pub(crate) struct Inner<Io> {
conn_lifetime: Duration, config: ConnectorConfig,
conn_keep_alive: Duration,
disconnect_timeout: Option<Duration>,
limit: usize,
acquired: usize, acquired: usize,
available: FxHashMap<Key, VecDeque<AvailableConnection<Io>>>, available: FxHashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab< waiters: Slab<
@ -311,7 +303,7 @@ where
fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire<Io> { fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire<Io> {
// check limits // check limits
if self.limit > 0 && self.acquired >= self.limit { if self.config.limit > 0 && self.acquired >= self.config.limit {
return Acquire::NotAvailable; return Acquire::NotAvailable;
} }
@ -323,10 +315,10 @@ where
let now = Instant::now(); let now = Instant::now();
while let Some(conn) = connections.pop_back() { while let Some(conn) = connections.pop_back() {
// check if it still usable // check if it still usable
if (now - conn.used) > self.conn_keep_alive if (now - conn.used) > self.config.conn_keep_alive
|| (now - conn.created) > self.conn_lifetime || (now - conn.created) > self.config.conn_lifetime
{ {
if let Some(timeout) = self.disconnect_timeout { if let Some(timeout) = self.config.disconnect_timeout {
if let ConnectionType::H1(io) = conn.io { if let ConnectionType::H1(io) = conn.io {
actix_rt::spawn(CloseConnection::new(io, timeout)) actix_rt::spawn(CloseConnection::new(io, timeout))
} }
@ -338,7 +330,7 @@ where
match Pin::new(s).poll_read(cx, &mut buf) { match Pin::new(s).poll_read(cx, &mut buf) {
Poll::Pending => (), Poll::Pending => (),
Poll::Ready(Ok(n)) if n > 0 => { Poll::Ready(Ok(n)) if n > 0 => {
if let Some(timeout) = self.disconnect_timeout { if let Some(timeout) = self.config.disconnect_timeout {
if let ConnectionType::H1(io) = io { if let ConnectionType::H1(io) = io {
actix_rt::spawn(CloseConnection::new( actix_rt::spawn(CloseConnection::new(
io, timeout, io, timeout,
@ -372,7 +364,7 @@ where
fn release_close(&mut self, io: ConnectionType<Io>) { fn release_close(&mut self, io: ConnectionType<Io>) {
self.acquired -= 1; self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout { if let Some(timeout) = self.config.disconnect_timeout {
if let ConnectionType::H1(io) = io { if let ConnectionType::H1(io) = io {
actix_rt::spawn(CloseConnection::new(io, timeout)) actix_rt::spawn(CloseConnection::new(io, timeout))
} }
@ -381,7 +373,7 @@ where
} }
fn check_availibility(&self) { fn check_availibility(&self) {
if !self.waiters_queue.is_empty() && self.acquired < self.limit { if !self.waiters_queue.is_empty() && self.acquired < self.config.limit {
self.waker.wake(); self.waker.wake();
} }
} }
@ -480,6 +472,7 @@ where
tx, tx,
this.inner.clone(), this.inner.clone(),
this.connector.call(connect), this.connector.call(connect),
inner.config.clone(),
); );
} }
} }
@ -506,6 +499,7 @@ where
>, >,
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>, rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
inner: Option<Rc<RefCell<Inner<Io>>>>, inner: Option<Rc<RefCell<Inner<Io>>>>,
config: ConnectorConfig,
} }
impl<F, Io> OpenWaitingConnection<F, Io> impl<F, Io> OpenWaitingConnection<F, Io>
@ -518,6 +512,7 @@ where
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>, rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
inner: Rc<RefCell<Inner<Io>>>, inner: Rc<RefCell<Inner<Io>>>,
fut: F, fut: F,
config: ConnectorConfig,
) { ) {
actix_rt::spawn(OpenWaitingConnection { actix_rt::spawn(OpenWaitingConnection {
key, key,
@ -525,6 +520,7 @@ where
h2: None, h2: None,
rx: Some(rx), rx: Some(rx),
inner: Some(inner), inner: Some(inner),
config,
}) })
} }
} }
@ -594,7 +590,7 @@ where
))); )));
Poll::Ready(()) Poll::Ready(())
} else { } else {
*this.h2 = Some(handshake(io).boxed_local()); *this.h2 = Some(handshake(io, this.config).boxed_local());
self.poll(cx) self.poll(cx)
} }
} }

View file

@ -1,5 +1,11 @@
# Changes # Changes
## [NEXT]
* ClientBuilder accepts initial_window_size and initial_connection_window_size HTTP2 configuration
* ClientBuilder allowing to set max_http_version to limit HTTP version to be used
## [1.0.1] - 2019-12-15 ## [1.0.1] - 2019-12-15
* Fix compilation with default features off * Fix compilation with default features off

View file

@ -4,11 +4,11 @@ use std::fmt;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use actix_http::client::{Connect, ConnectError, Connection, Connector}; use actix_http::client::{Connect as HttpConnect, ConnectError, Connection, Connector};
use actix_http::http::{header, Error as HttpError, HeaderMap, HeaderName}; use actix_http::http::{header, Error as HttpError, HeaderMap, HeaderName, self};
use actix_service::Service; use actix_service::Service;
use crate::connect::ConnectorWrapper; use crate::connect::{ConnectorWrapper, Connect};
use crate::{Client, ClientConfig}; use crate::{Client, ClientConfig};
/// An HTTP Client builder /// An HTTP Client builder
@ -16,10 +16,15 @@ use crate::{Client, ClientConfig};
/// This type can be used to construct an instance of `Client` through a /// This type can be used to construct an instance of `Client` through a
/// builder-like pattern. /// builder-like pattern.
pub struct ClientBuilder { pub struct ClientBuilder {
config: ClientConfig,
default_headers: bool, default_headers: bool,
allow_redirects: bool, allow_redirects: bool,
max_redirects: usize, max_redirects: usize,
max_http_version: Option<http::Version>,
stream_window_size: Option<u32>,
conn_window_size: Option<u32>,
headers: HeaderMap,
timeout: Option<Duration>,
connector: Option<RefCell<Box<dyn Connect>>>,
} }
impl Default for ClientBuilder { impl Default for ClientBuilder {
@ -34,25 +39,24 @@ impl ClientBuilder {
default_headers: true, default_headers: true,
allow_redirects: true, allow_redirects: true,
max_redirects: 10, max_redirects: 10,
config: ClientConfig {
headers: HeaderMap::new(), headers: HeaderMap::new(),
timeout: Some(Duration::from_secs(5)), timeout: Some(Duration::from_secs(5)),
connector: RefCell::new(Box::new(ConnectorWrapper( connector: None,
Connector::new().finish(), max_http_version: None,
))), stream_window_size: None,
}, conn_window_size: None,
} }
} }
/// Use custom connector service. /// Use custom connector service.
pub fn connector<T>(mut self, connector: T) -> Self pub fn connector<T>(mut self, connector: T) -> Self
where where
T: Service<Request = Connect, Error = ConnectError> + 'static, T: Service<Request = HttpConnect, Error = ConnectError> + 'static,
T::Response: Connection, T::Response: Connection,
<T::Response as Connection>::Future: 'static, <T::Response as Connection>::Future: 'static,
T::Future: 'static, T::Future: 'static,
{ {
self.config.connector = RefCell::new(Box::new(ConnectorWrapper(connector))); self.connector = Some(RefCell::new(Box::new(ConnectorWrapper(connector))));
self self
} }
@ -61,13 +65,13 @@ impl ClientBuilder {
/// Request timeout is the total time before a response must be received. /// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds. /// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self { pub fn timeout(mut self, timeout: Duration) -> Self {
self.config.timeout = Some(timeout); self.timeout = Some(timeout);
self self
} }
/// Disable request timeout. /// Disable request timeout.
pub fn disable_timeout(mut self) -> Self { pub fn disable_timeout(mut self) -> Self {
self.config.timeout = None; self.timeout = None;
self self
} }
@ -79,6 +83,31 @@ impl ClientBuilder {
self self
} }
/// Maximum supported http major version
/// Supported versions http/1.1, http/2
pub fn max_http_version(mut self, val: http::Version) -> Self {
self.max_http_version = Some(val);
self
}
/// Indicates the initial window size (in octets) for
/// HTTP2 stream-level flow control for received data.
///
/// The default value is 65,535 and is good for APIs, but not for big objects.
pub fn initial_window_size(mut self, size: u32) -> Self {
self.stream_window_size = Some(size);
self
}
/// Indicates the initial window size (in octets) for
/// HTTP2 connection-level flow control for received data.
///
/// The default value is 65,535 and is good for APIs, but not for big objects.
pub fn initial_connection_window_size(mut self, size: u32) -> Self {
self.conn_window_size = Some(size);
self
}
/// Set max number of redirects. /// Set max number of redirects.
/// ///
/// Max redirects is set to 10 by default. /// Max redirects is set to 10 by default.
@ -106,7 +135,7 @@ impl ClientBuilder {
match HeaderName::try_from(key) { match HeaderName::try_from(key) {
Ok(key) => match value.try_into() { Ok(key) => match value.try_into() {
Ok(value) => { Ok(value) => {
self.config.headers.append(key, value); self.headers.append(key, value);
} }
Err(e) => log::error!("Header value error: {:?}", e), Err(e) => log::error!("Header value error: {:?}", e),
}, },
@ -140,7 +169,27 @@ impl ClientBuilder {
/// Finish build process and create `Client` instance. /// Finish build process and create `Client` instance.
pub fn finish(self) -> Client { pub fn finish(self) -> Client {
Client(Rc::new(self.config)) let connector = if let Some(connector) = self.connector {
connector
} else {
let mut connector = Connector::new();
if let Some(val) = self.max_http_version {
connector = connector.max_http_version(val)
};
if let Some(val) = self.conn_window_size {
connector = connector.initial_connection_window_size(val)
};
if let Some(val) = self.stream_window_size {
connector = connector.initial_window_size(val)
};
RefCell::new(Box::new(ConnectorWrapper(connector.finish())) as Box<dyn Connect>)
};
let config = ClientConfig {
headers: self.headers,
timeout: self.timeout,
connector,
};
Client(Rc::new(config))
} }
} }
@ -153,7 +202,6 @@ mod tests {
let client = ClientBuilder::new().basic_auth("username", Some("password")); let client = ClientBuilder::new().basic_auth("username", Some("password"));
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()
@ -165,7 +213,6 @@ mod tests {
let client = ClientBuilder::new().basic_auth("username", None); let client = ClientBuilder::new().basic_auth("username", None);
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()
@ -180,7 +227,6 @@ mod tests {
let client = ClientBuilder::new().bearer_auth("someS3cr3tAutht0k3n"); let client = ClientBuilder::new().bearer_auth("someS3cr3tAutht0k3n");
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()

View file

@ -0,0 +1,61 @@
#![cfg(feature = "openssl")]
use actix_http::HttpService;
use actix_http_test::test_server;
use actix_service::{map_config, ServiceFactory};
use actix_web::http::Version;
use actix_web::{dev::AppConfig, web, App, HttpResponse};
use open_ssl::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode};
fn ssl_acceptor() -> SslAcceptor {
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder
.set_private_key_file("../tests/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("../tests/cert.pem")
.unwrap();
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(open_ssl::ssl::AlpnError::NOACK)
}
});
builder.set_alpn_protos(b"\x02h2").unwrap();
builder.build()
}
#[actix_rt::test]
async fn test_connection_window_size() {
let srv = test_server(move || {
HttpService::build()
.h2(map_config(
App::new().service(
web::resource("/").route(web::to(|| HttpResponse::Ok())),
),
|_| AppConfig::default(),
))
.openssl(ssl_acceptor())
.map_err(|_| ())
});
// disable ssl verification
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
let client = awc::Client::build()
.connector(awc::Connector::new().ssl(builder.build()).finish())
.initial_window_size(100)
.initial_connection_window_size(100)
.finish();
let request = client.get(srv.surl("/")).send();
let response = request.await.unwrap();
assert!(response.status().is_success());
assert_eq!(response.version(), Version::HTTP_2);
}