From 9b9599500a534e7afb414edf041f6741965edb0b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Nov 2018 21:12:30 -0800 Subject: [PATCH] refactor connector and resolver services --- src/connector.rs | 90 ++++++++++++++++++++++++++++++++++-------------- src/resolver.rs | 36 ++++++++++--------- 2 files changed, 83 insertions(+), 43 deletions(-) diff --git a/src/connector.rs b/src/connector.rs index 901b97ba0..8c8d85a8d 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; -use std::net::SocketAddr; +use std::marker::PhantomData; +use std::net::{IpAddr, SocketAddr}; use std::time::Duration; use std::{fmt, io}; @@ -11,9 +12,14 @@ use tokio_tcp::{ConnectFuture, TcpStream}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::system_conf::read_system_conf; -use super::resolver::{HostAware, ResolveError, Resolver, ResolverFuture}; +use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture}; use super::service::{NewService, Service}; +/// Port of the request +pub trait RequestPort { + fn port(&self) -> u16; +} + // #[derive(Fail, Debug)] #[derive(Debug)] pub enum ConnectorError { @@ -22,7 +28,7 @@ pub enum ConnectorError { Resolver(ResolveError), /// No dns records - // #[fail(display = "Invalid input: {}", _0)] + // #[fail(display = "No dns records found for the input")] NoRecords, /// Connecting took too long @@ -43,6 +49,12 @@ impl From for ConnectorError { } } +impl From for ConnectorError { + fn from(err: io::Error) -> Self { + ConnectorError::IoError(err) + } +} + /// Connect request #[derive(Eq, PartialEq, Debug, Hash)] pub struct Connect { @@ -85,12 +97,18 @@ impl Connect { } } -impl HostAware for Connect { +impl RequestHost for Connect { fn host(&self) -> &str { &self.host } } +impl RequestPort for Connect { + fn port(&self) -> u16 { + self.port + } +} + impl fmt::Display for Connect { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}:{}", self.host, self.port) @@ -173,7 +191,7 @@ impl Service for Connector { #[doc(hidden)] pub struct ConnectorFuture { fut: ResolverFuture, - fut2: Option, + fut2: Option>, } impl Future for ConnectorFuture { @@ -182,20 +200,14 @@ impl Future for ConnectorFuture { fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut2 { - return fut.poll(); + return fut.poll().map_err(ConnectorError::from); } match self.fut.poll().map_err(ConnectorError::from)? { - Async::Ready((req, mut addrs)) => { + Async::Ready((req, addrs)) => { if addrs.is_empty() { Err(ConnectorError::NoRecords) } else { - for addr in &mut addrs { - match addr { - SocketAddr::V4(ref mut addr) => addr.set_port(req.port), - SocketAddr::V6(ref mut addr) => addr.set_port(req.port), - } - } - self.fut2 = Some(TcpConnector::new(req, addrs)); + self.fut2 = Some(TcpConnectorResponse::new(req, addrs)); self.poll() } } @@ -204,19 +216,45 @@ impl Future for ConnectorFuture { } } +/// Tcp stream connector service +pub struct TcpConnector(PhantomData); + +impl Default for TcpConnector { + fn default() -> TcpConnector { + TcpConnector(PhantomData) + } +} + +impl Service for TcpConnector { + type Request = (T, VecDeque); + type Response = (T, TcpStream); + type Error = io::Error; + type Future = TcpConnectorResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, (req, addrs): Self::Request) -> Self::Future { + TcpConnectorResponse::new(req, addrs) + } +} + #[doc(hidden)] -/// Tcp stream connector -pub struct TcpConnector { - req: Option, +/// Tcp stream connector response future +pub struct TcpConnectorResponse { + port: u16, + req: Option, addr: Option, - addrs: VecDeque, + addrs: VecDeque, stream: Option, } -impl TcpConnector { - pub fn new(req: Connect, addrs: VecDeque) -> TcpConnector { - TcpConnector { +impl TcpConnectorResponse { + pub fn new(req: T, addrs: VecDeque) -> TcpConnectorResponse { + TcpConnectorResponse { addrs, + port: req.port(), req: Some(req), addr: None, stream: None, @@ -224,9 +262,9 @@ impl TcpConnector { } } -impl Future for TcpConnector { - type Item = (Connect, TcpStream); - type Error = ConnectorError; +impl Future for TcpConnectorResponse { + type Item = (T, TcpStream); + type Error = io::Error; fn poll(&mut self) -> Poll { // connect @@ -239,14 +277,14 @@ impl Future for TcpConnector { Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { if self.addrs.is_empty() { - return Err(ConnectorError::IoError(err)); + return Err(err); } } } } // try to connect - let addr = self.addrs.pop_front().unwrap(); + let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port); self.stream = Some(TcpStream::connect(&addr)); self.addr = Some(addr) } diff --git a/src/resolver.rs b/src/resolver.rs index a4b8141b0..28196fee0 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; use std::marker::PhantomData; -use std::net::SocketAddr; +use std::net::IpAddr; use futures::{Async, Future, Poll}; @@ -13,11 +13,12 @@ use trust_dns_resolver::{AsyncResolver, Background}; use super::service::Service; -pub trait HostAware { +/// Host name of the request +pub trait RequestHost { fn host(&self) -> &str; } -impl HostAware for String { +impl RequestHost for String { fn host(&self) -> &str { self.as_ref() } @@ -28,7 +29,7 @@ pub struct Resolver { req: PhantomData, } -impl Default for Resolver { +impl Default for Resolver { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -40,7 +41,8 @@ impl Default for Resolver { } } -impl Resolver { +impl Resolver { + /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); spawn(bg); @@ -50,7 +52,8 @@ impl Resolver { } } - pub fn change_request(&self) -> Resolver { + /// Change type of resolver request. + pub fn into_request(&self) -> Resolver { Resolver { resolver: self.resolver.clone(), req: PhantomData, @@ -67,9 +70,9 @@ impl Clone for Resolver { } } -impl Service for Resolver { +impl Service for Resolver { type Request = T; - type Response = (T, VecDeque); + type Response = (T, VecDeque); type Error = ResolveError; type Future = ResolverFuture; @@ -87,10 +90,10 @@ impl Service for Resolver { pub struct ResolverFuture { req: Option, lookup: Option>, - addrs: Option>, + addrs: Option>, } -impl ResolverFuture { +impl ResolverFuture { pub fn new(addr: T, resolver: &AsyncResolver) -> Self { // we need to do dns resolution let lookup = Some(resolver.lookup_ip(addr.host())); @@ -102,8 +105,8 @@ impl ResolverFuture { } } -impl Future for ResolverFuture { - type Item = (T, VecDeque); +impl Future for ResolverFuture { + type Item = (T, VecDeque); type Error = ResolveError; fn poll(&mut self) -> Poll { @@ -112,11 +115,10 @@ impl Future for ResolverFuture { } else { match self.lookup.as_mut().unwrap().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(ips)) => { - let addrs: VecDeque<_> = - ips.iter().map(|ip| SocketAddr::new(ip, 0)).collect(); - Ok(Async::Ready((self.req.take().unwrap(), addrs))) - } + Ok(Async::Ready(ips)) => Ok(Async::Ready(( + self.req.take().unwrap(), + ips.iter().collect(), + ))), Err(err) => Err(err), } }