Keep actor IDs when constructing OutgoingActivity

This commit is contained in:
silverpill 2022-12-31 17:14:36 +00:00
parent 1d234bd679
commit b392d9164b

View file

@ -107,11 +107,17 @@ fn backoff(retry_count: u32) -> Duration {
Duration::from_secs(3 * 10_u64.pow(retry_count)) Duration::from_secs(3 * 10_u64.pow(retry_count))
} }
#[allow(dead_code)]
struct Recipient {
id: String,
inbox: String,
}
async fn deliver_activity_worker( async fn deliver_activity_worker(
instance: Instance, instance: Instance,
sender: User, sender: User,
activity: Value, activity: Value,
inboxes: Vec<String>, recipients: Vec<Recipient>,
) -> Result<(), DelivererError> { ) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?; let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_key_id = format!( let actor_key_id = format!(
@ -128,29 +134,32 @@ async fn deliver_activity_worker(
} else { } else {
sign_object(&activity, &actor_key, &actor_key_id)? sign_object(&activity, &actor_key, &actor_key_id)?
}; };
let activity_json = serde_json::to_string(&activity_signed)?; let activity_json = serde_json::to_string(&activity_signed)?;
if inboxes.is_empty() {
if recipients.is_empty() {
return Ok(()); return Ok(());
}; };
log::info!("sending activity to {} inboxes: {}", inboxes.len(), activity_json); let mut queue: Vec<_> = recipients.into_iter()
let mut queue: BTreeMap<String, bool> = BTreeMap::new();
for inbox in inboxes {
// is_delivered: false // is_delivered: false
queue.insert(inbox, false); .map(|recipient| (recipient, false))
}; .collect();
log::info!(
"sending activity to {} inboxes: {}",
queue.len(),
activity_json,
);
let mut retry_count = 0; let mut retry_count = 0;
let max_retries = 2; let max_retries = 2;
while queue.values().any(|is_delivered| !is_delivered) && while queue.iter().any(|(_, is_delivered)| !is_delivered) &&
retry_count <= max_retries retry_count <= max_retries
{ {
if retry_count > 0 { if retry_count > 0 {
// Wait before next attempt // Wait before next attempt
sleep(backoff(retry_count)).await; sleep(backoff(retry_count)).await;
}; };
for (inbox_url, is_delivered) in queue.iter_mut() { for (recipient, is_delivered) in queue.iter_mut() {
if *is_delivered { if *is_delivered {
continue; continue;
}; };
@ -159,11 +168,11 @@ async fn deliver_activity_worker(
&actor_key, &actor_key,
&actor_key_id, &actor_key_id,
&activity_json, &activity_json,
inbox_url, &recipient.inbox,
).await { ).await {
log::error!( log::error!(
"failed to deliver activity to {} (attempt #{}): {}", "failed to deliver activity to {} (attempt #{}): {}",
inbox_url, recipient.inbox,
retry_count + 1, retry_count + 1,
error, error,
); );
@ -180,7 +189,7 @@ pub struct OutgoingActivity {
instance: Instance, instance: Instance,
sender: User, sender: User,
pub activity: Value, pub activity: Value,
inboxes: Vec<String>, recipients: Vec<Recipient>,
} }
impl OutgoingActivity { impl OutgoingActivity {
@ -190,16 +199,23 @@ impl OutgoingActivity {
activity: impl Serialize, activity: impl Serialize,
recipients: Vec<Actor>, recipients: Vec<Actor>,
) -> Self { ) -> Self {
let mut inboxes: Vec<String> = recipients.into_iter() // Sort and de-duplicate recipients
.map(|actor| actor.inbox).collect(); let mut recipient_map = BTreeMap::new();
inboxes.sort(); for actor in recipients {
inboxes.dedup(); if !recipient_map.contains_key(&actor.id) {
let recipient = Recipient {
id: actor.id.clone(),
inbox: actor.inbox,
};
recipient_map.insert(actor.id, recipient);
};
};
Self { Self {
instance: instance.clone(), instance: instance.clone(),
sender: sender.clone(), sender: sender.clone(),
activity: serde_json::to_value(activity) activity: serde_json::to_value(activity)
.expect("activity should be serializable"), .expect("activity should be serializable"),
inboxes, recipients: recipient_map.into_values().collect(),
} }
} }
@ -208,7 +224,7 @@ impl OutgoingActivity {
self.instance, self.instance,
self.sender, self.sender,
self.activity, self.activity,
self.inboxes, self.recipients,
).await ).await
} }