This commit is contained in:
Felix Ableitner 2024-05-23 16:57:35 +02:00
parent 8baa5ee2f3
commit 5caf532e5c
3 changed files with 8 additions and 4 deletions

View file

@ -138,7 +138,8 @@ impl SendManager {
self.workers.len(), self.workers.len(),
WORKER_EXIT_TIMEOUT WORKER_EXIT_TIMEOUT
); );
// the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered
// concurrently
futures::future::join_all( futures::future::join_all(
self self
.workers .workers

View file

@ -9,7 +9,8 @@ use std::{collections::HashMap, time::Duration};
use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) /// every 60s, print the state for every instance. exits if the receiver is done (all senders
/// dropped)
pub(crate) async fn receive_print_stats( pub(crate) async fn receive_print_stats(
pool: ActualDbPool, pool: ActualDbPool,
mut receiver: UnboundedReceiver<(String, FederationQueueState)>, mut receiver: UnboundedReceiver<(String, FederationQueueState)>,
@ -39,7 +40,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
error!("could not get last id"); error!("could not get last id");
return; return;
}; };
// it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be
// considered up to date
info!("Federation state as of {}:", Local::now().to_rfc3339()); info!("Federation state as of {}:", Local::now().to_rfc3339());
// todo: more stats (act/sec, avg http req duration) // todo: more stats (act/sec, avg http req duration)
let mut ok_count = 0; let mut ok_count = 0;

View file

@ -107,7 +107,8 @@ impl InstanceWorker {
worker.loop_until_stopped().await worker.loop_until_stopped().await
} }
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit)
pub(crate) async fn loop_until_stopped(&mut self) -> Result<(), anyhow::Error> { pub(crate) async fn loop_until_stopped(&mut self) -> Result<(), anyhow::Error> {
debug!("Starting federation worker for {}", self.instance.domain); debug!("Starting federation worker for {}", self.instance.domain);
let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");