1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-02 13:29:24 +00:00

use actix-connect crate

This commit is contained in:
Nikolay Kim 2019-03-13 14:41:40 -07:00
parent f627d01055
commit 1941aa0217
16 changed files with 280 additions and 300 deletions

View file

@ -31,7 +31,7 @@ path = "src/lib.rs"
default = ["fail"]
# openssl
ssl = ["openssl", "actix-connector/ssl"]
ssl = ["openssl", "actix-connect/ssl"]
# failure integration. it is on by default, it will be off in future versions
# actix itself does not use failure anymore
@ -40,7 +40,8 @@ fail = ["failure"]
[dependencies]
actix-service = "0.3.4"
actix-codec = "0.1.1"
actix-connector = "0.3.0"
#actix-connector = "0.3.0"
actix-connect = { path="../actix-net/actix-connect" }
actix-utils = "0.3.4"
actix-server-config = "0.1.0"
@ -71,6 +72,7 @@ sha1 = "0.6"
slab = "0.4"
serde_urlencoded = "0.5.3"
time = "0.1"
tokio-tcp = "0.1.3"
tokio-timer = "0.2"
tokio-current-thread = "0.1"
trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false }

View file

@ -8,7 +8,7 @@ fn main() -> Result<(), Error> {
env_logger::init();
System::new("test").block_on(lazy(|| {
let mut connector = client::Connector::default().service();
let mut connector = client::Connector::new().service();
client::ClientRequest::get("https://www.rust-lang.org/") // <- Create request builder
.header("User-Agent", "Actix-web")

View file

@ -1,8 +1,7 @@
use actix_connector::{RequestHost, RequestPort};
use http::uri::Uri;
use http::{Error as HttpError, HttpTryFrom};
use http::HttpTryFrom;
use super::error::{ConnectorError, InvalidUrlKind};
use super::error::InvalidUrl;
use super::pool::Key;
#[derive(Debug)]
@ -19,7 +18,7 @@ impl Connect {
}
/// Construct `Uri` instance and create `Connect` message.
pub fn try_from<U>(uri: U) -> Result<Connect, HttpError>
pub fn try_from<U>(uri: U) -> Result<Connect, InvalidUrl>
where
Uri: HttpTryFrom<U>,
{
@ -40,30 +39,26 @@ impl Connect {
self.uri.authority_part().unwrap().clone().into()
}
pub(crate) fn validate(&self) -> Result<(), ConnectorError> {
pub(crate) fn validate(&self) -> Result<(), InvalidUrl> {
if self.uri.host().is_none() {
Err(ConnectorError::InvalidUrl(InvalidUrlKind::MissingHost))
Err(InvalidUrl::MissingHost)
} else if self.uri.scheme_part().is_none() {
Err(ConnectorError::InvalidUrl(InvalidUrlKind::MissingScheme))
Err(InvalidUrl::MissingScheme)
} else if let Some(scheme) = self.uri.scheme_part() {
match scheme.as_str() {
"http" | "ws" | "https" | "wss" => Ok(()),
_ => Err(ConnectorError::InvalidUrl(InvalidUrlKind::UnknownScheme)),
_ => Err(InvalidUrl::UnknownScheme),
}
} else {
Ok(())
}
}
}
impl RequestHost for Connect {
fn host(&self) -> &str {
pub(crate) fn host(&self) -> &str {
&self.uri.host().unwrap()
}
}
impl RequestPort for Connect {
fn port(&self) -> u16 {
pub(crate) fn port(&self) -> u16 {
if let Some(port) = self.uri.port() {
port
} else if let Some(scheme) = self.uri.scheme_part() {

View file

@ -1,14 +1,16 @@
use std::fmt;
use std::marker::PhantomData;
use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_connector::{Resolver, TcpConnector};
use actix_service::{Service, ServiceExt};
use actix_connect::{default_connector, Stream};
use actix_service::{apply_fn, Service, ServiceExt};
use actix_utils::timeout::{TimeoutError, TimeoutService};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use tokio_tcp::TcpStream;
use super::connect::Connect;
use super::connection::Connection;
use super::error::ConnectorError;
use super::error::ConnectError;
use super::pool::{ConnectionPool, Protocol};
#[cfg(feature = "ssl")]
@ -19,20 +21,28 @@ type SslConnector = ();
/// Http client connector builde instance.
/// `Connector` type uses builder-like pattern for connector service construction.
pub struct Connector {
resolver: Resolver<Connect>,
pub struct Connector<T, U> {
connector: T,
timeout: Duration,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Duration,
limit: usize,
#[allow(dead_code)]
connector: SslConnector,
ssl: SslConnector,
_t: PhantomData<U>,
}
impl Default for Connector {
fn default() -> Connector {
let connector = {
impl Connector<(), ()> {
pub fn new() -> Connector<
impl Service<
Request = actix_connect::Connect,
Response = Stream<TcpStream>,
Error = actix_connect::ConnectError,
> + Clone,
TcpStream,
> {
let ssl = {
#[cfg(feature = "ssl")]
{
use log::error;
@ -49,30 +59,51 @@ impl Default for Connector {
};
Connector {
connector,
resolver: Resolver::default(),
ssl,
connector: default_connector(),
timeout: Duration::from_secs(1),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
disconnect_timeout: Duration::from_millis(3000),
limit: 100,
_t: PhantomData,
}
}
}
impl Connector {
/// Use custom resolver.
pub fn resolver(mut self, resolver: Resolver<Connect>) -> Self {
self.resolver = resolver;;
self
}
/// Use custom resolver configuration.
pub fn resolver_config(mut self, cfg: ResolverConfig, opts: ResolverOpts) -> Self {
self.resolver = Resolver::new(cfg, opts);
self
impl<T, U> Connector<T, U> {
/// Use custom connector.
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1, U1>
where
U1: AsyncRead + AsyncWrite + fmt::Debug,
T1: Service<
Request = actix_connect::Connect,
Response = Stream<U1>,
Error = actix_connect::ConnectError,
> + Clone,
{
Connector {
connector,
timeout: self.timeout,
conn_lifetime: self.conn_lifetime,
conn_keep_alive: self.conn_keep_alive,
disconnect_timeout: self.disconnect_timeout,
limit: self.limit,
ssl: self.ssl,
_t: PhantomData,
}
}
}
impl<T, U> Connector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
T: Service<
Request = actix_connect::Connect,
Response = Stream<U>,
Error = actix_connect::ConnectError,
> + Clone,
{
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default.
pub fn timeout(mut self, timeout: Duration) -> Self {
@ -83,7 +114,7 @@ impl Connector {
#[cfg(feature = "ssl")]
/// Use custom `SslConnector` instance.
pub fn ssl(mut self, connector: SslConnector) -> Self {
self.connector = connector;
self.ssl = connector;
self
}
@ -133,24 +164,18 @@ impl Connector {
/// Finish configuration process and create connector service.
pub fn service(
self,
) -> impl Service<
Request = Connect,
Response = impl Connection,
Error = ConnectorError,
> + Clone {
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
#[cfg(not(feature = "ssl"))]
{
let connector = TimeoutService::new(
self.timeout,
self.resolver.map_err(ConnectorError::from).and_then(
TcpConnector::default()
.from_err()
.map(|(msg, io)| (msg, io, Protocol::Http1)),
),
self.connector
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectorError::Timeout,
TimeoutError::Timeout => ConnectError::Timeout,
});
connect_impl::InnerConnector {
@ -166,48 +191,49 @@ impl Connector {
#[cfg(feature = "ssl")]
{
const H2: &[u8] = b"h2";
use actix_connector::ssl::OpensslConnector;
use actix_connect::ssl::OpensslConnector;
let ssl_service = TimeoutService::new(
self.timeout,
self.resolver
.clone()
.map_err(ConnectorError::from)
.and_then(TcpConnector::default().from_err())
.and_then(
OpensslConnector::service(self.connector)
.map_err(ConnectorError::from)
.map(|(msg, io)| {
let h2 = io
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(msg, io, Protocol::Http2)
} else {
(msg, io, Protocol::Http1)
}
}),
),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectorError::Timeout,
});
let tcp_service = TimeoutService::new(
self.timeout,
self.resolver.map_err(ConnectorError::from).and_then(
TcpConnector::default()
.from_err()
.map(|(msg, io)| (msg, io, Protocol::Http1)),
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg.host(), msg.port()))
})
.map_err(ConnectError::from)
.and_then(
OpensslConnector::service(self.ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(sock, Protocol::Http2)
} else {
(sock, Protocol::Http1)
}
}),
),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectorError::Timeout,
TimeoutError::Timeout => ConnectError::Timeout,
});
let tcp_service = TimeoutService::new(
self.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg.host(), msg.port()))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectError::Timeout,
});
connect_impl::InnerConnector {
@ -253,11 +279,8 @@ mod connect_impl {
impl<T, Io> Clone for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io, Protocol),
Error = ConnectorError,
> + Clone,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
InnerConnector {
@ -269,11 +292,7 @@ mod connect_impl {
impl<T, Io> Service for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
{
type Request = Connect;
type Response = IoConnection<Io>;
@ -289,7 +308,7 @@ mod connect_impl {
fn call(&mut self, req: Connect) -> Self::Future {
if req.is_secure() {
Either::B(err(ConnectorError::SslIsNotSupported))
Either::B(err(ConnectError::SslIsNotSupported))
} else if let Err(e) = req.validate() {
Either::B(err(e))
} else {
@ -303,7 +322,7 @@ mod connect_impl {
mod connect_impl {
use std::marker::PhantomData;
use futures::future::{err, Either, FutureResult};
use futures::future::{Either, FutureResult};
use futures::{Async, Future, Poll};
use super::*;
@ -313,16 +332,8 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<
Request = Connect,
Response = (Connect, Io1, Protocol),
Error = ConnectorError,
>,
T2: Service<
Request = Connect,
Response = (Connect, Io2, Protocol),
Error = ConnectorError,
>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
@ -332,16 +343,10 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<
Request = Connect,
Response = (Connect, Io1, Protocol),
Error = ConnectorError,
> + Clone,
T2: Service<
Request = Connect,
Response = (Connect, Io2, Protocol),
Error = ConnectorError,
> + Clone,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
InnerConnector {
@ -355,20 +360,12 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<
Request = Connect,
Response = (Connect, Io1, Protocol),
Error = ConnectorError,
>,
T2: Service<
Request = Connect,
Response = (Connect, Io2, Protocol),
Error = ConnectorError,
>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
type Request = Connect;
type Response = EitherConnection<Io1, Io2>;
type Error = ConnectorError;
type Error = ConnectError;
type Future = Either<
FutureResult<Self::Response, Self::Error>,
Either<
@ -382,9 +379,7 @@ mod connect_impl {
}
fn call(&mut self, req: Connect) -> Self::Future {
if let Err(e) = req.validate() {
Either::A(err(e))
} else if req.is_secure() {
if req.is_secure() {
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
_t: PhantomData,
@ -401,11 +396,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
where
Io1: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io1, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io1> as Service>::Future,
_t: PhantomData<Io2>,
@ -413,16 +404,12 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<
Request = Connect,
Response = (Connect, Io1, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
type Item = EitherConnection<Io1, Io2>;
type Error = ConnectorError;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? {
@ -435,11 +422,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io2, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io2> as Service>::Future,
_t: PhantomData<Io1>,
@ -447,16 +430,12 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<
Request = Connect,
Response = (Connect, Io2, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
type Item = EitherConnection<Io1, Io2>;
type Error = ConnectorError;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? {

View file

@ -11,11 +11,7 @@ use crate::response::Response;
/// A set of errors that can occur while connecting to an HTTP host
#[derive(Debug, Display, From)]
pub enum ConnectorError {
/// Invalid URL
#[display(fmt = "Invalid URL")]
InvalidUrl(InvalidUrlKind),
pub enum ConnectError {
/// SSL feature is not enabled
#[display(fmt = "SSL is not supported")]
SslIsNotSupported,
@ -45,24 +41,30 @@ pub enum ConnectorError {
#[display(fmt = "Internal error: connector has been disconnected")]
Disconnected,
/// Unresolved host name
#[display(fmt = "Connector received `Connect` method with unresolved host")]
Unresolverd,
/// Connection io error
#[display(fmt = "{}", _0)]
Io(io::Error),
}
#[derive(Debug, Display)]
pub enum InvalidUrlKind {
#[display(fmt = "Missing url scheme")]
MissingScheme,
#[display(fmt = "Unknown url scheme")]
UnknownScheme,
#[display(fmt = "Missing host name")]
MissingHost,
impl From<actix_connect::ConnectError> for ConnectError {
fn from(err: actix_connect::ConnectError) -> ConnectError {
match err {
actix_connect::ConnectError::Resolver(e) => ConnectError::Resolver(e),
actix_connect::ConnectError::NoRecords => ConnectError::NoRecords,
actix_connect::ConnectError::InvalidInput => panic!(),
actix_connect::ConnectError::Unresolverd => ConnectError::Unresolverd,
actix_connect::ConnectError::Io(e) => ConnectError::Io(e),
}
}
}
#[cfg(feature = "ssl")]
impl<T> From<HandshakeError<T>> for ConnectorError {
fn from(err: HandshakeError<T>) -> ConnectorError {
impl<T> From<HandshakeError<T>> for ConnectError {
fn from(err: HandshakeError<T>) -> ConnectError {
match err {
HandshakeError::SetupFailure(stack) => SslError::from(stack).into(),
HandshakeError::Failure(stream) => stream.into_error().into(),
@ -71,12 +73,27 @@ impl<T> From<HandshakeError<T>> for ConnectorError {
}
}
#[derive(Debug, Display, From)]
pub enum InvalidUrl {
#[display(fmt = "Missing url scheme")]
MissingScheme,
#[display(fmt = "Unknown url scheme")]
UnknownScheme,
#[display(fmt = "Missing host name")]
MissingHost,
#[display(fmt = "Url parse error: {}", _0)]
HttpError(http::Error),
}
/// A set of errors that can occur during request sending and response reading
#[derive(Debug, Display, From)]
pub enum SendRequestError {
/// Invalid URL
#[display(fmt = "Invalid URL: {}", _0)]
Url(InvalidUrl),
/// Failed to connect to host
#[display(fmt = "Failed to connect to host: {}", _0)]
Connector(ConnectorError),
Connect(ConnectError),
/// Error sending request
Send(io::Error),
/// Error parsing response
@ -92,10 +109,10 @@ pub enum SendRequestError {
impl ResponseError for SendRequestError {
fn error_response(&self) -> Response {
match *self {
SendRequestError::Connector(ConnectorError::Timeout) => {
SendRequestError::Connect(ConnectError::Timeout) => {
Response::GatewayTimeout()
}
SendRequestError::Connector(_) => Response::BadGateway(),
SendRequestError::Connect(_) => Response::BadGateway(),
_ => Response::InternalServerError(),
}
.into()

View file

@ -6,7 +6,7 @@ use futures::future::{err, ok, Either};
use futures::{Async, Future, Poll, Sink, Stream};
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
use super::error::{ConnectorError, SendRequestError};
use super::error::{ConnectError, SendRequestError};
use super::pool::Acquired;
use super::response::ClientResponse;
use crate::body::{BodyLength, MessageBody};
@ -62,7 +62,7 @@ where
}
ok(res)
} else {
err(ConnectorError::Disconnected.into())
err(ConnectError::Disconnected.into())
}
})
})

View file

@ -12,6 +12,6 @@ mod response;
pub use self::connect::Connect;
pub use self::connection::Connection;
pub use self::connector::Connector;
pub use self::error::{ConnectorError, InvalidUrlKind, SendRequestError};
pub use self::error::{ConnectError, InvalidUrl, SendRequestError};
pub use self::request::{ClientRequest, ClientRequestBuilder};
pub use self::response::ClientResponse;

View file

@ -20,7 +20,7 @@ use tokio_timer::{sleep, Delay};
use super::connect::Connect;
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectorError;
use super::error::ConnectError;
#[derive(Clone, Copy, PartialEq)]
pub enum Protocol {
@ -48,11 +48,7 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
pub(crate) fn new(
connector: T,
@ -91,17 +87,13 @@ where
impl<T, Io> Service for ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<
Request = Connect,
Response = (Connect, Io, Protocol),
Error = ConnectorError,
>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
type Request = Connect;
type Response = IoConnection<Io>;
type Error = ConnectorError;
type Error = ConnectError;
type Future = Either<
FutureResult<IoConnection<Io>, ConnectorError>,
FutureResult<Self::Response, Self::Error>,
Either<WaitForConnection<Io>, OpenConnection<T::Future, Io>>,
>;
@ -151,7 +143,7 @@ where
{
key: Key,
token: usize,
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
@ -173,7 +165,7 @@ where
Io: AsyncRead + AsyncWrite,
{
type Item = IoConnection<Io>;
type Error = ConnectorError;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
@ -187,7 +179,7 @@ where
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => {
let _ = self.inner.take();
Err(ConnectorError::Disconnected)
Err(ConnectError::Disconnected)
}
}
}
@ -206,7 +198,7 @@ where
impl<F, Io> OpenConnection<F, Io>
where
F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
F: Future<Item = (Io, Protocol), Error = ConnectError>,
Io: AsyncRead + AsyncWrite + 'static,
{
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>, fut: F) -> Self {
@ -234,11 +226,11 @@ where
impl<F, Io> Future for OpenConnection<F, Io>
where
F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
F: Future<Item = (Io, Protocol), Error = ConnectError>,
Io: AsyncRead + AsyncWrite,
{
type Item = IoConnection<Io>;
type Error = ConnectorError;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut h2) = self.h2 {
@ -258,7 +250,7 @@ where
match self.fut.poll() {
Err(err) => Err(err),
Ok(Async::Ready((_, io, proto))) => {
Ok(Async::Ready((io, proto))) => {
let _ = self.inner.take();
if proto == Protocol::Http1 {
Ok(Async::Ready(IoConnection::new(
@ -289,7 +281,7 @@ where
// impl<F, Io> OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError> + 'static,
// F: Future<Item = (Io, Protocol), Error = ConnectorError> + 'static,
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fn spawn(
@ -323,7 +315,7 @@ where
// impl<F, Io> Future for OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
// F: Future<Item = (Io, Protocol), Error = ConnectorError>,
// Io: AsyncRead + AsyncWrite,
// {
// type Item = ();
@ -402,7 +394,7 @@ pub(crate) struct Inner<Io> {
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab<(
Connect,
oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
@ -444,7 +436,7 @@ where
&mut self,
connect: Connect,
) -> (
oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
usize,
) {
let (tx, rx) = oneshot::channel();
@ -534,7 +526,7 @@ where
// impl<T, Io> Future for ConnectorPoolSupport<T, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// T: Service<Connect, Response = (Connect, Io, Protocol), Error = ConnectorError>,
// T: Service<Connect, Response = (Io, Protocol), Error = ConnectorError>,
// T::Future: 'static,
// {
// type Item = ();

View file

@ -5,6 +5,7 @@ use std::io::Write;
use actix_service::Service;
use bytes::{BufMut, Bytes, BytesMut};
use cookie::{Cookie, CookieJar};
use futures::future::{err, Either};
use futures::{Future, Stream};
use percent_encoding::{percent_encode, USERINFO_ENCODE_SET};
use serde::Serialize;
@ -21,7 +22,7 @@ use crate::message::{ConnectionType, Head, RequestHead};
use super::connection::Connection;
use super::response::ClientResponse;
use super::{Connect, ConnectorError, SendRequestError};
use super::{Connect, ConnectError, SendRequestError};
/// An HTTP Client Request
///
@ -32,7 +33,8 @@ use super::{Connect, ConnectorError, SendRequestError};
///
/// fn main() {
/// System::new("test").block_on(lazy(|| {
/// let mut connector = client::Connector::default().service();
/// let mut connector = client::Connector::new().service();
///
/// client::ClientRequest::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web")
/// .finish().unwrap()
@ -178,17 +180,24 @@ where
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
B: 'static,
T: Service<Request = Connect, Response = I, Error = ConnectorError>,
T: Service<Request = Connect, Response = I, Error = ConnectError>,
I: Connection,
{
let Self { head, body } = self;
connector
// connect to the host
.call(Connect::new(head.uri.clone()))
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body))
let connect = Connect::new(head.uri.clone());
if let Err(e) = connect.validate() {
Either::A(err(e.into()))
} else {
Either::B(
connector
// connect to the host
.call(connect)
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body)),
)
}
}
}

View file

@ -1,7 +1,7 @@
//! Http client request
use std::io;
use actix_connector::ConnectorError;
use actix_connect::ConnectError;
use derive_more::{Display, From};
use http::{header::HeaderValue, Error as HttpError, StatusCode};
@ -43,7 +43,7 @@ pub enum ClientError {
Protocol(ProtocolError),
/// Connect error
#[display(fmt = "Connector error: {:?}", _0)]
Connect(ConnectorError),
Connect(ConnectError),
/// IO Error
#[display(fmt = "{}", _0)]
Io(io::Error),

View file

@ -4,7 +4,7 @@ mod service;
pub use self::connect::Connect;
pub use self::error::ClientError;
pub use self::service::{Client, DefaultClient};
pub use self::service::Client;
#[derive(PartialEq, Hash, Debug, Clone, Copy)]
pub(crate) enum Protocol {

View file

@ -2,8 +2,8 @@
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_connector::{Connect as TcpConnect, ConnectorError, DefaultConnector};
use actix_service::Service;
use actix_connect::{default_connector, Connect as TcpConnect, ConnectError};
use actix_service::{apply_fn, Service};
use base64;
use futures::future::{err, Either, FutureResult};
use futures::{try_ready, Async, Future, Poll, Sink, Stream};
@ -20,21 +20,29 @@ use crate::ws::Codec;
use super::{ClientError, Connect, Protocol};
/// Default client, uses default connector.
pub type DefaultClient = Client<DefaultConnector>;
/// WebSocket's client
pub struct Client<T>
where
T: Service<Request = TcpConnect, Error = ConnectorError>,
T::Response: AsyncRead + AsyncWrite,
{
pub struct Client<T> {
connector: T,
}
impl Client<()> {
/// Create client with default connector.
pub fn default() -> Client<
impl Service<
Request = TcpConnect,
Response = impl AsyncRead + AsyncWrite,
Error = ConnectError,
> + Clone,
> {
Client::new(apply_fn(default_connector(), |msg: TcpConnect, srv| {
srv.call(msg).map(|stream| stream.into_parts().0)
}))
}
}
impl<T> Client<T>
where
T: Service<Request = TcpConnect, Error = ConnectorError>,
T: Service<Request = TcpConnect, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite,
{
/// Create new websocket's client factory
@ -43,15 +51,9 @@ where
}
}
impl Default for Client<DefaultConnector> {
fn default() -> Self {
Client::new(DefaultConnector::default())
}
}
impl<T> Clone for Client<T>
where
T: Service<Request = TcpConnect, Error = ConnectorError> + Clone,
T: Service<Request = TcpConnect, Error = ConnectError> + Clone,
T::Response: AsyncRead + AsyncWrite,
{
fn clone(&self) -> Self {
@ -63,7 +65,7 @@ where
impl<T> Service for Client<T>
where
T: Service<Request = TcpConnect, Error = ConnectorError>,
T: Service<Request = TcpConnect, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite + 'static,
T::Future: 'static,
{

View file

@ -21,7 +21,7 @@ mod proto;
mod service;
mod transport;
pub use self::client::{Client, ClientError, Connect, DefaultClient};
pub use self::client::{Client, ClientError, Connect};
pub use self::codec::{Codec, Frame, Message};
pub use self::frame::Parser;
pub use self::proto::{CloseCode, CloseReason, OpCode};

View file

@ -1,18 +1,17 @@
//! Various helpers for Actix applications to use during testing.
use std::sync::mpsc;
use std::{net, thread};
use std::{net, thread, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::body::MessageBody;
use actix_http::client::{
ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connection, Connector,
ConnectorError, SendRequestError,
ClientRequest, ClientRequestBuilder, ClientResponse, Connect, ConnectError,
Connection, Connector, SendRequestError,
};
use actix_http::ws;
use actix_rt::{Runtime, System};
use actix_server::{Server, StreamServiceFactory};
use actix_service::Service;
use futures::future::{lazy, Future};
use http::Method;
use net2::TcpBuilder;
@ -44,21 +43,15 @@ use net2::TcpBuilder;
/// ```
pub struct TestServer;
///
pub struct TestServerRuntime<T> {
/// Test server controller
pub struct TestServerRuntime {
addr: net::SocketAddr,
conn: T,
rt: Runtime,
}
impl TestServer {
/// Start new test server with application factory
pub fn new<F: StreamServiceFactory>(
factory: F,
) -> TestServerRuntime<
impl Service<Request = Connect, Response = impl Connection, Error = ConnectorError>
+ Clone,
> {
pub fn new<F: StreamServiceFactory>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel();
// run server in separate thread
@ -79,35 +72,9 @@ impl TestServer {
let (system, addr) = rx.recv().unwrap();
System::set_current(system);
let mut rt = Runtime::new().unwrap();
let conn = rt
.block_on(lazy(|| Ok::<_, ()>(TestServer::new_connector())))
.unwrap();
TestServerRuntime { addr, conn, rt }
}
fn new_connector(
) -> impl Service<
Request = Connect,
Response = impl Connection,
Error = ConnectorError,
> + Clone {
#[cfg(feature = "ssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
Connector::default().ssl(builder.build()).service()
}
#[cfg(not(feature = "ssl"))]
{
Connector::default().service()
TestServerRuntime {
addr,
rt: Runtime::new().unwrap(),
}
}
@ -122,7 +89,7 @@ impl TestServer {
}
}
impl<T> TestServerRuntime<T> {
impl TestServerRuntime {
/// Execute future on current core
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
where
@ -131,12 +98,12 @@ impl<T> TestServerRuntime<T> {
self.rt.block_on(fut)
}
/// Execute future on current core
pub fn execute<F, I, E>(&mut self, fut: F) -> Result<I, E>
/// Execute function on current core
pub fn execute<F, R>(&mut self, fut: F) -> R
where
F: Future<Item = I, Error = E>,
F: FnOnce() -> R,
{
self.rt.block_on(fut)
self.rt.block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap()
}
/// Construct test server url
@ -190,17 +157,37 @@ impl<T> TestServerRuntime<T> {
.take()
}
/// Http connector
pub fn connector(&mut self) -> &mut T {
&mut self.conn
fn new_connector(
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
#[cfg(feature = "ssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
Connector::new()
.timeout(time::Duration::from_millis(500))
.ssl(builder.build())
.service()
}
#[cfg(not(feature = "ssl"))]
{
Connector::new()
.timeout(time::Duration::from_millis(500))
.service()
}
}
/// Http connector
pub fn new_connector(&mut self) -> T
where
T: Clone,
{
self.conn.clone()
pub fn connector(
&mut self,
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
self.execute(|| TestServerRuntime::new_connector())
}
/// Stop http server
@ -209,11 +196,7 @@ impl<T> TestServerRuntime<T> {
}
}
impl<T> TestServerRuntime<T>
where
T: Service<Request = Connect, Error = ConnectorError> + Clone,
T::Response: Connection,
{
impl TestServerRuntime {
/// Connect to websocket server at a given path
pub fn ws_at(
&mut self,
@ -236,11 +219,12 @@ where
&mut self,
req: ClientRequest<B>,
) -> Result<ClientResponse, SendRequestError> {
self.rt.block_on(req.send(&mut self.conn))
let mut conn = self.connector();
self.rt.block_on(req.send(&mut conn))
}
}
impl<T> Drop for TestServerRuntime<T> {
impl Drop for TestServerRuntime {
fn drop(&mut self) {
self.stop()
}

View file

@ -36,7 +36,7 @@ fn test_h1_v2() {
.finish(|_| future::ok::<_, ()>(Response::Ok().body(STR)))
.map(|_| ())
});
let mut connector = srv.new_connector();
let mut connector = srv.connector();
let request = srv.get().finish().unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
@ -70,7 +70,7 @@ fn test_connection_close() {
.finish(|_| ok::<_, ()>(Response::Ok().body(STR)))
.map(|_| ())
});
let mut connector = srv.new_connector();
let mut connector = srv.connector();
let request = srv.get().close().finish().unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
@ -90,7 +90,7 @@ fn test_with_query_parameter() {
})
.map(|_| ())
});
let mut connector = srv.new_connector();
let mut connector = srv.connector();
let request = client::ClientRequest::get(srv.url("/?qp=5"))
.finish()

View file

@ -432,7 +432,7 @@ fn test_h1_headers() {
future::ok::<_, ()>(builder.body(data.clone()))
})
});
let mut connector = srv.new_connector();
let mut connector = srv.connector();
let req = srv.get().finish().unwrap();
@ -479,7 +479,7 @@ fn test_h2_headers() {
future::ok::<_, ()>(builder.body(data.clone()))
}).map_err(|_| ()))
});
let mut connector = srv.new_connector();
let mut connector = srv.connector();
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap();
let mut response = srv.block_on(req.send(&mut connector)).unwrap();