diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 11a5ccf15..0e3a707e2 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -46,6 +46,7 @@ impl SendManager { pub fn new(opts: Opts, context: FederationConfig) -> Self { assert!(opts.process_count > 0); assert!(opts.process_index > 0); + assert!(opts.process_index <= opts.process_count); let (stats_sender, stats_receiver) = unbounded_channel(); Self { @@ -61,11 +62,10 @@ impl SendManager { } pub fn run(mut self) -> CancellableTask { - let cancel = CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { self.do_loop(cancel).await.unwrap(); self.cancel().await.unwrap(); - }); - cancel + }) } async fn do_loop(&mut self, cancel: CancellationToken) -> LemmyResult<()> { @@ -243,21 +243,19 @@ mod test { #[serial] async fn test_send_manager_processes() -> LemmyResult<()> { let active = Arc::new(Mutex::new(vec![])); - let execute = |count, index, len, active: Arc>>| async move { + let execute = |count, index, active: Arc>>| async move { let mut data = TestData::init(count, index).await?; data.run().await?; - assert_eq!(len, data.send_manager.workers.len()); + assert_eq!(1, data.send_manager.workers.len()); for k in data.send_manager.workers.keys() { active.lock().unwrap().push(*k); } data.cleanup().await?; Ok::<(), LemmyError>(()) }; - execute(3, 1, 1, active.clone()).await?; - execute(3, 2, 1, active.clone()).await?; - execute(3, 3, 1, active.clone()).await?; - execute(3, 4, 0, active.clone()).await?; - execute(3, 6, 0, active.clone()).await?; + execute(3, 1, active.clone()).await?; + execute(3, 2, active.clone()).await?; + execute(3, 3, active.clone()).await?; // Should run exactly three workers assert_eq!(3, active.lock().unwrap().len());