From 2698534b057a01c2f778f41ce615594d6d094cd6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 10 Sep 2018 19:16:46 -0700 Subject: [PATCH] add resolver service --- src/lib.rs | 1 + src/resolver.rs | 151 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 src/resolver.rs diff --git a/src/lib.rs b/src/lib.rs index f6a061a63..2db03f08f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,6 +64,7 @@ pub use tower_service::{NewService, Service}; pub(crate) mod accept; pub mod connector; +pub mod resolver; pub mod server; mod server_service; pub mod service; diff --git a/src/resolver.rs b/src/resolver.rs new file mode 100644 index 000000000..9b58e48d4 --- /dev/null +++ b/src/resolver.rs @@ -0,0 +1,151 @@ +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::net::SocketAddr; + +use futures::{Async, Future, Poll}; + +use tokio_current_thread::spawn; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::error::ResolveError; +use trust_dns_resolver::lookup_ip::LookupIpFuture; +use trust_dns_resolver::system_conf::read_system_conf; +use trust_dns_resolver::{AsyncResolver, Background}; + +use super::Service; + +pub enum ResolverError { + Resolve(ResolveError), + InvalidInput, +} + +pub trait HostAware { + fn host(&self) -> &str; +} + +impl HostAware for String { + fn host(&self) -> &str { + self.as_ref() + } +} + +pub struct Resolver { + resolver: AsyncResolver, + req: PhantomData, +} + +impl Default for Resolver { + fn default() -> Self { + let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { + (cfg, opts) + } else { + (ResolverConfig::default(), ResolverOpts::default()) + }; + + Resolver::new(cfg, opts) + } +} + +impl Resolver { + pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { + let (resolver, bg) = AsyncResolver::new(cfg, opts); + spawn(bg); + Resolver { + resolver, + req: PhantomData, + } + } +} + +impl Clone for Resolver { + fn clone(&self) -> Self { + Resolver { + resolver: self.resolver.clone(), + req: PhantomData, + } + } +} + +impl Service for Resolver { + type Request = T; + type Response = VecDeque; + type Error = ResolverError; + type Future = ResolverFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ResolverFuture::new(req, 0, &self.resolver) + } +} + +#[doc(hidden)] +/// Resolver future +pub struct ResolverFuture { + port: u16, + lookup: Option>, + addrs: Option>, + error: Option, + req: PhantomData, +} + +impl ResolverFuture { + pub fn new(addr: T, port: u16, resolver: &AsyncResolver) -> Self { + // we need to do dns resolution + match ResolverFuture::::parse(addr.host(), port) { + Ok((host, port)) => { + let lookup = Some(resolver.lookup_ip(host.as_str())); + ResolverFuture { + port, + lookup, + addrs: None, + error: None, + req: PhantomData, + } + } + Err(err) => ResolverFuture { + port, + lookup: None, + addrs: None, + error: Some(err), + req: PhantomData, + }, + } + } + + fn parse(addr: &str, port: u16) -> Result<(String, u16), ResolverError> { + // split the string by ':' and convert the second part to u16 + let mut parts_iter = addr.splitn(2, ':'); + let host = parts_iter.next().ok_or(ResolverError::InvalidInput)?; + let port_str = parts_iter.next().unwrap_or(""); + let port: u16 = port_str.parse().unwrap_or(port); + + Ok((host.to_owned(), port)) + } +} + +impl Future for ResolverFuture { + type Item = VecDeque; + type Error = ResolverError; + + fn poll(&mut self) -> Poll { + if let Some(err) = self.error.take() { + Err(err) + } else if let Some(addrs) = self.addrs.take() { + Ok(Async::Ready(addrs)) + } else { + match self.lookup.as_mut().unwrap().poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(ips)) => { + let addrs: VecDeque<_> = ips + .iter() + .map(|ip| SocketAddr::new(ip, self.port)) + .collect(); + Ok(Async::Ready(addrs)) + } + Err(err) => Err(ResolverError::Resolve(err)), + } + } + } +}