add shutdown method (#53)

* add shutdown method

* simplify shutdown interface

* make work on rust < 1.70

* upgrade ci to rust 1.70

* make clippy and linux torvalds happy
This commit is contained in:
phiresky 2023-07-04 16:08:39 +02:00 committed by GitHub
parent 68f9210d4c
commit af92e0d532
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 22 deletions

View file

@ -5,14 +5,14 @@ pipeline:
- /root/.cargo/bin/cargo fmt -- --check - /root/.cargo/bin/cargo fmt -- --check
cargo_check: cargo_check:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:
- cargo check --all-features --all-targets - cargo check --all-features --all-targets
cargo_clippy: cargo_clippy:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:
@ -26,28 +26,28 @@ pipeline:
- cargo clippy --all-features -- -D clippy::unwrap_used - cargo clippy --all-features -- -D clippy::unwrap_used
cargo_test: cargo_test:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:
- cargo test --all-features --no-fail-fast - cargo test --all-features --no-fail-fast
cargo_doc: cargo_doc:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:
- cargo doc --all-features - cargo doc --all-features
cargo_run_actix_example: cargo_run_actix_example:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:
- cargo run --example local_federation actix-web - cargo run --example local_federation actix-web
cargo_run_axum_example: cargo_run_axum_example:
image: rust:1.65-bullseye image: rust:1.70-bullseye
environment: environment:
CARGO_HOME: .cargo CARGO_HOME: .cargo
commands: commands:

View file

@ -113,19 +113,11 @@ where
activity_queue.queue(message).await?; activity_queue.queue(message).await?;
let stats = activity_queue.get_stats(); let stats = activity_queue.get_stats();
let running = stats.running.load(Ordering::Relaxed); let running = stats.running.load(Ordering::Relaxed);
let stats_fmt = format!(
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
stats.pending.load(Ordering::Relaxed),
running,
stats.retries.load(Ordering::Relaxed),
stats.dead_last_hour.load(Ordering::Relaxed),
stats.completed_last_hour.load(Ordering::Relaxed),
);
if running == config.worker_count && config.worker_count != 0 { if running == config.worker_count && config.worker_count != 0 {
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count); warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
warn!(stats_fmt); warn!("{:?}", stats);
} else { } else {
info!(stats_fmt); info!("{:?}", stats);
} }
} }
} }
@ -264,7 +256,7 @@ pub(crate) struct ActivityQueue {
/// This is a lock-free way to share things between tasks /// This is a lock-free way to share things between tasks
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning /// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning
#[derive(Default)] #[derive(Default)]
struct Stats { pub(crate) struct Stats {
pending: AtomicUsize, pending: AtomicUsize,
running: AtomicUsize, running: AtomicUsize,
retries: AtomicUsize, retries: AtomicUsize,
@ -272,6 +264,20 @@ struct Stats {
completed_last_hour: AtomicUsize, completed_last_hour: AtomicUsize,
} }
impl Debug for Stats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
self.pending.load(Ordering::Relaxed),
self.running.load(Ordering::Relaxed),
self.retries.load(Ordering::Relaxed),
self.dead_last_hour.load(Ordering::Relaxed),
self.completed_last_hour.load(Ordering::Relaxed)
)
}
}
#[derive(Clone, Copy, Default)] #[derive(Clone, Copy, Default)]
struct RetryStrategy { struct RetryStrategy {
/// Amount of time in seconds to back off /// Amount of time in seconds to back off
@ -494,7 +500,10 @@ impl ActivityQueue {
#[allow(unused)] #[allow(unused)]
// Drops all the senders and shuts down the workers // Drops all the senders and shuts down the workers
async fn shutdown(self, wait_for_retries: bool) -> Result<Arc<Stats>, anyhow::Error> { pub(crate) async fn shutdown(
self,
wait_for_retries: bool,
) -> Result<Arc<Stats>, anyhow::Error> {
drop(self.sender); drop(self.sender);
self.sender_task.await?; self.sender_task.await?;

View file

@ -21,6 +21,7 @@ use crate::{
protocol::verification::verify_domains_match, protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use derive_builder::Builder; use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone}; use dyn_clone::{clone_trait_object, DynClone};
@ -183,6 +184,28 @@ impl<T: Clone> FederationConfig<T> {
pub fn domain(&self) -> &str { pub fn domain(&self) -> &str {
&self.domain &self.domain
} }
/// Shut down this federation, waiting for the outgoing queue to be sent.
/// If the activityqueue is still in use in other requests or was never constructed, returns an error.
/// If wait_retries is true, also wait for requests that have initially failed and are being retried.
/// Returns a stats object that can be printed for debugging (structure currently not part of the public interface).
///
/// Currently, this method does not work correctly if worker_count = 0 (unlimited)
pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result<impl std::fmt::Debug> {
let q = self
.activity_queue
.take()
.context("ActivityQueue never constructed, build() not called?")?;
// Todo: use Arc::into_inner but is only part of rust 1.70.
let stats = Arc::<ActivityQueue>::try_unwrap(q)
.map_err(|_| {
anyhow::anyhow!(
"Could not cleanly shut down: activityqueue arc was still in use elsewhere "
)
})?
.shutdown(wait_retries)
.await?;
Ok(stats)
}
} }
impl<T: Clone> FederationConfigBuilder<T> { impl<T: Clone> FederationConfigBuilder<T> {

View file

@ -11,7 +11,7 @@ use url::Url;
impl<T> FromStr for ObjectId<T> impl<T> FromStr for ObjectId<T>
where where
T: Object + Send + 'static, T: Object + Send + Debug + 'static,
for<'de2> <T as Object>::Kind: Deserialize<'de2>, for<'de2> <T as Object>::Kind: Deserialize<'de2>,
{ {
type Err = url::ParseError; type Err = url::ParseError;
@ -62,7 +62,7 @@ where
impl<Kind> ObjectId<Kind> impl<Kind> ObjectId<Kind>
where where
Kind: Object + Send + 'static, Kind: Object + Send + Debug + 'static,
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>, for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
{ {
/// Construct a new objectid instance /// Construct a new objectid instance
@ -93,7 +93,6 @@ where
<Kind as Object>::Error: From<Error> + From<anyhow::Error>, <Kind as Object>::Error: From<Error> + From<anyhow::Error>,
{ {
let db_object = self.dereference_from_db(data).await?; let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http // if its a local object, only fetch it from the database and not over http
if data.config.is_local_url(&self.0) { if data.config.is_local_url(&self.0) {
return match db_object { return match db_object {

View file

@ -93,7 +93,7 @@ use url::Url;
/// ///
/// } /// }
#[async_trait] #[async_trait]
pub trait Object: Sized { pub trait Object: Sized + Debug {
/// App data type passed to handlers. Must be identical to /// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type. /// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync; type DataType: Clone + Send + Sync;