Write list of unreachable instances to log after finishing delivery

This commit is contained in:
silverpill 2022-10-17 23:29:04 +00:00
parent 09d025d461
commit eef20f6b7c

View file

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::time::Duration; use std::time::Duration;
use actix_web::http::Method; use actix_web::http::Method;
@ -9,6 +10,7 @@ use crate::config::Instance;
use crate::http_signatures::create::{create_http_signature, SignatureError}; use crate::http_signatures::create::{create_http_signature, SignatureError};
use crate::models::users::types::User; use crate::models::users::types::User;
use crate::utils::crypto::deserialize_private_key; use crate::utils::crypto::deserialize_private_key;
use crate::utils::urls::get_hostname;
use super::actors::types::Actor; use super::actors::types::Actor;
use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX}; use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX};
use super::identifiers::local_actor_id; use super::identifiers::local_actor_id;
@ -84,6 +86,7 @@ fn backoff(retry_count: u32) -> Duration {
Duration::from_secs(3 * 10_u64.pow(retry_count)) Duration::from_secs(3 * 10_u64.pow(retry_count))
} }
#[allow(clippy::bool_comparison)]
async fn deliver_activity_worker( async fn deliver_activity_worker(
instance: Instance, instance: Instance,
sender: User, sender: User,
@ -103,28 +106,28 @@ async fn deliver_activity_worker(
if recipients.is_empty() { if recipients.is_empty() {
return Ok(()); return Ok(());
}; };
let mut inboxes: Vec<String> = recipients.into_iter() let mut inboxes: HashMap<String, bool> = HashMap::new();
.map(|actor| actor.inbox) for recipient in recipients {
.collect(); inboxes.insert(recipient.inbox, false);
inboxes.sort(); };
inboxes.dedup();
log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json); log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json);
let mut retry_count = 0; let mut retry_count = 0;
let max_retries = 2; let max_retries = 2;
while !inboxes.is_empty() && retry_count <= max_retries {
while inboxes.values().any(|res| *res == false) && retry_count <= max_retries {
if retry_count > 0 { if retry_count > 0 {
// Wait before next attempt // Wait before next attempt
sleep(backoff(retry_count)).await; sleep(backoff(retry_count)).await;
}; };
let mut failed = vec![]; let queue = inboxes.iter_mut().filter(|(_, res)| **res == false);
for inbox_url in inboxes { for (inbox_url, result) in queue {
if let Err(error) = send_activity( if let Err(error) = send_activity(
&instance, &instance,
&actor_key, &actor_key,
&actor_key_id, &actor_key_id,
&activity_json, &activity_json,
&inbox_url, inbox_url,
).await { ).await {
log::error!( log::error!(
"failed to deliver activity to {} (attempt #{}): {}", "failed to deliver activity to {} (attempt #{}): {}",
@ -132,12 +135,28 @@ async fn deliver_activity_worker(
retry_count + 1, retry_count + 1,
error, error,
); );
failed.push(inbox_url); } else {
*result = true;
}; };
}; };
inboxes = failed;
retry_count += 1; retry_count += 1;
}; };
let mut instances: HashMap<String, bool> = HashMap::new();
for (inbox_url, result) in inboxes {
let hostname = get_hostname(&inbox_url).unwrap_or(inbox_url);
if !instances.contains_key(&hostname) || result == true {
// If flag is not set and result is false, set flag to true
// If result is true, set flag to false
instances.insert(hostname, !result);
};
};
for (hostname, is_unreachable) in instances {
if is_unreachable {
log::info!("unreachable instance: {}", hostname);
};
};
Ok(()) Ok(())
} }