From b9b65c9c184b360b318c785f0775b351debc78bf Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 6 Nov 2023 22:07:04 +0100 Subject: [PATCH] add federation queue state to get_federated_instances api (#4104) * add federation queue state to get_federated_instances api * feature gate * move retry sleep function * move stuff around --- crates/api_common/Cargo.toml | 3 +- crates/api_common/src/lib.rs | 6 ++ crates/api_common/src/request.rs | 2 +- crates/api_common/src/site.rs | 47 ++++++++++++-- crates/api_common/src/utils.rs | 29 ++++++--- crates/db_schema/src/impls/activity.rs | 4 +- .../src/impls/federation_queue_state.rs | 46 ++++++++++++++ crates/db_schema/src/impls/instance.rs | 38 ++++++++--- crates/db_schema/src/impls/mod.rs | 1 + crates/db_schema/src/newtypes.rs | 7 +++ crates/db_schema/src/schema.rs | 5 +- crates/db_schema/src/source/activity.rs | 4 +- .../src/source/federation_queue_state.rs | 26 ++++++++ crates/db_schema/src/source/mod.rs | 1 + crates/federate/src/federation_queue_state.rs | 63 ------------------- crates/federate/src/lib.rs | 19 +++--- crates/federate/src/util.rs | 11 +--- crates/federate/src/worker.rs | 61 ++++++++++-------- .../down.sql | 5 ++ .../up.sql | 5 ++ 20 files changed, 244 insertions(+), 139 deletions(-) create mode 100644 crates/db_schema/src/impls/federation_queue_state.rs create mode 100644 crates/db_schema/src/source/federation_queue_state.rs delete mode 100644 crates/federate/src/federation_queue_state.rs create mode 100644 migrations/2023-11-01-223740_federation-published/down.sql create mode 100644 migrations/2023-11-01-223740_federation-published/up.sql diff --git a/crates/api_common/Cargo.toml b/crates/api_common/Cargo.toml index 26b32c80b..7894ffa57 100644 --- a/crates/api_common/Cargo.toml +++ b/crates/api_common/Cargo.toml @@ -17,7 +17,6 @@ doctest = false full = [ "tracing", "rosetta-i18n", - "chrono", "lemmy_utils", "lemmy_db_views/full", "lemmy_db_views_actor/full", @@ -47,7 +46,7 @@ activitypub_federation = { workspace = true, optional = true } serde = { workspace = true } serde_with = { workspace = true } url = { workspace = true } -chrono = { workspace = true, optional = true } +chrono = { workspace = true } tracing = { workspace = true, optional = true } reqwest-middleware = { workspace = true, optional = true } regex = { workspace = true } diff --git a/crates/api_common/src/lib.rs b/crates/api_common/src/lib.rs index 6f7da52ee..a82b9327e 100644 --- a/crates/api_common/src/lib.rs +++ b/crates/api_common/src/lib.rs @@ -25,6 +25,7 @@ pub extern crate lemmy_db_views_actor; pub extern crate lemmy_db_views_moderator; use serde::{Deserialize, Serialize}; +use std::time::Duration; #[derive(Debug, Serialize, Deserialize, Clone)] #[cfg_attr(feature = "full", derive(ts_rs::TS))] @@ -39,3 +40,8 @@ impl Default for SuccessResponse { SuccessResponse { success: true } } } + +/// how long to sleep based on how many retries have already happened +pub fn federate_retry_sleep_duration(retry_count: i32) -> Duration { + Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) +} diff --git a/crates/api_common/src/request.rs b/crates/api_common/src/request.rs index 0064c8045..11ef4002e 100644 --- a/crates/api_common/src/request.rs +++ b/crates/api_common/src/request.rs @@ -297,7 +297,7 @@ pub fn client_builder(settings: &Settings) -> ClientBuilder { ); Client::builder() - .user_agent(user_agent.clone()) + .user_agent(user_agent) .timeout(REQWEST_TIMEOUT) .connect_timeout(REQWEST_TIMEOUT) } diff --git a/crates/api_common/src/site.rs b/crates/api_common/src/site.rs index d40729e35..c13b96f30 100644 --- a/crates/api_common/src/site.rs +++ b/crates/api_common/src/site.rs @@ -1,6 +1,13 @@ +use crate::federate_retry_sleep_duration; +use chrono::{DateTime, Utc}; use lemmy_db_schema::{ newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId}, - source::{instance::Instance, language::Language, tagline::Tagline}, + source::{ + federation_queue_state::FederationQueueState, + instance::Instance, + language::Language, + tagline::Tagline, + }, ListingType, ModlogActionType, RegistrationMode, @@ -316,9 +323,41 @@ pub struct MyUserInfo { #[cfg_attr(feature = "full", ts(export))] /// A list of federated instances. pub struct FederatedInstances { - pub linked: Vec, - pub allowed: Vec, - pub blocked: Vec, + pub linked: Vec, + pub allowed: Vec, + pub blocked: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[cfg_attr(feature = "full", derive(TS))] +#[cfg_attr(feature = "full", ts(export))] +pub struct ReadableFederationState { + #[serde(flatten)] + internal_state: FederationQueueState, + /// timestamp of the next retry attempt (null if fail count is 0) + next_retry: Option>, +} + +impl From for ReadableFederationState { + fn from(internal_state: FederationQueueState) -> Self { + ReadableFederationState { + next_retry: internal_state.last_retry.map(|r| { + r + chrono::Duration::from_std(federate_retry_sleep_duration(internal_state.fail_count)) + .expect("sleep duration longer than 2**63 ms (262 million years)") + }), + internal_state, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[cfg_attr(feature = "full", derive(TS))] +#[cfg_attr(feature = "full", ts(export))] +pub struct InstanceWithFederationState { + #[serde(flatten)] + pub instance: Instance, + /// if federation to this instance is or was active, show state of outgoing federation to this instance + pub federation_state: Option, } #[skip_serializing_none] diff --git a/crates/api_common/src/utils.rs b/crates/api_common/src/utils.rs index 5060f2983..670271be7 100644 --- a/crates/api_common/src/utils.rs +++ b/crates/api_common/src/utils.rs @@ -2,7 +2,7 @@ use crate::{ context::LemmyContext, request::purge_image_from_pictrs, sensitive::Sensitive, - site::FederatedInstances, + site::{FederatedInstances, InstanceWithFederationState}, }; use actix_web::cookie::{Cookie, SameSite}; use anyhow::Context; @@ -275,12 +275,27 @@ pub async fn build_federated_instances( pool: &mut DbPool<'_>, ) -> Result, LemmyError> { if local_site.federation_enabled { - // TODO I hate that this requires 3 queries - let (linked, allowed, blocked) = lemmy_db_schema::try_join_with_pool!(pool => ( - Instance::linked, - Instance::allowlist, - Instance::blocklist - ))?; + let mut linked = Vec::new(); + let mut allowed = Vec::new(); + let mut blocked = Vec::new(); + + let all = Instance::read_all_with_fed_state(pool).await?; + for (instance, federation_state, is_blocked, is_allowed) in all { + let i = InstanceWithFederationState { + instance, + federation_state: federation_state.map(std::convert::Into::into), + }; + if is_blocked { + // blocked instances will only have an entry here if they had been federated with in the past. + blocked.push(i); + } else if is_allowed { + allowed.push(i.clone()); + linked.push(i); + } else { + // not explicitly allowed but implicitly linked + linked.push(i); + } + } Ok(Some(FederatedInstances { linked, diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 16b0fca40..9b781d597 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -1,6 +1,6 @@ use crate::{ diesel::OptionalExtension, - newtypes::DbUrl, + newtypes::{ActivityId, DbUrl}, source::activity::{ReceivedActivity, SentActivity, SentActivityForm}, utils::{get_conn, DbPool}, }; @@ -30,7 +30,7 @@ impl SentActivity { .first::(conn) .await } - pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result { + pub async fn read(pool: &mut DbPool<'_>, object_id: ActivityId) -> Result { use crate::schema::sent_activity::dsl::sent_activity; let conn = &mut get_conn(pool).await?; sent_activity.find(object_id).first::(conn).await diff --git a/crates/db_schema/src/impls/federation_queue_state.rs b/crates/db_schema/src/impls/federation_queue_state.rs new file mode 100644 index 000000000..8c49dc568 --- /dev/null +++ b/crates/db_schema/src/impls/federation_queue_state.rs @@ -0,0 +1,46 @@ +use crate::{ + newtypes::InstanceId, + source::federation_queue_state::FederationQueueState, + utils::{get_conn, DbPool}, +}; +use diesel::{prelude::*, result::Error}; +use diesel_async::RunQueryDsl; + +impl FederationQueueState { + /// load state or return a default empty value + pub async fn load( + pool: &mut DbPool<'_>, + instance_id_: InstanceId, + ) -> Result { + use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id}; + let conn = &mut get_conn(pool).await?; + Ok( + federation_queue_state + .filter(instance_id.eq(&instance_id_)) + .select(FederationQueueState::as_select()) + .get_result(conn) + .await + .optional()? + .unwrap_or(FederationQueueState { + instance_id: instance_id_, + fail_count: 0, + last_retry: None, + last_successful_id: None, // this value is set to the most current id for new instances + last_successful_published_time: None, + }), + ) + } + pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<(), Error> { + use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id}; + let conn = &mut get_conn(pool).await?; + + state + .insert_into(federation_queue_state) + .on_conflict(instance_id) + .do_update() + .set(state) + .execute(conn) + .await?; + Ok(()) + } +} diff --git a/crates/db_schema/src/impls/instance.rs b/crates/db_schema/src/impls/instance.rs index 7e162717c..b94e628d6 100644 --- a/crates/db_schema/src/impls/instance.rs +++ b/crates/db_schema/src/impls/instance.rs @@ -1,8 +1,18 @@ use crate::{ diesel::dsl::IntervalDsl, newtypes::InstanceId, - schema::{federation_allowlist, federation_blocklist, instance, local_site, site}, - source::instance::{Instance, InstanceForm}, + schema::{ + federation_allowlist, + federation_blocklist, + federation_queue_state, + instance, + local_site, + site, + }, + source::{ + federation_queue_state::FederationQueueState, + instance::{Instance, InstanceForm}, + }, utils::{functions::lower, get_conn, naive_now, now, DbPool}, }; use diesel::{ @@ -59,7 +69,7 @@ impl Instance { pub async fn read_all(pool: &mut DbPool<'_>) -> Result, Error> { let conn = &mut get_conn(pool).await?; instance::table - .select(instance::all_columns) + .select(Self::as_select()) .get_results(conn) .await } @@ -73,7 +83,7 @@ impl Instance { let conn = &mut get_conn(pool).await?; instance::table .inner_join(federation_allowlist::table) - .select(instance::all_columns) + .select(Self::as_select()) .get_results(conn) .await } @@ -82,14 +92,14 @@ impl Instance { let conn = &mut get_conn(pool).await?; instance::table .inner_join(federation_blocklist::table) - .select(instance::all_columns) + .select(Self::as_select()) .get_results(conn) .await } /// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not /// ordered by id - pub async fn read_all_with_blocked_and_dead( + pub async fn read_federated_with_blocked_and_dead( pool: &mut DbPool<'_>, ) -> Result, Error> { let conn = &mut get_conn(pool).await?; @@ -125,16 +135,24 @@ impl Instance { } } - pub async fn linked(pool: &mut DbPool<'_>) -> Result, Error> { + /// returns (instance, blocked, allowed, fed queue state) tuples + pub async fn read_all_with_fed_state( + pool: &mut DbPool<'_>, + ) -> Result, bool, bool)>, Error> { let conn = &mut get_conn(pool).await?; instance::table // omit instance representing the local site .left_join(site::table.inner_join(local_site::table)) .filter(local_site::id.is_null()) - // omit instances in the blocklist .left_join(federation_blocklist::table) - .filter(federation_blocklist::id.is_null()) - .select(instance::all_columns) + .left_join(federation_allowlist::table) + .left_join(federation_queue_state::table) + .select(( + Self::as_select(), + Option::::as_select(), + federation_blocklist::id.nullable().is_not_null(), + federation_allowlist::id.nullable().is_not_null(), + )) .get_results(conn) .await } diff --git a/crates/db_schema/src/impls/mod.rs b/crates/db_schema/src/impls/mod.rs index 3cf0f1066..cf75750ca 100644 --- a/crates/db_schema/src/impls/mod.rs +++ b/crates/db_schema/src/impls/mod.rs @@ -10,6 +10,7 @@ pub mod custom_emoji; pub mod email_verification; pub mod federation_allowlist; pub mod federation_blocklist; +pub mod federation_queue_state; pub mod image_upload; pub mod instance; pub mod instance_block; diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index 555b98256..c59577670 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -149,6 +149,13 @@ pub struct ImageUploadId(i32); /// The instance id. pub struct InstanceId(i32); +#[derive( + Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord, +)] +#[cfg_attr(feature = "full", derive(DieselNewType, TS))] +#[cfg_attr(feature = "full", ts(export))] +pub struct ActivityId(pub i64); + #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "full", derive(DieselNewType, TS))] #[cfg_attr(feature = "full", ts(export))] diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 0d9e9401a..71c3b0b20 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -307,9 +307,10 @@ diesel::table! { federation_queue_state (id) { id -> Int4, instance_id -> Int4, - last_successful_id -> Int8, + last_successful_id -> Nullable, fail_count -> Int4, - last_retry -> Timestamptz, + last_retry -> Nullable, + last_successful_published_time -> Nullable, } } diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index fc4bb0ec5..6c297e886 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -1,5 +1,5 @@ use crate::{ - newtypes::{CommunityId, DbUrl}, + newtypes::{ActivityId, CommunityId, DbUrl}, schema::sent_activity, }; use chrono::{DateTime, Utc}; @@ -54,7 +54,7 @@ impl ActivitySendTargets { #[derive(PartialEq, Eq, Debug, Queryable)] #[diesel(table_name = sent_activity)] pub struct SentActivity { - pub id: i64, + pub id: ActivityId, pub ap_id: DbUrl, pub data: Value, pub sensitive: bool, diff --git a/crates/db_schema/src/source/federation_queue_state.rs b/crates/db_schema/src/source/federation_queue_state.rs new file mode 100644 index 000000000..31bee8819 --- /dev/null +++ b/crates/db_schema/src/source/federation_queue_state.rs @@ -0,0 +1,26 @@ +use crate::newtypes::{ActivityId, InstanceId}; +use chrono::{DateTime, Utc}; +#[cfg(feature = "full")] +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +#[cfg(feature = "full")] +use ts_rs::TS; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[cfg_attr( + feature = "full", + derive(Queryable, Selectable, Insertable, AsChangeset) +)] +#[cfg_attr(feature = "full", derive(TS))] +#[cfg_attr(feature = "full", diesel(table_name = crate::schema::federation_queue_state))] +#[cfg_attr(feature = "full", diesel(check_for_backend(diesel::pg::Pg)))] +pub struct FederationQueueState { + pub instance_id: InstanceId, + /// the last successfully sent activity id + pub last_successful_id: Option, + pub last_successful_published_time: Option>, + /// how many failed attempts have been made to send the next activity + pub fail_count: i32, + /// timestamp of the last retry attempt (when the last failing activity was resent) + pub last_retry: Option>, +} diff --git a/crates/db_schema/src/source/mod.rs b/crates/db_schema/src/source/mod.rs index 9879ef35f..814318848 100644 --- a/crates/db_schema/src/source/mod.rs +++ b/crates/db_schema/src/source/mod.rs @@ -15,6 +15,7 @@ pub mod custom_emoji_keyword; pub mod email_verification; pub mod federation_allowlist; pub mod federation_blocklist; +pub mod federation_queue_state; pub mod image_upload; pub mod instance; pub mod instance_block; diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs deleted file mode 100644 index 8a3506121..000000000 --- a/crates/federate/src/federation_queue_state.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::util::ActivityId; -use anyhow::Result; -use chrono::{DateTime, TimeZone, Utc}; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; -use lemmy_db_schema::{ - newtypes::InstanceId, - utils::{get_conn, DbPool}, -}; - -#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)] -#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)] -#[diesel(check_for_backend(diesel::pg::Pg))] -pub struct FederationQueueState { - pub instance_id: InstanceId, - pub last_successful_id: ActivityId, // todo: i64 - pub fail_count: i32, - pub last_retry: DateTime, -} - -impl FederationQueueState { - /// load state or return a default empty value - pub async fn load( - pool: &mut DbPool<'_>, - instance_id_: InstanceId, - ) -> Result { - use lemmy_db_schema::schema::federation_queue_state::dsl::{ - federation_queue_state, - instance_id, - }; - let conn = &mut get_conn(pool).await?; - Ok( - federation_queue_state - .filter(instance_id.eq(&instance_id_)) - .select(FederationQueueState::as_select()) - .get_result(conn) - .await - .optional()? - .unwrap_or(FederationQueueState { - instance_id: instance_id_, - fail_count: 0, - last_retry: Utc.timestamp_nanos(0), - last_successful_id: -1, // this value is set to the most current id for new instances - }), - ) - } - pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> { - use lemmy_db_schema::schema::federation_queue_state::dsl::{ - federation_queue_state, - instance_id, - }; - let conn = &mut get_conn(pool).await?; - - state - .insert_into(federation_queue_state) - .on_conflict(instance_id) - .do_update() - .set(state) - .execute(conn) - .await?; - Ok(()) - } -} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index dad7daad6..b6c820aaa 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,14 +1,10 @@ -use crate::{ - util::{retry_sleep_duration, CancellableTask}, - worker::InstanceWorker, -}; +use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use chrono::{Local, Timelike}; -use federation_queue_state::FederationQueueState; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_db_schema::{ newtypes::InstanceId, - source::instance::Instance, + source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; use std::{collections::HashMap, time::Duration}; @@ -18,7 +14,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -mod federation_queue_state; mod util; mod worker; @@ -52,7 +47,9 @@ async fn start_stop_federation_workers( let mut total_count = 0; let mut dead_count = 0; let mut disallowed_count = 0; - for (instance, allowed, is_dead) in Instance::read_all_with_blocked_and_dead(pool2).await? { + for (instance, allowed, is_dead) in + Instance::read_federated_with_blocked_and_dead(pool2).await? + { if instance.domain == local_domain { continue; } @@ -188,14 +185,14 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap 0 { tracing::info!( "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}", domain, behind, stat.fail_count, - retry_sleep_duration(stat.fail_count) + federate_retry_sleep_duration(stat.fail_count) ); } else if behind > 0 { tracing::info!("{}: Ok. {} behind", domain, behind); diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index f744d45f4..1775e4153 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -6,6 +6,7 @@ use lemmy_apub::{ fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity}, }; use lemmy_db_schema::{ + newtypes::ActivityId, source::{ activity::{ActorType, SentActivity}, community::Community, @@ -141,9 +142,6 @@ pub(crate) async fn get_actor_cached( .map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}")) } -/// this should maybe be a newtype like all the other PersonId CommunityId etc. -pub(crate) type ActivityId = i64; - type CachedActivityInfo = Option>; /// activities are immutable so cache does not need to have TTL /// May return None if the corresponding id does not exist or is a received activity. @@ -192,14 +190,9 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result = sent_activity.select(max(id)).get_result(conn).await?; - let latest_id = seq.unwrap_or(0); + let latest_id = seq.unwrap_or(ActivityId(0)); anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) }) .await .map_err(|e| anyhow::anyhow!("err getting id: {e:?}")) } - -/// how long to sleep based on how many retries have already happened -pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration { - Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) -} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index 3eda2e746..8e7726fee 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -1,22 +1,23 @@ -use crate::{ - federation_queue_state::FederationQueueState, - util::{ - get_activity_cached, - get_actor_cached, - get_latest_activity_id, - retry_sleep_duration, - LEMMY_TEST_FAST_FEDERATION, - WORK_FINISHED_RECHECK_DELAY, - }, +use crate::util::{ + get_activity_cached, + get_actor_cached, + get_latest_activity_id, + LEMMY_TEST_FAST_FEDERATION, + WORK_FINISHED_RECHECK_DELAY, }; use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use anyhow::{Context, Result}; use chrono::{DateTime, TimeZone, Utc}; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration}; use lemmy_apub::activity_lists::SharedInboxActivities; use lemmy_db_schema::{ - newtypes::{CommunityId, InstanceId}, - source::{activity::SentActivity, instance::Instance, site::Site}, + newtypes::{ActivityId, CommunityId, InstanceId}, + source::{ + activity::SentActivity, + federation_queue_state::FederationQueueState, + instance::Instance, + site::Site, + }, utils::DbPool, }; use lemmy_db_views_actor::structs::CommunityFollowerView; @@ -122,8 +123,12 @@ impl InstanceWorker { async fn initial_fail_sleep(&mut self) -> Result<()> { // before starting queue, sleep remaining duration if last request failed if self.state.fail_count > 0 { - let elapsed = (Utc::now() - self.state.last_retry).to_std()?; - let required = retry_sleep_duration(self.state.fail_count); + let last_retry = self + .state + .last_retry + .context("impossible: if fail count set last retry also set")?; + let elapsed = (Utc::now() - last_retry).to_std()?; + let required = federate_retry_sleep_duration(self.state.fail_count); if elapsed >= required { return Ok(()); } @@ -138,14 +143,16 @@ impl InstanceWorker { /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { let latest_id = get_latest_activity_id(pool).await?; - if self.state.last_successful_id == -1 { + let mut id = if let Some(id) = self.state.last_successful_id { + id + } else { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: - self.state.last_successful_id = latest_id; + self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened self.save_and_send_state(pool).await?; - } - let mut id = self.state.last_successful_id; + latest_id + }; if id == latest_id { // no more work to be done, wait before rechecking tokio::select! { @@ -159,13 +166,13 @@ impl InstanceWorker { && processed_activities < CHECK_SAVE_STATE_EVERY_IT && !self.stop.is_cancelled() { - id += 1; + id = ActivityId(id.0 + 1); processed_activities += 1; let Some(ele) = get_activity_cached(pool, id) .await .context("failed reading activity from db")? else { - self.state.last_successful_id = id; + self.state.last_successful_id = Some(id); continue; }; if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { @@ -179,7 +186,8 @@ impl InstanceWorker { return Ok(()); } // send success! - self.state.last_successful_id = id; + self.state.last_successful_id = Some(id); + self.state.last_successful_published_time = Some(ele.0.published); self.state.fail_count = 0; } Ok(()) @@ -198,7 +206,8 @@ impl InstanceWorker { .await .context("failed figuring out inbox urls")?; if inbox_urls.is_empty() { - self.state.last_successful_id = activity.id; + self.state.last_successful_id = Some(activity.id); + self.state.last_successful_published_time = Some(activity.published); return Ok(()); } let Some(actor_apub_id) = &activity.actor_apub_id else { @@ -217,10 +226,10 @@ impl InstanceWorker { tracing::info!("sending out {}", task); while let Err(e) = task.sign_and_send(&self.context).await { self.state.fail_count += 1; - self.state.last_retry = Utc::now(); - let retry_delay: Duration = retry_sleep_duration(self.state.fail_count); + self.state.last_retry = Some(Utc::now()); + let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); tracing::info!( - "{}: retrying {} attempt {} with delay {retry_delay:.2?}. ({e})", + "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", self.instance.domain, activity.id, self.state.fail_count diff --git a/migrations/2023-11-01-223740_federation-published/down.sql b/migrations/2023-11-01-223740_federation-published/down.sql new file mode 100644 index 000000000..adad9bd18 --- /dev/null +++ b/migrations/2023-11-01-223740_federation-published/down.sql @@ -0,0 +1,5 @@ +ALTER TABLE federation_queue_state + DROP COLUMN last_successful_published_time, + ALTER COLUMN last_successful_id SET NOT NULL, + ALTER COLUMN last_retry SET NOT NULL; + diff --git a/migrations/2023-11-01-223740_federation-published/up.sql b/migrations/2023-11-01-223740_federation-published/up.sql new file mode 100644 index 000000000..e36b44afa --- /dev/null +++ b/migrations/2023-11-01-223740_federation-published/up.sql @@ -0,0 +1,5 @@ +ALTER TABLE federation_queue_state + ADD COLUMN last_successful_published_time timestamptz NULL, + ALTER COLUMN last_successful_id DROP NOT NULL, + ALTER COLUMN last_retry DROP NOT NULL; +