From 539f06af977b9ced7880870920dfc4099050fdcf Mon Sep 17 00:00:00 2001
From: phiresky
Date: Sat, 13 Apr 2024 23:18:28 +0200
Subject: [PATCH] federation: parallel sending

---
 Cargo.lock                    |   4 +-
 Cargo.toml                    |   2 +-
 crates/federate/src/lib.rs    |   7 +-
 crates/federate/src/worker.rs | 424 ++++++++++++++++++++++++----------
 4 files changed, 303 insertions(+), 134 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 97504600b..979e9af10 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5439,9 +5439,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.36.0"
+version = "1.37.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
 dependencies = [
  "backtrace",
  "bytes",
diff --git a/Cargo.toml b/Cargo.toml
index 4d9485ca5..6737d3b9c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -139,7 +139,7 @@ anyhow = { version = "1.0.81", features = [
 diesel_ltree = "0.3.1"
 typed-builder = "0.18.1"
 serial_test = "2.0.0"
-tokio = { version = "1.36.0", features = ["full"] }
+tokio = { version = "1.37.0", features = ["full"] }
 regex = "1.10.3"
 once_cell = "1.19.0"
 diesel-derive-newtype = "2.1.0"
diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
index a4dc49536..a48f31ed9 100644
--- a/crates/federate/src/lib.rs
+++ b/crates/federate/src/lib.rs
@@ -73,19 +73,16 @@ async fn start_stop_federation_workers(
       // create new worker
       let config = federation_config.clone();
       let stats_sender = stats_sender.clone();
-      let pool = pool.clone();
       workers.insert(
         instance.id,
         CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
           let instance = instance.clone();
-          let req_data = config.clone().to_request_data();
+          let config = config.clone();
           let stats_sender = stats_sender.clone();
-          let pool = pool.clone();
           async move {
             InstanceWorker::init_and_loop(
               instance,
-              req_data,
-              &mut DbPool::Pool(&pool),
+              config,
               stop,
               stats_sender,
             )
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index ff2a68e3c..0dbf1c391 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -7,7 +7,7 @@ use crate::util::{
 };
 use activitypub_federation::{
   activity_sending::SendActivityTask,
-  config::Data,
+  config::{Data, FederationConfig},
   protocol::context::WithContext,
 };
 use anyhow::{Context, Result};
@@ -22,22 +22,22 @@ use lemmy_db_schema::{
     instance::{Instance, InstanceForm},
     site::Site,
   },
-  utils::{naive_now, DbPool},
+  utils::{naive_now, ActualDbPool, DbPool},
 };
 use lemmy_db_views_actor::structs::CommunityFollowerView;
 use once_cell::sync::Lazy;
 use reqwest::Url;
 use std::{
-  collections::{HashMap, HashSet},
+  collections::{BinaryHeap, HashMap, HashSet},
   ops::{Add, Deref},
   time::Duration,
 };
-use tokio::{sync::mpsc::UnboundedSender, time::sleep};
+use tokio::{
+  sync::mpsc::{self, UnboundedSender},
+  time::sleep,
+};
 use tokio_util::sync::CancellationToken;
 
-/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
-/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
-static CHECK_SAVE_STATE_EVERY_IT: i64 = 100;
 /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
 static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
 /// interval with which new additions to community_followers are queried.
@@ -57,6 +57,16 @@ static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
 /// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
 static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
   Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
+
+static CONCURRENT_SENDS: Lazy<i64> = Lazy::new(|| {
+  std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE")
+    .ok()
+    .and_then(|s| s.parse().ok())
+    .unwrap_or(8)
+});
+/// Maximum number of successful sends to allow out of order
+const MAX_SUCCESSFULS: usize = 1000;
+
 pub(crate) struct InstanceWorker {
   instance: Instance,
   // load site lazily because if an instance is first seen due to being on allowlist,
@@ -67,60 +77,116 @@
   site: Option<Site>,
   followed_communities: HashMap<CommunityId, HashSet<Url>>,
   stop: CancellationToken,
-  context: Data<LemmyContext>,
+  config: FederationConfig<LemmyContext>,
   stats_sender: UnboundedSender<(String, FederationQueueState)>,
   last_full_communities_fetch: DateTime<Utc>,
   last_incremental_communities_fetch: DateTime<Utc>,
   state: FederationQueueState,
   last_state_insert: DateTime<Utc>,
+  pool: ActualDbPool,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+struct SendSuccessInfo {
+  activity_id: ActivityId,
+  published: Option<DateTime<Utc>>,
+  was_skipped: bool,
+}
+impl PartialOrd for SendSuccessInfo {
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    other.activity_id.partial_cmp(&self.activity_id)
+  }
+}
+impl Ord for SendSuccessInfo {
+  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+    other.activity_id.cmp(&self.activity_id)
+  }
+}
+enum SendActivityResult {
+  Success(SendSuccessInfo),
+  Failure {
+    fail_count: i32,
+    // activity_id: ActivityId,
+  },
 }
 
 impl InstanceWorker {
   pub(crate) async fn init_and_loop(
     instance: Instance,
-    context: Data<LemmyContext>,
-    pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
+    config: FederationConfig<LemmyContext>,
+    // pool: ActualDbPool, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes
     stop: CancellationToken,
     stats_sender: UnboundedSender<(String, FederationQueueState)>,
   ) -> Result<(), anyhow::Error> {
-    let state = FederationQueueState::load(pool, instance.id).await?;
+    let state =
+      FederationQueueState::load(&mut config.to_request_data().pool(), instance.id).await?;
+    let pool = config.to_request_data().inner_pool().clone();
     let mut worker = InstanceWorker {
       instance,
       site_loaded: false,
       site: None,
       followed_communities: HashMap::new(),
       stop,
-      context,
+      config,
       stats_sender,
       last_full_communities_fetch: Utc.timestamp_nanos(0),
       last_incremental_communities_fetch: Utc.timestamp_nanos(0),
       state,
       last_state_insert: Utc.timestamp_nanos(0),
+      pool,
     };
-    worker.loop_until_stopped(pool).await
+    worker.loop_until_stopped().await
   }
   /// loop fetch new activities from db and send them to the inboxes of the given instances
   /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit)
-  pub(crate) async fn loop_until_stopped(
-    &mut self,
-    pool: &mut DbPool<'_>,
-  ) -> Result<(), anyhow::Error> {
-    let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
negative"); - - self.update_communities(pool).await?; + async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; + let mut latest_id = self.get_latest_id().await?; + + // activities that have been successfully sent but + // that are not the lowest number and thus can't be written to the database yet + let mut successfuls = BinaryHeap::::new(); + let mut in_flight: i64 = 0; + + let (report_inbox_result, mut receive_inbox_result) = + tokio::sync::mpsc::unbounded_channel::(); while !self.stop.is_cancelled() { - self.loop_batch(pool).await?; - if self.stop.is_cancelled() { - break; + // check if we need to wait for a send to finish before sending the next one + let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0) + || successfuls.len() > MAX_SUCCESSFULS + || in_flight >= *CONCURRENT_SENDS; + if need_wait_for_event || receive_inbox_result.len() > 4 { + self + .handle_send_results(&mut receive_inbox_result, &mut successfuls, &mut in_flight) + .await?; + } else { + self.update_communities().await?; + let last_successful_id = self + .state + .last_successful_id + .map(|e| e.0) + .expect("set above"); + let next_id = ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1); + if next_id > latest_id { + latest_id = self.get_latest_id().await?; + if next_id > latest_id { + // no more work to be done, wait before rechecking + tokio::select! { + () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, + () = self.stop.cancelled() => {} + } + continue; + } + } + in_flight += 1; + self + .spawn_send_if_needed(next_id, report_inbox_result.clone()) + .await?; } - if (Utc::now() - self.last_state_insert) > save_state_every { - self.save_and_send_state(pool).await?; - } - self.update_communities(pool).await?; } - // final update of state in db - self.save_and_send_state(pool).await?; + // final update of state in db on shutdown + self.save_and_send_state().await?; Ok(()) } @@ -137,6 +203,11 @@ impl InstanceWorker { return Ok(()); } let remaining = required - elapsed; + tracing::debug!( + "{}: fail-sleeping for {:?} before starting queue", + self.instance.domain, + remaining + ); tokio::select! { () = sleep(remaining) => {}, () = self.stop.cancelled() => {} @@ -144,78 +215,174 @@ impl InstanceWorker { } Ok(()) } - /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities - async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { - let latest_id = get_latest_activity_id(pool).await?; - let mut id = if let Some(id) = self.state.last_successful_id { - id - } else { + + /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen + async fn get_latest_id(&mut self) -> Result { + let latest_id = get_latest_activity_id(&mut self.pool()).await?; + if let None = self.state.last_successful_id { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened - self.save_and_send_state(pool).await?; - latest_id - }; - if id >= latest_id { - // no more work to be done, wait before rechecking - tokio::select! 
-        () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
-        () = self.stop.cancelled() => {}
+      self.save_and_send_state().await?;
+    }
+    Ok(latest_id)
+  }
+
+  async fn handle_send_results(
+    &mut self,
+    receive_inbox_result: &mut mpsc::UnboundedReceiver<SendActivityResult>,
+    successfuls: &mut BinaryHeap<SendSuccessInfo>,
+    in_flight: &mut i64,
+  ) -> Result<(), anyhow::Error> {
+    let force_write = false;
+    let mut events = Vec::new();
+    // wait for at least one event but if there are multiple handle them all
+    receive_inbox_result.recv_many(&mut events, 1000).await;
+    for event in events {
+      match event {
+        SendActivityResult::Success(s) => {
+          self.state.fail_count = 0;
+          *in_flight -= 1;
+          if !s.was_skipped {
+            self.mark_instance_alive().await?;
+          }
+          successfuls.push(s);
+        }
+        SendActivityResult::Failure { fail_count, .. } => {
+          if fail_count > self.state.fail_count {
+            // override fail count - if multiple activities are currently sending this value may get conflicting info but that's fine
+            self.state.fail_count = fail_count;
+            self.state.last_retry = Some(Utc::now());
+          }
+        }
      }
+    }
+    self
+      .pop_successfuls_and_write(successfuls, force_write)
+      .await?;
+    Ok(())
+  }
+  async fn mark_instance_alive(&mut self) -> Result<()> {
+    // Activity send successful, mark instance as alive if it hasn't been updated in a while.
+    let updated = self.instance.updated.unwrap_or(self.instance.published);
+    if updated.add(Days::new(1)) < Utc::now() {
+      self.instance.updated = Some(Utc::now());
+
+      let form = InstanceForm::builder()
+        .domain(self.instance.domain.clone())
+        .updated(Some(naive_now()))
+        .build();
+      Instance::update(&mut self.pool(), self.instance.id, form).await?;
+    }
+    Ok(())
+  }
+  /// checks whether the highest successful id can be updated and writes to db if so
+  async fn pop_successfuls_and_write(
+    &mut self,
+    successfuls: &mut BinaryHeap<SendSuccessInfo>,
+    force_write: bool,
+  ) -> Result<()> {
+    let Some(mut last_id) = self.state.last_successful_id else {
+      tracing::warn!("should be impossible: last successful id is None");
+      return Ok(());
+    };
+    tracing::debug!(
+      "last: {:?}, next: {:?}, currently in successfuls: {:?}",
+      last_id,
+      successfuls.peek(),
+      successfuls.iter()
+    );
+    while successfuls
+      .peek()
+      .map(|a| a.activity_id == ActivityId(last_id.0 + 1))
+      .unwrap_or(false)
+    {
+      let next = successfuls.pop().unwrap();
+      last_id = next.activity_id;
+      self.state.last_successful_id = Some(next.activity_id);
+      self.state.last_successful_published_time = next.published;
+    }
+
+    let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
+    if force_write || (Utc::now() - self.last_state_insert) > save_state_every {
+      self.save_and_send_state().await?;
+    }
+    Ok(())
+  }
+
+  async fn spawn_send_if_needed(
+    &mut self,
+    activity_id: ActivityId,
+    report: UnboundedSender<SendActivityResult>,
+  ) -> Result<()> {
+    let Some(ele) = get_activity_cached(&mut self.pool(), activity_id)
+      .await
+      .context("failed reading activity from db")?
+    else {
+      tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id);
+      report.send(SendActivityResult::Success(SendSuccessInfo {
+        activity_id,
+        published: None,
+        was_skipped: true,
+      }))?;
+      return Ok(());
+    };
+    let activity = &ele.0;
+    let inbox_urls = self
+      .get_inbox_urls(activity)
+      .await
+      .context("failed figuring out inbox urls")?;
+    if inbox_urls.is_empty() {
+      tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
+      report.send(SendActivityResult::Success(SendSuccessInfo {
+        activity_id,
+        published: Some(activity.published),
+        was_skipped: true,
+      }))?;
       return Ok(());
     }
-    let mut processed_activities = 0;
-    while id < latest_id
-      && processed_activities < CHECK_SAVE_STATE_EVERY_IT
-      && !self.stop.is_cancelled()
-    {
-      id = ActivityId(id.0 + 1);
-      processed_activities += 1;
-      let Some(ele) = get_activity_cached(pool, id)
-        .await
-        .context("failed reading activity from db")?
-      else {
-        tracing::debug!("{}: {:?} does not exist", self.instance.domain, id);
-        self.state.last_successful_id = Some(id);
-        continue;
-      };
-      if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
+    let inbox_urls = inbox_urls.into_iter().collect();
+    let initial_fail_count = self.state.fail_count;
+    let data = self.config.to_request_data();
+    let stop = self.stop.clone();
+    let domain = self.instance.domain.clone();
+    tokio::spawn(async move {
+      if let Err(e) = InstanceWorker::send_retry_loop(
+        &ele.0,
+        &ele.1,
+        inbox_urls,
+        report,
+        initial_fail_count,
+        domain,
+        data,
+        stop,
+      )
+      .await
+      {
         tracing::warn!(
           "sending {} errored internally, skipping activity: {:?}",
           ele.0.ap_id,
           e
         );
       }
-      if self.stop.is_cancelled() {
-        return Ok(());
-      }
-      // send success!
-      self.state.last_successful_id = Some(id);
-      self.state.last_successful_published_time = Some(ele.0.published);
-      self.state.fail_count = 0;
-    }
+    });
     Ok(())
   }
 
   // this function will return successfully when (a) send succeeded or (b) worker cancelled
   // and will return an error if an internal error occurred (send errors cause an infinite loop)
   async fn send_retry_loop(
-    &mut self,
-    pool: &mut DbPool<'_>,
     activity: &SentActivity,
     object: &SharedInboxActivities,
+    inbox_urls: Vec<Url>,
+    report: UnboundedSender<SendActivityResult>,
+    initial_fail_count: i32,
+    domain: String,
+    context: Data<LemmyContext>,
+    stop: CancellationToken,
   ) -> Result<()> {
-    let inbox_urls = self
-      .get_inbox_urls(pool, activity)
-      .await
-      .context("failed figuring out inbox urls")?;
-    if inbox_urls.is_empty() {
-      tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
-      self.state.last_successful_id = Some(activity.id);
-      self.state.last_successful_published_time = Some(activity.published);
-      return Ok(());
-    }
+    let pool = &mut context.pool();
     let Some(actor_apub_id) = &activity.actor_apub_id else {
      return Ok(()); // activity was inserted before persistent queue was activated
    };
    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
      .await
      .context("failed getting actor instance (was it marked deleted / removed?)")?;

    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
-    let inbox_urls = inbox_urls.into_iter().collect();
-    let requests =
-      SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?;
+    let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
    for task in requests {
      // usually only one due to shared inbox
      tracing::debug!("sending out {}", task);
-      while let Err(e) = task.sign_and_send(&self.context).await {
-        self.state.fail_count += 1;
-        self.state.last_retry = Some(Utc::now());
-        let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
+      let mut fail_count = initial_fail_count;
+      while let Err(e) = task.sign_and_send(&context).await {
+        fail_count += 1;
+        report.send(SendActivityResult::Failure {
+          fail_count,
+          // activity_id: activity.id,
+        })?;
+        let retry_delay: Duration = federate_retry_sleep_duration(fail_count);
         tracing::info!(
           "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
-          self.instance.domain,
+          domain,
           activity.id,
-          self.state.fail_count
+          fail_count
         );
-        self.save_and_send_state(pool).await?;
         tokio::select! {
           () = sleep(retry_delay) => {},
-          () = self.stop.cancelled() => {
+          () = stop.cancelled() => {
             // save state to db and exit
             return Ok(());
           }
         }
       }
-
-      // Activity send successful, mark instance as alive if it hasn't been updated in a while.
-      let updated = self.instance.updated.unwrap_or(self.instance.published);
-      if updated.add(Days::new(1)) < Utc::now() {
-        self.instance.updated = Some(Utc::now());
-
-        let form = InstanceForm::builder()
-          .domain(self.instance.domain.clone())
-          .updated(Some(naive_now()))
-          .build();
-        Instance::update(pool, self.instance.id, form).await?;
-      }
     }
+    report.send(SendActivityResult::Success(SendSuccessInfo {
+      activity_id: activity.id,
+      published: Some(activity.published),
+      was_skipped: false,
+    }))?;
     Ok(())
   }
-
   /// get inbox urls of sending the given activity to the given instance
   /// most often this will return 0 values (if instance doesn't care about the activity)
   /// or 1 value (the shared inbox)
   /// > 1 values only happens for non-lemmy software
-  async fn get_inbox_urls(
-    &mut self,
-    pool: &mut DbPool<'_>,
-    activity: &SentActivity,
-  ) -> Result<HashSet<Url>> {
+  async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<HashSet<Url>> {
     let mut inbox_urls: HashSet<Url> = HashSet::new();
 
     if activity.send_all_instances {
       if !self.site_loaded {
-        self.site = Site::read_from_instance_id(pool, self.instance.id).await?;
+        self.site = Site::read_from_instance_id(&mut self.pool(), self.instance.id).await?;
         self.site_loaded = true;
       }
       if let Some(site) = &self.site {
@@ -302,23 +458,30 @@
     Ok(inbox_urls)
   }
 
-  async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
+  async fn update_communities(&mut self) -> Result<()> {
     if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
+      tracing::debug!(
+        "{}: fetching full list of communities",
+        self.instance.domain
+      );
       // process removals every hour
       (self.followed_communities, self.last_full_communities_fetch) = self
-        .get_communities(pool, self.instance.id, Utc.timestamp_nanos(0))
+        .get_communities(self.instance.id, Utc.timestamp_nanos(0))
         .await?;
       self.last_incremental_communities_fetch = self.last_full_communities_fetch;
     }
     if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
       // process additions every minute
       let (news, time) = self
-        .get_communities(
-          pool,
-          self.instance.id,
-          self.last_incremental_communities_fetch,
-        )
+        .get_communities(self.instance.id, self.last_incremental_communities_fetch)
         .await?;
+      if !news.is_empty() {
+        tracing::debug!(
+          "{}: fetched {} incremental new followed communities",
+          self.instance.domain,
+          news.len()
+        );
+      }
       self.followed_communities.extend(news);
       self.last_incremental_communities_fetch = time;
     }
@@ -328,29 +491,38 @@
   /// get a list of local communities with the remote inboxes on the given instance that cares about them
   async fn get_communities(
     &mut self,
-    pool: &mut DbPool<'_>,
     instance_id: InstanceId,
     last_fetch: DateTime<Utc>,
   ) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
     let new_last_fetch =
       Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
     Ok((
-      CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch)
-        .await?
-        .into_iter()
-        .fold(HashMap::new(), |mut map, (c, u)| {
-          map.entry(c).or_default().insert(u.into());
-          map
-        }),
+      CommunityFollowerView::get_instance_followed_community_inboxes(
+        &mut self.pool(),
+        instance_id,
+        last_fetch,
+      )
+      .await?
+      .into_iter()
+      .fold(HashMap::new(), |mut map, (c, u)| {
+        map.entry(c).or_default().insert(u.into());
+        map
+      }),
       new_last_fetch,
     ))
   }
 
-  async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
+  async fn save_and_send_state(&mut self) -> Result<()> {
+    tracing::debug!("{}: saving and sending state", self.instance.domain);
     self.last_state_insert = Utc::now();
-    FederationQueueState::upsert(pool, &self.state).await?;
+    FederationQueueState::upsert(&mut self.pool(), &self.state).await?;
     self
       .stats_sender
       .send((self.instance.domain.clone(), self.state.clone()))?;
     Ok(())
   }
+
+  fn pool(&self) -> DbPool<'_> {
+    DbPool::Pool(&self.pool)
+  }
 }
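
Note on the reversed `Ord` in the patch: `BinaryHeap` is a max-heap, so `SendSuccessInfo` compares `other` against `self` to turn it into a min-heap keyed on `activity_id`. `handle_send_results` can then pop completed sends in ascending id order, and `pop_successfuls_and_write` only advances the persisted `last_successful_id` across a contiguous prefix, so a crash never skips an activity that was still in flight. A minimal standalone sketch of that mechanism (illustrative only, with a plain `i64` standing in for `ActivityId`):

use std::collections::BinaryHeap;

#[derive(Debug, PartialEq, Eq)]
struct Done(i64); // stands in for SendSuccessInfo { activity_id, .. }

impl PartialOrd for Done {
  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    Some(self.cmp(other))
  }
}
impl Ord for Done {
  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    other.0.cmp(&self.0) // reversed on purpose: smallest id surfaces first
  }
}

fn main() {
  let mut successfuls = BinaryHeap::new();
  let mut last_successful_id = 0i64; // the persisted cursor
  for id in [3, 1, 4, 2, 7] {
    successfuls.push(Done(id));
    // advance the cursor only while the next expected id is on top of the heap
    while successfuls
      .peek()
      .map(|d| d.0 == last_successful_id + 1)
      .unwrap_or(false)
    {
      last_successful_id = successfuls.pop().expect("peeked").0;
    }
    println!("completed {id}, cursor at {last_successful_id}");
  }
  // 5 and 6 never completed, so 7 stays buffered and the cursor stops at 4
}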
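
The dispatch loop in `loop_until_stopped` reduces to three backpressure rules: wait for results whenever a send is currently failing (keeping retries effectively sequential), whenever more than MAX_SUCCESSFULS results are buffered out of order, or whenever CONCURRENT_SENDS tasks are already in flight; otherwise it spawns a send task for the next activity id. The tokio bump to 1.37 lines up with the `recv_many`/`len` receiver methods this relies on. A compressed, self-contained sketch of just that control flow (illustrative: sends are faked with an immediate channel message, constants are inlined instead of reading LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE, and the failure path is omitted):

use std::collections::BinaryHeap;
use tokio::sync::mpsc;

const CONCURRENT_SENDS: i64 = 8;
const MAX_SUCCESSFULS: usize = 1000;
const LATEST_ID: i64 = 20; // stands in for get_latest_id()

#[tokio::main]
async fn main() {
  // results come back over an unbounded channel, like report_inbox_result
  let (report, mut receive) = mpsc::unbounded_channel::<i64>();
  let mut successfuls: BinaryHeap<i64> = BinaryHeap::new();
  let mut in_flight: i64 = 0;
  let mut next_id: i64 = 1;
  let failing = false; // stands in for state.fail_count > 0

  while next_id <= LATEST_ID || in_flight > 0 {
    let need_wait_for_event = (in_flight != 0 && failing)
      || successfuls.len() > MAX_SUCCESSFULS
      || in_flight >= CONCURRENT_SENDS
      || next_id > LATEST_ID; // no work left, just drain results
    if need_wait_for_event || receive.len() > 4 {
      // wait for at least one result, then batch up whatever else is ready
      let mut events = Vec::new();
      let received = receive.recv_many(&mut events, 1000).await;
      in_flight -= received as i64;
      successfuls.extend(events);
    } else {
      in_flight += 1;
      let report = report.clone();
      let id = next_id;
      next_id += 1;
      tokio::spawn(async move {
        // a real worker would prepare, sign and deliver the activity here
        report.send(id).ok();
      });
    }
  }
  assert_eq!(successfuls.len() as i64, LATEST_ID);
  println!("delivered {} activities", successfuls.len());
}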
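
Finally, every blocking wait in the worker (retry backoff, idle recheck, the initial fail-sleep) is raced against the `stop` token via `tokio::select!`, which is what keeps `CancellableTask` shutdown prompt: cancellation interrupts the sleep instead of waiting it out. The pattern, reduced to a runnable toy (illustrative only):

use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
  let stop = CancellationToken::new();
  let worker = {
    let stop = stop.clone();
    tokio::spawn(async move {
      loop {
        tokio::select! {
          () = sleep(Duration::from_secs(60)) => { /* periodic work */ }
          () = stop.cancelled() => {
            // the final save_and_send_state() would happen here
            return;
          }
        }
      }
    })
  };
  stop.cancel();
  worker.await.unwrap();
}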