deduplicate

This commit is contained in:
Felix Ableitner 2024-02-29 19:50:55 +01:00
parent 502f9f63d7
commit 3c2f105868
2 changed files with 63 additions and 52 deletions

View file

@ -3,7 +3,12 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{ use crate::{
activity_sending::{generate_request_headers, get_pkey_cached}, activity_sending::{
filter_inboxes,
generate_request_headers,
get_pkey_cached,
serialize_activity,
},
config::Data, config::Data,
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
@ -11,6 +16,7 @@ use crate::{
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::StreamExt;
use futures_core::Future; use futures_core::Future;
use itertools::Itertools; use itertools::Itertools;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
@ -31,7 +37,6 @@ use tokio::{
}; };
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use url::Url; use url::Url;
use crate::activity_sending::serialize_activity;
/// Send a new activity to the given inboxes with automatic retry on failure. Alternatively you /// Send a new activity to the given inboxes with automatic retry on failure. Alternatively you
/// can implement your own queue and then send activities using [[crate::activity_sending::SendActivityTask]]. /// can implement your own queue and then send activities using [[crate::activity_sending::SendActivityTask]].
@ -59,35 +64,33 @@ where
let activity_serialized = serialize_activity(&activity)?; let activity_serialized = serialize_activity(&activity)?;
let private_key = get_pkey_cached(data, actor).await?; let private_key = get_pkey_cached(data, actor).await?;
let inboxes: Vec<Url> = inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i))
.collect();
// This field is only optional to make builder work, its always present at this point // This field is only optional to make builder work, its always present at this point
let activity_queue = config let activity_queue = config
.activity_queue .activity_queue
.as_ref() .as_ref()
.expect("Config has activity queue"); .expect("Config has activity queue");
for inbox in inboxes {
if let Err(err) = config.verify_url_valid(&inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {err}");
continue;
}
let message = SendActivityTask { let tasks = futures::stream::iter(inboxes.into_iter().unique())
actor_id: actor_id.clone(), .filter_map(|inbox| async {
activity_id: activity_id.clone(), filter_inboxes(&inbox, config)
inbox, .await
activity: activity_serialized.clone(), .then(|| SendActivityTask {
private_key: private_key.clone(), actor_id: actor_id.clone(),
http_signature_compat: config.http_signature_compat, activity_id: activity_id.clone(),
}; inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect::<Vec<_>>()
.await;
for task in tasks {
// Don't use the activity queue if this is in debug mode, send and wait directly // Don't use the activity queue if this is in debug mode, send and wait directly
if config.debug { if config.debug {
if let Err(err) = sign_and_send( if let Err(err) = sign_and_send(
&message, &task,
&config.client, &config.client,
config.request_timeout, config.request_timeout,
Default::default(), Default::default(),
@ -97,7 +100,7 @@ where
warn!("{err}"); warn!("{err}");
} }
} else { } else {
activity_queue.queue(message).await?; activity_queue.queue(task).await?;
let stats = activity_queue.get_stats(); let stats = activity_queue.get_stats();
let running = stats.running.load(Ordering::Relaxed); let running = stats.running.load(Ordering::Relaxed);
if running == config.queue_worker_count && config.queue_worker_count != 0 { if running == config.queue_worker_count && config.queue_worker_count != 0 {
@ -108,7 +111,6 @@ where
} }
} }
} }
Ok(()) Ok(())
} }
@ -519,7 +521,7 @@ mod tests {
use axum::extract::State; use axum::extract::State;
use bytes::Bytes; use bytes::Bytes;
use http::StatusCode; use http::StatusCode;
use std::time::Instant; use std::time::Instant;use http::HeaderMap;
use crate::http_signatures::generate_actor_keypair; use crate::http_signatures::generate_actor_keypair;

View file

@ -10,11 +10,11 @@ use crate::{
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
FEDERATION_CONTENT_TYPE, FEDERATION_CONTENT_TYPE,
}; };
use crate::config::FederationConfig;
use bytes::Bytes; use bytes::Bytes;
use futures::StreamExt; use futures::{StreamExt};
use httpdate::fmt_http_date; use httpdate::fmt_http_date;
use itertools::Itertools; use itertools::{Itertools};
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::Serialize; use serde::Serialize;
@ -67,28 +67,21 @@ impl SendActivityTask<'_> {
let activity_serialized = serialize_activity(activity)?; let activity_serialized = serialize_activity(activity)?;
let private_key = get_pkey_cached(data, actor).await?; let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter( Ok(futures::stream::iter(inboxes.into_iter().unique())
inboxes .filter_map(|inbox| async {
.into_iter() filter_inboxes(&inbox, config)
.unique() .await
.filter(|i| !config.is_local_url(i)), .then(|| SendActivityTask {
) actor_id,
.filter_map(|inbox| async { activity_id,
if let Err(err) = config.verify_url_valid(&inbox).await { inbox,
debug!("inbox url invalid, skipping: {inbox}: {err}"); activity: activity_serialized.clone(),
return None; private_key: private_key.clone(),
}; http_signature_compat: config.http_signature_compat,
Some(SendActivityTask { })
actor_id,
activity_id,
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
}) })
}) .collect()
.collect() .await)
.await)
} }
/// convert a sendactivitydata to a request, signing and sending it /// convert a sendactivitydata to a request, signing and sending it
@ -130,10 +123,26 @@ impl SendActivityTask<'_> {
} }
} }
pub(crate) fn serialize_activity<Activity: Serialize + Debug>(activity: &Activity) -> Result<Bytes, Error> { pub(crate) fn serialize_activity<Activity: Serialize + Debug>(
Ok(serde_json::to_vec(activity) activity: &Activity,
.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))? ) -> Result<Bytes, Error> {
.into()) Ok(serde_json::to_vec(activity)
.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))?
.into())
}
pub(crate) async fn filter_inboxes<Data: Clone>(
inbox: &Url,
config: &FederationConfig<Data>,
) -> bool {
if config.is_local_url(inbox) {
false
} else if let Err(e) = config.verify_url_valid(inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {e}");
false
} else {
true
}
} }
pub(crate) async fn get_pkey_cached<ActorType>( pub(crate) async fn get_pkey_cached<ActorType>(