Use BinaryHeap for more efficient retry selection

This commit is contained in:
cetra3 2023-07-20 18:52:41 +09:30
parent ea2f6b4f69
commit b2e45f8287
4 changed files with 124 additions and 71 deletions

View file

@ -2,7 +2,7 @@
//! //!
#![doc = include_str!("../../docs/09_sending_activities.md")] #![doc = include_str!("../../docs/09_sending_activities.md")]
use self::{request::sign_and_send, retry_queue::RetryQueue}; use self::{queue::ActivityQueue, request::sign_and_send};
use crate::{ use crate::{
config::Data, config::Data,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
@ -22,16 +22,15 @@ use std::{
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use url::Url; use url::Url;
pub(crate) mod queue;
pub(crate) mod request; pub(crate) mod request;
pub(crate) mod retry_queue;
pub(super) mod retry_worker; pub(super) mod retry_worker;
pub(super) mod util; pub(super) mod util;
/// Send a new activity to the given inboxes /// Send a new activity to the given inboxes
/// ///
/// - `activity`: The activity to be sent, gets converted to json /// - `activity`: The activity to be sent, gets converted to json
/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP /// - `actor`: The actor doing the sending
/// signature. Generated with [crate::http_signatures::generate_actor_keypair].
/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
/// for each target actor. /// for each target actor.
@ -96,6 +95,17 @@ pub struct RawActivity {
private_key: PKey<Private>, private_key: PKey<Private>,
} }
impl PartialEq for RawActivity {
fn eq(&self, other: &Self) -> bool {
self.actor_id == other.actor_id
&& self.activity_id == other.activity_id
&& self.activity == other.activity
&& self.inbox == other.inbox
}
}
impl Eq for RawActivity {}
impl RawActivity { impl RawActivity {
/// Sends a raw activity directly, rather than using the background queue. /// Sends a raw activity directly, rather than using the background queue.
/// This will sign and send the request using the configured [`client`](crate::config::FederationConfigBuilder::client) in the federation config /// This will sign and send the request using the configured [`client`](crate::config::FederationConfigBuilder::client) in the federation config
@ -187,8 +197,8 @@ pub(crate) fn create_activity_queue(
disable_retry: bool, disable_retry: bool,
request_timeout: Duration, request_timeout: Duration,
http_signature_compat: bool, http_signature_compat: bool,
) -> RetryQueue { ) -> ActivityQueue {
RetryQueue::new( ActivityQueue::new(
client, client,
worker_count, worker_count,
retry_count, retry_count,
@ -264,7 +274,7 @@ mod tests {
.init(); .init();
*/ */
let activity_queue = RetryQueue::new( let activity_queue = ActivityQueue::new(
reqwest::Client::default().into(), reqwest::Client::default().into(),
num_workers, num_workers,
num_workers, num_workers,

View file

@ -15,7 +15,7 @@ use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
/// A simple activity queue which spawns tokio workers to send out requests /// A simple activity queue which spawns tokio workers to send out requests
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
pub(crate) struct RetryQueue { pub(crate) struct ActivityQueue {
// Stats shared between the queue and workers // Stats shared between the queue and workers
stats: Arc<Stats>, stats: Arc<Stats>,
sender: UnboundedSender<RetryRawActivity>, sender: UnboundedSender<RetryRawActivity>,
@ -48,7 +48,7 @@ impl Debug for Stats {
} }
} }
impl RetryQueue { impl ActivityQueue {
pub fn new( pub fn new(
client: ClientWithMiddleware, client: ClientWithMiddleware,
worker_count: usize, worker_count: usize,

View file

@ -1,23 +1,18 @@
use super::{request::sign_and_send, retry_queue::Stats, util::RetryStrategy, RawActivity}; use super::{queue::Stats, request::sign_and_send, util::RetryStrategy, RawActivity};
use futures_core::Future; use futures_core::Future;
use futures_util::FutureExt; use futures_util::FutureExt;
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use std::{ use std::{
collections::{BTreeMap, BinaryHeap},
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::{ use tokio::{
sync::mpsc::{ sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
error::TryRecvError,
unbounded_channel,
UnboundedReceiver,
UnboundedSender,
WeakUnboundedSender,
},
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet},
time::MissedTickBehavior, time::MissedTickBehavior,
}; };
use tracing::error; use tracing::{error, info};
/// A tokio spawned worker which is responsible for submitting requests to federated servers /// A tokio spawned worker which is responsible for submitting requests to federated servers
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue. /// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
@ -37,7 +32,7 @@ pub(super) struct RetryWorker {
} }
/// A message that has tried to be sent but has not been able to be sent /// A message that has tried to be sent but has not been able to be sent
#[derive(Debug)] #[derive(Debug, PartialEq, Eq)]
pub(super) struct RetryRawActivity { pub(super) struct RetryRawActivity {
/// The message that is sent /// The message that is sent
pub message: RawActivity, pub message: RawActivity,
@ -47,6 +42,20 @@ pub(super) struct RetryRawActivity {
pub count: usize, pub count: usize,
} }
// We reverse the order here as we want the "highest" to be the earliest, not latest
// So that we can retry the oldest sent first
impl Ord for RetryRawActivity {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.last_sent.cmp(&other.last_sent).reverse()
}
}
impl PartialOrd for RetryRawActivity {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl RetryWorker { impl RetryWorker {
/// Spawns a background task for managing the queue of retryables /// Spawns a background task for managing the queue of retryables
pub fn spawn( pub fn spawn(
@ -60,7 +69,7 @@ impl RetryWorker {
) -> (UnboundedSender<RetryRawActivity>, JoinHandle<()>) { ) -> (UnboundedSender<RetryRawActivity>, JoinHandle<()>) {
// The main sender channel, gets called immediately when something is queued // The main sender channel, gets called immediately when something is queued
let (sender, receiver) = unbounded_channel::<RetryRawActivity>(); let (sender, receiver) = unbounded_channel::<RetryRawActivity>();
// The batch sender channel, waits up to an hour before checking if anything needs to be sent // The batch sender channel, checks every hour if anything needs to be sent
let (batch_sender, batch_receiver) = unbounded_channel::<RetryRawActivity>(); let (batch_sender, batch_receiver) = unbounded_channel::<RetryRawActivity>();
// The retry sender channel, is called by the batch // The retry sender channel, is called by the batch
let (retry_sender, retry_receiver) = unbounded_channel::<RetryRawActivity>(); let (retry_sender, retry_receiver) = unbounded_channel::<RetryRawActivity>();
@ -69,13 +78,11 @@ impl RetryWorker {
client, client,
timeout, timeout,
stats, stats,
batch_sender: batch_sender.clone().downgrade(), batch_sender: batch_sender.downgrade(),
backoff, backoff,
http_signature_compat, http_signature_compat,
}); });
let loop_batch_sender = batch_sender.clone().downgrade();
let retry_task = tokio::spawn(async move { let retry_task = tokio::spawn(async move {
// This is the main worker queue, tasks sent here are sent immediately // This is the main worker queue, tasks sent here are sent immediately
let main_worker = worker.clone(); let main_worker = worker.clone();
@ -90,12 +97,7 @@ impl RetryWorker {
if let Some(retry_count) = retry_count { if let Some(retry_count) = retry_count {
// This task checks every hour anything that needs to be sent, based upon the last sent time // This task checks every hour anything that needs to be sent, based upon the last sent time
// If any tasks need to be sent, they are then sent to the retry queue // If any tasks need to be sent, they are then sent to the retry queue
let batch_loop = retry_loop( let batch_loop = retry_loop(backoff.pow(2), batch_receiver, retry_sender);
backoff.pow(2),
batch_receiver,
loop_batch_sender,
retry_sender,
);
let retry_queue = receiver_queue(retry_count, retry_receiver, move |message| { let retry_queue = receiver_queue(retry_count, retry_receiver, move |message| {
let worker = worker.clone(); let worker = worker.clone();
@ -179,60 +181,101 @@ impl RetryWorker {
} }
} }
/// Ordered list of raw activities based upon retry count
///
/// Uses separate binary heaps per count to keep things in order
///
/// When flushed it will go through each queue and check to see if there are any retries ready to be sent
///
/// If enought time has elapsed it'll send them with the sender, otherwise they'll stay in the queue
struct RetryQueue {
/// Queue per retry count for ordering
queues: BTreeMap<usize, BinaryHeap<RetryRawActivity>>,
sender: UnboundedSender<RetryRawActivity>,
sleep_interval: usize,
}
impl RetryQueue {
/// Push a raw activity onto the queue
fn push(&mut self, retry: RetryRawActivity) {
let queue = self.queues.entry(retry.count).or_default();
queue.push(retry);
}
/// Flush out & send any retries that need to be retried
fn flush(&mut self) {
let mut count = 0;
let mut total = 0;
// We check each queue separately
for (retry_count, queue) in self.queues.iter_mut() {
// We check the duration based on the retry count using an exponential backoff, i.e, 60s, 60m, 60h
let sleep_duration =
Duration::from_secs(self.sleep_interval.pow(*retry_count as u32) as u64);
total += queue.len();
'queue: loop {
match queue.pop() {
Some(retry) => {
// If the elapsed time is long enough we send it
if retry.last_sent.elapsed() > sleep_duration {
if let Err(err) = self.sender.send(retry) {
error!("Error sending retry: {err}");
}
count += 1;
// If it's too young, then we exit the loop
// No more entries after this will be old enough in the binary heap
} else {
queue.push(retry);
break 'queue;
}
}
None => break 'queue,
}
}
}
if total > 0 {
info!("Scheduled {count}/{total} activities for retry");
}
}
}
/// This is a retry loop that will simply send tasks in batches /// This is a retry loop that will simply send tasks in batches
/// It will check an incoming queue, and schedule any tasks that need to be sent /// It will check an incoming queue, and schedule any tasks that need to be sent
/// The current sleep interval here is 1 hour /// The current sleep interval here is 1 hour
async fn retry_loop( async fn retry_loop(
sleep_interval: usize, sleep_interval: usize,
mut batch_receiver: UnboundedReceiver<RetryRawActivity>, mut batch_receiver: UnboundedReceiver<RetryRawActivity>,
batch_sender: WeakUnboundedSender<RetryRawActivity>,
retry_sender: UnboundedSender<RetryRawActivity>, retry_sender: UnboundedSender<RetryRawActivity>,
) { ) {
let mut interval = tokio::time::interval(Duration::from_secs((sleep_interval) as u64)); let mut interval = tokio::time::interval(Duration::from_secs((sleep_interval) as u64));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay); interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut inner = RetryQueue {
queues: Default::default(),
sender: retry_sender,
sleep_interval,
};
loop { loop {
interval.tick().await; tokio::select! {
message = batch_receiver.recv() => {
// We requeue any messages to be checked next time if they haven't slept long enough yet match message {
let mut requeue_messages = Vec::new(); // We have a new message, add it to our queue
Some(retry) => {
// Grab all the activities that are in the queue inner.push(retry);
loop { },
// try_recv will not await anything // The receiver has dropped, so flush out everything and then exit the loop
match batch_receiver.try_recv() { None => {
Ok(message) => { inner.flush();
let sleep_duration = Duration::from_secs( break;
sleep_interval.pow(message.count as u32) as u64,
// Take off 1 second for tests to pass
) - Duration::from_secs(1);
// If the time between now and sending this message is greater than our sleep duration
if message.last_sent.elapsed() > sleep_duration {
if let Err(err) = retry_sender.send(message) {
error!("Couldn't wake up task for sending: {err}");
}
} else {
// If we haven't slept long enough, then we just add it to the end of the queue
requeue_messages.push(message);
} }
} }
Err(TryRecvError::Empty) => {
// no more to be had, break and wait for the next interval
break;
}
Err(TryRecvError::Disconnected) => {
return;
}
} }
} _ = interval.tick() => {
inner.flush();
// If there are any messages that need to be retried later on
if let Some(ref sender) = batch_sender.upgrade() {
for message in requeue_messages {
if let Err(err) = sender.send(message) {
error!("Couldn't wake up task for sending: {err}");
}
} }
} }
} }

View file

@ -16,7 +16,7 @@
//! ``` //! ```
use crate::{ use crate::{
activity_queue::{create_activity_queue, retry_queue::RetryQueue}, activity_queue::{create_activity_queue, queue::ActivityQueue},
error::Error, error::Error,
protocol::verification::verify_domains_match, protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
@ -98,7 +98,7 @@ pub struct FederationConfig<T: Clone> {
/// Queue for sending outgoing activities. Only optional to make builder work, its always /// Queue for sending outgoing activities. Only optional to make builder work, its always
/// present once constructed. /// present once constructed.
#[builder(setter(skip))] #[builder(setter(skip))]
pub(crate) activity_queue: Option<Arc<RetryQueue>>, pub(crate) activity_queue: Option<Arc<ActivityQueue>>,
} }
impl<T: Clone> FederationConfig<T> { impl<T: Clone> FederationConfig<T> {
@ -199,7 +199,7 @@ impl<T: Clone> FederationConfig<T> {
.take() .take()
.context("ActivityQueue never constructed, build() not called?")?; .context("ActivityQueue never constructed, build() not called?")?;
// Todo: use Arc::into_inner but is only part of rust 1.70. // Todo: use Arc::into_inner but is only part of rust 1.70.
let stats = Arc::<RetryQueue>::try_unwrap(q) let stats = Arc::<ActivityQueue>::try_unwrap(q)
.map_err(|_| { .map_err(|_| {
anyhow::anyhow!( anyhow::anyhow!(
"Could not cleanly shut down: activityqueue arc was still in use elsewhere " "Could not cleanly shut down: activityqueue arc was still in use elsewhere "