diff --git a/Cargo.lock b/Cargo.lock index fd93b9c36..1426c11a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2918,6 +2918,7 @@ dependencies = [ "once_cell", "reqwest", "serde_json", + "serial_test", "tokio", "tokio-util", "tracing", diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index e0c516037..8f2e99cc5 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -131,7 +131,7 @@ pub struct CommentReplyId(i32); #[cfg_attr(feature = "full", derive(DieselNewType, TS))] #[cfg_attr(feature = "full", ts(export))] /// The instance id. -pub struct InstanceId(i32); +pub struct InstanceId(pub i32); #[derive( Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord, diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml index 0c3baf348..2ca17c037 100644 --- a/crates/federate/Cargo.toml +++ b/crates/federate/Cargo.toml @@ -34,3 +34,6 @@ tokio = { workspace = true, features = ["full"] } tracing.workspace = true moka.workspace = true tokio-util = "0.7.10" + +[dev-dependencies] +serial_test = { workspace = true } diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index febdb34e1..87c642998 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -44,6 +44,9 @@ pub struct SendManager { impl SendManager { pub fn new(opts: Opts, context: FederationConfig) -> Self { + assert!(opts.process_count > 0); + assert!(opts.process_index > 0); + let (stats_sender, stats_receiver) = unbounded_channel(); Self { opts, @@ -153,59 +156,102 @@ mod test { use super::*; use activitypub_federation::config::Data; -use tokio::{spawn, time::sleep}; + use lemmy_utils::error::LemmyError; + use serial_test::serial; + use std::sync::{Arc, Mutex}; + use tokio::{spawn, time::sleep}; struct TestData { send_manager: SendManager, context: Data, - instances: Vec + instances: Vec, } - - async fn init() -> LemmyResult { - let context = LemmyContext::init_test_context().await; - let opts = Opts { - process_count: 1, - process_index: 1, - }; - let federation_config = FederationConfig::builder() - .domain("local.com") - .app_data(context.clone()) - .build() - .await?; + impl TestData { + async fn init(process_count: i32, process_index: i32) -> LemmyResult { + let context = LemmyContext::init_test_context().await; + let opts = Opts { + process_count, + process_index, + }; + let federation_config = FederationConfig::builder() + .domain("local.com") + .app_data(context.clone()) + .build() + .await?; let pool = &mut context.pool(); - let instances = vec![ - Instance::read_or_create(pool, "alpha.com".to_string()).await?, - Instance::read_or_create(pool, "beta.com".to_string()).await?, - Instance::read_or_create(pool, "gamma.com".to_string()).await?, - ]; + let instances = vec![ + Instance::read_or_create(pool, "alpha.com".to_string()).await?, + Instance::read_or_create(pool, "beta.com".to_string()).await?, + Instance::read_or_create(pool, "gamma.com".to_string()).await?, + ]; - let send_manager = SendManager::new(opts, federation_config); - Ok(TestData {send_manager, - context,instances - }) + let send_manager = SendManager::new(opts, federation_config); + Ok(Self { + send_manager, + context, + instances, + }) + } + + async fn run(&mut self) -> LemmyResult<()> { + // start it and cancel after workers are running + let cancel = CancellationToken::new(); + let cancel_ = cancel.clone(); + spawn(async move { + sleep(Duration::from_millis(100)).await; + cancel_.cancel(); + }); + self.send_manager.do_loop(cancel.clone()).await?; + Ok(()) + } + + async fn cleanup(self) -> LemmyResult<()> { + self.send_manager.cancel().await?; + Instance::delete_all(&mut self.context.pool()).await?; + Ok(()) + } + } + + // check that correct number of instance workers was started + // TODO: need to wrap in Arc or something similar + // TODO: test with different `opts`, dead/blocked instances etc + + #[tokio::test] + #[serial] + async fn test_send_manager() -> LemmyResult<()> { + let mut data = TestData::init(1, 1).await?; + + data.run().await?; + assert_eq!(3, data.send_manager.workers.len()); + + data.cleanup().await?; + Ok(()) } #[tokio::test] - async fn test_start_stop_federation_workers() -> LemmyResult<()> { - let mut data = init().await?; + #[serial] + async fn test_send_manager_processes() -> LemmyResult<()> { + let active = Arc::new(Mutex::new(vec![])); + let execute = |count, index, len, active: Arc>>| async move { + let mut data = TestData::init(count, index).await?; + data.run().await?; + assert_eq!(len, data.send_manager.workers.len()); + for k in data.send_manager.workers.keys() { + active.lock().unwrap().push(*k); + } + data.cleanup().await?; + Ok::<(), LemmyError>(()) + }; + execute(3, 1, 1, active.clone()).await?; + execute(3, 2, 1, active.clone()).await?; + execute(3, 3, 1, active.clone()).await?; + execute(3, 4, 0, active.clone()).await?; + execute(3, 6, 0, active.clone()).await?; - // start it and wait a moment - let cancel = CancellationToken::new(); - let cancel_ = cancel.clone(); - spawn(async move { - sleep(Duration::from_millis(100)).await; - cancel_.cancel(); - }); - data.send_manager.do_loop(cancel.clone()).await.unwrap(); - assert_eq!(3, data.send_manager.workers.len()); + // Should run exactly three workers + assert_eq!(3, active.lock().unwrap().len()); - // check that correct number of instance workers was started - // TODO: need to wrap in Arc or something similar - // TODO: test with different `opts`, dead/blocked instances etc - - // cleanup - Instance::delete_all(&mut data.context.pool()).await?; Ok(()) } }