diff --git a/src/client/connector.rs b/src/client/connector.rs index b97cfc711..c95c47cd8 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -46,8 +46,6 @@ pub struct ClientConnectorStats { pub errors: usize, /// Number of connection timeouts pub timeouts: usize, - /// Number of released connections - pub released: usize, } #[derive(Debug)] @@ -413,14 +411,14 @@ impl ClientConnector { } if self.limit_per_host > 0 { if let Some(per_host) = self.acquired_per_host.get(key) { - if self.limit_per_host >= *per_host { + if *per_host >= self.limit_per_host { return Acquire::NotAvailable; } } } } else if self.limit_per_host > 0 { if let Some(per_host) = self.acquired_per_host.get(key) { - if self.limit_per_host >= *per_host { + if *per_host >= self.limit_per_host { return Acquire::NotAvailable; } } @@ -469,7 +467,9 @@ impl ClientConnector { } fn release_key(&mut self, key: &Key) { - self.acquired -= 1; + if self.acquired > 0 { + self.acquired -= 1; + } let per_host = if let Some(per_host) = self.acquired_per_host.get(key) { *per_host } else { @@ -514,23 +514,23 @@ impl ClientConnector { let mut next = None; for waiters in self.waiters.as_mut().unwrap().values_mut() { - let mut idx = 0; - while idx < waiters.len() { - if waiters[idx].wait <= now { + let mut new_waiters = VecDeque::new(); + while let Some(waiter) = waiters.pop_front() { + if waiter.wait <= now { self.stats.timeouts += 1; - let waiter = waiters.swap_remove_back(idx).unwrap(); let _ = waiter.tx.send(Err(ClientConnectorError::Timeout)); } else { if let Some(n) = next { - if waiters[idx].wait < n { - next = Some(waiters[idx].wait); + if waiter.wait < n { + next = Some(waiter.wait); } } else { - next = Some(waiters[idx].wait); + next = Some(waiter.wait); } - idx += 1; + new_waiters.push_back(waiter); } } + *waiters = new_waiters; } if next.is_some() { @@ -573,20 +573,56 @@ impl ClientConnector { rx } - fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context) { - let conn = AcquiredConn(key, Some(self.acq_tx.clone())); + fn check_availibility(&mut self, ctx: &mut Context) { + // check waiters + let mut act_waiters = self.waiters.take().unwrap(); + for (key, ref mut waiters) in &mut act_waiters { + while let Some(waiter) = waiters.pop_front() { + if waiter.tx.is_canceled() { + continue; + } + + match self.acquire(key) { + Acquire::Acquired(mut conn) => { + // use existing connection + self.stats.reused += 1; + conn.pool = + Some(AcquiredConn(key.clone(), Some(self.acq_tx.clone()))); + let _ = waiter.tx.send(Ok(conn)); + } + Acquire::NotAvailable => { + waiters.push_front(waiter); + break; + } + Acquire::Available => { + // create new connection + self.connect_waiter(key.clone(), waiter, ctx); + } + } + } + } + + self.waiters = Some(act_waiters); + } + + fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context) { + let conn = AcquiredConn(key.clone(), Some(self.acq_tx.clone())); + + let key2 = key.clone(); fut::WrapFuture::::actfuture( self.resolver.as_ref().unwrap().send( ResolveConnect::host_and_port(&conn.0.host, conn.0.port) .timeout(waiter.conn_timeout), ), - ).map_err(|_, _, _| ()) + ).map_err(move |_, act, _| { + act.release_key(&key2); + () + }) .and_then(move |res, act, _| { #[cfg(feature = "alpn")] match res { Err(err) => { - act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(())) } @@ -596,7 +632,8 @@ impl ClientConnector { fut::Either::A( act.connector .connect_async(&key.host, stream) - .then(move |res| { + .into_actor(act) + .then(move |res, act, _| { match res { Err(e) => { let _ = waiter.tx.send(Err( @@ -612,9 +649,8 @@ impl ClientConnector { ))); } } - Ok(()) - }) - .into_actor(act), + fut::ok(()) + }), ) } else { let _ = waiter.tx.send(Ok(Connection::new( @@ -630,7 +666,6 @@ impl ClientConnector { #[cfg(all(feature = "tls", not(feature = "alpn")))] match res { Err(err) => { - act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(())) } @@ -640,7 +675,8 @@ impl ClientConnector { fut::Either::A( act.connector .connect_async(&conn.0.host, stream) - .then(|res| { + .into_actor(act) + .then(move |res, _, _| { match res { Err(e) => { let _ = waiter.tx.send(Err( @@ -656,9 +692,8 @@ impl ClientConnector { ))); } } - Ok(()) - }) - .into_actor(act), + fut::ok(()) + }), ) } else { let _ = waiter.tx.send(Ok(Connection::new( @@ -674,7 +709,6 @@ impl ClientConnector { #[cfg(not(any(feature = "alpn", feature = "tls")))] match res { Err(err) => { - act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::err(()) } @@ -725,7 +759,7 @@ impl Handler for ClientConnector { impl Handler for ClientConnector { type Result = ActorResponse; - fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: Connect, ctx: &mut Self::Context) -> Self::Result { let uri = &msg.uri; let wait_timeout = msg.wait_timeout; let conn_timeout = msg.conn_timeout; @@ -761,239 +795,142 @@ impl Handler for ClientConnector { // check pause state if self.paused.is_paused() { - let rx = self.wait_for(key, wait_timeout, conn_timeout); + let rx = self.wait_for(key.clone(), wait_timeout, conn_timeout); self.stats.waits += 1; return ActorResponse::async( rx.map_err(|_| ClientConnectorError::Disconnected) .into_actor(self) - .and_then(|res, _, _| match res { + .and_then(move |res, act, ctx| match res { Ok(conn) => fut::ok(conn), - Err(err) => fut::err(err), + Err(err) => { + match err { + ClientConnectorError::Timeout => (), + _ => { + act.release_key(&key); + } + } + act.stats.errors += 1; + act.check_availibility(ctx); + fut::err(err) + } + }), + ); + } + + // do not re-use websockets connection + if !proto.is_http() { + let (tx, rx) = oneshot::channel(); + let wait = Instant::now() + wait_timeout; + let waiter = Waiter { + tx, + wait, + conn_timeout, + }; + self.connect_waiter(key.clone(), waiter, ctx); + + return ActorResponse::async( + rx.map_err(|_| ClientConnectorError::Disconnected) + .into_actor(self) + .and_then(move |res, act, ctx| match res { + Ok(conn) => fut::ok(conn), + Err(err) => { + act.stats.errors += 1; + act.release_key(&key); + act.check_availibility(ctx); + fut::err(err) + } }), ); } // acquire connection - let pool = if proto.is_http() { - match self.acquire(&key) { - Acquire::Acquired(mut conn) => { - // use existing connection - conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone()))); - self.stats.reused += 1; - return ActorResponse::async(fut::ok(conn)); - } - Acquire::NotAvailable => { - // connection is not available, wait - let rx = self.wait_for(key, wait_timeout, conn_timeout); - self.stats.waits += 1; - return ActorResponse::async( - rx.map_err(|_| ClientConnectorError::Disconnected) - .into_actor(self) - .and_then(|res, _, _| match res { - Ok(conn) => fut::ok(conn), - Err(err) => fut::err(err), - }), - ); - } - Acquire::Available => Some(self.acq_tx.clone()), + match self.acquire(&key) { + Acquire::Acquired(mut conn) => { + // use existing connection + conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone()))); + self.stats.reused += 1; + ActorResponse::async(fut::ok(conn)) } - } else { - None - }; - let conn = AcquiredConn(key, pool); + Acquire::NotAvailable => { + // connection is not available, wait + let rx = self.wait_for(key.clone(), wait_timeout, conn_timeout); + self.stats.waits += 1; - { - ActorResponse::async( - self.resolver - .as_ref() - .unwrap() - .send( - ResolveConnect::host_and_port(&conn.0.host, port) - .timeout(conn_timeout), - ) - .into_actor(self) - .map_err(|_, _, _| ClientConnectorError::Disconnected) - .and_then(move |res, act, _| { - #[cfg(feature = "alpn")] - match res { + ActorResponse::async( + rx.map_err(|_| ClientConnectorError::Disconnected) + .into_actor(self) + .and_then(move |res, act, ctx| match res { + Ok(conn) => fut::ok(conn), Err(err) => { - act.stats.opened += 1; - fut::Either::B(fut::err(err.into())) - } - Ok(stream) => { - act.stats.opened += 1; - if proto.is_secure() { - fut::Either::A( - act.connector - .connect_async(&conn.0.host, stream) - .map_err(ClientConnectorError::SslError) - .map(|stream| { - Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ) - }) - .into_actor(act), - ) - } else { - fut::Either::B(fut::ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))) + match err { + ClientConnectorError::Timeout => (), + _ => { + act.release_key(&key); + } } + act.stats.errors += 1; + act.check_availibility(ctx); + fut::err(err) } - } + }), + ) + } + Acquire::Available => { + let (tx, rx) = oneshot::channel(); + let wait = Instant::now() + wait_timeout; + let waiter = Waiter { + tx, + wait, + conn_timeout, + }; + self.connect_waiter(key.clone(), waiter, ctx); - #[cfg(all(feature = "tls", not(feature = "alpn")))] - match res { + ActorResponse::async( + rx.map_err(|_| ClientConnectorError::Disconnected) + .into_actor(self) + .and_then(move |res, act, ctx| match res { + Ok(conn) => fut::ok(conn), Err(err) => { - act.stats.opened += 1; - fut::Either::B(fut::err(err.into())) + act.stats.errors += 1; + act.release_key(&key); + act.check_availibility(ctx); + fut::err(err) } - Ok(stream) => { - act.stats.opened += 1; - if proto.is_secure() { - fut::Either::A( - act.connector - .connect_async(&conn.0.host, stream) - .map_err(ClientConnectorError::SslError) - .map(|stream| { - Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ) - }) - .into_actor(act), - ) - } else { - fut::Either::B(fut::ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - ))) - } - } - } - - #[cfg(not(any(feature = "alpn", feature = "tls")))] - match res { - Err(err) => { - act.stats.opened += 1; - fut::err(err.into()) - } - Ok(stream) => { - act.stats.opened += 1; - if proto.is_secure() { - fut::err(ClientConnectorError::SslIsNotSupported) - } else { - fut::ok(Connection::new( - conn.0.clone(), - Some(conn), - Box::new(stream), - )) - } - } - } - }), - ) + }), + ) + } } } } impl StreamHandler for ClientConnector { fn handle(&mut self, msg: AcquiredConnOperation, ctx: &mut Context) { - let now = Instant::now(); - self.stats.released += 1; - - // check if we have queued up waiters - let waiter = { - let key = match msg { - AcquiredConnOperation::Close(ref conn) => &conn.key, - AcquiredConnOperation::Release(ref conn) => &conn.key, - AcquiredConnOperation::ReleaseKey(ref key) => key, - }; - - if let Some(ref mut waiters) = self.waiters.as_mut().unwrap().get_mut(key) { - loop { - if let Some(waiter) = waiters.pop_front() { - if waiter.tx.is_canceled() { - continue; - } - break Some(waiter); - } else { - break None; - } - } - } else { - None - } - }; - match msg { AcquiredConnOperation::Close(conn) => { - if let Some(waiter) = waiter { - // create new connection - self.connect_waiter(conn.key.clone(), waiter, ctx); - } else { - self.release_key(&conn.key); - } + self.release_key(&conn.key); self.to_close.push(conn); self.stats.closed += 1; } - AcquiredConnOperation::Release(mut conn) => { - let alive = (Instant::now() - conn.ts) < self.conn_lifetime; - - if let Some(waiter) = waiter { - // check connection lifetime and the return to available pool - if alive { - // use existing connection - self.stats.reused += 1; - conn.pool = Some(AcquiredConn( - conn.key.clone(), - Some(self.acq_tx.clone()), - )); - let _ = waiter.tx.send(Ok(conn)); - } else { - // create new connection - self.connect_waiter(conn.key.clone(), waiter, ctx); - } + AcquiredConnOperation::Release(conn) => { + self.release_key(&conn.key); + if (Instant::now() - conn.ts) < self.conn_lifetime { + self.available + .entry(conn.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(Conn(Instant::now(), conn)); } else { - self.release_key(&conn.key); - if alive { - self.available - .entry(conn.key.clone()) - .or_insert_with(VecDeque::new) - .push_back(Conn(Instant::now(), conn)); - } + self.to_close.push(conn); + self.stats.closed += 1; } } AcquiredConnOperation::ReleaseKey(key) => { - if let Some(waiter) = waiter { - // create new connection - self.connect_waiter(key, waiter, ctx); - } else { - self.release_key(&key); - } + // closed + self.stats.closed += 1; + self.release_key(&key); } } - // check keep-alive - for conns in self.available.values_mut() { - while !conns.is_empty() { - if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive - || (now - conns[0].1.ts) > self.conn_lifetime - { - let conn = conns.pop_front().unwrap().1; - self.to_close.push(conn); - self.stats.closed += 1; - } else { - break; - } - } - } + self.check_availibility(ctx); } } @@ -1018,35 +955,8 @@ impl fut::ActorFuture for Maintenance { act.collect_waiters(); // check waiters - let mut act_waiters = act.waiters.take().unwrap(); + act.check_availibility(ctx); - for (key, ref mut waiters) in &mut act_waiters { - while let Some(waiter) = waiters.pop_front() { - if waiter.tx.is_canceled() { - continue; - } - - match act.acquire(key) { - Acquire::Acquired(mut conn) => { - // use existing connection - act.stats.reused += 1; - conn.pool = - Some(AcquiredConn(key.clone(), Some(act.acq_tx.clone()))); - let _ = waiter.tx.send(Ok(conn)); - } - Acquire::NotAvailable => { - waiters.push_front(waiter); - break; - } - Acquire::Available => { - // create new connection - act.connect_waiter(key.clone(), waiter, ctx); - } - } - } - } - - act.waiters = Some(act_waiters); Ok(Async::NotReady) } } @@ -1181,14 +1091,14 @@ impl Connection { Connection::new(Key::empty(), None, Box::new(io)) } - /// Close connection pool + /// Close connection pub fn close(mut self) { if let Some(mut pool) = self.pool.take() { pool.close(self) } } - /// Release this connection from the connection pool + /// Release this connection to the connection pool pub fn release(mut self) { if let Some(mut pool) = self.pool.take() { pool.release(self) diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index c3f3bf4cd..e5538b060 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -17,7 +17,7 @@ use context::{ActorHttpContext, Frame}; use error::Error; use error::PayloadError; use header::ContentEncoding; -use http::Method; +use http::{Method, Uri}; use httpmessage::HttpMessage; use server::input::PayloadStream; use server::WriterState; @@ -203,7 +203,8 @@ impl Future for SendRequest { should_decompress: self.req.response_decompress(), write_state: RunningState::Running, timeout: Some(Delay::new(Instant::now() + timeout)), - close: self.req.method() == &Method::HEAD, + meth: self.req.method().clone(), + path: self.req.uri().clone(), }); self.state = State::Send(pl); } @@ -249,7 +250,8 @@ pub struct Pipeline { should_decompress: bool, write_state: RunningState, timeout: Option, - close: bool, + meth: Method, + path: Uri, } enum IoBody { @@ -283,7 +285,7 @@ impl RunningState { impl Pipeline { fn release_conn(&mut self) { if let Some(conn) = self.conn.take() { - if self.close { + if self.meth == Method::HEAD { conn.close() } else { conn.release() @@ -529,6 +531,11 @@ impl Pipeline { impl Drop for Pipeline { fn drop(&mut self) { if let Some(conn) = self.conn.take() { + debug!( + "Client http transaction is not completed, dropping connection: {:?} {:?}", + self.meth, + self.path, + ); conn.close() } }