more dedup

This commit is contained in:
Felix Ableitner 2024-03-01 10:44:10 +01:00
parent 40d0419a4a
commit 499f037b68
4 changed files with 64 additions and 102 deletions

View file

@ -26,7 +26,7 @@ let activity = Follow {
}; };
let inboxes = vec![recipient.shared_inbox_or_inbox()]; let inboxes = vec![recipient.shared_inbox_or_inbox()];
queue_activity(activity, &sender, inboxes, &data).await?; queue_activity(&activity, &sender, inboxes, &data).await?;
# Ok::<(), anyhow::Error>(()) # Ok::<(), anyhow::Error>(())
# }).unwrap() # }).unwrap()
``` ```

View file

@ -117,7 +117,7 @@ impl DbUser {
let activity = WithContext::new_default(activity); let activity = WithContext::new_default(activity);
// Send through queue in some cases and bypass it in others to test both code paths // Send through queue in some cases and bypass it in others to test both code paths
if use_queue { if use_queue {
queue_activity(activity, self, recipients, data).await?; queue_activity(&activity, self, recipients, data).await?;
} else { } else {
let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?;
for send in sends { for send in sends {

View file

@ -3,23 +3,15 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{ use crate::{
activity_sending::{ activity_sending::{build_tasks, generate_request_headers, send, SendActivityTask},
filter_inboxes,
generate_request_headers,
get_pkey_cached,
send,
serialize_activity,
},
config::Data, config::Data,
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use bytes::Bytes;
use futures::StreamExt;
use futures_core::Future; use futures_core::Future;
use itertools::Itertools;
use openssl::pkey::{PKey, Private};
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
@ -47,7 +39,7 @@ use url::Url;
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
/// for each target actor. /// for each target actor.
pub async fn queue_activity<Activity, Datatype, ActorType>( pub async fn queue_activity<Activity, Datatype, ActorType>(
activity: Activity, activity: &Activity,
actor: &ActorType, actor: &ActorType,
inboxes: Vec<Url>, inboxes: Vec<Url>,
data: &Data<Datatype>, data: &Data<Datatype>,
@ -58,32 +50,7 @@ where
ActorType: Actor, ActorType: Actor,
{ {
let config = &data.config; let config = &data.config;
let actor_id = activity.actor(); let tasks = build_tasks(activity, actor, inboxes, data).await?;
let activity_id = activity.id();
let activity_serialized = serialize_activity(&activity)?;
let private_key = get_pkey_cached(data, actor).await?;
// This field is only optional to make builder work, its always present at this point
let activity_queue = config
.activity_queue
.as_ref()
.expect("Config has activity queue");
let tasks = futures::stream::iter(inboxes.into_iter().unique())
.filter_map(|inbox| async {
filter_inboxes(&inbox, config)
.await
.then(|| SendActivityTask {
actor_id: actor_id.clone(),
activity_id: activity_id.clone(),
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect::<Vec<_>>()
.await;
for task in tasks { for task in tasks {
// Don't use the activity queue if this is in debug mode, send and wait directly // Don't use the activity queue if this is in debug mode, send and wait directly
@ -99,6 +66,11 @@ where
warn!("{err}"); warn!("{err}");
} }
} else { } else {
// This field is only optional to make builder work, its always present at this point
let activity_queue = config
.activity_queue
.as_ref()
.expect("Config has activity queue");
activity_queue.queue(task).await?; activity_queue.queue(task).await?;
let stats = activity_queue.get_stats(); let stats = activity_queue.get_stats();
let running = stats.running.load(Ordering::Relaxed); let running = stats.running.load(Ordering::Relaxed);
@ -113,23 +85,6 @@ where
Ok(()) Ok(())
} }
// TODO: should use the existing struct but lifetimes are difficult
#[derive(Clone, Debug)]
pub(crate) struct SendActivityTask {
actor_id: Url,
activity_id: Url,
activity: Bytes,
inbox: Url,
private_key: PKey<Private>,
http_signature_compat: bool,
}
impl Display for SendActivityTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} to {}", self.activity_id, self.inbox)
}
}
async fn sign_and_send( async fn sign_and_send(
task: &SendActivityTask, task: &SendActivityTask,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,

View file

@ -3,7 +3,7 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{ use crate::{
config::{Data, FederationConfig}, config::Data,
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
reqwest_shim::ResponseExt, reqwest_shim::ResponseExt,
@ -32,60 +32,40 @@ use url::Url;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// All info needed to sign and send one activity to one inbox. You should generally use /// All info needed to sign and send one activity to one inbox. You should generally use
/// [[crate::activity_queue::queue_activity]] unless you want implement your own queue. /// [[crate::activity_queue::queue_activity]] unless you want implement your own queue.
pub struct SendActivityTask<'a> { pub struct SendActivityTask {
pub(crate) actor_id: &'a Url, pub(crate) actor_id: Url,
pub(crate) activity_id: &'a Url, pub(crate) activity_id: Url,
pub(crate) activity: Bytes, pub(crate) activity: Bytes,
pub(crate) inbox: Url, pub(crate) inbox: Url,
pub(crate) private_key: PKey<Private>, pub(crate) private_key: PKey<Private>,
pub(crate) http_signature_compat: bool, pub(crate) http_signature_compat: bool,
} }
impl Display for SendActivityTask<'_> { impl Display for SendActivityTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} to {}", self.activity_id, self.inbox) write!(f, "{} to {}", self.activity_id, self.inbox)
} }
} }
impl SendActivityTask<'_> { impl SendActivityTask {
/// Prepare an activity for sending /// Prepare an activity for sending
/// ///
/// - `activity`: The activity to be sent, gets converted to json /// - `activity`: The activity to be sent, gets converted to json
/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
/// for each target actor. /// for each target actor.
pub async fn prepare<'a, Activity, Datatype, ActorType>( pub async fn prepare<Activity, Datatype, ActorType>(
activity: &'a Activity, activity: &Activity,
actor: &ActorType, actor: &ActorType,
inboxes: Vec<Url>, inboxes: Vec<Url>,
data: &Data<Datatype>, data: &Data<Datatype>,
) -> Result<Vec<SendActivityTask<'a>>, Error> ) -> Result<Vec<SendActivityTask>, Error>
where where
Activity: ActivityHandler + Serialize + Debug, Activity: ActivityHandler + Serialize + Debug,
Datatype: Clone, Datatype: Clone,
ActorType: Actor, ActorType: Actor,
{ {
let config = &data.config; build_tasks(activity, actor, inboxes, data).await
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized = serialize_activity(activity)?;
let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter(inboxes.into_iter().unique())
.filter_map(|inbox| async {
filter_inboxes(&inbox, config)
.await
.then(|| SendActivityTask {
actor_id,
activity_id,
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect()
.await)
} }
/// convert a sendactivitydata to a request, signing and sending it /// convert a sendactivitydata to a request, signing and sending it
@ -97,7 +77,7 @@ impl SendActivityTask<'_> {
.headers(generate_request_headers(&self.inbox)); .headers(generate_request_headers(&self.inbox));
let request = sign_request( let request = sign_request(
request_builder, request_builder,
self.actor_id, &self.actor_id,
self.activity.clone(), self.activity.clone(),
self.private_key.clone(), self.private_key.clone(),
self.http_signature_compat, self.http_signature_compat,
@ -144,18 +124,45 @@ pub(crate) fn serialize_activity<Activity: Serialize + Debug>(
.into()) .into())
} }
pub(crate) async fn filter_inboxes<Data: Clone>( pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>(
inbox: &Url, activity: &'a Activity,
config: &FederationConfig<Data>, actor: &ActorType,
) -> bool { inboxes: Vec<Url>,
if config.is_local_url(inbox) { data: &Data<Datatype>,
false ) -> Result<Vec<SendActivityTask>, Error>
} else if let Err(e) = config.verify_url_valid(inbox).await { where
debug!("inbox url invalid, skipping: {inbox}: {e}"); Activity: ActivityHandler + Serialize + Debug,
false Datatype: Clone,
} else { ActorType: Actor,
true {
} let config = &data.config;
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized = serialize_activity(activity)?;
let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter(
inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i)),
)
.filter_map(|inbox| async {
if let Err(err) = config.verify_url_valid(&inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {err}");
return None;
};
Some(SendActivityTask {
actor_id: actor_id.clone(),
activity_id: activity_id.clone(),
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect()
.await)
} }
pub(crate) async fn get_pkey_cached<ActorType>( pub(crate) async fn get_pkey_cached<ActorType>(
@ -280,8 +287,8 @@ mod tests {
let keypair = generate_actor_keypair().unwrap(); let keypair = generate_actor_keypair().unwrap();
let message = SendActivityTask { let message = SendActivityTask {
actor_id: &"http://localhost:8001".parse().unwrap(), actor_id: "http://localhost:8001".parse().unwrap(),
activity_id: &"http://localhost:8001/activity".parse().unwrap(), activity_id: "http://localhost:8001/activity".parse().unwrap(),
activity: "{}".into(), activity: "{}".into(),
inbox: "http://localhost:8001".parse().unwrap(), inbox: "http://localhost:8001".parse().unwrap(),
private_key: keypair.private_key().unwrap(), private_key: keypair.private_key().unwrap(),