From 830fb2cdb2e52c97ffc7a53e1469aaf5d5e39f1a Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 19:51:36 -0800 Subject: [PATCH] properly drop h2 connection (#1926) --- actix-http/src/client/connection.rs | 82 ++++++++++++++++++++++++++++- actix-http/src/client/h2proto.rs | 5 +- actix-http/src/client/pool.rs | 11 ++-- 3 files changed, 89 insertions(+), 9 deletions(-) diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 26d392120..4c6a6dcb8 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -1,9 +1,11 @@ use std::future::Future; +use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; +use actix_rt::task::JoinHandle; use bytes::Bytes; use futures_core::future::LocalBoxFuture; use futures_util::future::{err, Either, FutureExt, Ready}; @@ -21,7 +23,53 @@ use super::{h1proto, h2proto}; pub(crate) enum ConnectionType { H1(Io), - H2(SendRequest), + H2(H2Connection), +} + +// h2 connection has two parts: SendRequest and Connection. +// Connection is spawned as async task on runtime and H2Connection would hold a handle for +// this task. So it can wake up and quit the task when SendRequest is dropped. +pub(crate) struct H2Connection { + handle: JoinHandle<()>, + sender: SendRequest, +} + +impl H2Connection { + pub(crate) fn new( + sender: SendRequest, + connection: h2::client::Connection, + ) -> Self + where + Io: AsyncRead + AsyncWrite + Unpin + 'static, + { + let handle = actix_rt::spawn(async move { + let _ = connection.await; + }); + + Self { handle, sender } + } +} + +// wake up waker when drop +impl Drop for H2Connection { + fn drop(&mut self) { + self.handle.abort(); + } +} + +// only expose sender type to public. +impl Deref for H2Connection { + type Target = SendRequest; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for H2Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } } pub trait Connection { @@ -266,3 +314,35 @@ where } } } + +#[cfg(test)] +mod test { + use std::net; + + use actix_rt::net::TcpStream; + + use super::*; + + #[actix_rt::test] + async fn test_h2_connection_drop() { + let addr = "127.0.0.1:0".parse::().unwrap(); + let listener = net::TcpListener::bind(addr).unwrap(); + let local = listener.local_addr().unwrap(); + + std::thread::spawn(move || while listener.accept().is_ok() {}); + + let tcp = TcpStream::connect(local).await.unwrap(); + let (sender, connection) = h2::client::handshake(tcp).await.unwrap(); + let conn = H2Connection::new(sender.clone(), connection); + + assert!(sender.clone().ready().await.is_ok()); + assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok()); + + drop(conn); + + match sender.ready().await { + Ok(_) => panic!("connection should be gone and can not be ready"), + Err(e) => assert!(e.is_io()), + }; + } +} diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 76915f214..a70bc1738 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -22,9 +22,10 @@ use super::config::ConnectorConfig; use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; use super::pool::Acquired; +use crate::client::connection::H2Connection; pub(crate) async fn send_request( - mut io: SendRequest, + mut io: H2Connection, head: RequestHeadType, body: B, created: time::Instant, @@ -173,7 +174,7 @@ async fn send_body( /// release SendRequest object fn release( - io: SendRequest, + io: H2Connection, pool: Option>, created: time::Instant, close: bool, diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 1eebef53b..867ba5c0c 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -26,6 +26,7 @@ use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; use super::h2proto::handshake; use super::Connect; +use crate::client::connection::H2Connection; #[derive(Clone, Copy, PartialEq)] /// Protocol version @@ -139,10 +140,9 @@ where Some(guard.consume()), )) } else { - let (snd, connection) = handshake(io, &config).await?; - actix_rt::spawn(connection.map(|_| ())); + let (sender, connection) = handshake(io, &config).await?; Ok(IoConnection::new( - ConnectionType::H2(snd), + ConnectionType::H2(H2Connection::new(sender, connection)), Instant::now(), Some(guard.consume()), )) @@ -566,11 +566,10 @@ where if let Some(ref mut h2) = this.h2 { return match Pin::new(h2).poll(cx) { - Poll::Ready(Ok((snd, connection))) => { - actix_rt::spawn(connection.map(|_| ())); + Poll::Ready(Ok((sender, connection))) => { let rx = this.rx.take().unwrap(); let _ = rx.send(Ok(IoConnection::new( - ConnectionType::H2(snd), + ConnectionType::H2(H2Connection::new(sender, connection)), Instant::now(), Some(Acquired(this.key.clone(), this.inner.take())), )));