diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index 1080c67e2..a4d6ba2b6 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -1,5 +1,6 @@ //! Traits and structures to aid consuming and writing HTTP payloads. +#[allow(clippy::module_inception)] mod body; mod body_stream; mod message_body; diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 0b2c6e1df..3ab902422 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -26,9 +26,10 @@ pub(crate) enum ConnectionType { H2(H2Connection), } -// h2 connection has two parts: SendRequest and Connection. -// Connection is spawned as async task on runtime and H2Connection would hold a handle for -// this task. So it can wake up and quit the task when SendRequest is dropped. +/// `H2Connection` has two parts: `SendRequest` and `Connection`. +/// +/// `Connection` is spawned as an async task on runtime and `H2Connection` holds a handle for +/// this task. Therefore, it can wake up and quit the task when SendRequest is dropped. pub(crate) struct H2Connection { handle: JoinHandle<()>, sender: SendRequest, diff --git a/actix-http/src/client/error.rs b/actix-http/src/client/error.rs index 7768462b8..d27363456 100644 --- a/actix-http/src/client/error.rs +++ b/actix-http/src/client/error.rs @@ -25,7 +25,7 @@ pub enum ConnectError { Resolver(Box), /// No dns records - #[display(fmt = "No dns records found for the input")] + #[display(fmt = "No DNS records found for the input")] NoRecords, /// Http2 error diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index b0b1613ab..3800696fa 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -1,4 +1,5 @@ -use std::cell::RefCell; +//! Client connection pooling keyed on the authority part of the connection URI. + use std::collections::VecDeque; use std::future::Future; use std::ops::Deref; @@ -7,14 +8,16 @@ use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use std::{cell::RefCell, io}; -use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; +use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use ahash::AHashMap; use futures_core::future::LocalBoxFuture; use http::uri::Authority; use pin_project::pin_project; +use tokio::io::ReadBuf; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::config::ConnectorConfig; @@ -41,7 +44,7 @@ impl From for Key { } } -/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key +/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key. pub(crate) struct ConnectionPool where Io: AsyncWrite + Unpin + 'static, @@ -120,16 +123,16 @@ impl ConnectionPool where Io: AsyncWrite + Unpin + 'static, { - /// construct a new connection pool. + /// Construct a new connection pool. /// - /// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed - /// for on flight connections. + /// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed for + /// in-flight connections. /// /// The pool can only have equal to `limit` amount of requests spawning/using Io type /// concurrently. /// - /// Any requests beyond limit would be wait in fifo order and get notified in async - /// manner by [`tokio::sync::Semaphore`] + /// Any requests beyond limit would be wait in fifo order and get notified in async manner + /// by [`tokio::sync::Semaphore`] pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self { let permits = Arc::new(Semaphore::new(config.limit)); let available = RefCell::new(AHashMap::default()); @@ -166,9 +169,7 @@ where type Error = ConnectError; type Future = LocalBoxFuture<'static, Result, ConnectError>>; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.connector.poll_ready(cx) - } + actix_service::forward_ready!(connector); fn call(&self, req: Connect) -> Self::Future { let connector = self.connector.clone(); @@ -182,49 +183,55 @@ where }; // acquire an owned permit and carry it with connection - let permit = inner - .permits - .clone() - .acquire_owned() - .await - // TODO: use specific error for semaphore acquire error - .map_err(|_| ConnectError::NoRecords)?; + let permit = inner.permits.clone().acquire_owned().await.map_err(|_| { + ConnectError::Io(io::Error::new( + io::ErrorKind::Other, + "failed to acquire semaphore on client connection pool", + )) + })?; - // check if there is idle connection for given key. - let mut map = inner.available.borrow_mut(); + let conn = { + let mut conn = None; - let mut conn = None; - if let Some(conns) = map.get_mut(&key) { - let now = Instant::now(); - while let Some(mut c) = conns.pop_front() { - // check the lifetime and drop connection that live for too long. - if (now - c.used) > inner.config.conn_keep_alive - || (now - c.created) > inner.config.conn_lifetime - { - inner.close(c.conn); - // check if the connection is still usable. - } else { - if let ConnectionType::H1(ref mut io) = c.conn { - let check = ConnectionCheckFuture { io }; - match check.await { - ConnectionState::Break => { - inner.close(c.conn); - continue; - } - ConnectionState::Skip => continue, - ConnectionState::Live => conn = Some(c), - } + // check if there is idle connection for given key. + let mut map = inner.available.borrow_mut(); + + if let Some(conns) = map.get_mut(&key) { + let now = Instant::now(); + + while let Some(mut c) = conns.pop_front() { + let config = &inner.config; + let idle_dur = now - c.used; + let age = now - c.created; + let conn_ineligible = idle_dur > config.conn_keep_alive + || age > config.conn_lifetime; + + if conn_ineligible { + // drop connections that are too old + inner.close(c.conn); } else { - conn = Some(c); + // check if the connection is still usable + if let ConnectionType::H1(ref mut io) = c.conn { + let check = ConnectionCheckFuture { io }; + match check.await { + ConnectionState::Tainted => { + inner.close(c.conn); + continue; + } + ConnectionState::Skip => continue, + ConnectionState::Live => conn = Some(c), + } + } else { + conn = Some(c); + } + + break; } - - break; } - } - }; + }; - // drop map early to end the borrow_mut of RefCell. - drop(map); + conn + }; // construct acquired. It's used to put Io type back to pool/ close the Io type. // permit is carried with the whole lifecycle of Acquired. @@ -263,8 +270,13 @@ struct ConnectionCheckFuture<'a, Io> { } enum ConnectionState { + /// IO is pending and a new request would wake it. Live, - Break, + + /// IO unexpectedly has unread data and should be dropped. + Tainted, + + /// IO should be skipped but not dropped. Skip, } @@ -282,13 +294,11 @@ where let mut read_buf = ReadBuf::new(&mut buf); let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) { - // io is pending and new data would wake up it. - Poll::Pending => ConnectionState::Live, - // io have data inside. drop it. Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { - ConnectionState::Break + ConnectionState::Tainted } - // otherwise skip to next. + + Poll::Pending => ConnectionState::Live, _ => ConnectionState::Skip, }; @@ -350,16 +360,17 @@ impl Acquired where Io: AsyncRead + AsyncWrite + Unpin + 'static, { - // close the Io type. + /// Close the IO. pub(crate) fn close(&mut self, conn: IoConnection) { let (conn, _) = conn.into_inner(); self.inner.close(conn); } - // put the Io type back to pool. + /// Release IO back into pool. pub(crate) fn release(&mut self, conn: IoConnection) { let (io, created) = conn.into_inner(); let Acquired { key, inner, .. } = self; + inner .available .borrow_mut() @@ -371,24 +382,22 @@ where used: Instant::now(), }); - // a no op bind. used to stop clippy warning without adding allow attribute. - let _permit = &mut self.permit; + let _ = &mut self.permit; } } #[cfg(test)] mod test { - use super::*; - - use std::cell::Cell; - use std::io; + use std::{cell::Cell, io}; use http::Uri; + use super::*; use crate::client::connection::IoConnection; - // A stream type always return pending on async read. - // mock a usable tcp stream that ready to be used as client + /// A stream type that always returns pending on async read. + /// + /// Mocks an idle TCP stream that is ready to be used for client connections. struct TestStream(Rc>); impl Drop for TestStream { @@ -440,9 +449,7 @@ mod test { type Error = ConnectError; type Future = LocalBoxFuture<'static, Result>; - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - unimplemented!("poll_ready is not used in test") - } + actix_service::always_ready!(); fn call(&self, _: Connect) -> Self::Future { self.generated.set(self.generated.get() + 1);