diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index d18af162d..138210f95 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -7,6 +7,7 @@ * Migrate cookie handling to `cookie` crate. * Update `sha-1` to 0.9 * MSRV is now 1.41.1 +* Fix client leak [#1580] ## [2.0.0-alpha.4] - 2020-05-21 diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 5a10725b0..3ce443794 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -2,7 +2,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -53,16 +53,25 @@ where + 'static, { pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self { + let connector_rc = Rc::new(RefCell::new(connector)); + let inner_rc = Rc::new(RefCell::new(Inner { + config, + acquired: 0, + waiters: Slab::new(), + waiters_queue: IndexSet::new(), + available: FxHashMap::default(), + waker: LocalWaker::new(), + })); + + // start support future + actix_rt::spawn(ConnectorPoolSupport { + connector: connector_rc.clone(), + inner: Rc::downgrade(&inner_rc), + }); + ConnectionPool( - Rc::new(RefCell::new(connector)), - Rc::new(RefCell::new(Inner { - config, - acquired: 0, - waiters: Slab::new(), - waiters_queue: IndexSet::new(), - available: FxHashMap::default(), - waker: LocalWaker::new(), - })), + connector_rc, + inner_rc, ) } } @@ -92,12 +101,6 @@ where } fn call(&mut self, req: Connect) -> Self::Future { - // start support future - actix_rt::spawn(ConnectorPoolSupport { - connector: self.0.clone(), - inner: self.1.clone(), - }); - let mut connector = self.0.clone(); let inner = self.1.clone(); @@ -421,7 +424,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, { connector: T, - inner: Rc>>, + inner: Weak>>, } impl Future for ConnectorPoolSupport @@ -435,51 +438,55 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let mut inner = this.inner.as_ref().borrow_mut(); - inner.waker.register(cx.waker()); + if let Some(this_inner) = this.inner.upgrade() { + let mut inner = this_inner.as_ref().borrow_mut(); + inner.waker.register(cx.waker()); - // check waiters - loop { - let (key, token) = { - if let Some((key, token)) = inner.waiters_queue.get_index(0) { - (key.clone(), *token) - } else { - break; + // check waiters + loop { + let (key, token) = { + if let Some((key, token)) = inner.waiters_queue.get_index(0) { + (key.clone(), *token) + } else { + break; + } + }; + if inner.waiters.get(token).unwrap().is_none() { + continue; } - }; - if inner.waiters.get(token).unwrap().is_none() { - continue; - } - match inner.acquire(&key, cx) { - Acquire::NotAvailable => break, - Acquire::Acquired(io, created) => { - let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; - if let Err(conn) = tx.send(Ok(IoConnection::new( - io, - created, - Some(Acquired(key.clone(), Some(this.inner.clone()))), - ))) { - let (io, created) = conn.unwrap().into_inner(); - inner.release_conn(&key, io, created); + match inner.acquire(&key, cx) { + Acquire::NotAvailable => break, + Acquire::Acquired(io, created) => { + let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; + if let Err(conn) = tx.send(Ok(IoConnection::new( + io, + created, + Some(Acquired(key.clone(), Some(this_inner.clone()))), + ))) { + let (io, created) = conn.unwrap().into_inner(); + inner.release_conn(&key, io, created); + } + } + Acquire::Available => { + let (connect, tx) = + inner.waiters.get_mut(token).unwrap().take().unwrap(); + OpenWaitingConnection::spawn( + key.clone(), + tx, + this_inner.clone(), + this.connector.call(connect), + inner.config.clone(), + ); } } - Acquire::Available => { - let (connect, tx) = - inner.waiters.get_mut(token).unwrap().take().unwrap(); - OpenWaitingConnection::spawn( - key.clone(), - tx, - this.inner.clone(), - this.connector.call(connect), - inner.config.clone(), - ); - } + let _ = inner.waiters_queue.swap_remove_index(0); } - let _ = inner.waiters_queue.swap_remove_index(0); - } - Poll::Pending + Poll::Pending + } else { + Poll::Ready(()) + } } } diff --git a/awc/src/request.rs b/awc/src/request.rs index 3dd8cb2ce..c34a8e221 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -586,16 +586,16 @@ mod tests { use super::*; use crate::Client; - #[test] - fn test_debug() { + #[actix_rt::test] + async fn test_debug() { let request = Client::new().get("/").header("x-test", "111"); let repr = format!("{:?}", request); assert!(repr.contains("ClientRequest")); assert!(repr.contains("x-test")); } - #[test] - fn test_basics() { + #[actix_rt::test] + async fn test_basics() { let mut req = Client::new() .put("/") .version(Version::HTTP_2) @@ -621,8 +621,8 @@ mod tests { let _ = req.send_body(""); } - #[test] - fn test_client_header() { + #[actix_rt::test] + async fn test_client_header() { let req = Client::build() .header(header::CONTENT_TYPE, "111") .finish() @@ -639,8 +639,8 @@ mod tests { ); } - #[test] - fn test_client_header_override() { + #[actix_rt::test] + async fn test_client_header_override() { let req = Client::build() .header(header::CONTENT_TYPE, "111") .finish() @@ -658,8 +658,8 @@ mod tests { ); } - #[test] - fn client_basic_auth() { + #[actix_rt::test] + async fn client_basic_auth() { let req = Client::new() .get("/") .basic_auth("username", Some("password")); @@ -685,8 +685,8 @@ mod tests { ); } - #[test] - fn client_bearer_auth() { + #[actix_rt::test] + async fn client_bearer_auth() { let req = Client::new().get("/").bearer_auth("someS3cr3tAutht0k3n"); assert_eq!( req.head @@ -699,8 +699,8 @@ mod tests { ); } - #[test] - fn client_query() { + #[actix_rt::test] + async fn client_query() { let req = Client::new() .get("/") .query(&[("key1", "val1"), ("key2", "val2")]) diff --git a/benches/server.rs b/benches/server.rs index 70531adf7..041d0fa57 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -27,15 +27,16 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ // benchmark sending all requests at the same time fn bench_async_burst(c: &mut Criterion) { + // We are using System here, since Runtime requires preinitialized tokio + // Maybe add to actix_rt docs + let mut rt = actix_rt::System::new("test"); + let srv = test::start(|| { App::new() .service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR)))) }); - // We are using System here, since Runtime requires preinitialized tokio - // Maybe add to actix_rt docs let url = srv.url("/"); - let mut rt = actix_rt::System::new("test"); c.bench_function("get_body_async_burst", move |b| { b.iter_custom(|iters| {