log federation instances

This commit is contained in:
phiresky 2023-07-23 22:15:12 +00:00
parent c19211f255
commit 0d6042ec11

View file

@ -56,13 +56,25 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
let (stats_sender, stats_receiver) = unbounded_channel(); let (stats_sender, stats_receiver) = unbounded_channel();
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
let pool2 = &mut DbPool::Pool(&pool); let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1;
loop { loop {
let dead: HashSet<String> = HashSet::from_iter(Instance::dead_instances(pool2).await?); let dead: HashSet<String> = HashSet::from_iter(Instance::dead_instances(pool2).await?);
let mut total_count = 0;
let mut dead_count = 0;
let mut disallowed_count = 0;
for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() { for (instance, allowed) in Instance::read_all_with_blocked(pool2).await?.into_iter() {
if instance.id.inner() % opts.process_count != opts.process_index { if instance.id.inner() % opts.process_count != process_index {
continue; continue;
} }
let should_federate = allowed && !dead.contains(&instance.domain); total_count += 1;
if !allowed {
disallowed_count += 1;
}
let is_dead = dead.contains(&instance.domain);
if is_dead {
dead_count += 1;
}
let should_federate = allowed && !is_dead;
if !workers.contains_key(&instance.id) && should_federate { if !workers.contains_key(&instance.id) && should_federate {
let stats_sender = stats_sender.clone(); let stats_sender = stats_sender.clone();
workers.insert( workers.insert(
@ -85,6 +97,8 @@ async fn start_stop_federation_workers<T: Clone + Send + Sync + 'static>(
} }
} }
} }
let worker_count = workers.len();
tracing::info!("Federating to {worker_count}/{total_count} instances ({dead_count} dead, {disallowed_count} disallowed)");
tokio::select! { tokio::select! {
() = sleep(Duration::from_secs(60)) => {}, () = sleep(Duration::from_secs(60)) => {},
_ = cancel.cancelled() => { break; } _ = cancel.cancelled() => { break; }