diff --git a/README.md b/README.md index f177075..c369462 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,7 @@ LOCAL_DOMAINS=masto.asonix.dog LOCAL_BLURB="

Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!

" PROMETHEUS_ADDR=0.0.0.0 PROMETHEUS_PORT=9000 +CLIENT_POOL_SIZE=20 ``` #### Descriptions @@ -154,6 +155,11 @@ Optional - description for the relay Optional - Address to bind to for serving the prometheus scrape endpoint ##### `PROMETHEUS_PORT` Optional - Port to bind to for serving the prometheus scrape endpoint +##### `CLIENT_POOL_SIZE` +Optional - How many connections the relay should maintain per thread. This value will be multiplied +by two times the number of cores available to the relay. This defaults to 20, so a 4-core machine +will have a maximum of 160 simultaneous outbound connections. If you run into problems related to +"Too many open files", you can either decrease this number or increase the ulimit for your system. ### Subscribing Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. diff --git a/src/config.rs b/src/config.rs index db21f9d..bf58533 100644 --- a/src/config.rs +++ b/src/config.rs @@ -45,6 +45,7 @@ pub(crate) struct ParsedConfig { local_blurb: Option, prometheus_addr: Option, prometheus_port: Option, + client_pool_size: usize, } #[derive(Clone)] @@ -68,6 +69,7 @@ pub struct Config { local_domains: Vec, local_blurb: Option, prometheus_config: Option, + client_pool_size: usize, } #[derive(Clone)] @@ -135,6 +137,7 @@ impl std::fmt::Debug for Config { .field("local_domains", &self.local_domains) .field("local_blurb", &self.local_blurb) .field("prometheus_config", &self.prometheus_config) + .field("client_pool_size", &self.client_pool_size) .finish() } } @@ -164,6 +167,7 @@ impl Config { .set_default("local_blurb", None as Option<&str>)? .set_default("prometheus_addr", None as Option<&str>)? .set_default("prometheus_port", None as Option)? + .set_default("client_pool_size", 20u64)? .add_source(Environment::default()) .build()?; @@ -235,6 +239,7 @@ impl Config { local_domains, local_blurb: config.local_blurb, prometheus_config, + client_pool_size: config.client_pool_size, }) } @@ -414,6 +419,10 @@ impl Config { ) } + pub(crate) fn client_pool_size(&self) -> usize { + self.client_pool_size + } + pub(crate) fn source_code(&self) -> &IriString { &self.source_repo } diff --git a/src/data/state.rs b/src/data/state.rs index 4387974..d6c484b 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -47,6 +47,7 @@ impl State { config.user_agent(), self.breakers.clone(), self.last_online.clone(), + config.client_pool_size(), ) } diff --git a/src/main.rs b/src/main.rs index 5f733d7..10f764c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -148,7 +148,7 @@ fn client_main(config: Config, args: Args) -> JoinHandle Result<(), anyhow::Error> { - let client = requests::build_client(&config.user_agent()); + let client = requests::build_client(&config.user_agent(), config.client_pool_size()); if !args.blocks().is_empty() || !args.allowed().is_empty() { if args.undo() { diff --git a/src/requests.rs b/src/requests.rs index 0a2238e..efd3a99 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -4,7 +4,7 @@ use crate::{ }; use activitystreams::iri_string::types::IriString; use actix_web::http::header::Date; -use awc::{error::SendRequestError, Client, ClientResponse}; +use awc::{error::SendRequestError, Client, ClientResponse, Connector}; use base64::{engine::general_purpose::STANDARD, Engine}; use dashmap::DashMap; use http_signature_normalization_actix::prelude::*; @@ -145,6 +145,7 @@ impl Default for Breaker { #[derive(Clone)] pub(crate) struct Requests { + pool_size: usize, client: Rc>, consecutive_errors: Rc, error_limit: usize, @@ -159,6 +160,7 @@ pub(crate) struct Requests { impl std::fmt::Debug for Requests { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Requests") + .field("pool_size", &self.pool_size) .field("error_limit", &self.error_limit) .field("key_id", &self.key_id) .field("user_agent", &self.user_agent) @@ -168,8 +170,11 @@ impl std::fmt::Debug for Requests { } } -pub(crate) fn build_client(user_agent: &str) -> Client { +pub(crate) fn build_client(user_agent: &str, pool_size: usize) -> Client { + let connector = Connector::new().limit(pool_size); + Client::builder() + .connector(connector) .wrap(Tracing) .add_default_header(("User-Agent", user_agent.to_string())) .timeout(Duration::from_secs(15)) @@ -183,9 +188,11 @@ impl Requests { user_agent: String, breakers: Breakers, last_online: Arc, + pool_size: usize, ) -> Self { Requests { - client: Rc::new(RefCell::new(build_client(&user_agent))), + pool_size, + client: Rc::new(RefCell::new(build_client(&user_agent, pool_size))), consecutive_errors: Rc::new(AtomicUsize::new(0)), error_limit: 3, key_id, @@ -205,7 +212,7 @@ impl Requests { let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); if count + 1 >= self.error_limit { tracing::warn!("{} consecutive errors, rebuilding http client", count + 1); - *self.client.borrow_mut() = build_client(&self.user_agent); + *self.client.borrow_mut() = build_client(&self.user_agent, self.pool_size); self.reset_err(); } }