Add /ingest relay endpoint

This adds a new `/ingest` endpoint to allow instances to opt-in to relaying their users' posts to a buzzrelay instance without necessarily subscribing to receive posts from the relay
This commit is contained in:
Jack Allnutt 2023-08-10 03:50:01 +01:00
parent 276b78e8e0
commit 5f78fd131e
3 changed files with 63 additions and 3 deletions

View file

@ -8,6 +8,7 @@ use crate::activitypub;
pub enum ActorKind {
TagRelay(String),
InstanceRelay(String),
IngestRelay,
}
impl ActorKind {
@ -32,6 +33,7 @@ impl Actor {
format!("https://{}/tag/{}", self.host, tag),
ActorKind::InstanceRelay(instance) =>
format!("https://{}/instance/{}", self.host, instance),
ActorKind::IngestRelay => format!("https://{}/ingest", self.host),
}
}
@ -49,6 +51,8 @@ impl Actor {
format!("#{}", tag),
ActorKind::InstanceRelay(instance) =>
instance.to_string(),
ActorKind::IngestRelay =>
self.host.to_string()
}),
icon: Some(activitypub::Media {
media_type: "Image".to_string(),
@ -67,6 +71,8 @@ impl Actor {
format!("tag-{}", tag),
ActorKind::InstanceRelay(instance) =>
format!("instance-{}", instance),
ActorKind::IngestRelay =>
String::from(env!("CARGO_PKG_NAME")),
}),
}
}

View file

@ -13,6 +13,7 @@ use sigh::{PrivateKey, PublicKey};
use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap};
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tokio::sync::mpsc::{channel, Sender};
mod error;
mod config;
@ -34,6 +35,7 @@ struct State {
hostname: Arc<String>,
priv_key: PrivateKey,
pub_key: PublicKey,
ingest_tx: Sender<String>,
}
@ -69,6 +71,9 @@ async fn webfinger(
let at = resource.find('@');
(actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else if resource.starts_with("acct:ingest") {
let at = resource.find('@');
(actor::ActorKind::IngestRelay, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else {
track_request("GET", "webfinger", "not_found");
return StatusCode::NOT_FOUND.into_response();
@ -117,6 +122,18 @@ async fn get_instance_actor(
.into_response()
}
async fn get_ingest_actor(
axum::extract::State(state): axum::extract::State<State>,
) -> Response {
track_request("GET", "actor", "ingest");
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::IngestRelay,
};
target.as_activitypub(&state.pub_key)
.into_response()
}
async fn post_tag_relay(
axum::extract::State(state): axum::extract::State<State>,
Path(tag): Path<String>,
@ -141,6 +158,17 @@ async fn post_instance_relay(
post_relay(state, endpoint, target).await
}
async fn post_ingest_relay(
axum::extract::State(state): axum::extract::State<State>,
endpoint: endpoint::Endpoint<'_>
) -> Response{
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::IngestRelay,
};
post_relay(state, endpoint, target).await
}
async fn post_relay(
state: State,
endpoint: endpoint::Endpoint<'_>,
@ -242,6 +270,23 @@ async fn post_relay(
).into_response()
}
}
} else if let actor::ActorKind::IngestRelay = target.kind {
match state.ingest_tx.send(endpoint.payload.to_string()).await {
Ok(()) => {
track_request("POST", "relay", "ingest");
(StatusCode::ACCEPTED,
[("content-type", "application/activity+json")],
"{}"
).into_response()
},
Err(e) => {
tracing::error!("ingest_tx.send: {}", e);
track_request("POST", "relay", "ingest");
(StatusCode::INTERNAL_SERVER_ERROR,
format!("{}", e)
).into_response()
}
}
} else {
track_request("POST", "relay", "unrecognized");
(StatusCode::BAD_REQUEST, "Not a recognized request").into_response()
@ -345,13 +390,16 @@ async fn main() {
.unwrap()
);
let hostname = Arc::new(config.hostname.clone());
relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx);
let (ingest_tx, ingest_rx) = channel::<String>(1024);
relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx, ingest_rx);
let app = Router::new()
.route("/tag/:tag", get(get_tag_actor).post(post_tag_relay))
.route("/instance/:instance", get(get_instance_actor).post(post_instance_relay))
.route("/ingest", get(get_ingest_actor).post(post_ingest_relay))
.route("/tag/:tag/outbox", get(outbox))
.route("/instance/:instance/outbox", get(outbox))
.route("/ingest/outbox", get(outbox))
.route("/.well-known/webfinger", get(webfinger))
.route("/.well-known/nodeinfo", get(nodeinfo))
.route("/metrics", get(|| async move {
@ -363,6 +411,7 @@ async fn main() {
hostname,
priv_key,
pub_key,
ingest_tx
})
.merge(SpaRouter::new("/", "static"));

View file

@ -6,6 +6,7 @@ use serde_json::json;
use sigh::PrivateKey;
use tokio::{
sync::mpsc::Receiver,
select,
};
use crate::{db::Database, send, actor};
@ -147,14 +148,18 @@ pub fn spawn(
hostname: Arc<String>,
database: Database,
private_key: PrivateKey,
mut stream_rx: Receiver<String>
mut stream_rx: Receiver<String>,
mut ingest_rx: Receiver<String>,
) {
let private_key = Arc::new(private_key);
tokio::spawn(async move {
let mut workers = HashMap::new();
while let Some(data) = stream_rx.recv().await {
while let Some(data) = select!{
data = stream_rx.recv() => data,
data = ingest_rx.recv() => data,
} {
let t1 = Instant::now();
let post: Post = match serde_json::from_str(&data) {
Ok(post) => post,