From e719bafc9b413712ab36a715a4670bc2ff8e824c Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 15 Apr 2024 19:25:40 +0200 Subject: [PATCH] extract sending task code to separate file --- crates/federate/src/lib.rs | 1 + crates/federate/src/send.rs | 117 ++++++++++++++++++++++++++++++++++ crates/federate/src/worker.rs | 97 +++------------------------- 3 files changed, 126 insertions(+), 89 deletions(-) create mode 100644 crates/federate/src/send.rs diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index ebc6c783e..0a95c85aa 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -15,6 +15,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; mod inboxes; +mod send; mod util; mod worker; diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs new file mode 100644 index 000000000..6a2ac5364 --- /dev/null +++ b/crates/federate/src/send.rs @@ -0,0 +1,117 @@ +use crate::util::{get_activity_cached, get_actor_cached}; +use activitypub_federation::{ + activity_sending::SendActivityTask, + config::Data, + protocol::context::WithContext, +}; +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT}; +use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity}; +use reqwest::Url; +use std::{ops::Deref, sync::Arc, time::Duration}; +use tokio::{sync::mpsc::UnboundedSender, time::sleep}; +use tokio_util::sync::CancellationToken; + +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct SendSuccessInfo { + pub activity_id: ActivityId, + pub published: Option>, + pub was_skipped: bool, +} +// need to be able to order them for the binary heap in the worker +impl PartialOrd for SendSuccessInfo { + fn partial_cmp(&self, other: &Self) -> Option { + other.activity_id.partial_cmp(&self.activity_id) + } +} +impl Ord for SendSuccessInfo { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.activity_id.cmp(&self.activity_id) + } +} +pub(crate) enum SendActivityResult { + Success(SendSuccessInfo), + Failure { + fail_count: i32, + // activity_id: ActivityId, + }, +} + +pub(crate) struct SendRetryTask<'a> { + pub activity: &'a SentActivity, + pub object: &'a SharedInboxActivities, + /// must not be empty at this point + pub inbox_urls: Vec, + /// report to the main instance worker + pub report: &'a mut UnboundedSender, + /// the first request will be sent immediately, but the next one will be delayed according to the number of previous fails + 1 + pub initial_fail_count: i32, + /// for logging + pub domain: String, + pub context: Data, + pub stop: CancellationToken, +} + +impl<'a> SendRetryTask<'a> { + // this function will return successfully when (a) send succeeded or (b) worker cancelled + // and will return an error if an internal error occurred (send errors cause an infinite loop) + pub async fn send_retry_loop(self) -> Result<()> { + let SendRetryTask { + activity, + object, + inbox_urls, + report, + initial_fail_count, + domain, + context, + stop, + } = self; + debug_assert!(!inbox_urls.is_empty()); + + let pool = &mut context.pool(); + let Some(actor_apub_id) = &activity.actor_apub_id else { + return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); + }; + let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) + .await + .context("failed getting actor instance (was it marked deleted / removed?)")?; + + let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); + let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?; + for task in requests { + // usually only one due to shared inbox + tracing::debug!("sending out {}", task); + let mut fail_count = initial_fail_count; + while let Err(e) = task.sign_and_send(&context).await { + fail_count += 1; + report.send(SendActivityResult::Failure { + fail_count, + // activity_id: activity.id, + })?; + let retry_delay = federate_retry_sleep_duration(fail_count); + tracing::info!( + "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", + domain, + activity.id, + fail_count + ); + tokio::select! { + () = sleep(retry_delay) => {}, + () = stop.cancelled() => { + // save state to db and exit + // TODO: do we need to report state here to prevent hang on exit? + return Ok(()); + } + } + } + } + report.send(SendActivityResult::Success(SendSuccessInfo { + activity_id: activity.id, + published: Some(activity.published), + was_skipped: false, + }))?; + Ok(()) + } +} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index acf52ca96..5ed7c22d6 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,5 +1,6 @@ use crate::{ inboxes::CommunityInboxCollector, + send::{SendActivityResult, SendRetryTask, SendSuccessInfo}, util::{ get_activity_cached, get_actor_cached, @@ -61,30 +62,6 @@ pub(crate) struct InstanceWorker { inbox_collector: CommunityInboxCollector, } -#[derive(Debug, PartialEq, Eq)] -struct SendSuccessInfo { - activity_id: ActivityId, - published: Option>, - was_skipped: bool, -} -impl PartialOrd for SendSuccessInfo { - fn partial_cmp(&self, other: &Self) -> Option { - other.activity_id.partial_cmp(&self.activity_id) - } -} -impl Ord for SendSuccessInfo { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - other.activity_id.cmp(&self.activity_id) - } -} -enum SendActivityResult { - Success(SendSuccessInfo), - Failure { - fail_count: i32, - // activity_id: ActivityId, - }, -} - impl InstanceWorker { pub(crate) async fn init_and_loop( instance: Instance, @@ -340,16 +317,17 @@ impl InstanceWorker { let domain = self.instance.domain.clone(); tokio::spawn(async move { let mut report = report; - let res = InstanceWorker::send_retry_loop( - &ele.0, - &ele.1, + let res = SendRetryTask { + activity: &ele.0, + object: &ele.1, inbox_urls, - &mut report, + report: &mut report, initial_fail_count, domain, - data, + context: data, stop, - ) + } + .send_retry_loop() .await; if let Err(e) = res { tracing::warn!( @@ -369,65 +347,6 @@ impl InstanceWorker { Ok(()) } - // this function will return successfully when (a) send succeeded or (b) worker cancelled - // and will return an error if an internal error occurred (send errors cause an infinite loop) - async fn send_retry_loop( - activity: &SentActivity, - object: &SharedInboxActivities, - inbox_urls: Vec, - report: &mut UnboundedSender, - initial_fail_count: i32, - domain: String, - context: Data, - stop: CancellationToken, - ) -> Result<()> { - debug_assert!(!inbox_urls.is_empty()); - - let pool = &mut context.pool(); - let Some(actor_apub_id) = &activity.actor_apub_id else { - return Err(anyhow::anyhow!("activity is from before lemmy 0.19")); - }; - let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) - .await - .context("failed getting actor instance (was it marked deleted / removed?)")?; - - let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone()); - let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?; - for task in requests { - // usually only one due to shared inbox - tracing::debug!("sending out {}", task); - let mut fail_count = initial_fail_count; - while let Err(e) = task.sign_and_send(&context).await { - fail_count += 1; - report.send(SendActivityResult::Failure { - fail_count, - // activity_id: activity.id, - })?; - let retry_delay: Duration = federate_retry_sleep_duration(fail_count); - tracing::info!( - "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", - domain, - activity.id, - fail_count - ); - tokio::select! { - () = sleep(retry_delay) => {}, - () = stop.cancelled() => { - // save state to db and exit - // TODO: do we need to report state here to prevent hang on exit? - return Ok(()); - } - } - } - } - report.send(SendActivityResult::Success(SendSuccessInfo { - activity_id: activity.id, - published: Some(activity.published), - was_skipped: false, - }))?; - Ok(()) - } - async fn save_and_send_state(&mut self) -> Result<()> { tracing::debug!("{}: saving and sending state", self.instance.domain); self.last_state_insert = Utc::now();