processes test

This commit is contained in:
Felix Ableitner 2024-05-23 12:30:04 +02:00
parent a56c3de124
commit 0096976b24
4 changed files with 91 additions and 41 deletions

1
Cargo.lock generated
View file

@ -2918,6 +2918,7 @@ dependencies = [
"once_cell",
"reqwest",
"serde_json",
"serial_test",
"tokio",
"tokio-util",
"tracing",

View file

@ -131,7 +131,7 @@ pub struct CommentReplyId(i32);
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The instance id.
pub struct InstanceId(i32);
pub struct InstanceId(pub i32);
#[derive(
Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord,

View file

@ -34,3 +34,6 @@ tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
moka.workspace = true
tokio-util = "0.7.10"
[dev-dependencies]
serial_test = { workspace = true }

View file

@ -44,6 +44,9 @@ pub struct SendManager {
impl SendManager {
pub fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
assert!(opts.process_count > 0);
assert!(opts.process_index > 0);
let (stats_sender, stats_receiver) = unbounded_channel();
Self {
opts,
@ -153,59 +156,102 @@ mod test {
use super::*;
use activitypub_federation::config::Data;
use tokio::{spawn, time::sleep};
use lemmy_utils::error::LemmyError;
use serial_test::serial;
use std::sync::{Arc, Mutex};
use tokio::{spawn, time::sleep};
struct TestData {
send_manager: SendManager,
context: Data<LemmyContext>,
instances: Vec<Instance>
instances: Vec<Instance>,
}
async fn init() -> LemmyResult<TestData> {
let context = LemmyContext::init_test_context().await;
let opts = Opts {
process_count: 1,
process_index: 1,
};
let federation_config = FederationConfig::builder()
.domain("local.com")
.app_data(context.clone())
.build()
.await?;
impl TestData {
async fn init(process_count: i32, process_index: i32) -> LemmyResult<Self> {
let context = LemmyContext::init_test_context().await;
let opts = Opts {
process_count,
process_index,
};
let federation_config = FederationConfig::builder()
.domain("local.com")
.app_data(context.clone())
.build()
.await?;
let pool = &mut context.pool();
let instances = vec![
Instance::read_or_create(pool, "alpha.com".to_string()).await?,
Instance::read_or_create(pool, "beta.com".to_string()).await?,
Instance::read_or_create(pool, "gamma.com".to_string()).await?,
];
let instances = vec![
Instance::read_or_create(pool, "alpha.com".to_string()).await?,
Instance::read_or_create(pool, "beta.com".to_string()).await?,
Instance::read_or_create(pool, "gamma.com".to_string()).await?,
];
let send_manager = SendManager::new(opts, federation_config);
Ok(TestData {send_manager,
context,instances
})
let send_manager = SendManager::new(opts, federation_config);
Ok(Self {
send_manager,
context,
instances,
})
}
async fn run(&mut self) -> LemmyResult<()> {
// start it and cancel after workers are running
let cancel = CancellationToken::new();
let cancel_ = cancel.clone();
spawn(async move {
sleep(Duration::from_millis(100)).await;
cancel_.cancel();
});
self.send_manager.do_loop(cancel.clone()).await?;
Ok(())
}
async fn cleanup(self) -> LemmyResult<()> {
self.send_manager.cancel().await?;
Instance::delete_all(&mut self.context.pool()).await?;
Ok(())
}
}
// check that correct number of instance workers was started
// TODO: need to wrap in Arc or something similar
// TODO: test with different `opts`, dead/blocked instances etc
#[tokio::test]
#[serial]
async fn test_send_manager() -> LemmyResult<()> {
let mut data = TestData::init(1, 1).await?;
data.run().await?;
assert_eq!(3, data.send_manager.workers.len());
data.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_start_stop_federation_workers() -> LemmyResult<()> {
let mut data = init().await?;
#[serial]
async fn test_send_manager_processes() -> LemmyResult<()> {
let active = Arc::new(Mutex::new(vec![]));
let execute = |count, index, len, active: Arc<Mutex<Vec<InstanceId>>>| async move {
let mut data = TestData::init(count, index).await?;
data.run().await?;
assert_eq!(len, data.send_manager.workers.len());
for k in data.send_manager.workers.keys() {
active.lock().unwrap().push(*k);
}
data.cleanup().await?;
Ok::<(), LemmyError>(())
};
execute(3, 1, 1, active.clone()).await?;
execute(3, 2, 1, active.clone()).await?;
execute(3, 3, 1, active.clone()).await?;
execute(3, 4, 0, active.clone()).await?;
execute(3, 6, 0, active.clone()).await?;
// start it and wait a moment
let cancel = CancellationToken::new();
let cancel_ = cancel.clone();
spawn(async move {
sleep(Duration::from_millis(100)).await;
cancel_.cancel();
});
data.send_manager.do_loop(cancel.clone()).await.unwrap();
assert_eq!(3, data.send_manager.workers.len());
// Should run exactly three workers
assert_eq!(3, active.lock().unwrap().len());
// check that correct number of instance workers was started
// TODO: need to wrap in Arc or something similar
// TODO: test with different `opts`, dead/blocked instances etc
// cleanup
Instance::delete_all(&mut data.context.pool()).await?;
Ok(())
}
}