relay: change queue from by actor to by host

This commit is contained in:
Astro 2023-06-16 21:25:24 +02:00
parent 585f26eab5
commit 4d74af642e

View file

@ -89,29 +89,30 @@ struct Job {
body: Arc<Vec<u8>>, body: Arc<Vec<u8>>,
key_id: String, key_id: String,
private_key: Arc<PrivateKey>, private_key: Arc<PrivateKey>,
inbox_url: reqwest::Url,
} }
fn spawn_worker(client: Arc<reqwest::Client>, inbox: String) -> Sender<Job> { fn spawn_worker(client: Arc<reqwest::Client>) -> Sender<Job> {
let (tx, mut rx) = channel(1024); let (tx, mut rx) = channel(1024);
tokio::spawn(async move { tokio::spawn(async move {
let mut errors = 0u32; let mut errors = 0u32;
let mut last_request = None; let mut last_request = None;
while let Some(Job { post_url, actor_id, key_id, private_key, body }) = rx.next().await { while let Some(Job { post_url, actor_id, key_id, private_key, body, inbox_url }) = rx.next().await {
if errors > 0 && last_request.map_or(false, |last_request| if errors > 0 && last_request.map_or(false, |last_request|
Instant::now() - last_request < Duration::from_secs(10) * errors Instant::now() - last_request < Duration::from_secs(10) * errors
) { ) {
// there have been errors, skip for time proportional // there have been errors, skip for time proportional
// to the number of subsequent errors // to the number of subsequent errors
tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox); tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox_url);
continue; continue;
} }
tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox); tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox_url);
last_request = Some(Instant::now()); last_request = Some(Instant::now());
if let Err(e) = send::send_raw( if let Err(e) = send::send_raw(
&client, &inbox, &client, inbox_url.as_str(),
&key_id, &private_key, body &key_id, &private_key, body
).await { ).await {
tracing::error!("relay::send {:?}", e); tracing::error!("relay::send {:?}", e);
@ -179,25 +180,34 @@ pub fn spawn(
"object": &post.uri, "object": &post.uri,
"id": *post_url, "id": *post_url,
}); });
let Ok(post_url_url) = reqwest::Url::parse(&post_url) else { continue; };
let body = Arc::new( let body = Arc::new(
serde_json::to_vec(&body) serde_json::to_vec(&body)
.unwrap() .unwrap()
); );
for inbox in database.get_following_inboxes(&actor_id).await.unwrap() { for inbox in database.get_following_inboxes(&actor_id).await.unwrap() {
let Ok(inbox_url) = reqwest::Url::parse(&inbox) else { continue; };
// Avoid duplicate processing.
if seen_inboxes.contains(&inbox) { if seen_inboxes.contains(&inbox) {
continue; continue;
} }
seen_inboxes.insert(inbox.clone()); seen_inboxes.insert(inbox);
// Lookup/create worker queue per inbox.
let tx = workers.entry(inbox_url.host_str().unwrap_or("").to_string())
.or_insert_with(|| spawn_worker(client.clone()));
// Create queue item.
let job = Job { let job = Job {
post_url: post_url.clone(), post_url: post_url.clone(),
actor_id: actor_id.clone(), actor_id: actor_id.clone(),
body: body.clone(), body: body.clone(),
key_id: actor.key_id(), key_id: actor.key_id(),
private_key: private_key.clone(), private_key: private_key.clone(),
inbox_url,
}; };
let tx = workers.entry(inbox.clone()) // Enqueue job for worker.
.or_insert_with(|| spawn_worker(client.clone(), inbox.clone()));
let _ = tx.try_send(job); let _ = tx.try_send(job);
} }