1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-30 03:50:42 +00:00

refactor Resolver service

This commit is contained in:
Nikolay Kim 2018-10-29 20:29:47 -07:00
parent 67961f8a36
commit dc19a9f862
14 changed files with 107 additions and 134 deletions

View file

@ -80,8 +80,7 @@ fn main() {
future::ok(()) future::ok(())
}) })
}, },
) ).unwrap()
.unwrap()
.start(); .start();
sys.run(); sys.run();

View file

@ -24,7 +24,8 @@ struct ServiceState {
} }
fn service<T: AsyncRead + AsyncWrite>( fn service<T: AsyncRead + AsyncWrite>(
st: &mut ServiceState, _: T, st: &mut ServiceState,
_: T,
) -> impl Future<Item = (), Error = ()> { ) -> impl Future<Item = (), Error = ()> {
let num = st.num.fetch_add(1, Ordering::Relaxed); let num = st.num.fetch_add(1, Ordering::Relaxed);
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
@ -60,8 +61,7 @@ fn main() {
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
future::ok(()) future::ok(())
}) })
}) }).unwrap()
.unwrap()
.start(); .start();
sys.run(); sys.run();

View file

@ -176,7 +176,8 @@ where
type SinkError = U::Error; type SinkError = U::Error;
fn start_send( fn start_send(
&mut self, item: Self::SinkItem, &mut self,
item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> { ) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.get_mut().start_send(item) self.inner.get_mut().start_send(item)
} }

View file

@ -191,7 +191,8 @@ where
type SinkError = E::Error; type SinkError = E::Error;
fn start_send( fn start_send(
&mut self, item: Self::SinkItem, &mut self,
item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> { ) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.get_mut().start_send(item) self.inner.get_mut().start_send(item)
} }

View file

@ -98,7 +98,8 @@ where
type SinkError = T::SinkError; type SinkError = T::SinkError;
fn start_send( fn start_send(
&mut self, item: Self::SinkItem, &mut self,
item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> { ) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.inner.0.start_send(item) self.inner.inner.0.start_send(item)
} }

View file

@ -197,8 +197,7 @@ where
io::ErrorKind::WriteZero, io::ErrorKind::WriteZero,
"failed to \ "failed to \
write frame to transport", write frame to transport",
) ).into());
.into());
} }
// TODO: Add a way to `bytes` to do this w/o returning the drained // TODO: Add a way to `bytes` to do this w/o returning the drained

View file

@ -11,7 +11,7 @@ use tokio_tcp::{ConnectFuture, TcpStream};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::system_conf::read_system_conf;
use super::resolver::{HostAware, Resolver, ResolverError, ResolverFuture}; use super::resolver::{HostAware, ResolveError, Resolver, ResolverFuture};
use super::service::{NewService, Service}; use super::service::{NewService, Service};
// #[derive(Fail, Debug)] // #[derive(Fail, Debug)]
@ -19,9 +19,9 @@ use super::service::{NewService, Service};
pub enum ConnectorError { pub enum ConnectorError {
/// Failed to resolve the hostname /// Failed to resolve the hostname
// #[fail(display = "Failed resolving hostname: {}", _0)] // #[fail(display = "Failed resolving hostname: {}", _0)]
Resolver(ResolverError), Resolver(ResolveError),
/// Not dns records /// No dns records
// #[fail(display = "Invalid input: {}", _0)] // #[fail(display = "Invalid input: {}", _0)]
NoRecords, NoRecords,
@ -29,17 +29,21 @@ pub enum ConnectorError {
// #[fail(display = "Timeout out while establishing connection")] // #[fail(display = "Timeout out while establishing connection")]
Timeout, Timeout,
/// Invalid input
InvalidInput,
/// Connection io error /// Connection io error
// #[fail(display = "{}", _0)] // #[fail(display = "{}", _0)]
IoError(io::Error), IoError(io::Error),
} }
impl From<ResolverError> for ConnectorError { impl From<ResolveError> for ConnectorError {
fn from(err: ResolverError) -> Self { fn from(err: ResolveError) -> Self {
ConnectorError::Resolver(err) ConnectorError::Resolver(err)
} }
} }
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)] #[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect { pub struct Connect {
pub host: String, pub host: String,
@ -48,6 +52,7 @@ pub struct Connect {
} }
impl Connect { impl Connect {
/// Create new `Connect` instance.
pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect { pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect {
Connect { Connect {
port, port,
@ -56,20 +61,19 @@ impl Connect {
} }
} }
/// split the string by ':' and convert the second part to u16 /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
pub fn parse<T: AsRef<str>>(host: T) -> Option<Connect> { pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectorError> {
let mut parts_iter = host.as_ref().splitn(2, ':'); let mut parts_iter = host.as_ref().splitn(2, ':');
if let Some(host) = parts_iter.next() { let host = parts_iter.next().ok_or(ConnectorError::InvalidInput)?;
let port_str = parts_iter.next().unwrap_or(""); let port_str = parts_iter.next().unwrap_or("");
if let Ok(port) = port_str.parse::<u16>() { let port = port_str
return Some(Connect { .parse::<u16>()
port, .map_err(|_| ConnectorError::InvalidInput)?;
host: host.to_owned(), Ok(Connect {
timeout: Duration::from_secs(1), port,
}); host: host.to_owned(),
} timeout: Duration::from_secs(1),
} })
None
} }
/// Set connect timeout /// Set connect timeout
@ -93,6 +97,7 @@ impl fmt::Display for Connect {
} }
} }
/// Tcp connector
pub struct Connector { pub struct Connector {
resolver: Resolver<Connect>, resolver: Resolver<Connect>,
} }
@ -110,12 +115,14 @@ impl Default for Connector {
} }
impl Connector { impl Connector {
/// Create new connector with resolver configuration
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
Connector { Connector {
resolver: Resolver::new(cfg, opts), resolver: Resolver::new(cfg, opts),
} }
} }
/// Create new connector with custom resolver
pub fn with_resolver( pub fn with_resolver(
resolver: Resolver<Connect>, resolver: Resolver<Connect>,
) -> impl Service<Request = Connect, Response = (Connect, TcpStream), Error = ConnectorError> ) -> impl Service<Request = Connect, Response = (Connect, TcpStream), Error = ConnectorError>
@ -123,17 +130,10 @@ impl Connector {
Connector { resolver } Connector { resolver }
} }
pub fn new_service<E>() -> impl NewService< /// Create new default connector service
Request = Connect,
Response = (Connect, TcpStream),
Error = ConnectorError,
InitError = E,
> + Clone {
|| -> FutureResult<Connector, E> { ok(Connector::default()) }
}
pub fn new_service_with_config<E>( pub fn new_service_with_config<E>(
cfg: ResolverConfig, opts: ResolverOpts, cfg: ResolverConfig,
opts: ResolverOpts,
) -> impl NewService< ) -> impl NewService<
Request = Connect, Request = Connect,
Response = (Connect, TcpStream), Response = (Connect, TcpStream),
@ -185,19 +185,14 @@ impl Future for ConnectorFuture {
return fut.poll(); return fut.poll();
} }
match self.fut.poll().map_err(ConnectorError::from)? { match self.fut.poll().map_err(ConnectorError::from)? {
Async::Ready((req, _, mut addrs)) => { Async::Ready((req, mut addrs)) => {
if addrs.is_empty() { if addrs.is_empty() {
Err(ConnectorError::NoRecords) Err(ConnectorError::NoRecords)
} else { } else {
for addr in &mut addrs { for addr in &mut addrs {
match addr { match addr {
SocketAddr::V4(ref mut addr) if addr.port() == 0 => { SocketAddr::V4(ref mut addr) => addr.set_port(req.port),
addr.set_port(req.port) SocketAddr::V6(ref mut addr) => addr.set_port(req.port),
}
SocketAddr::V6(ref mut addr) if addr.port() == 0 => {
addr.set_port(req.port)
}
_ => (),
} }
} }
self.fut2 = Some(TcpConnector::new(req, addrs)); self.fut2 = Some(TcpConnector::new(req, addrs));

View file

@ -9,7 +9,10 @@
#![cfg_attr( #![cfg_attr(
feature = "cargo-clippy", feature = "cargo-clippy",
allow(declare_interior_mutable_const, borrow_interior_mutable_const) allow(
declare_interior_mutable_const,
borrow_interior_mutable_const
)
)] )]
#[macro_use] #[macro_use]

View file

@ -6,19 +6,13 @@ use futures::{Async, Future, Poll};
use tokio_current_thread::spawn; use tokio_current_thread::spawn;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::error::ResolveError; pub use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::lookup_ip::LookupIpFuture;
use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::system_conf::read_system_conf;
use trust_dns_resolver::{AsyncResolver, Background}; use trust_dns_resolver::{AsyncResolver, Background};
use super::service::Service; use super::service::Service;
#[derive(Debug)]
pub enum ResolverError {
Resolve(ResolveError),
InvalidInput,
}
pub trait HostAware { pub trait HostAware {
fn host(&self) -> &str; fn host(&self) -> &str;
} }
@ -75,8 +69,8 @@ impl<T> Clone for Resolver<T> {
impl<T: HostAware> Service for Resolver<T> { impl<T: HostAware> Service for Resolver<T> {
type Request = T; type Request = T;
type Response = (T, String, VecDeque<SocketAddr>); type Response = (T, VecDeque<SocketAddr>);
type Error = ResolverError; type Error = ResolveError;
type Future = ResolverFuture<T>; type Future = ResolverFuture<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
@ -84,7 +78,7 @@ impl<T: HostAware> Service for Resolver<T> {
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
ResolverFuture::new(req, 0, &self.resolver) ResolverFuture::new(req, &self.resolver)
} }
} }
@ -92,78 +86,38 @@ impl<T: HostAware> Service for Resolver<T> {
/// Resolver future /// Resolver future
pub struct ResolverFuture<T> { pub struct ResolverFuture<T> {
req: Option<T>, req: Option<T>,
port: u16,
lookup: Option<Background<LookupIpFuture>>, lookup: Option<Background<LookupIpFuture>>,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
error: Option<ResolverError>,
host: Option<String>,
} }
impl<T: HostAware> ResolverFuture<T> { impl<T: HostAware> ResolverFuture<T> {
pub fn new(addr: T, port: u16, resolver: &AsyncResolver) -> Self { pub fn new(addr: T, resolver: &AsyncResolver) -> Self {
// we need to do dns resolution // we need to do dns resolution
match ResolverFuture::<T>::parse(addr.host(), port) { let lookup = Some(resolver.lookup_ip(addr.host()));
Ok((host, port)) => { ResolverFuture {
let lookup = Some(resolver.lookup_ip(host.as_str())); lookup,
ResolverFuture { req: Some(addr),
port, addrs: None,
lookup,
req: Some(addr),
host: Some(host.to_owned()),
addrs: None,
error: None,
}
}
Err(err) => ResolverFuture {
port,
req: None,
host: None,
lookup: None,
addrs: None,
error: Some(err),
},
} }
} }
fn parse(addr: &str, port: u16) -> Result<(String, u16), ResolverError> {
// split the string by ':' and convert the second part to u16
let mut parts_iter = addr.splitn(2, ':');
let host = parts_iter.next().ok_or(ResolverError::InvalidInput)?;
let port_str = parts_iter.next().unwrap_or("");
let port: u16 = port_str.parse().unwrap_or(port);
Ok((host.to_owned(), port))
}
} }
impl<T: HostAware> Future for ResolverFuture<T> { impl<T: HostAware> Future for ResolverFuture<T> {
type Item = (T, String, VecDeque<SocketAddr>); type Item = (T, VecDeque<SocketAddr>);
type Error = ResolverError; type Error = ResolveError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() { if let Some(addrs) = self.addrs.take() {
Err(err) Ok(Async::Ready((self.req.take().unwrap(), addrs)))
} else if let Some(addrs) = self.addrs.take() {
Ok(Async::Ready((
self.req.take().unwrap(),
self.host.take().unwrap(),
addrs,
)))
} else { } else {
match self.lookup.as_mut().unwrap().poll() { match self.lookup.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(ips)) => { Ok(Async::Ready(ips)) => {
let addrs: VecDeque<_> = ips let addrs: VecDeque<_> =
.iter() ips.iter().map(|ip| SocketAddr::new(ip, 0)).collect();
.map(|ip| SocketAddr::new(ip, self.port)) Ok(Async::Ready((self.req.take().unwrap(), addrs)))
.collect();
Ok(Async::Ready((
self.req.take().unwrap(),
self.host.take().unwrap(),
addrs,
)))
} }
Err(err) => Err(ResolverError::Resolve(err)), Err(err) => Err(err),
} }
} }
} }

View file

@ -87,7 +87,9 @@ impl AcceptLoop {
} }
pub(crate) fn start( pub(crate) fn start(
&mut self, socks: Vec<(Token, net::TcpListener)>, workers: Vec<WorkerClient>, &mut self,
socks: Vec<(Token, net::TcpListener)>,
workers: Vec<WorkerClient>,
) -> mpsc::UnboundedReceiver<ServerCommand> { ) -> mpsc::UnboundedReceiver<ServerCommand> {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
@ -135,9 +137,12 @@ fn connection_error(e: &io::Error) -> bool {
impl Accept { impl Accept {
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] #![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub(crate) fn start( pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration, rx: sync_mpsc::Receiver<Command>,
notify_reg: mio::Registration, socks: Vec<(Token, net::TcpListener)>, cmd_reg: mio::Registration,
srv: mpsc::UnboundedSender<ServerCommand>, workers: Vec<WorkerClient>, notify_reg: mio::Registration,
socks: Vec<(Token, net::TcpListener)>,
srv: mpsc::UnboundedSender<ServerCommand>,
workers: Vec<WorkerClient>,
) { ) {
let sys = System::current(); let sys = System::current();
@ -173,8 +178,10 @@ impl Accept {
} }
fn new( fn new(
rx: sync_mpsc::Receiver<Command>, socks: Vec<(Token, net::TcpListener)>, rx: sync_mpsc::Receiver<Command>,
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>, socks: Vec<(Token, net::TcpListener)>,
workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept { ) -> Accept {
// Create a poll instance // Create a poll instance
let poll = match mio::Poll::new() { let poll = match mio::Poll::new() {

View file

@ -137,7 +137,10 @@ impl Server {
/// Add new service to server /// Add new service to server
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, N: AsRef<str>>(
mut self, name: N, lst: net::TcpListener, factory: F, mut self,
name: N,
lst: net::TcpListener,
factory: F,
) -> Self ) -> Self
where where
F: StreamServiceFactory, F: StreamServiceFactory,
@ -151,7 +154,10 @@ impl Server {
/// Add new service to server /// Add new service to server
pub fn listen2<F, N: AsRef<str>>( pub fn listen2<F, N: AsRef<str>>(
mut self, name: N, lst: net::TcpListener, factory: F, mut self,
name: N,
lst: net::TcpListener,
factory: F,
) -> Self ) -> Self
where where
F: ServiceFactory, F: ServiceFactory,

View file

@ -61,7 +61,9 @@ pub(crate) struct WorkerClient {
impl WorkerClient { impl WorkerClient {
pub fn new( pub fn new(
idx: usize, tx: UnboundedSender<WorkerCommand>, avail: WorkerAvailability, idx: usize,
tx: UnboundedSender<WorkerCommand>,
avail: WorkerAvailability,
) -> Self { ) -> Self {
WorkerClient { idx, tx, avail } WorkerClient { idx, tx, avail }
} }
@ -128,8 +130,10 @@ pub(crate) struct Worker {
impl Worker { impl Worker {
pub(crate) fn start( pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>, factories: Vec<Box<InternalServiceFactory>>, rx: UnboundedReceiver<WorkerCommand>,
availability: WorkerAvailability, shutdown_timeout: time::Duration, factories: Vec<Box<InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
) { ) {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
@ -151,8 +155,7 @@ impl Worker {
.map_err(|e| { .map_err(|e| {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0)); Arbiter::current().do_send(StopArbiter(0));
}) }).and_then(move |services| {
.and_then(move |services| {
wrk.services.extend(services); wrk.services.extend(services);
wrk wrk
}), }),

View file

@ -27,7 +27,9 @@ pub trait ServiceExt: Service {
/// Apply function to specified service and use it as a next service in /// Apply function to specified service and use it as a next service in
/// chain. /// chain.
fn apply<S, I, F, R>( fn apply<S, I, F, R>(
self, service: I, f: F, self,
service: I,
f: F,
) -> AndThen<Self, Apply<S, F, R, Self::Response>> ) -> AndThen<Self, Apply<S, F, R, Self::Response>>
where where
Self: Sized, Self: Sized,
@ -120,7 +122,9 @@ pub trait ServiceExt: Service {
pub trait NewServiceExt: NewService { pub trait NewServiceExt: NewService {
fn apply<S, I, F, R>( fn apply<S, I, F, R>(
self, service: I, f: F, self,
service: I,
f: F,
) -> AndThenNewService<Self, ApplyNewService<S, F, R, Self::Response>> ) -> AndThenNewService<Self, ApplyNewService<S, F, R, Self::Response>>
where where
Self: Sized, Self: Sized,

View file

@ -1,4 +1,4 @@
//! Tower middleware that applies a timeout to requests. //! Service that applies a timeout to requests.
//! //!
//! If the response does not complete within the specified timeout, the response //! If the response does not complete within the specified timeout, the response
//! will be aborted. //! will be aborted.
@ -34,13 +34,6 @@ impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
} }
} }
/// `Timeout` response future
#[derive(Debug)]
pub struct TimeoutFut<T: NewService> {
fut: T::Future,
timeout: Duration,
}
impl<T> Timeout<T> impl<T> Timeout<T>
where where
T: NewService + Clone, T: NewService + Clone,
@ -69,6 +62,13 @@ where
} }
} }
/// `Timeout` response future
#[derive(Debug)]
pub struct TimeoutFut<T: NewService> {
fut: T::Future,
timeout: Duration,
}
impl<T> Future for TimeoutFut<T> impl<T> Future for TimeoutFut<T>
where where
T: NewService, T: NewService,