add federation queue state to get_federated_instances api (#4104)

* add federation queue state to get_federated_instances api

* feature gate

* move retry sleep function

* move stuff around
This commit is contained in:
phiresky 2023-11-06 22:07:04 +01:00 committed by GitHub
parent 8c85f35b19
commit b9b65c9c18
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 244 additions and 139 deletions

View file

@ -17,7 +17,6 @@ doctest = false
full = [ full = [
"tracing", "tracing",
"rosetta-i18n", "rosetta-i18n",
"chrono",
"lemmy_utils", "lemmy_utils",
"lemmy_db_views/full", "lemmy_db_views/full",
"lemmy_db_views_actor/full", "lemmy_db_views_actor/full",
@ -47,7 +46,7 @@ activitypub_federation = { workspace = true, optional = true }
serde = { workspace = true } serde = { workspace = true }
serde_with = { workspace = true } serde_with = { workspace = true }
url = { workspace = true } url = { workspace = true }
chrono = { workspace = true, optional = true } chrono = { workspace = true }
tracing = { workspace = true, optional = true } tracing = { workspace = true, optional = true }
reqwest-middleware = { workspace = true, optional = true } reqwest-middleware = { workspace = true, optional = true }
regex = { workspace = true } regex = { workspace = true }

View file

@ -25,6 +25,7 @@ pub extern crate lemmy_db_views_actor;
pub extern crate lemmy_db_views_moderator; pub extern crate lemmy_db_views_moderator;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(ts_rs::TS))] #[cfg_attr(feature = "full", derive(ts_rs::TS))]
@ -39,3 +40,8 @@ impl Default for SuccessResponse {
SuccessResponse { success: true } SuccessResponse { success: true }
} }
} }
/// How long to sleep before the next send attempt, based on how many retries have already
/// happened.
///
/// The delay is `10s * 2^retry_count`, i.e. it starts at ten seconds and doubles with every
/// retry. When the result is too large to be represented as a [`Duration`] (or is infinite),
/// [`Duration::MAX`] is returned instead of panicking.
pub fn federate_retry_sleep_duration(retry_count: i32) -> Duration {
  let secs = 10.0 * 2.0_f64.powf(f64::from(retry_count));
  // `Duration::from_secs_f64` panics on overflow, which is reached at retry_count >= 61;
  // saturate so callers (workers, stats logging, API conversion) never crash here.
  Duration::try_from_secs_f64(secs).unwrap_or(Duration::MAX)
}

View file

@ -297,7 +297,7 @@ pub fn client_builder(settings: &Settings) -> ClientBuilder {
); );
Client::builder() Client::builder()
.user_agent(user_agent.clone()) .user_agent(user_agent)
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.connect_timeout(REQWEST_TIMEOUT) .connect_timeout(REQWEST_TIMEOUT)
} }

View file

@ -1,6 +1,13 @@
use crate::federate_retry_sleep_duration;
use chrono::{DateTime, Utc};
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId}, newtypes::{CommentId, CommunityId, InstanceId, LanguageId, PersonId, PostId},
source::{instance::Instance, language::Language, tagline::Tagline}, source::{
federation_queue_state::FederationQueueState,
instance::Instance,
language::Language,
tagline::Tagline,
},
ListingType, ListingType,
ModlogActionType, ModlogActionType,
RegistrationMode, RegistrationMode,
@ -316,9 +323,41 @@ pub struct MyUserInfo {
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]
/// A list of federated instances. /// A list of federated instances.
pub struct FederatedInstances { pub struct FederatedInstances {
pub linked: Vec<Instance>, pub linked: Vec<InstanceWithFederationState>,
pub allowed: Vec<Instance>, pub allowed: Vec<InstanceWithFederationState>,
pub blocked: Vec<Instance>, pub blocked: Vec<InstanceWithFederationState>,
}
/// API-facing view of the outgoing federation queue state of one instance: the stored
/// `FederationQueueState` plus a derived timestamp for the next retry.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
pub struct ReadableFederationState {
// flattened so the stored state's fields are serialized alongside `next_retry`
// instead of inside a nested object
#[serde(flatten)]
internal_state: FederationQueueState,
/// timestamp of the next retry attempt (null if fail count is 0)
// derived in the `From<FederationQueueState>` impl from `last_retry` plus the retry
// sleep duration for the current fail count
next_retry: Option<DateTime<Utc>>,
}
impl From<FederationQueueState> for ReadableFederationState {
  /// Wraps the stored queue state and derives `next_retry` from it.
  ///
  /// `next_retry` is `last_retry` plus the retry sleep duration for the current fail count.
  /// It is `None` when there is no pending failure (`fail_count` is 0) or when no retry has
  /// been recorded yet.
  ///
  /// # Panics
  /// Panics if the retry sleep duration cannot be represented as a `chrono::Duration`.
  fn from(internal_state: FederationQueueState) -> Self {
    let next_retry = internal_state
      .last_retry
      // `last_retry` is not cleared when a send succeeds (only `fail_count` is reset), so
      // without this check a healthy instance would report a stale retry timestamp.
      .filter(|_| internal_state.fail_count > 0)
      .map(|last_retry| {
        last_retry
          + chrono::Duration::from_std(federate_retry_sleep_duration(internal_state.fail_count))
            .expect("sleep duration longer than 2**63 ms (262 million years)")
      });
    ReadableFederationState {
      internal_state,
      next_retry,
    }
  }
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))]
pub struct InstanceWithFederationState {
#[serde(flatten)]
pub instance: Instance,
/// if federation to this instance is or was active, show state of outgoing federation to this instance
pub federation_state: Option<ReadableFederationState>,
} }
#[skip_serializing_none] #[skip_serializing_none]

View file

@ -2,7 +2,7 @@ use crate::{
context::LemmyContext, context::LemmyContext,
request::purge_image_from_pictrs, request::purge_image_from_pictrs,
sensitive::Sensitive, sensitive::Sensitive,
site::FederatedInstances, site::{FederatedInstances, InstanceWithFederationState},
}; };
use actix_web::cookie::{Cookie, SameSite}; use actix_web::cookie::{Cookie, SameSite};
use anyhow::Context; use anyhow::Context;
@ -275,12 +275,27 @@ pub async fn build_federated_instances(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<Option<FederatedInstances>, LemmyError> { ) -> Result<Option<FederatedInstances>, LemmyError> {
if local_site.federation_enabled { if local_site.federation_enabled {
// TODO I hate that this requires 3 queries let mut linked = Vec::new();
let (linked, allowed, blocked) = lemmy_db_schema::try_join_with_pool!(pool => ( let mut allowed = Vec::new();
Instance::linked, let mut blocked = Vec::new();
Instance::allowlist,
Instance::blocklist let all = Instance::read_all_with_fed_state(pool).await?;
))?; for (instance, federation_state, is_blocked, is_allowed) in all {
let i = InstanceWithFederationState {
instance,
federation_state: federation_state.map(std::convert::Into::into),
};
if is_blocked {
// blocked instances will only have an entry here if they had been federated with in the past.
blocked.push(i);
} else if is_allowed {
allowed.push(i.clone());
linked.push(i);
} else {
// not explicitly allowed but implicitly linked
linked.push(i);
}
}
Ok(Some(FederatedInstances { Ok(Some(FederatedInstances {
linked, linked,

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
diesel::OptionalExtension, diesel::OptionalExtension,
newtypes::DbUrl, newtypes::{ActivityId, DbUrl},
source::activity::{ReceivedActivity, SentActivity, SentActivityForm}, source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
utils::{get_conn, DbPool}, utils::{get_conn, DbPool},
}; };
@ -30,7 +30,7 @@ impl SentActivity {
.first::<Self>(conn) .first::<Self>(conn)
.await .await
} }
pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result<Self, Error> { pub async fn read(pool: &mut DbPool<'_>, object_id: ActivityId) -> Result<Self, Error> {
use crate::schema::sent_activity::dsl::sent_activity; use crate::schema::sent_activity::dsl::sent_activity;
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
sent_activity.find(object_id).first::<Self>(conn).await sent_activity.find(object_id).first::<Self>(conn).await

View file

@ -0,0 +1,46 @@
use crate::{
newtypes::InstanceId,
source::federation_queue_state::FederationQueueState,
utils::{get_conn, DbPool},
};
use diesel::{prelude::*, result::Error};
use diesel_async::RunQueryDsl;
impl FederationQueueState {
  /// Fetches the stored queue state of `instance_id_`.
  ///
  /// When the instance has no row yet, an empty state (zero failures, nothing sent, no
  /// retry recorded) is returned instead of an error.
  pub async fn load(
    pool: &mut DbPool<'_>,
    instance_id_: InstanceId,
  ) -> Result<FederationQueueState, Error> {
    use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
    let conn = &mut get_conn(pool).await?;
    let stored: Option<FederationQueueState> = federation_queue_state
      .filter(instance_id.eq(&instance_id_))
      .select(FederationQueueState::as_select())
      .get_result(conn)
      .await
      .optional()?;
    Ok(match stored {
      Some(state) => state,
      None => FederationQueueState {
        instance_id: instance_id_,
        // this value is set to the most current id for new instances
        last_successful_id: None,
        last_successful_published_time: None,
        fail_count: 0,
        last_retry: None,
      },
    })
  }

  /// Inserts `state`, or overwrites the existing row when one with the same instance id is
  /// already present.
  pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<(), Error> {
    use crate::schema::federation_queue_state::dsl::{federation_queue_state, instance_id};
    let conn = &mut get_conn(pool).await?;
    state
      .insert_into(federation_queue_state)
      .on_conflict(instance_id)
      .do_update()
      .set(state)
      .execute(conn)
      .await
      .map(|_affected_rows| ())
  }
}

View file

@ -1,8 +1,18 @@
use crate::{ use crate::{
diesel::dsl::IntervalDsl, diesel::dsl::IntervalDsl,
newtypes::InstanceId, newtypes::InstanceId,
schema::{federation_allowlist, federation_blocklist, instance, local_site, site}, schema::{
source::instance::{Instance, InstanceForm}, federation_allowlist,
federation_blocklist,
federation_queue_state,
instance,
local_site,
site,
},
source::{
federation_queue_state::FederationQueueState,
instance::{Instance, InstanceForm},
},
utils::{functions::lower, get_conn, naive_now, now, DbPool}, utils::{functions::lower, get_conn, naive_now, now, DbPool},
}; };
use diesel::{ use diesel::{
@ -59,7 +69,7 @@ impl Instance {
pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> { pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
instance::table instance::table
.select(instance::all_columns) .select(Self::as_select())
.get_results(conn) .get_results(conn)
.await .await
} }
@ -73,7 +83,7 @@ impl Instance {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
instance::table instance::table
.inner_join(federation_allowlist::table) .inner_join(federation_allowlist::table)
.select(instance::all_columns) .select(Self::as_select())
.get_results(conn) .get_results(conn)
.await .await
} }
@ -82,14 +92,14 @@ impl Instance {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
instance::table instance::table
.inner_join(federation_blocklist::table) .inner_join(federation_blocklist::table)
.select(instance::all_columns) .select(Self::as_select())
.get_results(conn) .get_results(conn)
.await .await
} }
/// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not /// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not
/// ordered by id /// ordered by id
pub async fn read_all_with_blocked_and_dead( pub async fn read_federated_with_blocked_and_dead(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<Vec<(Self, bool, bool)>, Error> { ) -> Result<Vec<(Self, bool, bool)>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
@ -125,16 +135,24 @@ impl Instance {
} }
} }
pub async fn linked(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> { /// returns (instance, fed queue state, blocked, allowed) tuples
pub async fn read_all_with_fed_state(
pool: &mut DbPool<'_>,
) -> Result<Vec<(Self, Option<FederationQueueState>, bool, bool)>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
instance::table instance::table
// omit instance representing the local site // omit instance representing the local site
.left_join(site::table.inner_join(local_site::table)) .left_join(site::table.inner_join(local_site::table))
.filter(local_site::id.is_null()) .filter(local_site::id.is_null())
// omit instances in the blocklist
.left_join(federation_blocklist::table) .left_join(federation_blocklist::table)
.filter(federation_blocklist::id.is_null()) .left_join(federation_allowlist::table)
.select(instance::all_columns) .left_join(federation_queue_state::table)
.select((
Self::as_select(),
Option::<FederationQueueState>::as_select(),
federation_blocklist::id.nullable().is_not_null(),
federation_allowlist::id.nullable().is_not_null(),
))
.get_results(conn) .get_results(conn)
.await .await
} }

View file

@ -10,6 +10,7 @@ pub mod custom_emoji;
pub mod email_verification; pub mod email_verification;
pub mod federation_allowlist; pub mod federation_allowlist;
pub mod federation_blocklist; pub mod federation_blocklist;
pub mod federation_queue_state;
pub mod image_upload; pub mod image_upload;
pub mod instance; pub mod instance;
pub mod instance_block; pub mod instance_block;

View file

@ -149,6 +149,13 @@ pub struct ImageUploadId(i32);
/// The instance id. /// The instance id.
pub struct InstanceId(i32); pub struct InstanceId(i32);
#[derive(
Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default, PartialOrd, Ord,
)]
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The activity id (used as the id of `sent_activity` rows).
pub struct ActivityId(pub i64);
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)] #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "full", derive(DieselNewType, TS))] #[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]

View file

@ -307,9 +307,10 @@ diesel::table! {
federation_queue_state (id) { federation_queue_state (id) {
id -> Int4, id -> Int4,
instance_id -> Int4, instance_id -> Int4,
last_successful_id -> Int8, last_successful_id -> Nullable<Int8>,
fail_count -> Int4, fail_count -> Int4,
last_retry -> Timestamptz, last_retry -> Nullable<Timestamptz>,
last_successful_published_time -> Nullable<Timestamptz>,
} }
} }

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
newtypes::{CommunityId, DbUrl}, newtypes::{ActivityId, CommunityId, DbUrl},
schema::sent_activity, schema::sent_activity,
}; };
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -54,7 +54,7 @@ impl ActivitySendTargets {
#[derive(PartialEq, Eq, Debug, Queryable)] #[derive(PartialEq, Eq, Debug, Queryable)]
#[diesel(table_name = sent_activity)] #[diesel(table_name = sent_activity)]
pub struct SentActivity { pub struct SentActivity {
pub id: i64, pub id: ActivityId,
pub ap_id: DbUrl, pub ap_id: DbUrl,
pub data: Value, pub data: Value,
pub sensitive: bool, pub sensitive: bool,

View file

@ -0,0 +1,26 @@
use crate::newtypes::{ActivityId, InstanceId};
use chrono::{DateTime, Utc};
#[cfg(feature = "full")]
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
#[cfg(feature = "full")]
use ts_rs::TS;
/// Progress and failure bookkeeping of the outgoing federation queue for one instance,
/// stored in the `federation_queue_state` table.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(
feature = "full",
derive(Queryable, Selectable, Insertable, AsChangeset)
)]
#[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", diesel(table_name = crate::schema::federation_queue_state))]
#[cfg_attr(feature = "full", diesel(check_for_backend(diesel::pg::Pg)))]
pub struct FederationQueueState {
/// the instance this queue state belongs to
pub instance_id: InstanceId,
/// the last successfully sent activity id
// `None` until the queue for this instance has been initialized
pub last_successful_id: Option<ActivityId>,
/// publish time of the last successfully sent activity (`None` if not recorded yet)
pub last_successful_published_time: Option<DateTime<Utc>>,
/// how many failed attempts have been made to send the next activity
pub fail_count: i32,
/// timestamp of the last retry attempt (when the last failing activity was resent)
pub last_retry: Option<DateTime<Utc>>,
}

View file

@ -15,6 +15,7 @@ pub mod custom_emoji_keyword;
pub mod email_verification; pub mod email_verification;
pub mod federation_allowlist; pub mod federation_allowlist;
pub mod federation_blocklist; pub mod federation_blocklist;
pub mod federation_queue_state;
pub mod image_upload; pub mod image_upload;
pub mod instance; pub mod instance;
pub mod instance_block; pub mod instance_block;

View file

@ -1,63 +0,0 @@
use crate::util::ActivityId;
use anyhow::Result;
use chrono::{DateTime, TimeZone, Utc};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lemmy_db_schema::{
newtypes::InstanceId,
utils::{get_conn, DbPool},
};
/// Outgoing federation queue state of one instance (row of `federation_queue_state`).
/// Every field is mandatory here, so "not set yet" is expressed with sentinel values
/// (see the defaults built in `load`).
#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)]
#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct FederationQueueState {
/// the instance this queue state belongs to
pub instance_id: InstanceId,
/// id of the last successfully sent activity; `load` uses -1 when no row exists
pub last_successful_id: ActivityId, // todo: i64
/// number of failed send attempts
pub fail_count: i32,
/// time of the last retry; `load` uses the unix epoch when no row exists
pub last_retry: DateTime<Utc>,
}
impl FederationQueueState {
/// load state or return a default empty value
///
/// The default has zero failures, a `last_retry` at the unix epoch and the sentinel id -1.
pub async fn load(
pool: &mut DbPool<'_>,
instance_id_: InstanceId,
) -> Result<FederationQueueState> {
use lemmy_db_schema::schema::federation_queue_state::dsl::{
federation_queue_state,
instance_id,
};
let conn = &mut get_conn(pool).await?;
Ok(
federation_queue_state
.filter(instance_id.eq(&instance_id_))
.select(FederationQueueState::as_select())
.get_result(conn)
.await
// a missing row is not an error: it becomes `None` and is replaced by the default below
.optional()?
.unwrap_or(FederationQueueState {
instance_id: instance_id_,
fail_count: 0,
last_retry: Utc.timestamp_nanos(0),
last_successful_id: -1, // this value is set to the most current id for new instances
}),
)
}
/// Inserts `state`, or updates the existing row on a conflict on `instance_id`.
pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> {
use lemmy_db_schema::schema::federation_queue_state::dsl::{
federation_queue_state,
instance_id,
};
let conn = &mut get_conn(pool).await?;
state
.insert_into(federation_queue_state)
.on_conflict(instance_id)
.do_update()
.set(state)
.execute(conn)
.await?;
Ok(())
}
}

View file

@ -1,14 +1,10 @@
use crate::{ use crate::{util::CancellableTask, worker::InstanceWorker};
util::{retry_sleep_duration, CancellableTask},
worker::InstanceWorker,
};
use activitypub_federation::config::FederationConfig; use activitypub_federation::config::FederationConfig;
use chrono::{Local, Timelike}; use chrono::{Local, Timelike};
use federation_queue_state::FederationQueueState; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::InstanceId, newtypes::InstanceId,
source::instance::Instance, source::{federation_queue_state::FederationQueueState, instance::Instance},
utils::{ActualDbPool, DbPool}, utils::{ActualDbPool, DbPool},
}; };
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
@ -18,7 +14,6 @@ use tokio::{
}; };
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
mod federation_queue_state;
mod util; mod util;
mod worker; mod worker;
@ -52,7 +47,9 @@ async fn start_stop_federation_workers(
let mut total_count = 0; let mut total_count = 0;
let mut dead_count = 0; let mut dead_count = 0;
let mut disallowed_count = 0; let mut disallowed_count = 0;
for (instance, allowed, is_dead) in Instance::read_all_with_blocked_and_dead(pool2).await? { for (instance, allowed, is_dead) in
Instance::read_federated_with_blocked_and_dead(pool2).await?
{
if instance.domain == local_domain { if instance.domain == local_domain {
continue; continue;
} }
@ -188,14 +185,14 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap<String, FederationQu
// todo: more stats (act/sec, avg http req duration) // todo: more stats (act/sec, avg http req duration)
let mut ok_count = 0; let mut ok_count = 0;
for (domain, stat) in stats { for (domain, stat) in stats {
let behind = last_id - stat.last_successful_id; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0);
if stat.fail_count > 0 { if stat.fail_count > 0 {
tracing::info!( tracing::info!(
"{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}", "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}",
domain, domain,
behind, behind,
stat.fail_count, stat.fail_count,
retry_sleep_duration(stat.fail_count) federate_retry_sleep_duration(stat.fail_count)
); );
} else if behind > 0 { } else if behind > 0 {
tracing::info!("{}: Ok. {} behind", domain, behind); tracing::info!("{}: Ok. {} behind", domain, behind);

View file

@ -6,6 +6,7 @@ use lemmy_apub::{
fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity}, fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
}; };
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::ActivityId,
source::{ source::{
activity::{ActorType, SentActivity}, activity::{ActorType, SentActivity},
community::Community, community::Community,
@ -141,9 +142,6 @@ pub(crate) async fn get_actor_cached(
.map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}")) .map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}"))
} }
/// this should maybe be a newtype like all the other PersonId CommunityId etc.
pub(crate) type ActivityId = i64;
type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>; type CachedActivityInfo = Option<Arc<(SentActivity, SharedInboxActivities)>>;
/// activities are immutable so cache does not need to have TTL /// activities are immutable so cache does not need to have TTL
/// May return None if the corresponding id does not exist or is a received activity. /// May return None if the corresponding id does not exist or is a received activity.
@ -192,14 +190,9 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<Acti
use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity}; use lemmy_db_schema::schema::sent_activity::dsl::{id, sent_activity};
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?; let seq: Option<ActivityId> = sent_activity.select(max(id)).get_result(conn).await?;
let latest_id = seq.unwrap_or(0); let latest_id = seq.unwrap_or(ActivityId(0));
anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId)
}) })
.await .await
.map_err(|e| anyhow::anyhow!("err getting id: {e:?}")) .map_err(|e| anyhow::anyhow!("err getting id: {e:?}"))
} }
/// how long to sleep based on how many retries have already happened
pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration {
Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count)))
}

View file

@ -1,22 +1,23 @@
use crate::{ use crate::util::{
federation_queue_state::FederationQueueState,
util::{
get_activity_cached, get_activity_cached,
get_actor_cached, get_actor_cached,
get_latest_activity_id, get_latest_activity_id,
retry_sleep_duration,
LEMMY_TEST_FAST_FEDERATION, LEMMY_TEST_FAST_FEDERATION,
WORK_FINISHED_RECHECK_DELAY, WORK_FINISHED_RECHECK_DELAY,
},
}; };
use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; use activitypub_federation::{activity_sending::SendActivityTask, config::Data};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use lemmy_api_common::context::LemmyContext; use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_apub::activity_lists::SharedInboxActivities; use lemmy_apub::activity_lists::SharedInboxActivities;
use lemmy_db_schema::{ use lemmy_db_schema::{
newtypes::{CommunityId, InstanceId}, newtypes::{ActivityId, CommunityId, InstanceId},
source::{activity::SentActivity, instance::Instance, site::Site}, source::{
activity::SentActivity,
federation_queue_state::FederationQueueState,
instance::Instance,
site::Site,
},
utils::DbPool, utils::DbPool,
}; };
use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_db_views_actor::structs::CommunityFollowerView;
@ -122,8 +123,12 @@ impl InstanceWorker {
async fn initial_fail_sleep(&mut self) -> Result<()> { async fn initial_fail_sleep(&mut self) -> Result<()> {
// before starting queue, sleep remaining duration if last request failed // before starting queue, sleep remaining duration if last request failed
if self.state.fail_count > 0 { if self.state.fail_count > 0 {
let elapsed = (Utc::now() - self.state.last_retry).to_std()?; let last_retry = self
let required = retry_sleep_duration(self.state.fail_count); .state
.last_retry
.context("impossible: if fail count set last retry also set")?;
let elapsed = (Utc::now() - last_retry).to_std()?;
let required = federate_retry_sleep_duration(self.state.fail_count);
if elapsed >= required { if elapsed >= required {
return Ok(()); return Ok(());
} }
@ -138,14 +143,16 @@ impl InstanceWorker {
/// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities /// send out a batch of CHECK_SAVE_STATE_EVERY_IT activities
async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> {
let latest_id = get_latest_activity_id(pool).await?; let latest_id = get_latest_activity_id(pool).await?;
if self.state.last_successful_id == -1 { let mut id = if let Some(id) = self.state.last_successful_id {
id
} else {
// this is the initial creation (instance first seen) of the federation queue for this instance // this is the initial creation (instance first seen) of the federation queue for this instance
// skip all past activities: // skip all past activities:
self.state.last_successful_id = latest_id; self.state.last_successful_id = Some(latest_id);
// save here to ensure it's not read as 0 again later if no activities have happened // save here to ensure it's not read as 0 again later if no activities have happened
self.save_and_send_state(pool).await?; self.save_and_send_state(pool).await?;
} latest_id
let mut id = self.state.last_successful_id; };
if id == latest_id { if id == latest_id {
// no more work to be done, wait before rechecking // no more work to be done, wait before rechecking
tokio::select! { tokio::select! {
@ -159,13 +166,13 @@ impl InstanceWorker {
&& processed_activities < CHECK_SAVE_STATE_EVERY_IT && processed_activities < CHECK_SAVE_STATE_EVERY_IT
&& !self.stop.is_cancelled() && !self.stop.is_cancelled()
{ {
id += 1; id = ActivityId(id.0 + 1);
processed_activities += 1; processed_activities += 1;
let Some(ele) = get_activity_cached(pool, id) let Some(ele) = get_activity_cached(pool, id)
.await .await
.context("failed reading activity from db")? .context("failed reading activity from db")?
else { else {
self.state.last_successful_id = id; self.state.last_successful_id = Some(id);
continue; continue;
}; };
if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
@ -179,7 +186,8 @@ impl InstanceWorker {
return Ok(()); return Ok(());
} }
// send success! // send success!
self.state.last_successful_id = id; self.state.last_successful_id = Some(id);
self.state.last_successful_published_time = Some(ele.0.published);
self.state.fail_count = 0; self.state.fail_count = 0;
} }
Ok(()) Ok(())
@ -198,7 +206,8 @@ impl InstanceWorker {
.await .await
.context("failed figuring out inbox urls")?; .context("failed figuring out inbox urls")?;
if inbox_urls.is_empty() { if inbox_urls.is_empty() {
self.state.last_successful_id = activity.id; self.state.last_successful_id = Some(activity.id);
self.state.last_successful_published_time = Some(activity.published);
return Ok(()); return Ok(());
} }
let Some(actor_apub_id) = &activity.actor_apub_id else { let Some(actor_apub_id) = &activity.actor_apub_id else {
@ -217,10 +226,10 @@ impl InstanceWorker {
tracing::info!("sending out {}", task); tracing::info!("sending out {}", task);
while let Err(e) = task.sign_and_send(&self.context).await { while let Err(e) = task.sign_and_send(&self.context).await {
self.state.fail_count += 1; self.state.fail_count += 1;
self.state.last_retry = Utc::now(); self.state.last_retry = Some(Utc::now());
let retry_delay: Duration = retry_sleep_duration(self.state.fail_count); let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
tracing::info!( tracing::info!(
"{}: retrying {} attempt {} with delay {retry_delay:.2?}. ({e})", "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
self.instance.domain, self.instance.domain,
activity.id, activity.id,
self.state.fail_count self.state.fail_count

View file

@ -0,0 +1,5 @@
-- Revert: drop the published-time column and make the progress columns mandatory again.
--
-- Rows written while the columns were nullable may contain NULLs (e.g. last_retry for an
-- instance that never failed), which would make SET NOT NULL fail. Backfill them with the
-- sentinel values that were used before the columns became nullable: -1 for the id and
-- the unix epoch for the retry timestamp.
UPDATE
    federation_queue_state
SET
    last_successful_id = -1
WHERE
    last_successful_id IS NULL;

UPDATE
    federation_queue_state
SET
    last_retry = 'epoch'
WHERE
    last_retry IS NULL;

ALTER TABLE federation_queue_state
    DROP COLUMN last_successful_published_time,
    ALTER COLUMN last_successful_id SET NOT NULL,
    ALTER COLUMN last_retry SET NOT NULL;

View file

@ -0,0 +1,5 @@
-- Record the publish time of the last successfully sent activity, and allow
-- last_successful_id / last_retry to be NULL so that "not set yet" no longer needs
-- sentinel values (-1 / unix epoch).
ALTER TABLE federation_queue_state
ADD COLUMN last_successful_published_time timestamptz NULL,
ALTER COLUMN last_successful_id DROP NOT NULL,
ALTER COLUMN last_retry DROP NOT NULL;