forked from mirrors/relay
Add ability to tweak client pool size
This commit is contained in:
parent
18ff2864a0
commit
8d565a1fbe
5 changed files with 28 additions and 5 deletions
|
@ -105,6 +105,7 @@ LOCAL_DOMAINS=masto.asonix.dog
|
||||||
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
|
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
|
||||||
PROMETHEUS_ADDR=0.0.0.0
|
PROMETHEUS_ADDR=0.0.0.0
|
||||||
PROMETHEUS_PORT=9000
|
PROMETHEUS_PORT=9000
|
||||||
|
CLIENT_POOL_SIZE=20
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Descriptions
|
#### Descriptions
|
||||||
|
@ -154,6 +155,11 @@ Optional - description for the relay
|
||||||
Optional - Address to bind to for serving the prometheus scrape endpoint
|
Optional - Address to bind to for serving the prometheus scrape endpoint
|
||||||
##### `PROMETHEUS_PORT`
|
##### `PROMETHEUS_PORT`
|
||||||
Optional - Port to bind to for serving the prometheus scrape endpoint
|
Optional - Port to bind to for serving the prometheus scrape endpoint
|
||||||
|
##### `CLIENT_POOL_SIZE`
|
||||||
|
Optional - How many connections the relay should maintain per thread. This value will be multiplied
|
||||||
|
by two times the number of cores available to the relay. This defaults to 20, so a 4-core machine
|
||||||
|
will have a maximum of 160 simultaneous outbound connections. If you run into problems related to
|
||||||
|
"Too many open files", you can either decrease this number or increase the ulimit for your system.
|
||||||
|
|
||||||
### Subscribing
|
### Subscribing
|
||||||
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
|
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
|
||||||
|
|
|
@ -45,6 +45,7 @@ pub(crate) struct ParsedConfig {
|
||||||
local_blurb: Option<String>,
|
local_blurb: Option<String>,
|
||||||
prometheus_addr: Option<IpAddr>,
|
prometheus_addr: Option<IpAddr>,
|
||||||
prometheus_port: Option<u16>,
|
prometheus_port: Option<u16>,
|
||||||
|
client_pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -68,6 +69,7 @@ pub struct Config {
|
||||||
local_domains: Vec<String>,
|
local_domains: Vec<String>,
|
||||||
local_blurb: Option<String>,
|
local_blurb: Option<String>,
|
||||||
prometheus_config: Option<PrometheusConfig>,
|
prometheus_config: Option<PrometheusConfig>,
|
||||||
|
client_pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -135,6 +137,7 @@ impl std::fmt::Debug for Config {
|
||||||
.field("local_domains", &self.local_domains)
|
.field("local_domains", &self.local_domains)
|
||||||
.field("local_blurb", &self.local_blurb)
|
.field("local_blurb", &self.local_blurb)
|
||||||
.field("prometheus_config", &self.prometheus_config)
|
.field("prometheus_config", &self.prometheus_config)
|
||||||
|
.field("client_pool_size", &self.client_pool_size)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,6 +167,7 @@ impl Config {
|
||||||
.set_default("local_blurb", None as Option<&str>)?
|
.set_default("local_blurb", None as Option<&str>)?
|
||||||
.set_default("prometheus_addr", None as Option<&str>)?
|
.set_default("prometheus_addr", None as Option<&str>)?
|
||||||
.set_default("prometheus_port", None as Option<u16>)?
|
.set_default("prometheus_port", None as Option<u16>)?
|
||||||
|
.set_default("client_pool_size", 20u64)?
|
||||||
.add_source(Environment::default())
|
.add_source(Environment::default())
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
|
@ -235,6 +239,7 @@ impl Config {
|
||||||
local_domains,
|
local_domains,
|
||||||
local_blurb: config.local_blurb,
|
local_blurb: config.local_blurb,
|
||||||
prometheus_config,
|
prometheus_config,
|
||||||
|
client_pool_size: config.client_pool_size,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,6 +419,10 @@ impl Config {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn client_pool_size(&self) -> usize {
|
||||||
|
self.client_pool_size
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn source_code(&self) -> &IriString {
|
pub(crate) fn source_code(&self) -> &IriString {
|
||||||
&self.source_repo
|
&self.source_repo
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ impl State {
|
||||||
config.user_agent(),
|
config.user_agent(),
|
||||||
self.breakers.clone(),
|
self.breakers.clone(),
|
||||||
self.last_online.clone(),
|
self.last_online.clone(),
|
||||||
|
config.client_pool_size(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -148,7 +148,7 @@ fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Erro
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
|
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
|
||||||
let client = requests::build_client(&config.user_agent());
|
let client = requests::build_client(&config.user_agent(), config.client_pool_size());
|
||||||
|
|
||||||
if !args.blocks().is_empty() || !args.allowed().is_empty() {
|
if !args.blocks().is_empty() || !args.allowed().is_empty() {
|
||||||
if args.undo() {
|
if args.undo() {
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use activitystreams::iri_string::types::IriString;
|
use activitystreams::iri_string::types::IriString;
|
||||||
use actix_web::http::header::Date;
|
use actix_web::http::header::Date;
|
||||||
use awc::{error::SendRequestError, Client, ClientResponse};
|
use awc::{error::SendRequestError, Client, ClientResponse, Connector};
|
||||||
use base64::{engine::general_purpose::STANDARD, Engine};
|
use base64::{engine::general_purpose::STANDARD, Engine};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use http_signature_normalization_actix::prelude::*;
|
use http_signature_normalization_actix::prelude::*;
|
||||||
|
@ -145,6 +145,7 @@ impl Default for Breaker {
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct Requests {
|
pub(crate) struct Requests {
|
||||||
|
pool_size: usize,
|
||||||
client: Rc<RefCell<Client>>,
|
client: Rc<RefCell<Client>>,
|
||||||
consecutive_errors: Rc<AtomicUsize>,
|
consecutive_errors: Rc<AtomicUsize>,
|
||||||
error_limit: usize,
|
error_limit: usize,
|
||||||
|
@ -159,6 +160,7 @@ pub(crate) struct Requests {
|
||||||
impl std::fmt::Debug for Requests {
|
impl std::fmt::Debug for Requests {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("Requests")
|
f.debug_struct("Requests")
|
||||||
|
.field("pool_size", &self.pool_size)
|
||||||
.field("error_limit", &self.error_limit)
|
.field("error_limit", &self.error_limit)
|
||||||
.field("key_id", &self.key_id)
|
.field("key_id", &self.key_id)
|
||||||
.field("user_agent", &self.user_agent)
|
.field("user_agent", &self.user_agent)
|
||||||
|
@ -168,8 +170,11 @@ impl std::fmt::Debug for Requests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn build_client(user_agent: &str) -> Client {
|
pub(crate) fn build_client(user_agent: &str, pool_size: usize) -> Client {
|
||||||
|
let connector = Connector::new().limit(pool_size);
|
||||||
|
|
||||||
Client::builder()
|
Client::builder()
|
||||||
|
.connector(connector)
|
||||||
.wrap(Tracing)
|
.wrap(Tracing)
|
||||||
.add_default_header(("User-Agent", user_agent.to_string()))
|
.add_default_header(("User-Agent", user_agent.to_string()))
|
||||||
.timeout(Duration::from_secs(15))
|
.timeout(Duration::from_secs(15))
|
||||||
|
@ -183,9 +188,11 @@ impl Requests {
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
breakers: Breakers,
|
breakers: Breakers,
|
||||||
last_online: Arc<LastOnline>,
|
last_online: Arc<LastOnline>,
|
||||||
|
pool_size: usize,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Requests {
|
Requests {
|
||||||
client: Rc::new(RefCell::new(build_client(&user_agent))),
|
pool_size,
|
||||||
|
client: Rc::new(RefCell::new(build_client(&user_agent, pool_size))),
|
||||||
consecutive_errors: Rc::new(AtomicUsize::new(0)),
|
consecutive_errors: Rc::new(AtomicUsize::new(0)),
|
||||||
error_limit: 3,
|
error_limit: 3,
|
||||||
key_id,
|
key_id,
|
||||||
|
@ -205,7 +212,7 @@ impl Requests {
|
||||||
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
||||||
if count + 1 >= self.error_limit {
|
if count + 1 >= self.error_limit {
|
||||||
tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
|
tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
|
||||||
*self.client.borrow_mut() = build_client(&self.user_agent);
|
*self.client.borrow_mut() = build_client(&self.user_agent, self.pool_size);
|
||||||
self.reset_err();
|
self.reset_err();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue