From 51443aa57c9f33986feee42cb61ac48306b3c3db Mon Sep 17 00:00:00 2001 From: phiresky Date: Fri, 1 Sep 2023 11:19:22 +0200 Subject: [PATCH] Remove activity queue and add raw sending (#75) * make prepare_raw, sign_raw, send_raw functions public * remove in-memory activity queue * rename module * comment * don"t clone * fix doc comment * remove send_activity function --------- Co-authored-by: Nutomic --- Cargo.toml | 2 + docs/05_configuration.md | 2 +- docs/09_sending_activities.md | 9 +- .../live_federation/activities/create_post.rs | 9 +- examples/local_federation/objects/person.rs | 7 +- src/activity_queue.rs | 667 ------------------ src/activity_sending.rs | 300 ++++++++ src/actix_web/inbox.rs | 2 +- src/config.rs | 61 +- src/http_signatures.rs | 4 +- src/lib.rs | 2 +- 11 files changed, 340 insertions(+), 725 deletions(-) delete mode 100644 src/activity_queue.rs create mode 100644 src/activity_sending.rs diff --git a/Cargo.toml b/Cargo.toml index e9e5a06..2b8da86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,8 @@ axum = { version = "0.6.18", features = [ ], default-features = false, optional = true } tower = { version = "0.4.13", optional = true } hyper = { version = "0.14", optional = true } +futures = "0.3.28" +moka = { version = "0.11.2", features = ["future"] } [features] default = ["actix-web", "axum"] diff --git a/docs/05_configuration.md b/docs/05_configuration.md index 315bc66..8569e11 100644 --- a/docs/05_configuration.md +++ b/docs/05_configuration.md @@ -14,4 +14,4 @@ let config = FederationConfig::builder() # }).unwrap() ``` -`debug` is necessary to test federation with http and localhost URLs, but it should never be used in production. The `worker_count` value can be adjusted depending on the instance size. A lower value saves resources on a small instance, while a higher value is necessary on larger instances to keep up with send jobs. `url_verifier` can be used to implement a domain blacklist. \ No newline at end of file +`debug` is necessary to test federation with http and localhost URLs, but it should never be used in production. `url_verifier` can be used to implement a domain blacklist. diff --git a/docs/09_sending_activities.md b/docs/09_sending_activities.md index 649ec17..215be5e 100644 --- a/docs/09_sending_activities.md +++ b/docs/09_sending_activities.md @@ -4,7 +4,7 @@ To send an activity we need to initialize our previously defined struct, and pic ``` # use activitypub_federation::config::FederationConfig; -# use activitypub_federation::activity_queue::send_activity; +# use activitypub_federation::activity_sending::SendActivityTask; # use activitypub_federation::http_signatures::generate_actor_keypair; # use activitypub_federation::traits::Actor; # use activitypub_federation::fetch::object_id::ObjectId; @@ -25,7 +25,11 @@ let activity = Follow { id: "https://lemmy.ml/activities/321".try_into()? }; let inboxes = vec![recipient.shared_inbox_or_inbox()]; -send_activity(activity, &sender, inboxes, &data).await?; + +let sends = SendActivityTask::prepare(&activity, &sender, inboxes, &data).await?; +for send in sends { + send.sign_and_send(&data).await?; +} # Ok::<(), anyhow::Error>(()) # }).unwrap() ``` @@ -38,6 +42,7 @@ private key. Finally the activity is delivered to the inbox. It is possible that delivery fails because the target instance is temporarily unreachable. In this case the task is scheduled for retry after a certain waiting time. For each task delivery is retried up to 3 times after the initial attempt. The retry intervals are as follows: + - one minute, in case of service restart - one hour, in case of instance maintenance - 2.5 days, in case of major incident with rebuild from backup diff --git a/examples/live_federation/activities/create_post.rs b/examples/live_federation/activities/create_post.rs index 66928a6..51ee5e5 100644 --- a/examples/live_federation/activities/create_post.rs +++ b/examples/live_federation/activities/create_post.rs @@ -6,7 +6,7 @@ use crate::{ DbPost, }; use activitypub_federation::{ - activity_queue::send_activity, + activity_sending::SendActivityTask, config::Data, fetch::object_id::ObjectId, kinds::activity::CreateType, @@ -39,7 +39,12 @@ impl CreatePost { id: generate_object_id(data.domain())?, }; let create_with_context = WithContext::new_default(create); - send_activity(create_with_context, &data.local_user(), vec![inbox], data).await?; + let sends = + SendActivityTask::prepare(&create_with_context, &data.local_user(), vec![inbox], data) + .await?; + for send in sends { + send.sign_and_send(data).await?; + } Ok(()) } } diff --git a/examples/local_federation/objects/person.rs b/examples/local_federation/objects/person.rs index efdcc70..5961205 100644 --- a/examples/local_federation/objects/person.rs +++ b/examples/local_federation/objects/person.rs @@ -6,7 +6,7 @@ use crate::{ utils::generate_object_id, }; use activitypub_federation::{ - activity_queue::send_activity, + activity_sending::SendActivityTask, config::Data, fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor}, http_signatures::generate_actor_keypair, @@ -113,7 +113,10 @@ impl DbUser { ::Error: From + From, { let activity = WithContext::new_default(activity); - send_activity(activity, self, recipients, data).await?; + let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; + for send in sends { + send.sign_and_send(data).await?; + } Ok(()) } } diff --git a/src/activity_queue.rs b/src/activity_queue.rs deleted file mode 100644 index 3f06d09..0000000 --- a/src/activity_queue.rs +++ /dev/null @@ -1,667 +0,0 @@ -//! Queue for signing and sending outgoing activities with retry -//! -#![doc = include_str!("../docs/09_sending_activities.md")] - -use crate::{ - config::Data, - error::Error, - http_signatures::sign_request, - reqwest_shim::ResponseExt, - traits::{ActivityHandler, Actor}, - FEDERATION_CONTENT_TYPE, -}; -use anyhow::{anyhow, Context}; - -use bytes::Bytes; -use futures_core::Future; -use http::{header::HeaderName, HeaderMap, HeaderValue}; -use httpdate::fmt_http_date; -use itertools::Itertools; -use openssl::pkey::{PKey, Private}; -use reqwest::Request; -use reqwest_middleware::ClientWithMiddleware; -use serde::Serialize; -use std::{ - fmt::{Debug, Display}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::{Duration, SystemTime}, -}; -use tokio::{ - sync::mpsc::{unbounded_channel, UnboundedSender}, - task::{JoinHandle, JoinSet}, -}; -use tracing::{debug, info, warn}; -use url::Url; - -/// Send a new activity to the given inboxes -/// -/// - `activity`: The activity to be sent, gets converted to json -/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP -/// signature. Generated with [crate::http_signatures::generate_actor_keypair]. -/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor -/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] -/// for each target actor. -pub async fn send_activity( - activity: Activity, - actor: &ActorType, - inboxes: Vec, - data: &Data, -) -> Result<(), ::Error> -where - Activity: ActivityHandler + Serialize, - ::Error: From + From, - Datatype: Clone, - ActorType: Actor, -{ - let config = &data.config; - let actor_id = activity.actor(); - let activity_id = activity.id(); - let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into(); - let private_key_pem = actor - .private_key_pem() - .ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?; - - // This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening - let private_key = tokio::task::spawn_blocking(move || { - PKey::private_key_from_pem(private_key_pem.as_bytes()) - .map_err(|err| anyhow!("Could not create private key from PEM data:{err}")) - }) - .await - .map_err(|err| anyhow!("Error joining:{err}"))??; - - let inboxes: Vec = inboxes - .into_iter() - .unique() - .filter(|i| !config.is_local_url(i)) - .collect(); - // This field is only optional to make builder work, its always present at this point - let activity_queue = config - .activity_queue - .as_ref() - .expect("Config has activity queue"); - for inbox in inboxes { - if let Err(err) = config.verify_url_valid(&inbox).await { - debug!("inbox url invalid, skipping: {inbox}: {err}"); - continue; - } - - let message = SendActivityTask { - actor_id: actor_id.clone(), - activity_id: activity_id.clone(), - inbox, - activity: activity_serialized.clone(), - private_key: private_key.clone(), - http_signature_compat: config.http_signature_compat, - }; - - // Don't use the activity queue if this is in debug mode, send and wait directly - if config.debug { - if let Err(err) = sign_and_send( - &message, - &config.client, - config.request_timeout, - Default::default(), - ) - .await - { - warn!("{err}"); - } - } else { - activity_queue.queue(message).await?; - let stats = activity_queue.get_stats(); - let running = stats.running.load(Ordering::Relaxed); - if running == config.worker_count && config.worker_count != 0 { - warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count); - warn!("{:?}", stats); - } else { - info!("{:?}", stats); - } - } - } - - Ok(()) -} - -#[derive(Clone, Debug)] -struct SendActivityTask { - actor_id: Url, - activity_id: Url, - activity: Bytes, - inbox: Url, - private_key: PKey, - http_signature_compat: bool, -} - -async fn sign_and_send( - task: &SendActivityTask, - client: &ClientWithMiddleware, - timeout: Duration, - retry_strategy: RetryStrategy, -) -> Result<(), anyhow::Error> { - debug!( - "Sending {} to {}, contents:\n {}", - task.activity_id, - task.inbox, - serde_json::from_slice::(&task.activity)? - ); - let request_builder = client - .post(task.inbox.to_string()) - .timeout(timeout) - .headers(generate_request_headers(&task.inbox)); - let request = sign_request( - request_builder, - &task.actor_id, - task.activity.clone(), - task.private_key.clone(), - task.http_signature_compat, - ) - .await - .context("signing request")?; - - retry( - || { - send( - task, - client, - request - .try_clone() - .expect("The body of the request is not cloneable"), - ) - }, - retry_strategy, - ) - .await -} - -async fn send( - task: &SendActivityTask, - client: &ClientWithMiddleware, - request: Request, -) -> Result<(), anyhow::Error> { - let response = client.execute(request).await; - - match response { - Ok(o) if o.status().is_success() => { - debug!( - "Activity {} delivered successfully to {}", - task.activity_id, task.inbox - ); - Ok(()) - } - Ok(o) if o.status().is_client_error() => { - let text = o.text_limited().await.map_err(Error::other)?; - debug!( - "Activity {} was rejected by {}, aborting: {}", - task.activity_id, task.inbox, text, - ); - Ok(()) - } - Ok(o) => { - let status = o.status(); - let text = o.text_limited().await.map_err(Error::other)?; - Err(anyhow!( - "Queueing activity {} to {} for retry after failure with status {}: {}", - task.activity_id, - task.inbox, - status, - text, - )) - } - Err(e) => Err(anyhow!( - "Queueing activity {} to {} for retry after connection failure: {}", - task.activity_id, - task.inbox, - e - )), - } -} - -pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { - let mut host = inbox_url.domain().expect("read inbox domain").to_string(); - if let Some(port) = inbox_url.port() { - host = format!("{}:{}", host, port); - } - - let mut headers = HeaderMap::new(); - headers.insert( - HeaderName::from_static("content-type"), - HeaderValue::from_static(FEDERATION_CONTENT_TYPE), - ); - headers.insert( - HeaderName::from_static("host"), - HeaderValue::from_str(&host).expect("Hostname is valid"), - ); - headers.insert( - "date", - HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"), - ); - headers -} - -/// A simple activity queue which spawns tokio workers to send out requests -/// When creating a queue, it will spawn a task per worker thread -/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) -pub(crate) struct ActivityQueue { - // Stats shared between the queue and workers - stats: Arc, - sender: UnboundedSender, - sender_task: JoinHandle<()>, - retry_sender_task: JoinHandle<()>, -} - -/// Simple stat counter to show where we're up to with sending messages -/// This is a lock-free way to share things between tasks -/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning -#[derive(Default)] -pub(crate) struct Stats { - pending: AtomicUsize, - running: AtomicUsize, - retries: AtomicUsize, - dead_last_hour: AtomicUsize, - completed_last_hour: AtomicUsize, -} - -impl Debug for Stats { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}", - self.pending.load(Ordering::Relaxed), - self.running.load(Ordering::Relaxed), - self.retries.load(Ordering::Relaxed), - self.dead_last_hour.load(Ordering::Relaxed), - self.completed_last_hour.load(Ordering::Relaxed) - ) - } -} - -#[derive(Clone, Copy, Default)] -struct RetryStrategy { - /// Amount of time in seconds to back off - backoff: usize, - /// Amount of times to retry - retries: usize, - /// If this particular request has already been retried, you can add an offset here to increment the count to start - offset: usize, - /// Number of seconds to sleep before trying - initial_sleep: usize, -} - -/// A tokio spawned worker which is responsible for submitting requests to federated servers -/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue. -/// We need to retry activity sending in case the target instances is temporarily unreachable. -/// In this case, the task is stored and resent when the instance is hopefully back up. This -/// list shows the retry intervals, and which events of the target instance can be covered: -/// - 60s (one minute, service restart) -- happens in the worker w/ same signature -/// - 60min (one hour, instance maintenance) --- happens in the retry worker -/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker -async fn worker( - client: ClientWithMiddleware, - timeout: Duration, - message: SendActivityTask, - retry_queue: UnboundedSender, - stats: Arc, - strategy: RetryStrategy, -) { - stats.pending.fetch_sub(1, Ordering::Relaxed); - stats.running.fetch_add(1, Ordering::Relaxed); - - let outcome = sign_and_send(&message, &client, timeout, strategy).await; - - // "Running" has finished, check the outcome - stats.running.fetch_sub(1, Ordering::Relaxed); - - match outcome { - Ok(_) => { - stats.completed_last_hour.fetch_add(1, Ordering::Relaxed); - } - Err(_err) => { - stats.retries.fetch_add(1, Ordering::Relaxed); - warn!( - "Sending activity {} to {} to the retry queue to be tried again later", - message.activity_id, message.inbox - ); - // Send to the retry queue. Ignoring whether it succeeds or not - retry_queue.send(message).ok(); - } - } -} - -async fn retry_worker( - client: ClientWithMiddleware, - timeout: Duration, - message: SendActivityTask, - stats: Arc, - strategy: RetryStrategy, -) { - // Because the times are pretty extravagant between retries, we have to re-sign each time - let outcome = retry( - || { - sign_and_send( - &message, - &client, - timeout, - RetryStrategy { - backoff: 0, - retries: 0, - offset: 0, - initial_sleep: 0, - }, - ) - }, - strategy, - ) - .await; - - stats.retries.fetch_sub(1, Ordering::Relaxed); - - match outcome { - Ok(_) => { - stats.completed_last_hour.fetch_add(1, Ordering::Relaxed); - } - Err(_err) => { - stats.dead_last_hour.fetch_add(1, Ordering::Relaxed); - } - } -} - -impl ActivityQueue { - fn new( - client: ClientWithMiddleware, - worker_count: usize, - retry_count: usize, - timeout: Duration, - backoff: usize, // This should be 60 seconds by default or 1 second in tests - ) -> Self { - let stats: Arc = Default::default(); - - // This task clears the dead/completed stats every hour - let hour_stats = stats.clone(); - tokio::spawn(async move { - let duration = Duration::from_secs(3600); - loop { - tokio::time::sleep(duration).await; - hour_stats.completed_last_hour.store(0, Ordering::Relaxed); - hour_stats.dead_last_hour.store(0, Ordering::Relaxed); - } - }); - - let (retry_sender, mut retry_receiver) = unbounded_channel(); - let retry_stats = stats.clone(); - let retry_client = client.clone(); - - // The "fast path" retry - // The backoff should be < 5 mins for this to work otherwise signatures may expire - // This strategy is the one that is used with the *same* signature - let strategy = RetryStrategy { - backoff, - retries: 1, - offset: 0, - initial_sleep: 0, - }; - - // The "retry path" strategy - // After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again - let retry_strategy = RetryStrategy { - backoff, - retries: 3, - offset: 2, - initial_sleep: backoff.pow(2), // wait 60 mins before even trying - }; - - let retry_sender_task = tokio::spawn(async move { - let mut join_set = JoinSet::new(); - - while let Some(message) = retry_receiver.recv().await { - let retry_task = retry_worker( - retry_client.clone(), - timeout, - message, - retry_stats.clone(), - retry_strategy, - ); - - if retry_count > 0 { - // If we're over the limit of retries, wait for them to finish before spawning - while join_set.len() >= retry_count { - join_set.join_next().await; - } - - join_set.spawn(retry_task); - } else { - // If the retry worker count is `0` then just spawn and don't use the join_set - tokio::spawn(retry_task); - } - } - - while !join_set.is_empty() { - join_set.join_next().await; - } - }); - - let (sender, mut receiver) = unbounded_channel(); - - let sender_stats = stats.clone(); - - let sender_task = tokio::spawn(async move { - let mut join_set = JoinSet::new(); - - while let Some(message) = receiver.recv().await { - let task = worker( - client.clone(), - timeout, - message, - retry_sender.clone(), - sender_stats.clone(), - strategy, - ); - - if worker_count > 0 { - // If we're over the limit of workers, wait for them to finish before spawning - while join_set.len() >= worker_count { - join_set.join_next().await; - } - - join_set.spawn(task); - } else { - // If the worker count is `0` then just spawn and don't use the join_set - tokio::spawn(task); - } - } - - drop(retry_sender); - - while !join_set.is_empty() { - join_set.join_next().await; - } - }); - - Self { - stats, - sender, - sender_task, - retry_sender_task, - } - } - - async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> { - self.stats.pending.fetch_add(1, Ordering::Relaxed); - self.sender.send(message)?; - - Ok(()) - } - - fn get_stats(&self) -> &Stats { - &self.stats - } - - #[allow(unused)] - // Drops all the senders and shuts down the workers - pub(crate) async fn shutdown( - self, - wait_for_retries: bool, - ) -> Result, anyhow::Error> { - drop(self.sender); - - self.sender_task.await?; - - if wait_for_retries { - self.retry_sender_task.await?; - } - - Ok(self.stats) - } -} - -/// Creates an activity queue using tokio spawned tasks -/// Note: requires a tokio runtime -pub(crate) fn create_activity_queue( - client: ClientWithMiddleware, - worker_count: usize, - retry_count: usize, - request_timeout: Duration, -) -> ActivityQueue { - ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60) -} - -/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries -async fn retry>, A: FnMut() -> F>( - mut action: A, - strategy: RetryStrategy, -) -> Result { - let mut count = strategy.offset; - - // Do an initial sleep if it's called for - if strategy.initial_sleep > 0 { - let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64); - tokio::time::sleep(sleep_dur).await; - } - - loop { - match action().await { - Ok(val) => return Ok(val), - Err(err) => { - if count < strategy.retries { - count += 1; - - let sleep_amt = strategy.backoff.pow(count as u32) as u64; - let sleep_dur = Duration::from_secs(sleep_amt); - warn!("{err:?}. Sleeping for {sleep_dur:?} and trying again"); - tokio::time::sleep(sleep_dur).await; - continue; - } else { - return Err(err); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use axum::extract::State; - use bytes::Bytes; - use http::StatusCode; - use std::time::Instant; - - use crate::http_signatures::generate_actor_keypair; - - use super::*; - - #[allow(unused)] - // This will periodically send back internal errors to test the retry - async fn dodgy_handler( - State(state): State>, - headers: HeaderMap, - body: Bytes, - ) -> Result<(), StatusCode> { - debug!("Headers:{:?}", headers); - debug!("Body len:{}", body.len()); - - if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 { - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - Ok(()) - } - - async fn test_server() { - use axum::{routing::post, Router}; - - // We should break every now and then ;) - let state = Arc::new(AtomicUsize::new(0)); - - let app = Router::new() - .route("/", post(dodgy_handler)) - .with_state(state); - - axum::Server::bind(&"0.0.0.0:8001".parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); - } - - #[tokio::test(flavor = "multi_thread")] - // Queues 100 messages and then asserts that the worker runs them - async fn test_activity_queue_workers() { - let num_workers = 64; - let num_messages: usize = 100; - - tokio::spawn(test_server()); - - /* - // uncomment for debug logs & stats - use tracing::log::LevelFilter; - - env_logger::builder() - .filter_level(LevelFilter::Warn) - .filter_module("activitypub_federation", LevelFilter::Info) - .format_timestamp(None) - .init(); - - */ - - let activity_queue = ActivityQueue::new( - reqwest::Client::default().into(), - num_workers, - num_workers, - Duration::from_secs(10), - 1, - ); - - let keypair = generate_actor_keypair().unwrap(); - - let message = SendActivityTask { - actor_id: "http://localhost:8001".parse().unwrap(), - activity_id: "http://localhost:8001/activity".parse().unwrap(), - activity: "{}".into(), - inbox: "http://localhost:8001".parse().unwrap(), - private_key: keypair.private_key().unwrap(), - http_signature_compat: true, - }; - - let start = Instant::now(); - - for _ in 0..num_messages { - activity_queue.queue(message.clone()).await.unwrap(); - } - - info!("Queue Sent: {:?}", start.elapsed()); - - let stats = activity_queue.shutdown(true).await.unwrap(); - - info!( - "Queue Finished. Num msgs: {}, Time {:?}, msg/s: {:0.0}", - num_messages, - start.elapsed(), - num_messages as f64 / start.elapsed().as_secs_f64() - ); - - assert_eq!( - stats.completed_last_hour.load(Ordering::Relaxed), - num_messages - ); - } -} diff --git a/src/activity_sending.rs b/src/activity_sending.rs new file mode 100644 index 0000000..30713b4 --- /dev/null +++ b/src/activity_sending.rs @@ -0,0 +1,300 @@ +//! Queue for signing and sending outgoing activities with retry +//! +#![doc = include_str!("../docs/09_sending_activities.md")] + +use crate::{ + config::Data, + error::Error, + http_signatures::sign_request, + reqwest_shim::ResponseExt, + traits::{ActivityHandler, Actor}, + FEDERATION_CONTENT_TYPE, +}; +use anyhow::{anyhow, Context}; + +use bytes::Bytes; +use futures::StreamExt; +use http::{header::HeaderName, HeaderMap, HeaderValue}; +use httpdate::fmt_http_date; +use itertools::Itertools; +use openssl::pkey::{PKey, Private}; +use reqwest::Request; +use reqwest_middleware::ClientWithMiddleware; +use serde::Serialize; +use std::{ + self, + fmt::{Debug, Display}, + time::{Duration, SystemTime}, +}; +use tracing::debug; +use url::Url; + +#[derive(Clone, Debug)] +/// all info needed to send one activity to one inbox +pub struct SendActivityTask<'a> { + actor_id: &'a Url, + activity_id: &'a Url, + activity: Bytes, + inbox: Url, + private_key: PKey, + http_signature_compat: bool, +} +impl Display for SendActivityTask<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} to {}", self.activity_id, self.inbox) + } +} + +impl SendActivityTask<'_> { + /// prepare an activity for sending + /// + /// - `activity`: The activity to be sent, gets converted to json + /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor + /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] + /// for each target actor. + pub async fn prepare<'a, Activity, Datatype, ActorType>( + activity: &'a Activity, + actor: &ActorType, + inboxes: Vec, + data: &Data, + ) -> Result>, ::Error> + where + Activity: ActivityHandler + Serialize, + ::Error: From + From, + Datatype: Clone, + ActorType: Actor, + { + let config = &data.config; + let actor_id = activity.actor(); + let activity_id = activity.id(); + let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into(); + let private_key = get_pkey_cached(data, actor).await?; + + Ok(futures::stream::iter( + inboxes + .into_iter() + .unique() + .filter(|i| !config.is_local_url(i)), + ) + .filter_map(|inbox| async { + if let Err(err) = config.verify_url_valid(&inbox).await { + debug!("inbox url invalid, skipping: {inbox}: {err}"); + return None; + }; + Some(SendActivityTask { + actor_id, + activity_id, + inbox, + activity: activity_serialized.clone(), + private_key: private_key.clone(), + http_signature_compat: config.http_signature_compat, + }) + }) + .collect() + .await) + } + + /// convert a sendactivitydata to a request, signing and sending it + pub async fn sign_and_send( + &self, + data: &Data, + ) -> Result<(), anyhow::Error> { + let req = self + .sign(&data.config.client, data.config.request_timeout) + .await?; + self.send(&data.config.client, req).await + } + async fn sign( + &self, + client: &ClientWithMiddleware, + timeout: Duration, + ) -> Result { + let task = self; + let request_builder = client + .post(task.inbox.to_string()) + .timeout(timeout) + .headers(generate_request_headers(&task.inbox)); + let request = sign_request( + request_builder, + task.actor_id, + task.activity.clone(), + task.private_key.clone(), + task.http_signature_compat, + ) + .await + .context("signing request")?; + Ok(request) + } + + async fn send( + &self, + client: &ClientWithMiddleware, + request: Request, + ) -> Result<(), anyhow::Error> { + let response = client.execute(request).await; + + match response { + Ok(o) if o.status().is_success() => { + debug!("Activity {self} delivered successfully"); + Ok(()) + } + Ok(o) if o.status().is_client_error() => { + let text = o.text_limited().await.map_err(Error::other)?; + debug!("Activity {self} was rejected, aborting: {text}"); + Ok(()) + } + Ok(o) => { + let status = o.status(); + let text = o.text_limited().await.map_err(Error::other)?; + Err(anyhow!( + "Activity {self} failure with status {status}: {text}", + )) + } + Err(e) => Err(anyhow!("Activity {self} connection failure: {e}")), + } + } +} + +async fn get_pkey_cached( + data: &Data, + actor: &ActorType, +) -> Result, anyhow::Error> +where + ActorType: Actor, +{ + let actor_id = actor.id(); + // PKey is internally like an Arc<>, so cloning is ok + data.config + .actor_pkey_cache + .try_get_with_by_ref(&actor_id, async { + let private_key_pem = actor.private_key_pem().ok_or_else(|| { + anyhow!("Actor {actor_id} does not contain a private key for signing") + })?; + + // This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening + let pkey = tokio::task::spawn_blocking(move || { + PKey::private_key_from_pem(private_key_pem.as_bytes()) + .map_err(|err| anyhow!("Could not create private key from PEM data:{err}")) + }) + .await + .map_err(|err| anyhow!("Error joining: {err}"))??; + std::result::Result::, anyhow::Error>::Ok(pkey) + }) + .await + .map_err(|e| anyhow!("cloned error: {e}")) +} + +pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { + let mut host = inbox_url.domain().expect("read inbox domain").to_string(); + if let Some(port) = inbox_url.port() { + host = format!("{}:{}", host, port); + } + + let mut headers = HeaderMap::new(); + headers.insert( + HeaderName::from_static("content-type"), + HeaderValue::from_static(FEDERATION_CONTENT_TYPE), + ); + headers.insert( + HeaderName::from_static("host"), + HeaderValue::from_str(&host).expect("Hostname is valid"), + ); + headers.insert( + "date", + HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"), + ); + headers +} + +#[cfg(test)] +mod tests { + use axum::extract::State; + use bytes::Bytes; + use http::StatusCode; + use std::{ + sync::{atomic::AtomicUsize, Arc}, + time::Instant, + }; + use tracing::info; + + use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; + + use super::*; + + #[allow(unused)] + // This will periodically send back internal errors to test the retry + async fn dodgy_handler( + State(state): State>, + headers: HeaderMap, + body: Bytes, + ) -> Result<(), StatusCode> { + debug!("Headers:{:?}", headers); + debug!("Body len:{}", body.len()); + + /*if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 { + return Err(StatusCode::INTERNAL_SERVER_ERROR); + }*/ + Ok(()) + } + + async fn test_server() { + use axum::{routing::post, Router}; + + // We should break every now and then ;) + let state = Arc::new(AtomicUsize::new(0)); + + let app = Router::new() + .route("/", post(dodgy_handler)) + .with_state(state); + + axum::Server::bind(&"0.0.0.0:8001".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + // Sends 100 messages + async fn test_activity_sending() -> anyhow::Result<()> { + let num_messages: usize = 100; + + tokio::spawn(test_server()); + + /* + // uncomment for debug logs & stats + use tracing::log::LevelFilter; + + env_logger::builder() + .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) + .format_timestamp(None) + .init(); + + */ + let keypair = generate_actor_keypair().unwrap(); + + let message = SendActivityTask { + actor_id: &"http://localhost:8001".parse().unwrap(), + activity_id: &"http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + let data = FederationConfig::builder() + .app_data(()) + .domain("localhost") + .build() + .await? + .to_request_data(); + + let start = Instant::now(); + + for _ in 0..num_messages { + message.sign_and_send(&data).await?; + } + + info!("Queue Sent: {:?}", start.elapsed()); + Ok(()) + } +} diff --git a/src/actix_web/inbox.rs b/src/actix_web/inbox.rs index d4d1abf..ba5a20b 100644 --- a/src/actix_web/inbox.rs +++ b/src/actix_web/inbox.rs @@ -57,7 +57,7 @@ where mod test { use super::*; use crate::{ - activity_queue::generate_request_headers, + activity_sending::generate_request_headers, config::FederationConfig, http_signatures::sign_request, traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR}, diff --git a/src/config.rs b/src/config.rs index bd2f675..7f52aa7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,22 +9,21 @@ //! .domain("example.com") //! .app_data(()) //! .http_fetch_limit(50) -//! .worker_count(16) //! .build().await?; //! # Ok::<(), anyhow::Error>(()) //! # }).unwrap() //! ``` use crate::{ - activity_queue::{create_activity_queue, ActivityQueue}, error::Error, protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor}, }; -use anyhow::{anyhow, Context}; +use anyhow::anyhow; use async_trait::async_trait; use derive_builder::Builder; use dyn_clone::{clone_trait_object, DynClone}; +use moka::future::Cache; use openssl::pkey::{PKey, Private}; use reqwest_middleware::ClientWithMiddleware; use serde::de::DeserializeOwned; @@ -56,16 +55,6 @@ pub struct FederationConfig { /// HTTP client used for all outgoing requests. Middleware can be used to add functionality /// like log tracing or retry of failed requests. pub(crate) client: ClientWithMiddleware, - /// Number of tasks that can be in-flight concurrently. - /// Tasks are retried once after a minute, then put into the retry queue. - /// Setting this count to `0` means that there is no limit to concurrency - #[builder(default = "0")] - pub(crate) worker_count: usize, - /// Number of concurrent tasks that are being retried in-flight concurrently. - /// Tasks are retried after an hour, then again in 60 hours. - /// Setting this count to `0` means that there is no limit to concurrency - #[builder(default = "0")] - pub(crate) retry_count: usize, /// Run library in debug mode. This allows usage of http and localhost urls. It also sends /// outgoing activities synchronously, not in background thread. This helps to make tests /// more consistent. Do not use for production. @@ -92,10 +81,11 @@ pub struct FederationConfig { /// #[builder(default = "None", setter(custom))] pub(crate) signed_fetch_actor: Option)>>, - /// Queue for sending outgoing activities. Only optional to make builder work, its always - /// present once constructed. - #[builder(setter(skip))] - pub(crate) activity_queue: Option>, + #[builder( + default = "Cache::builder().max_capacity(10000).build()", + setter(custom) + )] + pub(crate) actor_pkey_cache: Cache>, } impl FederationConfig { @@ -186,28 +176,6 @@ impl FederationConfig { pub fn domain(&self) -> &str { &self.domain } - /// Shut down this federation, waiting for the outgoing queue to be sent. - /// If the activityqueue is still in use in other requests or was never constructed, returns an error. - /// If wait_retries is true, also wait for requests that have initially failed and are being retried. - /// Returns a stats object that can be printed for debugging (structure currently not part of the public interface). - /// - /// Currently, this method does not work correctly if worker_count = 0 (unlimited) - pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result { - let q = self - .activity_queue - .take() - .context("ActivityQueue never constructed, build() not called?")?; - // Todo: use Arc::into_inner but is only part of rust 1.70. - let stats = Arc::::try_unwrap(q) - .map_err(|_| { - anyhow::anyhow!( - "Could not cleanly shut down: activityqueue arc was still in use elsewhere " - ) - })? - .shutdown(wait_retries) - .await?; - Ok(stats) - } } impl FederationConfigBuilder { @@ -223,20 +191,19 @@ impl FederationConfigBuilder { self } + /// sets the number of parsed actor private keys to keep in memory + pub fn actor_pkey_cache(&mut self, cache_size: u64) -> &mut Self { + self.actor_pkey_cache = Some(Cache::builder().max_capacity(cache_size).build()); + self + } + /// Constructs a new config instance with the values supplied to builder. /// /// Values which are not explicitly specified use the defaults. Also initializes the /// queue for outgoing activities, which is stored internally in the config struct. /// Requires a tokio runtime for the background queue. pub async fn build(&mut self) -> Result, FederationConfigBuilderError> { - let mut config = self.partial_build()?; - let queue = create_activity_queue( - config.client.clone(), - config.worker_count, - config.retry_count, - config.request_timeout, - ); - config.activity_queue = Some(Arc::new(queue)); + let config = self.partial_build()?; Ok(config) } } diff --git a/src/http_signatures.rs b/src/http_signatures.rs index 0b9d06c..96ce936 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -1,7 +1,7 @@ //! Generating keypairs, creating and verifying signatures //! //! Signature creation and verification is handled internally in the library. See -//! [send_activity](crate::activity_queue::send_activity) and +//! [send_activity](crate::activity_sending::send_activity) and //! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) / //! [receive_activity (axum)](crate::axum::inbox::receive_activity). @@ -274,7 +274,7 @@ pub(crate) fn verify_body_hash( #[cfg(test)] pub mod test { use super::*; - use crate::activity_queue::generate_request_headers; + use crate::activity_sending::generate_request_headers; use reqwest::Client; use reqwest_middleware::ClientWithMiddleware; use std::str::FromStr; diff --git a/src/lib.rs b/src/lib.rs index 2031c98..c660253 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ #![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")] #![deny(missing_docs)] -pub mod activity_queue; +pub mod activity_sending; #[cfg(feature = "actix-web")] pub mod actix_web; #[cfg(feature = "axum")]