Store delivery statuses in delivery job data

Preparing for migration to a new delivery queue mechanism.
This commit is contained in:
silverpill 2023-02-25 00:13:07 +00:00
parent 0f3c247069
commit ca2e541ff5

View file

@ -132,6 +132,8 @@ fn backoff(retry_count: u32) -> Duration {
pub struct Recipient { pub struct Recipient {
id: String, id: String,
inbox: String, inbox: String,
#[serde(default)]
is_delivered: bool, // default to false if serialized data contains no value
} }
async fn deliver_activity_worker( async fn deliver_activity_worker(
@ -139,7 +141,7 @@ async fn deliver_activity_worker(
instance: Instance, instance: Instance,
sender: User, sender: User,
activity: Value, activity: Value,
recipients: Vec<Recipient>, mut recipients: Vec<Recipient>,
) -> Result<(), DelivererError> { ) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?; let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_id = local_actor_id( let actor_id = local_actor_id(
@ -155,31 +157,18 @@ async fn deliver_activity_worker(
}; };
let activity_json = serde_json::to_string(&activity_signed)?; let activity_json = serde_json::to_string(&activity_signed)?;
if recipients.is_empty() {
return Ok(());
};
let mut queue: Vec<_> = recipients.into_iter()
// is_delivered: false
.map(|recipient| (recipient, false))
.collect();
log::info!(
"sending activity to {} inboxes: {}",
queue.len(),
activity_json,
);
let mut retry_count = 0; let mut retry_count = 0;
let max_retries = 2; let max_retries = 2;
while queue.iter().any(|(_, is_delivered)| !is_delivered) && while recipients.iter().any(|recipient| !recipient.is_delivered) &&
retry_count <= max_retries retry_count <= max_retries
{ {
if retry_count > 0 { if retry_count > 0 {
// Wait before next attempt // Wait before next attempt
sleep(backoff(retry_count)).await; sleep(backoff(retry_count)).await;
}; };
for (recipient, is_delivered) in queue.iter_mut() { for recipient in recipients.iter_mut() {
if *is_delivered { if recipient.is_delivered {
continue; continue;
}; };
if let Err(error) = send_activity( if let Err(error) = send_activity(
@ -196,7 +185,7 @@ async fn deliver_activity_worker(
error, error,
); );
} else { } else {
*is_delivered = true; recipient.is_delivered = true;
}; };
}; };
retry_count += 1; retry_count += 1;
@ -205,11 +194,11 @@ async fn deliver_activity_worker(
if let Some(ref db_pool) = maybe_db_pool { if let Some(ref db_pool) = maybe_db_pool {
// Get connection from pool only after finishing delivery // Get connection from pool only after finishing delivery
let db_client = &**get_database_client(db_pool).await?; let db_client = &**get_database_client(db_pool).await?;
for (recipient, is_delivered) in queue { for recipient in recipients {
set_reachability_status( set_reachability_status(
db_client, db_client,
&recipient.id, &recipient.id,
is_delivered, recipient.is_delivered,
).await?; ).await?;
}; };
}; };
@ -238,6 +227,7 @@ impl OutgoingActivity {
let recipient = Recipient { let recipient = Recipient {
id: actor.id.clone(), id: actor.id.clone(),
inbox: actor.inbox, inbox: actor.inbox,
is_delivered: false,
}; };
recipient_map.insert(actor.id, recipient); recipient_map.insert(actor.id, recipient);
}; };
@ -276,6 +266,14 @@ impl OutgoingActivity {
self, self,
db_client: &impl DatabaseClient, db_client: &impl DatabaseClient,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
if self.recipients.is_empty() {
return Ok(());
};
log::info!(
"delivering activity to {} inboxes: {}",
self.recipients.len(),
self.activity,
);
let job_data = OutgoingActivityJobData { let job_data = OutgoingActivityJobData {
activity: self.activity, activity: self.activity,
sender_id: self.sender.id, sender_id: self.sender.id,