Add a no limit option to the config (#45)

* Add a no limit option to the config

* Set defaults to `0`
This commit is contained in:
cetra3 2023-06-22 17:10:59 +09:30 committed by GitHub
parent 607aca7739
commit 5de4a34550
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 23 deletions

View file

@ -121,7 +121,7 @@ where
stats.dead_last_hour.load(Ordering::Relaxed), stats.dead_last_hour.load(Ordering::Relaxed),
stats.completed_last_hour.load(Ordering::Relaxed), stats.completed_last_hour.load(Ordering::Relaxed),
); );
if running == config.worker_count { if running == config.worker_count && config.worker_count != 0 {
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count); warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
warn!(stats_fmt); warn!(stats_fmt);
} else { } else {
@ -409,18 +409,25 @@ impl ActivityQueue {
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
while let Some(message) = retry_receiver.recv().await { while let Some(message) = retry_receiver.recv().await {
// If we're over the limit of retries, wait for them to finish before spawning let retry_task = retry_worker(
while join_set.len() >= retry_count {
join_set.join_next().await;
}
join_set.spawn(retry_worker(
retry_client.clone(), retry_client.clone(),
timeout, timeout,
message, message,
retry_stats.clone(), retry_stats.clone(),
retry_strategy, retry_strategy,
)); );
if retry_count > 0 {
// If we're over the limit of retries, wait for them to finish before spawning
while join_set.len() >= retry_count {
join_set.join_next().await;
}
join_set.spawn(retry_task);
} else {
// If the retry worker count is `0` then just spawn and don't use the join_set
tokio::spawn(retry_task);
}
} }
while !join_set.is_empty() { while !join_set.is_empty() {
@ -436,19 +443,26 @@ impl ActivityQueue {
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
while let Some(message) = receiver.recv().await { while let Some(message) = receiver.recv().await {
// If we're over the limit of workers, wait for them to finish before spawning let task = worker(
while join_set.len() >= worker_count {
join_set.join_next().await;
}
join_set.spawn(worker(
client.clone(), client.clone(),
timeout, timeout,
message, message,
retry_sender.clone(), retry_sender.clone(),
sender_stats.clone(), sender_stats.clone(),
strategy, strategy,
)); );
if worker_count > 0 {
// If we're over the limit of workers, wait for them to finish before spawning
while join_set.len() >= worker_count {
join_set.join_next().await;
}
join_set.spawn(task);
} else {
// If the worker count is `0` then just spawn and don't use the join_set
tokio::spawn(task);
}
} }
drop(retry_sender); drop(retry_sender);
@ -500,11 +514,6 @@ pub(crate) fn create_activity_queue(
retry_count: usize, retry_count: usize,
request_timeout: Duration, request_timeout: Duration,
) -> ActivityQueue { ) -> ActivityQueue {
assert!(
worker_count > 0,
"worker count needs to be greater than zero"
);
ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60) ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60)
} }

View file

@ -56,12 +56,14 @@ pub struct FederationConfig<T: Clone> {
/// like log tracing or retry of failed requests. /// like log tracing or retry of failed requests.
pub(crate) client: ClientWithMiddleware, pub(crate) client: ClientWithMiddleware,
/// Number of tasks that can be in-flight concurrently. /// Number of tasks that can be in-flight concurrently.
/// Tasks are retried once after a minute, then put into the retry queue /// Tasks are retried once after a minute, then put into the retry queue.
#[builder(default = "1024")] /// Setting this count to `0` means that there is no limit to concurrency
#[builder(default = "0")]
pub(crate) worker_count: usize, pub(crate) worker_count: usize,
/// Number of concurrent tasks that are being retried in-flight concurrently. /// Number of concurrent tasks that are being retried in-flight concurrently.
/// Tasks are retried after an hour, then again in 60 hours. /// Tasks are retried after an hour, then again in 60 hours.
#[builder(default = "128")] /// Setting this count to `0` means that there is no limit to concurrency
#[builder(default = "0")]
pub(crate) retry_count: usize, pub(crate) retry_count: usize,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends /// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests /// outgoing activities synchronously, not in background thread. This helps to make tests