1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-20 23:26:44 +00:00

refactor connector and resolver services

This commit is contained in:
Nikolay Kim 2018-11-11 21:12:30 -08:00
parent a4b81a256c
commit 9b9599500a
2 changed files with 83 additions and 43 deletions

View file

@ -1,5 +1,6 @@
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::marker::PhantomData;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use std::{fmt, io};
@ -11,9 +12,14 @@ use tokio_tcp::{ConnectFuture, TcpStream};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::system_conf::read_system_conf;
use super::resolver::{HostAware, ResolveError, Resolver, ResolverFuture};
use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture};
use super::service::{NewService, Service};
/// Port of the request
pub trait RequestPort {
fn port(&self) -> u16;
}
// #[derive(Fail, Debug)]
#[derive(Debug)]
pub enum ConnectorError {
@ -22,7 +28,7 @@ pub enum ConnectorError {
Resolver(ResolveError),
/// No dns records
// #[fail(display = "Invalid input: {}", _0)]
// #[fail(display = "No dns records found for the input")]
NoRecords,
/// Connecting took too long
@ -43,6 +49,12 @@ impl From<ResolveError> for ConnectorError {
}
}
impl From<io::Error> for ConnectorError {
fn from(err: io::Error) -> Self {
ConnectorError::IoError(err)
}
}
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect {
@ -85,12 +97,18 @@ impl Connect {
}
}
impl HostAware for Connect {
impl RequestHost for Connect {
fn host(&self) -> &str {
&self.host
}
}
impl RequestPort for Connect {
fn port(&self) -> u16 {
self.port
}
}
impl fmt::Display for Connect {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{}", self.host, self.port)
@ -173,7 +191,7 @@ impl Service for Connector {
#[doc(hidden)]
pub struct ConnectorFuture {
fut: ResolverFuture<Connect>,
fut2: Option<TcpConnector>,
fut2: Option<TcpConnectorResponse<Connect>>,
}
impl Future for ConnectorFuture {
@ -182,20 +200,14 @@ impl Future for ConnectorFuture {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut2 {
return fut.poll();
return fut.poll().map_err(ConnectorError::from);
}
match self.fut.poll().map_err(ConnectorError::from)? {
Async::Ready((req, mut addrs)) => {
Async::Ready((req, addrs)) => {
if addrs.is_empty() {
Err(ConnectorError::NoRecords)
} else {
for addr in &mut addrs {
match addr {
SocketAddr::V4(ref mut addr) => addr.set_port(req.port),
SocketAddr::V6(ref mut addr) => addr.set_port(req.port),
}
}
self.fut2 = Some(TcpConnector::new(req, addrs));
self.fut2 = Some(TcpConnectorResponse::new(req, addrs));
self.poll()
}
}
@ -204,19 +216,45 @@ impl Future for ConnectorFuture {
}
}
/// Tcp stream connector service
pub struct TcpConnector<T: RequestPort>(PhantomData<T>);
impl<T: RequestPort> Default for TcpConnector<T> {
fn default() -> TcpConnector<T> {
TcpConnector(PhantomData)
}
}
impl<T: RequestPort> Service for TcpConnector<T> {
type Request = (T, VecDeque<IpAddr>);
type Response = (T, TcpStream);
type Error = io::Error;
type Future = TcpConnectorResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, (req, addrs): Self::Request) -> Self::Future {
TcpConnectorResponse::new(req, addrs)
}
}
#[doc(hidden)]
/// Tcp stream connector
pub struct TcpConnector {
req: Option<Connect>,
/// Tcp stream connector response future
pub struct TcpConnectorResponse<T: RequestPort> {
port: u16,
req: Option<T>,
addr: Option<SocketAddr>,
addrs: VecDeque<SocketAddr>,
addrs: VecDeque<IpAddr>,
stream: Option<ConnectFuture>,
}
impl TcpConnector {
pub fn new(req: Connect, addrs: VecDeque<SocketAddr>) -> TcpConnector {
TcpConnector {
impl<T: RequestPort> TcpConnectorResponse<T> {
pub fn new(req: T, addrs: VecDeque<IpAddr>) -> TcpConnectorResponse<T> {
TcpConnectorResponse {
addrs,
port: req.port(),
req: Some(req),
addr: None,
stream: None,
@ -224,9 +262,9 @@ impl TcpConnector {
}
}
impl Future for TcpConnector {
type Item = (Connect, TcpStream);
type Error = ConnectorError;
impl<T: RequestPort> Future for TcpConnectorResponse<T> {
type Item = (T, TcpStream);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// connect
@ -239,14 +277,14 @@ impl Future for TcpConnector {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
if self.addrs.is_empty() {
return Err(ConnectorError::IoError(err));
return Err(err);
}
}
}
}
// try to connect
let addr = self.addrs.pop_front().unwrap();
let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port);
self.stream = Some(TcpStream::connect(&addr));
self.addr = Some(addr)
}

View file

@ -1,6 +1,6 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::net::IpAddr;
use futures::{Async, Future, Poll};
@ -13,11 +13,12 @@ use trust_dns_resolver::{AsyncResolver, Background};
use super::service::Service;
pub trait HostAware {
/// Host name of the request
pub trait RequestHost {
fn host(&self) -> &str;
}
impl HostAware for String {
impl RequestHost for String {
fn host(&self) -> &str {
self.as_ref()
}
@ -28,7 +29,7 @@ pub struct Resolver<T = String> {
req: PhantomData<T>,
}
impl<T: HostAware> Default for Resolver<T> {
impl<T: RequestHost> Default for Resolver<T> {
fn default() -> Self {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts)
@ -40,7 +41,8 @@ impl<T: HostAware> Default for Resolver<T> {
}
}
impl<T: HostAware> Resolver<T> {
impl<T: RequestHost> Resolver<T> {
/// Create new resolver instance with custom configuration and options.
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
let (resolver, bg) = AsyncResolver::new(cfg, opts);
spawn(bg);
@ -50,7 +52,8 @@ impl<T: HostAware> Resolver<T> {
}
}
pub fn change_request<T2: HostAware>(&self) -> Resolver<T2> {
/// Change type of resolver request.
pub fn into_request<T2: RequestHost>(&self) -> Resolver<T2> {
Resolver {
resolver: self.resolver.clone(),
req: PhantomData,
@ -67,9 +70,9 @@ impl<T> Clone for Resolver<T> {
}
}
impl<T: HostAware> Service for Resolver<T> {
impl<T: RequestHost> Service for Resolver<T> {
type Request = T;
type Response = (T, VecDeque<SocketAddr>);
type Response = (T, VecDeque<IpAddr>);
type Error = ResolveError;
type Future = ResolverFuture<T>;
@ -87,10 +90,10 @@ impl<T: HostAware> Service for Resolver<T> {
pub struct ResolverFuture<T> {
req: Option<T>,
lookup: Option<Background<LookupIpFuture>>,
addrs: Option<VecDeque<SocketAddr>>,
addrs: Option<VecDeque<IpAddr>>,
}
impl<T: HostAware> ResolverFuture<T> {
impl<T: RequestHost> ResolverFuture<T> {
pub fn new(addr: T, resolver: &AsyncResolver) -> Self {
// we need to do dns resolution
let lookup = Some(resolver.lookup_ip(addr.host()));
@ -102,8 +105,8 @@ impl<T: HostAware> ResolverFuture<T> {
}
}
impl<T: HostAware> Future for ResolverFuture<T> {
type Item = (T, VecDeque<SocketAddr>);
impl<T: RequestHost> Future for ResolverFuture<T> {
type Item = (T, VecDeque<IpAddr>);
type Error = ResolveError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -112,11 +115,10 @@ impl<T: HostAware> Future for ResolverFuture<T> {
} else {
match self.lookup.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(ips)) => {
let addrs: VecDeque<_> =
ips.iter().map(|ip| SocketAddr::new(ip, 0)).collect();
Ok(Async::Ready((self.req.take().unwrap(), addrs)))
}
Ok(Async::Ready(ips)) => Ok(Async::Ready((
self.req.take().unwrap(),
ips.iter().collect(),
))),
Err(err) => Err(err),
}
}