From af92e0d53204a2ccd13bc0db3c58de24bff646bf Mon Sep 17 00:00:00 2001 From: phiresky Date: Tue, 4 Jul 2023 16:08:39 +0200 Subject: [PATCH] add shutdown method (#53) * add shutdown method * simplify shutdown interface * make work on rust < 1.70 * upgrade ci to rust 1.70 * make clippy and linux torvalds happy --- .woodpecker.yml | 12 ++++++------ src/activity_queue.rs | 33 +++++++++++++++++++++------------ src/config.rs | 23 +++++++++++++++++++++++ src/fetch/object_id.rs | 5 ++--- src/traits.rs | 2 +- 5 files changed, 53 insertions(+), 22 deletions(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index 8bf3e9a..b563544 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -5,14 +5,14 @@ pipeline: - /root/.cargo/bin/cargo fmt -- --check cargo_check: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: - cargo check --all-features --all-targets cargo_clippy: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: @@ -26,28 +26,28 @@ pipeline: - cargo clippy --all-features -- -D clippy::unwrap_used cargo_test: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: - cargo test --all-features --no-fail-fast cargo_doc: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: - cargo doc --all-features cargo_run_actix_example: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: - cargo run --example local_federation actix-web cargo_run_axum_example: - image: rust:1.65-bullseye + image: rust:1.70-bullseye environment: CARGO_HOME: .cargo commands: diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 2c60662..3f06d09 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -113,19 +113,11 @@ where activity_queue.queue(message).await?; let stats = activity_queue.get_stats(); let running = stats.running.load(Ordering::Relaxed); - let stats_fmt = format!( - "Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}", - stats.pending.load(Ordering::Relaxed), - running, - stats.retries.load(Ordering::Relaxed), - stats.dead_last_hour.load(Ordering::Relaxed), - stats.completed_last_hour.load(Ordering::Relaxed), - ); if running == config.worker_count && config.worker_count != 0 { warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count); - warn!(stats_fmt); + warn!("{:?}", stats); } else { - info!(stats_fmt); + info!("{:?}", stats); } } } @@ -264,7 +256,7 @@ pub(crate) struct ActivityQueue { /// This is a lock-free way to share things between tasks /// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning #[derive(Default)] -struct Stats { +pub(crate) struct Stats { pending: AtomicUsize, running: AtomicUsize, retries: AtomicUsize, @@ -272,6 +264,20 @@ struct Stats { completed_last_hour: AtomicUsize, } +impl Debug for Stats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}", + self.pending.load(Ordering::Relaxed), + self.running.load(Ordering::Relaxed), + self.retries.load(Ordering::Relaxed), + self.dead_last_hour.load(Ordering::Relaxed), + self.completed_last_hour.load(Ordering::Relaxed) + ) + } +} + #[derive(Clone, Copy, Default)] struct RetryStrategy { /// Amount of time in seconds to back off @@ -494,7 +500,10 @@ impl ActivityQueue { #[allow(unused)] // Drops all the senders and shuts down the workers - async fn shutdown(self, wait_for_retries: bool) -> Result, anyhow::Error> { + pub(crate) async fn shutdown( + self, + wait_for_retries: bool, + ) -> Result, anyhow::Error> { drop(self.sender); self.sender_task.await?; diff --git a/src/config.rs b/src/config.rs index c57b636..bd817fa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,7 @@ use crate::{ protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor}, }; +use anyhow::Context; use async_trait::async_trait; use derive_builder::Builder; use dyn_clone::{clone_trait_object, DynClone}; @@ -183,6 +184,28 @@ impl FederationConfig { pub fn domain(&self) -> &str { &self.domain } + /// Shut down this federation, waiting for the outgoing queue to be sent. + /// If the activityqueue is still in use in other requests or was never constructed, returns an error. + /// If wait_retries is true, also wait for requests that have initially failed and are being retried. + /// Returns a stats object that can be printed for debugging (structure currently not part of the public interface). + /// + /// Currently, this method does not work correctly if worker_count = 0 (unlimited) + pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result { + let q = self + .activity_queue + .take() + .context("ActivityQueue never constructed, build() not called?")?; + // Todo: use Arc::into_inner but is only part of rust 1.70. + let stats = Arc::::try_unwrap(q) + .map_err(|_| { + anyhow::anyhow!( + "Could not cleanly shut down: activityqueue arc was still in use elsewhere " + ) + })? + .shutdown(wait_retries) + .await?; + Ok(stats) + } } impl FederationConfigBuilder { diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index b210f11..08c1d07 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -11,7 +11,7 @@ use url::Url; impl FromStr for ObjectId where - T: Object + Send + 'static, + T: Object + Send + Debug + 'static, for<'de2> ::Kind: Deserialize<'de2>, { type Err = url::ParseError; @@ -62,7 +62,7 @@ where impl ObjectId where - Kind: Object + Send + 'static, + Kind: Object + Send + Debug + 'static, for<'de2> ::Kind: serde::Deserialize<'de2>, { /// Construct a new objectid instance @@ -93,7 +93,6 @@ where ::Error: From + From, { let db_object = self.dereference_from_db(data).await?; - // if its a local object, only fetch it from the database and not over http if data.config.is_local_url(&self.0) { return match db_object { diff --git a/src/traits.rs b/src/traits.rs index 876bdbf..c6f8741 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -93,7 +93,7 @@ use url::Url; /// /// } #[async_trait] -pub trait Object: Sized { +pub trait Object: Sized + Debug { /// App data type passed to handlers. Must be identical to /// [crate::config::FederationConfigBuilder::app_data] type. type DataType: Clone + Send + Sync;