This commit is contained in:
Jack Allnutt 2023-10-09 19:31:09 -07:00 committed by GitHub
commit f9f485496f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 27 deletions

View file

@ -6,8 +6,9 @@ use crate::activitypub;
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActorKind {
TagRelay(String),
InstanceRelay(String),
Tag(String),
Instance(String),
Ingest,
}
impl ActorKind {
@ -15,7 +16,7 @@ impl ActorKind {
let tag = deunicode(tag)
.to_lowercase()
.replace(char::is_whitespace, "");
ActorKind::TagRelay(tag)
ActorKind::Tag(tag)
}
}
@ -28,10 +29,11 @@ pub struct Actor {
impl Actor {
pub fn uri(&self) -> String {
match &self.kind {
ActorKind::TagRelay(tag) =>
ActorKind::Tag(tag) =>
format!("https://{}/tag/{}", self.host, tag),
ActorKind::InstanceRelay(instance) =>
ActorKind::Instance(instance) =>
format!("https://{}/instance/{}", self.host, instance),
ActorKind::Ingest => format!("https://{}/ingest", self.host),
}
}
@ -45,10 +47,12 @@ impl Actor {
actor_type: "Service".to_string(),
id: self.uri(),
name: Some(match &self.kind {
ActorKind::TagRelay(tag) =>
ActorKind::Tag(tag) =>
format!("#{}", tag),
ActorKind::InstanceRelay(instance) =>
ActorKind::Instance(instance) =>
instance.to_string(),
ActorKind::Ingest =>
self.host.to_string()
}),
icon: Some(activitypub::Media {
media_type: "Image".to_string(),
@ -63,10 +67,12 @@ impl Actor {
pem: pub_key.to_pem().unwrap(),
},
preferred_username: Some(match &self.kind {
ActorKind::TagRelay(tag) =>
ActorKind::Tag(tag) =>
format!("tag-{}", tag),
ActorKind::InstanceRelay(instance) =>
ActorKind::Instance(instance) =>
format!("instance-{}", instance),
ActorKind::Ingest =>
String::from(env!("CARGO_PKG_NAME")),
}),
}
}

View file

@ -13,6 +13,7 @@ use sigh::{PrivateKey, PublicKey};
use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap};
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tokio::sync::mpsc::{channel, Sender};
mod error;
mod config;
@ -34,6 +35,7 @@ struct State {
hostname: Arc<String>,
priv_key: PrivateKey,
pub_key: PublicKey,
ingest_tx: Sender<String>,
}
@ -62,13 +64,16 @@ async fn webfinger(
if resource.starts_with("acct:tag-") {
let off = "acct:tag-".len();
let at = resource.find('@');
(actor::ActorKind::TagRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
(actor::ActorKind::Tag(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else if resource.starts_with("acct:instance-") {
let off = "acct:instance-".len();
let at = resource.find('@');
(actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
(actor::ActorKind::Instance(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else if resource.starts_with("acct:ingest") {
let at = resource.find('@');
(actor::ActorKind::Ingest, at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else {
track_request("GET", "webfinger", "not_found");
return StatusCode::NOT_FOUND.into_response();
@ -111,7 +116,19 @@ async fn get_instance_actor(
track_request("GET", "actor", "instance");
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()),
kind: actor::ActorKind::Instance(instance.to_lowercase()),
};
target.as_activitypub(&state.pub_key)
.into_response()
}
async fn get_ingest_actor(
axum::extract::State(state): axum::extract::State<State>,
) -> Response {
track_request("GET", "actor", "ingest");
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::Ingest,
};
target.as_activitypub(&state.pub_key)
.into_response()
@ -136,7 +153,18 @@ async fn post_instance_relay(
) -> Response {
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()),
kind: actor::ActorKind::Instance(instance.to_lowercase()),
};
post_relay(state, endpoint, target).await
}
async fn post_ingest_relay(
axum::extract::State(state): axum::extract::State<State>,
endpoint: endpoint::Endpoint<'_>
) -> Response{
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::Ingest,
};
post_relay(state, endpoint, target).await
}
@ -242,6 +270,23 @@ async fn post_relay(
).into_response()
}
}
} else if let actor::ActorKind::Ingest = target.kind {
match state.ingest_tx.send(endpoint.payload.to_string()).await {
Ok(()) => {
track_request("POST", "relay", "ingest");
(StatusCode::ACCEPTED,
[("content-type", "application/activity+json")],
"{}"
).into_response()
},
Err(e) => {
tracing::error!("ingest_tx.send: {}", e);
track_request("POST", "relay", "ingest");
(StatusCode::INTERNAL_SERVER_ERROR,
format!("{}", e)
).into_response()
}
}
} else {
track_request("POST", "relay", "unrecognized");
(StatusCode::BAD_REQUEST, "Not a recognized request").into_response()
@ -345,13 +390,16 @@ async fn main() {
.unwrap()
);
let hostname = Arc::new(config.hostname.clone());
relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx);
let (ingest_tx, ingest_rx) = channel::<String>(1024);
relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx, ingest_rx);
let app = Router::new()
.route("/tag/:tag", get(get_tag_actor).post(post_tag_relay))
.route("/instance/:instance", get(get_instance_actor).post(post_instance_relay))
.route("/ingest", get(get_ingest_actor).post(post_ingest_relay))
.route("/tag/:tag/outbox", get(outbox))
.route("/instance/:instance/outbox", get(outbox))
.route("/ingest/outbox", get(outbox))
.route("/.well-known/webfinger", get(webfinger))
.route("/.well-known/nodeinfo", get(nodeinfo))
.route("/metrics", get(|| async move {
@ -363,6 +411,7 @@ async fn main() {
hostname,
priv_key,
pub_key,
ingest_tx
})
.merge(SpaRouter::new("/", "static"));

View file

@ -6,6 +6,7 @@ use serde_json::json;
use sigh::PrivateKey;
use tokio::{
sync::mpsc::Receiver,
select,
};
use crate::{db::Database, send, actor};
@ -39,7 +40,7 @@ impl Post<'_> {
fn relay_target_kinds(&self) -> impl Iterator<Item = actor::ActorKind> {
self.host()
.into_iter()
.map(actor::ActorKind::InstanceRelay)
.map(actor::ActorKind::Instance)
.chain(
self.tags()
.into_iter()
@ -147,14 +148,18 @@ pub fn spawn(
hostname: Arc<String>,
database: Database,
private_key: PrivateKey,
mut stream_rx: Receiver<String>
mut stream_rx: Receiver<String>,
mut ingest_rx: Receiver<String>,
) {
let private_key = Arc::new(private_key);
tokio::spawn(async move {
let mut workers = HashMap::new();
while let Some(data) = stream_rx.recv().await {
while let Some(data) = select!{
data = stream_rx.recv() => data,
data = ingest_rx.recv() => data,
} {
let t1 = Instant::now();
let post: Post = match serde_json::from_str(&data) {
Ok(post) => post,
@ -254,8 +259,8 @@ mod test {
}]),
};
let mut kinds = post.relay_target_kinds();
assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::TagRelay("foo".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Tag("foo".to_string())));
assert_eq!(kinds.next(), None);
}
@ -269,7 +274,7 @@ mod test {
}]),
};
let mut kinds = post.relay_target_kinds();
assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string())));
assert_eq!(kinds.next(), None);
}
@ -283,8 +288,8 @@ mod test {
}]),
};
let mut kinds = post.relay_target_kinds();
assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::TagRelay("23".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Tag("23".to_string())));
assert_eq!(kinds.next(), None);
}
@ -298,9 +303,9 @@ mod test {
}]),
};
let mut kinds = post.relay_target_kinds();
assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::TagRelay("dd1302".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::TagRelay("dd".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Tag("dd1302".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Tag("dd".to_string())));
assert_eq!(kinds.next(), None);
}
@ -314,8 +319,8 @@ mod test {
}]),
};
let mut kinds = post.relay_target_kinds();
assert_eq!(kinds.next(), Some(ActorKind::InstanceRelay("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::TagRelay("sukoteitusiyuhuorudoronguhea".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Instance("example.com".to_string())));
assert_eq!(kinds.next(), Some(ActorKind::Tag("sukoteitusiyuhuorudoronguhea".to_string())));
assert_eq!(kinds.next(), None);
}
}