Refactor deliver_activity_worker() and prepare for queue serialization

This commit is contained in:
silverpill 2022-12-13 20:34:51 +00:00
parent fcab5b000a
commit d2e2b684e3

View file

@ -108,12 +108,11 @@ fn backoff(retry_count: u32) -> Duration {
Duration::from_secs(3 * 10_u64.pow(retry_count)) Duration::from_secs(3 * 10_u64.pow(retry_count))
} }
#[allow(clippy::bool_comparison)]
async fn deliver_activity_worker( async fn deliver_activity_worker(
instance: Instance, instance: Instance,
sender: User, sender: User,
activity: Value, activity: Value,
recipients: Vec<Actor>, inboxes: Vec<String>,
) -> Result<(), DelivererError> { ) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?; let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_key_id = format!( let actor_key_id = format!(
@ -132,25 +131,30 @@ async fn deliver_activity_worker(
}; };
let activity_json = serde_json::to_string(&activity_signed)?; let activity_json = serde_json::to_string(&activity_signed)?;
if recipients.is_empty() { if inboxes.is_empty() {
return Ok(()); return Ok(());
}; };
let mut inboxes: BTreeMap<String, bool> = BTreeMap::new();
for recipient in recipients {
inboxes.insert(recipient.inbox, false);
};
log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json); log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json);
let mut queue: BTreeMap<String, bool> = BTreeMap::new();
for inbox in inboxes {
// is_delivered: false
queue.insert(inbox, false);
};
let mut retry_count = 0; let mut retry_count = 0;
let max_retries = 2; let max_retries = 2;
while inboxes.values().any(|res| *res == false) && retry_count <= max_retries { while queue.values().any(|is_delivered| !is_delivered) &&
retry_count <= max_retries
{
if retry_count > 0 { if retry_count > 0 {
// Wait before next attempt // Wait before next attempt
sleep(backoff(retry_count)).await; sleep(backoff(retry_count)).await;
}; };
let queue = inboxes.iter_mut().filter(|(_, res)| **res == false); for (inbox_url, is_delivered) in queue.iter_mut() {
for (inbox_url, result) in queue { if *is_delivered {
continue;
};
if let Err(error) = send_activity( if let Err(error) = send_activity(
&instance, &instance,
&actor_key, &actor_key,
@ -165,19 +169,20 @@ async fn deliver_activity_worker(
error, error,
); );
} else { } else {
*result = true; *is_delivered = true;
}; };
}; };
retry_count += 1; retry_count += 1;
}; };
// Generate report
let mut instances: HashMap<String, bool> = HashMap::new(); let mut instances: HashMap<String, bool> = HashMap::new();
for (inbox_url, result) in inboxes { for (inbox_url, is_delivered) in queue {
let hostname = get_hostname(&inbox_url).unwrap_or(inbox_url); let hostname = get_hostname(&inbox_url).unwrap_or(inbox_url);
if !instances.contains_key(&hostname) || result == true { if !instances.contains_key(&hostname) || is_delivered {
// If flag is not set and result is false, set flag to true // If flag is not set and result is false, set flag to true
// If result is true, set flag to false // If result is true, set flag to false
instances.insert(hostname, !result); instances.insert(hostname, !is_delivered);
}; };
}; };
for (hostname, is_unreachable) in instances { for (hostname, is_unreachable) in instances {
@ -190,10 +195,10 @@ async fn deliver_activity_worker(
} }
pub struct OutgoingActivity { pub struct OutgoingActivity {
pub instance: Instance, instance: Instance,
pub sender: User, sender: User,
pub activity: Value, pub activity: Value,
pub recipients: Vec<Actor>, inboxes: Vec<String>,
} }
impl OutgoingActivity { impl OutgoingActivity {
@ -203,12 +208,14 @@ impl OutgoingActivity {
activity: impl Serialize, activity: impl Serialize,
recipients: Vec<Actor>, recipients: Vec<Actor>,
) -> Self { ) -> Self {
let inboxes = recipients.into_iter()
.map(|actor| actor.inbox).collect();
Self { Self {
instance: instance.clone(), instance: instance.clone(),
sender: sender.clone(), sender: sender.clone(),
activity: serde_json::to_value(activity) activity: serde_json::to_value(activity)
.expect("activity should be serializable"), .expect("activity should be serializable"),
recipients, inboxes,
} }
} }
@ -217,7 +224,7 @@ impl OutgoingActivity {
self.instance, self.instance,
self.sender, self.sender,
self.activity, self.activity,
self.recipients, self.inboxes,
).await ).await
} }