From 361ffd8d2fdf4e52f8f3f72632702f9c963a36e2 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 29 Aug 2018 15:15:24 -0700 Subject: [PATCH] refactor Connector service --- src/connector.rs | 141 ++++++++++++++++++++++++++++----------------- src/lib.rs | 3 +- src/ssl/openssl.rs | 53 ++++++++++------- 3 files changed, 121 insertions(+), 76 deletions(-) diff --git a/src/connector.rs b/src/connector.rs index f6ff75f7b..faae08ffb 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; use std::io; +use std::marker::PhantomData; use std::net::SocketAddr; use futures::{ @@ -15,6 +16,16 @@ use trust_dns_resolver::{AsyncResolver, Background}; use super::{NewService, Service}; +pub trait HostAware { + fn host(&self) -> &str; +} + +impl HostAware for String { + fn host(&self) -> &str { + self.as_ref() + } +} + #[derive(Fail, Debug)] pub enum ConnectorError { /// Failed to resolve the hostname @@ -35,11 +46,12 @@ pub struct ConnectionInfo { pub addr: SocketAddr, } -pub struct Connector { +pub struct Connector { resolver: AsyncResolver, + req: PhantomData, } -impl Default for Connector { +impl Default for Connector { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -51,66 +63,72 @@ impl Default for Connector { } } -impl Connector { +impl Connector { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio::spawn(bg); - Connector { resolver } + Connector { + resolver, + req: PhantomData, + } } pub fn new_service() -> impl NewService< - Request = String, - Response = (ConnectionInfo, TcpStream), + Request = T, + Response = (T, ConnectionInfo, TcpStream), Error = ConnectorError, InitError = E, > + Clone { - || -> FutureResult { ok(Connector::default()) } + || -> FutureResult, E> { ok(Connector::default()) } } pub fn new_service_with_config( cfg: ResolverConfig, opts: ResolverOpts, ) -> impl NewService< - Request = String, - Response = (ConnectionInfo, TcpStream), + Request = T, + Response = (T, ConnectionInfo, TcpStream), Error = ConnectorError, InitError = E, > + Clone { - move || -> FutureResult { ok(Connector::new(cfg.clone(), opts.clone())) } - } -} - -impl Clone for Connector { - fn clone(&self) -> Self { - Connector { - resolver: self.resolver.clone(), + move || -> FutureResult, E> { + ok(Connector::new(cfg.clone(), opts.clone())) } } } -impl Service for Connector { - type Request = String; - type Response = (ConnectionInfo, TcpStream); +impl Clone for Connector { + fn clone(&self) -> Self { + Connector { + resolver: self.resolver.clone(), + req: PhantomData, + } + } +} + +impl Service for Connector { + type Request = T; + type Response = (T, ConnectionInfo, TcpStream); type Error = ConnectorError; - type Future = ConnectorFuture; + type Future = ConnectorFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, addr: String) -> Self::Future { - let fut = ResolveFut::new(addr, 0, &self.resolver); + fn call(&mut self, req: Self::Request) -> Self::Future { + let fut = ResolveFut::new(req, 0, &self.resolver); ConnectorFuture { fut, fut2: None } } } -pub struct ConnectorFuture { - fut: ResolveFut, - fut2: Option, +pub struct ConnectorFuture { + fut: ResolveFut, + fut2: Option>, } -impl Future for ConnectorFuture { - type Item = (ConnectionInfo, TcpStream); +impl Future for ConnectorFuture { + type Item = (T, ConnectionInfo, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -118,8 +136,8 @@ impl Future for ConnectorFuture { return fut.poll(); } match self.fut.poll()? { - Async::Ready((host, addrs)) => { - self.fut2 = Some(TcpConnector::new(host, addrs)); + Async::Ready((req, host, addrs)) => { + self.fut2 = Some(TcpConnector::new(req, host, addrs)); self.poll() } Async::NotReady => Ok(Async::NotReady), @@ -128,7 +146,8 @@ impl Future for ConnectorFuture { } /// Resolver future -struct ResolveFut { +struct ResolveFut { + req: Option, host: Option, port: u16, lookup: Option>, @@ -137,20 +156,25 @@ struct ResolveFut { error2: Option, } -impl ResolveFut { - pub fn new(addr: String, port: u16, resolver: &AsyncResolver) -> Self { +impl ResolveFut { + pub fn new(addr: T, port: u16, resolver: &AsyncResolver) -> Self { // we need to do dns resolution - match ResolveFut::parse(addr.as_ref(), port) { - Ok((host, port)) => ResolveFut { - port, - host: Some(host.to_owned()), - lookup: Some(resolver.lookup_ip(host)), - addrs: None, - error: None, - error2: None, - }, + match ResolveFut::::parse(addr.host(), port) { + Ok((host, port)) => { + let lookup = Some(resolver.lookup_ip(host.as_str())); + ResolveFut { + port, + lookup, + req: Some(addr), + host: Some(host), + addrs: None, + error: None, + error2: None, + } + } Err(err) => ResolveFut { port, + req: None, host: None, lookup: None, addrs: None, @@ -160,7 +184,7 @@ impl ResolveFut { } } - fn parse(addr: &str, port: u16) -> Result<(&str, u16), ConnectorError> { + fn parse(addr: &str, port: u16) -> Result<(String, u16), ConnectorError> { macro_rules! try_opt { ($e:expr, $msg:expr) => { match $e { @@ -176,12 +200,12 @@ impl ResolveFut { let port_str = parts_iter.next().unwrap_or(""); let port: u16 = port_str.parse().unwrap_or(port); - Ok((host, port)) + Ok((host.to_owned(), port)) } } -impl Future for ResolveFut { - type Item = (String, VecDeque); +impl Future for ResolveFut { + type Item = (T, String, VecDeque); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -190,7 +214,11 @@ impl Future for ResolveFut { } else if let Some(err) = self.error2.take() { Err(ConnectorError::Resolver(err)) } else if let Some(addrs) = self.addrs.take() { - Ok(Async::Ready((self.host.take().unwrap(), addrs))) + Ok(Async::Ready(( + self.req.take().unwrap(), + self.host.take().unwrap(), + addrs, + ))) } else { match self.lookup.as_mut().unwrap().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -204,7 +232,11 @@ impl Future for ResolveFut { "Expect at least one A dns record".to_owned(), )) } else { - Ok(Async::Ready((self.host.take().unwrap(), addrs))) + Ok(Async::Ready(( + self.req.take().unwrap(), + self.host.take().unwrap(), + addrs, + ))) } } Err(err) => Err(ConnectorError::Resolver(format!("{}", err))), @@ -214,17 +246,19 @@ impl Future for ResolveFut { } /// Tcp stream connector -pub struct TcpConnector { +pub struct TcpConnector { + req: Option, host: Option, addr: Option, addrs: VecDeque, stream: Option, } -impl TcpConnector { - pub fn new(host: String, addrs: VecDeque) -> TcpConnector { +impl TcpConnector { + pub fn new(req: T, host: String, addrs: VecDeque) -> TcpConnector { TcpConnector { addrs, + req: Some(req), host: Some(host), addr: None, stream: None, @@ -232,8 +266,8 @@ impl TcpConnector { } } -impl Future for TcpConnector { - type Item = (ConnectionInfo, TcpStream); +impl Future for TcpConnector { + type Item = (T, ConnectionInfo, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -243,6 +277,7 @@ impl Future for TcpConnector { match new.poll() { Ok(Async::Ready(sock)) => { return Ok(Async::Ready(( + self.req.take().unwrap(), ConnectionInfo { host: self.host.take().unwrap(), addr: self.addr.take().unwrap(), diff --git a/src/lib.rs b/src/lib.rs index 65b15ee67..b6ebaf5a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,7 @@ pub use tower_service::{NewService, Service}; pub(crate) mod accept; pub mod configurable; -mod connector; +pub mod connector; mod server; mod server_service; pub mod service; @@ -61,7 +61,6 @@ pub mod ssl; mod worker; pub use configurable::{IntoNewConfigurableService, NewConfigurableService}; -pub use connector::{ConnectionInfo, Connector, ConnectorError}; pub use server::Server; pub use service::{IntoNewService, IntoService, NewServiceExt, ServiceExt}; diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index 65a784b89..052cbf4bd 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -92,83 +92,94 @@ impl Service for OpensslAcceptorService { } /// Openssl connector factory -pub struct OpensslConnector { +pub struct OpensslConnector { connector: SslConnector, - io: PhantomData, + t: PhantomData, + io: PhantomData, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { OpensslConnector { connector, + t: PhantomData, io: PhantomData, } } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), + t: PhantomData, io: PhantomData, } } } -impl NewService for OpensslConnector { - type Request = (ConnectionInfo, T); - type Response = (ConnectionInfo, SslStream); +impl NewService for OpensslConnector { + type Request = (T, ConnectionInfo, Io); + type Response = (T, ConnectionInfo, SslStream); type Error = Error; - type Service = OpensslConnectorService; + type Service = OpensslConnectorService; type InitError = io::Error; type Future = FutureResult; fn new_service(&self) -> Self::Future { future::ok(OpensslConnectorService { connector: self.connector.clone(), + t: PhantomData, io: PhantomData, }) } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - io: PhantomData, + t: PhantomData, + io: PhantomData, } -impl Service for OpensslConnectorService { - type Request = (ConnectionInfo, T); - type Response = (ConnectionInfo, SslStream); +impl Service for OpensslConnectorService { + type Request = (T, ConnectionInfo, Io); + type Response = (T, ConnectionInfo, SslStream); type Error = Error; - type Future = ConnectAsyncExt; + type Future = ConnectAsyncExt; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, (info, stream): Self::Request) -> Self::Future { + fn call(&mut self, (req, info, stream): Self::Request) -> Self::Future { ConnectAsyncExt { fut: SslConnectorExt::connect_async(&self.connector, &info.host, stream), + req: Some(req), host: Some(info), } } } -pub struct ConnectAsyncExt { - fut: ConnectAsync, +pub struct ConnectAsyncExt { + fut: ConnectAsync, + req: Option, host: Option, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where - T: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite, { - type Item = (ConnectionInfo, SslStream); + type Item = (T, ConnectionInfo, SslStream); type Error = Error; fn poll(&mut self) -> Poll { match self.fut.poll()? { - Async::Ready(stream) => Ok(Async::Ready((self.host.take().unwrap(), stream))), + Async::Ready(stream) => Ok(Async::Ready(( + self.req.take().unwrap(), + self.host.take().unwrap(), + stream, + ))), Async::NotReady => Ok(Async::NotReady), } }