Extra logging to debug duplicate activities (ref #4609) (#4726)

* Extra logging to debug duplicate activities (ref #4609)

* Fix logging for api tests

* fmt
Nutomic 2024-05-21 20:47:06 +02:00 committed by GitHub
parent 4ffaa93431
commit 6b46a70535
3 changed files with 17 additions and 13 deletions


@@ -3,13 +3,13 @@
 # it is expected that this script is called by run-federation-test.sh script.
 set -e
 
-if [ -n "$LEMMY_LOG_LEVEL" ];
+if [ -z "$LEMMY_LOG_LEVEL" ];
 then
-  LEMMY_LOG_LEVEL=warn
+  LEMMY_LOG_LEVEL=info
 fi
 
 export RUST_BACKTRACE=1
-#export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
+export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
 export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
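The uncommented RUST_LOG value is a standard tracing env-filter directive string: a global "warn" default, with each Lemmy crate raised to $LEMMY_LOG_LEVEL. A minimal sketch of how such a filter is typically consumed, assuming a stock tracing_subscriber setup (lemmy_server's actual initialization may differ):

use tracing_subscriber::EnvFilter;

fn init_logging() {
  // Reads RUST_LOG, e.g. "warn,lemmy_federate=info", and applies the
  // per-target levels on top of the global "warn" default.
  tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::from_default_env())
    .init();
}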


@@ -13,6 +13,7 @@ use tokio::{
   time::sleep,
 };
 use tokio_util::sync::CancellationToken;
+use tracing::info;
 
 mod util;
 mod worker;
@@ -44,6 +45,10 @@ async fn start_stop_federation_workers(
   let pool2 = &mut DbPool::Pool(&pool);
   let process_index = opts.process_index - 1;
   let local_domain = federation_config.settings().get_hostname_without_port()?;
+  info!(
+    "Starting federation workers for process count {} and index {}",
+    opts.process_count, process_index
+  );
   loop {
     let mut total_count = 0;
     let mut dead_count = 0;
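The new info! call records which shard of the federation work this process owns: when federation is split across several processes, each one is started with a total process_count and its own process_index. A sketch of the sharding idea (illustrative only; the actual instance-assignment logic in lemmy_federate may differ):

// Illustrative: with N processes, each one takes every N-th instance.
// The parameter names mirror the options logged above.
fn is_handled_by_this_process(instance_id: i64, process_count: i64, process_index: i64) -> bool {
  instance_id % process_count == process_index
}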


@@ -34,6 +34,7 @@ use std::{
 };
 use tokio::{sync::mpsc::UnboundedSender, time::sleep};
 use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, trace, warn};
 
 /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
 /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
@@ -105,6 +106,7 @@ impl InstanceWorker {
     &mut self,
     pool: &mut DbPool<'_>,
   ) -> Result<(), anyhow::Error> {
+    debug!("Starting federation worker for {}", self.instance.domain);
     let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
 
     self.update_communities(pool).await?;
@@ -176,15 +178,14 @@ impl InstanceWorker {
         .await
         .context("failed reading activity from db")?
       else {
-        tracing::debug!("{}: {:?} does not exist", self.instance.domain, id);
+        debug!("{}: {:?} does not exist", self.instance.domain, id);
         self.state.last_successful_id = Some(id);
         continue;
       };
       if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
-        tracing::warn!(
+        warn!(
           "sending {} errored internally, skipping activity: {:?}",
-          ele.0.ap_id,
-          e
+          ele.0.ap_id, e
         );
       }
       if self.stop.is_cancelled() {
@@ -211,7 +212,7 @@ impl InstanceWorker {
       .await
       .context("failed figuring out inbox urls")?;
     if inbox_urls.is_empty() {
-      tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
+      trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
       self.state.last_successful_id = Some(activity.id);
       self.state.last_successful_published_time = Some(activity.published);
       return Ok(());
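Demoting the "no inboxes" message from debug! to trace! moves it to the most verbose level, so it only appears when trace logging is explicitly enabled for this crate. The level ordering can be checked directly against the tracing API:

use tracing::Level;

fn main() {
  // In tracing, more verbose levels compare as greater:
  // ERROR < WARN < INFO < DEBUG < TRACE.
  assert!(Level::TRACE > Level::DEBUG);
  assert!(Level::DEBUG > Level::INFO);
}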
@@ -229,16 +230,14 @@ impl InstanceWorker {
       SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?;
     for task in requests {
       // usually only one due to shared inbox
-      tracing::debug!("sending out {}", task);
+      trace!("sending out {}", task);
       while let Err(e) = task.sign_and_send(&self.context).await {
         self.state.fail_count += 1;
         self.state.last_retry = Some(Utc::now());
         let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
-        tracing::info!(
+        info!(
           "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
-          self.instance.domain,
-          activity.id,
-          self.state.fail_count
+          self.instance.domain, activity.id, self.state.fail_count
         );
         self.save_and_send_state(pool).await?;
         tokio::select! {
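The delay printed by the new info! line comes from federate_retry_sleep_duration, which grows with fail_count. A hypothetical sketch of the general shape of such a backoff (the real formula and constants in lemmy_federate may differ):

use std::time::Duration;

// Hypothetical exponential backoff: the delay doubles with every
// consecutive failure and is capped at one hour.
fn retry_sleep_duration(fail_count: i32) -> Duration {
  let secs = 2.0_f64.powi(fail_count).min(3600.0);
  Duration::from_secs_f64(secs)
}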