From cf38183dcb345942bdef6b0cd5e403a99c7de5d6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 23 Jun 2018 12:40:21 +0600 Subject: [PATCH] refactor client connector waiters maintenance --- src/client/connector.rs | 237 ++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 117 deletions(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 58b6331db..f8c7ba7c7 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -207,7 +207,7 @@ pub struct ClientConnector { acquired_per_host: HashMap, available: HashMap>, to_close: Vec, - waiters: HashMap>, + waiters: Option>>, wait_timeout: Option<(Instant, Delay)>, paused: Paused, } @@ -255,7 +255,7 @@ impl Default for ClientConnector { acquired_per_host: HashMap::new(), available: HashMap::new(), to_close: Vec::new(), - waiters: HashMap::new(), + waiters: Some(HashMap::new()), wait_timeout: None, paused: Paused::No, } @@ -278,7 +278,7 @@ impl Default for ClientConnector { acquired_per_host: HashMap::new(), available: HashMap::new(), to_close: Vec::new(), - waiters: HashMap::new(), + waiters: Some(HashMap::new()), wait_timeout: None, paused: Paused::No, } @@ -344,7 +344,7 @@ impl ClientConnector { acquired_per_host: HashMap::new(), available: HashMap::new(), to_close: Vec::new(), - waiters: HashMap::new(), + waiters: Some(HashMap::new()), wait_timeout: None, paused: Paused::No, } @@ -504,7 +504,7 @@ impl ClientConnector { let now = Instant::now(); let mut next = None; - for waiters in self.waiters.values_mut() { + for waiters in self.waiters.as_mut().unwrap().values_mut() { let mut idx = 0; while idx < waiters.len() { if waiters[idx].wait <= now { @@ -556,6 +556,8 @@ impl ClientConnector { conn_timeout, }; self.waiters + .as_mut() + .unwrap() .entry(key) .or_insert_with(VecDeque::new) .push_back(waiter); @@ -835,9 +837,9 @@ impl fut::ActorFuture for Maintenance { act.collect_waiters(); // check waiters - let tmp: &mut ClientConnector = unsafe { &mut *(act as *mut _) }; + let mut waiters = act.waiters.take().unwrap(); - for (key, waiters) in &mut tmp.waiters { + for (key, waiters) in &mut waiters { while let Some(waiter) = waiters.pop_front() { if waiter.tx.is_canceled() { continue; @@ -865,118 +867,118 @@ impl fut::ActorFuture for Maintenance { ), ).map_err(|_, _, _| ()) .and_then(move |res, act, _| { - #[cfg_attr(rustfmt, rustfmt_skip)] - #[cfg(feature = "alpn")] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::Either::B(fut::err(())) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - fut::Either::A( - act.connector - .connect_async(&key.host, stream) - .then(move |res| { - match res { - Err(e) => { - let _ = waiter.tx.send( - Err(ClientConnectorError::SslError(e))); - } - Ok(stream) => { - let _ = waiter.tx.send( - Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - )), - ); - } - } - Ok(()) - }) - .actfuture(), - ) - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - fut::Either::B(fut::ok(())) - } - } - } + #[cfg(feature = "alpn")] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + fut::Either::A( + act.connector + .connect_async(&key.host, stream) + .then(move |res| { + match res { + Err(e) => { + let _ = waiter.tx.send( + Err(ClientConnectorError::SslError(e))); + } + Ok(stream) => { + let _ = waiter.tx.send( + Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + )), + ); + } + } + Ok(()) + }) + .actfuture(), + ) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + fut::Either::B(fut::ok(())) + } + } + } - #[cfg_attr(rustfmt, rustfmt_skip)] - #[cfg(all(feature = "tls", not(feature = "alpn")))] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::Either::B(fut::err(())) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - fut::Either::A( - act.connector - .connect_async(&conn.0.host, stream) - .then(|res| { - match res { - Err(e) => { - let _ = waiter.tx.send(Err( - ClientConnectorError::SslError(e), - )); - } - Ok(stream) => { - let _ = waiter.tx.send( - Ok(Connection::new( - conn.0.clone(), Some(conn), - Box::new(stream), - )), - ); - } - } - Ok(()) - }) - .into_actor(act), - ) - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - fut::Either::B(fut::ok(())) - } - } - } + #[cfg(all(feature = "tls", not(feature = "alpn")))] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::Either::B(fut::err(())) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + fut::Either::A( + act.connector + .connect_async(&conn.0.host, stream) + .then(|res| { + match res { + Err(e) => { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslError(e), + )); + } + Ok(stream) => { + let _ = waiter.tx.send( + Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + )), + ); + } + } + Ok(()) + }) + .into_actor(act), + ) + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + fut::Either::B(fut::ok(())) + } + } + } - #[cfg_attr(rustfmt, rustfmt_skip)] - #[cfg(not(any(feature = "alpn", feature = "tls")))] - match res { - Err(err) => { - act.stats.errors += 1; - let _ = waiter.tx.send(Err(err.into())); - fut::err(()) - } - Ok(stream) => { - act.stats.opened += 1; - if conn.0.ssl { - let _ = waiter.tx.send(Err(ClientConnectorError::SslIsNotSupported)); - } else { - let _ = waiter.tx.send(Ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))); - }; - fut::ok(()) - } - } + #[cfg(not(any(feature = "alpn", feature = "tls")))] + match res { + Err(err) => { + act.stats.errors += 1; + let _ = waiter.tx.send(Err(err.into())); + fut::err(()) + } + Ok(stream) => { + act.stats.opened += 1; + if conn.0.ssl { + let _ = waiter.tx.send(Err( + ClientConnectorError::SslIsNotSupported, + )); + } else { + let _ = waiter.tx.send(Ok(Connection::new( + conn.0.clone(), + Some(conn), + Box::new(stream), + ))); + }; + fut::ok(()) + } + } }) .spawn(ctx); } @@ -984,6 +986,7 @@ impl fut::ActorFuture for Maintenance { } } + act.waiters = Some(waiters); Ok(Async::NotReady) } }