1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-03 13:58:44 +00:00

pool doc nits (#1999)

This commit is contained in:
Rob Ede 2021-02-16 09:08:30 +00:00 committed by GitHub
parent c065729468
commit 17b3e7e225
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 71 deletions

View file

@ -1,5 +1,6 @@
//! Traits and structures to aid consuming and writing HTTP payloads. //! Traits and structures to aid consuming and writing HTTP payloads.
#[allow(clippy::module_inception)]
mod body; mod body;
mod body_stream; mod body_stream;
mod message_body; mod message_body;

View file

@ -26,9 +26,10 @@ pub(crate) enum ConnectionType<Io> {
H2(H2Connection), H2(H2Connection),
} }
// h2 connection has two parts: SendRequest and Connection. /// `H2Connection` has two parts: `SendRequest` and `Connection`.
// Connection is spawned as async task on runtime and H2Connection would hold a handle for ///
// this task. So it can wake up and quit the task when SendRequest is dropped. /// `Connection` is spawned as an async task on runtime and `H2Connection` holds a handle for
/// this task. Therefore, it can wake up and quit the task when SendRequest is dropped.
pub(crate) struct H2Connection { pub(crate) struct H2Connection {
handle: JoinHandle<()>, handle: JoinHandle<()>,
sender: SendRequest<Bytes>, sender: SendRequest<Bytes>,

View file

@ -25,7 +25,7 @@ pub enum ConnectError {
Resolver(Box<dyn std::error::Error>), Resolver(Box<dyn std::error::Error>),
/// No dns records /// No dns records
#[display(fmt = "No dns records found for the input")] #[display(fmt = "No DNS records found for the input")]
NoRecords, NoRecords,
/// Http2 error /// Http2 error

View file

@ -1,4 +1,5 @@
use std::cell::RefCell; //! Client connection pooling keyed on the authority part of the connection URI.
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::ops::Deref; use std::ops::Deref;
@ -7,14 +8,16 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{cell::RefCell, io};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::{sleep, Sleep}; use actix_rt::time::{sleep, Sleep};
use actix_service::Service; use actix_service::Service;
use ahash::AHashMap; use ahash::AHashMap;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use http::uri::Authority; use http::uri::Authority;
use pin_project::pin_project; use pin_project::pin_project;
use tokio::io::ReadBuf;
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
@ -41,7 +44,7 @@ impl From<Authority> for Key {
} }
} }
/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key /// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key.
pub(crate) struct ConnectionPool<S, Io> pub(crate) struct ConnectionPool<S, Io>
where where
Io: AsyncWrite + Unpin + 'static, Io: AsyncWrite + Unpin + 'static,
@ -120,16 +123,16 @@ impl<S, Io> ConnectionPool<S, Io>
where where
Io: AsyncWrite + Unpin + 'static, Io: AsyncWrite + Unpin + 'static,
{ {
/// construct a new connection pool. /// Construct a new connection pool.
/// ///
/// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed /// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed for
/// for on flight connections. /// in-flight connections.
/// ///
/// The pool can only have equal to `limit` amount of requests spawning/using Io type /// The pool can only have equal to `limit` amount of requests spawning/using Io type
/// concurrently. /// concurrently.
/// ///
/// Any requests beyond limit would be wait in fifo order and get notified in async /// Any requests beyond limit would be wait in fifo order and get notified in async manner
/// manner by [`tokio::sync::Semaphore`] /// by [`tokio::sync::Semaphore`]
pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self { pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self {
let permits = Arc::new(Semaphore::new(config.limit)); let permits = Arc::new(Semaphore::new(config.limit));
let available = RefCell::new(AHashMap::default()); let available = RefCell::new(AHashMap::default());
@ -166,9 +169,7 @@ where
type Error = ConnectError; type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>; type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { actix_service::forward_ready!(connector);
self.connector.poll_ready(cx)
}
fn call(&self, req: Connect) -> Self::Future { fn call(&self, req: Connect) -> Self::Future {
let connector = self.connector.clone(); let connector = self.connector.clone();
@ -182,49 +183,55 @@ where
}; };
// acquire an owned permit and carry it with connection // acquire an owned permit and carry it with connection
let permit = inner let permit = inner.permits.clone().acquire_owned().await.map_err(|_| {
.permits ConnectError::Io(io::Error::new(
.clone() io::ErrorKind::Other,
.acquire_owned() "failed to acquire semaphore on client connection pool",
.await ))
// TODO: use specific error for semaphore acquire error })?;
.map_err(|_| ConnectError::NoRecords)?;
// check if there is idle connection for given key. let conn = {
let mut map = inner.available.borrow_mut(); let mut conn = None;
let mut conn = None; // check if there is idle connection for given key.
if let Some(conns) = map.get_mut(&key) { let mut map = inner.available.borrow_mut();
let now = Instant::now();
while let Some(mut c) = conns.pop_front() { if let Some(conns) = map.get_mut(&key) {
// check the lifetime and drop connection that live for too long. let now = Instant::now();
if (now - c.used) > inner.config.conn_keep_alive
|| (now - c.created) > inner.config.conn_lifetime while let Some(mut c) = conns.pop_front() {
{ let config = &inner.config;
inner.close(c.conn); let idle_dur = now - c.used;
// check if the connection is still usable. let age = now - c.created;
} else { let conn_ineligible = idle_dur > config.conn_keep_alive
if let ConnectionType::H1(ref mut io) = c.conn { || age > config.conn_lifetime;
let check = ConnectionCheckFuture { io };
match check.await { if conn_ineligible {
ConnectionState::Break => { // drop connections that are too old
inner.close(c.conn); inner.close(c.conn);
continue;
}
ConnectionState::Skip => continue,
ConnectionState::Live => conn = Some(c),
}
} else { } else {
conn = Some(c); // check if the connection is still usable
if let ConnectionType::H1(ref mut io) = c.conn {
let check = ConnectionCheckFuture { io };
match check.await {
ConnectionState::Tainted => {
inner.close(c.conn);
continue;
}
ConnectionState::Skip => continue,
ConnectionState::Live => conn = Some(c),
}
} else {
conn = Some(c);
}
break;
} }
break;
} }
} };
};
// drop map early to end the borrow_mut of RefCell. conn
drop(map); };
// construct acquired. It's used to put Io type back to pool/ close the Io type. // construct acquired. It's used to put Io type back to pool/ close the Io type.
// permit is carried with the whole lifecycle of Acquired. // permit is carried with the whole lifecycle of Acquired.
@ -263,8 +270,13 @@ struct ConnectionCheckFuture<'a, Io> {
} }
enum ConnectionState { enum ConnectionState {
/// IO is pending and a new request would wake it.
Live, Live,
Break,
/// IO unexpectedly has unread data and should be dropped.
Tainted,
/// IO should be skipped but not dropped.
Skip, Skip,
} }
@ -282,13 +294,11 @@ where
let mut read_buf = ReadBuf::new(&mut buf); let mut read_buf = ReadBuf::new(&mut buf);
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) { let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
// io is pending and new data would wake up it.
Poll::Pending => ConnectionState::Live,
// io have data inside. drop it.
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => {
ConnectionState::Break ConnectionState::Tainted
} }
// otherwise skip to next.
Poll::Pending => ConnectionState::Live,
_ => ConnectionState::Skip, _ => ConnectionState::Skip,
}; };
@ -350,16 +360,17 @@ impl<Io> Acquired<Io>
where where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
// close the Io type. /// Close the IO.
pub(crate) fn close(&mut self, conn: IoConnection<Io>) { pub(crate) fn close(&mut self, conn: IoConnection<Io>) {
let (conn, _) = conn.into_inner(); let (conn, _) = conn.into_inner();
self.inner.close(conn); self.inner.close(conn);
} }
// put the Io type back to pool. /// Release IO back into pool.
pub(crate) fn release(&mut self, conn: IoConnection<Io>) { pub(crate) fn release(&mut self, conn: IoConnection<Io>) {
let (io, created) = conn.into_inner(); let (io, created) = conn.into_inner();
let Acquired { key, inner, .. } = self; let Acquired { key, inner, .. } = self;
inner inner
.available .available
.borrow_mut() .borrow_mut()
@ -371,24 +382,22 @@ where
used: Instant::now(), used: Instant::now(),
}); });
// a no op bind. used to stop clippy warning without adding allow attribute. let _ = &mut self.permit;
let _permit = &mut self.permit;
} }
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use std::{cell::Cell, io};
use std::cell::Cell;
use std::io;
use http::Uri; use http::Uri;
use super::*;
use crate::client::connection::IoConnection; use crate::client::connection::IoConnection;
// A stream type always return pending on async read. /// A stream type that always returns pending on async read.
// mock a usable tcp stream that ready to be used as client ///
/// Mocks an idle TCP stream that is ready to be used for client connections.
struct TestStream(Rc<Cell<usize>>); struct TestStream(Rc<Cell<usize>>);
impl Drop for TestStream { impl Drop for TestStream {
@ -440,9 +449,7 @@ mod test {
type Error = ConnectError; type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { actix_service::always_ready!();
unimplemented!("poll_ready is not used in test")
}
fn call(&self, _: Connect) -> Self::Future { fn call(&self, _: Connect) -> Self::Future {
self.generated.set(self.generated.get() + 1); self.generated.set(self.generated.get() + 1);