relay: concurrentize

This commit is contained in:
Astro 2022-12-20 00:04:56 +01:00
parent 970f8e8d1b
commit 7399f9c631
2 changed files with 16 additions and 11 deletions

View file

@ -67,6 +67,8 @@ pub fn spawn(
private_key: PrivateKey, private_key: PrivateKey,
mut stream_rx: Receiver<String> mut stream_rx: Receiver<String>
) { ) {
let private_key = Arc::new(private_key);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(data) = stream_rx.recv().await { while let Some(data) = stream_rx.recv().await {
// dbg!(&data); // dbg!(&data);
@ -84,7 +86,6 @@ pub fn spawn(
// skip reposts // skip reposts
None => continue, None => continue,
}; };
// TODO: queue by target?
let mut seen_actors = HashSet::new(); let mut seen_actors = HashSet::new();
let mut seen_inboxes = HashSet::new(); let mut seen_inboxes = HashSet::new();
for actor in post.relay_targets(hostname.clone()) { for actor in post.relay_targets(hostname.clone()) {
@ -109,16 +110,20 @@ pub fn spawn(
if seen_inboxes.contains(&inbox) { if seen_inboxes.contains(&inbox) {
continue; continue;
} }
seen_inboxes.insert(inbox.clone());
let client_ = client.clone();
let body_ = body.clone();
let key_id = actor.key_id();
let private_key_ = private_key.clone();
tracing::debug!("relay {} to {}", actor_id, inbox); tracing::debug!("relay {} to {}", actor_id, inbox);
tokio::spawn(async move {
if let Err(e) = send::send_raw( if let Err(e) = send::send_raw(
&client, &inbox, &client_, &inbox,
&actor.key_id(), &private_key, body.clone() &key_id, &private_key_, body_
).await { ).await {
tracing::error!("relay::send {:?}", e); tracing::error!("relay::send {:?}", e);
} }
});
seen_inboxes.insert(inbox);
} }
seen_actors.insert(actor); seen_actors.insert(actor);

View file

@ -71,7 +71,7 @@ pub async fn send_raw(
.header("digest", digest_header) .header("digest", digest_header)
.body(body.as_ref().clone()) .body(body.as_ref().clone())
.map_err(SendError::HttpReq)?; .map_err(SendError::HttpReq)?;
SigningConfig::new(RsaSha256, private_key, key_id) SigningConfig::new(RsaSha256, &private_key, key_id)
.sign(&mut req)?; .sign(&mut req)?;
let req: reqwest::Request = req.try_into()?; let req: reqwest::Request = req.try_into()?;
let res = client.execute(req) let res = client.execute(req)