relay: add backoff on error

This commit is contained in:
Astro 2023-05-15 01:14:42 +02:00
parent 7b93419a91
commit fc659b57a1

View file

@ -1,4 +1,4 @@
use std::{sync::Arc, collections::{HashSet, HashMap}, time::Instant}; use std::{sync::Arc, collections::{HashSet, HashMap}, time::{Duration, Instant}};
use futures::{channel::mpsc::{channel, Sender}, StreamExt}; use futures::{channel::mpsc::{channel, Sender}, StreamExt};
use metrics::{increment_counter, histogram}; use metrics::{increment_counter, histogram};
use serde::Deserialize; use serde::Deserialize;
@ -73,15 +73,30 @@ fn spawn_worker(client: Arc<reqwest::Client>, inbox: String) -> Sender<Job> {
let (tx, mut rx) = channel(1024); let (tx, mut rx) = channel(1024);
tokio::spawn(async move { tokio::spawn(async move {
let mut errors = 0u32;
let mut last_request = None;
while let Some(Job { post_url, actor_id, key_id, private_key, body }) = rx.next().await { while let Some(Job { post_url, actor_id, key_id, private_key, body }) = rx.next().await {
if errors > 0 && last_request.map_or(false, |last_request|
Instant::now() - last_request < Duration::from_secs(10) * errors
) {
// there have been errors, skip for time proportional
// to the number of subsequent errors
tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox);
continue;
}
tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox); tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox);
last_request = Some(Instant::now());
if let Err(e) = send::send_raw( if let Err(e) = send::send_raw(
&client, &inbox, &client, &inbox,
&key_id, &private_key, body &key_id, &private_key, body
).await { ).await {
tracing::error!("relay::send {:?}", e); tracing::error!("relay::send {:?}", e);
errors = errors.saturating_add(1);
} else { } else {
// success // success
errors = 0;
systemd::daemon::notify( systemd::daemon::notify(
false, [ false, [
(systemd::daemon::STATE_WATCHDOG, "1") (systemd::daemon::STATE_WATCHDOG, "1")