From 76393bd964b42f04bd3469571433782ad201ccd9 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 23 May 2024 11:17:17 +0200 Subject: [PATCH] move stats to own file --- crates/federate/src/lib.rs | 71 +++++--------------------------------- 1 file changed, 8 insertions(+), 63 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index c9adbd09b..9c2fdf57d 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,25 +1,22 @@ -use crate::{ - util::{get_latest_activity_id, CancellableTask}, - worker::InstanceWorker, -}; +use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; -use chrono::{Local, Timelike}; -use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; +use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::InstanceId, source::{federation_queue_state::FederationQueueState, instance::Instance}, - utils::{ActualDbPool, DbPool}, }; use lemmy_utils::error::LemmyResult; +use stats::receive_print_stats; use std::{collections::HashMap, time::Duration}; use tokio::{ - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + sync::mpsc::{unbounded_channel, UnboundedSender}, task::JoinHandle, - time::{interval, sleep}, + time::sleep, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::info; +mod stats; mod util; mod worker; @@ -149,65 +146,13 @@ impl SendManager { } } -/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) -async fn receive_print_stats( - pool: ActualDbPool, - mut receiver: UnboundedReceiver<(String, FederationQueueState)>, -) { - let pool = &mut DbPool::Pool(&pool); - let mut printerval = interval(Duration::from_secs(60)); - let mut stats = HashMap::new(); - loop { - tokio::select! { - ele = receiver.recv() => { - let Some((domain, ele)) = ele else { - print_stats(pool, &stats).await; - return; - }; - stats.insert(domain, ele); - }, - _ = printerval.tick() => { - print_stats(pool, &stats).await; - } - } - } -} - -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { - let last_id = get_latest_activity_id(pool).await; - let Ok(last_id) = last_id else { - error!("could not get last id"); - return; - }; - // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date - info!("Federation state as of {}:", Local::now().to_rfc3339()); - // todo: more stats (act/sec, avg http req duration) - let mut ok_count = 0; - let mut behind_count = 0; - for (domain, stat) in stats { - let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); - if stat.fail_count > 0 { - info!( - "{domain}: Warning. {behind} behind, {} consecutive fails, current retry delay {:.2?}", - stat.fail_count, - federate_retry_sleep_duration(stat.fail_count) - ); - } else if behind > 0 { - debug!("{}: Ok. {} activities behind", domain, behind); - behind_count += 1; - } else { - ok_count += 1; - } - } - info!("{ok_count} others up to date. {behind_count} instances behind."); -} - #[cfg(test)] #[allow(clippy::unwrap_used)] #[allow(clippy::indexing_slicing)] mod test { use super::*; + use tokio::time::sleep; #[tokio::test] async fn test_start_stop_federation_workers() -> LemmyResult<()> {