Make delivery concurrency configurable

This commit is contained in:
asonix 2023-07-25 14:45:15 -05:00
parent 582f311a20
commit d1c6f6ff5d
2 changed files with 14 additions and 3 deletions

View file

@ -45,6 +45,7 @@ pub(crate) struct ParsedConfig {
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_addr: Option<IpAddr>, prometheus_addr: Option<IpAddr>,
prometheus_port: Option<u16>, prometheus_port: Option<u16>,
deliver_concurrency: u64,
client_pool_size: usize, client_pool_size: usize,
} }
@ -69,6 +70,7 @@ pub struct Config {
local_domains: Vec<String>, local_domains: Vec<String>,
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_config: Option<PrometheusConfig>, prometheus_config: Option<PrometheusConfig>,
deliver_concurrency: u64,
client_pool_size: usize, client_pool_size: usize,
} }
@ -137,6 +139,7 @@ impl std::fmt::Debug for Config {
.field("local_domains", &self.local_domains) .field("local_domains", &self.local_domains)
.field("local_blurb", &self.local_blurb) .field("local_blurb", &self.local_blurb)
.field("prometheus_config", &self.prometheus_config) .field("prometheus_config", &self.prometheus_config)
.field("deliver_concurrency", &self.deliver_concurrency)
.field("client_pool_size", &self.client_pool_size) .field("client_pool_size", &self.client_pool_size)
.finish() .finish()
} }
@ -167,6 +170,7 @@ impl Config {
.set_default("local_blurb", None as Option<&str>)? .set_default("local_blurb", None as Option<&str>)?
.set_default("prometheus_addr", None as Option<&str>)? .set_default("prometheus_addr", None as Option<&str>)?
.set_default("prometheus_port", None as Option<u16>)? .set_default("prometheus_port", None as Option<u16>)?
.set_default("deliver_concurrency", 8u64)?
.set_default("client_pool_size", 20u64)? .set_default("client_pool_size", 20u64)?
.add_source(Environment::default()) .add_source(Environment::default())
.build()?; .build()?;
@ -239,10 +243,15 @@ impl Config {
local_domains, local_domains,
local_blurb: config.local_blurb, local_blurb: config.local_blurb,
prometheus_config, prometheus_config,
deliver_concurrency: config.deliver_concurrency,
client_pool_size: config.client_pool_size, client_pool_size: config.client_pool_size,
}) })
} }
pub(crate) fn deliver_concurrency(&self) -> u64 {
self.deliver_concurrency
}
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> { pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
let config = self.prometheus_config.as_ref()?; let config = self.prometheus_config.as_ref()?;
@ -298,7 +307,7 @@ impl Config {
.add_tag_attributes("link", &["rel"]) .add_tag_attributes("link", &["rel"])
.link_rel(None) .link_rel(None)
.clean(blurb) .clean(blurb)
.to_string() .to_string(),
)); ));
} }
} }
@ -316,7 +325,7 @@ impl Config {
.add_tag_attributes("link", &["rel"]) .add_tag_attributes("link", &["rel"])
.link_rel(None) .link_rel(None)
.clean(blurb) .clean(blurb)
.to_string() .to_string(),
)); ));
} }
} }

View file

@ -45,6 +45,8 @@ pub(crate) fn create_workers(
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) -> JobServer { ) -> JobServer {
let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| { let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
JobState::new( JobState::new(
state.clone(), state.clone(),
@ -68,7 +70,7 @@ pub(crate) fn create_workers(
.register::<apub::Undo>() .register::<apub::Undo>()
.set_worker_count("maintenance", 2) .set_worker_count("maintenance", 2)
.set_worker_count("apub", 2) .set_worker_count("apub", 2)
.set_worker_count("deliver", 8) .set_worker_count("deliver", deliver_concurrency)
.start(); .start();
queue_handle.every(Duration::from_secs(60 * 5), Listeners); queue_handle.every(Duration::from_secs(60 * 5), Listeners);