diff --git a/src/client/connector.rs b/src/client/connector.rs index 4f14a9e27..effee7fa7 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,11 +1,11 @@ use std::{fmt, mem, io, time}; -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; use std::rc::Rc; use std::net::Shutdown; use std::time::{Duration, Instant}; use std::collections::{HashMap, VecDeque}; -use actix::{fut, Actor, ActorFuture, Context, AsyncContext, +use actix::{fut, Actor, ActorFuture, Arbiter, Context, AsyncContext, Handler, Message, ActorResponse, Supervised, ContextFutureSpawner}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; @@ -16,6 +16,7 @@ use futures::{Async, Future, Poll}; use futures::task::{Task, current as current_task}; use futures::unsync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::reactor::Timeout; #[cfg(feature="alpn")] use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError}; @@ -35,8 +36,9 @@ use server::IoStream; /// `Connect` type represents message that can be send to `ClientConnector` /// with connection request. pub struct Connect { - pub uri: Uri, - pub conn_timeout: Duration, + pub(crate) uri: Uri, + pub(crate) wait_time: Duration, + pub(crate) conn_timeout: Duration, } impl Connect { @@ -44,9 +46,25 @@ impl Connect { pub fn new(uri: U) -> Result where Uri: HttpTryFrom { Ok(Connect { uri: Uri::try_from(uri).map_err(|e| e.into())?, - conn_timeout: Duration::from_secs(1) + wait_time: Duration::from_secs(5), + conn_timeout: Duration::from_secs(1), }) } + + /// Connection timeout, max time to connect to remote host. + /// By default connect timeout is 1 seccond. + pub fn conn_timeout(mut self, timeout: Duration) -> Self { + self.conn_timeout = timeout; + self + } + + /// If connection pool limits are enabled, wait time indicates + /// max time to wait for available connection. + /// By default connect timeout is 5 secconds. + pub fn wait_time(mut self, timeout: Duration) -> Self { + self.wait_time = timeout; + self + } } impl Message for Connect { @@ -102,6 +120,7 @@ impl From for ClientConnectorError { struct Waiter { tx: oneshot::Sender>, + wait: Instant, conn_timeout: Duration, } @@ -114,6 +133,8 @@ pub struct ClientConnector { connector: TlsConnector, pool: Rc, + pool_modified: Rc>, + conn_lifetime: Duration, conn_keep_alive: Duration, limit: usize, @@ -123,6 +144,7 @@ pub struct ClientConnector { available: HashMap>, to_close: Vec, waiters: HashMap>, + wait_timeout: Option<(Instant, Timeout)>, } impl Actor for ClientConnector { @@ -140,6 +162,8 @@ impl ArbiterService for ClientConnector {} impl Default for ClientConnector { fn default() -> ClientConnector { + let modified = Rc::new(Cell::new(false)); + #[cfg(all(feature="alpn"))] { let builder = SslConnector::builder(SslMethod::tls()).unwrap(); @@ -149,7 +173,8 @@ impl Default for ClientConnector { { let builder = TlsConnector::builder().unwrap(); ClientConnector { - pool: Rc::new(Pool::new()), + pool: Rc::new(Pool::new(Rc::clone(&modified))), + pool_modified: modified, connector: builder.build().unwrap(), conn_lifetime: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(75), @@ -160,11 +185,13 @@ impl Default for ClientConnector { available: HashMap::new(), to_close: Vec::new(), waiters: HashMap::new(), + wait_timeout: None, } } #[cfg(not(any(feature="alpn", feature="tls")))] - ClientConnector {pool: Rc::new(Pool::new()), + ClientConnector {pool: Rc::new(Pool::new(Rc::clone(&modified))), + pool_modified: modified, conn_lifetime: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(75), limit: 100, @@ -174,6 +201,7 @@ impl Default for ClientConnector { available: HashMap::new(), to_close: Vec::new(), waiters: HashMap::new(), + wait_timeout: None, } } } @@ -224,9 +252,11 @@ impl ClientConnector { /// } /// ``` pub fn with_connector(connector: SslConnector) -> ClientConnector { + let modified = Rc::new(Cell::new(false)); ClientConnector { connector, - pool: Rc::new(Pool::new()), + pool: Rc::new(Pool::new(Rc::clone(&modified))), + pool_modified: modified, conn_lifetime: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(75), limit: 100, @@ -236,6 +266,7 @@ impl ClientConnector { available: HashMap::new(), to_close: Vec::new(), waiters: HashMap::new(), + wait_timeout: None, } } @@ -357,31 +388,33 @@ impl ClientConnector { fn collect(&mut self, periodic: bool) { let now = Instant::now(); - // collect half acquire keys - if let Some(keys) = self.pool.collect_keys() { - for key in keys { - self.release_key(&key); + if self.pool_modified.get() { + // collect half acquire keys + if let Some(keys) = self.pool.collect_keys() { + for key in keys { + self.release_key(&key); + } } - } - // collect connections for close - if let Some(to_close) = self.pool.collect_close() { - for conn in to_close { - self.release_key(&conn.key); - self.to_close.push(conn); + // collect connections for close + if let Some(to_close) = self.pool.collect_close() { + for conn in to_close { + self.release_key(&conn.key); + self.to_close.push(conn); + } } - } - // connection connections - if let Some(to_release) = self.pool.collect_release() { - for conn in to_release { - self.release_key(&conn.key); + // connection connections + if let Some(to_release) = self.pool.collect_release() { + for conn in to_release { + self.release_key(&conn.key); - // check connection lifetime and the return to available pool - if (now - conn.ts) < self.conn_lifetime { - self.available.entry(conn.key.clone()) - .or_insert_with(VecDeque::new) - .push_back(Conn(Instant::now(), conn)); + // check connection lifetime and the return to available pool + if (now - conn.ts) < self.conn_lifetime { + self.available.entry(conn.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(Conn(Instant::now(), conn)); + } } } } @@ -412,6 +445,8 @@ impl ClientConnector { } } } + + self.pool_modified.set(false); } fn collect_periodic(&mut self, ctx: &mut Context) { @@ -419,15 +454,58 @@ impl ClientConnector { // re-schedule next collect period ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); } + + fn collect_waiters(&mut self) { + let now = Instant::now(); + let mut next = None; + + for (_, waiters) in &mut self.waiters { + let mut idx = 0; + while idx < waiters.len() { + if waiters[idx].wait <= now { + let waiter = waiters.swap_remove_back(idx).unwrap(); + let _ = waiter.tx.send(Err(ClientConnectorError::Timeout)); + } else { + if let Some(n) = next { + if waiters[idx].wait < n { + next = Some(waiters[idx].wait); + } + } else { + next = Some(waiters[idx].wait); + } + idx += 1; + } + } + } + + if next.is_some() { + self.install_wait_timeout(next.unwrap()); + } + } + + fn install_wait_timeout(&mut self, time: Instant) { + if let Some(ref mut wait) = self.wait_timeout { + if wait.0 < time { + return + } + } + + let mut timeout = Timeout::new(time-Instant::now(), Arbiter::handle()).unwrap(); + let _ = timeout.poll(); + self.wait_timeout = Some((time, timeout)); + } } impl Handler for ClientConnector { type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { - self.collect(false); + if self.pool_modified.get() { + self.collect(false); + } let uri = &msg.uri; + let wait_time = msg.wait_time; let conn_timeout = msg.conn_timeout; // host name is required @@ -469,7 +547,11 @@ impl Handler for ClientConnector { Acquire::NotAvailable => { // connection is not available, wait let (tx, rx) = oneshot::channel(); - let waiter = Waiter{ tx, conn_timeout }; + + let wait = Instant::now() + wait_time; + self.install_wait_timeout(wait); + + let waiter = Waiter{ tx, wait, conn_timeout }; self.waiters.entry(key.clone()).or_insert_with(VecDeque::new) .push_back(waiter); return ActorResponse::async( @@ -563,8 +645,13 @@ impl fut::ActorFuture for Maintenance fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context) -> Poll { - // collecto connections - act.collect(false); + // collect connections + if act.pool_modified.get() { + act.collect(false); + } + + // collect wait timers + act.collect_waiters(); // check waiters let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)}; @@ -781,11 +868,13 @@ pub struct Pool { to_close: RefCell>, to_release: RefCell>, task: RefCell>, + modified: Rc>, } impl Pool { - fn new() -> Pool { + fn new(modified: Rc>) -> Pool { Pool { + modified, keys: RefCell::new(Vec::new()), to_close: RefCell::new(Vec::new()), to_release: RefCell::new(Vec::new()), @@ -818,6 +907,7 @@ impl Pool { } fn close(&self, conn: Connection) { + self.modified.set(true); self.to_close.borrow_mut().push(conn); if let Some(ref task) = *self.task.borrow() { task.notify() @@ -825,6 +915,7 @@ impl Pool { } fn release(&self, conn: Connection) { + self.modified.set(true); self.to_release.borrow_mut().push(conn); if let Some(ref task) = *self.task.borrow() { task.notify() @@ -832,6 +923,7 @@ impl Pool { } fn release_key(&self, key: Key) { + self.modified.set(true); self.keys.borrow_mut().push(key); if let Some(ref task) = *self.task.borrow() { task.notify() diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index aefffc891..feb443664 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -69,6 +69,7 @@ pub struct SendRequest { state: State, conn: Addr, conn_timeout: Duration, + wait_time: Duration, timeout: Option, } @@ -83,7 +84,8 @@ impl SendRequest { SendRequest{req, conn, state: State::New, timeout: None, - conn_timeout: Duration::from_secs(1) + wait_time: Duration::from_secs(5), + conn_timeout: Duration::from_secs(1), } } @@ -93,6 +95,7 @@ impl SendRequest { state: State::Connection(conn), conn: ClientConnector::from_registry(), timeout: None, + wait_time: Duration::from_secs(5), conn_timeout: Duration::from_secs(1), } } @@ -115,6 +118,15 @@ impl SendRequest { self.conn_timeout = timeout; self } + + /// Set wait time + /// + /// If connections pool limits are enabled, wait time indicates max time + /// to wait for available connection. Default value is 5 seconds. + pub fn wait_time(mut self, timeout: Duration) -> Self { + self.wait_time = timeout; + self + } } impl Future for SendRequest { @@ -129,6 +141,7 @@ impl Future for SendRequest { State::New => self.state = State::Connect(self.conn.send(Connect { uri: self.req.uri().clone(), + wait_time: self.wait_time, conn_timeout: self.conn_timeout, })), State::Connect(mut conn) => match conn.poll() {