From 8d5fa6ee71c7e7aaba79c78af5d81a4107a3a229 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 6 Apr 2018 11:08:41 -0700 Subject: [PATCH] added Pause/Resume for client connector --- src/client/connector.rs | 102 ++++++++++++++++++++++++++++++++++------ src/client/mod.rs | 4 +- src/client/pipeline.rs | 16 +++---- 3 files changed, 99 insertions(+), 23 deletions(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 85ecd22ba..0ad066ae8 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -37,7 +37,7 @@ use server::IoStream; /// with connection request. pub struct Connect { pub(crate) uri: Uri, - pub(crate) wait_time: Duration, + pub(crate) wait_timeout: Duration, pub(crate) conn_timeout: Duration, } @@ -46,7 +46,7 @@ impl Connect { pub fn new(uri: U) -> Result where Uri: HttpTryFrom { Ok(Connect { uri: Uri::try_from(uri).map_err(|e| e.into())?, - wait_time: Duration::from_secs(5), + wait_timeout: Duration::from_secs(5), conn_timeout: Duration::from_secs(1), }) } @@ -60,9 +60,9 @@ impl Connect { /// If connection pool limits are enabled, wait time indicates /// max time to wait for available connection. - /// By default connect timeout is 5 secconds. - pub fn wait_time(mut self, timeout: Duration) -> Self { - self.wait_time = timeout; + /// By default wait timeout is 5 secconds. + pub fn wait_timeout(mut self, timeout: Duration) -> Self { + self.wait_timeout = timeout; self } } @@ -71,6 +71,21 @@ impl Message for Connect { type Result = Result; } +/// Pause connection process for `ClientConnector` +/// +/// All connect requests enter wait state during connector pause. +pub struct Pause { + time: Option, +} + +impl Message for Pause { + type Result = (); +} + +/// Resume connection process for `ClientConnector` +#[derive(Message)] +pub struct Resume; + /// A set of errors that can occur during connecting to a http host #[derive(Fail, Debug)] pub enum ClientConnectorError { @@ -145,6 +160,7 @@ pub struct ClientConnector { to_close: Vec, waiters: HashMap>, wait_timeout: Option<(Instant, Timeout)>, + paused: Option>, } impl Actor for ClientConnector { @@ -186,6 +202,7 @@ impl Default for ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, + paused: None, } } @@ -202,6 +219,7 @@ impl Default for ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, + paused: None, } } } @@ -267,6 +285,7 @@ impl ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, + paused: None, } } @@ -494,6 +513,47 @@ impl ClientConnector { let _ = timeout.poll(); self.wait_timeout = Some((time, timeout)); } + + fn wait_for(&mut self, key: Key, + wait: Duration, conn_timeout: Duration) + -> oneshot::Receiver> + { + // connection is not available, wait + let (tx, rx) = oneshot::channel(); + + let wait = Instant::now() + wait; + self.install_wait_timeout(wait); + + let waiter = Waiter{ tx, wait, conn_timeout }; + self.waiters.entry(key.clone()).or_insert_with(VecDeque::new) + .push_back(waiter); + rx + } +} + +impl Handler for ClientConnector { + type Result = (); + + fn handle(&mut self, msg: Pause, _: &mut Self::Context) { + if let Some(time) = msg.time { + let when = Instant::now() + time; + let mut timeout = Timeout::new(time, Arbiter::handle()).unwrap(); + let _ = timeout.poll(); + self.paused = Some(Some((when, timeout))); + } else { + if self.paused.is_none() { + self.paused = Some(None); + } + } + } +} + +impl Handler for ClientConnector { + type Result = (); + + fn handle(&mut self, _: Resume, _: &mut Self::Context) { + self.paused.take(); + } } impl Handler for ClientConnector { @@ -505,7 +565,7 @@ impl Handler for ClientConnector { } let uri = &msg.uri; - let wait_time = msg.wait_time; + let wait_timeout = msg.wait_timeout; let conn_timeout = msg.conn_timeout; // host name is required @@ -536,6 +596,19 @@ impl Handler for ClientConnector { let port = uri.port().unwrap_or_else(|| proto.port()); let key = Key {host, port, ssl: proto.is_secure()}; + // check pause state + if self.paused.is_some() { + let rx = self.wait_for(key, wait_timeout, conn_timeout); + return ActorResponse::async( + rx.map_err(|_| ClientConnectorError::Disconnected) + .into_actor(self) + .and_then(|res, _, _| match res { + Ok(conn) => fut::ok(conn), + Err(err) => fut::err(err), + })); + + } + // acquire connection let pool = if proto.is_http() { match self.acquire(&key) { @@ -546,14 +619,7 @@ impl Handler for ClientConnector { }, Acquire::NotAvailable => { // connection is not available, wait - let (tx, rx) = oneshot::channel(); - - let wait = Instant::now() + wait_time; - self.install_wait_timeout(wait); - - let waiter = Waiter{ tx, wait, conn_timeout }; - self.waiters.entry(key.clone()).or_insert_with(VecDeque::new) - .push_back(waiter); + let rx = self.wait_for(key, wait_timeout, conn_timeout); return ActorResponse::async( rx.map_err(|_| ClientConnectorError::Disconnected) .into_actor(self) @@ -645,6 +711,14 @@ impl fut::ActorFuture for Maintenance fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context) -> Poll { + // check pause duration + let done = if let Some(Some(ref pause)) = act.paused { + if pause.0 <= Instant::now() {true} else {false} + } else { false }; + if done { + act.paused.take(); + } + // collect connections if act.pool_modified.get() { act.collect(false); diff --git a/src/client/mod.rs b/src/client/mod.rs index 8becafc9d..8b5713a23 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -9,7 +9,9 @@ mod writer; pub use self::pipeline::{SendRequest, SendRequestError}; pub use self::request::{ClientRequest, ClientRequestBuilder}; pub use self::response::ClientResponse; -pub use self::connector::{Connect, Connection, ClientConnector, ClientConnectorError}; +pub use self::connector::{ + Connect, Pause, Resume, + Connection, ClientConnector, ClientConnectorError}; pub(crate) use self::writer::HttpClientWriter; pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError}; diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index feb443664..7b91adb21 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -62,14 +62,14 @@ enum State { None, } -/// `SendRequest` is a `Future` which represents asynchronous request sending process. +/// `SendRequest` is a `Future` which represents asynchronous sending process. #[must_use = "SendRequest does nothing unless polled"] pub struct SendRequest { req: ClientRequest, state: State, conn: Addr, conn_timeout: Duration, - wait_time: Duration, + wait_timeout: Duration, timeout: Option, } @@ -84,7 +84,7 @@ impl SendRequest { SendRequest{req, conn, state: State::New, timeout: None, - wait_time: Duration::from_secs(5), + wait_timeout: Duration::from_secs(5), conn_timeout: Duration::from_secs(1), } } @@ -95,7 +95,7 @@ impl SendRequest { state: State::Connection(conn), conn: ClientConnector::from_registry(), timeout: None, - wait_time: Duration::from_secs(5), + wait_timeout: Duration::from_secs(5), conn_timeout: Duration::from_secs(1), } } @@ -119,12 +119,12 @@ impl SendRequest { self } - /// Set wait time + /// Set wait timeout /// /// If connections pool limits are enabled, wait time indicates max time /// to wait for available connection. Default value is 5 seconds. - pub fn wait_time(mut self, timeout: Duration) -> Self { - self.wait_time = timeout; + pub fn wait_timeout(mut self, timeout: Duration) -> Self { + self.wait_timeout = timeout; self } } @@ -141,7 +141,7 @@ impl Future for SendRequest { State::New => self.state = State::Connect(self.conn.send(Connect { uri: self.req.uri().clone(), - wait_time: self.wait_time, + wait_timeout: self.wait_timeout, conn_timeout: self.conn_timeout, })), State::Connect(mut conn) => match conn.poll() {