1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-10-03 08:41:55 +00:00

add connection limits to pool

This commit is contained in:
Nikolay Kim 2018-04-04 16:39:01 -07:00
parent 8038a52287
commit d8a9606162
3 changed files with 549 additions and 158 deletions

View file

@ -87,7 +87,7 @@ after_success:
fi fi
- | - |
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then
bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh) bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh)
USE_SKEPTIC=1 cargo tarpaulin --out Xml USE_SKEPTIC=1 cargo tarpaulin --out Xml
bash <(curl -s https://codecov.io/bash) bash <(curl -s https://codecov.io/bash)

View file

@ -1,4 +1,4 @@
use std::{fmt, io, time}; use std::{fmt, mem, io, time};
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
use std::net::Shutdown; use std::net::Shutdown;
@ -6,28 +6,26 @@ use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context, AsyncContext, use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised}; Handler, Message, ActorResponse, Supervised, ContextFutureSpawner};
use actix::registry::ArbiterService; use actix::registry::ArbiterService;
use actix::fut::WrapFuture; use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError}; use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::{Async, Poll}; use futures::{Async, Future, Poll};
use futures::task::{Task, current as current_task};
use futures::unsync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError}; use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use tokio_openssl::SslConnectorExt; use tokio_openssl::SslConnectorExt;
#[cfg(feature="alpn")]
use futures::Future;
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
use native_tls::{TlsConnector, Error as TlsError}; use native_tls::{TlsConnector, Error as TlsError};
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
use tokio_tls::TlsConnectorExt; use tokio_tls::TlsConnectorExt;
#[cfg(all(feature="tls", not(feature="alpn")))]
use futures::Future;
use {HAS_OPENSSL, HAS_TLS}; use {HAS_OPENSSL, HAS_TLS};
use server::IoStream; use server::IoStream;
@ -102,19 +100,37 @@ impl From<ConnectorError> for ClientConnectorError {
} }
} }
struct Waiter {
tx: oneshot::Sender<Result<Connection, ClientConnectorError>>,
conn_timeout: Duration,
}
/// `ClientConnector` type is responsible for transport layer of a client connection
/// of a client connection.
pub struct ClientConnector { pub struct ClientConnector {
#[cfg(all(feature="alpn"))] #[cfg(all(feature="alpn"))]
connector: SslConnector, connector: SslConnector,
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector, connector: TlsConnector,
pool: Rc<Pool>, pool: Rc<Pool>,
conn_lifetime: Duration,
conn_keep_alive: Duration,
limit: usize,
limit_per_host: usize,
acquired: usize,
acquired_per_host: HashMap<Key, usize>,
available: HashMap<Key, VecDeque<Conn>>,
to_close: Vec<Connection>,
waiters: HashMap<Key, VecDeque<Waiter>>,
} }
impl Actor for ClientConnector { impl Actor for ClientConnector {
type Context = Context<ClientConnector>; type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
self.collect(ctx); self.collect_periodic(ctx);
ctx.spawn(Maintenance);
} }
} }
@ -127,22 +143,38 @@ impl Default for ClientConnector {
#[cfg(all(feature="alpn"))] #[cfg(all(feature="alpn"))]
{ {
let builder = SslConnector::builder(SslMethod::tls()).unwrap(); let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector { ClientConnector::with_connector(builder.build())
connector: builder.build(),
pool: Rc::new(Pool::new()),
}
} }
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
{ {
let builder = TlsConnector::builder().unwrap(); let builder = TlsConnector::builder().unwrap();
ClientConnector { ClientConnector {
connector: builder.build().unwrap(),
pool: Rc::new(Pool::new()), pool: Rc::new(Pool::new()),
connector: builder.build().unwrap(),
conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: HashMap::new(),
} }
} }
#[cfg(not(any(feature="alpn", feature="tls")))] #[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {pool: Rc::new(Pool::new())} ClientConnector {pool: Rc::new(Pool::new()),
conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: HashMap::new(),
}
} }
} }
@ -192,12 +224,200 @@ impl ClientConnector {
/// } /// }
/// ``` /// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { connector, pool: Rc::new(Pool::new()) } ClientConnector {
connector,
pool: Rc::new(Pool::new()),
conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: HashMap::new(),
}
} }
fn collect(&mut self, ctx: &mut Context<Self>) { /// Set total number of simultaneous connections.
self.pool.collect(); ///
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx)); /// If limit is 0, the connector has no limit.
/// The default limit size is 100.
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
/// Set total number of simultaneous connections to the same endpoint.
///
/// Endpoints are the same if they are have equal (host, port, ssl) triplet.
/// If limit is 0, the connector has no limit. The default limit size is 0.
pub fn limit_per_host(mut self, limit: usize) -> Self {
self.limit_per_host = limit;
self
}
/// Set keep-alive period for opened connection.
///
/// Keep-alive period is period between connection usage.
/// if deley between connection usage exceeds this period
/// connection closes.
pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
self.conn_keep_alive = dur;
self
}
/// Set max lifetime period for connection.
///
/// Connection lifetime is max lifetime of any opened connection
/// until it get closed regardless of keep-alive period.
pub fn conn_lifetime(mut self, dur: Duration) -> Self {
self.conn_lifetime = dur;
self
}
fn acquire(&mut self, key: &Key) -> Acquire {
// check limits
if self.limit > 0 {
if self.acquired >= self.limit {
return Acquire::NotAvailable
}
if self.limit_per_host > 0 {
if let Some(per_host) = self.acquired_per_host.get(key) {
if self.limit_per_host >= *per_host {
return Acquire::NotAvailable
}
}
}
}
else if self.limit_per_host > 0 {
if let Some(per_host) = self.acquired_per_host.get(key) {
if self.limit_per_host >= *per_host {
return Acquire::NotAvailable
}
}
}
self.reserve(key);
// check if open connection is available
// cleanup stale connections at the same time
if let Some(ref mut connections) = self.available.get_mut(key) {
let now = Instant::now();
while let Some(conn) = connections.pop_back() {
// check if it still usable
if (now - conn.0) > self.conn_keep_alive
|| (now - conn.1.ts) > self.conn_lifetime
{
self.to_close.push(conn.1);
} else {
let mut conn = conn.1;
let mut buf = [0; 2];
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
self.to_close.push(conn);
continue
},
Ok(_) | Err(_) => continue,
}
return Acquire::Acquired(conn)
}
}
}
Acquire::Available
}
fn reserve(&mut self, key: &Key) {
self.acquired += 1;
let per_host =
if let Some(per_host) = self.acquired_per_host.get(key) {
*per_host
} else {
0
};
self.acquired_per_host.insert(key.clone(), per_host + 1);
}
fn release_key(&mut self, key: &Key) {
self.acquired -= 1;
let per_host =
if let Some(per_host) = self.acquired_per_host.get(key) {
*per_host
} else {
return
};
if per_host > 1 {
self.acquired_per_host.insert(key.clone(), per_host - 1);
} else {
self.acquired_per_host.remove(key);
}
}
fn collect(&mut self, periodic: bool) {
let now = Instant::now();
// collect half acquire keys
if let Some(keys) = self.pool.collect_keys() {
for key in keys {
self.release_key(&key);
}
}
// collect connections for close
if let Some(to_close) = self.pool.collect_close() {
for conn in to_close {
self.release_key(&conn.key);
self.to_close.push(conn);
}
}
// connection connections
if let Some(to_release) = self.pool.collect_release() {
for conn in to_release {
self.release_key(&conn.key);
// check connection lifetime and the return to available pool
if (now - conn.ts) < self.conn_lifetime {
self.available.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
}
}
}
// check keep-alive
for conns in self.available.values_mut() {
while !conns.is_empty() {
if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive
|| (now - conns[0].1.ts) > self.conn_lifetime
{
let conn = conns.pop_front().unwrap().1;
self.to_close.push(conn);
} else {
break
}
}
}
// check connections for shutdown
if periodic {
let mut idx = 0;
while idx < self.to_close.len() {
match AsyncWrite::shutdown(&mut self.to_close[idx]) {
Ok(Async::NotReady) => idx += 1,
_ => {
self.to_close.swap_remove(idx);
},
}
}
}
}
fn collect_periodic(&mut self, ctx: &mut Context<Self>) {
self.collect(true);
// re-schedule next collect period
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx));
} }
} }
@ -205,6 +425,8 @@ impl Handler<Connect> for ClientConnector {
type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>; type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
self.collect(false);
let uri = &msg.uri; let uri = &msg.uri;
let conn_timeout = msg.conn_timeout; let conn_timeout = msg.conn_timeout;
@ -227,76 +449,244 @@ impl Handler<Connect> for ClientConnector {
return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)) return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported))
} }
// check if pool has task reference
if self.pool.task.borrow().is_none() {
*self.pool.task.borrow_mut() = Some(current_task());
}
let host = uri.host().unwrap().to_owned(); let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port()); let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()}; let key = Key {host, port, ssl: proto.is_secure()};
// acquire connection
let pool = if proto.is_http() { let pool = if proto.is_http() {
if let Some(mut conn) = self.pool.query(&key) { match self.acquire(&key) {
conn.pool = Some(self.pool.clone()); Acquire::Acquired(mut conn) => {
return ActorResponse::async(fut::ok(conn)) // use existing connection
} else { conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool))));
Some(Rc::clone(&self.pool)) return ActorResponse::async(fut::ok(conn))
},
Acquire::NotAvailable => {
// connection is not available, wait
let (tx, rx) = oneshot::channel();
let waiter = Waiter{ tx, conn_timeout };
self.waiters.entry(key.clone()).or_insert_with(VecDeque::new)
.push_back(waiter);
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
.and_then(|res, _, _| match res {
Ok(conn) => fut::ok(conn),
Err(err) => fut::err(err),
}));
}
Acquire::Available => {
Some(Rc::clone(&self.pool))
},
} }
} else { } else {
None None
}; };
let conn = AcquiredConn(key, pool);
ActorResponse::async( {
Connector::from_registry() ActorResponse::async(
.send(ResolveConnect::host_and_port(&key.host, port) Connector::from_registry()
.timeout(conn_timeout)) .send(ResolveConnect::host_and_port(&conn.0.host, port)
.into_actor(self) .timeout(conn_timeout))
.map_err(|_, _, _| ClientConnectorError::Disconnected) .into_actor(self)
.and_then(move |res, _act, _| { .map_err(|_, _, _| ClientConnectorError::Disconnected)
#[cfg(feature="alpn")] .and_then(move |res, _act, _| {
match res { #[cfg(feature="alpn")]
Err(err) => fut::Either::B(fut::err(err.into())), match res {
Ok(stream) => { Err(err) => fut::Either::B(fut::err(err.into())),
if proto.is_secure() { Ok(stream) => {
fut::Either::A( if proto.is_secure() {
_act.connector.connect_async(&key.host, stream) fut::Either::A(
.map_err(ClientConnectorError::SslError) _act.connector.connect_async(&conn.0.host, stream)
.map(|stream| Connection::new( .map_err(ClientConnectorError::SslError)
key, pool, Box::new(stream))) .map(|stream| Connection::new(
.into_actor(_act)) conn.0.clone(), Some(conn), Box::new(stream)))
} else { .into_actor(_act))
fut::Either::B(fut::ok( } else {
Connection::new(key, pool, Box::new(stream)))) fut::Either::B(fut::ok(
} Connection::new(
conn.0.clone(), Some(conn), Box::new(stream))))
} }
} }
}
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
match res { match res {
Err(err) => fut::Either::B(fut::err(err.into())), Err(err) => fut::Either::B(fut::err(err.into())),
Ok(stream) => { Ok(stream) => {
if proto.is_secure() { if proto.is_secure() {
fut::Either::A( fut::Either::A(
_act.connector.connect_async(&key.host, stream) _act.connector.connect_async(&conn.0.host, stream)
.map_err(ClientConnectorError::SslError) .map_err(ClientConnectorError::SslError)
.map(|stream| Connection::new( .map(|stream| Connection::new(
key, pool, Box::new(stream))) conn.0.clone(), Some(conn), Box::new(stream)))
.into_actor(_act)) .into_actor(_act))
} else { } else {
fut::Either::B(fut::ok( fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream)))) Connection::new(
} conn.0.clone(), Some(conn), Box::new(stream))))
} }
} }
}
#[cfg(not(any(feature="alpn", feature="tls")))] #[cfg(not(any(feature="alpn", feature="tls")))]
match res { match res {
Err(err) => fut::err(err.into()), Err(err) => fut::err(err.into()),
Ok(stream) => { Ok(stream) => {
if proto.is_secure() { if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported) fut::err(ClientConnectorError::SslIsNotSupported)
} else { } else {
fut::ok(Connection::new(key, pool, Box::new(stream))) fut::ok(Connection::new(
} conn.0.clone(), Some(conn), Box::new(stream)))
} }
} }
})) }
}))
}
}
}
struct Maintenance;
impl fut::ActorFuture for Maintenance
{
type Item = ();
type Error = ();
type Actor = ClientConnector;
fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>)
-> Poll<Self::Item, Self::Error>
{
// collecto connections
act.collect(false);
// check waiters
let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)};
for (key, waiters) in &mut tmp.waiters {
while let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() { continue }
match act.acquire(key) {
Acquire::Acquired(mut conn) => {
// use existing connection
conn.pool = Some(
AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))));
let _ = waiter.tx.send(Ok(conn));
},
Acquire::NotAvailable => {
waiters.push_front(waiter);
break
}
Acquire::Available =>
{
let conn = AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)));
fut::WrapFuture::<ClientConnector>::actfuture(
Connector::from_registry()
.send(ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
.timeout(waiter.conn_timeout)))
.map_err(|_, _, _| ())
.and_then(move |res, _act, _| {
#[cfg(feature="alpn")]
match res {
Err(err) => {
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
},
Ok(stream) => {
if conn.0.ssl {
fut::Either::A(
_act.connector.connect_async(&key.host, stream)
.then(move |res| {
match res {
Err(e) => {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslError(e)));
},
Ok(stream) => {
let _ = waiter.tx.send(Ok(
Connection::new(
conn.0.clone(),
Some(conn), Box::new(stream))));
}
}
Ok(())
})
.actfuture())
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(), Some(conn), Box::new(stream))));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(all(feature="tls", not(feature="alpn")))]
match res {
Err(err) => {
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
},
Ok(stream) => {
if conn.0.ssl {
fut::Either::A(
_act.connector.connect_async(&conn.0.host, stream)
.then(|res| {
match res {
Err(e) => {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslError(e)));
},
Ok(stream) => {
let _ = waiter.tx.send(Ok(
Connection::new(
conn.0.clone(), Some(conn),
Box::new(stream))));
}
}
Ok(())
})
.into_actor(_act))
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(), Some(conn), Box::new(stream))));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(not(any(feature="alpn", feature="tls")))]
match res {
Err(err) => {
let _ = waiter.tx.send(Err(err.into()));
fut::err(())
},
Ok(stream) => {
if conn.0.ssl {
let _ = waiter.tx.send(
Err(ClientConnectorError::SslIsNotSupported));
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(), Some(conn), Box::new(stream))));
};
fut::ok(())
},
}
})
.spawn(ctx);
}
}
}
}
Ok(Async::NotReady)
} }
} }
@ -357,104 +747,94 @@ impl Key {
#[derive(Debug)] #[derive(Debug)]
struct Conn(Instant, Connection); struct Conn(Instant, Connection);
enum Acquire {
Acquired(Connection),
Available,
NotAvailable,
}
struct AcquiredConn(Key, Option<Rc<Pool>>);
impl AcquiredConn {
fn close(&mut self, conn: Connection) {
if let Some(pool) = self.1.take() {
pool.close(conn);
}
}
fn release(&mut self, conn: Connection) {
if let Some(pool) = self.1.take() {
pool.release(conn);
}
}
}
impl Drop for AcquiredConn {
fn drop(&mut self) {
if let Some(pool) = self.1.take() {
pool.release_key(self.0.clone());
}
}
}
pub struct Pool { pub struct Pool {
max_size: usize, keys: RefCell<Vec<Key>>,
keep_alive: Duration,
max_lifetime: Duration,
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
to_close: RefCell<Vec<Connection>>, to_close: RefCell<Vec<Connection>>,
to_release: RefCell<Vec<Connection>>,
task: RefCell<Option<Task>>,
} }
impl Pool { impl Pool {
fn new() -> Pool { fn new() -> Pool {
Pool { Pool {
max_size: 128, keys: RefCell::new(Vec::new()),
keep_alive: Duration::from_secs(15),
max_lifetime: Duration::from_secs(75),
pool: RefCell::new(HashMap::new()),
to_close: RefCell::new(Vec::new()), to_close: RefCell::new(Vec::new()),
to_release: RefCell::new(Vec::new()),
task: RefCell::new(None),
} }
} }
fn collect(&self) { fn collect_keys(&self) -> Option<Vec<Key>> {
let mut pool = self.pool.borrow_mut(); if self.keys.borrow().is_empty() {
let mut to_close = self.to_close.borrow_mut(); None
} else {
// check keep-alive Some(mem::replace(&mut *self.keys.borrow_mut(), Vec::new()))
let now = Instant::now();
for conns in pool.values_mut() {
while !conns.is_empty() {
if (now - conns[0].0) > self.keep_alive
|| (now - conns[0].1.ts) > self.max_lifetime
{
let conn = conns.pop_front().unwrap().1;
to_close.push(conn);
} else {
break
}
}
}
// check connections for shutdown
let mut idx = 0;
while idx < to_close.len() {
match AsyncWrite::shutdown(&mut to_close[idx]) {
Ok(Async::NotReady) => idx += 1,
_ => {
to_close.swap_remove(idx);
},
}
} }
} }
fn query(&self, key: &Key) -> Option<Connection> { fn collect_close(&self) -> Option<Vec<Connection>> {
let mut pool = self.pool.borrow_mut(); if self.to_close.borrow().is_empty() {
let mut to_close = self.to_close.borrow_mut(); None
} else {
if let Some(ref mut connections) = pool.get_mut(key) { Some(mem::replace(&mut *self.to_close.borrow_mut(), Vec::new()))
let now = Instant::now(); }
while let Some(conn) = connections.pop_back() { }
// check if it still usable
if (now - conn.0) > self.keep_alive fn collect_release(&self) -> Option<Vec<Connection>> {
|| (now - conn.1.ts) > self.max_lifetime if self.to_release.borrow().is_empty() {
{ None
to_close.push(conn.1); } else {
} else { Some(mem::replace(&mut *self.to_release.borrow_mut(), Vec::new()))
let mut conn = conn.1; }
let mut buf = [0; 2]; }
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), fn close(&self, conn: Connection) {
Ok(n) if n > 0 => { self.to_close.borrow_mut().push(conn);
to_close.push(conn); if let Some(ref task) = *self.task.borrow() {
continue task.notify()
},
Ok(_) | Err(_) => continue,
}
return Some(conn)
}
}
} }
None
} }
fn release(&self, conn: Connection) { fn release(&self, conn: Connection) {
if (Instant::now() - conn.ts) < self.max_lifetime { self.to_release.borrow_mut().push(conn);
let mut pool = self.pool.borrow_mut(); if let Some(ref task) = *self.task.borrow() {
if !pool.contains_key(&conn.key) { task.notify()
let key = conn.key.clone(); }
let mut vec = VecDeque::new(); }
vec.push_back(Conn(Instant::now(), conn));
pool.insert(key, vec); fn release_key(&self, key: Key) {
} else { self.keys.borrow_mut().push(key);
let vec = pool.get_mut(&conn.key).unwrap(); if let Some(ref task) = *self.task.borrow() {
vec.push_back(Conn(Instant::now(), conn)); task.notify()
if vec.len() > self.max_size {
let conn = vec.pop_front().unwrap();
self.to_close.borrow_mut().push(conn.1);
}
}
} else {
self.to_close.borrow_mut().push(conn);
} }
} }
} }
@ -463,7 +843,7 @@ impl Pool {
pub struct Connection { pub struct Connection {
key: Key, key: Key,
stream: Box<IoStream>, stream: Box<IoStream>,
pool: Option<Rc<Pool>>, pool: Option<AcquiredConn>,
ts: Instant, ts: Instant,
} }
@ -474,11 +854,8 @@ impl fmt::Debug for Connection {
} }
impl Connection { impl Connection {
fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self { fn new(key: Key, pool: Option<AcquiredConn>, stream: Box<IoStream>) -> Self {
Connection { Connection {key, stream, pool, ts: Instant::now()}
key, pool, stream,
ts: Instant::now(),
}
} }
pub fn stream(&mut self) -> &mut IoStream { pub fn stream(&mut self) -> &mut IoStream {
@ -489,8 +866,14 @@ impl Connection {
Connection::new(Key::empty(), None, Box::new(io)) Connection::new(Key::empty(), None, Box::new(io))
} }
pub fn close(mut self) {
if let Some(mut pool) = self.pool.take() {
pool.close(self)
}
}
pub fn release(mut self) { pub fn release(mut self) {
if let Some(pool) = self.pool.take() { if let Some(mut pool) = self.pool.take() {
pool.release(self) pool.release(self)
} }
} }

View file

@ -458,3 +458,11 @@ impl Pipeline {
} }
} }
} }
impl Drop for Pipeline {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
conn.close()
}
}
}