2023-11-06 21:07:04 +00:00
use crate ::util ::{
get_activity_cached ,
get_actor_cached ,
get_latest_activity_id ,
LEMMY_TEST_FAST_FEDERATION ,
WORK_FINISHED_RECHECK_DELAY ,
2023-09-09 16:25:03 +00:00
} ;
2024-01-05 14:42:46 +00:00
use activitypub_federation ::{
activity_sending ::SendActivityTask ,
2024-04-13 21:18:28 +00:00
config ::{ Data , FederationConfig } ,
2024-01-05 14:42:46 +00:00
protocol ::context ::WithContext ,
} ;
2023-09-09 16:25:03 +00:00
use anyhow ::{ Context , Result } ;
2024-01-19 14:40:12 +00:00
use chrono ::{ DateTime , Days , TimeZone , Utc } ;
2023-11-06 21:07:04 +00:00
use lemmy_api_common ::{ context ::LemmyContext , federate_retry_sleep_duration } ;
2024-01-05 14:42:46 +00:00
use lemmy_apub ::{ activity_lists ::SharedInboxActivities , FEDERATION_CONTEXT } ;
2023-09-09 16:25:03 +00:00
use lemmy_db_schema ::{
2023-11-06 21:07:04 +00:00
newtypes ::{ ActivityId , CommunityId , InstanceId } ,
source ::{
activity ::SentActivity ,
federation_queue_state ::FederationQueueState ,
2024-01-19 14:40:12 +00:00
instance ::{ Instance , InstanceForm } ,
2023-11-06 21:07:04 +00:00
site ::Site ,
} ,
2024-04-13 21:18:28 +00:00
utils ::{ naive_now , ActualDbPool , DbPool } ,
2023-09-09 16:25:03 +00:00
} ;
use lemmy_db_views_actor ::structs ::CommunityFollowerView ;
use once_cell ::sync ::Lazy ;
use reqwest ::Url ;
use std ::{
2024-04-13 21:18:28 +00:00
collections ::{ BinaryHeap , HashMap , HashSet } ,
2024-01-19 14:40:12 +00:00
ops ::{ Add , Deref } ,
2023-09-09 16:25:03 +00:00
time ::Duration ,
} ;
2024-04-13 21:18:28 +00:00
use tokio ::{
sync ::mpsc ::{ self , UnboundedSender } ,
time ::sleep ,
} ;
2023-09-09 16:25:03 +00:00
use tokio_util ::sync ::CancellationToken ;
2023-09-13 11:20:09 +00:00
/// Save state to db after this time has passed since the last state write (so if the server
/// crashes or is SIGKILLed, less than X seconds of activities are resent)
static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
2023-09-13 11:20:09 +00:00
/// interval with which new additions to community_followers are queried.
///
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
2024-03-18 09:36:49 +00:00
static FOLLOW_ADDITIONS_RECHECK_DELAY : Lazy < chrono ::TimeDelta > = Lazy ::new ( | | {
2023-09-13 11:20:09 +00:00
if * LEMMY_TEST_FAST_FEDERATION {
2024-03-18 09:36:49 +00:00
chrono ::TimeDelta ::try_seconds ( 1 ) . expect ( " TimeDelta out of bounds " )
2023-09-13 11:20:09 +00:00
} else {
2024-03-18 09:36:49 +00:00
chrono ::TimeDelta ::try_minutes ( 2 ) . expect ( " TimeDelta out of bounds " )
2023-09-13 11:20:09 +00:00
}
} ) ;
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
2024-03-18 09:36:49 +00:00
static FOLLOW_REMOVALS_RECHECK_DELAY : Lazy < chrono ::TimeDelta > =
Lazy ::new ( | | chrono ::TimeDelta ::try_hours ( 1 ) . expect ( " TimeDelta out of bounds " ) ) ;
2024-04-13 21:18:28 +00:00
/// Maximum number of sends that may be in flight per instance at once; overridable via env var.
static CONCURRENT_SENDS: Lazy<i64> = Lazy::new(|| {
  match std::env::var("LEMMY_FEDERATION_CONCURRENT_SENDS_PER_INSTANCE") {
    // an unparseable value falls back to the default, same as an unset variable
    Ok(raw) => raw.parse().unwrap_or(8),
    Err(_) => 8,
  }
});
/// Maximum number of successful sends to allow out of order
const MAX_SUCCESSFULS : usize = 1000 ;
2023-09-09 16:25:03 +00:00
/// Per-instance federation send worker: reads activities from the db and delivers them to one
/// remote instance, tracking its queue position and failure state.
pub(crate) struct InstanceWorker {
  instance: Instance,
  // load site lazily because if an instance is first seen due to being on allowlist,
  // the corresponding row in `site` may not exist yet since that is only added once
  // `fetch_instance_actor_for_object` is called.
  // (this should be unlikely to be relevant outside of the federation tests)
  site_loaded: bool,
  site: Option<Site>,
  // cached inbox urls per followed community, refreshed by `update_communities`
  followed_communities: HashMap<CommunityId, HashSet<Url>>,
  stop: CancellationToken,
  config: FederationConfig<LemmyContext>,
  // reports (domain, queue state) pairs to the stats collector
  stats_sender: UnboundedSender<(String, FederationQueueState)>,
  last_full_communities_fetch: DateTime<Utc>,
  last_incremental_communities_fetch: DateTime<Utc>,
  // persisted send-queue position and failure state for this instance
  state: FederationQueueState,
  last_state_insert: DateTime<Utc>,
  pool: ActualDbPool,
}
/// Outcome data for one successfully handled activity.
#[derive(Debug, PartialEq, Eq)]
struct SendSuccessInfo {
  activity_id: ActivityId,
  published: Option<DateTime<Utc>>,
  // true when no HTTP send actually happened (activity missing from db or no relevant inboxes)
  was_skipped: bool,
}
impl PartialOrd for SendSuccessInfo {
2024-04-13 21:48:32 +00:00
fn partial_cmp ( & self , other : & Self ) -> Option < std ::cmp ::Ordering > {
other . activity_id . partial_cmp ( & self . activity_id )
}
2024-04-13 21:18:28 +00:00
}
impl Ord for SendSuccessInfo {
  // intentionally reversed (other vs self) so that a BinaryHeap<SendSuccessInfo> acts as a
  // min-heap, popping the lowest activity_id first
  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    other.activity_id.cmp(&self.activity_id)
  }
}
/// Message sent from a spawned send task back to the worker's main loop.
enum SendActivityResult {
  Success(SendSuccessInfo),
  Failure {
    fail_count: i32,
    // activity_id: ActivityId,
  },
}
impl InstanceWorker {
  /// Load the persisted queue state for `instance` from the db, construct the worker and run
  /// its main loop until the cancellation token fires or an internal error occurs.
  pub(crate) async fn init_and_loop(
    instance: Instance,
    config: FederationConfig<LemmyContext>,
    stop: CancellationToken,
    stats_sender: UnboundedSender<(String, FederationQueueState)>,
  ) -> Result<(), anyhow::Error> {
    let pool = config.to_request_data().inner_pool().clone();
    let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
    let mut worker = InstanceWorker {
      instance,
      site_loaded: false,
      site: None,
      followed_communities: HashMap::new(),
      stop,
      config,
      stats_sender,
      // zero timestamps force a full community fetch on the first loop iteration
      last_full_communities_fetch: Utc.timestamp_nanos(0),
      last_incremental_communities_fetch: Utc.timestamp_nanos(0),
      state,
      last_state_insert: Utc.timestamp_nanos(0),
      pool,
    };
    worker.loop_until_stopped().await
  }
  /// loop fetch new activities from db and send them to the inboxes of the given instances
  /// this worker only returns if (a) there is an internal error or (b) the cancellation token
  /// is cancelled (graceful exit)
  async fn loop_until_stopped(&mut self) -> Result<()> {
    self.initial_fail_sleep().await?;
    let mut latest_id = self.get_latest_id().await?;
    // activities that have been successfully sent but
    // that are not the lowest number and thus can't be written to the database yet
    let mut successfuls = BinaryHeap::<SendSuccessInfo>::new();
    // number of activities that currently have a task spawned to send it
    let mut in_flight: i64 = 0;
    // each HTTP send will report back to this channel concurrently
    let (report_send_result, mut receive_send_result) =
      tokio::sync::mpsc::unbounded_channel::<SendActivityResult>();
    while !self.stop.is_cancelled() {
      // check if we need to wait for a send to finish before sending the next one
      // we wait if (a) the last request failed, only if a request is already in flight (not at
      // the start of the loop) or (b) if we have too many successfuls in memory or (c) if we
      // have too many in flight
      let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
        || successfuls.len() >= MAX_SUCCESSFULS
        || in_flight >= *CONCURRENT_SENDS;
      if need_wait_for_event || receive_send_result.len() > 4 {
        // if len() > 0 then this does not block and allows us to write to db more often
        // if len is 0 then this means we wait for something to change our above conditions,
        // which can only happen by an event sent into the channel
        self
          .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
          .await?;
        // handle_send_results does not guarantee that we are now in a condition where we want
        // to send a new one, so repeat this check until the if no longer applies
        continue;
      } else {
        // send a new activity if there is one
        self.update_communities().await?;
        let next_id = {
          // calculate next id to send based on the last id and the in flight requests
          let last_successful_id = self
            .state
            .last_successful_id
            .map(|e| e.0)
            .expect("set above");
          ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1)
        };
        if next_id > latest_id {
          // lazily fetch latest id only if we have caught up
          latest_id = self.get_latest_id().await?;
          if next_id > latest_id {
            // no more work to be done, wait before rechecking
            tokio::select! {
              () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
              () = self.stop.cancelled() => {}
            }
            continue;
          }
        }
        in_flight += 1;
        self
          .spawn_send_if_needed(next_id, report_send_result.clone())
          .await?;
      }
    }
    // final update of state in db on shutdown
    self.save_and_send_state().await?;
    Ok(())
  }
  /// Before starting the queue, sleep out the remainder of the retry backoff if the last send
  /// attempt failed, so a restart cannot be used to bypass the backoff.
  async fn initial_fail_sleep(&mut self) -> Result<()> {
    // before starting queue, sleep remaining duration if last request failed
    if self.state.fail_count > 0 {
      let last_retry = self
        .state
        .last_retry
        .context("impossible: if fail count set last retry also set")?;
      let elapsed = (Utc::now() - last_retry).to_std()?;
      let required = federate_retry_sleep_duration(self.state.fail_count);
      if elapsed >= required {
        return Ok(());
      }
      let remaining = required - elapsed;
      tracing::debug!(
        "{}: fail-sleeping for {:?} before starting queue",
        self.instance.domain,
        remaining
      );
      // wake early if the worker is shut down while sleeping
      tokio::select! {
        () = sleep(remaining) => {},
        () = self.stop.cancelled() => {}
      }
    }
    Ok(())
  }
2024-04-13 21:18:28 +00:00
  /// get newest activity id and set it as last_successful_id if it's the first time this
  /// instance is seen
  async fn get_latest_id(&mut self) -> Result<ActivityId> {
    let latest_id = get_latest_activity_id(&mut self.pool()).await?;
    if self.state.last_successful_id.is_none() {
      // this is the initial creation (instance first seen) of the federation queue for this
      // instance, skip all past activities:
      self.state.last_successful_id = Some(latest_id);
      // save here to ensure it's not read as 0 again later if no activities have happened
      self.save_and_send_state().await?;
    }
    Ok(latest_id)
  }
  /// Drain pending results from the send-result channel (waiting for at least one), update the
  /// in-memory queue/failure state accordingly, then try to advance and persist
  /// `last_successful_id` via `pop_successfuls_and_write`.
  async fn handle_send_results(
    &mut self,
    receive_inbox_result: &mut mpsc::UnboundedReceiver<SendActivityResult>,
    successfuls: &mut BinaryHeap<SendSuccessInfo>,
    in_flight: &mut i64,
  ) -> Result<(), anyhow::Error> {
    let mut force_write = false;
    let mut events = Vec::new();
    // wait for at least one event but if there's multiple handle them all
    receive_inbox_result.recv_many(&mut events, 1000).await;
    for event in events {
      match event {
        SendActivityResult::Success(s) => {
          self.state.fail_count = 0;
          *in_flight -= 1;
          if !s.was_skipped {
            self.mark_instance_alive().await?;
          }
          successfuls.push(s);
        }
        // note: a Failure does not decrement in_flight because the task that reported it keeps
        // retrying (see send_retry_loop) and is therefore still in flight
        SendActivityResult::Failure { fail_count, .. } => {
          if fail_count > self.state.fail_count {
            // override fail count - if multiple activities are currently sending this value may
            // get conflicting info but that's fine
            self.state.fail_count = fail_count;
            self.state.last_retry = Some(Utc::now());
            // make sure the raised fail count reaches the db promptly
            force_write = true;
          }
        }
      }
    }
    self
      .pop_successfuls_and_write(successfuls, force_write)
      .await?;
    Ok(())
  }
async fn mark_instance_alive ( & mut self ) -> Result < ( ) > {
// Activity send successful, mark instance as alive if it hasn't been updated in a while.
let updated = self . instance . updated . unwrap_or ( self . instance . published ) ;
if updated . add ( Days ::new ( 1 ) ) < Utc ::now ( ) {
self . instance . updated = Some ( Utc ::now ( ) ) ;
let form = InstanceForm ::builder ( )
. domain ( self . instance . domain . clone ( ) )
. updated ( Some ( naive_now ( ) ) )
. build ( ) ;
Instance ::update ( & mut self . pool ( ) , self . instance . id , form ) . await ? ;
}
Ok ( ( ) )
}
/// checks whether the highest successful id can be updated and writes to db if so
async fn pop_successfuls_and_write (
& mut self ,
successfuls : & mut BinaryHeap < SendSuccessInfo > ,
force_write : bool ,
) -> Result < ( ) > {
let Some ( mut last_id ) = self . state . last_successful_id else {
tracing ::warn! ( " should be impossible: last successful id is None " ) ;
return Ok ( ( ) ) ;
} ;
tracing ::debug! (
" last: {:?}, next: {:?}, currently in successfuls: {:?} " ,
last_id ,
successfuls . peek ( ) ,
successfuls . iter ( )
) ;
while successfuls
. peek ( )
. map ( | a | & a . activity_id = = & ActivityId ( last_id . 0 + 1 ) )
. unwrap_or ( false )
2023-09-09 16:25:03 +00:00
{
2024-04-13 21:18:28 +00:00
let next = successfuls . pop ( ) . unwrap ( ) ;
last_id = next . activity_id ;
self . state . last_successful_id = Some ( next . activity_id ) ;
self . state . last_successful_published_time = next . published ;
}
let save_state_every = chrono ::Duration ::from_std ( SAVE_STATE_EVERY_TIME ) . expect ( " not negative " ) ;
if force_write | | ( Utc ::now ( ) - self . last_state_insert ) > save_state_every {
self . save_and_send_state ( ) . await ? ;
}
Ok ( ( ) )
}
  /// Look up the activity for `activity_id` and spawn a task that sends it to this instance's
  /// inboxes, reporting the outcome through `report`. Activities that don't exist in the db or
  /// have no relevant inboxes are immediately reported as skipped successes so the queue
  /// position can still advance.
  async fn spawn_send_if_needed(
    &mut self,
    activity_id: ActivityId,
    report: UnboundedSender<SendActivityResult>,
  ) -> Result<()> {
    let Some(ele) = get_activity_cached(&mut self.pool(), activity_id)
      .await
      .context("failed reading activity from db")?
    else {
      tracing::debug!("{}: {:?} does not exist", self.instance.domain, activity_id);
      report.send(SendActivityResult::Success(SendSuccessInfo {
        activity_id,
        published: None,
        was_skipped: true,
      }))?;
      return Ok(());
    };
    let activity = &ele.0;
    let inbox_urls = self
      .get_inbox_urls(activity)
      .await
      .context("failed figuring out inbox urls")?;
    if inbox_urls.is_empty() {
      tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
      report.send(SendActivityResult::Success(SendSuccessInfo {
        activity_id,
        published: Some(activity.published),
        was_skipped: true,
      }))?;
      return Ok(());
    }
    let inbox_urls = inbox_urls.into_iter().collect();
    // snapshot the current fail count so the spawned task continues the backoff sequence
    let initial_fail_count = self.state.fail_count;
    let data = self.config.to_request_data();
    let stop = self.stop.clone();
    let domain = self.instance.domain.clone();
    tokio::spawn(async move {
      let mut report = report;
      if let Err(e) = InstanceWorker::send_retry_loop(
        &ele.0,
        &ele.1,
        inbox_urls,
        &mut report,
        initial_fail_count,
        domain,
        data,
        stop,
      )
      .await
      {
        tracing::warn!(
          "sending {} errored internally, skipping activity: {:?}",
          ele.0.ap_id,
          e
        );
        // report as skipped success so the queue does not stall on this activity;
        // .ok() because the receiver may already be gone during shutdown
        report
          .send(SendActivityResult::Success(SendSuccessInfo {
            activity_id,
            published: None,
            was_skipped: true,
          }))
          .ok();
      }
    });
    Ok(())
  }
  // this function will return successfully when (a) send succeeded or (b) worker cancelled
  // and will return an error if an internal error occurred (send errors cause an infinite loop)
  async fn send_retry_loop(
    activity: &SentActivity,
    object: &SharedInboxActivities,
    inbox_urls: Vec<Url>,
    report: &mut UnboundedSender<SendActivityResult>,
    initial_fail_count: i32,
    domain: String,
    context: Data<LemmyContext>,
    stop: CancellationToken,
  ) -> Result<()> {
    let pool = &mut context.pool();
    let Some(actor_apub_id) = &activity.actor_apub_id else {
      return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
    };
    let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
      .await
      .context("failed getting actor instance (was it marked deleted / removed?)")?;
    let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
    let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
    for task in requests {
      // usually only one due to shared inbox
      tracing::debug!("sending out {}", task);
      // continue the backoff sequence from the instance's persisted fail count
      let mut fail_count = initial_fail_count;
      while let Err(e) = task.sign_and_send(&context).await {
        fail_count += 1;
        report.send(SendActivityResult::Failure {
          fail_count,
          // activity_id: activity.id,
        })?;
        let retry_delay: Duration = federate_retry_sleep_duration(fail_count);
        tracing::info!(
          "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
          domain,
          activity.id,
          fail_count
        );
        tokio::select! {
          () = sleep(retry_delay) => {},
          () = stop.cancelled() => {
            // save state to db and exit
            // TODO: do we need to report state here to prevent hang on exit?
            return Ok(());
          }
        }
      }
    }
    report.send(SendActivityResult::Success(SendSuccessInfo {
      activity_id: activity.id,
      published: Some(activity.published),
      was_skipped: false,
    }))?;
    Ok(())
  }
  /// get inbox urls of sending the given activity to the given instance
  /// most often this will return 0 values (if instance doesn't care about the activity)
  /// or 1 value (the shared inbox)
  /// > 1 values only happens for non-lemmy software
  async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<HashSet<Url>> {
    let mut inbox_urls: HashSet<Url> = HashSet::new();
    if activity.send_all_instances {
      if !self.site_loaded {
        self.site = Site::read_from_instance_id(&mut self.pool(), self.instance.id).await?;
        self.site_loaded = true;
      }
      if let Some(site) = &self.site {
        // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle
        // these activities. So handling it like this is fine.
        inbox_urls.insert(site.inbox_url.inner().clone());
      }
    }
    if let Some(t) = &activity.send_community_followers_of {
      if let Some(urls) = self.followed_communities.get(t) {
        inbox_urls.extend(urls.iter().cloned());
      }
    }
    // explicitly-listed inboxes, filtered down to those on this worker's instance
    inbox_urls.extend(
      activity
        .send_inboxes
        .iter()
        .filter_map(std::option::Option::as_ref)
        .filter(|&u| (u.domain() == Some(&self.instance.domain)))
        .map(|u| u.inner().clone()),
    );
    Ok(inbox_urls)
  }
2024-04-13 21:18:28 +00:00
  /// Refresh the cached followed-communities map: an incremental fetch on a short interval to
  /// pick up new follows, and a full refetch on a long interval to pick up unfollows.
  async fn update_communities(&mut self) -> Result<()> {
    if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
      tracing::debug!(
        "{}: fetching full list of communities",
        self.instance.domain
      );
      // process removals every hour
      (self.followed_communities, self.last_full_communities_fetch) = self
        .get_communities(self.instance.id, Utc.timestamp_nanos(0))
        .await?;
      self.last_incremental_communities_fetch = self.last_full_communities_fetch;
    }
    if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
      // process additions every minute
      let (news, time) = self
        .get_communities(self.instance.id, self.last_incremental_communities_fetch)
        .await?;
      if !news.is_empty() {
        tracing::debug!(
          "{}: fetched {} incremental new followed communities",
          self.instance.domain,
          news.len()
        );
      }
      self.followed_communities.extend(news);
      self.last_incremental_communities_fetch = time;
    }
    Ok(())
  }
/// get a list of local communities with the remote inboxes on the given instance that cares about them
async fn get_communities (
& mut self ,
instance_id : InstanceId ,
last_fetch : DateTime < Utc > ,
) -> Result < ( HashMap < CommunityId , HashSet < Url > > , DateTime < Utc > ) > {
2024-03-18 09:36:49 +00:00
let new_last_fetch =
Utc ::now ( ) - chrono ::TimeDelta ::try_seconds ( 10 ) . expect ( " TimeDelta out of bounds " ) ; // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
2023-09-09 16:25:03 +00:00
Ok ( (
2024-04-13 21:18:28 +00:00
CommunityFollowerView ::get_instance_followed_community_inboxes (
& mut self . pool ( ) ,
instance_id ,
last_fetch ,
)
. await ?
. into_iter ( )
. fold ( HashMap ::new ( ) , | mut map , ( c , u ) | {
map . entry ( c ) . or_default ( ) . insert ( u . into ( ) ) ;
map
} ) ,
2023-09-09 16:25:03 +00:00
new_last_fetch ,
) )
}
2024-04-13 21:18:28 +00:00
  /// Persist the current queue state to the db and forward a copy to the stats channel.
  async fn save_and_send_state(&mut self) -> Result<()> {
    tracing::debug!("{}: saving and sending state", self.instance.domain);
    self.last_state_insert = Utc::now();
    FederationQueueState::upsert(&mut self.pool(), &self.state).await?;
    self
      .stats_sender
      .send((self.instance.domain.clone(), self.state.clone()))?;
    Ok(())
  }
2024-04-13 21:18:28 +00:00
fn pool ( & self ) -> DbPool < '_ > {
//self.config.to_request_data()
DbPool ::Pool ( & self . pool )
}
2023-09-09 16:25:03 +00:00
}