From 97632fdbfe439a25d311d7c8b07b895fcb846c04 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 13:03:41 +0900 Subject: [PATCH] Broadcast asynchronously --- plume-common/src/activity_pub/mod.rs | 44 +++++++++++++++++++--------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 3715aadd..d83bc735 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,6 +1,7 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, ClientBuilder, Url}; +use futures::future::join_all; +use reqwest::{header::HeaderValue, ClientBuilder, RequestBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, @@ -145,6 +146,29 @@ where .build() .expect("Error while initializing tokio runtime for federation"); rt.block_on(async { + let capacity = 50; + let (tx, rx) = flume::bounded::(capacity); + let mut handles = Vec::with_capacity(capacity); + for _ in 0..capacity { + let rx = rx.clone(); + let handle = rt.spawn(async move { + while let Ok(request_builder) = rx.recv_async().await { + let _ = request_builder + .send() + .await + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({:?})", &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); + handles.push(handle); + } for inbox in boxes { let body = signed.to_string(); let mut headers = request::headers(); @@ -165,7 +189,7 @@ where } headers.insert("Host", host_header_value.unwrap()); headers.insert("Digest", request::Digest::digest(&body)); - let _ = client + let request_builder = client .post(&inbox) .headers(headers.clone()) .header( @@ -173,19 +197,11 @@ where request::signature(sender, &headers, ("post", url.path(), url.query())) .expect("activity_pub::broadcast: request signature error"), ) - .body(body) - .send() - .await - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + .body(body); + tx.send_async(request_builder).await.unwrap(); } + drop(tx); + join_all(handles).await; }); }