diff --git a/Cargo.lock b/Cargo.lock index d926aebe3..ffdf71756 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2977,6 +2977,7 @@ name = "lemmy_federate" version = "0.19.4-rc.3" dependencies = [ "activitypub_federation", + "actix-web", "anyhow", "chrono", "diesel", @@ -2995,6 +2996,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", ] [[package]] diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index 31eb111c2..871bbb050 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -11,8 +11,6 @@ fi export RUST_BACKTRACE=1 export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL" -export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min - # pictrs setup if [ ! -f "api_tests/pict-rs" ]; then curl "https://git.asonix.dog/asonix/pict-rs/releases/download/v0.5.13/pict-rs-linux-amd64" -o api_tests/pict-rs diff --git a/crates/api_common/src/context.rs b/crates/api_common/src/context.rs index f4ac41db1..f088212ee 100644 --- a/crates/api_common/src/context.rs +++ b/crates/api_common/src/context.rs @@ -75,6 +75,7 @@ impl LemmyContext { .app_data(context) // Dont allow any network fetches .http_fetch_limit(0) + .debug(true) .build() .await .expect("build federation config"); diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 2405d3af0..1316f6bfc 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -37,3 +37,5 @@ tokio-util = "0.7.11" [dev-dependencies] serial_test = { workspace = true } +url.workspace = true +actix-web.workspace = true diff --git a/crates/federate/src/inboxes.rs b/crates/federate/src/inboxes.rs index d50a5d044..33fe9f601 100644 --- a/crates/federate/src/inboxes.rs +++ b/crates/federate/src/inboxes.rs @@ -1,5 +1,4 @@ -use crate::util::LEMMY_TEST_FAST_FEDERATION; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, TimeDelta, TimeZone, Utc}; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::CommunityId, @@ -20,11 +19,11 @@ use std::collections::{HashMap, HashSet}; /// currently fairly high because of the current structure of storing inboxes for every person, not /// having a separate list of shared_inboxes, and the architecture of having every instance queue be /// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958) -static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = Lazy::new(|| { + if cfg!(debug_assertions) { + TimeDelta::try_seconds(1).expect("TimeDelta out of bounds") } else { - chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") + TimeDelta::try_minutes(2).expect("TimeDelta out of bounds") } }); @@ -92,9 +91,11 @@ impl CommunityInboxCollector { .send_inboxes .iter() .filter_map(std::option::Option::as_ref) - .filter(|&u| (u.domain() == Some(&self.instance.domain))) .map(|u| u.inner().clone()), ); + + // TODO: also needs to send to user followers + Ok(inbox_urls) } diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 02a90dee9..4476c6f5e 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -27,28 +27,16 @@ use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; use tracing::error; -/// Decrease the delays of the federation queue. -/// Should only be used for federation tests since it significantly increases CPU and DB load of the -/// federation queue. -pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy = Lazy::new(|| { - std::env::var("LEMMY_TEST_FAST_FEDERATION") - .map(|s| !s.is_empty()) - .unwrap_or(false) -}); - /// Recheck for new federation work every n seconds. /// /// When the queue is processed faster than new activities are added and it reaches the current time /// with an empty batch, this is the delay the queue waits before it checks if new activities have /// been added to the sent_activities table. This delay is only applied if no federated activity /// happens during sending activities of the last batch. -pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy = Lazy::new(|| { - if *LEMMY_TEST_FAST_FEDERATION { - Duration::from_millis(100) - } else { - Duration::from_secs(30) - } -}); +#[cfg(debug_assertions)] +pub(crate) static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_millis(100); +#[cfg(not(debug_assertions))] +pub(crate) static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); /// A task that will be run in an infinite loop, unless it is cancelled. /// If the task exits without being cancelled, an error will be logged and the task will be diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 6b7100954..41dadc765 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -40,7 +40,10 @@ use tracing::{debug, info, trace, warn}; static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; /// Save state to db after this time has passed since the last state (so if the server crashes or is /// SIGKILLed, less than X seconds of activities are resent) -static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); +#[cfg(debug_assertions)] +static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(1); +#[cfg(not(debug_assertions))] +static SAVE_STATE_EVERY_TIME: chrono::Duration = chrono::Duration::seconds(60); pub(crate) struct InstanceWorker { instance: Instance, @@ -78,8 +81,6 @@ impl InstanceWorker { /// cancelled (graceful exit) pub(crate) async fn loop_until_stopped(&mut self) -> LemmyResult<()> { debug!("Starting federation worker for {}", self.instance.domain); - let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); - self.inboxes.update_communities(&self.context).await?; self.initial_fail_sleep().await?; while !self.stop.is_cancelled() { @@ -87,7 +88,7 @@ impl InstanceWorker { if self.stop.is_cancelled() { break; } - if (Utc::now() - self.last_state_insert) > save_state_every { + if (Utc::now() - self.last_state_insert) > SAVE_STATE_EVERY_TIME { self.save_and_send_state().await?; } self.inboxes.update_communities(&self.context).await?; @@ -135,7 +136,7 @@ impl InstanceWorker { if id >= latest_id { // no more work to be done, wait before rechecking tokio::select! { - () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, + () = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, () = self.stop.cancelled() => {} } return Ok(()); @@ -179,6 +180,7 @@ impl InstanceWorker { activity: &SentActivity, object: &SharedInboxActivities, ) -> LemmyResult<()> { + println!("send retry loop {:?}", activity.id); let inbox_urls = self.inboxes.get_inbox_urls(activity, &self.context).await?; if inbox_urls.is_empty() { trace!("{}: {:?} no inboxes", self.instance.domain, activity.id); @@ -186,6 +188,7 @@ impl InstanceWorker { self.state.last_successful_published_time = Some(activity.published); return Ok(()); } + // TODO: make db column not null let Some(actor_apub_id) = &activity.actor_apub_id else { return Ok(()); // activity was inserted before persistent queue was activated }; @@ -242,3 +245,146 @@ impl InstanceWorker { Ok(()) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod test { + + use super::*; + use activitypub_federation::http_signatures::generate_actor_keypair; + use actix_web::{rt::System, web, App, HttpResponse, HttpServer}; + use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url}; + use lemmy_db_schema::{ + newtypes::DbUrl, + source::{ + activity::{ActorType, SentActivityForm}, + person::{Person, PersonInsertForm}, + }, + traits::Crud, + }; + use reqwest::StatusCode; + use serde_json::Value; + use serial_test::serial; + use std::{fs::File, io::BufReader}; + use tokio::{ + select, + spawn, + sync::mpsc::{error::TryRecvError, unbounded_channel}, + }; + use url::Url; + + #[tokio::test] + #[serial] + async fn test_worker() -> LemmyResult<()> { + let context = LemmyContext::init_test_context().await; + let instance = Instance::read_or_create(&mut context.pool(), "alpha.com".to_string()).await?; + + let actor_keypair = generate_actor_keypair()?; + let actor_id: DbUrl = Url::parse("http://local.com/u/alice")?.into(); + let person_form = PersonInsertForm::builder() + .name("alice".to_string()) + .actor_id(Some(actor_id.clone())) + .private_key(Some(actor_keypair.private_key)) + .public_key(actor_keypair.public_key) + .inbox_url(Some(generate_inbox_url(&actor_id)?)) + .shared_inbox_url(Some(generate_shared_inbox_url(context.settings())?)) + .instance_id(instance.id) + .build(); + let person = Person::create(&mut context.pool(), &person_form).await?; + + let cancel = CancellationToken::new(); + let (stats_sender, mut stats_receiver) = unbounded_channel(); + let (inbox_sender, mut inbox_receiver) = unbounded_channel(); + + // listen for received activities in background + let cancel_ = cancel.clone(); + std::thread::spawn(move || System::new().block_on(listen_activities(inbox_sender, cancel_))); + + spawn(InstanceWorker::init_and_loop( + instance.clone(), + context.reset_request_count(), + cancel.clone(), + stats_sender, + )); + // wait for startup before creating sent activity + sleep(WORK_FINISHED_RECHECK_DELAY).await; + + // create outgoing activity + let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?; + let reader = BufReader::new(file); + let form = SentActivityForm { + ap_id: Url::parse("http://local.com/activity/1")?.into(), + data: serde_json::from_reader(reader)?, + sensitive: false, + send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())], + send_all_instances: false, + send_community_followers_of: None, + actor_type: ActorType::Person, + actor_apub_id: person.actor_id, + }; + let sent = SentActivity::create(&mut context.pool(), form).await?; + + sleep(WORK_FINISHED_RECHECK_DELAY).await; + + // first receive at startup + let rcv = stats_receiver.recv().await.unwrap(); + assert_eq!(instance.id, rcv.0); + assert_eq!(instance.id, rcv.1.instance_id); + assert_eq!(Some(ActivityId(0)), rcv.1.last_successful_id); + + // receive for successfully sent activity + let inbox_rcv = inbox_receiver.recv().await.unwrap(); + let parsed_activity = serde_json::from_str::>(&inbox_rcv)?; + assert_eq!(&sent.data, parsed_activity.inner()); + + let rcv = stats_receiver.recv().await.unwrap(); + assert_eq!(instance.id, rcv.0); + assert_eq!(instance.id, rcv.1.instance_id); + assert_eq!(Some(sent.id), rcv.1.last_successful_id); + + // cleanup + cancel.cancel(); + Instance::delete_all(&mut context.pool()).await?; + Person::delete(&mut context.pool(), person.id).await?; + + // also receive state on shutdown + let rcv = stats_receiver.try_recv(); + assert!(rcv.is_ok()); + + // nothing further received + let rcv = stats_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Disconnected), rcv.err()); + let inbox_rcv = inbox_receiver.try_recv(); + assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err()); + + Ok(()) + } + + async fn listen_activities( + inbox_sender: UnboundedSender, + cancel: CancellationToken, + ) -> LemmyResult<()> { + let run = HttpServer::new(move || { + App::new() + .app_data(actix_web::web::Data::new(inbox_sender.clone())) + .route( + "/inbox", + web::post().to( + |inbox_sender: actix_web::web::Data>, body: String| async move { + inbox_sender.send(body.clone()).unwrap(); + HttpResponse::new(StatusCode::OK) + }, + ), + ) + }) + .bind(("127.0.0.1", 8085))? + .run(); + select! { + _ = run => {}, + _ = cancel.cancelled() => { + } + } + Ok(()) + } +} diff --git a/crates/utils/src/apub.rs b/crates/utils/src/apub.rs deleted file mode 100644 index 53e069d77..000000000 --- a/crates/utils/src/apub.rs +++ /dev/null @@ -1,26 +0,0 @@ -use openssl::{pkey::PKey, rsa::Rsa}; -use std::io::{Error, ErrorKind}; - -pub struct Keypair { - pub private_key: String, - pub public_key: String, -} - -/// Generate the asymmetric keypair for ActivityPub HTTP signatures. -pub fn generate_actor_keypair() -> Result { - let rsa = Rsa::generate(2048)?; - let pkey = PKey::from_rsa(rsa)?; - let public_key = pkey.public_key_to_pem()?; - let private_key = pkey.private_key_to_pem_pkcs8()?; - let key_to_string = |key| match String::from_utf8(key) { - Ok(s) => Ok(s), - Err(e) => Err(Error::new( - ErrorKind::Other, - format!("Failed converting key to string: {e}"), - )), - }; - Ok(Keypair { - private_key: key_to_string(private_key)?, - public_key: key_to_string(public_key)?, - }) -} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 1adb3f6cf..5a5e76d2a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -2,7 +2,6 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(feature = "full")] { - pub mod apub; pub mod cache_header; pub mod email; pub mod rate_limit;