1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-02 05:18:44 +00:00

fix awc pool leak (#1626)

This commit is contained in:
fakeshadow 2020-08-10 04:49:43 +08:00 committed by GitHub
parent 187646b2f9
commit 160995b8d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 48 deletions

View file

@ -1,9 +1,13 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
### Fixed
* Memory leak of `client::pool::ConnectorPoolSupport`. [#1626]
[#1626]: https://github.com/actix/actix-web/pull/1626
## 2.0.0-beta.2 - 2020-07-21 ## [2.0.0-beta.2] - 2020-07-21
### Fixed ### Fixed
* Potential UB in h1 decoder using uninitialized memory. [#1614] * Potential UB in h1 decoder using uninitialized memory. [#1614]

View file

@ -2,7 +2,7 @@ use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::rc::{Rc, Weak}; use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -65,8 +65,8 @@ where
// start support future // start support future
actix_rt::spawn(ConnectorPoolSupport { actix_rt::spawn(ConnectorPoolSupport {
connector: connector_rc.clone(), connector: Rc::clone(&connector_rc),
inner: Rc::downgrade(&inner_rc), inner: Rc::clone(&inner_rc),
}); });
ConnectionPool(connector_rc, inner_rc) ConnectionPool(connector_rc, inner_rc)
@ -82,6 +82,13 @@ where
} }
} }
impl<T, Io> Drop for ConnectionPool<T, Io> {
fn drop(&mut self) {
// wake up the ConnectorPoolSupport when dropping so it can exit properly.
self.1.borrow().waker.wake();
}
}
impl<T, Io> Service for ConnectionPool<T, Io> impl<T, Io> Service for ConnectionPool<T, Io>
where where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
@ -421,7 +428,7 @@ where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
connector: T, connector: T,
inner: Weak<RefCell<Inner<Io>>>, inner: Rc<RefCell<Inner<Io>>>,
} }
impl<T, Io> Future for ConnectorPoolSupport<T, Io> impl<T, Io> Future for ConnectorPoolSupport<T, Io>
@ -435,55 +442,57 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
if let Some(this_inner) = this.inner.upgrade() { if Rc::strong_count(this.inner) == 1 {
let mut inner = this_inner.as_ref().borrow_mut(); // If we are last copy of Inner<Io> it means the ConnectionPool is already gone
inner.waker.register(cx.waker()); // and we are safe to exit.
return Poll::Ready(());
}
// check waiters let mut inner = this.inner.borrow_mut();
loop { inner.waker.register(cx.waker());
let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token)
} else {
break;
}
};
if inner.waiters.get(token).unwrap().is_none() {
continue;
}
match inner.acquire(&key, cx) { // check waiters
Acquire::NotAvailable => break, loop {
Acquire::Acquired(io, created) => { let (key, token) = {
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; if let Some((key, token)) = inner.waiters_queue.get_index(0) {
if let Err(conn) = tx.send(Ok(IoConnection::new( (key.clone(), *token)
io, } else {
created, break;
Some(Acquired(key.clone(), Some(this_inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this_inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
}
} }
let _ = inner.waiters_queue.swap_remove_index(0); };
if inner.waiters.get(token).unwrap().is_none() {
continue;
} }
Poll::Pending match inner.acquire(&key, cx) {
} else { Acquire::NotAvailable => break,
Poll::Ready(()) Acquire::Acquired(io, created) => {
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Some(Acquired(key.clone(), Some(this.inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this.inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
}
}
let _ = inner.waiters_queue.swap_remove_index(0);
} }
Poll::Pending
} }
} }