diff --git a/.travis.yml b/.travis.yml index 8810ee72c..f27a445ad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -87,7 +87,7 @@ after_success: fi - | - if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then + if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh) USE_SKEPTIC=1 cargo tarpaulin --out Xml bash <(curl -s https://codecov.io/bash) diff --git a/src/client/connector.rs b/src/client/connector.rs index 8f2828935..4f14a9e27 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,4 +1,4 @@ -use std::{fmt, io, time}; +use std::{fmt, mem, io, time}; use std::cell::RefCell; use std::rc::Rc; use std::net::Shutdown; @@ -6,28 +6,26 @@ use std::time::{Duration, Instant}; use std::collections::{HashMap, VecDeque}; use actix::{fut, Actor, ActorFuture, Context, AsyncContext, - Handler, Message, ActorResponse, Supervised}; + Handler, Message, ActorResponse, Supervised, ContextFutureSpawner}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use http::{Uri, HttpTryFrom, Error as HttpError}; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll}; +use futures::task::{Task, current as current_task}; +use futures::unsync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature="alpn")] use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError}; #[cfg(feature="alpn")] use tokio_openssl::SslConnectorExt; -#[cfg(feature="alpn")] -use futures::Future; #[cfg(all(feature="tls", not(feature="alpn")))] use native_tls::{TlsConnector, Error as TlsError}; #[cfg(all(feature="tls", not(feature="alpn")))] use tokio_tls::TlsConnectorExt; -#[cfg(all(feature="tls", not(feature="alpn")))] -use futures::Future; use {HAS_OPENSSL, HAS_TLS}; use server::IoStream; @@ -102,19 +100,37 @@ impl From for ClientConnectorError { } } +struct Waiter { + tx: oneshot::Sender>, + conn_timeout: Duration, +} + +/// `ClientConnector` type is responsible for transport layer of a client connection +/// of a client connection. pub struct ClientConnector { #[cfg(all(feature="alpn"))] connector: SslConnector, #[cfg(all(feature="tls", not(feature="alpn")))] connector: TlsConnector, + pool: Rc, + conn_lifetime: Duration, + conn_keep_alive: Duration, + limit: usize, + limit_per_host: usize, + acquired: usize, + acquired_per_host: HashMap, + available: HashMap>, + to_close: Vec, + waiters: HashMap>, } impl Actor for ClientConnector { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - self.collect(ctx); + self.collect_periodic(ctx); + ctx.spawn(Maintenance); } } @@ -127,22 +143,38 @@ impl Default for ClientConnector { #[cfg(all(feature="alpn"))] { let builder = SslConnector::builder(SslMethod::tls()).unwrap(); - ClientConnector { - connector: builder.build(), - pool: Rc::new(Pool::new()), - } + ClientConnector::with_connector(builder.build()) } #[cfg(all(feature="tls", not(feature="alpn")))] { let builder = TlsConnector::builder().unwrap(); ClientConnector { - connector: builder.build().unwrap(), pool: Rc::new(Pool::new()), + connector: builder.build().unwrap(), + conn_lifetime: Duration::from_secs(15), + conn_keep_alive: Duration::from_secs(75), + limit: 100, + limit_per_host: 0, + acquired: 0, + acquired_per_host: HashMap::new(), + available: HashMap::new(), + to_close: Vec::new(), + waiters: HashMap::new(), } } #[cfg(not(any(feature="alpn", feature="tls")))] - ClientConnector {pool: Rc::new(Pool::new())} + ClientConnector {pool: Rc::new(Pool::new()), + conn_lifetime: Duration::from_secs(15), + conn_keep_alive: Duration::from_secs(75), + limit: 100, + limit_per_host: 0, + acquired: 0, + acquired_per_host: HashMap::new(), + available: HashMap::new(), + to_close: Vec::new(), + waiters: HashMap::new(), + } } } @@ -192,12 +224,200 @@ impl ClientConnector { /// } /// ``` pub fn with_connector(connector: SslConnector) -> ClientConnector { - ClientConnector { connector, pool: Rc::new(Pool::new()) } + ClientConnector { + connector, + pool: Rc::new(Pool::new()), + conn_lifetime: Duration::from_secs(15), + conn_keep_alive: Duration::from_secs(75), + limit: 100, + limit_per_host: 0, + acquired: 0, + acquired_per_host: HashMap::new(), + available: HashMap::new(), + to_close: Vec::new(), + waiters: HashMap::new(), + } } - fn collect(&mut self, ctx: &mut Context) { - self.pool.collect(); - ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx)); + /// Set total number of simultaneous connections. + /// + /// If limit is 0, the connector has no limit. + /// The default limit size is 100. + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + /// Set total number of simultaneous connections to the same endpoint. + /// + /// Endpoints are the same if they are have equal (host, port, ssl) triplet. + /// If limit is 0, the connector has no limit. The default limit size is 0. + pub fn limit_per_host(mut self, limit: usize) -> Self { + self.limit_per_host = limit; + self + } + + /// Set keep-alive period for opened connection. + /// + /// Keep-alive period is period between connection usage. + /// if deley between connection usage exceeds this period + /// connection closes. + pub fn conn_keep_alive(mut self, dur: Duration) -> Self { + self.conn_keep_alive = dur; + self + } + + /// Set max lifetime period for connection. + /// + /// Connection lifetime is max lifetime of any opened connection + /// until it get closed regardless of keep-alive period. + pub fn conn_lifetime(mut self, dur: Duration) -> Self { + self.conn_lifetime = dur; + self + } + + fn acquire(&mut self, key: &Key) -> Acquire { + // check limits + if self.limit > 0 { + if self.acquired >= self.limit { + return Acquire::NotAvailable + } + if self.limit_per_host > 0 { + if let Some(per_host) = self.acquired_per_host.get(key) { + if self.limit_per_host >= *per_host { + return Acquire::NotAvailable + } + } + } + } + else if self.limit_per_host > 0 { + if let Some(per_host) = self.acquired_per_host.get(key) { + if self.limit_per_host >= *per_host { + return Acquire::NotAvailable + } + } + } + + self.reserve(key); + + // check if open connection is available + // cleanup stale connections at the same time + if let Some(ref mut connections) = self.available.get_mut(key) { + let now = Instant::now(); + while let Some(conn) = connections.pop_back() { + // check if it still usable + if (now - conn.0) > self.conn_keep_alive + || (now - conn.1.ts) > self.conn_lifetime + { + self.to_close.push(conn.1); + } else { + let mut conn = conn.1; + let mut buf = [0; 2]; + match conn.stream().read(&mut buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), + Ok(n) if n > 0 => { + self.to_close.push(conn); + continue + }, + Ok(_) | Err(_) => continue, + } + return Acquire::Acquired(conn) + } + } + } + Acquire::Available + } + + fn reserve(&mut self, key: &Key) { + self.acquired += 1; + let per_host = + if let Some(per_host) = self.acquired_per_host.get(key) { + *per_host + } else { + 0 + }; + self.acquired_per_host.insert(key.clone(), per_host + 1); + } + + fn release_key(&mut self, key: &Key) { + self.acquired -= 1; + let per_host = + if let Some(per_host) = self.acquired_per_host.get(key) { + *per_host + } else { + return + }; + if per_host > 1 { + self.acquired_per_host.insert(key.clone(), per_host - 1); + } else { + self.acquired_per_host.remove(key); + } + } + + fn collect(&mut self, periodic: bool) { + let now = Instant::now(); + + // collect half acquire keys + if let Some(keys) = self.pool.collect_keys() { + for key in keys { + self.release_key(&key); + } + } + + // collect connections for close + if let Some(to_close) = self.pool.collect_close() { + for conn in to_close { + self.release_key(&conn.key); + self.to_close.push(conn); + } + } + + // connection connections + if let Some(to_release) = self.pool.collect_release() { + for conn in to_release { + self.release_key(&conn.key); + + // check connection lifetime and the return to available pool + if (now - conn.ts) < self.conn_lifetime { + self.available.entry(conn.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(Conn(Instant::now(), conn)); + } + } + } + + // check keep-alive + for conns in self.available.values_mut() { + while !conns.is_empty() { + if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive + || (now - conns[0].1.ts) > self.conn_lifetime + { + let conn = conns.pop_front().unwrap().1; + self.to_close.push(conn); + } else { + break + } + } + } + + // check connections for shutdown + if periodic { + let mut idx = 0; + while idx < self.to_close.len() { + match AsyncWrite::shutdown(&mut self.to_close[idx]) { + Ok(Async::NotReady) => idx += 1, + _ => { + self.to_close.swap_remove(idx); + }, + } + } + } + } + + fn collect_periodic(&mut self, ctx: &mut Context) { + self.collect(true); + // re-schedule next collect period + ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); } } @@ -205,6 +425,8 @@ impl Handler for ClientConnector { type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { + self.collect(false); + let uri = &msg.uri; let conn_timeout = msg.conn_timeout; @@ -227,76 +449,244 @@ impl Handler for ClientConnector { return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)) } + // check if pool has task reference + if self.pool.task.borrow().is_none() { + *self.pool.task.borrow_mut() = Some(current_task()); + } + let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); let key = Key {host, port, ssl: proto.is_secure()}; + // acquire connection let pool = if proto.is_http() { - if let Some(mut conn) = self.pool.query(&key) { - conn.pool = Some(self.pool.clone()); - return ActorResponse::async(fut::ok(conn)) - } else { - Some(Rc::clone(&self.pool)) + match self.acquire(&key) { + Acquire::Acquired(mut conn) => { + // use existing connection + conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool)))); + return ActorResponse::async(fut::ok(conn)) + }, + Acquire::NotAvailable => { + // connection is not available, wait + let (tx, rx) = oneshot::channel(); + let waiter = Waiter{ tx, conn_timeout }; + self.waiters.entry(key.clone()).or_insert_with(VecDeque::new) + .push_back(waiter); + return ActorResponse::async( + rx.map_err(|_| ClientConnectorError::Disconnected) + .into_actor(self) + .and_then(|res, _, _| match res { + Ok(conn) => fut::ok(conn), + Err(err) => fut::err(err), + })); + } + Acquire::Available => { + Some(Rc::clone(&self.pool)) + }, } } else { None }; + let conn = AcquiredConn(key, pool); - ActorResponse::async( - Connector::from_registry() - .send(ResolveConnect::host_and_port(&key.host, port) - .timeout(conn_timeout)) - .into_actor(self) - .map_err(|_, _, _| ClientConnectorError::Disconnected) - .and_then(move |res, _act, _| { - #[cfg(feature="alpn")] - match res { - Err(err) => fut::Either::B(fut::err(err.into())), - Ok(stream) => { - if proto.is_secure() { - fut::Either::A( - _act.connector.connect_async(&key.host, stream) - .map_err(ClientConnectorError::SslError) - .map(|stream| Connection::new( - key, pool, Box::new(stream))) - .into_actor(_act)) - } else { - fut::Either::B(fut::ok( - Connection::new(key, pool, Box::new(stream)))) - } +{ + ActorResponse::async( + Connector::from_registry() + .send(ResolveConnect::host_and_port(&conn.0.host, port) + .timeout(conn_timeout)) + .into_actor(self) + .map_err(|_, _, _| ClientConnectorError::Disconnected) + .and_then(move |res, _act, _| { + #[cfg(feature="alpn")] + match res { + Err(err) => fut::Either::B(fut::err(err.into())), + Ok(stream) => { + if proto.is_secure() { + fut::Either::A( + _act.connector.connect_async(&conn.0.host, stream) + .map_err(ClientConnectorError::SslError) + .map(|stream| Connection::new( + conn.0.clone(), Some(conn), Box::new(stream))) + .into_actor(_act)) + } else { + fut::Either::B(fut::ok( + Connection::new( + conn.0.clone(), Some(conn), Box::new(stream)))) } } + } - #[cfg(all(feature="tls", not(feature="alpn")))] - match res { - Err(err) => fut::Either::B(fut::err(err.into())), - Ok(stream) => { - if proto.is_secure() { - fut::Either::A( - _act.connector.connect_async(&key.host, stream) - .map_err(ClientConnectorError::SslError) - .map(|stream| Connection::new( - key, pool, Box::new(stream))) - .into_actor(_act)) - } else { - fut::Either::B(fut::ok( - Connection::new(key, pool, Box::new(stream)))) - } + #[cfg(all(feature="tls", not(feature="alpn")))] + match res { + Err(err) => fut::Either::B(fut::err(err.into())), + Ok(stream) => { + if proto.is_secure() { + fut::Either::A( + _act.connector.connect_async(&conn.0.host, stream) + .map_err(ClientConnectorError::SslError) + .map(|stream| Connection::new( + conn.0.clone(), Some(conn), Box::new(stream))) + .into_actor(_act)) + } else { + fut::Either::B(fut::ok( + Connection::new( + conn.0.clone(), Some(conn), Box::new(stream)))) } } + } - #[cfg(not(any(feature="alpn", feature="tls")))] - match res { - Err(err) => fut::err(err.into()), - Ok(stream) => { - if proto.is_secure() { - fut::err(ClientConnectorError::SslIsNotSupported) - } else { - fut::ok(Connection::new(key, pool, Box::new(stream))) - } + #[cfg(not(any(feature="alpn", feature="tls")))] + match res { + Err(err) => fut::err(err.into()), + Ok(stream) => { + if proto.is_secure() { + fut::err(ClientConnectorError::SslIsNotSupported) + } else { + fut::ok(Connection::new( + conn.0.clone(), Some(conn), Box::new(stream))) } } - })) + } + })) +} + } +} + +struct Maintenance; + +impl fut::ActorFuture for Maintenance +{ + type Item = (); + type Error = (); + type Actor = ClientConnector; + + fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context) + -> Poll + { + // collecto connections + act.collect(false); + + // check waiters + let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)}; + + for (key, waiters) in &mut tmp.waiters { + while let Some(waiter) = waiters.pop_front() { + if waiter.tx.is_canceled() { continue } + + match act.acquire(key) { + Acquire::Acquired(mut conn) => { + // use existing connection + conn.pool = Some( + AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)))); + let _ = waiter.tx.send(Ok(conn)); + }, + Acquire::NotAvailable => { + waiters.push_front(waiter); + break + } + Acquire::Available => + { + let conn = AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))); + + fut::WrapFuture::::actfuture( + Connector::from_registry() + .send(ResolveConnect::host_and_port(&conn.0.host, conn.0.port) + .timeout(waiter.conn_timeout))) + .map_err(|_, _, _| ()) + .and_then(move |res, _act, _| { + #[cfg(feature="alpn")] + match res { + Err(err) => { + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + }, + Ok(stream) => { + if conn.0.ssl { + fut::Either::A( + _act.connector.connect_async(&key.host, stream) + .then(move |res| { + match res { + Err(e) => { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslError(e))); + }, + Ok(stream) => { + let _ = waiter.tx.send(Ok( + Connection::new( + conn.0.clone(), + Some(conn), Box::new(stream)))); + } + } + Ok(()) + }) + .actfuture()) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), Some(conn), Box::new(stream)))); + fut::Either::B(fut::ok(())) + } + } + } + + #[cfg(all(feature="tls", not(feature="alpn")))] + match res { + Err(err) => { + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + }, + Ok(stream) => { + if conn.0.ssl { + fut::Either::A( + _act.connector.connect_async(&conn.0.host, stream) + .then(|res| { + match res { + Err(e) => { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslError(e))); + }, + Ok(stream) => { + let _ = waiter.tx.send(Ok( + Connection::new( + conn.0.clone(), Some(conn), + Box::new(stream)))); + } + } + Ok(()) + }) + .into_actor(_act)) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), Some(conn), Box::new(stream)))); + fut::Either::B(fut::ok(())) + } + } + } + + #[cfg(not(any(feature="alpn", feature="tls")))] + match res { + Err(err) => { + let _ = waiter.tx.send(Err(err.into())); + fut::err(()) + }, + Ok(stream) => { + if conn.0.ssl { + let _ = waiter.tx.send( + Err(ClientConnectorError::SslIsNotSupported)); + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), Some(conn), Box::new(stream)))); + }; + fut::ok(()) + }, + } + }) + .spawn(ctx); + } + } + } + } + + Ok(Async::NotReady) } } @@ -357,104 +747,94 @@ impl Key { #[derive(Debug)] struct Conn(Instant, Connection); +enum Acquire { + Acquired(Connection), + Available, + NotAvailable, +} + +struct AcquiredConn(Key, Option>); + +impl AcquiredConn { + fn close(&mut self, conn: Connection) { + if let Some(pool) = self.1.take() { + pool.close(conn); + } + } + fn release(&mut self, conn: Connection) { + if let Some(pool) = self.1.take() { + pool.release(conn); + } + } +} + +impl Drop for AcquiredConn { + fn drop(&mut self) { + if let Some(pool) = self.1.take() { + pool.release_key(self.0.clone()); + } + } +} + pub struct Pool { - max_size: usize, - keep_alive: Duration, - max_lifetime: Duration, - pool: RefCell>>, + keys: RefCell>, to_close: RefCell>, + to_release: RefCell>, + task: RefCell>, } impl Pool { fn new() -> Pool { Pool { - max_size: 128, - keep_alive: Duration::from_secs(15), - max_lifetime: Duration::from_secs(75), - pool: RefCell::new(HashMap::new()), + keys: RefCell::new(Vec::new()), to_close: RefCell::new(Vec::new()), + to_release: RefCell::new(Vec::new()), + task: RefCell::new(None), } } - fn collect(&self) { - let mut pool = self.pool.borrow_mut(); - let mut to_close = self.to_close.borrow_mut(); - - // check keep-alive - let now = Instant::now(); - for conns in pool.values_mut() { - while !conns.is_empty() { - if (now - conns[0].0) > self.keep_alive - || (now - conns[0].1.ts) > self.max_lifetime - { - let conn = conns.pop_front().unwrap().1; - to_close.push(conn); - } else { - break - } - } - } - - // check connections for shutdown - let mut idx = 0; - while idx < to_close.len() { - match AsyncWrite::shutdown(&mut to_close[idx]) { - Ok(Async::NotReady) => idx += 1, - _ => { - to_close.swap_remove(idx); - }, - } + fn collect_keys(&self) -> Option> { + if self.keys.borrow().is_empty() { + None + } else { + Some(mem::replace(&mut *self.keys.borrow_mut(), Vec::new())) } } - fn query(&self, key: &Key) -> Option { - let mut pool = self.pool.borrow_mut(); - let mut to_close = self.to_close.borrow_mut(); - - if let Some(ref mut connections) = pool.get_mut(key) { - let now = Instant::now(); - while let Some(conn) = connections.pop_back() { - // check if it still usable - if (now - conn.0) > self.keep_alive - || (now - conn.1.ts) > self.max_lifetime - { - to_close.push(conn.1); - } else { - let mut conn = conn.1; - let mut buf = [0; 2]; - match conn.stream().read(&mut buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), - Ok(n) if n > 0 => { - to_close.push(conn); - continue - }, - Ok(_) | Err(_) => continue, - } - return Some(conn) - } - } + fn collect_close(&self) -> Option> { + if self.to_close.borrow().is_empty() { + None + } else { + Some(mem::replace(&mut *self.to_close.borrow_mut(), Vec::new())) + } + } + + fn collect_release(&self) -> Option> { + if self.to_release.borrow().is_empty() { + None + } else { + Some(mem::replace(&mut *self.to_release.borrow_mut(), Vec::new())) + } + } + + fn close(&self, conn: Connection) { + self.to_close.borrow_mut().push(conn); + if let Some(ref task) = *self.task.borrow() { + task.notify() } - None } fn release(&self, conn: Connection) { - if (Instant::now() - conn.ts) < self.max_lifetime { - let mut pool = self.pool.borrow_mut(); - if !pool.contains_key(&conn.key) { - let key = conn.key.clone(); - let mut vec = VecDeque::new(); - vec.push_back(Conn(Instant::now(), conn)); - pool.insert(key, vec); - } else { - let vec = pool.get_mut(&conn.key).unwrap(); - vec.push_back(Conn(Instant::now(), conn)); - if vec.len() > self.max_size { - let conn = vec.pop_front().unwrap(); - self.to_close.borrow_mut().push(conn.1); - } - } - } else { - self.to_close.borrow_mut().push(conn); + self.to_release.borrow_mut().push(conn); + if let Some(ref task) = *self.task.borrow() { + task.notify() + } + } + + fn release_key(&self, key: Key) { + self.keys.borrow_mut().push(key); + if let Some(ref task) = *self.task.borrow() { + task.notify() } } } @@ -463,7 +843,7 @@ impl Pool { pub struct Connection { key: Key, stream: Box, - pool: Option>, + pool: Option, ts: Instant, } @@ -474,11 +854,8 @@ impl fmt::Debug for Connection { } impl Connection { - fn new(key: Key, pool: Option>, stream: Box) -> Self { - Connection { - key, pool, stream, - ts: Instant::now(), - } + fn new(key: Key, pool: Option, stream: Box) -> Self { + Connection {key, stream, pool, ts: Instant::now()} } pub fn stream(&mut self) -> &mut IoStream { @@ -489,8 +866,14 @@ impl Connection { Connection::new(Key::empty(), None, Box::new(io)) } + pub fn close(mut self) { + if let Some(mut pool) = self.pool.take() { + pool.close(self) + } + } + pub fn release(mut self) { - if let Some(pool) = self.pool.take() { + if let Some(mut pool) = self.pool.take() { pool.release(self) } } diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 19ccf8927..15e7ef472 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -458,3 +458,11 @@ impl Pipeline { } } } + +impl Drop for Pipeline { + fn drop(&mut self) { + if let Some(conn) = self.conn.take() { + conn.close() + } + } +}