From 13ff059f833609ad1c750908d789488c9daa87c1 Mon Sep 17 00:00:00 2001 From: phiresky Date: Wed, 29 May 2024 15:11:29 +0200 Subject: [PATCH] fix both permanent stopping of federation queues and multiple creation of the same federation queues --- crates/federate/src/lib.rs | 48 ++++++++++++++++++++++++++++--------- crates/federate/src/util.rs | 21 +++++++++------- src/lib.rs | 5 ++-- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index d3876226f..7cae2ebe9 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -43,7 +43,7 @@ pub struct SendManager { } impl SendManager { - pub fn new(opts: Opts, context: FederationConfig) -> Self { + fn new(opts: Opts, context: FederationConfig) -> Self { assert!(opts.process_count > 0); assert!(opts.process_index > 0); assert!(opts.process_index <= opts.process_count); @@ -61,11 +61,27 @@ impl SendManager { } } - pub fn run(mut self) -> CancellableTask { - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { - self.do_loop(cancel).await?; - self.cancel().await?; - Ok(()) + pub fn run(opts: Opts, context: FederationConfig) -> CancellableTask { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| { + let opts = opts.clone(); + let context = context.clone(); + let mut manager = Self::new(opts, context); + async move { + let result = manager.do_loop(cancel).await; + // the loop function will only return if there is (a) an internal error (e.g. db connection + // failure) or (b) it was cancelled from outside. + if let Err(e) = result { + // don't let this error bubble up, just log it, so the below cancel function will run + // regardless + tracing::error!("SendManager failed: {e}"); + } + // cancel all the dependent workers as well to ensure they don't get orphaned and keep + // running. + manager.cancel().await?; + LemmyResult::Ok(()) + // if the task was not intentionally cancelled, then this whole lambda will be run again by + // CancellableTask after this + } }) } @@ -104,14 +120,24 @@ impl SendManager { continue; } // create new worker - let instance = instance.clone(); - let req_data = self.context.to_request_data(); + let context = self.context.clone(); let stats_sender = self.stats_sender.clone(); self.workers.insert( instance.id, - CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { - InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?; - Ok(()) + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| { + // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask. + let instance = instance.clone(); + let req_data = context.to_request_data(); + let stats_sender = stats_sender.clone(); + async move { + InstanceWorker::init_and_loop( + instance, + req_data, + stop, + stats_sender, + ) + .await + } }), ); } else if !should_federate { diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 02a90dee9..08186f436 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -17,7 +17,6 @@ use lemmy_db_schema::{ traits::ApubActor, utils::{get_conn, DbPool}, }; -use lemmy_utils::error::LemmyResult; use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; @@ -25,7 +24,6 @@ use serde_json::Value; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; -use tracing::error; /// Decrease the delays of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the @@ -61,24 +59,31 @@ impl CancellableTask { /// spawn a task but with graceful shutdown pub fn spawn( timeout: Duration, - task: impl FnOnce(CancellationToken) -> F + Send + 'static, + task: impl Fn(CancellationToken) -> F + Send + 'static, ) -> CancellableTask where - F: Future> + Send + 'static, + F: Future + Send + 'static, R: Send + 'static, { let stop = CancellationToken::new(); let stop2 = stop.clone(); - let task: JoinHandle> = tokio::spawn(task(stop2)); + let task: JoinHandle<()> = tokio::spawn(async move { + loop { + let res = task(stop2.clone()).await; + if stop2.is_cancelled() { + return; + } else { + tracing::warn!("task exited, restarting: {res:?}"); + } + } + }); let abort = task.abort_handle(); CancellableTask { f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { - if let Err(ref e) = r? { - error!("CancellableTask threw error: {e}"); - } + r.context("CancellableTask failed to cancel cleanly, returned error")?; Ok(()) }, _ = sleep(timeout) => { diff --git a/src/lib.rs b/src/lib.rs index c2b5e57c2..26740a444 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -210,14 +210,13 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { None }; let federate = (!args.disable_activity_sending).then(|| { - let task = SendManager::new( + SendManager::run( Opts { process_index: args.federate_process_index, process_count: args.federate_process_count, }, federation_config, - ); - task.run() + ) }); let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;