Always put outgoing activities in a queue

This commit is contained in:
silverpill 2023-01-04 19:26:21 +00:00
parent 30857868a0
commit 5c9aa0f148
8 changed files with 38 additions and 52 deletions

View file

@ -208,6 +208,7 @@ async fn deliver_activity_worker(
} }
pub struct OutgoingActivity { pub struct OutgoingActivity {
pub db_pool: Option<DbPool>, // needed to track unreachable actors (optional)
pub instance: Instance, pub instance: Instance,
pub sender: User, pub sender: User,
pub activity: Value, pub activity: Value,
@ -233,6 +234,7 @@ impl OutgoingActivity {
}; };
}; };
Self { Self {
db_pool: None,
instance: instance.clone(), instance: instance.clone(),
sender: sender.clone(), sender: sender.clone(),
activity: serde_json::to_value(activity) activity: serde_json::to_value(activity)
@ -241,9 +243,11 @@ impl OutgoingActivity {
} }
} }
pub async fn deliver(self) -> Result<(), DelivererError> { pub async fn deliver(
self,
) -> Result<(), DelivererError> {
deliver_activity_worker( deliver_activity_worker(
None, self.db_pool,
self.instance, self.instance,
self.sender, self.sender,
self.activity, self.activity,
@ -251,30 +255,9 @@ impl OutgoingActivity {
).await ).await
} }
pub async fn deliver_or_log(self) -> () {
self.deliver().await.unwrap_or_else(|err| {
log::error!("{}", err);
});
}
pub fn spawn_deliver(self) -> () { pub fn spawn_deliver(self) -> () {
tokio::spawn(async move { tokio::spawn(async move {
self.deliver_or_log().await; self.deliver().await.unwrap_or_else(|err| {
});
}
pub fn spawn_deliver_with_tracking(
self,
db_pool: DbPool,
) -> () {
tokio::spawn(async move {
deliver_activity_worker(
Some(db_pool),
self.instance,
self.sender,
self.activity,
self.recipients,
).await.unwrap_or_else(|err| {
log::error!("{}", err); log::error!("{}", err);
}); });
}); });

View file

@ -69,7 +69,7 @@ pub async fn handle_follow(
&target_user, &target_user,
&source_actor, &source_actor,
&activity.id, &activity.id,
).spawn_deliver(); ).enqueue(db_client).await?;
Ok(Some(PERSON)) Ok(Some(PERSON))
} }

View file

@ -90,7 +90,6 @@ pub async fn handle_move(
}; };
let followers = get_followers(db_client, &old_profile.id).await?; let followers = get_followers(db_client, &old_profile.id).await?;
let mut activities = vec![];
for follower in followers { for follower in followers {
let follower = get_user_by_id(db_client, &follower.id).await?; let follower = get_user_by_id(db_client, &follower.id).await?;
// Unfollow old profile // Unfollow old profile
@ -103,12 +102,12 @@ pub async fn handle_move(
if let Some(ref old_actor) = old_profile.actor_json { if let Some(ref old_actor) = old_profile.actor_json {
let follow_request_id = maybe_follow_request_id let follow_request_id = maybe_follow_request_id
.expect("follow request must exist"); .expect("follow request must exist");
activities.push(prepare_undo_follow( prepare_undo_follow(
&instance, &instance,
&follower, &follower,
old_actor, old_actor,
&follow_request_id, &follow_request_id,
)); ).enqueue(db_client).await?;
}; };
// Follow new profile // Follow new profile
match create_follow_request( match create_follow_request(
@ -117,12 +116,12 @@ pub async fn handle_move(
&new_profile.id, &new_profile.id,
).await { ).await {
Ok(follow_request) => { Ok(follow_request) => {
activities.push(prepare_follow( prepare_follow(
&instance, &instance,
&follower, &follower,
new_actor, new_actor,
&follow_request.id, &follow_request.id,
)); ).enqueue(db_client).await?;
}, },
Err(DatabaseError::AlreadyExists(_)) => (), // already following Err(DatabaseError::AlreadyExists(_)) => (), // already following
Err(other_error) => return Err(other_error.into()), Err(other_error) => return Err(other_error.into()),
@ -133,11 +132,6 @@ pub async fn handle_move(
&follower.id, &follower.id,
).await?; ).await?;
}; };
tokio::spawn(async move {
for activity in activities {
activity.deliver_or_log().await;
};
});
Ok(Some(PERSON)) Ok(Some(PERSON))
} }

View file

@ -142,12 +142,13 @@ pub async fn process_queued_outgoing_activities(
.map_err(|_| DatabaseTypeError)?; .map_err(|_| DatabaseTypeError)?;
let sender = get_user_by_id(db_client, &job_data.sender_id).await?; let sender = get_user_by_id(db_client, &job_data.sender_id).await?;
let outgoing_activity = OutgoingActivity { let outgoing_activity = OutgoingActivity {
db_pool: Some(db_pool.clone()),
instance: config.instance(), instance: config.instance(),
sender, sender,
activity: job_data.activity, activity: job_data.activity,
recipients: job_data.recipients, recipients: job_data.recipients,
}; };
outgoing_activity.spawn_deliver_with_tracking(db_pool.clone()); outgoing_activity.spawn_deliver();
delete_job_from_queue(db_client, &job.id).await?; delete_job_from_queue(db_client, &job.id).await?;
}; };
Ok(()) Ok(())

View file

@ -80,7 +80,7 @@ pub async fn send_subscription_notifications(
recipient, recipient,
remote_sender, remote_sender,
LocalActorCollection::Subscribers, LocalActorCollection::Subscribers,
).spawn_deliver(); ).enqueue(db_client).await?;
}; };
Ok(()) Ok(())
} }
@ -277,7 +277,7 @@ pub async fn update_expired_subscriptions(
&recipient, &recipient,
remote_sender, remote_sender,
LocalActorCollection::Subscribers, LocalActorCollection::Subscribers,
).spawn_deliver(); ).enqueue(db_client).await?;
} else { } else {
create_subscription_expiration_notification( create_subscription_expiration_notification(
db_client, db_client,

View file

@ -236,8 +236,11 @@ async fn update_credentials(
).await?; ).await?;
// Federate // Federate
prepare_update_person(db_client, &config.instance(), &current_user).await? prepare_update_person(
.spawn_deliver(); db_client,
&config.instance(),
&current_user,
).await?.enqueue(db_client).await?;
let account = Account::from_user(current_user, &config.instance_url()); let account = Account::from_user(current_user, &config.instance_url());
Ok(HttpResponse::Ok().json(account)) Ok(HttpResponse::Ok().json(account))
@ -313,13 +316,12 @@ async fn move_followers(
.expect("actor data must be present"); .expect("actor data must be present");
let follow_request_id = maybe_follow_request_id let follow_request_id = maybe_follow_request_id
.expect("follow request must exist"); .expect("follow request must exist");
// TODO: send in a batch
prepare_undo_follow( prepare_undo_follow(
&config.instance(), &config.instance(),
&current_user, &current_user,
remote_actor, remote_actor,
&follow_request_id, &follow_request_id,
).spawn_deliver(); ).enqueue(db_client).await?;
}, },
// Not a follower, ignore // Not a follower, ignore
Err(DatabaseError::NotFound(_)) => continue, Err(DatabaseError::NotFound(_)) => continue,
@ -420,7 +422,7 @@ async fn send_signed_activity(
add_integrity_proof(&mut outgoing_activity.activity, proof) add_integrity_proof(&mut outgoing_activity.activity, proof)
.map_err(|_| HttpError::InternalError)?; .map_err(|_| HttpError::InternalError)?;
outgoing_activity.spawn_deliver(); outgoing_activity.enqueue(db_client).await?;
let account = Account::from_user(current_user, &config.instance_url()); let account = Account::from_user(current_user, &config.instance_url());
Ok(HttpResponse::Ok().json(account)) Ok(HttpResponse::Ok().json(account))
@ -530,8 +532,11 @@ async fn create_identity_proof(
).await?; ).await?;
// Federate // Federate
prepare_update_person(db_client, &config.instance(), &current_user).await? prepare_update_person(
.spawn_deliver(); db_client,
&config.instance(),
&current_user,
).await?.enqueue(db_client).await?;
let account = Account::from_user(current_user, &config.instance_url()); let account = Account::from_user(current_user, &config.instance_url());
Ok(HttpResponse::Ok().json(account)) Ok(HttpResponse::Ok().json(account))
@ -633,7 +638,7 @@ async fn follow_account(
&current_user, &current_user,
&remote_actor, &remote_actor,
&follow_request.id, &follow_request.id,
).spawn_deliver(); ).enqueue(db_client).await?;
}, },
Err(DatabaseError::AlreadyExists(_)) => (), // already following Err(DatabaseError::AlreadyExists(_)) => (), // already following
Err(other_error) => return Err(other_error.into()), Err(other_error) => return Err(other_error.into()),
@ -683,7 +688,7 @@ async fn unfollow_account(
&current_user, &current_user,
&remote_actor, &remote_actor,
&follow_request_id, &follow_request_id,
).spawn_deliver(); ).enqueue(db_client).await?;
}, },
Ok(None) => (), // local follow Ok(None) => (), // local follow
Err(DatabaseError::NotFound(_)) => (), // not following Err(DatabaseError::NotFound(_)) => (), // not following

View file

@ -245,7 +245,7 @@ async fn delete_status(
tokio::spawn(async move { tokio::spawn(async move {
deletion_queue.process(&config).await; deletion_queue.process(&config).await;
}); });
delete_note.spawn_deliver(); delete_note.enqueue(db_client).await?;
Ok(HttpResponse::NoContent().finish()) Ok(HttpResponse::NoContent().finish())
} }
@ -349,7 +349,7 @@ async fn unfavourite(
&current_user, &current_user,
&post, &post,
&reaction_id, &reaction_id,
).await?.spawn_deliver(); ).await?.enqueue(db_client).await?;
}; };
let status = build_status( let status = build_status(
@ -422,7 +422,7 @@ async fn unreblog(
&current_user, &current_user,
&post, &post,
repost_id, repost_id,
).await?.spawn_deliver(); ).await?.enqueue(db_client).await?;
let status = build_status( let status = build_status(
db_client, db_client,

View file

@ -142,8 +142,11 @@ pub async fn register_subscription_option(
).await?; ).await?;
// Federate // Federate
prepare_update_person(db_client, &config.instance(), &current_user) prepare_update_person(
.await?.spawn_deliver(); db_client,
&config.instance(),
&current_user,
).await?.enqueue(db_client).await?;
}; };
let account = Account::from_user(current_user, &config.instance_url()); let account = Account::from_user(current_user, &config.instance_url());