diff --git a/src/relay.rs b/src/relay.rs index 351b209..1fe374c 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,4 +1,5 @@ -use std::{sync::Arc, collections::HashSet, time::Instant}; +use std::{sync::Arc, collections::{HashSet, HashMap}, time::Instant}; +use futures::{channel::mpsc::{channel, Sender}, StreamExt}; use metrics::{increment_counter, histogram}; use serde::Deserialize; use serde_json::json; @@ -60,6 +61,39 @@ struct Tag<'a> { pub name: &'a str, } +struct Job { + body: Arc>, + key_id: String, + private_key: Arc, +} + +fn spawn_worker(client: Arc, inbox: String) -> Sender { + let (tx, mut rx) = channel(1024); + + tokio::spawn(async move { + while let Some(Job { key_id, private_key, body }) = rx.next().await { + tracing::debug!("relay to {}", inbox); + if let Err(e) = send::send_raw( + &client, &inbox, + &key_id, &private_key, body + ).await { + tracing::error!("relay::send {:?}", e); + } else { + // success + systemd::daemon::notify( + false, [ + (systemd::daemon::STATE_WATCHDOG, "1") + ].iter() + ).unwrap(); + } + } + + panic!("Worker dead"); + }); + + tx +} + pub fn spawn( client: Arc, hostname: Arc, @@ -70,6 +104,8 @@ pub fn spawn( let private_key = Arc::new(private_key); tokio::spawn(async move { + let mut workers = HashMap::new(); + while let Some(data) = stream_rx.recv().await { let t1 = Instant::now(); let post: Post = match serde_json::from_str(&data) { @@ -113,26 +149,15 @@ pub fn spawn( continue; } seen_inboxes.insert(inbox.clone()); - let client_ = client.clone(); - let body_ = body.clone(); - let key_id = actor.key_id(); - let private_key_ = private_key.clone(); - tracing::debug!("relay {} from {} to {}", &post_url, actor_id, inbox); - tokio::spawn(async move { - if let Err(e) = send::send_raw( - &client_, &inbox, - &key_id, &private_key_, body_ - ).await { - tracing::error!("relay::send {:?}", e); - } else { - // success - systemd::daemon::notify( - false, [ - (systemd::daemon::STATE_WATCHDOG, "1") - ].iter() - ).unwrap(); - } - }); + + let job = Job { + body: body.clone(), + key_id: actor.key_id(), + private_key: private_key.clone(), + }; + let tx = workers.entry(inbox.clone()) + .or_insert_with(|| spawn_worker(client.clone(), inbox.clone())); + let _ = tx.try_send(job); } seen_actors.insert(actor);