mirror of https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-02 13:29:36 +00:00

Commit: Consolidate the main & retry worker
parent: b00eaca1e8
commit: a8b661383a
Cargo.toml

@@ -36,6 +36,7 @@ http-signature-normalization-reqwest = { version = "0.8.0", default-features = f
 http-signature-normalization = "0.7.0"
 bytes = "1.4.0"
 futures-core = { version = "0.3.28", default-features = false }
+futures-util = "0.3.28"
 pin-project-lite = "0.2.9"
 activitystreams-kinds = "0.3.0"
 regex = { version = "1.8.4", default-features = false, features = ["std", "unicode-case"] }

@@ -56,7 +57,6 @@ axum = { version = "0.6.18", features = [
 ], default-features = false, optional = true }
 tower = { version = "0.4.13", optional = true }
 hyper = { version = "0.14", optional = true }
-futures = "0.3.28"

 [features]
 default = ["actix-web", "axum"]
src/activity_queue/mod.rs

@@ -9,18 +9,23 @@ use crate::{
 };
 use anyhow::anyhow;
 use bytes::Bytes;
+use futures_util::StreamExt;
 use itertools::Itertools;
 use openssl::pkey::{PKey, Private};
 use reqwest_middleware::ClientWithMiddleware;
 use serde::Serialize;
-use std::{fmt::Debug, sync::atomic::Ordering, time::Duration};
+use std::{
+    fmt::{Debug, Display},
+    sync::atomic::Ordering,
+    time::Duration,
+};
 use tracing::{debug, info, warn};
 use url::Url;

 pub(crate) mod request;
 pub(crate) mod retry_queue;
+pub(super) mod retry_worker;
 pub(super) mod util;
-pub(super) mod worker;

 /// Send a new activity to the given inboxes
 ///
@@ -43,60 +48,29 @@ where
     ActorType: Actor,
 {
     let config = &data.config;
-    let actor_id = activity.actor();
-    let activity_id = activity.id();
-    let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
-    let private_key_pem = actor
-        .private_key_pem()
-        .ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?;
-
-    // This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
-    let private_key = tokio::task::spawn_blocking(move || {
-        PKey::private_key_from_pem(private_key_pem.as_bytes())
-            .map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
-    })
-    .await
-    .map_err(|err| anyhow!("Error joining:{err}"))??;
-
-    let inboxes: Vec<Url> = inboxes
-        .into_iter()
-        .unique()
-        .filter(|i| !config.is_local_url(i))
-        .collect();
     // This field is only optional to make builder work, its always present at this point
     let activity_queue = config
         .activity_queue
         .as_ref()
         .expect("Config has activity queue");
-    for inbox in inboxes {
-        if let Err(err) = config.verify_url_valid(&inbox).await {
-            debug!("inbox url invalid, skipping: {inbox}: {err}");
-            continue;
-        }
-
-        let message = SendActivityTask {
-            actor_id: actor_id.to_string(),
-            activity_id: activity_id.to_string(),
-            inbox: inbox.to_string(),
-            activity: activity_serialized.clone(),
-            private_key: private_key.clone(),
-            http_signature_compat: config.http_signature_compat,
-        };
-
+    for raw_activity in prepare_raw(&activity, actor, inboxes, data).await? {
         // Don't use the activity queue if this is in debug mode, send and wait directly
         if config.debug {
             if let Err(err) = sign_and_send(
-                &message,
+                &raw_activity,
                 &config.client,
                 config.request_timeout,
                 Default::default(),
+                config.http_signature_compat,
             )
             .await
             {
                 warn!("{err}");
             }
         } else {
-            activity_queue.queue(message).await?;
+            activity_queue.queue(raw_activity).await?;
             let stats = activity_queue.get_stats();
             let running = stats.running.load(Ordering::Relaxed);
             if running == config.worker_count && config.worker_count != 0 {
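The branch above fixes the behavioral contract: with config.debug set, each prepared RawActivity is signed and sent inline and failures only produce a warning; otherwise it is enqueued, and send_activity returns as soon as the messages are queued. A minimal hedged sketch of the caller's view, assuming an already-built config and an activity/actor pair that satisfies the trait bounds (none of these values come from the commit itself):

// Hedged sketch, not part of the commit.
// `config`, `activity`, `actor` and `inboxes` are assumed to exist;
// `to_request_data` is the crate helper that wraps the config into Data<T>.
let data = config.to_request_data();

// debug == true: sends synchronously, failures are logged via warn!().
// debug == false: enqueues onto the RetryQueue and returns immediately.
send_activity(activity, &actor, inboxes, &data).await?;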
@@ -112,15 +86,96 @@ where
 }

 #[derive(Clone, Debug)]
 /// A raw opaque activity to an inbox that can be sent directly rather than via the queue
 // NOTE: These ids & the inbox are actually valid Urls but saved as String to reduce the size of this struct in memory
 // Make sure if you adjust the `send_activity` method, then they should be valid urls by the time they get to this struct
-pub(super) struct SendActivityTask {
+pub struct RawActivity {
     actor_id: String,
     activity_id: String,
     activity: Bytes,
     inbox: String,
     private_key: PKey<Private>,
-    http_signature_compat: bool,
 }

 impl RawActivity {
     /// Sends a raw activity directly, rather than using the background queue.
     /// This will sign and send the request using the configured [`client`](crate::config::FederationConfigBuilder::client) in the federation config
     pub async fn send<Datatype: Clone>(
         &self,
         data: &Data<Datatype>,
     ) -> Result<(), crate::error::Error> {
         let config = &data.config;
         Ok(sign_and_send(
             self,
             &config.client,
             config.request_timeout,
             Default::default(),
             config.http_signature_compat,
         )
         .await?)
     }
 }

 impl Display for RawActivity {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{} to {}", self.activity_id, self.inbox)
     }
 }

 /// Prepare a list of raw activities for sending to individual inboxes.
 ///
 /// All inboxes are checked to see if they are valid non-local urls.
 ///
 /// If you want to send activities to a background queue, use [`send_activity`]
 /// Once prepared, you can use the [`RawActivity::send`] method to send the activity to an inbox
 pub async fn prepare_raw<Activity, Datatype, ActorType>(
     activity: &Activity,
     actor: &ActorType,
     inboxes: Vec<Url>,
     data: &Data<Datatype>,
 ) -> Result<Vec<RawActivity>, <Activity as ActivityHandler>::Error>
 where
     Activity: ActivityHandler + Serialize,
     <Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
     Datatype: Clone,
     ActorType: Actor,
 {
     let config = &data.config;
     let actor_id = activity.actor();
     let activity_id = activity.id();
     let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
     let private_key_pem = actor
         .private_key_pem()
         .ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?;

     let private_key = tokio::task::spawn_blocking(move || {
         PKey::private_key_from_pem(private_key_pem.as_bytes())
             .map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
     })
     .await
     .map_err(|err| anyhow!("Error joining:{err}"))??;

     Ok(futures_util::stream::iter(
         inboxes
             .into_iter()
             .unique()
             .filter(|i| !config.is_local_url(i)),
     )
     .filter_map(|inbox| async {
         let inbox = inbox;
         if let Err(err) = config.verify_url_valid(&inbox).await {
             debug!("inbox url invalid, skipping: {inbox}: {err}");
             return None;
         };
         Some(RawActivity {
             actor_id: actor_id.to_string(),
             activity_id: activity_id.to_string(),
             inbox: inbox.to_string(),
             activity: activity_serialized.clone(),
             private_key: private_key.clone(),
         })
     })
     .collect()
     .await)
 }
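Taken together, the new API splits preparation from delivery: prepare_raw does the serialization, key parsing and inbox filtering once, and each resulting RawActivity can be sent manually. A hedged sketch of that manual path, as the doc comments above describe it; the activity, actor, inboxes and data values are assumed to exist with the required trait bounds:

// Hedged sketch: bypass the background queue entirely.
let raw_activities = prepare_raw(&activity, &actor, inboxes, &data).await?;
for raw in raw_activities {
    // Each RawActivity is already serialized and carries its own signing key;
    // `send` signs and posts it with the configured client.
    if let Err(err) = raw.send(&data).await {
        tracing::warn!("delivery of {raw} failed: {err}");
    }
}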

/// Creates an activity queue using tokio spawned tasks

@@ -129,9 +184,19 @@ pub(crate) fn create_activity_queue(
     client: ClientWithMiddleware,
     worker_count: usize,
     retry_count: usize,
+    disable_retry: bool,
     request_timeout: Duration,
+    http_signature_compat: bool,
 ) -> RetryQueue {
-    RetryQueue::new(client, worker_count, retry_count, request_timeout, 60)
+    RetryQueue::new(
+        client,
+        worker_count,
+        retry_count,
+        disable_retry,
+        request_timeout,
+        60,
+        http_signature_compat,
+    )
 }

 #[cfg(test)]
@@ -194,29 +259,29 @@ mod tests {

         env_logger::builder()
             .filter_level(LevelFilter::Warn)
-            .filter_module("activitypub_federation", LevelFilter::Debug)
+            .filter_module("activitypub_federation", LevelFilter::Info)
             .format_timestamp(Some(env_logger::TimestampPrecision::Millis))
             .init();

         */

         let activity_queue = RetryQueue::new(
             reqwest::Client::default().into(),
             num_workers,
             num_workers,
+            false,
             Duration::from_secs(1),
             1,
+            true,
         );

         let keypair = generate_actor_keypair().unwrap();

-        let message = SendActivityTask {
-            actor_id: "http://localhost:8001".parse().unwrap(),
-            activity_id: "http://localhost:8001/activity".parse().unwrap(),
+        let message = RawActivity {
+            actor_id: "http://localhost:8001".into(),
+            activity_id: "http://localhost:8001/activity".into(),
             activity: "{}".into(),
-            inbox: "http://localhost:8001".parse().unwrap(),
+            inbox: "http://localhost:8001".into(),
             private_key: keypair.private_key().unwrap(),
-            http_signature_compat: true,
         };

         let start = Instant::now();

@@ -236,6 +301,7 @@ mod tests {
             num_messages as f64 / start.elapsed().as_secs_f64()
         );

+        info!("Stats: {:?}", stats);
         assert_eq!(
             stats.completed_last_hour.load(Ordering::Relaxed),
             num_messages
src/activity_queue/request.rs

@@ -16,38 +16,33 @@ use std::time::{Duration, SystemTime};
 use tracing::debug;
 use url::Url;

-use super::{util::RetryStrategy, SendActivityTask};
+use super::{util::RetryStrategy, RawActivity};

 pub(super) async fn sign_and_send(
-    task: &SendActivityTask,
+    raw: &RawActivity,
     client: &ClientWithMiddleware,
     timeout: Duration,
     retry_strategy: RetryStrategy,
+    http_signature_compat: bool,
 ) -> Result<(), anyhow::Error> {
-    debug!(
-        "Sending {} to {}, contents:\n {}",
-        task.activity_id,
-        task.inbox,
-        serde_json::from_slice::<serde_json::Value>(&task.activity)?
-    );
     let request_builder = client
-        .post(task.inbox.to_string())
+        .post(raw.inbox.to_string())
         .timeout(timeout)
-        .headers(generate_request_headers(&task.inbox)?);
+        .headers(generate_request_headers(&raw.inbox)?);
     let request = sign_request(
         request_builder,
-        &task.actor_id,
-        task.activity.clone(),
-        task.private_key.clone(),
-        task.http_signature_compat,
+        &raw.actor_id,
+        raw.activity.clone(),
+        raw.private_key.clone(),
+        http_signature_compat,
     )
     .await
-    .context("signing request")?;
+    .with_context(|| format!("signing activity {raw}"))?;

     retry(
         || {
             send(
-                task,
+                raw,
                 client,
                 request
                     .try_clone()

@@ -60,7 +55,7 @@ pub(super) async fn sign_and_send(
 }

 pub(super) async fn send(
-    task: &SendActivityTask,
+    raw: &RawActivity,
     client: &ClientWithMiddleware,
     request: Request,
 ) -> Result<(), anyhow::Error> {

@@ -68,35 +63,25 @@ pub(super) async fn send(

     match response {
         Ok(o) if o.status().is_success() => {
-            debug!(
-                "Activity {} delivered successfully to {}",
-                task.activity_id, task.inbox
-            );
+            debug!("Activity {raw} delivered successfully",);
             Ok(())
         }
         Ok(o) if o.status().is_client_error() => {
             let text = o.text_limited().await.map_err(Error::other)?;
-            debug!(
-                "Activity {} was rejected by {}, aborting: {}",
-                task.activity_id, task.inbox, text,
-            );
+            debug!("Activity {raw} was rejected, aborting: {}", text);
             Ok(())
         }
         Ok(o) => {
             let status = o.status();
             let text = o.text_limited().await.map_err(Error::other)?;
             Err(anyhow!(
-                "Queueing activity {} to {} for retry after failure with status {}: {}",
-                task.activity_id,
-                task.inbox,
+                "Activity {raw} failed with status {}: {}",
                 status,
                 text,
             ))
         }
         Err(e) => Err(anyhow!(
-            "Queueing activity {} to {} for retry after connection failure: {}",
-            task.activity_id,
-            task.inbox,
+            "Activity {raw} failed with connection failure: {}",
             e
         )),
     }
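The match above encodes the delivery policy: a 2xx response counts as delivered, a 4xx is a permanent rejection that is dropped without retry, and everything else propagates as an error so the retry machinery re-queues the activity. A standalone hedged restatement of that policy, not part of the commit, using reqwest's StatusCode:

use reqwest::StatusCode;

// Hedged restatement of the decision logic in `send`.
enum Delivery {
    Done,       // 2xx: delivered
    Rejected,   // 4xx: the remote refused it; retrying won't help
    RetryLater, // 5xx or connection errors: worth another attempt
}

fn classify(status: Option<StatusCode>) -> Delivery {
    match status {
        Some(s) if s.is_success() => Delivery::Done,
        Some(s) if s.is_client_error() => Delivery::Rejected,
        // Anything else (server errors, or None for connection failures)
        // bubbles up as Err(..) so the worker re-queues the activity.
        _ => Delivery::RetryLater,
    }
}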
src/activity_queue/retry_queue.rs

@@ -1,4 +1,7 @@
-use futures_core::Future;
+use super::{
+    retry_worker::{RetryRawActivity, RetryWorker},
+    RawActivity,
+};
 use reqwest_middleware::ClientWithMiddleware;
 use std::{
     fmt::Debug,

@@ -6,28 +9,17 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         Arc,
     },
-    time::Duration,
-};
-use tokio::{
-    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-    task::{JoinHandle, JoinSet},
-};
-
-use super::{
-    util::RetryStrategy,
-    worker::{MainWorker, RetryWorker},
-    SendActivityTask,
+    time::{Duration, Instant},
 };
+use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};

 /// A simple activity queue which spawns tokio workers to send out requests
 /// When creating a queue, it will spawn a task per worker thread
 /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
 pub(crate) struct RetryQueue {
     // Stats shared between the queue and workers
     stats: Arc<Stats>,
-    sender: UnboundedSender<SendActivityTask>,
+    sender: UnboundedSender<RetryRawActivity>,
     sender_task: JoinHandle<()>,
-    retry_sender_task: JoinHandle<()>,
 }

 /// Simple stat counter to show where we're up to with sending messages
@@ -61,8 +53,10 @@ impl RetryQueue {
         client: ClientWithMiddleware,
         worker_count: usize,
         retry_count: usize,
+        disable_retry: bool,
         timeout: Duration,
         backoff: usize, // This should be 60 seconds by default or 1 second in tests
+        http_signature_compat: bool,
     ) -> Self {
         let stats: Arc<Stats> = Default::default();

@@ -77,55 +71,36 @@ impl RetryQueue {
             }
         });

-        let retry_stats = stats.clone();
-        let retry_client = client.clone();
-
-        // The "fast path" retry
-        // The backoff should be < 5 mins for this to work otherwise signatures may expire
-        // This strategy is the one that is used with the *same* signature
-        let strategy = RetryStrategy {
-            backoff,
-            retries: 1,
-        };
-
-        let (retry_sender, retry_sender_task) = RetryWorker::new(
-            retry_client.clone(),
-            timeout,
-            retry_stats.clone(),
-            retry_count,
-            backoff.pow(2),
-        );
-
-        let (sender, receiver) = unbounded_channel();
-
-        let sender_stats = stats.clone();
-
-        let main_worker = Arc::new(MainWorker::new(
+        // Setup & spawn the retry worker
+        let (sender, sender_task) = RetryWorker::spawn(
             client.clone(),
             timeout,
-            retry_sender,
-            sender_stats.clone(),
-            strategy,
-        ));
-
-        let sender_task = tokio::spawn(receiver_queue(worker_count, receiver, move |message| {
-            let worker = main_worker.clone();
-            async move {
-                worker.send(message).await;
-            }
-        }));
+            stats.clone(),
+            worker_count,
+            if disable_retry {
+                None
+            } else {
+                Some(retry_count)
+            },
+            backoff,
+            http_signature_compat,
+        );

         Self {
             stats,
             sender,
             sender_task,
-            retry_sender_task,
         }
     }

-    pub(super) async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
+    pub(super) async fn queue(&self, raw: RawActivity) -> Result<(), anyhow::Error> {
         self.stats.pending.fetch_add(1, Ordering::Relaxed);
-        self.sender.send(message)?;
+
+        self.sender.send(RetryRawActivity {
+            message: raw,
+            last_sent: Instant::now(),
+            count: 1,
+        })?;

         Ok(())
     }
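The counters touched in queue (and later by the workers) are what make the consolidated queue observable. A hedged sketch of reading them, assuming access to the Stats atomics shown earlier; the snapshot function is hypothetical:

use std::sync::atomic::Ordering;

// Hedged sketch: how the counters fit together at a point in time.
// `stats` is the Arc<Stats> shared between RetryQueue and its workers.
fn snapshot(stats: &Stats) {
    let pending = stats.pending.load(Ordering::Relaxed); // queued, not yet picked up
    let running = stats.running.load(Ordering::Relaxed); // first-attempt sends in flight
    let retries = stats.retries.load(Ordering::Relaxed); // parked for a later attempt
    println!("pending={pending} running={running} retries={retries}");
}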
@@ -144,45 +119,6 @@ impl RetryQueue {

         self.sender_task.await?;

-        if wait_for_retries {
-            self.retry_sender_task.await?;
-        }
-
         Ok(self.stats)
     }
 }
-
-// Helper function to abstract away the receiver queue task
-// This will use a join set to apply backpressure or have it entirely unbounded
-async fn receiver_queue<O: Future<Output = ()> + Send + 'static, F: Fn(SendActivityTask) -> O>(
-    worker_count: usize,
-    mut receiver: UnboundedReceiver<SendActivityTask>,
-    spawn_fn: F,
-) {
-    // If we're above the worker count, we create a joinset to apply a bit of backpressure here
-    if worker_count > 0 {
-        let mut join_set = JoinSet::new();
-
-        while let Some(message) = receiver.recv().await {
-            // If we're over the limit of workers, wait for them to finish before spawning
-            while join_set.len() >= worker_count {
-                join_set.join_next().await;
-            }
-
-            let task = spawn_fn(message);
-
-            join_set.spawn(task);
-        }
-
-        // Drain the queue if we receive no extra messages
-        while !join_set.is_empty() {
-            join_set.join_next().await;
-        }
-    } else {
-        // If the worker count is `0` then just spawn and don't use the join_set
-        while let Some(message) = receiver.recv().await {
-            let task = spawn_fn(message);
-            tokio::spawn(task);
-        }
-    }
-}
src/activity_queue/retry_worker.rs (new file, 275 lines)

@@ -0,0 +1,275 @@
use super::{request::sign_and_send, retry_queue::Stats, util::RetryStrategy, RawActivity};
use futures_core::Future;
use futures_util::FutureExt;
use reqwest_middleware::ClientWithMiddleware;
use std::{
    sync::{atomic::Ordering, Arc},
    time::{Duration, Instant},
};
use tokio::{
    sync::mpsc::{
        error::TryRecvError,
        unbounded_channel,
        UnboundedReceiver,
        UnboundedSender,
        WeakUnboundedSender,
    },
    task::{JoinHandle, JoinSet},
    time::MissedTickBehavior,
};
use tracing::error;

/// A tokio spawned worker which is responsible for submitting requests to federated servers
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
/// We need to retry activity sending in case the target instances is temporarily unreachable.
/// In this case, the task is stored and resent when the instance is hopefully back up. This
/// list shows the retry intervals, and which events of the target instance can be covered:
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
/// - >60min (one hour, instance maintenance) --- happens in the retry worker
/// - >60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
pub(super) struct RetryWorker {
    client: ClientWithMiddleware,
    timeout: Duration,
    stats: Arc<Stats>,
    batch_sender: WeakUnboundedSender<RetryRawActivity>,
    backoff: usize,
    http_signature_compat: bool,
}

/// A message that has tried to be sent but has not been able to be sent
#[derive(Debug)]
pub(super) struct RetryRawActivity {
    /// The message that is sent
    pub message: RawActivity,
    /// The time this was last sent
    pub last_sent: Instant,
    /// The current count
    pub count: usize,
}
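As a worked example of the schedule in the doc comment, assuming the production backoff of 60 seconds (the "This should be 60 seconds by default" comment on RetryQueue::new) and that RetryStrategy's backoff is in seconds: the first failure is retried in-task after roughly a minute with the same signature; after that, the batch loop (interval = backoff² = 3600 s, i.e. hourly) re-signs before each later attempt, and the count field caps the total attempts at three. A hedged sketch, not part of the commit:

// Hedged sketch: the effective timeline for one failing activity.
const BACKOFF_SECS: u64 = 60;

fn main() {
    // Attempt 1 fails -> in-worker retry after ~60s with the same signature.
    println!("attempt 1 retry: ~{BACKOFF_SECS}s");
    // Still failing -> handed to the batch loop, which ticks every backoff^2
    // seconds (an hour) and re-signs before each later attempt.
    println!("batch loop tick: every {}s", BACKOFF_SECS.pow(2));
    // count reaches 3 -> the activity is counted in dead_last_hour and dropped.
    println!("max attempts: 3");
}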
impl RetryWorker {
    /// Spawns a background task for managing the queue of retryables
    pub fn spawn(
        client: ClientWithMiddleware,
        timeout: Duration,
        stats: Arc<Stats>,
        worker_count: usize,
        retry_count: Option<usize>,
        backoff: usize,
        http_signature_compat: bool,
    ) -> (UnboundedSender<RetryRawActivity>, JoinHandle<()>) {
        // The main sender channel, gets called immediately when something is queued
        let (sender, receiver) = unbounded_channel::<RetryRawActivity>();
        // The batch sender channel, waits up to an hour before checking if anything needs to be sent
        let (batch_sender, batch_receiver) = unbounded_channel::<RetryRawActivity>();
        // The retry sender channel, is called by the batch
        let (retry_sender, retry_receiver) = unbounded_channel::<RetryRawActivity>();

        let worker = Arc::new(Self {
            client,
            timeout,
            stats,
            batch_sender: batch_sender.clone().downgrade(),
            backoff,
            http_signature_compat,
        });

        let loop_batch_sender = batch_sender.clone().downgrade();

        let retry_task = tokio::spawn(async move {
            // This is the main worker queue, tasks sent here are sent immediately
            let main_worker = worker.clone();
            let worker_queue = receiver_queue(worker_count, receiver, move |message| {
                let worker = main_worker.clone();
                async move {
                    worker.send(message).await;
                }
            });

            // If retries are enabled, start up our batch task and retry queue
            if let Some(retry_count) = retry_count {
                // This task checks every hour anything that needs to be sent, based upon the last sent time
                // If any tasks need to be sent, they are then sent to the retry queue
                let batch_loop = retry_loop(
                    backoff.pow(2),
                    batch_receiver,
                    loop_batch_sender,
                    retry_sender,
                );

                let retry_queue = receiver_queue(retry_count, retry_receiver, move |message| {
                    let worker = worker.clone();
                    async move {
                        worker.send(message).await;
                    }
                });

                let wait_for_batch = worker_queue.then(|_| async move {
                    // Wait a little bit before dropping the batch sender for tests
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    drop(batch_sender);
                });

                tokio::join!(wait_for_batch, retry_queue, batch_loop);
            } else {
                drop(batch_sender);
                tokio::join!(worker_queue);
            }
        });

        (sender, retry_task)
    }

    async fn send(&self, mut retry: RetryRawActivity) {
        // If this is the first time running
        if retry.count == 1 {
            self.stats.pending.fetch_sub(1, Ordering::Relaxed);
            self.stats.running.fetch_add(1, Ordering::Relaxed);
        }

        let outcome = sign_and_send(
            &retry.message,
            &self.client,
            self.timeout,
            if retry.count == 1 {
                RetryStrategy {
                    backoff: self.backoff,
                    retries: 1,
                }
            } else {
                Default::default()
            },
            self.http_signature_compat,
        )
        .await;

        if retry.count == 1 {
            self.stats.running.fetch_sub(1, Ordering::Relaxed);
        }
        match outcome {
            Ok(_) => {
                self.stats
                    .completed_last_hour
                    .fetch_add(1, Ordering::Relaxed);
                if retry.count != 1 {
                    self.stats.retries.fetch_sub(1, Ordering::Relaxed);
                }
            }
            Err(_err) => {
                // If retries are enabled
                if let Some(sender) = self.batch_sender.upgrade() {
                    // If this is the first time, we append it to the retry count
                    if retry.count == 1 {
                        self.stats.retries.fetch_add(1, Ordering::Relaxed);
                    }

                    // If this is under 3 retries
                    if retry.count < 3 {
                        retry.count += 1;
                        retry.last_sent = Instant::now();
                        sender.send(retry).ok();
                    } else {
                        self.stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
                    }
                } else {
                    self.stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }
}

/// This is a retry loop that will simply send tasks in batches
/// It will check an incoming queue, and schedule any tasks that need to be sent
/// The current sleep interval here is 1 hour
async fn retry_loop(
    sleep_interval: usize,
    mut batch_receiver: UnboundedReceiver<RetryRawActivity>,
    batch_sender: WeakUnboundedSender<RetryRawActivity>,
    retry_sender: UnboundedSender<RetryRawActivity>,
) {
    let mut interval = tokio::time::interval(Duration::from_secs((sleep_interval) as u64));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        interval.tick().await;

        // We requeue any messages to be checked next time if they haven't slept long enough yet
        let mut requeue_messages = Vec::new();

        // Grab all the activities that are in the queue
        loop {
            // try_recv will not await anything
            match batch_receiver.try_recv() {
                Ok(message) => {
                    let sleep_duration = Duration::from_secs(
                        sleep_interval.pow(message.count as u32) as u64,
                        // Take off 1 second for tests to pass
                    ) - Duration::from_secs(1);

                    // If the time between now and sending this message is greater than our sleep duration
                    if message.last_sent.elapsed() > sleep_duration {
                        if let Err(err) = retry_sender.send(message) {
                            error!("Couldn't wake up task for sending: {err}");
                        }
                    } else {
                        // If we haven't slept long enough, then we just add it to the end of the queue
                        requeue_messages.push(message);
                    }
                }
                Err(TryRecvError::Empty) => {
                    // no more to be had, break and wait for the next interval
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    return;
                }
            }
        }

        // If there are any messages that need to be retried later on
        if let Some(ref sender) = batch_sender.upgrade() {
            for message in requeue_messages {
                if let Err(err) = sender.send(message) {
                    error!("Couldn't wake up task for sending: {err}");
                }
            }
        }
    }
}

/// Helper function to abstract away the receiver queue task.
///
/// This will use a join set to apply backpressure or have it entirely unbounded if the worker_count is 0
pub(super) async fn receiver_queue<O: Future<Output = ()> + Send + 'static, F: Fn(A) -> O, A>(
    worker_count: usize,
    mut receiver: UnboundedReceiver<A>,
    spawn_fn: F,
) {
    // If we're above the worker count, we create a joinset to apply a bit of backpressure here
    if worker_count > 0 {
        let mut join_set = JoinSet::new();

        while let Some(message) = receiver.recv().await {
            // If we're over the limit of workers, wait for them to finish before spawning
            while join_set.len() >= worker_count {
                join_set.join_next().await;
            }

            let task = spawn_fn(message);

            join_set.spawn(task);
        }

        // Drain the queue if we receive no extra messages
        while !join_set.is_empty() {
            join_set.join_next().await;
        }
    } else {
        // If the worker count is `0` then just spawn and don't use the join_set
        while let Some(message) = receiver.recv().await {
            let task = spawn_fn(message);
            tokio::spawn(task);
        }
    }
}
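receiver_queue is the generic throttle that both the main and retry queues now share: with a positive worker_count it caps in-flight tasks via a JoinSet, and with 0 it spawns without bound. A self-contained sketch of the same pattern with a hypothetical job type, not taken from the commit:

use tokio::{sync::mpsc, task::JoinSet};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u32>();
    for job in 0..10 {
        tx.send(job).unwrap();
    }
    drop(tx); // close the channel so the receive loop below terminates

    // Same shape as receiver_queue with worker_count = 3:
    // never more than 3 jobs in flight at once.
    let worker_count = 3;
    let mut join_set = JoinSet::new();
    while let Some(job) = rx.recv().await {
        while join_set.len() >= worker_count {
            join_set.join_next().await;
        }
        join_set.spawn(async move {
            println!("sending job {job}");
        });
    }
    // Drain whatever is still running.
    while !join_set.is_empty() {
        join_set.join_next().await;
    }
}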
src/activity_queue/worker.rs (deleted)

@@ -1,217 +0,0 @@
use futures::{stream::FuturesUnordered, StreamExt};
use reqwest_middleware::ClientWithMiddleware;
use std::{
    sync::{atomic::Ordering, Arc},
    time::{Duration, Instant},
};
use tokio::{
    sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedSender, WeakUnboundedSender},
    task::JoinHandle,
};
use tracing::{debug, error, info};

use super::{request::sign_and_send, retry_queue::Stats, util::RetryStrategy, SendActivityTask};

/// A tokio spawned worker which is responsible for submitting requests to federated servers
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
/// We need to retry activity sending in case the target instances is temporarily unreachable.
/// In this case, the task is stored and resent when the instance is hopefully back up. This
/// list shows the retry intervals, and which events of the target instance can be covered:
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
pub(super) struct MainWorker {
    pub client: ClientWithMiddleware,
    pub timeout: Duration,
    pub retry_queue: UnboundedSender<SendRetryTask>,
    pub stats: Arc<Stats>,
    pub strategy: RetryStrategy,
}

impl MainWorker {
    pub fn new(
        client: ClientWithMiddleware,
        timeout: Duration,
        retry_queue: UnboundedSender<SendRetryTask>,
        stats: Arc<Stats>,
        strategy: RetryStrategy,
    ) -> Self {
        Self {
            client,
            timeout,
            retry_queue,
            stats,
            strategy,
        }
    }
    pub async fn send(&self, message: SendActivityTask) {
        self.stats.pending.fetch_sub(1, Ordering::Relaxed);
        self.stats.running.fetch_add(1, Ordering::Relaxed);

        let outcome = sign_and_send(&message, &self.client, self.timeout, self.strategy).await;

        // "Running" has finished, check the outcome
        self.stats.running.fetch_sub(1, Ordering::Relaxed);

        match outcome {
            Ok(_) => {
                self.stats
                    .completed_last_hour
                    .fetch_add(1, Ordering::Relaxed);
            }
            Err(_err) => {
                self.stats.retries.fetch_add(1, Ordering::Relaxed);
                debug!(
                    "Sending activity {} to {} to the retry queue to be tried again later",
                    message.activity_id, message.inbox
                );
                // Send to the retry queue. Ignoring whether it succeeds or not
                if let Err(err) = self.retry_queue.send(SendRetryTask {
                    message,
                    last_sent: Instant::now(),
                    count: 2,
                }) {
                    error!("Error sending to retry queue:{err}");
                }
            }
        }
    }
}

// this is a retry worker that will basically keep a list of pending futures, and try at regular intervals to send them all in a batch
pub(super) struct RetryWorker {
    client: ClientWithMiddleware,
    timeout: Duration,
    stats: Arc<Stats>,
    retry_sender: WeakUnboundedSender<SendRetryTask>,
}

pub(super) struct SendRetryTask {
    message: SendActivityTask,
    // The time this was last sent
    last_sent: Instant,
    // The current count
    count: usize,
}

impl RetryWorker {
    pub fn new(
        client: ClientWithMiddleware,
        timeout: Duration,
        stats: Arc<Stats>,
        max_workers: usize,
        sleep_interval: usize,
    ) -> (UnboundedSender<SendRetryTask>, JoinHandle<()>) {
        let (retry_sender, mut retry_receiver) = unbounded_channel::<SendRetryTask>();

        let worker = Self {
            client,
            timeout,
            stats,
            retry_sender: retry_sender.clone().downgrade(),
        };

        let join_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs((sleep_interval) as u64));

            loop {
                interval.tick().await;
                // A list of pending futures
                let futures = FuturesUnordered::new();
                let now = Instant::now();

                let mut requeue_messages = Vec::new();

                // Grab all the activities that are waiting to be sent
                loop {
                    // try_recv will not await anything
                    match retry_receiver.try_recv() {
                        Ok(message) => {
                            let sleep_duration = Duration::from_secs(
                                sleep_interval.pow(message.count as u32) as u64,
                                // Take off 1 second for tests to pass
                            ) - Duration::from_secs(1);

                            // If the time between now and sending this message is greater than our sleep duration
                            if now - message.last_sent > sleep_duration {
                                futures.push(worker.send(message));
                            } else {
                                // If we haven't slept long enough, then we just add it to the end of the queue
                                requeue_messages.push(message);
                            }

                            // If we have reached our max concurrency count
                            if max_workers > 0 && futures.len() >= max_workers {
                                break;
                            }
                        }
                        Err(TryRecvError::Empty) => {
                            // no more to be had, break and wait for the next interval
                            break;
                        }
                        Err(TryRecvError::Disconnected) => {
                            // The queue has completely disconnected, and so we should shut down this task
                            // Drain the queue then exit after
                            futures.collect::<()>().await;
                            return;
                        }
                    }
                }

                if futures.len() > 0 || requeue_messages.len() > 0 {
                    // Drain the queue
                    info!(
                        "Retrying {}/{} messages",
                        futures.len(),
                        requeue_messages.len() + futures.len()
                    );
                }

                futures.collect::<()>().await;

                for message in requeue_messages {
                    worker
                        .retry_sender
                        .upgrade()
                        .and_then(|sender| sender.send(message).ok());
                }
            }
        });

        (retry_sender, join_handle)
    }

    async fn send(&self, mut retry: SendRetryTask) {
        // Because the times are pretty extravagant between retries, we have to re-sign each time
        let outcome = sign_and_send(
            &retry.message,
            &self.client,
            self.timeout,
            RetryStrategy {
                backoff: 0,
                retries: 0,
            },
        )
        .await;

        self.stats.retries.fetch_sub(1, Ordering::Relaxed);

        match outcome {
            Ok(_) => {
                self.stats
                    .completed_last_hour
                    .fetch_add(1, Ordering::Relaxed);
            }
            Err(_err) => {
                if retry.count < 3 {
                    retry.count += 1;
                    retry.last_sent = Instant::now();
                    self.retry_sender
                        .upgrade()
                        .and_then(|sender| sender.send(retry).ok());
                } else {
                    self.stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }
}
src/config.rs

@@ -66,6 +66,10 @@ pub struct FederationConfig<T: Clone> {
     /// Setting this count to `0` means that there is no limit to concurrency
     #[builder(default = "0")]
     pub(crate) retry_count: usize,
+    /// Disable the retry queue completely
+    /// This means that retries will not be kept around but fail after the first retry
+    #[builder(default = "false")]
+    pub(crate) disable_retry: bool,
     /// Run library in debug mode. This allows usage of http and localhost urls. It also sends
     /// outgoing activities synchronously, not in background thread. This helps to make tests
     /// more consistent. Do not use for production.

@@ -74,8 +78,7 @@ pub struct FederationConfig<T: Clone> {
     /// Allow HTTP urls even in production mode
     #[builder(default = "self.debug.unwrap_or(false)")]
     pub(crate) allow_http_urls: bool,
-    /// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to
-    /// use the same as timeout when sending
+    /// Timeout for all HTTP requests. Default of 10 seconds.
     #[builder(default = "Duration::from_secs(10)")]
     pub(crate) request_timeout: Duration,
     /// Function used to verify that urls are valid, See [UrlVerifier] for details.

@@ -232,7 +235,9 @@ impl<T: Clone> FederationConfigBuilder<T> {
             config.client.clone(),
             config.worker_count,
             config.retry_count,
+            config.disable_retry,
             config.request_timeout,
+            config.http_signature_compat,
         );
         config.activity_queue = Some(Arc::new(queue));
         Ok(config)
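The new knobs slot into the existing builder. A hedged sketch of wiring them up through FederationConfig::builder(); the domain and app data are placeholders, the builder methods are assumed to mirror the config fields (derive_builder style), and whether build() must be awaited depends on the crate version:

// Hedged sketch, not part of the commit: tuning the consolidated queue.
let config = FederationConfig::builder()
    .domain("example.com")   // placeholder
    .app_data(())            // your application state
    .worker_count(64)        // cap on concurrent first-attempt sends
    .retry_count(8)          // cap on concurrent retry sends
    .disable_retry(false)    // true: no retry queue, fail after the first retry
    .build()                 // possibly async, depending on crate version
    .await?;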