// buzzrelay/src/relay.rs (175 lines, 5.2 KiB, Rust)

use std::{sync::Arc, collections::{HashSet, HashMap}, time::Instant};
use futures::{channel::mpsc::{channel, Sender}, StreamExt};
// 2022-12-20 02:59:32 +00:00
use metrics::{increment_counter, histogram};
// 2022-12-19 20:20:13 +00:00
use serde::Deserialize;
// 2022-12-18 20:31:50 +00:00
use serde_json::json;
use sigh::PrivateKey;
use tokio::{
sync::mpsc::Receiver,
};
// 2022-12-19 20:20:13 +00:00
use crate::{db::Database, send, actor};
#[derive(Deserialize)]
struct Post<'a> {
2022-12-19 22:01:53 +00:00
pub url: Option<&'a str>,
2022-12-19 20:20:13 +00:00
pub uri: &'a str,
pub tags: Option<Vec<Tag<'a>>>,
}
impl Post<'_> {
pub fn host(&self) -> Option<String> {
2022-12-19 22:01:53 +00:00
reqwest::Url::parse(self.url?)
2022-12-19 20:20:13 +00:00
.ok()
.and_then(|url| url.domain()
2022-12-19 23:15:00 +00:00
.map(str::to_lowercase)
2022-12-19 20:20:13 +00:00
)
}
pub fn tags(&self) -> Vec<String> {
match &self.tags {
None =>
vec![],
Some(tags) =>
tags.iter()
.map(|tag| tag.name.to_string())
2022-12-19 20:20:13 +00:00
.collect()
}
}
fn relay_target_kinds(&self) -> impl Iterator<Item = actor::ActorKind> {
self.host()
.into_iter()
2022-12-19 23:15:00 +00:00
.map(actor::ActorKind::InstanceRelay)
2022-12-19 20:20:13 +00:00
.chain(
self.tags()
.into_iter()
.map(|ref s| actor::ActorKind::from_tag(s))
2022-12-19 20:20:13 +00:00
)
}
pub fn relay_targets(&self, hostname: Arc<String>) -> impl Iterator<Item = actor::Actor> {
self.relay_target_kinds()
.map(move |kind| actor::Actor {
host: hostname.clone(),
kind,
})
}
}
/// A single tag entry of a [`Post`]; only its name is read.
#[derive(Deserialize)]
struct Tag<'a> {
    /// Tag name, borrowed from the JSON text the post was parsed from.
    pub name: &'a str,
}
// 2022-12-18 20:31:50 +00:00
/// One delivery handed to a per-inbox worker.
struct Job {
    /// Serialized activity to send; shared between all jobs created for the
    /// same actor.
    body: Arc<Vec<u8>>,
    /// Key id of the relaying actor, passed to `send::send_raw`.
    key_id: String,
    /// Private key passed to `send::send_raw` together with `key_id`.
    private_key: Arc<PrivateKey>,
}
fn spawn_worker(client: Arc<reqwest::Client>, inbox: String) -> Sender<Job> {
let (tx, mut rx) = channel(1024);
tokio::spawn(async move {
while let Some(Job { key_id, private_key, body }) = rx.next().await {
tracing::debug!("relay to {}", inbox);
if let Err(e) = send::send_raw(
&client, &inbox,
&key_id, &private_key, body
).await {
tracing::error!("relay::send {:?}", e);
} else {
// success
systemd::daemon::notify(
false, [
(systemd::daemon::STATE_WATCHDOG, "1")
].iter()
).unwrap();
}
}
panic!("Worker dead");
});
tx
}
// 2022-12-18 20:31:50 +00:00
pub fn spawn(
client: Arc<reqwest::Client>,
2022-12-19 20:20:13 +00:00
hostname: Arc<String>,
database: Database,
2022-12-18 20:31:50 +00:00
private_key: PrivateKey,
2022-12-19 20:20:13 +00:00
mut stream_rx: Receiver<String>
2022-12-18 20:31:50 +00:00
) {
2022-12-19 23:04:56 +00:00
let private_key = Arc::new(private_key);
2022-12-18 20:31:50 +00:00
tokio::spawn(async move {
let mut workers = HashMap::new();
2022-12-19 20:20:13 +00:00
while let Some(data) = stream_rx.recv().await {
2022-12-20 02:59:32 +00:00
let t1 = Instant::now();
2022-12-19 20:20:13 +00:00
let post: Post = match serde_json::from_str(&data) {
Ok(post) => post,
Err(e) => {
tracing::error!("parse error: {}", e);
tracing::trace!("data: {}", data);
continue;
}
2022-12-18 20:31:50 +00:00
};
2022-12-19 22:01:53 +00:00
let post_url = match post.url {
Some(url) => url,
// skip reposts
2022-12-20 02:13:44 +00:00
None => {
2022-12-20 03:10:45 +00:00
increment_counter!("relay_posts_total", "action" => "skip");
2022-12-20 02:13:44 +00:00
continue;
}
2022-12-19 22:01:53 +00:00
};
let mut seen_actors = HashSet::new();
let mut seen_inboxes = HashSet::new();
2022-12-19 20:20:13 +00:00
for actor in post.relay_targets(hostname.clone()) {
2022-12-19 22:01:53 +00:00
if seen_actors.contains(&actor) {
2022-12-19 20:20:13 +00:00
continue;
}
let actor_id = actor.uri();
let body = json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"actor": &actor_id,
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": &post.uri,
2022-12-19 22:01:53 +00:00
"id": &post_url,
2022-12-19 20:20:13 +00:00
});
let body = Arc::new(
serde_json::to_vec(&body)
.unwrap()
);
for inbox in database.get_following_inboxes(&actor_id).await.unwrap() {
2022-12-19 22:01:53 +00:00
if seen_inboxes.contains(&inbox) {
continue;
}
2022-12-19 23:04:56 +00:00
seen_inboxes.insert(inbox.clone());
let job = Job {
body: body.clone(),
key_id: actor.key_id(),
private_key: private_key.clone(),
};
let tx = workers.entry(inbox.clone())
.or_insert_with(|| spawn_worker(client.clone(), inbox.clone()));
let _ = tx.try_send(job);
2022-12-19 20:20:13 +00:00
}
2022-12-19 22:01:53 +00:00
seen_actors.insert(actor);
2022-12-19 20:20:13 +00:00
}
2022-12-20 02:13:44 +00:00
if seen_inboxes.is_empty() {
2022-12-20 03:10:45 +00:00
increment_counter!("relay_posts_total", "action" => "no_relay");
2022-12-20 02:13:44 +00:00
} else {
2022-12-20 03:10:45 +00:00
increment_counter!("relay_posts_total", "action" => "relay");
2022-12-20 02:13:44 +00:00
}
2022-12-20 02:59:32 +00:00
let t2 = Instant::now();
2022-12-20 03:10:45 +00:00
histogram!("relay_post_duration", t2 - t1);
2022-12-18 20:31:50 +00:00
}
});
}