working as intended

This commit is contained in:
Astro 2022-12-19 21:20:13 +01:00
parent bb781cb40f
commit a099746cf5
13 changed files with 723 additions and 154 deletions

2
.gitignore vendored
View file

@ -1 +1,3 @@
/target
/private-key.pem
/public-key.pem

259
Cargo.lock generated
View file

@ -181,6 +181,15 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.11.1"
@ -203,13 +212,21 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"sigh",
"thiserror",
"tokio",
"tokio-postgres",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.3.0"
@ -269,6 +286,25 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "cxx"
version = "1.0.83"
@ -313,6 +349,17 @@ dependencies = [
"syn",
]
[[package]]
name = "digest"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
"block-buffer",
"crypto-common",
"subtle",
]
[[package]]
name = "encoding_rs"
version = "0.8.31"
@ -333,6 +380,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "1.8.0"
@ -461,6 +514,27 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "h2"
version = "0.3.15"
@ -501,6 +575,15 @@ dependencies = [
"libc",
]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]]
name = "http"
version = "0.2.8"
@ -725,6 +808,15 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest",
]
[[package]]
name = "memchr"
version = "2.5.0"
@ -918,6 +1010,24 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "phf"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@ -956,6 +1066,41 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.47"
@ -974,6 +1119,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -1163,6 +1338,30 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.9.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
@ -1175,6 +1374,8 @@ dependencies = [
[[package]]
name = "sigh"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17a67f3294aadf0022646d41c33b888d79db23670e4c34b1bc007c6f901a3b77"
dependencies = [
"base64 0.20.0",
"http",
@ -1192,6 +1393,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "slab"
version = "0.4.7"
@ -1217,6 +1424,22 @@ dependencies = [
"winapi",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.105"
@ -1353,6 +1576,30 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
@ -1498,6 +1745,12 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicase"
version = "2.6.0"
@ -1534,6 +1787,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "unsafe-libyaml"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2"
[[package]]
name = "url"
version = "2.3.1"

View file

@ -13,11 +13,13 @@ tracing = "*"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = "1"
serde_json = "1"
serde_yaml = "0.9"
reqwest = { version = "0.11", features = ["json", "stream"] }
sigh = { path = "../rust-sigh" }
sigh = "1.0"
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
thiserror = "1"
http = "0.2"
chrono = "0.4"
eventsource-stream = "0.2"
futures = "0.3"
tokio-postgres = "0.7"

5
config.toml Normal file
View file

@ -0,0 +1,5 @@
hostname: relay.fedi.buzz
listen_port: 3000
priv_key_file: private-key.pem
pub_key_file: public-key.pem
db: "host=localhost user=relay password=xyz dbname=buzzrelay"

View file

@ -1,3 +1,4 @@
use axum::{response::IntoResponse, Json};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -8,11 +9,10 @@ pub struct Actor {
pub actor_type: String,
pub id: String,
pub inbox: String,
// pub outbox: String,
#[serde(rename = "publicKey")]
pub public_key: ActorPublicKey,
#[serde(rename = "preferredUsername")]
pub preferredUsername: Option<String>,
pub preferred_username: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -35,3 +35,10 @@ pub struct Action<O> {
pub to: Option<String>,
pub object: Option<O>,
}
impl IntoResponse for Actor {
fn into_response(self) -> axum::response::Response {
([("content-type", "application/activity+json")],
Json(self)).into_response()
}
}

52
src/actor.rs Normal file
View file

@ -0,0 +1,52 @@
use std::sync::Arc;
use sigh::{PublicKey, Key};
use crate::activitypub;
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActorKind {
TagRelay(String),
InstanceRelay(String),
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Actor {
pub host: Arc<String>,
pub kind: ActorKind,
}
impl Actor {
pub fn uri(&self) -> String {
match &self.kind {
ActorKind::TagRelay(tag) =>
format!("https://{}/tag/{}", self.host, tag),
ActorKind::InstanceRelay(instance) =>
format!("https://{}/instance/{}", self.host, instance),
}
}
pub fn key_id(&self) -> String {
format!("{}#key", self.uri())
}
pub fn as_activitypub(&self, pub_key: &PublicKey) -> activitypub::Actor {
activitypub::Actor {
jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()),
actor_type: "Service".to_string(),
id: self.uri(),
inbox: self.uri(),
// outbox: "https://relay.fedi.buzz/outbox".to_string(),
public_key: activitypub::ActorPublicKey {
id: self.key_id(),
owner: Some(self.uri()),
pem: pub_key.to_pem().unwrap(),
},
preferred_username: Some(match &self.kind {
ActorKind::TagRelay(tag) =>
format!("tag-{}", tag),
ActorKind::InstanceRelay(instance) =>
format!("instance-{}", instance),
}),
}
}
}

40
src/config.rs Normal file
View file

@ -0,0 +1,40 @@
use serde::Deserialize;
use sigh::{PrivateKey, PublicKey, Key};
fn default_upstream() -> String {
"fedi.buzz".to_string()
}
#[derive(Deserialize)]
pub struct Config {
#[serde(default = "default_upstream")]
pub upstream: String,
pub db: String,
pub hostname: String,
pub listen_port: u16,
priv_key_file: String,
pub_key_file: String,
}
impl Config {
pub fn load(config_file: &str) -> Config {
let data = std::fs::read_to_string(config_file)
.expect("read config");
serde_yaml::from_str(&data)
.expect("parse config")
}
pub fn priv_key(&self) -> PrivateKey {
let data = std::fs::read_to_string(&self.priv_key_file)
.expect("read priv_key_file");
PrivateKey::from_pem(data.as_bytes())
.expect("priv_key")
}
pub fn pub_key(&self) -> PublicKey {
let data = std::fs::read_to_string(&self.pub_key_file)
.expect("read pub_key_file");
PublicKey::from_pem(data.as_bytes())
.expect("pub_key")
}
}

79
src/db.rs Normal file
View file

@ -0,0 +1,79 @@
use std::sync::Arc;
use tokio_postgres::{Client, Error, NoTls, Statement};
use crate::actor;
const CREATE_SCHEMA_COMMANDS: &[&str] = &[
"CREATE TABLE IF NOT EXISTS follows (id TEXT, inbox TEXT, actor TEXT, UNIQUE (inbox, actor))",
"CREATE INDEX ON follows (actor) INCLUDE (inbox)",
];
#[derive(Clone)]
pub struct Database {
inner: Arc<DatabaseInner>,
}
struct DatabaseInner {
client: Client,
add_follow: Statement,
del_follow: Statement,
get_following_inboxes: Statement,
}
impl Database {
pub async fn connect(conn_str: &str) -> Self {
let (client, connection) = tokio_postgres::connect(conn_str, NoTls)
.await
.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::error!("postgresql: {}", e);
}
});
for command in CREATE_SCHEMA_COMMANDS {
client.execute(*command, &[])
.await
.unwrap();
}
let add_follow = client.prepare("INSERT INTO follows (id, inbox, actor) VALUES ($1, $2, $3)")
.await
.unwrap();
let del_follow = client.prepare("DELETE FROM follows WHERE id=$1 AND actor=$2")
.await
.unwrap();
let get_following_inboxes = client.prepare("SELECT DISTINCT inbox FROM follows WHERE actor=$1")
.await
.unwrap();
Database {
inner: Arc::new(DatabaseInner {
client,
add_follow,
del_follow,
get_following_inboxes,
}),
}
}
pub async fn add_follow(&self, id: &str, inbox: &str, actor: &str) -> Result<(), Error> {
self.inner.client.execute(&self.inner.add_follow, &[&id, &inbox, &actor])
.await?;
Ok(())
}
pub async fn del_follow(&self, id: &str, actor: &str) -> Result<(), Error> {
self.inner.client.execute(&self.inner.del_follow, &[&id, &actor])
.await?;
Ok(())
}
pub async fn get_following_inboxes(&self, actor: &str) -> Result<impl Iterator<Item = String>, Error> {
let rows = self.inner.client.query(&self.inner.get_following_inboxes, &[&actor])
.await?;
Ok(rows.into_iter()
.map(|row| row.get(0))
)
}
}

View file

@ -1,6 +1,6 @@
use axum::{
async_trait,
extract::{FromRequest, FromRef},
extract::{FromRequest, FromRef, Path, Query},
http::{header::CONTENT_TYPE, Request, StatusCode},
response::{IntoResponse, Response},
routing::{get, post},
@ -9,27 +9,29 @@ use axum::{
use serde::{Deserialize, Serialize};
use serde_json::json;
use sigh::{PrivateKey, PublicKey, alg::{RsaSha256, Algorithm}, Key};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap};
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod config;
mod actor;
mod db;
mod fetch;
pub use fetch::fetch;
mod send;
pub use send::send;
mod stream;
mod relay;
mod activitypub;
mod webfinger;
mod endpoint;
const ACTOR_ID: &str = "https://relay.fedi.buzz/actor";
const ACTOR_KEY: &str = "https://relay.fedi.buzz/actor#key";
#[derive(Debug, Clone)]
#[derive(Clone)]
struct State {
database: db::Database,
client: Arc<reqwest::Client>,
private_key: PrivateKey,
public_key: PublicKey,
hostname: Arc<String>,
priv_key: PrivateKey,
pub_key: PublicKey,
}
@ -39,30 +41,101 @@ impl FromRef<State> for Arc<reqwest::Client> {
}
}
async fn actor(axum::extract::State(state): axum::extract::State<State>) -> Response {
let id = ACTOR_ID.to_string();
([("content-type", "application/activity+json")],
Json(activitypub::Actor {
jsonld_context: json!([
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
]),
actor_type: "Service".to_string(),
id: id.clone(),
inbox: "https://relay.fedi.buzz/relay".to_string(),
// outbox: "https://relay.fedi.buzz/outbox".to_string(),
public_key: activitypub::ActorPublicKey {
id: ACTOR_KEY.to_string(),
owner: Some(id.clone()),
pem: state.public_key.to_pem().unwrap(),
},
preferredUsername: Some("buzzrelay".to_string()),
})).into_response()
async fn webfinger(
axum::extract::State(state): axum::extract::State<State>,
Query(params): Query<HashMap<String, String>>,
) -> Response {
let resource = match params.get("resource") {
Some(resource) => resource,
None => return StatusCode::NOT_FOUND.into_response(),
};
let (target_kind, target_host) =
if resource.starts_with("acct:tag-") {
let off = "acct:tag-".len();
let at = resource.find('@');
(actor::ActorKind::TagRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else if resource.starts_with("acct:instance-") {
let off = "acct:instance-".len();
let at = resource.find('@');
(actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else {
return StatusCode::NOT_FOUND.into_response();
};
let target = actor::Actor {
host: target_host,
kind: target_kind,
};
Json(json!({
"subject": &resource,
"aliases": &[
target.uri(),
],
"links": &[json!({
"rel": "self",
"type": "application/activity+json",
"href": target.uri(),
})],
})).into_response()
}
async fn handler(
async fn get_tag_actor(
axum::extract::State(state): axum::extract::State<State>,
Path(tag): Path<String>
) -> Response {
// TODO: downcase
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::TagRelay(tag),
};
target.as_activitypub(&state.pub_key)
.into_response()
}
async fn get_instance_actor(
axum::extract::State(state): axum::extract::State<State>,
Path(instance): Path<String>
) -> Response {
// TODO: downcase
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::InstanceRelay(instance),
};
target.as_activitypub(&state.pub_key)
.into_response()
}
async fn post_tag_relay(
axum::extract::State(state): axum::extract::State<State>,
Path(tag): Path<String>,
endpoint: endpoint::Endpoint
) -> Response {
// TODO: downcase
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::TagRelay(tag),
};
post_relay(state, endpoint, target).await
}
async fn post_instance_relay(
axum::extract::State(state): axum::extract::State<State>,
Path(instance): Path<String>,
endpoint: endpoint::Endpoint
) -> Response {
// TODO: downcase
let target = actor::Actor {
host: state.hostname.clone(),
kind: actor::ActorKind::InstanceRelay(instance),
};
post_relay(state, endpoint, target).await
}
async fn post_relay(
state: State,
endpoint: endpoint::Endpoint,
target: actor::Actor
) -> Response {
dbg!(&endpoint);
let action = match serde_json::from_value::<activitypub::Action<serde_json::Value>>(endpoint.payload.clone()) {
@ -72,26 +145,39 @@ async fn handler(
format!("Bad action: {:?}", e)
).into_response(),
};
// endpoint.actor.inbox
// endpoint.actor.id
if action.action_type == "Follow" {
let private_key = state.private_key.clone();
let priv_key = state.priv_key.clone();
let client = state.client.clone();
tokio::spawn(async move {
let accept = activitypub::Action {
jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()),
action_type: "Accept".to_string(),
actor: ACTOR_ID.to_string(),
actor: target.uri(),
to: Some(endpoint.actor.id.clone()),
id: action.id,
object: Some(endpoint.payload),
};
send::send(
let result = send::send(
client.as_ref(), &endpoint.actor.inbox,
ACTOR_KEY,
&private_key,
accept,
).await
.map_err(|e| tracing::error!("post accept: {}", e));
&target.key_id(),
&priv_key,
&accept,
).await;
match result {
Ok(()) => {
state.database.add_follow(
&endpoint.actor.id,
&endpoint.actor.inbox,
&target.uri(),
).await.unwrap();
}
Err(e) => {
tracing::error!("post accept: {}", e);
}
}
});
(StatusCode::ACCEPTED,
@ -99,16 +185,15 @@ async fn handler(
"{}"
).into_response()
} else {
// TODO: Undo Follow
(StatusCode::BAD_REQUEST, "Not a recognized request").into_response()
}
}
async fn inbox() -> impl IntoResponse {
StatusCode::OK
}
#[tokio::main]
async fn main() {
exit_on_panic();
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
@ -118,8 +203,15 @@ async fn main() {
.with(tracing_subscriber::fmt::layer())
.init();
let (private_key, public_key) = RsaSha256.generate_keys().unwrap();
let stream_rx = stream::spawn("fedi.buzz");
let config = config::Config::load(
&std::env::args()
.skip(1)
.next()
.expect("Call with config.yaml")
);
let database = db::Database::connect(&config.db).await;
let stream_rx = stream::spawn(config.upstream.clone());
let client = Arc::new(
reqwest::Client::builder()
.timeout(Duration::from_secs(5))
@ -133,43 +225,34 @@ async fn main() {
.build()
.unwrap()
);
relay::spawn(client.clone(), ACTOR_KEY.to_string(), private_key.clone(), stream_rx);
let relay_url = "https://relay.dresden.network/inbox";
let client_ = client.clone();
let private_key_ = private_key.clone();
tokio::spawn(async move {
let follow = activitypub::Action::<()> {
jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()),
action_type: "Follow".to_string(),
actor: ACTOR_ID.to_string(),
to: Some(relay_url.to_string()),
id: "fnord".to_string(),
object: None,
};
send::send(
client_.as_ref(), relay_url,
ACTOR_KEY,
&private_key_,
follow,
).await
.map_err(|e| tracing::error!("post accept: {}", e));
});
let hostname = Arc::new(config.hostname.clone());
relay::spawn(client.clone(), hostname.clone(), database.clone(), config.priv_key(), stream_rx);
let app = Router::new()
.route("/actor", get(actor))
.route("/relay", post(handler))
.route("/inbox", post(inbox))
.route("/.well-known/webfinger", get(webfinger::webfinger))
.route("/tag/:tag", get(get_tag_actor).post(post_tag_relay))
.route("/instance/:instance", get(get_instance_actor).post(post_instance_relay))
.route("/.well-known/webfinger", get(webfinger))
.with_state(State {
database,
client,
private_key, public_key,
hostname,
priv_key: config.priv_key(),
pub_key: config.pub_key(),
});
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
let addr = SocketAddr::from(([127, 0, 0, 1], config.listen_port));
tracing::info!("serving on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
fn exit_on_panic() {
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
process::exit(1);
}));
}

View file

@ -1,49 +1,114 @@
use std::sync::Arc;
use std::{sync::Arc, collections::HashSet};
use serde::Deserialize;
use serde_json::json;
use sigh::PrivateKey;
use tokio::{
sync::mpsc::Receiver,
};
use crate::send::send;
use crate::{db::Database, send, actor};
#[derive(Deserialize)]
struct Post<'a> {
// pub url: &'a str,
pub uri: &'a str,
pub tags: Option<Vec<Tag<'a>>>,
}
impl Post<'_> {
pub fn host(&self) -> Option<String> {
reqwest::Url::parse(&self.uri)
.ok()
.and_then(|url| url.domain()
.map(|s| s.to_lowercase())
)
}
pub fn tags(&self) -> Vec<String> {
match &self.tags {
None =>
vec![],
Some(tags) =>
tags.iter()
.map(|tag| tag.name.to_lowercase())
.collect()
}
}
fn relay_target_kinds(&self) -> impl Iterator<Item = actor::ActorKind> {
self.host()
.into_iter()
.map(|host| actor::ActorKind::InstanceRelay(host.clone()))
.chain(
self.tags()
.into_iter()
.map(|tag| actor::ActorKind::TagRelay(tag))
)
}
pub fn relay_targets(&self, hostname: Arc<String>) -> impl Iterator<Item = actor::Actor> {
self.relay_target_kinds()
.map(move |kind| actor::Actor {
host: hostname.clone(),
kind,
})
}
}
#[derive(Deserialize)]
struct Tag<'a> {
pub name: &'a str,
}
pub fn spawn(
client: Arc<reqwest::Client>,
key_id: String,
hostname: Arc<String>,
database: Database,
private_key: PrivateKey,
mut stream_rx: Receiver<serde_json::Value>
mut stream_rx: Receiver<String>
) {
tokio::spawn(async move {
while let Some(post) = stream_rx.recv().await {
dbg!(&post);
let url = if let Some(serde_json::Value::String(url)) = post.get("url") {
url
} else {
continue;
while let Some(data) = stream_rx.recv().await {
// dbg!(&data);
let post: Post = match serde_json::from_str(&data) {
Ok(post) => post,
Err(e) => {
tracing::error!("parse error: {}", e);
tracing::trace!("data: {}", data);
continue;
}
};
let uri = if let Some(serde_json::Value::String(uri)) = post.get("uri") {
uri
} else {
continue;
};
let account = if let Some(serde_json::Value::String(account)) = post.get("account").and_then(|a| a.get("url")) {
account
} else {
continue;
};
// {"@context": "https://www.w3.org/ns/activitystreams", "type": "Announce", "to": ["https://relay.dresden.network/followers"], "actor": "https://relay.dresden.network/actor", "object": "https://mastodon.online/users/evangreer/statuses/109521063161210607", "id": "https://relay.dresden.network/activities/5e41fd9c-bc51-408c-94ca-96a7bf9ce412"}
let body = json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"actor": "https://relay.fedi.buzz/actor",
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": &uri,
"id": &url,
});
dbg!(&body);
send(&client, "https://c3d2.social/inbox",
&key_id, &private_key, body).await
.map_err(|e| tracing::error!("relay::send {:?}", e));
// TODO: queue by target?
let mut seen = HashSet::new();
for actor in post.relay_targets(hostname.clone()) {
if seen.contains(&actor) {
continue;
}
let actor_id = actor.uri();
let body = json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"actor": &actor_id,
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": &post.uri,
"id": &post.uri,
});
let body = Arc::new(
serde_json::to_vec(&body)
.unwrap()
);
for inbox in database.get_following_inboxes(&actor_id).await.unwrap() {
if let Err(e) = send::send_raw(
&client, &inbox,
&actor.key_id(), &private_key, body.clone()
).await {
tracing::error!("relay::send {:?}", e);
}
}
seen.insert(actor);
}
}
});
}

View file

@ -1,3 +1,5 @@
use std::{sync::Arc, ops::Deref};
use futures::StreamExt;
use http::StatusCode;
use http_digest_headers::{DigestHeader, DigestMethod};
@ -28,10 +30,22 @@ pub async fn send<T: Serialize>(
uri: &str,
key_id: &str,
private_key: &PrivateKey,
body: T,
body: &T,
) -> Result<(), SendError> {
let body = Arc::new(
serde_json::to_vec(body)
.map_err(SendError::Json)?
);
send_raw(client, uri, key_id, private_key, body).await
}
pub async fn send_raw(
client: &reqwest::Client,
uri: &str,
key_id: &str,
private_key: &PrivateKey,
body: Arc<Vec<u8>>,
) -> Result<(), SendError> {
let body = serde_json::to_vec(&body)
.map_err(SendError::Json)?;
let mut digest_header = DigestHeader::new()
.with_method(DigestMethod::SHA256, &body)
.map(|h| format!("{}", h))
@ -55,7 +69,7 @@ pub async fn send<T: Serialize>(
.header("date", chrono::Utc::now().to_rfc2822()
.replace("+0000", "GMT"))
.header("digest", digest_header)
.body(body)
.body(body.as_ref().clone())
.map_err(SendError::HttpReq)?;
SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?;

View file

@ -13,7 +13,7 @@ pub enum StreamError {
InvalidContentType,
}
async fn run(host: &str) -> Result<impl Stream<Item = serde_json::Value>, StreamError> {
async fn run(host: &str) -> Result<impl Stream<Item = String>, StreamError> {
let url = format!("https://{}/api/v1/streaming/public", host);
let client = reqwest::Client::new();
let res = client.get(url)
@ -40,19 +40,11 @@ async fn run(host: &str) -> Result<impl Stream<Item = serde_json::Value>, Stream
None
}
})
.filter_map(|event| async move {
match serde_json::from_str(&event.data) {
Ok(post) => Some(post),
Err(e) => {
tracing::error!("Decode stream: {}", e);
None
}
}
});
.map(|event| event.data);
Ok(src)
}
pub fn spawn<H: Into<String>>(host: H) -> Receiver<serde_json::Value> {
pub fn spawn<H: Into<String>>(host: H) -> Receiver<String> {
let host = host.into();
let (tx, rx) = channel(1024);
tokio::spawn(async move {

View file

@ -1,31 +0,0 @@
use std::collections::HashMap;
use axum::{
async_trait,
body::{Bytes, HttpBody},
extract::{Query},
http::{header::CONTENT_TYPE, Request, StatusCode},
Json,
response::{IntoResponse, Response},
routing::post,
Form, RequestExt, Router, BoxError,
};
use serde_json::json;
pub async fn webfinger(Query(params): Query<HashMap<String, String>>) -> Response {
let resource = match params.get("resource") {
Some(resource) => resource,
None => return StatusCode::NOT_FOUND.into_response(),
};
Json(json!({
"subject": &resource,
"aliases": &[
"https://relay.fedi.buzz/actor",
],
"links": &[json!({
"rel": "self",
"type": "application/activity+json",
"href": "https://relay.fedi.buzz/actor",
})],
})).into_response()
}