1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-02 21:39:26 +00:00
actix-web/actix-http/src/client/connection.rs

339 lines
9.1 KiB
Rust
Raw Normal View History

2021-02-07 03:51:36 +00:00
use std::ops::{Deref, DerefMut};
2019-11-18 12:42:27 +00:00
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io, time};
2018-11-12 07:12:54 +00:00
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
2021-02-07 03:51:36 +00:00
use actix_rt::task::JoinHandle;
use bytes::Bytes;
use futures_core::future::LocalBoxFuture;
2019-01-29 04:41:09 +00:00
use h2::client::SendRequest;
use pin_project::pin_project;
2018-11-12 07:12:54 +00:00
2019-01-29 04:41:09 +00:00
use crate::body::MessageBody;
2019-03-28 01:53:19 +00:00
use crate::h1::ClientCodec;
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload;
2019-01-29 04:41:09 +00:00
use super::error::SendRequestError;
use super::pool::Acquired;
2019-01-29 04:41:09 +00:00
use super::{h1proto, h2proto};
pub(crate) enum ConnectionType<Io> {
H1(Io),
2021-02-07 03:51:36 +00:00
H2(H2Connection),
}
2021-02-16 09:08:30 +00:00
/// `H2Connection` has two parts: `SendRequest` and `Connection`.
///
/// `Connection` is spawned as an async task on runtime and `H2Connection` holds a handle for
/// this task. Therefore, it can wake up and quit the task when SendRequest is dropped.
2021-02-07 03:51:36 +00:00
pub(crate) struct H2Connection {
handle: JoinHandle<()>,
sender: SendRequest<Bytes>,
}
impl H2Connection {
pub(crate) fn new<Io>(
sender: SendRequest<Bytes>,
connection: h2::client::Connection<Io>,
) -> Self
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
let handle = actix_rt::spawn(async move {
let _ = connection.await;
});
Self { handle, sender }
}
}
// cancel spawned connection task on drop.
2021-02-07 03:51:36 +00:00
impl Drop for H2Connection {
fn drop(&mut self) {
self.handle.abort();
}
}
// only expose sender type to public.
impl Deref for H2Connection {
type Target = SendRequest<Bytes>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for H2Connection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
2019-01-29 04:41:09 +00:00
}
2019-01-29 18:34:27 +00:00
pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin;
2019-01-29 04:41:09 +00:00
2019-03-26 04:52:45 +00:00
/// Send request and body
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-29 04:41:09 +00:00
self,
head: H,
2019-01-29 04:41:09 +00:00
body: B,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
2019-03-28 01:53:19 +00:00
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType> + 'static>(
self,
head: H,
) -> LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
>;
2019-01-29 04:41:09 +00:00
}
2018-11-12 07:12:54 +00:00
2019-11-19 12:54:19 +00:00
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
2018-11-15 19:10:23 +00:00
/// Close connection
fn close(self: Pin<&mut Self>);
2018-11-15 19:10:23 +00:00
/// Release connection to the connection pool
fn release(self: Pin<&mut Self>);
2018-11-15 19:10:23 +00:00
}
#[doc(hidden)]
2018-11-12 07:12:54 +00:00
/// HTTP client connection
2021-02-16 08:27:14 +00:00
pub struct IoConnection<T>
where
T: AsyncWrite + Unpin + 'static,
{
2019-01-29 04:41:09 +00:00
io: Option<ConnectionType<T>>,
2018-11-12 07:12:54 +00:00
created: time::Instant,
pool: Option<Acquired<T>>,
}
2018-11-15 19:10:23 +00:00
impl<T> fmt::Debug for IoConnection<T>
2018-11-12 07:12:54 +00:00
where
2021-02-16 08:27:14 +00:00
T: AsyncWrite + Unpin + fmt::Debug + 'static,
2018-11-12 07:12:54 +00:00
{
2019-12-07 18:46:51 +00:00
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2019-01-29 04:41:09 +00:00
match self.io {
Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io),
Some(ConnectionType::H2(_)) => write!(f, "H2Connection"),
None => write!(f, "Connection(Empty)"),
}
2018-11-12 07:12:54 +00:00
}
}
2019-11-19 12:54:19 +00:00
impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
2019-01-29 04:41:09 +00:00
pub(crate) fn new(
io: ConnectionType<T>,
created: time::Instant,
pool: Option<Acquired<T>>,
) -> Self {
2018-11-15 19:10:23 +00:00
IoConnection {
2019-01-29 04:41:09 +00:00
pool,
2018-11-12 07:12:54 +00:00
created,
2018-11-15 19:10:23 +00:00
io: Some(io),
2018-11-12 07:12:54 +00:00
}
}
2019-01-29 04:41:09 +00:00
pub(crate) fn into_inner(self) -> (ConnectionType<T>, time::Instant) {
2018-11-15 19:10:23 +00:00
(self.io.unwrap(), self.created)
}
2021-02-16 08:27:14 +00:00
#[cfg(test)]
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
(self.io.unwrap(), self.created, self.pool.unwrap())
}
2018-11-15 19:10:23 +00:00
}
2019-01-29 18:34:27 +00:00
impl<T> Connection for IoConnection<T>
2019-01-29 04:41:09 +00:00
where
2019-11-18 12:42:27 +00:00
T: AsyncRead + AsyncWrite + Unpin + 'static,
2019-01-29 04:41:09 +00:00
{
2019-03-28 01:53:19 +00:00
type Io = T;
2019-01-29 04:41:09 +00:00
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-29 04:41:09 +00:00
mut self,
head: H,
2019-01-29 04:41:09 +00:00
body: B,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> {
2019-01-29 04:41:09 +00:00
match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::pin(h1proto::send_request(
io,
head.into(),
body,
self.created,
self.pool,
)),
ConnectionType::H2(io) => Box::pin(h2proto::send_request(
io,
head.into(),
body,
self.created,
self.pool,
)),
2018-11-12 07:12:54 +00:00
}
}
2019-03-28 01:53:19 +00:00
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType>>(
mut self,
head: H,
) -> LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
> {
2019-03-28 01:53:19 +00:00
match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::pin(h1proto::open_tunnel(io, head.into())),
2019-03-28 01:53:19 +00:00
ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() {
pool.release(IoConnection::new(
ConnectionType::H2(io),
self.created,
None,
));
}
Box::pin(async { Err(SendRequestError::TunnelNotSupported) })
2019-03-28 01:53:19 +00:00
}
}
}
2018-11-12 07:12:54 +00:00
}
2019-01-29 04:41:09 +00:00
#[allow(dead_code)]
pub(crate) enum EitherIoConnection<A, B>
2021-02-16 08:27:14 +00:00
where
A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static,
{
2019-01-29 04:41:09 +00:00
A(IoConnection<A>),
B(IoConnection<B>),
2018-11-12 07:12:54 +00:00
}
impl<A, B> Connection for EitherIoConnection<A, B>
2019-01-29 04:41:09 +00:00
where
2019-11-18 12:42:27 +00:00
A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static,
2019-01-29 04:41:09 +00:00
{
2019-03-28 01:53:19 +00:00
type Io = EitherIo<A, B>;
2019-01-29 04:41:09 +00:00
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-29 04:41:09 +00:00
self,
head: H,
2019-01-29 04:41:09 +00:00
body: RB,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> {
2019-01-29 04:41:09 +00:00
match self {
EitherIoConnection::A(con) => con.send_request(head, body),
EitherIoConnection::B(con) => con.send_request(head, body),
2019-01-29 04:41:09 +00:00
}
2018-11-12 07:12:54 +00:00
}
2019-03-28 01:53:19 +00:00
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType> + 'static>(
self,
head: H,
) -> LocalBoxFuture<
2019-11-18 12:42:27 +00:00
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
> {
2019-03-28 01:53:19 +00:00
match self {
EitherIoConnection::A(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::A)))
}),
EitherIoConnection::B(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::B)))
}),
2019-03-28 01:53:19 +00:00
}
}
}
#[pin_project(project = EitherIoProj)]
2019-03-28 01:53:19 +00:00
pub enum EitherIo<A, B> {
2019-11-19 12:54:19 +00:00
A(#[pin] A),
B(#[pin] B),
2019-03-28 01:53:19 +00:00
}
2019-11-18 12:42:27 +00:00
impl<A, B> AsyncRead for EitherIo<A, B>
2019-03-28 01:53:19 +00:00
where
2019-11-19 12:54:19 +00:00
A: AsyncRead,
B: AsyncRead,
2019-03-28 01:53:19 +00:00
{
2019-11-18 12:42:27 +00:00
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
2019-11-19 12:54:19 +00:00
match self.project() {
EitherIoProj::A(val) => val.poll_read(cx, buf),
EitherIoProj::B(val) => val.poll_read(cx, buf),
2019-03-28 01:53:19 +00:00
}
}
}
2019-11-18 12:42:27 +00:00
impl<A, B> AsyncWrite for EitherIo<A, B>
2019-03-28 01:53:19 +00:00
where
2019-11-19 12:54:19 +00:00
A: AsyncWrite,
B: AsyncWrite,
2019-03-28 01:53:19 +00:00
{
2019-11-18 12:42:27 +00:00
fn poll_write(
self: Pin<&mut Self>,
2019-12-07 18:46:51 +00:00
cx: &mut Context<'_>,
2019-11-18 12:42:27 +00:00
buf: &[u8],
) -> Poll<io::Result<usize>> {
2019-11-19 12:54:19 +00:00
match self.project() {
EitherIoProj::A(val) => val.poll_write(cx, buf),
EitherIoProj::B(val) => val.poll_write(cx, buf),
2019-03-28 01:53:19 +00:00
}
}
2019-11-18 12:42:27 +00:00
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
2019-11-19 12:54:19 +00:00
match self.project() {
EitherIoProj::A(val) => val.poll_flush(cx),
EitherIoProj::B(val) => val.poll_flush(cx),
2019-03-28 01:53:19 +00:00
}
}
2019-11-18 12:42:27 +00:00
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
2019-11-19 12:54:19 +00:00
match self.project() {
EitherIoProj::A(val) => val.poll_shutdown(cx),
EitherIoProj::B(val) => val.poll_shutdown(cx),
2019-03-28 01:53:19 +00:00
}
}
2018-11-12 07:12:54 +00:00
}
2021-02-07 03:51:36 +00:00
#[cfg(test)]
mod test {
use std::net;
use actix_rt::net::TcpStream;
use super::*;
#[actix_rt::test]
async fn test_h2_connection_drop() {
let addr = "127.0.0.1:0".parse::<net::SocketAddr>().unwrap();
let listener = net::TcpListener::bind(addr).unwrap();
let local = listener.local_addr().unwrap();
std::thread::spawn(move || while listener.accept().is_ok() {});
let tcp = TcpStream::connect(local).await.unwrap();
let (sender, connection) = h2::client::handshake(tcp).await.unwrap();
let conn = H2Connection::new(sender.clone(), connection);
assert!(sender.clone().ready().await.is_ok());
assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok());
drop(conn);
match sender.ready().await {
Ok(_) => panic!("connection should be gone and can not be ready"),
Err(e) => assert!(e.is_io()),
};
}
}