diff --git a/config/defaults.hjson b/config/defaults.hjson
index c52f9055e..ce440ab69 100644
--- a/config/defaults.hjson
+++ b/config/defaults.hjson
@@ -106,10 +106,11 @@
   port: 8536
   # Whether the site is available over TLS. Needs to be true for federation to work.
   tls_enabled: true
-  # The number of activitypub federation workers that can be in-flight concurrently
-  worker_count: 0
-  # The number of activitypub federation retry workers that can be in-flight concurrently
-  retry_count: 0
+  federation: {
+    # Limit to the number of concurrent outgoing federation requests per target instance.
+    # Set this to a value higher than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and a receiving instance is not keeping up.
+    concurrent_sends_per_instance: 1
+  }
   prometheus: {
     bind: "127.0.0.1"
     port: 10002
diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs
index 0a95c85aa..cb2fd1fa6 100644
--- a/crates/federate/src/lib.rs
+++ b/crates/federate/src/lib.rs
@@ -1,7 +1,11 @@
 use crate::{util::CancellableTask, worker::InstanceWorker};
 use activitypub_federation::config::FederationConfig;
 use chrono::{Local, Timelike};
-use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
+use lemmy_api_common::{
+  context::LemmyContext,
+  federate_retry_sleep_duration,
+  lemmy_utils::settings::structs::FederationWorkerConfig,
+};
 use lemmy_db_schema::{
   newtypes::InstanceId,
   source::{federation_queue_state::FederationQueueState, instance::Instance},
@@ -36,7 +40,8 @@ pub struct Opts {
 async fn start_stop_federation_workers(
   opts: Opts,
   pool: ActualDbPool,
-  federation_config: FederationConfig<LemmyContext>,
+  federation_lib_config: FederationConfig<LemmyContext>,
+  federation_worker_config: FederationWorkerConfig,
   cancel: CancellationToken,
 ) -> anyhow::Result<()> {
   let mut workers = HashMap::<InstanceId, CancellableTask>::new();
@@ -45,7 +50,9 @@ async fn start_stop_federation_workers(
   let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
   let pool2 = &mut DbPool::Pool(&pool);
   let process_index = opts.process_index - 1;
-  let local_domain = federation_config.settings().get_hostname_without_port()?;
+  let local_domain = federation_lib_config
+    .settings()
+    .get_hostname_without_port()?;
   loop {
     let mut total_count = 0;
     let mut dead_count = 0;
@@ -73,15 +80,19 @@
         continue;
       }
       // create new worker
-      let config = federation_config.clone();
+      let config = federation_lib_config.clone();
       let stats_sender = stats_sender.clone();
+      let federation_worker_config = federation_worker_config.clone();
       workers.insert(
         instance.id,
         CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
-          let instance = instance.clone();
-          let config = config.clone();
-          let stats_sender = stats_sender.clone();
-          async move { InstanceWorker::init_and_loop(instance, config, stop, stats_sender).await }
+          InstanceWorker::init_and_loop(
+            instance.clone(),
+            config.clone(),
+            federation_worker_config.clone(),
+            stop,
+            stats_sender.clone(),
+          )
         }),
       );
     } else if !should_federate {
@@ -117,12 +128,16 @@ pub fn start_stop_federation_workers_cancellable(
   opts: Opts,
   pool: ActualDbPool,
   config: FederationConfig<LemmyContext>,
+  federation_worker_config: FederationWorkerConfig,
 ) -> CancellableTask {
   CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
-    let opts = opts.clone();
-    let pool = pool.clone();
-    let config = config.clone();
-    async move { start_stop_federation_workers(opts, pool, config, stop).await }
+    start_stop_federation_workers(
+      opts.clone(),
+      pool.clone(),
+      config.clone(),
+      federation_worker_config.clone(),
+      stop,
+    )
   })
 }
diff --git a/crates/federate/src/send.rs b/crates/federate/src/send.rs
index 6a2ac5364..41516eef7 100644
--- a/crates/federate/src/send.rs
+++ b/crates/federate/src/send.rs
@@ -1,4 +1,4 @@
-use crate::util::{get_activity_cached, get_actor_cached};
+use crate::util::get_actor_cached;
 use activitypub_federation::{
   activity_sending::SendActivityTask,
   config::Data,
@@ -10,7 +10,7 @@ use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
 use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
 use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
 use reqwest::Url;
-use std::{ops::Deref, sync::Arc, time::Duration};
+use std::ops::Deref;
 use tokio::{sync::mpsc::UnboundedSender, time::sleep};
 use tokio_util::sync::CancellationToken;
diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs
index 5ed7c22d6..edf530fb3 100644
--- a/crates/federate/src/worker.rs
+++ b/crates/federate/src/worker.rs
@@ -1,38 +1,25 @@
 use crate::{
   inboxes::CommunityInboxCollector,
   send::{SendActivityResult, SendRetryTask, SendSuccessInfo},
-  util::{
-    get_activity_cached,
-    get_actor_cached,
-    get_latest_activity_id,
-    WORK_FINISHED_RECHECK_DELAY,
-  },
-};
-use activitypub_federation::{
-  activity_sending::SendActivityTask,
-  config::{Data, FederationConfig},
-  protocol::context::WithContext,
+  util::{get_activity_cached, get_latest_activity_id, WORK_FINISHED_RECHECK_DELAY},
 };
+use activitypub_federation::config::FederationConfig;
 use anyhow::{Context, Result};
 use chrono::{DateTime, Days, TimeZone, Utc};
-use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
-use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
+use lemmy_api_common::{
+  context::LemmyContext,
+  federate_retry_sleep_duration,
+  lemmy_utils::settings::structs::FederationWorkerConfig,
+};
 use lemmy_db_schema::{
   newtypes::ActivityId,
   source::{
-    activity::SentActivity,
     federation_queue_state::FederationQueueState,
     instance::{Instance, InstanceForm},
   },
   utils::{naive_now, ActualDbPool, DbPool},
 };
-use once_cell::sync::Lazy;
-use reqwest::Url;
-use std::{
-  collections::BinaryHeap,
-  ops::{Add, Deref},
-  time::Duration,
-};
+use std::{collections::BinaryHeap, ops::Add, time::Duration};
 use tokio::{
   sync::mpsc::{self, UnboundedSender},
   time::sleep,
@@ -42,19 +29,14 @@ use tokio_util::sync::CancellationToken;

 /// Save state to db after this time has passed since the last state (so if the server crashes or is SIGKILLed, less than X seconds of activities are resent)
 static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
-static CONCURRENT_SENDS: Lazy<i64> = Lazy::new(|| {
-  std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE")
-    .ok()
-    .and_then(|s| s.parse().ok())
-    .unwrap_or(8)
-});
 /// Maximum number of successful sends to allow out of order
 const MAX_SUCCESSFULS: usize = 1000;

 pub(crate) struct InstanceWorker {
   instance: Instance,
   stop: CancellationToken,
-  config: FederationConfig<LemmyContext>,
+  federation_lib_config: FederationConfig<LemmyContext>,
+  federation_worker_config: FederationWorkerConfig,
   stats_sender: UnboundedSender<(String, FederationQueueState)>,
   state: FederationQueueState,
   last_state_insert: DateTime<Utc>,
@@ -66,6 +48,7 @@ impl InstanceWorker {
   pub(crate) async fn init_and_loop(
     instance: Instance,
     config: FederationConfig<LemmyContext>,
+    federation_worker_config: FederationWorkerConfig,
     stop: CancellationToken,
     stats_sender: UnboundedSender<(String, FederationQueueState)>,
   ) -> Result<(), anyhow::Error> {
@@ -77,9 +60,10 @@
         instance.id,
         instance.domain.clone(),
       ),
+      federation_worker_config,
       instance,
       stop,
-      config,
+      federation_lib_config: config,
       stats_sender,
       state,
       last_state_insert: Utc.timestamp_nanos(0),
@@ -108,7 +92,7 @@ impl InstanceWorker {
       // or (b) if we have too many successfuls in memory or (c) if we have too many in flight
       let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
         || successfuls.len() >= MAX_SUCCESSFULS
-        || in_flight >= *CONCURRENT_SENDS;
+        || in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
       if need_wait_for_event || receive_send_result.len() > 4 {
         // if len() > 0 then this does not block and allows us to write to db more often
         // if len is 0 then this means we wait for something to change our above conditions,
@@ -312,7 +296,7 @@ impl InstanceWorker {
       return Ok(());
     }
     let initial_fail_count = self.state.fail_count;
-    let data = self.config.to_request_data();
+    let data = self.federation_lib_config.to_request_data();
     let stop = self.stop.clone();
     let domain = self.instance.domain.clone();
     tokio::spawn(async move {
diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs
index 4a8d8afb6..c5394a908 100644
--- a/crates/utils/src/settings/structs.rs
+++ b/crates/utils/src/settings/structs.rs
@@ -42,12 +42,8 @@ pub struct Settings {
   #[default(None)]
   #[doku(skip)]
   pub opentelemetry_url: Option<Url>,
-  /// The number of activitypub federation workers that can be in-flight concurrently
-  #[default(0)]
-  pub worker_count: usize,
-  /// The number of activitypub federation retry workers that can be in-flight concurrently
-  #[default(0)]
-  pub retry_count: usize,
+  #[default(Default::default())]
+  pub federation: FederationWorkerConfig,
   // Prometheus configuration.
   #[default(None)]
   #[doku(example = "Some(Default::default())")]
@@ -234,3 +230,13 @@ pub struct PrometheusConfig {
   #[doku(example = "10002")]
   pub port: i32,
 }
+
+#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
+#[serde(default)]
+// Named FederationWorkerConfig to disambiguate it from the activitypub library's FederationConfig
+pub struct FederationWorkerConfig {
+  /// Limit to the number of concurrent outgoing federation requests per target instance.
+  /// Set this to a value higher than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and a receiving instance is not keeping up.
+  #[default(1)]
+  pub concurrent_sends_per_instance: i64,
+}
diff --git a/src/lib.rs b/src/lib.rs
index 00960826a..e9ac06e62 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -213,6 +213,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
       },
       pool.clone(),
       federation_config.clone(),
+      SETTINGS.federation.clone(),
     )
   });
   let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
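
Note on the new setting: the per-instance concurrency cap moves from the LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE environment variable (fallback of 8) into the config file as federation.concurrent_sends_per_instance (default 1). Below is a minimal standalone sketch of the bounded-send pattern the worker's main loop applies, assuming tokio; FederationWorkerConfig and concurrent_sends_per_instance match the diff, while the fake activity ids and the sleep standing in for the HTTP delivery are illustrative only.

use tokio::sync::mpsc;

// Mirrors the config struct added in crates/utils/src/settings/structs.rs.
#[derive(Clone)]
struct FederationWorkerConfig {
  concurrent_sends_per_instance: i64,
}

#[tokio::main]
async fn main() {
  let config = FederationWorkerConfig {
    concurrent_sends_per_instance: 1,
  };
  // Completed sends report back over an unbounded channel, as in the worker.
  let (result_tx, mut result_rx) = mpsc::unbounded_channel::<u64>();
  let mut in_flight: i64 = 0;

  for activity_id in 0u64..20 {
    // Once the configured cap is reached, wait for a send to finish
    // before spawning the next one.
    while in_flight >= config.concurrent_sends_per_instance {
      if result_rx.recv().await.is_some() {
        in_flight -= 1;
      }
    }
    let tx = result_tx.clone();
    in_flight += 1;
    tokio::spawn(async move {
      // Stand-in for the actual HTTP delivery of one activity.
      tokio::time::sleep(std::time::Duration::from_millis(10)).await;
      let _ = tx.send(activity_id);
    });
  }

  // Drain the sends that are still in flight.
  drop(result_tx);
  while result_rx.recv().await.is_some() {}
}

With the default of 1, deliveries to a given instance are strictly sequential; a higher value lets up to that many sends overlap, which is also why the worker separately bounds out-of-order successes via MAX_SUCCESSFULS.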