From 5f78fd131ed1cd2254518feb85d9ea165177e49f Mon Sep 17 00:00:00 2001 From: Jack Allnutt Date: Thu, 10 Aug 2023 03:50:01 +0100 Subject: [PATCH] Add /ingest relay endpoint This adds a new `/ingest` endpoint to allow instances to opt-in to relaying their users' posts to a buzzrelay instance without necessarily subscribing to receive posts from the relay --- src/actor.rs | 6 ++++++ src/main.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/relay.rs | 9 +++++++-- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 843e4ee..a41e355 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -8,6 +8,7 @@ use crate::activitypub; pub enum ActorKind { TagRelay(String), InstanceRelay(String), + IngestRelay, } impl ActorKind { @@ -32,6 +33,7 @@ impl Actor { format!("https://{}/tag/{}", self.host, tag), ActorKind::InstanceRelay(instance) => format!("https://{}/instance/{}", self.host, instance), + ActorKind::IngestRelay => format!("https://{}/ingest", self.host), } } @@ -49,6 +51,8 @@ impl Actor { format!("#{}", tag), ActorKind::InstanceRelay(instance) => instance.to_string(), + ActorKind::IngestRelay => + self.host.to_string() }), icon: Some(activitypub::Media { media_type: "Image".to_string(), @@ -67,6 +71,8 @@ impl Actor { format!("tag-{}", tag), ActorKind::InstanceRelay(instance) => format!("instance-{}", instance), + ActorKind::IngestRelay => + String::from(env!("CARGO_PKG_NAME")), }), } } diff --git a/src/main.rs b/src/main.rs index 572952d..75e2233 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use sigh::{PrivateKey, PublicKey}; use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap}; use std::{panic, process}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tokio::sync::mpsc::{channel, Sender}; mod error; mod config; @@ -34,6 +35,7 @@ struct State { hostname: Arc, priv_key: PrivateKey, pub_key: PublicKey, + ingest_tx: Sender, } @@ -69,6 +71,9 @@ async fn webfinger( let at = resource.find('@'); (actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()), at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) + } else if resource.starts_with("acct:ingest") { + let at = resource.find('@'); + (actor::ActorKind::IngestRelay, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) } else { track_request("GET", "webfinger", "not_found"); return StatusCode::NOT_FOUND.into_response(); @@ -117,6 +122,18 @@ async fn get_instance_actor( .into_response() } +async fn get_ingest_actor( + axum::extract::State(state): axum::extract::State, +) -> Response { + track_request("GET", "actor", "ingest"); + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::IngestRelay, + }; + target.as_activitypub(&state.pub_key) + .into_response() +} + async fn post_tag_relay( axum::extract::State(state): axum::extract::State, Path(tag): Path, @@ -141,6 +158,17 @@ async fn post_instance_relay( post_relay(state, endpoint, target).await } +async fn post_ingest_relay( + axum::extract::State(state): axum::extract::State, + endpoint: endpoint::Endpoint<'_> +) -> Response{ + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::IngestRelay, + }; + post_relay(state, endpoint, target).await +} + async fn post_relay( state: State, endpoint: endpoint::Endpoint<'_>, @@ -242,6 +270,23 @@ async fn post_relay( ).into_response() } } + } else if let actor::ActorKind::IngestRelay = target.kind { + match state.ingest_tx.send(endpoint.payload.to_string()).await { + Ok(()) => { + track_request("POST", "relay", "ingest"); + (StatusCode::ACCEPTED, + [("content-type", "application/activity+json")], + "{}" + ).into_response() + }, + Err(e) => { + tracing::error!("ingest_tx.send: {}", e); + track_request("POST", "relay", "ingest"); + (StatusCode::INTERNAL_SERVER_ERROR, + format!("{}", e) + ).into_response() + } + } } else { track_request("POST", "relay", "unrecognized"); (StatusCode::BAD_REQUEST, "Not a recognized request").into_response() @@ -345,13 +390,16 @@ async fn main() { .unwrap() ); let hostname = Arc::new(config.hostname.clone()); - relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx); + let (ingest_tx, ingest_rx) = channel::(1024); + relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx, ingest_rx); let app = Router::new() .route("/tag/:tag", get(get_tag_actor).post(post_tag_relay)) .route("/instance/:instance", get(get_instance_actor).post(post_instance_relay)) + .route("/ingest", get(get_ingest_actor).post(post_ingest_relay)) .route("/tag/:tag/outbox", get(outbox)) .route("/instance/:instance/outbox", get(outbox)) + .route("/ingest/outbox", get(outbox)) .route("/.well-known/webfinger", get(webfinger)) .route("/.well-known/nodeinfo", get(nodeinfo)) .route("/metrics", get(|| async move { @@ -363,6 +411,7 @@ async fn main() { hostname, priv_key, pub_key, + ingest_tx }) .merge(SpaRouter::new("/", "static")); diff --git a/src/relay.rs b/src/relay.rs index f453c92..05969e9 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -6,6 +6,7 @@ use serde_json::json; use sigh::PrivateKey; use tokio::{ sync::mpsc::Receiver, + select, }; use crate::{db::Database, send, actor}; @@ -147,14 +148,18 @@ pub fn spawn( hostname: Arc, database: Database, private_key: PrivateKey, - mut stream_rx: Receiver + mut stream_rx: Receiver, + mut ingest_rx: Receiver, ) { let private_key = Arc::new(private_key); tokio::spawn(async move { let mut workers = HashMap::new(); - while let Some(data) = stream_rx.recv().await { + while let Some(data) = select!{ + data = stream_rx.recv() => data, + data = ingest_rx.recv() => data, + } { let t1 = Instant::now(); let post: Post = match serde_json::from_str(&data) { Ok(post) => post,