Remove semaphore and use join set for backpressure

This commit is contained in:
cetra3 2023-06-18 10:38:26 +09:30
parent a1901a2961
commit 3c8867dabc

View file

@ -30,11 +30,7 @@ use std::{
time::{Duration, SystemTime},
};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedSender},
OwnedSemaphorePermit,
Semaphore,
},
sync::mpsc::{unbounded_channel, UnboundedSender},
task::{JoinHandle, JoinSet},
};
use tracing::{debug, info, warn};
@ -295,7 +291,6 @@ async fn worker(
client: ClientWithMiddleware,
timeout: Duration,
message: SendActivityTask,
permit: OwnedSemaphorePermit,
retry_queue: UnboundedSender<SendActivityTask>,
stats: Arc<Stats>,
strategy: RetryStrategy,
@ -322,15 +317,12 @@ async fn worker(
retry_queue.send(message).ok();
}
}
drop(permit);
}
async fn retry_worker(
client: ClientWithMiddleware,
timeout: Duration,
message: SendActivityTask,
permit: OwnedSemaphorePermit,
stats: Arc<Stats>,
strategy: RetryStrategy,
) {
@ -363,8 +355,6 @@ async fn retry_worker(
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
}
}
drop(permit)
}
impl ActivityQueue {
@ -412,21 +402,18 @@ impl ActivityQueue {
};
let retry_sender_task = tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(retry_count));
let mut join_set = JoinSet::new();
while let Some(message) = retry_receiver.recv().await {
let permit = semaphore
.clone()
.acquire_owned()
.await
.expect("should never be closed");
// If we're over the limit of retries, wait for them to finish before spawning
while join_set.len() >= retry_count {
join_set.join_next().await;
}
join_set.spawn(retry_worker(
retry_client.clone(),
timeout,
message,
permit,
retry_stats.clone(),
retry_strategy,
));
@ -442,21 +429,18 @@ impl ActivityQueue {
let sender_stats = stats.clone();
let sender_task = tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(worker_count));
let mut join_set = JoinSet::new();
while let Some(message) = receiver.recv().await {
let permit = semaphore
.clone()
.acquire_owned()
.await
.expect("should never be closed");
// If we're over the limit of workers, wait for them to finish before spawning
while join_set.len() >= worker_count {
join_set.join_next().await;
}
join_set.spawn(worker(
client.clone(),
timeout,
message,
permit,
retry_sender.clone(),
sender_stats.clone(),
strategy,