2023-05-14 23:14:42 +00:00
|
|
|
use std::{sync::Arc, collections::{HashSet, HashMap}, time::{Duration, Instant}};
|
2023-05-14 22:50:59 +00:00
|
|
|
use futures::{channel::mpsc::{channel, Sender}, StreamExt};
|
2022-12-20 02:59:32 +00:00
|
|
|
use metrics::{increment_counter, histogram};
|
2022-12-19 20:20:13 +00:00
|
|
|
use serde::Deserialize;
|
2022-12-18 20:31:50 +00:00
|
|
|
use serde_json::json;
|
|
|
|
use sigh::PrivateKey;
|
|
|
|
use tokio::{
|
|
|
|
sync::mpsc::Receiver,
|
|
|
|
};
|
2022-12-19 20:20:13 +00:00
|
|
|
use crate::{db::Database, send, actor};
|
|
|
|
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct Post<'a> {
|
2022-12-19 22:01:53 +00:00
|
|
|
pub url: Option<&'a str>,
|
2022-12-19 20:20:13 +00:00
|
|
|
pub uri: &'a str,
|
|
|
|
pub tags: Option<Vec<Tag<'a>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Post<'_> {
|
|
|
|
pub fn host(&self) -> Option<String> {
|
2022-12-19 22:01:53 +00:00
|
|
|
reqwest::Url::parse(self.url?)
|
2022-12-19 20:20:13 +00:00
|
|
|
.ok()
|
|
|
|
.and_then(|url| url.domain()
|
2022-12-19 23:15:00 +00:00
|
|
|
.map(str::to_lowercase)
|
2022-12-19 20:20:13 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn tags(&self) -> Vec<String> {
|
|
|
|
match &self.tags {
|
|
|
|
None =>
|
|
|
|
vec![],
|
|
|
|
Some(tags) =>
|
|
|
|
tags.iter()
|
2023-03-03 01:19:48 +00:00
|
|
|
.map(|tag| tag.name.to_string())
|
2022-12-19 20:20:13 +00:00
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn relay_target_kinds(&self) -> impl Iterator<Item = actor::ActorKind> {
|
|
|
|
self.host()
|
|
|
|
.into_iter()
|
2022-12-19 23:15:00 +00:00
|
|
|
.map(actor::ActorKind::InstanceRelay)
|
2022-12-19 20:20:13 +00:00
|
|
|
.chain(
|
|
|
|
self.tags()
|
|
|
|
.into_iter()
|
2023-06-16 19:32:18 +00:00
|
|
|
.flat_map(|ref s| {
|
2023-06-16 18:28:43 +00:00
|
|
|
// Don't handle the empty hashtag `#`
|
|
|
|
if s.is_empty() {
|
|
|
|
return vec![];
|
|
|
|
}
|
|
|
|
|
|
|
|
let actor1 = actor::ActorKind::from_tag(s);
|
|
|
|
|
|
|
|
// Distribute hashtags that end in a date to
|
|
|
|
// followers of the hashtag with the date
|
|
|
|
// stripped. Example: #dd1302 -> #dd
|
2023-06-23 16:49:12 +00:00
|
|
|
let mut first_trailing_digit = 0;
|
|
|
|
let mut scanning_digits = false;
|
|
|
|
for (pos, c) in s.char_indices() {
|
|
|
|
if char::is_digit(c, 10) {
|
|
|
|
if ! scanning_digits {
|
|
|
|
first_trailing_digit = pos;
|
|
|
|
scanning_digits = true;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
scanning_digits = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if scanning_digits && first_trailing_digit > 0 {
|
2023-06-16 18:28:43 +00:00
|
|
|
let tag = &s[..first_trailing_digit];
|
|
|
|
let actor2 = actor::ActorKind::from_tag(tag);
|
|
|
|
vec![actor1, actor2]
|
|
|
|
} else {
|
|
|
|
vec![actor1]
|
|
|
|
}
|
|
|
|
})
|
2022-12-19 20:20:13 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn relay_targets(&self, hostname: Arc<String>) -> impl Iterator<Item = actor::Actor> {
|
|
|
|
self.relay_target_kinds()
|
|
|
|
.map(move |kind| actor::Actor {
|
|
|
|
host: hostname.clone(),
|
|
|
|
kind,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct Tag<'a> {
|
|
|
|
pub name: &'a str,
|
|
|
|
}
|
2022-12-18 20:31:50 +00:00
|
|
|
|
2023-05-14 22:50:59 +00:00
|
|
|
struct Job {
|
2023-05-14 23:03:23 +00:00
|
|
|
post_url: Arc<String>,
|
|
|
|
actor_id: Arc<String>,
|
2023-05-14 22:50:59 +00:00
|
|
|
body: Arc<Vec<u8>>,
|
|
|
|
key_id: String,
|
|
|
|
private_key: Arc<PrivateKey>,
|
2023-06-16 19:25:24 +00:00
|
|
|
inbox_url: reqwest::Url,
|
2023-05-14 22:50:59 +00:00
|
|
|
}
|
|
|
|
|
2023-06-16 19:25:24 +00:00
|
|
|
fn spawn_worker(client: Arc<reqwest::Client>) -> Sender<Job> {
|
2023-10-09 00:25:25 +00:00
|
|
|
let (tx, mut rx) = channel(8);
|
2023-05-14 22:50:59 +00:00
|
|
|
|
|
|
|
tokio::spawn(async move {
|
2023-05-14 23:14:42 +00:00
|
|
|
let mut errors = 0u32;
|
|
|
|
let mut last_request = None;
|
|
|
|
|
2023-06-16 19:25:24 +00:00
|
|
|
while let Some(Job { post_url, actor_id, key_id, private_key, body, inbox_url }) = rx.next().await {
|
2023-05-14 23:14:42 +00:00
|
|
|
if errors > 0 && last_request.map_or(false, |last_request|
|
|
|
|
Instant::now() - last_request < Duration::from_secs(10) * errors
|
|
|
|
) {
|
|
|
|
// there have been errors, skip for time proportional
|
|
|
|
// to the number of subsequent errors
|
2023-06-16 19:25:24 +00:00
|
|
|
tracing::trace!("skip {} from {} to {}", post_url, actor_id, inbox_url);
|
2023-05-14 23:14:42 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2023-06-23 23:21:03 +00:00
|
|
|
tracing::debug!("relay {} from {} to {}", post_url, actor_id, inbox_url);
|
2023-05-14 23:14:42 +00:00
|
|
|
last_request = Some(Instant::now());
|
2023-05-14 22:50:59 +00:00
|
|
|
if let Err(e) = send::send_raw(
|
2023-06-16 19:25:24 +00:00
|
|
|
&client, inbox_url.as_str(),
|
2023-05-14 22:50:59 +00:00
|
|
|
&key_id, &private_key, body
|
|
|
|
).await {
|
|
|
|
tracing::error!("relay::send {:?}", e);
|
2023-05-14 23:14:42 +00:00
|
|
|
errors = errors.saturating_add(1);
|
2023-05-14 22:50:59 +00:00
|
|
|
} else {
|
|
|
|
// success
|
2023-05-14 23:14:42 +00:00
|
|
|
errors = 0;
|
2023-05-14 22:50:59 +00:00
|
|
|
systemd::daemon::notify(
|
|
|
|
false, [
|
|
|
|
(systemd::daemon::STATE_WATCHDOG, "1")
|
|
|
|
].iter()
|
|
|
|
).unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
panic!("Worker dead");
|
|
|
|
});
|
|
|
|
|
|
|
|
tx
|
|
|
|
}
|
|
|
|
|
2022-12-18 20:31:50 +00:00
|
|
|
pub fn spawn(
|
|
|
|
client: Arc<reqwest::Client>,
|
2022-12-19 20:20:13 +00:00
|
|
|
hostname: Arc<String>,
|
|
|
|
database: Database,
|
2022-12-18 20:31:50 +00:00
|
|
|
private_key: PrivateKey,
|
2022-12-19 20:20:13 +00:00
|
|
|
mut stream_rx: Receiver<String>
|
2022-12-18 20:31:50 +00:00
|
|
|
) {
|
2022-12-19 23:04:56 +00:00
|
|
|
let private_key = Arc::new(private_key);
|
|
|
|
|
2022-12-18 20:31:50 +00:00
|
|
|
tokio::spawn(async move {
|
2023-05-14 22:50:59 +00:00
|
|
|
let mut workers = HashMap::new();
|
|
|
|
|
2022-12-19 20:20:13 +00:00
|
|
|
while let Some(data) = stream_rx.recv().await {
|
2022-12-20 02:59:32 +00:00
|
|
|
let t1 = Instant::now();
|
2022-12-19 20:20:13 +00:00
|
|
|
let post: Post = match serde_json::from_str(&data) {
|
|
|
|
Ok(post) => post,
|
|
|
|
Err(e) => {
|
|
|
|
tracing::error!("parse error: {}", e);
|
|
|
|
tracing::trace!("data: {}", data);
|
|
|
|
continue;
|
|
|
|
}
|
2022-12-18 20:31:50 +00:00
|
|
|
};
|
2022-12-19 22:01:53 +00:00
|
|
|
let post_url = match post.url {
|
2023-05-14 23:03:23 +00:00
|
|
|
Some(ref url) => Arc::new(url.to_string()),
|
2022-12-19 22:01:53 +00:00
|
|
|
// skip reposts
|
2022-12-20 02:13:44 +00:00
|
|
|
None => {
|
2022-12-20 03:10:45 +00:00
|
|
|
increment_counter!("relay_posts_total", "action" => "skip");
|
2022-12-20 02:13:44 +00:00
|
|
|
continue;
|
|
|
|
}
|
2022-12-19 22:01:53 +00:00
|
|
|
};
|
|
|
|
let mut seen_actors = HashSet::new();
|
|
|
|
let mut seen_inboxes = HashSet::new();
|
2023-08-07 17:24:13 +00:00
|
|
|
let published = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
|
2022-12-19 20:20:13 +00:00
|
|
|
for actor in post.relay_targets(hostname.clone()) {
|
2022-12-19 22:01:53 +00:00
|
|
|
if seen_actors.contains(&actor) {
|
2022-12-19 20:20:13 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2023-05-14 23:03:23 +00:00
|
|
|
let actor_id = Arc::new(actor.uri());
|
2023-06-25 00:51:20 +00:00
|
|
|
let announce_id = format!("https://{}/announce/{}", hostname, urlencoding::encode(&post_url));
|
2022-12-19 20:20:13 +00:00
|
|
|
let body = json!({
|
|
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
|
|
"type": "Announce",
|
2023-05-14 23:18:38 +00:00
|
|
|
"actor": *actor_id,
|
2023-08-07 17:24:13 +00:00
|
|
|
"published": &published,
|
2022-12-19 20:20:13 +00:00
|
|
|
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
|
|
|
"object": &post.uri,
|
2023-06-25 00:51:20 +00:00
|
|
|
"id": announce_id,
|
2022-12-19 20:20:13 +00:00
|
|
|
});
|
2023-06-16 19:25:24 +00:00
|
|
|
let Ok(post_url_url) = reqwest::Url::parse(&post_url) else { continue; };
|
2022-12-19 20:20:13 +00:00
|
|
|
let body = Arc::new(
|
|
|
|
serde_json::to_vec(&body)
|
|
|
|
.unwrap()
|
|
|
|
);
|
|
|
|
for inbox in database.get_following_inboxes(&actor_id).await.unwrap() {
|
2023-06-16 19:25:24 +00:00
|
|
|
let Ok(inbox_url) = reqwest::Url::parse(&inbox) else { continue; };
|
|
|
|
|
|
|
|
// Avoid duplicate processing.
|
2022-12-19 22:01:53 +00:00
|
|
|
if seen_inboxes.contains(&inbox) {
|
|
|
|
continue;
|
|
|
|
}
|
2023-06-16 19:25:24 +00:00
|
|
|
seen_inboxes.insert(inbox);
|
|
|
|
|
2023-06-16 19:25:46 +00:00
|
|
|
// Prevent relaying back to the originating instance.
|
|
|
|
if inbox_url.host_str() == post_url_url.host_str() {
|
|
|
|
continue;
|
|
|
|
}
|
2023-05-14 22:50:59 +00:00
|
|
|
|
2023-06-16 19:25:24 +00:00
|
|
|
// Lookup/create worker queue per inbox.
|
|
|
|
let tx = workers.entry(inbox_url.host_str().unwrap_or("").to_string())
|
|
|
|
.or_insert_with(|| spawn_worker(client.clone()));
|
|
|
|
// Create queue item.
|
2023-05-14 22:50:59 +00:00
|
|
|
let job = Job {
|
2023-05-14 23:03:23 +00:00
|
|
|
post_url: post_url.clone(),
|
|
|
|
actor_id: actor_id.clone(),
|
2023-05-14 22:50:59 +00:00
|
|
|
body: body.clone(),
|
|
|
|
key_id: actor.key_id(),
|
|
|
|
private_key: private_key.clone(),
|
2023-06-16 19:25:24 +00:00
|
|
|
inbox_url,
|
2023-05-14 22:50:59 +00:00
|
|
|
};
|
2023-06-16 19:25:24 +00:00
|
|
|
// Enqueue job for worker.
|
2023-05-14 22:50:59 +00:00
|
|
|
let _ = tx.try_send(job);
|
2022-12-19 20:20:13 +00:00
|
|
|
}
|
|
|
|
|
2022-12-19 22:01:53 +00:00
|
|
|
seen_actors.insert(actor);
|
2022-12-19 20:20:13 +00:00
|
|
|
}
|
2022-12-20 02:13:44 +00:00
|
|
|
if seen_inboxes.is_empty() {
|
2022-12-20 03:10:45 +00:00
|
|
|
increment_counter!("relay_posts_total", "action" => "no_relay");
|
2022-12-20 02:13:44 +00:00
|
|
|
} else {
|
2022-12-20 03:10:45 +00:00
|
|
|
increment_counter!("relay_posts_total", "action" => "relay");
|
2022-12-20 02:13:44 +00:00
|
|
|
}
|
2022-12-20 02:59:32 +00:00
|
|
|
let t2 = Instant::now();
|
2022-12-20 03:10:45 +00:00
|
|
|
histogram!("relay_post_duration", t2 - t1);
|
2022-12-18 20:31:50 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
2023-06-16 18:28:43 +00:00
|
|
|
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use actor::ActorKind;

    /// Builds a post on `example.com` carrying exactly one tag.
    fn tagged_post(name: &str) -> Post<'_> {
        Post {
            url: Some("http://example.com/post/1"),
            uri: "http://example.com/post/1",
            tags: Some(vec![Tag { name }]),
        }
    }

    /// Collects every relay target kind produced for a single-tag post.
    fn relay_kinds(name: &str) -> Vec<ActorKind> {
        tagged_post(name).relay_target_kinds().collect()
    }

    /// The instance relay every test post is expected to yield first.
    fn example_instance() -> ActorKind {
        ActorKind::InstanceRelay("example.com".to_string())
    }

    #[test]
    fn post_relay_kind() {
        assert_eq!(
            relay_kinds("foo"),
            vec![
                example_instance(),
                ActorKind::TagRelay("foo".to_string()),
            ]
        );
    }

    #[test]
    fn post_relay_kind_empty() {
        // The empty hashtag produces no tag relay at all.
        assert_eq!(relay_kinds(""), vec![example_instance()]);
    }

    #[test]
    fn post_relay_kind_numeric() {
        // A purely numeric tag is kept as is and not stripped to nothing.
        assert_eq!(
            relay_kinds("23"),
            vec![
                example_instance(),
                ActorKind::TagRelay("23".to_string()),
            ]
        );
    }

    #[test]
    fn post_relay_kind_date() {
        // Trailing digits yield both the full tag and the stripped tag.
        assert_eq!(
            relay_kinds("dd1302"),
            vec![
                example_instance(),
                ActorKind::TagRelay("dd1302".to_string()),
                ActorKind::TagRelay("dd".to_string()),
            ]
        );
    }

    #[test]
    fn post_relay_kind_jp() {
        assert_eq!(
            relay_kinds("スコティッシュ・フォールド・ロングヘアー"),
            vec![
                example_instance(),
                ActorKind::TagRelay("sukoteitusiyuhuorudoronguhea".to_string()),
            ]
        );
    }
}
|