mirror of
https://github.com/actix/actix-web.git
synced 2025-01-09 00:35:56 +00:00
properly drop h2 connection (#1926)
This commit is contained in:
parent
7cfed73be8
commit
830fb2cdb2
3 changed files with 89 additions and 9 deletions
|
@ -1,9 +1,11 @@
|
|||
use std::future::Future;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io, time};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
|
||||
use actix_rt::task::JoinHandle;
|
||||
use bytes::Bytes;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use futures_util::future::{err, Either, FutureExt, Ready};
|
||||
|
@ -21,7 +23,53 @@ use super::{h1proto, h2proto};
|
|||
|
||||
pub(crate) enum ConnectionType<Io> {
|
||||
H1(Io),
|
||||
H2(SendRequest<Bytes>),
|
||||
H2(H2Connection),
|
||||
}
|
||||
|
||||
// h2 connection has two parts: SendRequest and Connection.
|
||||
// Connection is spawned as async task on runtime and H2Connection would hold a handle for
|
||||
// this task. So it can wake up and quit the task when SendRequest is dropped.
|
||||
pub(crate) struct H2Connection {
|
||||
handle: JoinHandle<()>,
|
||||
sender: SendRequest<Bytes>,
|
||||
}
|
||||
|
||||
impl H2Connection {
|
||||
pub(crate) fn new<Io>(
|
||||
sender: SendRequest<Bytes>,
|
||||
connection: h2::client::Connection<Io>,
|
||||
) -> Self
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
let handle = actix_rt::spawn(async move {
|
||||
let _ = connection.await;
|
||||
});
|
||||
|
||||
Self { handle, sender }
|
||||
}
|
||||
}
|
||||
|
||||
// wake up waker when drop
|
||||
impl Drop for H2Connection {
|
||||
fn drop(&mut self) {
|
||||
self.handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
// only expose sender type to public.
|
||||
impl Deref for H2Connection {
|
||||
type Target = SendRequest<Bytes>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.sender
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for H2Connection {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.sender
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Connection {
|
||||
|
@ -266,3 +314,35 @@ where
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::net;
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_h2_connection_drop() {
|
||||
let addr = "127.0.0.1:0".parse::<net::SocketAddr>().unwrap();
|
||||
let listener = net::TcpListener::bind(addr).unwrap();
|
||||
let local = listener.local_addr().unwrap();
|
||||
|
||||
std::thread::spawn(move || while listener.accept().is_ok() {});
|
||||
|
||||
let tcp = TcpStream::connect(local).await.unwrap();
|
||||
let (sender, connection) = h2::client::handshake(tcp).await.unwrap();
|
||||
let conn = H2Connection::new(sender.clone(), connection);
|
||||
|
||||
assert!(sender.clone().ready().await.is_ok());
|
||||
assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok());
|
||||
|
||||
drop(conn);
|
||||
|
||||
match sender.ready().await {
|
||||
Ok(_) => panic!("connection should be gone and can not be ready"),
|
||||
Err(e) => assert!(e.is_io()),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,9 +22,10 @@ use super::config::ConnectorConfig;
|
|||
use super::connection::{ConnectionType, IoConnection};
|
||||
use super::error::SendRequestError;
|
||||
use super::pool::Acquired;
|
||||
use crate::client::connection::H2Connection;
|
||||
|
||||
pub(crate) async fn send_request<T, B>(
|
||||
mut io: SendRequest<Bytes>,
|
||||
mut io: H2Connection,
|
||||
head: RequestHeadType,
|
||||
body: B,
|
||||
created: time::Instant,
|
||||
|
@ -173,7 +174,7 @@ async fn send_body<B: MessageBody>(
|
|||
|
||||
/// release SendRequest object
|
||||
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
|
||||
io: SendRequest<Bytes>,
|
||||
io: H2Connection,
|
||||
pool: Option<Acquired<T>>,
|
||||
created: time::Instant,
|
||||
close: bool,
|
||||
|
|
|
@ -26,6 +26,7 @@ use super::connection::{ConnectionType, IoConnection};
|
|||
use super::error::ConnectError;
|
||||
use super::h2proto::handshake;
|
||||
use super::Connect;
|
||||
use crate::client::connection::H2Connection;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
/// Protocol version
|
||||
|
@ -139,10 +140,9 @@ where
|
|||
Some(guard.consume()),
|
||||
))
|
||||
} else {
|
||||
let (snd, connection) = handshake(io, &config).await?;
|
||||
actix_rt::spawn(connection.map(|_| ()));
|
||||
let (sender, connection) = handshake(io, &config).await?;
|
||||
Ok(IoConnection::new(
|
||||
ConnectionType::H2(snd),
|
||||
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||
Instant::now(),
|
||||
Some(guard.consume()),
|
||||
))
|
||||
|
@ -566,11 +566,10 @@ where
|
|||
|
||||
if let Some(ref mut h2) = this.h2 {
|
||||
return match Pin::new(h2).poll(cx) {
|
||||
Poll::Ready(Ok((snd, connection))) => {
|
||||
actix_rt::spawn(connection.map(|_| ()));
|
||||
Poll::Ready(Ok((sender, connection))) => {
|
||||
let rx = this.rx.take().unwrap();
|
||||
let _ = rx.send(Ok(IoConnection::new(
|
||||
ConnectionType::H2(snd),
|
||||
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||
Instant::now(),
|
||||
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||
)));
|
||||
|
|
Loading…
Reference in a new issue