diff --git a/src/activity_queue.rs b/src/activity_queue.rs index c6c5864..6e4ad99 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -121,7 +121,7 @@ where stats.dead_last_hour.load(Ordering::Relaxed), stats.completed_last_hour.load(Ordering::Relaxed), ); - if running == config.worker_count { + if running == config.worker_count && config.worker_count != 0 { warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count); warn!(stats_fmt); } else { @@ -409,18 +409,25 @@ impl ActivityQueue { let mut join_set = JoinSet::new(); while let Some(message) = retry_receiver.recv().await { - // If we're over the limit of retries, wait for them to finish before spawning - while join_set.len() >= retry_count { - join_set.join_next().await; - } - - join_set.spawn(retry_worker( + let retry_task = retry_worker( retry_client.clone(), timeout, message, retry_stats.clone(), retry_strategy, - )); + ); + + if retry_count > 0 { + // If we're over the limit of retries, wait for them to finish before spawning + while join_set.len() >= retry_count { + join_set.join_next().await; + } + + join_set.spawn(retry_task); + } else { + // If the retry worker count is `0` then just spawn and don't use the join_set + tokio::spawn(retry_task); + } } while !join_set.is_empty() { @@ -436,19 +443,26 @@ impl ActivityQueue { let mut join_set = JoinSet::new(); while let Some(message) = receiver.recv().await { - // If we're over the limit of workers, wait for them to finish before spawning - while join_set.len() >= worker_count { - join_set.join_next().await; - } - - join_set.spawn(worker( + let task = worker( client.clone(), timeout, message, retry_sender.clone(), sender_stats.clone(), strategy, - )); + ); + + if worker_count > 0 { + // If we're over the limit of workers, wait for them to finish before spawning + while join_set.len() >= worker_count { + join_set.join_next().await; + } + + join_set.spawn(task); + } else { + // If the worker count is `0` then just spawn and don't use the join_set + tokio::spawn(task); + } } drop(retry_sender); @@ -500,11 +514,6 @@ pub(crate) fn create_activity_queue( retry_count: usize, request_timeout: Duration, ) -> ActivityQueue { - assert!( - worker_count > 0, - "worker count needs to be greater than zero" - ); - ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60) } diff --git a/src/config.rs b/src/config.rs index 3cd0ce8..3500289 100644 --- a/src/config.rs +++ b/src/config.rs @@ -56,12 +56,14 @@ pub struct FederationConfig { /// like log tracing or retry of failed requests. pub(crate) client: ClientWithMiddleware, /// Number of tasks that can be in-flight concurrently. - /// Tasks are retried once after a minute, then put into the retry queue - #[builder(default = "1024")] + /// Tasks are retried once after a minute, then put into the retry queue. + /// Setting this count to `0` means that there is no limit to concurrency + #[builder(default = "0")] pub(crate) worker_count: usize, /// Number of concurrent tasks that are being retried in-flight concurrently. /// Tasks are retried after an hour, then again in 60 hours. - #[builder(default = "128")] + /// Setting this count to `0` means that there is no limit to concurrency + #[builder(default = "0")] pub(crate) retry_count: usize, /// Run library in debug mode. This allows usage of http and localhost urls. It also sends /// outgoing activities synchronously, not in background thread. This helps to make tests