From eef20f6b7cc08997b49356b7c15f928a2cdf8b72 Mon Sep 17 00:00:00 2001 From: silverpill Date: Mon, 17 Oct 2022 23:29:04 +0000 Subject: [PATCH] Write list of unreachable instances to log after finishing delivery --- src/activitypub/deliverer.rs | 41 ++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs index 87c857c..df5769d 100644 --- a/src/activitypub/deliverer.rs +++ b/src/activitypub/deliverer.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::time::Duration; use actix_web::http::Method; @@ -9,6 +10,7 @@ use crate::config::Instance; use crate::http_signatures::create::{create_http_signature, SignatureError}; use crate::models::users::types::User; use crate::utils::crypto::deserialize_private_key; +use crate::utils::urls::get_hostname; use super::actors::types::Actor; use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX}; use super::identifiers::local_actor_id; @@ -84,6 +86,7 @@ fn backoff(retry_count: u32) -> Duration { Duration::from_secs(3 * 10_u64.pow(retry_count)) } +#[allow(clippy::bool_comparison)] async fn deliver_activity_worker( instance: Instance, sender: User, @@ -103,28 +106,28 @@ async fn deliver_activity_worker( if recipients.is_empty() { return Ok(()); }; - let mut inboxes: Vec = recipients.into_iter() - .map(|actor| actor.inbox) - .collect(); - inboxes.sort(); - inboxes.dedup(); + let mut inboxes: HashMap = HashMap::new(); + for recipient in recipients { + inboxes.insert(recipient.inbox, false); + }; log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json); let mut retry_count = 0; let max_retries = 2; - while !inboxes.is_empty() && retry_count <= max_retries { + + while inboxes.values().any(|res| *res == false) && retry_count <= max_retries { if retry_count > 0 { // Wait before next attempt sleep(backoff(retry_count)).await; }; - let mut failed = vec![]; - for inbox_url in inboxes { + let queue = inboxes.iter_mut().filter(|(_, res)| **res == false); + for (inbox_url, result) in queue { if let Err(error) = send_activity( &instance, &actor_key, &actor_key_id, &activity_json, - &inbox_url, + inbox_url, ).await { log::error!( "failed to deliver activity to {} (attempt #{}): {}", @@ -132,12 +135,28 @@ async fn deliver_activity_worker( retry_count + 1, error, ); - failed.push(inbox_url); + } else { + *result = true; }; }; - inboxes = failed; retry_count += 1; }; + + let mut instances: HashMap = HashMap::new(); + for (inbox_url, result) in inboxes { + let hostname = get_hostname(&inbox_url).unwrap_or(inbox_url); + if !instances.contains_key(&hostname) || result == true { + // If flag is not set and result is false, set flag to true + // If result is true, set flag to false + instances.insert(hostname, !result); + }; + }; + for (hostname, is_unreachable) in instances { + if is_unreachable { + log::info!("unreachable instance: {}", hostname); + }; + }; + Ok(()) }