mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-02 13:29:36 +00:00
WIP on new Retry Queue
This commit is contained in:
parent
cb38550977
commit
b00eaca1e8
|
@ -7,9 +7,9 @@ pipeline:
|
|||
commands:
|
||||
# need make existing toolchain available
|
||||
- cp -r -n /usr/local/cargo .cargo
|
||||
- rustup toolchain install nightly
|
||||
- rustup component add rustfmt --toolchain nightly
|
||||
- cargo +nightly fmt -- --check
|
||||
- rustup toolchain install nightly-2023-07-10
|
||||
- rustup component add rustfmt --toolchain nightly-2023-07-10
|
||||
- cargo +nightly-2023-07-10 fmt -- --check
|
||||
|
||||
cargo_check:
|
||||
image: rust:1.70-bullseye
|
||||
|
|
|
@ -56,6 +56,7 @@ axum = { version = "0.6.18", features = [
|
|||
], default-features = false, optional = true }
|
||||
tower = { version = "0.4.13", optional = true }
|
||||
hyper = { version = "0.14", optional = true }
|
||||
futures = "0.3.28"
|
||||
|
||||
[features]
|
||||
default = ["actix-web", "axum"]
|
||||
|
|
|
@ -2,24 +2,21 @@
|
|||
//!
|
||||
#![doc = include_str!("../../docs/09_sending_activities.md")]
|
||||
|
||||
use self::{request::sign_and_send, retry_queue::RetryQueue};
|
||||
use crate::{
|
||||
config::Data,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
|
||||
use bytes::Bytes;
|
||||
use itertools::Itertools;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use serde::Serialize;
|
||||
use std::{fmt::Debug, sync::atomic::Ordering, time::Duration};
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use self::{request::sign_and_send, retry_queue::RetryQueue};
|
||||
|
||||
pub(crate) mod request;
|
||||
pub(crate) mod retry_queue;
|
||||
pub(super) mod util;
|
||||
|
@ -146,20 +143,20 @@ mod tests {
|
|||
sync::{atomic::AtomicUsize, Arc},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::http_signatures::generate_actor_keypair;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[allow(unused)]
|
||||
// This will periodically send back internal errors to test the retry
|
||||
async fn dodgy_handler(
|
||||
State(state): State<Arc<AtomicUsize>>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<(), StatusCode> {
|
||||
debug!("Headers:{:?}", headers);
|
||||
debug!("Body len:{}", body.len());
|
||||
trace!("Headers:{:?}", headers);
|
||||
trace!("Body len:{}", body.len());
|
||||
|
||||
if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
@ -197,8 +194,8 @@ mod tests {
|
|||
|
||||
env_logger::builder()
|
||||
.filter_level(LevelFilter::Warn)
|
||||
.filter_module("activitypub_federation", LevelFilter::Info)
|
||||
.format_timestamp(None)
|
||||
.filter_module("activitypub_federation", LevelFilter::Debug)
|
||||
.format_timestamp(Some(env_logger::TimestampPrecision::Millis))
|
||||
.init();
|
||||
|
||||
*/
|
||||
|
@ -207,7 +204,7 @@ mod tests {
|
|||
reqwest::Client::default().into(),
|
||||
num_workers,
|
||||
num_workers,
|
||||
Duration::from_secs(10),
|
||||
Duration::from_secs(1),
|
||||
1,
|
||||
);
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use futures_core::Future;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
|
@ -8,13 +9,13 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
|
||||
use super::{
|
||||
util::RetryStrategy,
|
||||
worker::{main_worker, retry_worker},
|
||||
worker::{MainWorker, RetryWorker},
|
||||
SendActivityTask,
|
||||
};
|
||||
|
||||
|
@ -76,7 +77,6 @@ impl RetryQueue {
|
|||
}
|
||||
});
|
||||
|
||||
let (retry_sender, mut retry_receiver) = unbounded_channel();
|
||||
let retry_stats = stats.clone();
|
||||
let retry_client = client.clone();
|
||||
|
||||
|
@ -86,85 +86,34 @@ impl RetryQueue {
|
|||
let strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 1,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
};
|
||||
|
||||
// The "retry path" strategy
|
||||
// After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
|
||||
let retry_strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 3,
|
||||
offset: 2,
|
||||
initial_sleep: backoff.pow(2), // wait 60 mins before even trying
|
||||
};
|
||||
let (retry_sender, retry_sender_task) = RetryWorker::new(
|
||||
retry_client.clone(),
|
||||
timeout,
|
||||
retry_stats.clone(),
|
||||
retry_count,
|
||||
backoff.pow(2),
|
||||
);
|
||||
|
||||
let retry_sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = retry_receiver.recv().await {
|
||||
let retry_task = retry_worker(
|
||||
retry_client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_stats.clone(),
|
||||
retry_strategy,
|
||||
);
|
||||
|
||||
if retry_count > 0 {
|
||||
// If we're over the limit of retries, wait for them to finish before spawning
|
||||
while join_set.len() >= retry_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(retry_task);
|
||||
} else {
|
||||
// If the retry worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(retry_task);
|
||||
}
|
||||
}
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
|
||||
let (sender, mut receiver) = unbounded_channel();
|
||||
let (sender, receiver) = unbounded_channel();
|
||||
|
||||
let sender_stats = stats.clone();
|
||||
|
||||
let sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
let main_worker = Arc::new(MainWorker::new(
|
||||
client.clone(),
|
||||
timeout,
|
||||
retry_sender,
|
||||
sender_stats.clone(),
|
||||
strategy,
|
||||
));
|
||||
|
||||
while let Some(message) = receiver.recv().await {
|
||||
let task = main_worker(
|
||||
client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_sender.clone(),
|
||||
sender_stats.clone(),
|
||||
strategy,
|
||||
);
|
||||
|
||||
if worker_count > 0 {
|
||||
// If we're over the limit of workers, wait for them to finish before spawning
|
||||
while join_set.len() >= worker_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(task);
|
||||
} else {
|
||||
// If the worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(task);
|
||||
}
|
||||
let sender_task = tokio::spawn(receiver_queue(worker_count, receiver, move |message| {
|
||||
let worker = main_worker.clone();
|
||||
async move {
|
||||
worker.send(message).await;
|
||||
}
|
||||
|
||||
drop(retry_sender);
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
}));
|
||||
|
||||
Self {
|
||||
stats,
|
||||
|
@ -202,3 +151,38 @@ impl RetryQueue {
|
|||
Ok(self.stats)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to abstract away the receiver queue task
|
||||
// This will use a join set to apply backpressure or have it entirely unbounded
|
||||
async fn receiver_queue<O: Future<Output = ()> + Send + 'static, F: Fn(SendActivityTask) -> O>(
|
||||
worker_count: usize,
|
||||
mut receiver: UnboundedReceiver<SendActivityTask>,
|
||||
spawn_fn: F,
|
||||
) {
|
||||
// If we're above the worker count, we create a joinset to apply a bit of backpressure here
|
||||
if worker_count > 0 {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = receiver.recv().await {
|
||||
// If we're over the limit of workers, wait for them to finish before spawning
|
||||
while join_set.len() >= worker_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
let task = spawn_fn(message);
|
||||
|
||||
join_set.spawn(task);
|
||||
}
|
||||
|
||||
// Drain the queue if we receive no extra messages
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
} else {
|
||||
// If the worker count is `0` then just spawn and don't use the join_set
|
||||
while let Some(message) = receiver.recv().await {
|
||||
let task = spawn_fn(message);
|
||||
tokio::spawn(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,6 @@ pub(super) struct RetryStrategy {
|
|||
pub backoff: usize,
|
||||
/// Amount of times to retry
|
||||
pub retries: usize,
|
||||
/// If this particular request has already been retried, you can add an offset here to increment the count to start
|
||||
pub offset: usize,
|
||||
/// Number of seconds to sleep before trying
|
||||
pub initial_sleep: usize,
|
||||
}
|
||||
|
||||
/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
|
||||
|
@ -28,13 +24,7 @@ pub(super) async fn retry<
|
|||
mut action: A,
|
||||
strategy: RetryStrategy,
|
||||
) -> Result<T, E> {
|
||||
let mut count = strategy.offset;
|
||||
|
||||
// Do an initial sleep if it's called for
|
||||
if strategy.initial_sleep > 0 {
|
||||
let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64);
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
let mut count = 0;
|
||||
|
||||
loop {
|
||||
match action().await {
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use std::{
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::warn;
|
||||
|
||||
use super::{
|
||||
request::sign_and_send,
|
||||
retry_queue::Stats,
|
||||
util::{retry, RetryStrategy},
|
||||
SendActivityTask,
|
||||
use tokio::{
|
||||
sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedSender, WeakUnboundedSender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use super::{request::sign_and_send, retry_queue::Stats, util::RetryStrategy, SendActivityTask};
|
||||
/// A tokio spawned worker which is responsible for submitting requests to federated servers
|
||||
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
|
||||
/// We need to retry activity sending in case the target instances is temporarily unreachable.
|
||||
|
@ -21,72 +19,199 @@ use super::{
|
|||
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
|
||||
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
|
||||
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
|
||||
pub(super) async fn main_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
retry_queue: UnboundedSender<SendActivityTask>,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
) {
|
||||
stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
pub(super) struct MainWorker {
|
||||
pub client: ClientWithMiddleware,
|
||||
pub timeout: Duration,
|
||||
pub retry_queue: UnboundedSender<SendRetryTask>,
|
||||
pub stats: Arc<Stats>,
|
||||
pub strategy: RetryStrategy,
|
||||
}
|
||||
|
||||
let outcome = sign_and_send(&message, &client, timeout, strategy).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
impl MainWorker {
|
||||
pub fn new(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
retry_queue: UnboundedSender<SendRetryTask>,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
timeout,
|
||||
retry_queue,
|
||||
stats,
|
||||
strategy,
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
warn!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
retry_queue.send(message).ok();
|
||||
}
|
||||
pub async fn send(&self, message: SendActivityTask) {
|
||||
self.stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
self.stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let outcome = sign_and_send(&message, &self.client, self.timeout, self.strategy).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
self.stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.completed_last_hour
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
self.stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
debug!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
if let Err(err) = self.retry_queue.send(SendRetryTask {
|
||||
message,
|
||||
last_sent: Instant::now(),
|
||||
count: 2,
|
||||
}) {
|
||||
error!("Error sending to retry queue:{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn retry_worker(
|
||||
// this is a retry worker that will basically keep a list of pending futures, and try at regular intervals to send them all in a batch
|
||||
pub(super) struct RetryWorker {
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = retry(
|
||||
|| {
|
||||
sign_and_send(
|
||||
&message,
|
||||
&client,
|
||||
timeout,
|
||||
RetryStrategy {
|
||||
backoff: 0,
|
||||
retries: 0,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
},
|
||||
)
|
||||
},
|
||||
strategy,
|
||||
)
|
||||
.await;
|
||||
retry_sender: WeakUnboundedSender<SendRetryTask>,
|
||||
}
|
||||
|
||||
stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
pub(super) struct SendRetryTask {
|
||||
message: SendActivityTask,
|
||||
// The time this was last sent
|
||||
last_sent: Instant,
|
||||
// The current count
|
||||
count: usize,
|
||||
}
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
impl RetryWorker {
|
||||
pub fn new(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
stats: Arc<Stats>,
|
||||
max_workers: usize,
|
||||
sleep_interval: usize,
|
||||
) -> (UnboundedSender<SendRetryTask>, JoinHandle<()>) {
|
||||
let (retry_sender, mut retry_receiver) = unbounded_channel::<SendRetryTask>();
|
||||
|
||||
let worker = Self {
|
||||
client,
|
||||
timeout,
|
||||
stats,
|
||||
retry_sender: retry_sender.clone().downgrade(),
|
||||
};
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs((sleep_interval) as u64));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
// A list of pending futures
|
||||
let futures = FuturesUnordered::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let mut requeue_messages = Vec::new();
|
||||
|
||||
// Grab all the activities that are waiting to be sent
|
||||
loop {
|
||||
// try_recv will not await anything
|
||||
match retry_receiver.try_recv() {
|
||||
Ok(message) => {
|
||||
let sleep_duration = Duration::from_secs(
|
||||
sleep_interval.pow(message.count as u32) as u64,
|
||||
// Take off 1 second for tests to pass
|
||||
) - Duration::from_secs(1);
|
||||
|
||||
// If the time between now and sending this message is greater than our sleep duration
|
||||
if now - message.last_sent > sleep_duration {
|
||||
futures.push(worker.send(message));
|
||||
} else {
|
||||
// If we haven't slept long enough, then we just add it to the end of the queue
|
||||
requeue_messages.push(message);
|
||||
}
|
||||
|
||||
// If we have reached our max concurrency count
|
||||
if max_workers > 0 && futures.len() >= max_workers {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
// no more to be had, break and wait for the next interval
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
// The queue has completely disconnected, and so we should shut down this task
|
||||
// Drain the queue then exit after
|
||||
futures.collect::<()>().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if futures.len() > 0 || requeue_messages.len() > 0 {
|
||||
// Drain the queue
|
||||
info!(
|
||||
"Retrying {}/{} messages",
|
||||
futures.len(),
|
||||
requeue_messages.len() + futures.len()
|
||||
);
|
||||
}
|
||||
|
||||
futures.collect::<()>().await;
|
||||
|
||||
for message in requeue_messages {
|
||||
worker
|
||||
.retry_sender
|
||||
.upgrade()
|
||||
.and_then(|sender| sender.send(message).ok());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(retry_sender, join_handle)
|
||||
}
|
||||
|
||||
async fn send(&self, mut retry: SendRetryTask) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = sign_and_send(
|
||||
&retry.message,
|
||||
&self.client,
|
||||
self.timeout,
|
||||
RetryStrategy {
|
||||
backoff: 0,
|
||||
retries: 0,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
self.stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.completed_last_hour
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
if retry.count < 3 {
|
||||
retry.count += 1;
|
||||
retry.last_sent = Instant::now();
|
||||
self.retry_sender
|
||||
.upgrade()
|
||||
.and_then(|sender| sender.send(retry).ok());
|
||||
} else {
|
||||
self.stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue