From 3c2f105868f780055656ea46aecf3a5f4237494d Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 29 Feb 2024 19:50:55 +0100 Subject: [PATCH] deduplicate --- src/activity_queue.rs | 50 ++++++++++++++++--------------- src/activity_sending.rs | 65 +++++++++++++++++++++++------------------ 2 files changed, 63 insertions(+), 52 deletions(-) diff --git a/src/activity_queue.rs b/src/activity_queue.rs index d46c26f..10a0fa1 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -3,7 +3,12 @@ #![doc = include_str!("../docs/09_sending_activities.md")] use crate::{ - activity_sending::{generate_request_headers, get_pkey_cached}, + activity_sending::{ + filter_inboxes, + generate_request_headers, + get_pkey_cached, + serialize_activity, + }, config::Data, error::Error, http_signatures::sign_request, @@ -11,6 +16,7 @@ use crate::{ traits::{ActivityHandler, Actor}, }; use bytes::Bytes; +use futures::StreamExt; use futures_core::Future; use itertools::Itertools; use openssl::pkey::{PKey, Private}; @@ -31,7 +37,6 @@ use tokio::{ }; use tracing::{debug, info, warn}; use url::Url; -use crate::activity_sending::serialize_activity; /// Send a new activity to the given inboxes with automatic retry on failure. Alternatively you /// can implement your own queue and then send activities using [[crate::activity_sending::SendActivityTask]]. @@ -59,35 +64,33 @@ where let activity_serialized = serialize_activity(&activity)?; let private_key = get_pkey_cached(data, actor).await?; - let inboxes: Vec = inboxes - .into_iter() - .unique() - .filter(|i| !config.is_local_url(i)) - .collect(); // This field is only optional to make builder work, its always present at this point let activity_queue = config .activity_queue .as_ref() .expect("Config has activity queue"); - for inbox in inboxes { - if let Err(err) = config.verify_url_valid(&inbox).await { - debug!("inbox url invalid, skipping: {inbox}: {err}"); - continue; - } - let message = SendActivityTask { - actor_id: actor_id.clone(), - activity_id: activity_id.clone(), - inbox, - activity: activity_serialized.clone(), - private_key: private_key.clone(), - http_signature_compat: config.http_signature_compat, - }; + let tasks = futures::stream::iter(inboxes.into_iter().unique()) + .filter_map(|inbox| async { + filter_inboxes(&inbox, config) + .await + .then(|| SendActivityTask { + actor_id: actor_id.clone(), + activity_id: activity_id.clone(), + inbox, + activity: activity_serialized.clone(), + private_key: private_key.clone(), + http_signature_compat: config.http_signature_compat, + }) + }) + .collect::>() + .await; + for task in tasks { // Don't use the activity queue if this is in debug mode, send and wait directly if config.debug { if let Err(err) = sign_and_send( - &message, + &task, &config.client, config.request_timeout, Default::default(), @@ -97,7 +100,7 @@ where warn!("{err}"); } } else { - activity_queue.queue(message).await?; + activity_queue.queue(task).await?; let stats = activity_queue.get_stats(); let running = stats.running.load(Ordering::Relaxed); if running == config.queue_worker_count && config.queue_worker_count != 0 { @@ -108,7 +111,6 @@ where } } } - Ok(()) } @@ -519,7 +521,7 @@ mod tests { use axum::extract::State; use bytes::Bytes; use http::StatusCode; - use std::time::Instant; + use std::time::Instant;use http::HeaderMap; use crate::http_signatures::generate_actor_keypair; diff --git a/src/activity_sending.rs b/src/activity_sending.rs index a621168..9940a96 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -10,11 +10,11 @@ use crate::{ traits::{ActivityHandler, Actor}, FEDERATION_CONTENT_TYPE, }; - +use crate::config::FederationConfig; use bytes::Bytes; -use futures::StreamExt; +use futures::{StreamExt}; use httpdate::fmt_http_date; -use itertools::Itertools; +use itertools::{Itertools}; use openssl::pkey::{PKey, Private}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::Serialize; @@ -67,28 +67,21 @@ impl SendActivityTask<'_> { let activity_serialized = serialize_activity(activity)?; let private_key = get_pkey_cached(data, actor).await?; - Ok(futures::stream::iter( - inboxes - .into_iter() - .unique() - .filter(|i| !config.is_local_url(i)), - ) - .filter_map(|inbox| async { - if let Err(err) = config.verify_url_valid(&inbox).await { - debug!("inbox url invalid, skipping: {inbox}: {err}"); - return None; - }; - Some(SendActivityTask { - actor_id, - activity_id, - inbox, - activity: activity_serialized.clone(), - private_key: private_key.clone(), - http_signature_compat: config.http_signature_compat, + Ok(futures::stream::iter(inboxes.into_iter().unique()) + .filter_map(|inbox| async { + filter_inboxes(&inbox, config) + .await + .then(|| SendActivityTask { + actor_id, + activity_id, + inbox, + activity: activity_serialized.clone(), + private_key: private_key.clone(), + http_signature_compat: config.http_signature_compat, + }) }) - }) - .collect() - .await) + .collect() + .await) } /// convert a sendactivitydata to a request, signing and sending it @@ -130,10 +123,26 @@ impl SendActivityTask<'_> { } } -pub(crate) fn serialize_activity(activity: &Activity) -> Result { -Ok(serde_json::to_vec(activity) -.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))? -.into()) +pub(crate) fn serialize_activity( + activity: &Activity, +) -> Result { + Ok(serde_json::to_vec(activity) + .map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))? + .into()) +} + +pub(crate) async fn filter_inboxes( + inbox: &Url, + config: &FederationConfig, +) -> bool { + if config.is_local_url(inbox) { + false + } else if let Err(e) = config.verify_url_valid(inbox).await { + debug!("inbox url invalid, skipping: {inbox}: {e}"); + false + } else { + true + } } pub(crate) async fn get_pkey_cached(