From 4d74af642e52539be2b710c9b672982e1cb1a0e7 Mon Sep 17 00:00:00 2001 From: Astro Date: Fri, 16 Jun 2023 21:25:24 +0200 Subject: [PATCH] relay: change queue from by actor to by host --- src/relay.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/relay.rs b/src/relay.rs index 6d574cd..1ac90dc 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -89,29 +89,30 @@ struct Job { body: Arc>, key_id: String, private_key: Arc, + inbox_url: reqwest::Url, } -fn spawn_worker(client: Arc, inbox: String) -> Sender { +fn spawn_worker(client: Arc) -> Sender { let (tx, mut rx) = channel(1024); tokio::spawn(async move { let mut errors = 0u32; let mut last_request = None; - while let Some(Job { post_url, actor_id, key_id, private_key, body }) = rx.next().await { + while let Some(Job { post_url, actor_id, key_id, private_key, body, inbox_url }) = rx.next().await { if errors > 0 && last_request.map_or(false, |last_request| Instant::now() - last_request < Duration::from_secs(10) * errors ) { // there have been errors, skip for time proportional // to the number of subsequent errors - tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox); + tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox_url); continue; } - tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox); + tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox_url); last_request = Some(Instant::now()); if let Err(e) = send::send_raw( - &client, &inbox, + &client, inbox_url.as_str(), &key_id, &private_key, body ).await { tracing::error!("relay::send {:?}", e); @@ -179,25 +180,34 @@ pub fn spawn( "object": &post.uri, "id": *post_url, }); + let Ok(post_url_url) = reqwest::Url::parse(&post_url) else { continue; }; let body = Arc::new( serde_json::to_vec(&body) .unwrap() ); for inbox in database.get_following_inboxes(&actor_id).await.unwrap() { + let Ok(inbox_url) = reqwest::Url::parse(&inbox) else { continue; }; + + // Avoid duplicate processing. if seen_inboxes.contains(&inbox) { continue; } - seen_inboxes.insert(inbox.clone()); + seen_inboxes.insert(inbox); + + // Lookup/create worker queue per inbox. + let tx = workers.entry(inbox_url.host_str().unwrap_or("").to_string()) + .or_insert_with(|| spawn_worker(client.clone())); + // Create queue item. let job = Job { post_url: post_url.clone(), actor_id: actor_id.clone(), body: body.clone(), key_id: actor.key_id(), private_key: private_key.clone(), + inbox_url, }; - let tx = workers.entry(inbox.clone()) - .or_insert_with(|| spawn_worker(client.clone(), inbox.clone())); + // Enqueue job for worker. let _ = tx.try_send(job); }