diff --git a/Cargo.lock b/Cargo.lock index 1af7659..c9106ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,7 @@ dependencies = [ "http", "http-signature-normalization-actix", "http-signature-normalization-reqwest", + "itertools", "once_cell", "openssl", "rand", @@ -520,6 +521,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c97b9233581d84b8e1e689cdd3a47b6f69770084fc246e86a7f78b0d9c1d4a5" +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "encoding_rs" version = "0.8.31" @@ -899,6 +906,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 4dd620b..59d5c12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ http-signature-normalization-reqwest = { version = "0.5.0", default-features = f background-jobs = "0.12.0" thiserror = "1.0.31" derive_builder = "0.11.2" +itertools = "0.10.3" [dev-dependencies] activitystreams-kinds = "0.2.1" diff --git a/examples/federation/activities/follow.rs b/examples/federation/activities/follow.rs index 6122e39..270dff3 100644 --- a/examples/federation/activities/follow.rs +++ b/examples/federation/activities/follow.rs @@ -73,12 +73,7 @@ impl ActivityHandler for Follow { let id = generate_object_id(data.local_instance().hostname())?; let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); local_user - .send( - id, - accept, - vec![follower.inbox.clone()], - data.local_instance(), - ) + .send(accept, &[follower], data.local_instance()) .await?; Ok(()) } diff --git a/examples/federation/objects/person.rs b/examples/federation/objects/person.rs index 6cc5cfd..55d1316 100644 --- a/examples/federation/objects/person.rs +++ b/examples/federation/objects/person.rs @@ -7,19 +7,17 @@ use crate::{ }; use activitypub_federation::{ core::{ - activity_queue::SendActivity, - inbox::ActorPublicKey, + activity_queue::send_activity, object_id::ObjectId, signatures::{Keypair, PublicKey}, }, deser::context::WithContext, - traits::ApubObject, + traits::{ActivityHandler, Actor, ApubObject}, LocalInstance, }; use activitypub_federation_derive::activity_handler; use activitystreams_kinds::actor::PersonType; use serde::{Deserialize, Serialize}; -use tracing::log::debug; use url::Url; #[derive(Debug, Clone)] @@ -86,48 +84,45 @@ impl MyUser { pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { let id = generate_object_id(instance.local_instance().hostname())?; let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send( - id, - follow, - vec![other.inbox.clone()], - instance.local_instance(), - ) - .await?; + self.send(follow, &[other.clone()], instance.local_instance()) + .await?; Ok(()) } pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { let id = generate_object_id(instance.local_instance().hostname())?; let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); - let mut inboxes = vec![]; + let mut recipients = vec![]; for f in self.followers.clone() { let user: MyUser = ObjectId::new(f) .dereference(instance, instance.local_instance(), &mut 0) .await?; - inboxes.push(user.inbox); + recipients.push(user); } - self.send(id, &create, inboxes, instance.local_instance()) + self.send(create, &recipients, instance.local_instance()) .await?; Ok(()) } - pub(crate) async fn send( + pub(crate) async fn send( &self, - activity_id: Url, activity: Activity, - inboxes: Vec, + recipients: &[ActorT], local_instance: &LocalInstance, - ) -> Result<(), Error> { - let serialized = serde_json::to_string_pretty(&WithContext::new_default(activity))?; - debug!("Sending activity: {}", &serialized); - SendActivity { - activity_id, - actor_public_key: self.public_key(), - actor_private_key: self.private_key.clone().expect("has private key"), - inboxes, - activity: serialized, - } - .send(local_instance) + ) -> Result<(), ::Error> + where + Activity: ActivityHandler + Serialize, + ActorT: Actor, + ::Error: From + From, + { + let activity = WithContext::new_default(activity); + send_activity( + activity, + self.public_key(), + self.private_key.clone().expect("has private key"), + recipients, + local_instance, + ) .await?; Ok(()) } @@ -186,8 +181,12 @@ impl ApubObject for MyUser { } } -impl ActorPublicKey for MyUser { +impl Actor for MyUser { fn public_key(&self) -> &str { &self.public_key } + + fn inbox(&self) -> Url { + self.inbox.clone() + } } diff --git a/src/core/activity_queue.rs b/src/core/activity_queue.rs index 0e3a497..3964158 100644 --- a/src/core/activity_queue.rs +++ b/src/core/activity_queue.rs @@ -1,5 +1,6 @@ use crate::{ core::signatures::{sign_request, PublicKey}, + traits::{ActivityHandler, Actor}, utils::verify_url_valid, Error, InstanceSettings, @@ -16,70 +17,78 @@ use background_jobs::{ WorkerConfig, }; use http::{header::HeaderName, HeaderMap, HeaderValue}; +use itertools::Itertools; use reqwest_middleware::ClientWithMiddleware; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, future::Future, pin::Pin, time::Duration}; use tracing::{info, warn}; use url::Url; -/// Necessary data for sending out an activity -#[derive(Debug)] -pub struct SendActivity { - /// Id of the sent activity, used for logging - pub activity_id: Url, - /// Public key and actor id of the sender - pub actor_public_key: PublicKey, - /// Signing key of sender for HTTP signatures - pub actor_private_key: String, - /// List of Activitypub inboxes that the activity gets delivered to - pub inboxes: Vec, - /// Activity json - pub activity: String, -} +/// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By +/// default, sending is done on a background thread, and automatically retried on failure with +/// exponential backoff. +/// +/// - `activity`: The activity to be sent, gets converted to json +/// - `public_key`: The sending actor's public key. In fact, we only need the key id for signing +/// - `private_key`: The sending actor's private key for signing HTTP signature +/// - `recipients`: List of actors who should receive the activity. This gets deduplicated, and +/// local/invalid inbox urls removed +pub async fn send_activity( + activity: Activity, + public_key: PublicKey, + private_key: String, + recipients: &[ActorT], + instance: &LocalInstance, +) -> Result<(), ::Error> +where + Activity: ActivityHandler + Serialize, + ActorT: Actor, + ::Error: From + From, +{ + let activity_id = activity.id(); + let activity_serialized = serde_json::to_string_pretty(&activity)?; + let inboxes: Vec = recipients + .iter() + .map(|r| r.inbox()) + .unique() + .filter(|i| !instance.is_local_url(i)) + .filter(|i| verify_url_valid(i, &instance.settings).is_ok()) + .collect(); -impl SendActivity { - /// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By - /// default, sending is done on a background thread, and automatically retried on failure with - /// exponential backoff. - /// - /// For debugging or testing, you might want to set [[InstanceSettings.testing_send_sync]]. - pub async fn send(self, instance: &LocalInstance) -> Result<(), Error> { - let activity_queue = &instance.activity_queue; - for inbox in self.inboxes { - if verify_url_valid(&inbox, &instance.settings).is_err() { - continue; + let activity_queue = &instance.activity_queue; + for inbox in inboxes { + if verify_url_valid(&inbox, &instance.settings).is_err() { + continue; + } + let message = SendActivityTask { + activity_id: activity_id.clone(), + inbox, + activity: activity_serialized.clone(), + public_key: public_key.clone(), + private_key: private_key.clone(), + }; + if instance.settings.debug { + let res = do_send(message, &instance.client, instance.settings.request_timeout).await; + // Don't fail on error, as we intentionally do some invalid actions in tests, to verify that + // they are rejected on the receiving side. These errors shouldn't bubble up to make the API + // call fail. This matches the behaviour in production. + if let Err(e) = res { + warn!("{}", e); } - let message = SendActivityTask { - activity_id: self.activity_id.clone(), - inbox, - activity: self.activity.clone(), - public_key: self.actor_public_key.clone(), - private_key: self.actor_private_key.clone(), - }; - if instance.settings.debug { - let res = - do_send(message, &instance.client, instance.settings.request_timeout).await; - // Don't fail on error, as we intentionally do some invalid actions in tests, to verify that - // they are rejected on the receiving side. These errors shouldn't bubble up to make the API - // call fail. This matches the behaviour in production. - if let Err(e) = res { - warn!("{}", e); - } - } else { - activity_queue.queue::(message).await?; - let stats = activity_queue.get_stats().await?; - info!( + } else { + activity_queue.queue::(message).await?; + let stats = activity_queue.get_stats().await?; + info!( "Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}", stats.pending, stats.running, stats.dead.this_hour(), stats.complete.this_hour() ); - } } - - Ok(()) } + + Ok(()) } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/src/core/inbox.rs b/src/core/inbox.rs index 39276b5..a42aefe 100644 --- a/src/core/inbox.rs +++ b/src/core/inbox.rs @@ -1,7 +1,7 @@ use crate::{ core::{object_id::ObjectId, signatures::verify_signature}, data::Data, - traits::{ActivityHandler, ApubObject}, + traits::{ActivityHandler, Actor, ApubObject}, utils::{verify_domains_match, verify_url_valid}, Error, LocalInstance, @@ -10,13 +10,8 @@ use actix_web::{HttpRequest, HttpResponse}; use serde::de::DeserializeOwned; use tracing::log::debug; -pub trait ActorPublicKey { - /// Returns the actor's public key for verification of HTTP signatures - fn public_key(&self) -> &str; -} - /// Receive an activity and perform some basic checks, including HTTP signature verification. -pub async fn receive_activity( +pub async fn receive_activity( request: HttpRequest, activity: Activity, local_instance: &LocalInstance, @@ -24,11 +19,11 @@ pub async fn receive_activity( ) -> Result::Error> where Activity: ActivityHandler + DeserializeOwned + Send + 'static, - Actor: ApubObject + ActorPublicKey + Send + 'static, - for<'de2> ::ApubType: serde::Deserialize<'de2>, + ActorT: ApubObject + Actor + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, ::Error: - From + From + From<::Error>, - ::Error: From + From, + From + From + From<::Error>, + ::Error: From + From, { verify_domains_match(activity.id(), activity.actor())?; verify_url_valid(activity.id(), &local_instance.settings)?; @@ -37,7 +32,7 @@ where } let request_counter = &mut 0; - let actor = ObjectId::::new(activity.actor().clone()) + let actor = ObjectId::::new(activity.actor().clone()) .dereference(data, local_instance, request_counter) .await?; verify_signature(&request, actor.public_key())?; diff --git a/src/traits.rs b/src/traits.rs index dec0181..3b579bc 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -89,3 +89,11 @@ pub trait ApubObject { where Self: Sized; } + +pub trait Actor: ApubObject { + /// Returns the actor's public key for verification of HTTP signatures + fn public_key(&self) -> &str; + + /// The inbox or shared inbox where activities for this user should be sent to + fn inbox(&self) -> Url; +}