diff --git a/examples/local_federation/activities/follow.rs b/examples/local_federation/activities/follow.rs index 865a618..02879d4 100644 --- a/examples/local_federation/activities/follow.rs +++ b/examples/local_federation/activities/follow.rs @@ -67,7 +67,7 @@ impl ActivityHandler for Follow { let id = generate_object_id(data.domain())?; let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); local_user - .send(accept, vec![follower.shared_inbox_or_inbox()], data) + .send(accept, vec![follower.shared_inbox_or_inbox()], false, data) .await?; Ok(()) } diff --git a/examples/local_federation/objects/person.rs b/examples/local_federation/objects/person.rs index 2c47fcd..0b595c4 100644 --- a/examples/local_federation/objects/person.rs +++ b/examples/local_federation/objects/person.rs @@ -6,6 +6,7 @@ use crate::{ utils::generate_object_id, }; use activitypub_federation::{ + activity_queue::queue_activity, activity_sending::SendActivityTask, config::Data, fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, @@ -85,7 +86,7 @@ impl DbUser { let other: DbUser = webfinger_resolve_actor(other, data).await?; let id = generate_object_id(data.domain())?; let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send(follow, vec![other.shared_inbox_or_inbox()], data) + self.send(follow, vec![other.shared_inbox_or_inbox()], false, data) .await?; Ok(()) } @@ -98,7 +99,7 @@ impl DbUser { let user: DbUser = ObjectId::from(f).dereference(data).await?; inboxes.push(user.shared_inbox_or_inbox()); } - self.send(create, inboxes, data).await?; + self.send(create, inboxes, true, data).await?; Ok(()) } @@ -106,6 +107,7 @@ impl DbUser { &self, activity: Activity, recipients: Vec, + use_queue: bool, data: &Data, ) -> Result<(), Error> where @@ -113,9 +115,14 @@ impl DbUser { ::Error: From + From, { let activity = WithContext::new_default(activity); - let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; - for send in sends { - send.sign_and_send(data).await?; + // Send through queue in some cases and bypass it in others to test both code paths + if use_queue { + queue_activity(activity, self, recipients, data).await?; + } else { + let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; + for send in sends { + send.sign_and_send(data).await?; + } } Ok(()) } diff --git a/src/activity_queue.rs b/src/activity_queue.rs new file mode 100644 index 0000000..dea0289 --- /dev/null +++ b/src/activity_queue.rs @@ -0,0 +1,646 @@ +//! Queue for signing and sending outgoing activities with retry +//! +#![doc = include_str!("../docs/09_sending_activities.md")] + +use crate::{ + activity_sending::get_pkey_cached, + config::Data, + error::Error, + http_signatures::sign_request, + reqwest_shim::ResponseExt, + traits::{ActivityHandler, Actor}, + FEDERATION_CONTENT_TYPE, +}; +use bytes::Bytes; +use futures_core::Future; +use http::{header::HeaderName, HeaderMap, HeaderValue}; +use httpdate::fmt_http_date; +use itertools::Itertools; +use openssl::pkey::{PKey, Private}; +use reqwest::Request; +use reqwest_middleware::ClientWithMiddleware; +use serde::Serialize; +use std::{ + fmt::{Debug, Display}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, +}; +use tokio::{ + sync::mpsc::{unbounded_channel, UnboundedSender}, + task::{JoinHandle, JoinSet}, +}; +use tracing::{debug, info, warn}; +use url::Url; + +/// Send a new activity to the given inboxes +/// +/// - `activity`: The activity to be sent, gets converted to json +/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP +/// signature. Generated with [crate::http_signatures::generate_actor_keypair]. +/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor +/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] +/// for each target actor. +pub async fn queue_activity( + activity: Activity, + actor: &ActorType, + inboxes: Vec, + data: &Data, +) -> Result<(), Error> +where + Activity: ActivityHandler + Serialize + Debug, + Datatype: Clone, + ActorType: Actor, +{ + let config = &data.config; + let actor_id = activity.actor(); + let activity_id = activity.id(); + let activity_serialized: Bytes = serde_json::to_vec(&activity) + .map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))? + .into(); + let private_key = get_pkey_cached(data, actor).await?; + + let inboxes: Vec = inboxes + .into_iter() + .unique() + .filter(|i| !config.is_local_url(i)) + .collect(); + // This field is only optional to make builder work, its always present at this point + let activity_queue = config + .activity_queue + .as_ref() + .expect("Config has activity queue"); + for inbox in inboxes { + if let Err(err) = config.verify_url_valid(&inbox).await { + debug!("inbox url invalid, skipping: {inbox}: {err}"); + continue; + } + + let message = SendActivityTask { + actor_id: actor_id.clone(), + activity_id: activity_id.clone(), + inbox, + activity: activity_serialized.clone(), + private_key: private_key.clone(), + http_signature_compat: config.http_signature_compat, + }; + + // Don't use the activity queue if this is in debug mode, send and wait directly + if config.debug { + if let Err(err) = sign_and_send( + &message, + &config.client, + config.request_timeout, + Default::default(), + ) + .await + { + warn!("{err}"); + } + } else { + activity_queue.queue(message).await?; + let stats = activity_queue.get_stats(); + let running = stats.running.load(Ordering::Relaxed); + if running == config.queue_worker_count && config.queue_worker_count != 0 { + warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.queue_worker_count); + warn!("{:?}", stats); + } else { + info!("{:?}", stats); + } + } + } + + Ok(()) +} + +// TODO: should use the existing struct but lifetimes are difficult +#[derive(Clone, Debug)] +pub(crate) struct SendActivityTask { + actor_id: Url, + activity_id: Url, + activity: Bytes, + inbox: Url, + private_key: PKey, + http_signature_compat: bool, +} + +async fn sign_and_send( + task: &SendActivityTask, + client: &ClientWithMiddleware, + timeout: Duration, + retry_strategy: RetryStrategy, +) -> Result<(), Error> { + debug!("Sending {} to {}", task.activity_id, task.inbox,); + let request_builder = client + .post(task.inbox.to_string()) + .timeout(timeout) + .headers(generate_request_headers(&task.inbox)); + let request = sign_request( + request_builder, + &task.actor_id, + task.activity.clone(), + task.private_key.clone(), + task.http_signature_compat, + ) + .await?; + + retry( + || { + send( + task, + client, + request + .try_clone() + .expect("The body of the request is not cloneable"), + ) + }, + retry_strategy, + ) + .await +} + +async fn send( + task: &SendActivityTask, + client: &ClientWithMiddleware, + request: Request, +) -> Result<(), Error> { + let response = client.execute(request).await; + + match response { + Ok(o) if o.status().is_success() => { + debug!( + "Activity {} delivered successfully to {}", + task.activity_id, task.inbox + ); + Ok(()) + } + Ok(o) if o.status().is_client_error() => { + let text = o.text_limited().await?; + debug!( + "Activity {} was rejected by {}, aborting: {}", + task.activity_id, task.inbox, text, + ); + Ok(()) + } + Ok(o) => { + let status = o.status(); + let text = o.text_limited().await?; + Err(Error::Other(format!( + "Queueing activity {} to {} for retry after failure with status {}: {}", + task.activity_id, task.inbox, status, text, + ))) + } + Err(e) => Err(Error::Other(format!( + "Queueing activity {} to {} for retry after connection failure: {}", + task.activity_id, task.inbox, e + ))), + } +} + +pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { + let mut host = inbox_url.domain().expect("read inbox domain").to_string(); + if let Some(port) = inbox_url.port() { + host = format!("{}:{}", host, port); + } + + let mut headers = HeaderMap::new(); + headers.insert( + HeaderName::from_static("content-type"), + HeaderValue::from_static(FEDERATION_CONTENT_TYPE), + ); + headers.insert( + HeaderName::from_static("host"), + HeaderValue::from_str(&host).expect("Hostname is valid"), + ); + headers.insert( + "date", + HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"), + ); + headers +} + +/// A simple activity queue which spawns tokio workers to send out requests +/// When creating a queue, it will spawn a task per worker thread +/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) +pub(crate) struct ActivityQueue { + // Stats shared between the queue and workers + stats: Arc, + sender: UnboundedSender, + sender_task: JoinHandle<()>, + retry_sender_task: JoinHandle<()>, +} + +/// Simple stat counter to show where we're up to with sending messages +/// This is a lock-free way to share things between tasks +/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning +#[derive(Default)] +pub(crate) struct Stats { + pending: AtomicUsize, + running: AtomicUsize, + retries: AtomicUsize, + dead_last_hour: AtomicUsize, + completed_last_hour: AtomicUsize, +} + +impl Debug for Stats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}", + self.pending.load(Ordering::Relaxed), + self.running.load(Ordering::Relaxed), + self.retries.load(Ordering::Relaxed), + self.dead_last_hour.load(Ordering::Relaxed), + self.completed_last_hour.load(Ordering::Relaxed) + ) + } +} + +#[derive(Clone, Copy, Default)] +struct RetryStrategy { + /// Amount of time in seconds to back off + backoff: usize, + /// Amount of times to retry + retries: usize, + /// If this particular request has already been retried, you can add an offset here to increment the count to start + offset: usize, + /// Number of seconds to sleep before trying + initial_sleep: usize, +} + +/// A tokio spawned worker which is responsible for submitting requests to federated servers +/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue. +/// We need to retry activity sending in case the target instances is temporarily unreachable. +/// In this case, the task is stored and resent when the instance is hopefully back up. This +/// list shows the retry intervals, and which events of the target instance can be covered: +/// - 60s (one minute, service restart) -- happens in the worker w/ same signature +/// - 60min (one hour, instance maintenance) --- happens in the retry worker +/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker +async fn worker( + client: ClientWithMiddleware, + timeout: Duration, + message: SendActivityTask, + retry_queue: UnboundedSender, + stats: Arc, + strategy: RetryStrategy, +) { + stats.pending.fetch_sub(1, Ordering::Relaxed); + stats.running.fetch_add(1, Ordering::Relaxed); + + let outcome = sign_and_send(&message, &client, timeout, strategy).await; + + // "Running" has finished, check the outcome + stats.running.fetch_sub(1, Ordering::Relaxed); + + match outcome { + Ok(_) => { + stats.completed_last_hour.fetch_add(1, Ordering::Relaxed); + } + Err(_err) => { + stats.retries.fetch_add(1, Ordering::Relaxed); + warn!( + "Sending activity {} to {} to the retry queue to be tried again later", + message.activity_id, message.inbox + ); + // Send to the retry queue. Ignoring whether it succeeds or not + retry_queue.send(message).ok(); + } + } +} + +async fn retry_worker( + client: ClientWithMiddleware, + timeout: Duration, + message: SendActivityTask, + stats: Arc, + strategy: RetryStrategy, +) { + // Because the times are pretty extravagant between retries, we have to re-sign each time + let outcome = retry( + || { + sign_and_send( + &message, + &client, + timeout, + RetryStrategy { + backoff: 0, + retries: 0, + offset: 0, + initial_sleep: 0, + }, + ) + }, + strategy, + ) + .await; + + stats.retries.fetch_sub(1, Ordering::Relaxed); + + match outcome { + Ok(_) => { + stats.completed_last_hour.fetch_add(1, Ordering::Relaxed); + } + Err(_err) => { + stats.dead_last_hour.fetch_add(1, Ordering::Relaxed); + } + } +} + +impl ActivityQueue { + fn new( + client: ClientWithMiddleware, + worker_count: usize, + retry_count: usize, + timeout: Duration, + backoff: usize, // This should be 60 seconds by default or 1 second in tests + ) -> Self { + let stats: Arc = Default::default(); + + // This task clears the dead/completed stats every hour + let hour_stats = stats.clone(); + tokio::spawn(async move { + let duration = Duration::from_secs(3600); + loop { + tokio::time::sleep(duration).await; + hour_stats.completed_last_hour.store(0, Ordering::Relaxed); + hour_stats.dead_last_hour.store(0, Ordering::Relaxed); + } + }); + + let (retry_sender, mut retry_receiver) = unbounded_channel(); + let retry_stats = stats.clone(); + let retry_client = client.clone(); + + // The "fast path" retry + // The backoff should be < 5 mins for this to work otherwise signatures may expire + // This strategy is the one that is used with the *same* signature + let strategy = RetryStrategy { + backoff, + retries: 1, + offset: 0, + initial_sleep: 0, + }; + + // The "retry path" strategy + // After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again + let retry_strategy = RetryStrategy { + backoff, + retries: 3, + offset: 2, + initial_sleep: backoff.pow(2), // wait 60 mins before even trying + }; + + let retry_sender_task = tokio::spawn(async move { + let mut join_set = JoinSet::new(); + + while let Some(message) = retry_receiver.recv().await { + let retry_task = retry_worker( + retry_client.clone(), + timeout, + message, + retry_stats.clone(), + retry_strategy, + ); + + if retry_count > 0 { + // If we're over the limit of retries, wait for them to finish before spawning + while join_set.len() >= retry_count { + join_set.join_next().await; + } + + join_set.spawn(retry_task); + } else { + // If the retry worker count is `0` then just spawn and don't use the join_set + tokio::spawn(retry_task); + } + } + + while !join_set.is_empty() { + join_set.join_next().await; + } + }); + + let (sender, mut receiver) = unbounded_channel(); + + let sender_stats = stats.clone(); + + let sender_task = tokio::spawn(async move { + let mut join_set = JoinSet::new(); + + while let Some(message) = receiver.recv().await { + let task = worker( + client.clone(), + timeout, + message, + retry_sender.clone(), + sender_stats.clone(), + strategy, + ); + + if worker_count > 0 { + // If we're over the limit of workers, wait for them to finish before spawning + while join_set.len() >= worker_count { + join_set.join_next().await; + } + + join_set.spawn(task); + } else { + // If the worker count is `0` then just spawn and don't use the join_set + tokio::spawn(task); + } + } + + drop(retry_sender); + + while !join_set.is_empty() { + join_set.join_next().await; + } + }); + + Self { + stats, + sender, + sender_task, + retry_sender_task, + } + } + + async fn queue(&self, message: SendActivityTask) -> Result<(), Error> { + self.stats.pending.fetch_add(1, Ordering::Relaxed); + self.sender + .send(message) + .map_err(|e| Error::ActivityQueueError(e.0.activity_id))?; + + Ok(()) + } + + fn get_stats(&self) -> &Stats { + &self.stats + } + + #[allow(unused)] + // Drops all the senders and shuts down the workers + pub(crate) async fn shutdown(self, wait_for_retries: bool) -> Result, Error> { + drop(self.sender); + + self.sender_task.await?; + + if wait_for_retries { + self.retry_sender_task.await?; + } + + Ok(self.stats) + } +} + +/// Creates an activity queue using tokio spawned tasks +/// Note: requires a tokio runtime +pub(crate) fn create_activity_queue( + client: ClientWithMiddleware, + worker_count: usize, + retry_count: usize, + request_timeout: Duration, +) -> ActivityQueue { + ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60) +} + +/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries +async fn retry>, A: FnMut() -> F>( + mut action: A, + strategy: RetryStrategy, +) -> Result { + let mut count = strategy.offset; + + // Do an initial sleep if it's called for + if strategy.initial_sleep > 0 { + let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64); + tokio::time::sleep(sleep_dur).await; + } + + loop { + match action().await { + Ok(val) => return Ok(val), + Err(err) => { + if count < strategy.retries { + count += 1; + + let sleep_amt = strategy.backoff.pow(count as u32) as u64; + let sleep_dur = Duration::from_secs(sleep_amt); + warn!("{err:?}. Sleeping for {sleep_dur:?} and trying again"); + tokio::time::sleep(sleep_dur).await; + continue; + } else { + return Err(err); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use axum::extract::State; + use bytes::Bytes; + use http::StatusCode; + use std::time::Instant; + + use crate::http_signatures::generate_actor_keypair; + + use super::*; + + #[allow(unused)] + // This will periodically send back internal errors to test the retry + async fn dodgy_handler( + State(state): State>, + headers: HeaderMap, + body: Bytes, + ) -> Result<(), StatusCode> { + debug!("Headers:{:?}", headers); + debug!("Body len:{}", body.len()); + + if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 { + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + Ok(()) + } + + async fn test_server() { + use axum::{routing::post, Router}; + + // We should break every now and then ;) + let state = Arc::new(AtomicUsize::new(0)); + + let app = Router::new() + .route("/", post(dodgy_handler)) + .with_state(state); + + axum::Server::bind(&"0.0.0.0:8001".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + // Queues 100 messages and then asserts that the worker runs them + async fn test_activity_queue_workers() { + let num_workers = 64; + let num_messages: usize = 100; + + tokio::spawn(test_server()); + + /* + // uncomment for debug logs & stats + use tracing::log::LevelFilter; + + env_logger::builder() + .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) + .format_timestamp(None) + .init(); + + */ + + let activity_queue = ActivityQueue::new( + reqwest::Client::default().into(), + num_workers, + num_workers, + Duration::from_secs(10), + 1, + ); + + let keypair = generate_actor_keypair().unwrap(); + + let message = SendActivityTask { + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + + let start = Instant::now(); + + for _ in 0..num_messages { + activity_queue.queue(message.clone()).await.unwrap(); + } + + info!("Queue Sent: {:?}", start.elapsed()); + + let stats = activity_queue.shutdown(true).await.unwrap(); + + info!( + "Queue Finished. Num msgs: {}, Time {:?}, msg/s: {:0.0}", + num_messages, + start.elapsed(), + num_messages as f64 / start.elapsed().as_secs_f64() + ); + + assert_eq!( + stats.completed_last_hour.load(Ordering::Relaxed), + num_messages + ); + } +} diff --git a/src/activity_sending.rs b/src/activity_sending.rs index bd236ad..a333bb7 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -29,12 +29,12 @@ use url::Url; #[derive(Clone, Debug)] /// all info needed to send one activity to one inbox pub struct SendActivityTask<'a> { - actor_id: &'a Url, - activity_id: &'a Url, - activity: Bytes, - inbox: Url, - private_key: PKey, - http_signature_compat: bool, + pub(crate) actor_id: &'a Url, + pub(crate) activity_id: &'a Url, + pub(crate) activity: Bytes, + pub(crate) inbox: Url, + pub(crate) private_key: PKey, + pub(crate) http_signature_compat: bool, } impl Display for SendActivityTask<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -131,7 +131,7 @@ impl SendActivityTask<'_> { } } -async fn get_pkey_cached( +pub(crate) async fn get_pkey_cached( data: &Data, actor: &ActorType, ) -> Result, Error> diff --git a/src/config.rs b/src/config.rs index 7573234..b7aea9c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,7 @@ //! ``` use crate::{ + activity_queue::{create_activity_queue, ActivityQueue}, error::Error, protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor}, @@ -85,6 +86,20 @@ pub struct FederationConfig { setter(custom) )] pub(crate) actor_pkey_cache: Cache>, + /// Queue for sending outgoing activities. Only optional to make builder work, its always + /// present once constructed. + #[builder(setter(skip))] + pub(crate) activity_queue: Option>, + /// When sending with activity queue: Number of tasks that can be in-flight concurrently. + /// Tasks are retried once after a minute, then put into the retry queue. + /// Setting this count to `0` means that there is no limit to concurrency + #[builder(default = "0")] + pub(crate) queue_worker_count: usize, + /// When sending with activity queue: Number of concurrent tasks that are being retried + /// in-flight concurrently. Tasks are retried after an hour, then again in 60 hours. + /// Setting this count to `0` means that there is no limit to concurrency + #[builder(default = "0")] + pub(crate) queue_retry_count: usize, } impl FederationConfig { @@ -197,7 +212,14 @@ impl FederationConfigBuilder { /// queue for outgoing activities, which is stored internally in the config struct. /// Requires a tokio runtime for the background queue. pub async fn build(&mut self) -> Result, FederationConfigBuilderError> { - let config = self.partial_build()?; + let mut config = self.partial_build()?; + let queue = create_activity_queue( + config.client.clone(), + config.queue_worker_count, + config.queue_retry_count, + config.request_timeout, + ); + config.activity_queue = Some(Arc::new(queue)); Ok(config) } } diff --git a/src/error.rs b/src/error.rs index ba9248a..c169e3b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,9 +1,9 @@ //! Error messages returned by this library -use std::string::FromUtf8Error; - use http_signature_normalization_reqwest::SignError; use openssl::error::ErrorStack; +use std::string::FromUtf8Error; +use tokio::task::JoinError; use url::Url; use crate::fetch::webfinger::WebFingerError; @@ -62,6 +62,12 @@ pub enum Error { /// Signing errors #[error(transparent)] SignError(#[from] SignError), + /// Failed to queue activity for sending + #[error("Failed to queue activity {0} for sending")] + ActivityQueueError(Url), + /// Stop activity queue + #[error(transparent)] + StopActivityQueue(#[from] JoinError), /// Other generic errors #[error("{0}")] Other(String), diff --git a/src/lib.rs b/src/lib.rs index f482aa0..2ba88b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ #![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")] #![deny(missing_docs)] +pub mod activity_queue; pub mod activity_sending; #[cfg(feature = "actix-web")] pub mod actix_web;