From d43902ee7cfbcfeb3689c72004bb8a9a99c4e639 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 17 Jul 2018 17:23:03 +0600 Subject: [PATCH] proper handling for client connection release --- src/client/connector.rs | 323 +++++++++++++++++++++++----------------- 1 file changed, 188 insertions(+), 135 deletions(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 604af0b86..5d66d66d6 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -570,6 +570,131 @@ impl ClientConnector { .push_back(waiter); rx } + + fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context) { + let conn = AcquiredConn(key, Some(self.acq_tx.clone())); + + fut::WrapFuture::::actfuture( + self.resolver.as_ref().unwrap().send( + ResolveConnect::host_and_port(&conn.0.host, conn.0.port) + .timeout(waiter.conn_timeout), + ), + ).map_err(|_, _, _| ()) + .and_then(move |res, act, _| { + #[cfg(feature = "alpn")] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + fut::Either::A( + act.connector + .connect_async(&key.host, stream) + .then(move |res| { + match res { + Err(e) => { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslError(e), + )); + } + Ok(stream) => { + let _ = + waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + } + } + Ok(()) + }) + .into_actor(act), + ) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + fut::Either::B(fut::ok(())) + } + } + } + + #[cfg(all(feature = "tls", not(feature = "alpn")))] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + fut::Either::A( + act.connector + .connect_async(&conn.0.host, stream) + .then(|res| { + match res { + Err(e) => { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslError(e), + )); + } + Ok(stream) => { + let _ = + waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + } + } + Ok(()) + }) + .into_actor(act), + ) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + fut::Either::B(fut::ok(())) + } + } + } + + #[cfg(not(any(feature = "alpn", feature = "tls")))] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::err(()) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + let _ = waiter + .tx + .send(Err(ClientConnectorError::SslIsNotSupported)); + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + }; + fut::ok(()) + } + } + }) + .spawn(ctx); + } } impl Handler for ClientConnector { @@ -777,28 +902,78 @@ impl Handler for ClientConnector { } impl StreamHandler for ClientConnector { - fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context) { + fn handle(&mut self, msg: AcquiredConnOperation, ctx: &mut Context) { let now = Instant::now(); + // check if we have queued up waiters + let waiter = { + let key = match msg { + AcquiredConnOperation::Close(ref conn) => &conn.key, + AcquiredConnOperation::Release(ref conn) => &conn.key, + AcquiredConnOperation::ReleaseKey(ref key) => key, + }; + + if let Some(ref mut waiters) = self.waiters.as_mut().unwrap().get_mut(key) { + loop { + if let Some(waiter) = waiters.pop_front() { + if waiter.tx.is_canceled() { + continue; + } + break Some(waiter); + } else { + break None; + } + } + } else { + None + } + }; + match msg { AcquiredConnOperation::Close(conn) => { - self.release_key(&conn.key); + if let Some(waiter) = waiter { + // create new connection + self.connect_waiter(conn.key.clone(), waiter, ctx); + } else { + self.release_key(&conn.key); + } self.to_close.push(conn); self.stats.closed += 1; } - AcquiredConnOperation::Release(conn) => { - self.release_key(&conn.key); + AcquiredConnOperation::Release(mut conn) => { + let alive = (Instant::now() - conn.ts) < self.conn_lifetime; - // check connection lifetime and the return to available pool - if (Instant::now() - conn.ts) < self.conn_lifetime { - self.available - .entry(conn.key.clone()) - .or_insert_with(VecDeque::new) - .push_back(Conn(Instant::now(), conn)); + if let Some(waiter) = waiter { + // check connection lifetime and the return to available pool + if alive { + // use existing connection + self.stats.reused += 1; + conn.pool = Some(AcquiredConn( + conn.key.clone(), + Some(self.acq_tx.clone()), + )); + let _ = waiter.tx.send(Ok(conn)); + } else { + // create new connection + self.connect_waiter(conn.key.clone(), waiter, ctx); + } + } else { + self.release_key(&conn.key); + if alive { + self.available + .entry(conn.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(Conn(Instant::now(), conn)); + } } } AcquiredConnOperation::ReleaseKey(key) => { - self.release_key(&key); + if let Some(waiter) = waiter { + // create new connection + self.connect_waiter(key, waiter, ctx); + } else { + self.release_key(&key); + } } } @@ -861,130 +1036,8 @@ impl fut::ActorFuture for Maintenance { break; } Acquire::Available => { - let key = key.clone(); - let conn = AcquiredConn(key.clone(), Some(act.acq_tx.clone())); - - fut::WrapFuture::::actfuture( - act.resolver.as_ref().unwrap().send( - ResolveConnect::host_and_port(&conn.0.host, conn.0.port) - .timeout(waiter.conn_timeout), - ), - ).map_err(|_, _, _| ()) - .and_then(move |res, act, _| { - #[cfg(feature = "alpn")] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::Either::B(fut::err(())) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - fut::Either::A( - act.connector - .connect_async(&key.host, stream) - .then(move |res| { - match res { - Err(e) => { - let _ = waiter.tx.send( - Err(ClientConnectorError::SslError(e))); - } - Ok(stream) => { - let _ = waiter.tx.send( - Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - )), - ); - } - } - Ok(()) - }) - .into_actor(act), - ) - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - fut::Either::B(fut::ok(())) - } - } - } - - #[cfg(all(feature = "tls", not(feature = "alpn")))] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::Either::B(fut::err(())) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - fut::Either::A( - act.connector - .connect_async(&conn.0.host, stream) - .then(|res| { - match res { - Err(e) => { - let _ = waiter.tx.send(Err( - ClientConnectorError::SslError(e), - )); - } - Ok(stream) => { - let _ = waiter.tx.send( - Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - )), - ); - } - } - Ok(()) - }) - .into_actor(act), - ) - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - fut::Either::B(fut::ok(())) - } - } - } - - #[cfg(not(any(feature = "alpn", feature = "tls")))] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::err(()) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - let _ = waiter.tx.send(Err( - ClientConnectorError::SslIsNotSupported, - )); - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - }; - fut::ok(()) - } - } - }) - .spawn(ctx); + // create new connection + act.connect_waiter(key.clone(), waiter, ctx); } } }