Add an option to disable the retry queue

Author: cetra3
Date:   2023-07-03 23:00:09 +09:30
Parent: b64f4a8f3f
Commit: d029bfa224

2 changed files with 80 additions and 54 deletions

src/activity_queue.rs

@@ -257,7 +257,7 @@ pub(crate) struct ActivityQueue {
     stats: Arc<Stats>,
     sender: UnboundedSender<SendActivityTask>,
     sender_task: JoinHandle<()>,
-    retry_sender_task: JoinHandle<()>,
+    retry_sender_task: Option<JoinHandle<()>>,
 }
 
 /// Simple stat counter to show where we're up to with sending messages
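
The field change above means the retry task may never be spawned at all. A minimal sketch of the pattern, outside the diff (the `enabled` flag and the spawned work are illustrative, not the crate's code):

use tokio::task::JoinHandle;

// Spawn the background task only when the feature is enabled; otherwise
// there is no handle to store or join later.
fn maybe_spawn(enabled: bool) -> Option<JoinHandle<()>> {
    enabled.then(|| {
        tokio::spawn(async {
            // background work would run here
        })
    })
}

#[tokio::main]
async fn main() {
    let handle = maybe_spawn(true);
    // On shutdown, join the task only if it was ever started.
    if let Some(handle) = handle {
        handle.await.expect("background task panicked");
    }
}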
@@ -296,7 +296,7 @@ async fn worker(
     client: ClientWithMiddleware,
     timeout: Duration,
     message: SendActivityTask,
-    retry_queue: UnboundedSender<SendActivityTask>,
+    retry_queue: Option<UnboundedSender<SendActivityTask>>,
     stats: Arc<Stats>,
     strategy: RetryStrategy,
 ) {
@@ -313,13 +313,17 @@ async fn worker(
             stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
         }
         Err(_err) => {
-            stats.retries.fetch_add(1, Ordering::Relaxed);
-            warn!(
-                "Sending activity {} to {} to the retry queue to be tried again later",
-                message.activity_id, message.inbox
-            );
-            // Send to the retry queue. Ignoring whether it succeeds or not
-            retry_queue.send(message).ok();
+            if let Some(queue) = retry_queue.as_ref() {
+                stats.retries.fetch_add(1, Ordering::Relaxed);
+                warn!(
+                    "Sending activity {} to {} to the retry queue to be tried again later",
+                    message.activity_id, message.inbox
+                );
+                // Send to the retry queue. Ignoring whether it succeeds or not
+                queue.send(message).ok();
+            } else {
+                stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
+            }
         }
     }
 }
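
With the queue handle now optional, the failure branch has two outcomes: forward to the retry path, or count the delivery as dead on the spot. A condensed sketch of that logic with stand-in types (`Stats` here is simplified and `String` stands in for `SendActivityTask`):

use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc::UnboundedSender;

#[derive(Default)]
struct Stats {
    retries: AtomicUsize,
    dead_last_hour: AtomicUsize,
}

fn on_send_failure(
    message: String, // stand-in for SendActivityTask
    retry_queue: Option<&UnboundedSender<String>>,
    stats: &Stats,
) {
    if let Some(queue) = retry_queue {
        // Retry path enabled: hand the message off and count a retry.
        stats.retries.fetch_add(1, Ordering::Relaxed);
        queue.send(message).ok(); // a closed channel is deliberately ignored
    } else {
        // Retry queue disabled: the delivery is immediately counted as dead.
        stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
    }
}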
@@ -367,6 +371,7 @@ impl ActivityQueue {
         client: ClientWithMiddleware,
         worker_count: usize,
         retry_count: usize,
+        disable_retry: bool,
         timeout: Duration,
         backoff: usize, // This should be 60 seconds by default or 1 second in tests
     ) -> Self {
@@ -383,9 +388,54 @@ impl ActivityQueue {
             }
         });
 
-        let (retry_sender, mut retry_receiver) = unbounded_channel();
-        let retry_stats = stats.clone();
-        let retry_client = client.clone();
+        let (retry_sender_task, retry_sender) = if disable_retry {
+            (None, None)
+        } else {
+            let (retry_sender, mut retry_receiver) = unbounded_channel();
+            let retry_stats = stats.clone();
+            let retry_client = client.clone();
+
+            // The "retry path" strategy
+            // After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
+            let retry_strategy = RetryStrategy {
+                backoff,
+                retries: 3,
+                offset: 2,
+                initial_sleep: backoff.pow(2), // wait 60 mins before even trying
+            };
+
+            (
+                Some(tokio::spawn(async move {
+                    let mut join_set = JoinSet::new();
+
+                    while let Some(message) = retry_receiver.recv().await {
+                        let retry_task = retry_worker(
+                            retry_client.clone(),
+                            timeout,
+                            message,
+                            retry_stats.clone(),
+                            retry_strategy,
+                        );
+
+                        if retry_count > 0 {
+                            // If we're at the retry concurrency limit, wait for a task to finish before spawning another
+                            while join_set.len() >= retry_count {
+                                join_set.join_next().await;
+                            }
+                            join_set.spawn(retry_task);
+                        } else {
+                            // If the retry worker count is `0` then just spawn and don't use the join_set
+                            tokio::spawn(retry_task);
+                        }
+                    }
+
+                    while !join_set.is_empty() {
+                        join_set.join_next().await;
+                    }
+                })),
+                Some(retry_sender),
+            )
+        };
 
         // The "fast path" retry
         // The backoff should be < 5 mins for this to work otherwise signatures may expire
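
The constructor now produces the retry task and its channel as a pair of `Option`s that are `Some` or `None` together; note that with the default `backoff` of 60, `initial_sleep` works out to 60² = 3600 seconds, matching the "wait 60 mins" comment. A stripped-down sketch of the same construction shape (the `String` payload and per-message work are placeholders; this must be called inside a tokio runtime):

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::JoinHandle;

fn build_retry_pipeline(
    disable_retry: bool,
) -> (Option<JoinHandle<()>>, Option<UnboundedSender<String>>) {
    if disable_retry {
        // No channel and no consumer task: sends have nowhere to go.
        (None, None)
    } else {
        let (tx, mut rx) = unbounded_channel::<String>();
        let task = tokio::spawn(async move {
            // Drain the channel until every sender is dropped.
            while let Some(message) = rx.recv().await {
                let _ = message; // retry work would happen here
            }
        });
        (Some(task), Some(tx))
    }
}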
@@ -396,46 +446,6 @@ impl ActivityQueue {
             offset: 0,
             initial_sleep: 0,
         };
 
-        // The "retry path" strategy
-        // After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
-        let retry_strategy = RetryStrategy {
-            backoff,
-            retries: 3,
-            offset: 2,
-            initial_sleep: backoff.pow(2), // wait 60 mins before even trying
-        };
-
-        let retry_sender_task = tokio::spawn(async move {
-            let mut join_set = JoinSet::new();
-
-            while let Some(message) = retry_receiver.recv().await {
-                let retry_task = retry_worker(
-                    retry_client.clone(),
-                    timeout,
-                    message,
-                    retry_stats.clone(),
-                    retry_strategy,
-                );
-
-                if retry_count > 0 {
-                    // If we're over the limit of retries, wait for them to finish before spawning
-                    while join_set.len() >= retry_count {
-                        join_set.join_next().await;
-                    }
-                    join_set.spawn(retry_task);
-                } else {
-                    // If the retry worker count is `0` then just spawn and don't use the join_set
-                    tokio::spawn(retry_task);
-                }
-            }
-
-            while !join_set.is_empty() {
-                join_set.join_next().await;
-            }
-        });
 
         let (sender, mut receiver) = unbounded_channel();
         let sender_stats = stats.clone();
@@ -500,7 +510,9 @@ impl ActivityQueue {
         self.sender_task.await?;
 
         if wait_for_retries {
-            self.retry_sender_task.await?;
+            if let Some(retry_task) = self.retry_sender_task {
+                retry_task.await?;
+            }
         }
 
         Ok(self.stats)
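
Shutdown mirrors the optional spawn: the handle is joined only if it exists, with `?` still propagating a `JoinError` if the retry task panicked. A tiny sketch of that step in isolation:

use tokio::task::{JoinError, JoinHandle};

// Join the retry task if one was spawned; with retries disabled there is
// simply nothing to wait for.
async fn join_optional(task: Option<JoinHandle<()>>) -> Result<(), JoinError> {
    if let Some(task) = task {
        task.await?;
    }
    Ok(())
}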
@@ -513,9 +525,17 @@ pub(crate) fn create_activity_queue(
     client: ClientWithMiddleware,
     worker_count: usize,
     retry_count: usize,
+    disable_retry: bool,
     request_timeout: Duration,
 ) -> ActivityQueue {
-    ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60)
+    ActivityQueue::new(
+        client,
+        worker_count,
+        retry_count,
+        disable_retry,
+        request_timeout,
+        60,
+    )
 }
 
 /// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
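
An illustrative in-crate call with the new argument (the numbers are made up; the `true` turns the retry queue off, in contrast to the test below, which passes `false`):

use std::time::Duration;

let queue = create_activity_queue(
    reqwest::Client::default().into(), // a reqwest Client converts into ClientWithMiddleware
    64,                                // worker_count
    64,                                // retry_count: concurrency cap for retry workers
    true,                              // disable_retry: failed sends are dropped as dead
    Duration::from_secs(10),           // request_timeout
);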
@@ -618,6 +638,7 @@ mod tests {
             reqwest::Client::default().into(),
             num_workers,
             num_workers,
+            false,
             Duration::from_secs(10),
             1,
         );

src/config.rs

@@ -65,6 +65,10 @@ pub struct FederationConfig<T: Clone> {
     /// Setting this count to `0` means that there is no limit to concurrency
     #[builder(default = "0")]
     pub(crate) retry_count: usize,
+    /// Disable the retry queue completely.
+    /// Failed sends are then not kept around for later retries, but fail permanently after the first (fast) retry pass.
+    #[builder(default = "false")]
+    pub(crate) disable_retry: bool,
     /// Run library in debug mode. This allows usage of http and localhost urls. It also sends
     /// outgoing activities synchronously, not in background thread. This helps to make tests
     /// more consistent. Do not use for production.
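
Because the config derives its builder (hence the `#[builder(...)]` attributes), the new field surfaces as a `disable_retry` setter on `FederationConfigBuilder`. A hedged usage sketch; the `domain` and `app_data` setters and the `build()` call are assumed from the crate's existing builder API, so adjust to the version in use:

async fn configure() -> Result<(), Box<dyn std::error::Error>> {
    let config = FederationConfig::builder()
        .domain("example.com") // the instance hostname
        .app_data(())          // your application state goes here
        .disable_retry(true)   // drop failed sends instead of queueing retries
        .build()
        .await?; // note: in some versions of the crate `build()` is synchronous
    let _ = config;
    Ok(())
}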
@@ -206,6 +210,7 @@ impl<T: Clone> FederationConfigBuilder<T> {
             config.client.clone(),
             config.worker_count,
             config.retry_count,
+            config.disable_retry,
             config.request_timeout,
         );
         config.activity_queue = Some(Arc::new(queue));