diff --git a/Cargo.toml b/Cargo.toml index 8e0f3a68a..ee9d28ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ path = "src/lib.rs" default = ["fail"] # openssl -ssl = ["openssl", "actix-connector/ssl"] +ssl = ["openssl", "actix-connect/ssl"] # failure integration. it is on by default, it will be off in future versions # actix itself does not use failure anymore @@ -40,7 +40,8 @@ fail = ["failure"] [dependencies] actix-service = "0.3.4" actix-codec = "0.1.1" -actix-connector = "0.3.0" +#actix-connector = "0.3.0" +actix-connect = { path="../actix-net/actix-connect" } actix-utils = "0.3.4" actix-server-config = "0.1.0" @@ -71,6 +72,7 @@ sha1 = "0.6" slab = "0.4" serde_urlencoded = "0.5.3" time = "0.1" +tokio-tcp = "0.1.3" tokio-timer = "0.2" tokio-current-thread = "0.1" trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false } diff --git a/examples/client.rs b/examples/client.rs index 06b708e20..7f5f8c91a 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -8,7 +8,7 @@ fn main() -> Result<(), Error> { env_logger::init(); System::new("test").block_on(lazy(|| { - let mut connector = client::Connector::default().service(); + let mut connector = client::Connector::new().service(); client::ClientRequest::get("https://www.rust-lang.org/") // <- Create request builder .header("User-Agent", "Actix-web") diff --git a/src/client/connect.rs b/src/client/connect.rs index f4112cfaa..43be57703 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,8 +1,7 @@ -use actix_connector::{RequestHost, RequestPort}; use http::uri::Uri; -use http::{Error as HttpError, HttpTryFrom}; +use http::HttpTryFrom; -use super::error::{ConnectorError, InvalidUrlKind}; +use super::error::InvalidUrl; use super::pool::Key; #[derive(Debug)] @@ -19,7 +18,7 @@ impl Connect { } /// Construct `Uri` instance and create `Connect` message. - pub fn try_from(uri: U) -> Result + pub fn try_from(uri: U) -> Result where Uri: HttpTryFrom, { @@ -40,30 +39,26 @@ impl Connect { self.uri.authority_part().unwrap().clone().into() } - pub(crate) fn validate(&self) -> Result<(), ConnectorError> { + pub(crate) fn validate(&self) -> Result<(), InvalidUrl> { if self.uri.host().is_none() { - Err(ConnectorError::InvalidUrl(InvalidUrlKind::MissingHost)) + Err(InvalidUrl::MissingHost) } else if self.uri.scheme_part().is_none() { - Err(ConnectorError::InvalidUrl(InvalidUrlKind::MissingScheme)) + Err(InvalidUrl::MissingScheme) } else if let Some(scheme) = self.uri.scheme_part() { match scheme.as_str() { "http" | "ws" | "https" | "wss" => Ok(()), - _ => Err(ConnectorError::InvalidUrl(InvalidUrlKind::UnknownScheme)), + _ => Err(InvalidUrl::UnknownScheme), } } else { Ok(()) } } -} -impl RequestHost for Connect { - fn host(&self) -> &str { + pub(crate) fn host(&self) -> &str { &self.uri.host().unwrap() } -} -impl RequestPort for Connect { - fn port(&self) -> u16 { + pub(crate) fn port(&self) -> u16 { if let Some(port) = self.uri.port() { port } else if let Some(scheme) = self.uri.scheme_part() { diff --git a/src/client/connector.rs b/src/client/connector.rs index ccb5dbce5..1579cd5ef 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,14 +1,16 @@ +use std::fmt; +use std::marker::PhantomData; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_connector::{Resolver, TcpConnector}; -use actix_service::{Service, ServiceExt}; +use actix_connect::{default_connector, Stream}; +use actix_service::{apply_fn, Service, ServiceExt}; use actix_utils::timeout::{TimeoutError, TimeoutService}; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use tokio_tcp::TcpStream; use super::connect::Connect; use super::connection::Connection; -use super::error::ConnectorError; +use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; #[cfg(feature = "ssl")] @@ -19,20 +21,28 @@ type SslConnector = (); /// Http client connector builde instance. /// `Connector` type uses builder-like pattern for connector service construction. -pub struct Connector { - resolver: Resolver, +pub struct Connector { + connector: T, timeout: Duration, conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Duration, limit: usize, #[allow(dead_code)] - connector: SslConnector, + ssl: SslConnector, + _t: PhantomData, } -impl Default for Connector { - fn default() -> Connector { - let connector = { +impl Connector<(), ()> { + pub fn new() -> Connector< + impl Service< + Request = actix_connect::Connect, + Response = Stream, + Error = actix_connect::ConnectError, + > + Clone, + TcpStream, + > { + let ssl = { #[cfg(feature = "ssl")] { use log::error; @@ -49,30 +59,51 @@ impl Default for Connector { }; Connector { - connector, - resolver: Resolver::default(), + ssl, + connector: default_connector(), timeout: Duration::from_secs(1), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), disconnect_timeout: Duration::from_millis(3000), limit: 100, + _t: PhantomData, } } } -impl Connector { - /// Use custom resolver. - pub fn resolver(mut self, resolver: Resolver) -> Self { - self.resolver = resolver;; - self - } - - /// Use custom resolver configuration. - pub fn resolver_config(mut self, cfg: ResolverConfig, opts: ResolverOpts) -> Self { - self.resolver = Resolver::new(cfg, opts); - self +impl Connector { + /// Use custom connector. + pub fn connector(self, connector: T1) -> Connector + where + U1: AsyncRead + AsyncWrite + fmt::Debug, + T1: Service< + Request = actix_connect::Connect, + Response = Stream, + Error = actix_connect::ConnectError, + > + Clone, + { + Connector { + connector, + timeout: self.timeout, + conn_lifetime: self.conn_lifetime, + conn_keep_alive: self.conn_keep_alive, + disconnect_timeout: self.disconnect_timeout, + limit: self.limit, + ssl: self.ssl, + _t: PhantomData, + } } +} +impl Connector +where + U: AsyncRead + AsyncWrite + fmt::Debug + 'static, + T: Service< + Request = actix_connect::Connect, + Response = Stream, + Error = actix_connect::ConnectError, + > + Clone, +{ /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. pub fn timeout(mut self, timeout: Duration) -> Self { @@ -83,7 +114,7 @@ impl Connector { #[cfg(feature = "ssl")] /// Use custom `SslConnector` instance. pub fn ssl(mut self, connector: SslConnector) -> Self { - self.connector = connector; + self.ssl = connector; self } @@ -133,24 +164,18 @@ impl Connector { /// Finish configuration process and create connector service. pub fn service( self, - ) -> impl Service< - Request = Connect, - Response = impl Connection, - Error = ConnectorError, - > + Clone { + ) -> impl Service + + Clone { #[cfg(not(feature = "ssl"))] { let connector = TimeoutService::new( self.timeout, - self.resolver.map_err(ConnectorError::from).and_then( - TcpConnector::default() - .from_err() - .map(|(msg, io)| (msg, io, Protocol::Http1)), - ), + self.connector + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectorError::Timeout, + TimeoutError::Timeout => ConnectError::Timeout, }); connect_impl::InnerConnector { @@ -166,48 +191,49 @@ impl Connector { #[cfg(feature = "ssl")] { const H2: &[u8] = b"h2"; - use actix_connector::ssl::OpensslConnector; + use actix_connect::ssl::OpensslConnector; let ssl_service = TimeoutService::new( self.timeout, - self.resolver - .clone() - .map_err(ConnectorError::from) - .and_then(TcpConnector::default().from_err()) - .and_then( - OpensslConnector::service(self.connector) - .map_err(ConnectorError::from) - .map(|(msg, io)| { - let h2 = io - .get_ref() - .ssl() - .selected_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (msg, io, Protocol::Http2) - } else { - (msg, io, Protocol::Http1) - } - }), - ), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectorError::Timeout, - }); - - let tcp_service = TimeoutService::new( - self.timeout, - self.resolver.map_err(ConnectorError::from).and_then( - TcpConnector::default() - .from_err() - .map(|(msg, io)| (msg, io, Protocol::Http1)), + apply_fn(self.connector.clone(), |msg: Connect, srv| { + srv.call(actix_connect::Connect::new(msg.host(), msg.port())) + }) + .map_err(ConnectError::from) + .and_then( + OpensslConnector::service(self.ssl) + .map_err(ConnectError::from) + .map(|stream| { + let sock = stream.into_parts().0; + let h2 = sock + .get_ref() + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (sock, Protocol::Http2) + } else { + (sock, Protocol::Http1) + } + }), ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectorError::Timeout, + TimeoutError::Timeout => ConnectError::Timeout, + }); + + let tcp_service = TimeoutService::new( + self.timeout, + apply_fn(self.connector.clone(), |msg: Connect, srv| { + srv.call(actix_connect::Connect::new(msg.host(), msg.port())) + }) + .map_err(ConnectError::from) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), + ) + .map_err(|e| match e { + TimeoutError::Service(e) => e, + TimeoutError::Timeout => ConnectError::Timeout, }); connect_impl::InnerConnector { @@ -253,11 +279,8 @@ mod connect_impl { impl Clone for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io, Protocol), - Error = ConnectorError, - > + Clone, + T: Service + + Clone, { fn clone(&self) -> Self { InnerConnector { @@ -269,11 +292,7 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io, Protocol), - Error = ConnectorError, - >, + T: Service, { type Request = Connect; type Response = IoConnection; @@ -289,7 +308,7 @@ mod connect_impl { fn call(&mut self, req: Connect) -> Self::Future { if req.is_secure() { - Either::B(err(ConnectorError::SslIsNotSupported)) + Either::B(err(ConnectError::SslIsNotSupported)) } else if let Err(e) = req.validate() { Either::B(err(e)) } else { @@ -303,7 +322,7 @@ mod connect_impl { mod connect_impl { use std::marker::PhantomData; - use futures::future::{err, Either, FutureResult}; + use futures::future::{Either, FutureResult}; use futures::{Async, Future, Poll}; use super::*; @@ -313,16 +332,8 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service< - Request = Connect, - Response = (Connect, Io1, Protocol), - Error = ConnectorError, - >, - T2: Service< - Request = Connect, - Response = (Connect, Io2, Protocol), - Error = ConnectorError, - >, + T1: Service, + T2: Service, { pub(crate) tcp_pool: ConnectionPool, pub(crate) ssl_pool: ConnectionPool, @@ -332,16 +343,10 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service< - Request = Connect, - Response = (Connect, Io1, Protocol), - Error = ConnectorError, - > + Clone, - T2: Service< - Request = Connect, - Response = (Connect, Io2, Protocol), - Error = ConnectorError, - > + Clone, + T1: Service + + Clone, + T2: Service + + Clone, { fn clone(&self) -> Self { InnerConnector { @@ -355,20 +360,12 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service< - Request = Connect, - Response = (Connect, Io1, Protocol), - Error = ConnectorError, - >, - T2: Service< - Request = Connect, - Response = (Connect, Io2, Protocol), - Error = ConnectorError, - >, + T1: Service, + T2: Service, { type Request = Connect; type Response = EitherConnection; - type Error = ConnectorError; + type Error = ConnectError; type Future = Either< FutureResult, Either< @@ -382,9 +379,7 @@ mod connect_impl { } fn call(&mut self, req: Connect) -> Self::Future { - if let Err(e) = req.validate() { - Either::A(err(e)) - } else if req.is_secure() { + if req.is_secure() { Either::B(Either::B(InnerConnectorResponseB { fut: self.ssl_pool.call(req), _t: PhantomData, @@ -401,11 +396,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io1, Protocol), - Error = ConnectorError, - >, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -413,16 +404,12 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service< - Request = Connect, - Response = (Connect, Io1, Protocol), - Error = ConnectorError, - >, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { type Item = EitherConnection; - type Error = ConnectorError; + type Error = ConnectError; fn poll(&mut self) -> Poll { match self.fut.poll()? { @@ -435,11 +422,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io2, Protocol), - Error = ConnectorError, - >, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -447,16 +430,12 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service< - Request = Connect, - Response = (Connect, Io2, Protocol), - Error = ConnectorError, - >, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { type Item = EitherConnection; - type Error = ConnectorError; + type Error = ConnectError; fn poll(&mut self) -> Poll { match self.fut.poll()? { diff --git a/src/client/error.rs b/src/client/error.rs index 6c91ff976..4fce904f1 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -11,11 +11,7 @@ use crate::response::Response; /// A set of errors that can occur while connecting to an HTTP host #[derive(Debug, Display, From)] -pub enum ConnectorError { - /// Invalid URL - #[display(fmt = "Invalid URL")] - InvalidUrl(InvalidUrlKind), - +pub enum ConnectError { /// SSL feature is not enabled #[display(fmt = "SSL is not supported")] SslIsNotSupported, @@ -45,24 +41,30 @@ pub enum ConnectorError { #[display(fmt = "Internal error: connector has been disconnected")] Disconnected, + /// Unresolved host name + #[display(fmt = "Connector received `Connect` method with unresolved host")] + Unresolverd, + /// Connection io error #[display(fmt = "{}", _0)] Io(io::Error), } -#[derive(Debug, Display)] -pub enum InvalidUrlKind { - #[display(fmt = "Missing url scheme")] - MissingScheme, - #[display(fmt = "Unknown url scheme")] - UnknownScheme, - #[display(fmt = "Missing host name")] - MissingHost, +impl From for ConnectError { + fn from(err: actix_connect::ConnectError) -> ConnectError { + match err { + actix_connect::ConnectError::Resolver(e) => ConnectError::Resolver(e), + actix_connect::ConnectError::NoRecords => ConnectError::NoRecords, + actix_connect::ConnectError::InvalidInput => panic!(), + actix_connect::ConnectError::Unresolverd => ConnectError::Unresolverd, + actix_connect::ConnectError::Io(e) => ConnectError::Io(e), + } + } } #[cfg(feature = "ssl")] -impl From> for ConnectorError { - fn from(err: HandshakeError) -> ConnectorError { +impl From> for ConnectError { + fn from(err: HandshakeError) -> ConnectError { match err { HandshakeError::SetupFailure(stack) => SslError::from(stack).into(), HandshakeError::Failure(stream) => stream.into_error().into(), @@ -71,12 +73,27 @@ impl From> for ConnectorError { } } +#[derive(Debug, Display, From)] +pub enum InvalidUrl { + #[display(fmt = "Missing url scheme")] + MissingScheme, + #[display(fmt = "Unknown url scheme")] + UnknownScheme, + #[display(fmt = "Missing host name")] + MissingHost, + #[display(fmt = "Url parse error: {}", _0)] + HttpError(http::Error), +} + /// A set of errors that can occur during request sending and response reading #[derive(Debug, Display, From)] pub enum SendRequestError { + /// Invalid URL + #[display(fmt = "Invalid URL: {}", _0)] + Url(InvalidUrl), /// Failed to connect to host #[display(fmt = "Failed to connect to host: {}", _0)] - Connector(ConnectorError), + Connect(ConnectError), /// Error sending request Send(io::Error), /// Error parsing response @@ -92,10 +109,10 @@ pub enum SendRequestError { impl ResponseError for SendRequestError { fn error_response(&self) -> Response { match *self { - SendRequestError::Connector(ConnectorError::Timeout) => { + SendRequestError::Connect(ConnectError::Timeout) => { Response::GatewayTimeout() } - SendRequestError::Connector(_) => Response::BadGateway(), + SendRequestError::Connect(_) => Response::BadGateway(), _ => Response::InternalServerError(), } .into() diff --git a/src/client/h1proto.rs b/src/client/h1proto.rs index 3329fcfec..34521cc2f 100644 --- a/src/client/h1proto.rs +++ b/src/client/h1proto.rs @@ -6,7 +6,7 @@ use futures::future::{err, ok, Either}; use futures::{Async, Future, Poll, Sink, Stream}; use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; -use super::error::{ConnectorError, SendRequestError}; +use super::error::{ConnectError, SendRequestError}; use super::pool::Acquired; use super::response::ClientResponse; use crate::body::{BodyLength, MessageBody}; @@ -62,7 +62,7 @@ where } ok(res) } else { - err(ConnectorError::Disconnected.into()) + err(ConnectError::Disconnected.into()) } }) }) diff --git a/src/client/mod.rs b/src/client/mod.rs index 8d041827f..0bff97e49 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -12,6 +12,6 @@ mod response; pub use self::connect::Connect; pub use self::connection::Connection; pub use self::connector::Connector; -pub use self::error::{ConnectorError, InvalidUrlKind, SendRequestError}; +pub use self::error::{ConnectError, InvalidUrl, SendRequestError}; pub use self::request::{ClientRequest, ClientRequestBuilder}; pub use self::response::ClientResponse; diff --git a/src/client/pool.rs b/src/client/pool.rs index 188980cb3..214b7a382 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -20,7 +20,7 @@ use tokio_timer::{sleep, Delay}; use super::connect::Connect; use super::connection::{ConnectionType, IoConnection}; -use super::error::ConnectorError; +use super::error::ConnectError; #[derive(Clone, Copy, PartialEq)] pub enum Protocol { @@ -48,11 +48,7 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io, Protocol), - Error = ConnectorError, - >, + T: Service, { pub(crate) fn new( connector: T, @@ -91,17 +87,13 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io, Protocol), - Error = ConnectorError, - >, + T: Service, { type Request = Connect; type Response = IoConnection; - type Error = ConnectorError; + type Error = ConnectError; type Future = Either< - FutureResult, ConnectorError>, + FutureResult, Either, OpenConnection>, >; @@ -151,7 +143,7 @@ where { key: Key, token: usize, - rx: oneshot::Receiver, ConnectorError>>, + rx: oneshot::Receiver, ConnectError>>, inner: Option>>>, } @@ -173,7 +165,7 @@ where Io: AsyncRead + AsyncWrite, { type Item = IoConnection; - type Error = ConnectorError; + type Error = ConnectError; fn poll(&mut self) -> Poll { match self.rx.poll() { @@ -187,7 +179,7 @@ where Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => { let _ = self.inner.take(); - Err(ConnectorError::Disconnected) + Err(ConnectError::Disconnected) } } } @@ -206,7 +198,7 @@ where impl OpenConnection where - F: Future, + F: Future, Io: AsyncRead + AsyncWrite + 'static, { fn new(key: Key, inner: Rc>>, fut: F) -> Self { @@ -234,11 +226,11 @@ where impl Future for OpenConnection where - F: Future, + F: Future, Io: AsyncRead + AsyncWrite, { type Item = IoConnection; - type Error = ConnectorError; + type Error = ConnectError; fn poll(&mut self) -> Poll { if let Some(ref mut h2) = self.h2 { @@ -258,7 +250,7 @@ where match self.fut.poll() { Err(err) => Err(err), - Ok(Async::Ready((_, io, proto))) => { + Ok(Async::Ready((io, proto))) => { let _ = self.inner.take(); if proto == Protocol::Http1 { Ok(Async::Ready(IoConnection::new( @@ -289,7 +281,7 @@ where // impl OpenWaitingConnection // where -// F: Future + 'static, +// F: Future + 'static, // Io: AsyncRead + AsyncWrite + 'static, // { // fn spawn( @@ -323,7 +315,7 @@ where // impl Future for OpenWaitingConnection // where -// F: Future, +// F: Future, // Io: AsyncRead + AsyncWrite, // { // type Item = (); @@ -402,7 +394,7 @@ pub(crate) struct Inner { available: HashMap>>, waiters: Slab<( Connect, - oneshot::Sender, ConnectorError>>, + oneshot::Sender, ConnectError>>, )>, waiters_queue: IndexSet<(Key, usize)>, task: AtomicTask, @@ -444,7 +436,7 @@ where &mut self, connect: Connect, ) -> ( - oneshot::Receiver, ConnectorError>>, + oneshot::Receiver, ConnectError>>, usize, ) { let (tx, rx) = oneshot::channel(); @@ -534,7 +526,7 @@ where // impl Future for ConnectorPoolSupport // where // Io: AsyncRead + AsyncWrite + 'static, -// T: Service, +// T: Service, // T::Future: 'static, // { // type Item = (); diff --git a/src/client/request.rs b/src/client/request.rs index efae5b94e..199e13b93 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -5,6 +5,7 @@ use std::io::Write; use actix_service::Service; use bytes::{BufMut, Bytes, BytesMut}; use cookie::{Cookie, CookieJar}; +use futures::future::{err, Either}; use futures::{Future, Stream}; use percent_encoding::{percent_encode, USERINFO_ENCODE_SET}; use serde::Serialize; @@ -21,7 +22,7 @@ use crate::message::{ConnectionType, Head, RequestHead}; use super::connection::Connection; use super::response::ClientResponse; -use super::{Connect, ConnectorError, SendRequestError}; +use super::{Connect, ConnectError, SendRequestError}; /// An HTTP Client Request /// @@ -32,7 +33,8 @@ use super::{Connect, ConnectorError, SendRequestError}; /// /// fn main() { /// System::new("test").block_on(lazy(|| { -/// let mut connector = client::Connector::default().service(); +/// let mut connector = client::Connector::new().service(); +/// /// client::ClientRequest::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() @@ -178,17 +180,24 @@ where ) -> impl Future where B: 'static, - T: Service, + T: Service, I: Connection, { let Self { head, body } = self; - connector - // connect to the host - .call(Connect::new(head.uri.clone())) - .from_err() - // send request - .and_then(move |connection| connection.send_request(head, body)) + let connect = Connect::new(head.uri.clone()); + if let Err(e) = connect.validate() { + Either::A(err(e.into())) + } else { + Either::B( + connector + // connect to the host + .call(connect) + .from_err() + // send request + .and_then(move |connection| connection.send_request(head, body)), + ) + } } } diff --git a/src/ws/client/error.rs b/src/ws/client/error.rs index 1eccb0b95..ae1e39967 100644 --- a/src/ws/client/error.rs +++ b/src/ws/client/error.rs @@ -1,7 +1,7 @@ //! Http client request use std::io; -use actix_connector::ConnectorError; +use actix_connect::ConnectError; use derive_more::{Display, From}; use http::{header::HeaderValue, Error as HttpError, StatusCode}; @@ -43,7 +43,7 @@ pub enum ClientError { Protocol(ProtocolError), /// Connect error #[display(fmt = "Connector error: {:?}", _0)] - Connect(ConnectorError), + Connect(ConnectError), /// IO Error #[display(fmt = "{}", _0)] Io(io::Error), diff --git a/src/ws/client/mod.rs b/src/ws/client/mod.rs index 12c7229b9..0dbf081c6 100644 --- a/src/ws/client/mod.rs +++ b/src/ws/client/mod.rs @@ -4,7 +4,7 @@ mod service; pub use self::connect::Connect; pub use self::error::ClientError; -pub use self::service::{Client, DefaultClient}; +pub use self::service::Client; #[derive(PartialEq, Hash, Debug, Clone, Copy)] pub(crate) enum Protocol { diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs index 586873d19..e3781e15f 100644 --- a/src/ws/client/service.rs +++ b/src/ws/client/service.rs @@ -2,8 +2,8 @@ use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_connector::{Connect as TcpConnect, ConnectorError, DefaultConnector}; -use actix_service::Service; +use actix_connect::{default_connector, Connect as TcpConnect, ConnectError}; +use actix_service::{apply_fn, Service}; use base64; use futures::future::{err, Either, FutureResult}; use futures::{try_ready, Async, Future, Poll, Sink, Stream}; @@ -20,21 +20,29 @@ use crate::ws::Codec; use super::{ClientError, Connect, Protocol}; -/// Default client, uses default connector. -pub type DefaultClient = Client; - /// WebSocket's client -pub struct Client -where - T: Service, - T::Response: AsyncRead + AsyncWrite, -{ +pub struct Client { connector: T, } +impl Client<()> { + /// Create client with default connector. + pub fn default() -> Client< + impl Service< + Request = TcpConnect, + Response = impl AsyncRead + AsyncWrite, + Error = ConnectError, + > + Clone, + > { + Client::new(apply_fn(default_connector(), |msg: TcpConnect, srv| { + srv.call(msg).map(|stream| stream.into_parts().0) + })) + } +} + impl Client where - T: Service, + T: Service, T::Response: AsyncRead + AsyncWrite, { /// Create new websocket's client factory @@ -43,15 +51,9 @@ where } } -impl Default for Client { - fn default() -> Self { - Client::new(DefaultConnector::default()) - } -} - impl Clone for Client where - T: Service + Clone, + T: Service + Clone, T::Response: AsyncRead + AsyncWrite, { fn clone(&self) -> Self { @@ -63,7 +65,7 @@ where impl Service for Client where - T: Service, + T: Service, T::Response: AsyncRead + AsyncWrite + 'static, T::Future: 'static, { diff --git a/src/ws/mod.rs b/src/ws/mod.rs index a8de59dd4..8f9bc83ba 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -21,7 +21,7 @@ mod proto; mod service; mod transport; -pub use self::client::{Client, ClientError, Connect, DefaultClient}; +pub use self::client::{Client, ClientError, Connect}; pub use self::codec::{Codec, Frame, Message}; pub use self::frame::Parser; pub use self::proto::{CloseCode, CloseReason, OpCode}; diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index d810d8915..3bb5feffb 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -1,18 +1,17 @@ //! Various helpers for Actix applications to use during testing. use std::sync::mpsc; -use std::{net, thread}; +use std::{net, thread, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::MessageBody; use actix_http::client::{ - ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connection, Connector, - ConnectorError, SendRequestError, + ClientRequest, ClientRequestBuilder, ClientResponse, Connect, ConnectError, + Connection, Connector, SendRequestError, }; use actix_http::ws; use actix_rt::{Runtime, System}; use actix_server::{Server, StreamServiceFactory}; use actix_service::Service; - use futures::future::{lazy, Future}; use http::Method; use net2::TcpBuilder; @@ -44,21 +43,15 @@ use net2::TcpBuilder; /// ``` pub struct TestServer; -/// -pub struct TestServerRuntime { +/// Test server controller +pub struct TestServerRuntime { addr: net::SocketAddr, - conn: T, rt: Runtime, } impl TestServer { /// Start new test server with application factory - pub fn new( - factory: F, - ) -> TestServerRuntime< - impl Service - + Clone, - > { + pub fn new(factory: F) -> TestServerRuntime { let (tx, rx) = mpsc::channel(); // run server in separate thread @@ -79,35 +72,9 @@ impl TestServer { let (system, addr) = rx.recv().unwrap(); System::set_current(system); - - let mut rt = Runtime::new().unwrap(); - let conn = rt - .block_on(lazy(|| Ok::<_, ()>(TestServer::new_connector()))) - .unwrap(); - - TestServerRuntime { addr, conn, rt } - } - - fn new_connector( - ) -> impl Service< - Request = Connect, - Response = impl Connection, - Error = ConnectorError, - > + Clone { - #[cfg(feature = "ssl")] - { - use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; - - let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); - builder.set_verify(SslVerifyMode::NONE); - let _ = builder - .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); - Connector::default().ssl(builder.build()).service() - } - #[cfg(not(feature = "ssl"))] - { - Connector::default().service() + TestServerRuntime { + addr, + rt: Runtime::new().unwrap(), } } @@ -122,7 +89,7 @@ impl TestServer { } } -impl TestServerRuntime { +impl TestServerRuntime { /// Execute future on current core pub fn block_on(&mut self, fut: F) -> Result where @@ -131,12 +98,12 @@ impl TestServerRuntime { self.rt.block_on(fut) } - /// Execute future on current core - pub fn execute(&mut self, fut: F) -> Result + /// Execute function on current core + pub fn execute(&mut self, fut: F) -> R where - F: Future, + F: FnOnce() -> R, { - self.rt.block_on(fut) + self.rt.block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap() } /// Construct test server url @@ -190,17 +157,37 @@ impl TestServerRuntime { .take() } - /// Http connector - pub fn connector(&mut self) -> &mut T { - &mut self.conn + fn new_connector( + ) -> impl Service + + Clone { + #[cfg(feature = "ssl")] + { + use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; + + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let _ = builder + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + Connector::new() + .timeout(time::Duration::from_millis(500)) + .ssl(builder.build()) + .service() + } + #[cfg(not(feature = "ssl"))] + { + Connector::new() + .timeout(time::Duration::from_millis(500)) + .service() + } } /// Http connector - pub fn new_connector(&mut self) -> T - where - T: Clone, - { - self.conn.clone() + pub fn connector( + &mut self, + ) -> impl Service + + Clone { + self.execute(|| TestServerRuntime::new_connector()) } /// Stop http server @@ -209,11 +196,7 @@ impl TestServerRuntime { } } -impl TestServerRuntime -where - T: Service + Clone, - T::Response: Connection, -{ +impl TestServerRuntime { /// Connect to websocket server at a given path pub fn ws_at( &mut self, @@ -236,11 +219,12 @@ where &mut self, req: ClientRequest, ) -> Result { - self.rt.block_on(req.send(&mut self.conn)) + let mut conn = self.connector(); + self.rt.block_on(req.send(&mut conn)) } } -impl Drop for TestServerRuntime { +impl Drop for TestServerRuntime { fn drop(&mut self) { self.stop() } diff --git a/tests/test_client.rs b/tests/test_client.rs index 782e487ca..90e1a4f4a 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -36,7 +36,7 @@ fn test_h1_v2() { .finish(|_| future::ok::<_, ()>(Response::Ok().body(STR))) .map(|_| ()) }); - let mut connector = srv.new_connector(); + let mut connector = srv.connector(); let request = srv.get().finish().unwrap(); let response = srv.block_on(request.send(&mut connector)).unwrap(); @@ -70,7 +70,7 @@ fn test_connection_close() { .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) .map(|_| ()) }); - let mut connector = srv.new_connector(); + let mut connector = srv.connector(); let request = srv.get().close().finish().unwrap(); let response = srv.block_on(request.send(&mut connector)).unwrap(); @@ -90,7 +90,7 @@ fn test_with_query_parameter() { }) .map(|_| ()) }); - let mut connector = srv.new_connector(); + let mut connector = srv.connector(); let request = client::ClientRequest::get(srv.url("/?qp=5")) .finish() diff --git a/tests/test_server.rs b/tests/test_server.rs index 8266bb9af..98f740941 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -432,7 +432,7 @@ fn test_h1_headers() { future::ok::<_, ()>(builder.body(data.clone())) }) }); - let mut connector = srv.new_connector(); + let mut connector = srv.connector(); let req = srv.get().finish().unwrap(); @@ -479,7 +479,7 @@ fn test_h2_headers() { future::ok::<_, ()>(builder.body(data.clone())) }).map_err(|_| ())) }); - let mut connector = srv.new_connector(); + let mut connector = srv.connector(); let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); let mut response = srv.block_on(req.send(&mut connector)).unwrap();