diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs index edf530fb3..c282e482a 100644 --- a/crates/federate/src/worker.rs +++ b/crates/federate/src/worker.rs @@ -75,7 +75,7 @@ impl InstanceWorker { /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) async fn loop_until_stopped(&mut self) -> Result<()> { self.initial_fail_sleep().await?; - let mut latest_id = self.get_latest_id().await?; + let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; // activities that have been successfully sent but // that are not the lowest number and thus can't be written to the database yet @@ -106,19 +106,30 @@ impl InstanceWorker { } else { // send a new activity if there is one self.inbox_collector.update_communities().await?; - let next_id = { - // calculate next id to send based on the last id and the in flight requests + let next_id_to_send = ActivityId(last_sent_id.0 + 1); + { + // sanity check: calculate next id to send based on the last id and the in flight requests let last_successful_id = self .state .last_successful_id .map(|e| e.0) .expect("set above"); - ActivityId(last_successful_id + (successfuls.len() as i64) + in_flight + 1) - }; - if next_id > latest_id { + let expected_next_id = last_successful_id + (successfuls.len() as i64) + in_flight + 1; + // compare to next id based on incrementing + if expected_next_id != next_id_to_send.0 { + anyhow::bail!( + "{}: next id to send is not as expected: {:?} != {:?}", + self.instance.domain, + expected_next_id, + next_id_to_send + ) + } + } + + if next_id_to_send > newest_id { // lazily fetch latest id only if we have cought up - latest_id = self.get_latest_id().await?; - if next_id > latest_id { + newest_id = self.get_latest_ids().await?.1; + if next_id_to_send > newest_id { // no more work to be done, wait before rechecking tokio::select! { () = sleep(*WORK_FINISHED_RECHECK_DELAY) => {}, @@ -128,8 +139,9 @@ impl InstanceWorker { } } in_flight += 1; + last_sent_id = next_id_to_send; self - .spawn_send_if_needed(next_id, report_send_result.clone()) + .spawn_send_if_needed(next_id_to_send, report_send_result.clone()) .await?; } } @@ -164,17 +176,20 @@ impl InstanceWorker { Ok(()) } - /// get newest activity id and set it as last_successful_id if it's the first time this instance is seen - async fn get_latest_id(&mut self) -> Result { + /// return the last successfully sent id and the newest activity id in the database + /// sets last_successful_id in database if it's the first time this instance is seen + async fn get_latest_ids(&mut self) -> Result<(ActivityId, ActivityId)> { let latest_id = get_latest_activity_id(&mut self.pool()).await?; - if self.state.last_successful_id.is_none() { + if let Some(last) = self.state.last_successful_id { + Ok((last, latest_id)) + } else { // this is the initial creation (instance first seen) of the federation queue for this instance // skip all past activities: self.state.last_successful_id = Some(latest_id); // save here to ensure it's not read as 0 again later if no activities have happened self.save_and_send_state().await?; + Ok((latest_id, latest_id)) } - Ok(latest_id) } async fn handle_send_results( @@ -234,11 +249,12 @@ impl InstanceWorker { force_write: bool, ) -> Result<()> { let Some(mut last_id) = self.state.last_successful_id else { - tracing::warn!("should be impossible: last successful id is None"); + tracing::warn!("{} should be impossible: last successful id is None", self.instance.domain); return Ok(()); }; tracing::debug!( - "last: {:?}, next: {:?}, currently in successfuls: {:?}", + "{} last: {:?}, next: {:?}, currently in successfuls: {:?}", + self.instance.domain, last_id, successfuls.peek(), successfuls.iter()