From b8bfd29d2c5e9b5a9ce10e27a1e1898d54e40444 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Thu, 14 Mar 2019 11:52:52 -0700
Subject: [PATCH] use Uri as client connect message

---
 Cargo.toml               |   2 +-
 src/client/connect.rs    |  78 ----------------------
 src/client/connector.rs  | 136 +++++++++++++++++++--------------------
 src/client/mod.rs        |   2 -
 src/client/pool.rs       | 132 ++++---------------------------------
 src/client/request.rs    |  37 +++++++----
 src/ws/client/service.rs |   4 +-
 test-server/src/lib.rs   |  14 ++--
 8 files changed, 112 insertions(+), 293 deletions(-)
 delete mode 100644 src/client/connect.rs

diff --git a/Cargo.toml b/Cargo.toml
index 11f532c8..3b9a8498 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ encoding = "0.2"
 futures = "0.1"
 hashbrown = "0.1.8"
 h2 = "0.1.16"
-http = "0.1.8"
+http = "0.1.16"
 httparse = "1.3"
 indexmap = "1.0"
 lazy_static = "1.0"
diff --git a/src/client/connect.rs b/src/client/connect.rs
deleted file mode 100644
index 82e5e45c..00000000
--- a/src/client/connect.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use actix_connect::Address;
-use http::uri::Uri;
-use http::HttpTryFrom;
-
-use super::error::InvalidUrl;
-use super::pool::Key;
-
-#[derive(Debug)]
-/// `Connect` type represents a message that can be sent to
-/// `Connector` with a connection request.
-pub struct Connect {
-    pub(crate) uri: Uri,
-}
-
-impl Connect {
-    /// Create `Connect` message for specified `Uri`
-    pub fn new(uri: Uri) -> Connect {
-        Connect { uri }
-    }
-
-    /// Construct `Uri` instance and create `Connect` message.
-    pub fn try_from<U>(uri: U) -> Result<Connect, InvalidUrl>
-    where
-        Uri: HttpTryFrom<U>,
-    {
-        Ok(Connect {
-            uri: Uri::try_from(uri).map_err(|e| e.into())?,
-        })
-    }
-
-    pub(crate) fn is_secure(&self) -> bool {
-        if let Some(scheme) = self.uri.scheme_part() {
-            scheme.as_str() == "https"
-        } else {
-            false
-        }
-    }
-
-    pub(crate) fn key(&self) -> Key {
-        self.uri.authority_part().unwrap().clone().into()
-    }
-
-    pub(crate) fn validate(&self) -> Result<(), InvalidUrl> {
-        if self.uri.host().is_none() {
-            Err(InvalidUrl::MissingHost)
-        } else if self.uri.scheme_part().is_none() {
-            Err(InvalidUrl::MissingScheme)
-        } else if let Some(scheme) = self.uri.scheme_part() {
-            match scheme.as_str() {
-                "http" | "ws" | "https" | "wss" => Ok(()),
-                _ => Err(InvalidUrl::UnknownScheme),
-            }
-        } else {
-            Ok(())
-        }
-    }
-}
-
-impl Address for Connect {
-    fn host(&self) -> &str {
-        &self.uri.host().unwrap()
-    }
-
-    fn port(&self) -> Option<u16> {
-        let port = if let Some(port) = self.uri.port() {
-            port
-        } else if let Some(scheme) = self.uri.scheme_part() {
-            match scheme.as_str() {
-                "http" | "ws" => 80,
-                "https" | "wss" => 443,
-                _ => 80,
-            }
-        } else {
-            80
-        };
-        Some(port)
-    }
-}
diff --git a/src/client/connector.rs b/src/client/connector.rs
index c764b93c..b8054151 100644
--- a/src/client/connector.rs
+++ b/src/client/connector.rs
@@ -8,9 +8,9 @@ use actix_connect::{
 };
 use actix_service::{apply_fn, Service, ServiceExt};
 use actix_utils::timeout::{TimeoutError, TimeoutService};
+use http::Uri;
 use tokio_tcp::TcpStream;
 
-use super::connect::Connect;
 use super::connection::Connection;
 use super::error::ConnectError;
 use super::pool::{ConnectionPool, Protocol};
@@ -38,8 +38,8 @@ pub struct Connector<T, U> {
 impl Connector<(), ()> {
     pub fn new() -> Connector<
         impl Service<
-                Request = TcpConnect<Connect>,
-                Response = TcpConnection<Connect, TcpStream>,
+                Request = TcpConnect<Uri>,
+                Response = TcpConnection<Uri, TcpStream>,
                 Error = actix_connect::ConnectError,
             > + Clone,
         TcpStream,
@@ -79,8 +79,8 @@ impl<T, U> Connector<T, U> {
     where
         U1: AsyncRead + AsyncWrite + fmt::Debug,
         T1: Service<
-                Request = TcpConnect<Connect>,
-                Response = TcpConnection<Connect, U1>,
+                Request = TcpConnect<Uri>,
+                Response = TcpConnection<Uri, U1>,
                 Error = actix_connect::ConnectError,
             > + Clone,
     {
@@ -101,8 +101,8 @@ impl<T, U> Connector<T, U>
 where
     U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
     T: Service<
-            Request = TcpConnect<Connect>,
-            Response = TcpConnection<Connect, U>,
+            Request = TcpConnect<Uri>,
+            Response = TcpConnection<Uri, U>,
             Error = actix_connect::ConnectError,
         > + Clone,
 {
@@ -166,16 +166,14 @@ where
     /// Finish configuration process and create connector service.
     pub fn service(
         self,
-    ) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
-                 + Clone {
+    ) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
+    {
         #[cfg(not(feature = "ssl"))]
         {
             let connector = TimeoutService::new(
                 self.timeout,
-                apply_fn(self.connector, |msg: Connect, srv| {
-                    srv.call(actix_connect::Connect::new(msg))
-                })
-                .map(|stream| (stream.into_parts().0, Protocol::Http1)),
+                apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into()))
+                    .map(|stream| (stream.into_parts().0, Protocol::Http1)),
             )
             .map_err(|e| match e {
                 TimeoutError::Service(e) => e,
@@ -199,28 +197,26 @@ where
 
             let ssl_service = TimeoutService::new(
                 self.timeout,
-                apply_fn(self.connector.clone(), |msg: Connect, srv| {
-                    srv.call(actix_connect::Connect::new(msg))
-                })
-                .map_err(ConnectError::from)
-                .and_then(
-                    OpensslConnector::service(self.ssl)
-                        .map_err(ConnectError::from)
-                        .map(|stream| {
-                            let sock = stream.into_parts().0;
-                            let h2 = sock
-                                .get_ref()
-                                .ssl()
-                                .selected_alpn_protocol()
-                                .map(|protos| protos.windows(2).any(|w| w == H2))
-                                .unwrap_or(false);
-                            if h2 {
-                                (sock, Protocol::Http2)
-                            } else {
-                                (sock, Protocol::Http1)
-                            }
-                        }),
-                ),
+                apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
+                    .map_err(ConnectError::from)
+                    .and_then(
+                        OpensslConnector::service(self.ssl)
+                            .map_err(ConnectError::from)
+                            .map(|stream| {
+                                let sock = stream.into_parts().0;
+                                let h2 = sock
+                                    .get_ref()
+                                    .ssl()
+                                    .selected_alpn_protocol()
+                                    .map(|protos| protos.windows(2).any(|w| w == H2))
+                                    .unwrap_or(false);
+                                if h2 {
+                                    (sock, Protocol::Http2)
+                                } else {
+                                    (sock, Protocol::Http1)
+                                }
+                            }),
+                    ),
             )
             .map_err(|e| match e {
                 TimeoutError::Service(e) => e,
@@ -229,11 +225,9 @@ where
 
             let tcp_service = TimeoutService::new(
                 self.timeout,
-                apply_fn(self.connector.clone(), |msg: Connect, srv| {
-                    srv.call(actix_connect::Connect::new(msg))
-                })
-                .map_err(ConnectError::from)
-                .map(|stream| (stream.into_parts().0, Protocol::Http1)),
+                apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
+                    .map_err(ConnectError::from)
+                    .map(|stream| (stream.into_parts().0, Protocol::Http1)),
             )
             .map_err(|e| match e {
                 TimeoutError::Service(e) => e,
@@ -271,7 +265,7 @@ mod connect_impl {
     pub(crate) struct InnerConnector<T, Io>
     where
         Io: AsyncRead + AsyncWrite + 'static,
-        T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
+        T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectorError>,
     {
         pub(crate) tcp_pool: ConnectionPool<T, Io>,
     }
@@ -279,7 +273,7 @@ mod connect_impl {
     impl<T, Io> Clone for InnerConnector<T, Io>
     where
         Io: AsyncRead + AsyncWrite + 'static,
-        T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+        T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>
             + Clone,
     {
         fn clone(&self) -> Self {
@@ -292,9 +286,9 @@ mod connect_impl {
     impl<T, Io> Service for InnerConnector<T, Io>
     where
         Io: AsyncRead + AsyncWrite + 'static,
-        T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
+        T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectorError>,
     {
-        type Request = Connect;
+        type Request = Uri;
         type Response = IoConnection<Io>;
         type Error = ConnectorError;
         type Future = Either<
@@ -306,13 +300,12 @@ mod connect_impl {
             self.tcp_pool.poll_ready()
         }
 
-        fn call(&mut self, req: Connect) -> Self::Future {
-            if req.is_secure() {
-                Either::B(err(ConnectError::SslIsNotSupported))
-            } else if let Err(e) = req.validate() {
-                Either::B(err(e))
-            } else {
-                Either::A(self.tcp_pool.call(req))
+        fn call(&mut self, req: Uri) -> Self::Future {
+            match req.scheme_str() {
+                Some("https") | Some("wss") => {
+                    Either::B(err(ConnectError::SslIsNotSupported))
+                }
+                _ => Either::A(self.tcp_pool.call(req)),
             }
         }
     }
@@ -332,8 +325,8 @@ mod connect_impl {
     where
         Io1: AsyncRead + AsyncWrite + 'static,
         Io2: AsyncRead + AsyncWrite + 'static,
-        T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
-        T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
+        T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
+        T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
     {
         pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
         pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
@@ -343,9 +336,9 @@ mod connect_impl {
     where
         Io1: AsyncRead + AsyncWrite + 'static,
         Io2: AsyncRead + AsyncWrite + 'static,
-        T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+        T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>
             + Clone,
-        T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+        T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>
             + Clone,
     {
         fn clone(&self) -> Self {
@@ -360,10 +353,10 @@ mod connect_impl {
     where
         Io1: AsyncRead + AsyncWrite + 'static,
         Io2: AsyncRead + AsyncWrite + 'static,
-        T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
-        T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
+        T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
+        T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
     {
-        type Request = Connect;
+        type Request = Uri;
         type Response = EitherConnection<Io1, Io2>;
         type Error = ConnectError;
         type Future = Either<
@@ -378,17 +371,18 @@ mod connect_impl {
             self.tcp_pool.poll_ready()
         }
 
-        fn call(&mut self, req: Connect) -> Self::Future {
-            if req.is_secure() {
-                Either::B(Either::B(InnerConnectorResponseB {
-                    fut: self.ssl_pool.call(req),
-                    _t: PhantomData,
-                }))
-            } else {
-                Either::B(Either::A(InnerConnectorResponseA {
+        fn call(&mut self, req: Uri) -> Self::Future {
+            match req.scheme_str() {
+                Some("https") | Some("wss") => {
+                    Either::B(Either::B(InnerConnectorResponseB {
+                        fut: self.ssl_pool.call(req),
+                        _t: PhantomData,
+                    }))
+                }
+                _ => Either::B(Either::A(InnerConnectorResponseA {
                     fut: self.tcp_pool.call(req),
                     _t: PhantomData,
-                }))
+                })),
             }
         }
     }
@@ -396,7 +390,7 @@ mod connect_impl {
     pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
     where
         Io1: AsyncRead + AsyncWrite + 'static,
-        T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
+        T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
     {
         fut: <ConnectionPool<T, Io1> as Service>::Future,
         _t: PhantomData<Io2>,
@@ -404,7 +398,7 @@ mod connect_impl {
 
     impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
     where
-        T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
+        T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
         Io1: AsyncRead + AsyncWrite + 'static,
         Io2: AsyncRead + AsyncWrite + 'static,
     {
@@ -422,7 +416,7 @@ mod connect_impl {
     pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
     where
         Io2: AsyncRead + AsyncWrite + 'static,
-        T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
+        T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
     {
         fut: <ConnectionPool<T, Io2> as Service>::Future,
         _t: PhantomData<Io1>,
@@ -430,7 +424,7 @@ mod connect_impl {
 
     impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
     where
-        T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
+        T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
         Io1: AsyncRead + AsyncWrite + 'static,
         Io2: AsyncRead + AsyncWrite + 'static,
     {
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 0bff97e4..86b1a0cc 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -1,5 +1,4 @@
 //! Http client api
-mod connect;
 mod connection;
 mod connector;
 mod error;
@@ -9,7 +8,6 @@ mod pool;
 mod request;
 mod response;
 
-pub use self::connect::Connect;
 pub use self::connection::Connection;
 pub use self::connector::Connector;
 pub use self::error::{ConnectError, InvalidUrl, SendRequestError};
diff --git a/src/client/pool.rs b/src/client/pool.rs
index 214b7a38..a94b1e52 100644
--- a/src/client/pool.rs
+++ b/src/client/pool.rs
@@ -7,18 +7,17 @@ use std::time::{Duration, Instant};
 use actix_codec::{AsyncRead, AsyncWrite};
 use actix_service::Service;
 use bytes::Bytes;
-use futures::future::{ok, Either, FutureResult};
+use futures::future::{err, ok, Either, FutureResult};
 use futures::task::AtomicTask;
 use futures::unsync::oneshot;
 use futures::{Async, Future, Poll};
 use h2::client::{handshake, Handshake};
 use hashbrown::HashMap;
-use http::uri::Authority;
+use http::uri::{Authority, Uri};
 use indexmap::IndexSet;
 use slab::Slab;
 use tokio_timer::{sleep, Delay};
 
-use super::connect::Connect;
 use super::connection::{ConnectionType, IoConnection};
 use super::error::ConnectError;
 
@@ -48,7 +47,7 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
 impl<T, Io> ConnectionPool<T, Io>
 where
     Io: AsyncRead + AsyncWrite + 'static,
-    T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
+    T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
 {
     pub(crate) fn new(
         connector: T,
@@ -87,9 +86,9 @@ where
 impl<T, Io> Service for ConnectionPool<T, Io>
 where
     Io: AsyncRead + AsyncWrite + 'static,
-    T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
+    T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
 {
-    type Request = Connect;
+    type Request = Uri;
     type Response = IoConnection<Io>;
     type Error = ConnectError;
     type Future = Either<
@@ -101,8 +100,12 @@ where
         self.0.poll_ready()
     }
 
-    fn call(&mut self, req: Connect) -> Self::Future {
-        let key = req.key();
+    fn call(&mut self, req: Uri) -> Self::Future {
+        let key = if let Some(authority) = req.authority_part() {
+            authority.clone().into()
+        } else {
+            return Either::A(err(ConnectError::Unresolverd));
+        };
 
         // acquire connection
         match self.1.as_ref().borrow_mut().acquire(&key) {
@@ -268,110 +271,6 @@ where
     }
 }
 
-// struct OpenWaitingConnection<F, Io>
-// where
-//     Io: AsyncRead + AsyncWrite + 'static,
-// {
-//     fut: F,
-//     key: Key,
-//     h2: Option<Handshake<Io, Bytes>>,
-//     rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>>,
-//     inner: Option<Rc<RefCell<Inner<Io>>>>,
-// }
-
-// impl<F, Io> OpenWaitingConnection<F, Io>
-// where
-//     F: Future<Item = (Io, Protocol), Error = ConnectorError> + 'static,
-//     Io: AsyncRead + AsyncWrite + 'static,
-// {
-//     fn spawn(
-//         key: Key,
-//         rx: oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
-//         inner: Rc<RefCell<Inner<Io>>>,
-//         fut: F,
-//     ) {
-//         tokio_current_thread::spawn(OpenWaitingConnection {
-//             key,
-//             fut,
-//             h2: None,
-//             rx: Some(rx),
-//             inner: Some(inner),
-//         })
-//     }
-// }
-
-// impl<F, Io> Drop for OpenWaitingConnection<F, Io>
-// where
-//     Io: AsyncRead + AsyncWrite + 'static,
-// {
-//     fn drop(&mut self) {
-//         if let Some(inner) = self.inner.take() {
-//             let mut inner = inner.as_ref().borrow_mut();
-//             inner.release();
-//             inner.check_availibility();
-//         }
-//     }
-// }
-
-// impl<F, Io> Future for OpenWaitingConnection<F, Io>
-// where
-//     F: Future<Item = (Io, Protocol), Error = ConnectorError>,
-//     Io: AsyncRead + AsyncWrite,
-// {
-//     type Item = ();
-//     type Error = ();
-
-//     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-//         if let Some(ref mut h2) = self.h2 {
-//             return match h2.poll() {
-//                 Ok(Async::Ready((snd, connection))) => {
-//                     tokio_current_thread::spawn(connection.map_err(|_| ()));
-//                     let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
-//                         ConnectionType::H2(snd),
-//                         Instant::now(),
-//                         Some(Acquired(self.key.clone(), self.inner.clone())),
-//                     )));
-//                     Ok(Async::Ready(()))
-//                 }
-//                 Ok(Async::NotReady) => Ok(Async::NotReady),
-//                 Err(e) => {
-//                     let _ = self.inner.take();
-//                     if let Some(rx) = self.rx.take() {
-//                         let _ = rx.send(Err(e.into()));
-//                     }
-
-//                     Err(())
-//                 }
-//             };
-//         }
-
-//         match self.fut.poll() {
-//             Err(err) => {
-//                 let _ = self.inner.take();
-//                 if let Some(rx) = self.rx.take() {
-//                     let _ = rx.send(Err(err));
-//                 }
-//                 Err(())
-//             }
-//             Ok(Async::Ready((_, io, proto))) => {
-//                 let _ = self.inner.take();
-//                 if proto == Protocol::Http1 {
-//                     let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
-//                         ConnectionType::H1(io),
-//                         Instant::now(),
-//                         Some(Acquired(self.key.clone(), self.inner.clone())),
-//                     )));
-//                 } else {
-//                     self.h2 = Some(handshake(io));
-//                     return self.poll();
-//                 }
-//                 Ok(Async::Ready(()))
-//             }
-//             Ok(Async::NotReady) => Ok(Async::NotReady),
-//         }
-//     }
-// }
-
 enum Acquire<T> {
     Acquired(ConnectionType<T>, Instant),
     Available,
@@ -392,10 +291,7 @@ pub(crate) struct Inner<Io> {
     limit: usize,
     acquired: usize,
     available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
-    waiters: Slab<(
-        Connect,
-        oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
-    )>,
+    waiters: Slab<(Uri, oneshot::Sender<Result<IoConnection<Io>, ConnectError>>)>,
     waiters_queue: IndexSet<(Key, usize)>,
     task: AtomicTask,
 }
@@ -434,14 +330,14 @@ where
     /// connection is not available, wait
     fn wait_for(
         &mut self,
-        connect: Connect,
+        connect: Uri,
     ) -> (
         oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
         usize,
     ) {
         let (tx, rx) = oneshot::channel();
 
-        let key = connect.key();
+        let key: Key = connect.authority_part().unwrap().clone().into();
         let entry = self.waiters.vacant_entry();
         let token = entry.key();
         entry.insert((connect, tx));
diff --git a/src/client/request.rs b/src/client/request.rs
index 199e13b9..7c7079fb 100644
--- a/src/client/request.rs
+++ b/src/client/request.rs
@@ -21,8 +21,8 @@ use crate::http::{
 use crate::message::{ConnectionType, Head, RequestHead};
 
 use super::connection::Connection;
+use super::error::{ConnectError, InvalidUrl, SendRequestError};
 use super::response::ClientResponse;
-use super::{Connect, ConnectError, SendRequestError};
 
 /// An HTTP Client Request
 ///
@@ -180,23 +180,32 @@ where
     ) -> impl Future<Item = ClientResponse, Error = SendRequestError>
     where
         B: 'static,
-        T: Service<Request = Connect, Response = I, Error = ConnectError>,
+        T: Service<Request = Uri, Response = I, Error = ConnectError>,
         I: Connection,
     {
         let Self { head, body } = self;
 
-        let connect = Connect::new(head.uri.clone());
-        if let Err(e) = connect.validate() {
-            Either::A(err(e.into()))
+        let uri = head.uri.clone();
+
+        // validate uri
+        if uri.host().is_none() {
+            Either::A(err(InvalidUrl::MissingHost.into()))
+        } else if uri.scheme_part().is_none() {
+            Either::A(err(InvalidUrl::MissingScheme.into()))
+        } else if let Some(scheme) = uri.scheme_part() {
+            match scheme.as_str() {
+                "http" | "ws" | "https" | "wss" => Either::B(
+                    connector
+                        // connect to the host
+                        .call(uri)
+                        .from_err()
+                        // send request
+                        .and_then(move |connection| connection.send_request(head, body)),
+                ),
+                _ => Either::A(err(InvalidUrl::UnknownScheme.into())),
+            }
         } else {
-            Either::B(
-                connector
-                    // connect to the host
-                    .call(connect)
-                    .from_err()
-                    // send request
-                    .and_then(move |connection| connection.send_request(head, body)),
-            )
+            Either::A(err(InvalidUrl::UnknownScheme.into()))
         }
     }
 }
@@ -529,7 +538,7 @@ impl ClientRequestBuilder {
                     if !parts.headers.contains_key(header::HOST) {
                         let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
 
-                        let _ = match parts.uri.port() {
+                        let _ = match parts.uri.port_u16() {
                             None | Some(80) | Some(443) => write!(wrt, "{}", host),
                             Some(port) => write!(wrt, "{}:{}", host, port),
                         };
diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs
index 1aa39124..a0a9b203 100644
--- a/src/ws/client/service.rs
+++ b/src/ws/client/service.rs
@@ -2,7 +2,7 @@
 use std::marker::PhantomData;
 
 use actix_codec::{AsyncRead, AsyncWrite, Framed};
-use actix_connect::{default_connector, Address, Connect as TcpConnect, ConnectError};
+use actix_connect::{default_connector, Connect as TcpConnect, ConnectError};
 use actix_service::{apply_fn, Service};
 use base64;
 use futures::future::{err, Either, FutureResult};
@@ -131,7 +131,7 @@ where
 
             // prep connection
             let connect = TcpConnect::new(request.uri().host().unwrap().to_string())
-                .set_port(request.uri().port().unwrap_or_else(|| proto.port()));
+                .set_port(request.uri().port_u16().unwrap_or_else(|| proto.port()));
 
             let fut = Box::new(
                 self.connector
diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs
index 3bb5feff..26bca787 100644
--- a/test-server/src/lib.rs
+++ b/test-server/src/lib.rs
@@ -5,10 +5,10 @@ use std::{net, thread, time};
 use actix_codec::{AsyncRead, AsyncWrite, Framed};
 use actix_http::body::MessageBody;
 use actix_http::client::{
-    ClientRequest, ClientRequestBuilder, ClientResponse, Connect, ConnectError,
-    Connection, Connector, SendRequestError,
+    ClientRequest, ClientRequestBuilder, ClientResponse, ConnectError, Connection,
+    Connector, SendRequestError,
 };
-use actix_http::ws;
+use actix_http::{http::Uri, ws};
 use actix_rt::{Runtime, System};
 use actix_server::{Server, StreamServiceFactory};
 use actix_service::Service;
@@ -158,8 +158,8 @@ impl TestServerRuntime {
     }
 
     fn new_connector(
-    ) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
-                 + Clone {
+    ) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
+    {
         #[cfg(feature = "ssl")]
         {
             use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@@ -185,8 +185,8 @@ impl TestServerRuntime {
     /// Http connector
     pub fn connector(
         &mut self,
-    ) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
-                 + Clone {
+    ) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
+    {
         self.execute(|| TestServerRuntime::new_connector())
     }