From d1c6f6ff5d93cd3474fef75712d7017cfc709386 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 25 Jul 2023 14:45:15 -0500 Subject: [PATCH] Make delivery concurrency configurable --- src/config.rs | 13 +++++++++++-- src/jobs.rs | 4 +++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5b185dc..b7c9a2d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -45,6 +45,7 @@ pub(crate) struct ParsedConfig { local_blurb: Option, prometheus_addr: Option, prometheus_port: Option, + deliver_concurrency: u64, client_pool_size: usize, } @@ -69,6 +70,7 @@ pub struct Config { local_domains: Vec, local_blurb: Option, prometheus_config: Option, + deliver_concurrency: u64, client_pool_size: usize, } @@ -137,6 +139,7 @@ impl std::fmt::Debug for Config { .field("local_domains", &self.local_domains) .field("local_blurb", &self.local_blurb) .field("prometheus_config", &self.prometheus_config) + .field("deliver_concurrency", &self.deliver_concurrency) .field("client_pool_size", &self.client_pool_size) .finish() } @@ -167,6 +170,7 @@ impl Config { .set_default("local_blurb", None as Option<&str>)? .set_default("prometheus_addr", None as Option<&str>)? .set_default("prometheus_port", None as Option)? + .set_default("deliver_concurrency", 8u64)? .set_default("client_pool_size", 20u64)? .add_source(Environment::default()) .build()?; @@ -239,10 +243,15 @@ impl Config { local_domains, local_blurb: config.local_blurb, prometheus_config, + deliver_concurrency: config.deliver_concurrency, client_pool_size: config.client_pool_size, }) } + pub(crate) fn deliver_concurrency(&self) -> u64 { + self.deliver_concurrency + } + pub(crate) fn prometheus_bind_address(&self) -> Option { let config = self.prometheus_config.as_ref()?; @@ -298,7 +307,7 @@ impl Config { .add_tag_attributes("link", &["rel"]) .link_rel(None) .clean(blurb) - .to_string() + .to_string(), )); } } @@ -316,7 +325,7 @@ impl Config { .add_tag_attributes("link", &["rel"]) .link_rel(None) .clean(blurb) - .to_string() + .to_string(), )); } } diff --git a/src/jobs.rs b/src/jobs.rs index e2b6aef..ceae891 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -45,6 +45,8 @@ pub(crate) fn create_workers( media: MediaCache, config: Config, ) -> JobServer { + let deliver_concurrency = config.deliver_concurrency(); + let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), @@ -68,7 +70,7 @@ pub(crate) fn create_workers( .register::() .set_worker_count("maintenance", 2) .set_worker_count("apub", 2) - .set_worker_count("deliver", 8) + .set_worker_count("deliver", deliver_concurrency) .start(); queue_handle.every(Duration::from_secs(60 * 5), Listeners);