diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs index 47554e0..21401ee 100644 --- a/src/activitypub/deliverer.rs +++ b/src/activitypub/deliverer.rs @@ -132,6 +132,8 @@ fn backoff(retry_count: u32) -> Duration { pub struct Recipient { id: String, inbox: String, + #[serde(default)] + is_delivered: bool, // default to false if serialized data contains no value } async fn deliver_activity_worker( @@ -139,7 +141,7 @@ async fn deliver_activity_worker( instance: Instance, sender: User, activity: Value, - recipients: Vec, + mut recipients: Vec, ) -> Result<(), DelivererError> { let actor_key = deserialize_private_key(&sender.private_key)?; let actor_id = local_actor_id( @@ -155,31 +157,18 @@ async fn deliver_activity_worker( }; let activity_json = serde_json::to_string(&activity_signed)?; - if recipients.is_empty() { - return Ok(()); - }; - let mut queue: Vec<_> = recipients.into_iter() - // is_delivered: false - .map(|recipient| (recipient, false)) - .collect(); - log::info!( - "sending activity to {} inboxes: {}", - queue.len(), - activity_json, - ); - let mut retry_count = 0; let max_retries = 2; - while queue.iter().any(|(_, is_delivered)| !is_delivered) && + while recipients.iter().any(|recipient| !recipient.is_delivered) && retry_count <= max_retries { if retry_count > 0 { // Wait before next attempt sleep(backoff(retry_count)).await; }; - for (recipient, is_delivered) in queue.iter_mut() { - if *is_delivered { + for recipient in recipients.iter_mut() { + if recipient.is_delivered { continue; }; if let Err(error) = send_activity( @@ -196,7 +185,7 @@ async fn deliver_activity_worker( error, ); } else { - *is_delivered = true; + recipient.is_delivered = true; }; }; retry_count += 1; @@ -205,11 +194,11 @@ async fn deliver_activity_worker( if let Some(ref db_pool) = maybe_db_pool { // Get connection from pool only after finishing delivery let db_client = &**get_database_client(db_pool).await?; - for (recipient, is_delivered) in queue { + for recipient in recipients { set_reachability_status( db_client, &recipient.id, - is_delivered, + recipient.is_delivered, ).await?; }; }; @@ -238,6 +227,7 @@ impl OutgoingActivity { let recipient = Recipient { id: actor.id.clone(), inbox: actor.inbox, + is_delivered: false, }; recipient_map.insert(actor.id, recipient); }; @@ -276,6 +266,14 @@ impl OutgoingActivity { self, db_client: &impl DatabaseClient, ) -> Result<(), DatabaseError> { + if self.recipients.is_empty() { + return Ok(()); + }; + log::info!( + "delivering activity to {} inboxes: {}", + self.recipients.len(), + self.activity, + ); let job_data = OutgoingActivityJobData { activity: self.activity, sender_id: self.sender.id,