diff --git a/.gitignore b/.gitignore index ea8c4bf..a411107 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target +/private-key.pem +/public-key.pem diff --git a/Cargo.lock b/Cargo.lock index 1ad9f16..ce92ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,15 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -203,13 +212,21 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_yaml", "sigh", "thiserror", "tokio", + "tokio-postgres", "tracing", "tracing-subscriber", ] +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.3.0" @@ -269,6 +286,25 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "cpufeatures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "cxx" version = "1.0.83" @@ -313,6 +349,17 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "encoding_rs" version = "0.8.31" @@ -333,6 +380,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastrand" version = "1.8.0" @@ -461,6 +514,27 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + [[package]] name = "h2" version = "0.3.15" @@ -501,6 +575,15 @@ dependencies = [ "libc", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.8" @@ -725,6 +808,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" +[[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest", +] + [[package]] name = "memchr" version = "2.5.0" @@ -918,6 +1010,24 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "phf" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -956,6 +1066,41 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +[[package]] +name = "postgres-protocol" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.47" @@ -974,6 +1119,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1163,6 +1338,30 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + +[[package]] +name = "sha2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -1175,6 +1374,8 @@ dependencies = [ [[package]] name = "sigh" version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17a67f3294aadf0022646d41c33b888d79db23670e4c34b1bc007c6f901a3b77" dependencies = [ "base64 0.20.0", "http", @@ -1192,6 +1393,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.7" @@ -1217,6 +1424,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.105" @@ -1353,6 +1576,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.4" @@ -1498,6 +1745,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + [[package]] name = "unicase" version = "2.6.0" @@ -1534,6 +1787,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unsafe-libyaml" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" + [[package]] name = "url" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index 65820f3..a959c14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,13 @@ tracing = "*" tracing-subscriber = { version = "0.3", features = ["env-filter"] } serde = "1" serde_json = "1" +serde_yaml = "0.9" reqwest = { version = "0.11", features = ["json", "stream"] } -sigh = { path = "../rust-sigh" } +sigh = "1.0" http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] } thiserror = "1" http = "0.2" chrono = "0.4" eventsource-stream = "0.2" futures = "0.3" +tokio-postgres = "0.7" diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..73fa539 --- /dev/null +++ b/config.toml @@ -0,0 +1,5 @@ +hostname: relay.fedi.buzz +listen_port: 3000 +priv_key_file: private-key.pem +pub_key_file: public-key.pem +db: "host=localhost user=relay password=xyz dbname=buzzrelay" diff --git a/src/activitypub.rs b/src/activitypub.rs index 74efb45..7f05b08 100644 --- a/src/activitypub.rs +++ b/src/activitypub.rs @@ -1,3 +1,4 @@ +use axum::{response::IntoResponse, Json}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -8,11 +9,10 @@ pub struct Actor { pub actor_type: String, pub id: String, pub inbox: String, - // pub outbox: String, #[serde(rename = "publicKey")] pub public_key: ActorPublicKey, #[serde(rename = "preferredUsername")] - pub preferredUsername: Option, + pub preferred_username: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -35,3 +35,10 @@ pub struct Action { pub to: Option, pub object: Option, } + +impl IntoResponse for Actor { + fn into_response(self) -> axum::response::Response { + ([("content-type", "application/activity+json")], + Json(self)).into_response() + } +} diff --git a/src/actor.rs b/src/actor.rs new file mode 100644 index 0000000..f6c3b69 --- /dev/null +++ b/src/actor.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; +use sigh::{PublicKey, Key}; + +use crate::activitypub; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub enum ActorKind { + TagRelay(String), + InstanceRelay(String), +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct Actor { + pub host: Arc, + pub kind: ActorKind, +} + +impl Actor { + pub fn uri(&self) -> String { + match &self.kind { + ActorKind::TagRelay(tag) => + format!("https://{}/tag/{}", self.host, tag), + ActorKind::InstanceRelay(instance) => + format!("https://{}/instance/{}", self.host, instance), + } + } + + pub fn key_id(&self) -> String { + format!("{}#key", self.uri()) + } + + pub fn as_activitypub(&self, pub_key: &PublicKey) -> activitypub::Actor { + activitypub::Actor { + jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()), + actor_type: "Service".to_string(), + id: self.uri(), + inbox: self.uri(), + // outbox: "https://relay.fedi.buzz/outbox".to_string(), + public_key: activitypub::ActorPublicKey { + id: self.key_id(), + owner: Some(self.uri()), + pem: pub_key.to_pem().unwrap(), + }, + preferred_username: Some(match &self.kind { + ActorKind::TagRelay(tag) => + format!("tag-{}", tag), + ActorKind::InstanceRelay(instance) => + format!("instance-{}", instance), + }), + } + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..65b01f3 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,40 @@ +use serde::Deserialize; +use sigh::{PrivateKey, PublicKey, Key}; + +fn default_upstream() -> String { + "fedi.buzz".to_string() +} + +#[derive(Deserialize)] +pub struct Config { + #[serde(default = "default_upstream")] + pub upstream: String, + pub db: String, + pub hostname: String, + pub listen_port: u16, + priv_key_file: String, + pub_key_file: String, +} + +impl Config { + pub fn load(config_file: &str) -> Config { + let data = std::fs::read_to_string(config_file) + .expect("read config"); + serde_yaml::from_str(&data) + .expect("parse config") + } + + pub fn priv_key(&self) -> PrivateKey { + let data = std::fs::read_to_string(&self.priv_key_file) + .expect("read priv_key_file"); + PrivateKey::from_pem(data.as_bytes()) + .expect("priv_key") + } + + pub fn pub_key(&self) -> PublicKey { + let data = std::fs::read_to_string(&self.pub_key_file) + .expect("read pub_key_file"); + PublicKey::from_pem(data.as_bytes()) + .expect("pub_key") + } +} diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..0e937f8 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; +use tokio_postgres::{Client, Error, NoTls, Statement}; + +use crate::actor; + +const CREATE_SCHEMA_COMMANDS: &[&str] = &[ + "CREATE TABLE IF NOT EXISTS follows (id TEXT, inbox TEXT, actor TEXT, UNIQUE (inbox, actor))", + "CREATE INDEX ON follows (actor) INCLUDE (inbox)", +]; + +#[derive(Clone)] +pub struct Database { + inner: Arc, +} + +struct DatabaseInner { + client: Client, + add_follow: Statement, + del_follow: Statement, + get_following_inboxes: Statement, +} + +impl Database { + pub async fn connect(conn_str: &str) -> Self { + let (client, connection) = tokio_postgres::connect(conn_str, NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!("postgresql: {}", e); + } + }); + + for command in CREATE_SCHEMA_COMMANDS { + client.execute(*command, &[]) + .await + .unwrap(); + } + let add_follow = client.prepare("INSERT INTO follows (id, inbox, actor) VALUES ($1, $2, $3)") + .await + .unwrap(); + let del_follow = client.prepare("DELETE FROM follows WHERE id=$1 AND actor=$2") + .await + .unwrap(); + let get_following_inboxes = client.prepare("SELECT DISTINCT inbox FROM follows WHERE actor=$1") + .await + .unwrap(); + + Database { + inner: Arc::new(DatabaseInner { + client, + add_follow, + del_follow, + get_following_inboxes, + }), + } + } + + pub async fn add_follow(&self, id: &str, inbox: &str, actor: &str) -> Result<(), Error> { + self.inner.client.execute(&self.inner.add_follow, &[&id, &inbox, &actor]) + .await?; + Ok(()) + } + + pub async fn del_follow(&self, id: &str, actor: &str) -> Result<(), Error> { + self.inner.client.execute(&self.inner.del_follow, &[&id, &actor]) + .await?; + Ok(()) + } + + pub async fn get_following_inboxes(&self, actor: &str) -> Result, Error> { + let rows = self.inner.client.query(&self.inner.get_following_inboxes, &[&actor]) + .await?; + Ok(rows.into_iter() + .map(|row| row.get(0)) + ) + } +} diff --git a/src/main.rs b/src/main.rs index 6f447a9..0ac35b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use axum::{ async_trait, - extract::{FromRequest, FromRef}, + extract::{FromRequest, FromRef, Path, Query}, http::{header::CONTENT_TYPE, Request, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, @@ -9,27 +9,29 @@ use axum::{ use serde::{Deserialize, Serialize}; use serde_json::json; use sigh::{PrivateKey, PublicKey, alg::{RsaSha256, Algorithm}, Key}; -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap}; +use std::{panic, process}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +mod config; +mod actor; +mod db; mod fetch; pub use fetch::fetch; mod send; -pub use send::send; mod stream; mod relay; mod activitypub; -mod webfinger; mod endpoint; -const ACTOR_ID: &str = "https://relay.fedi.buzz/actor"; -const ACTOR_KEY: &str = "https://relay.fedi.buzz/actor#key"; -#[derive(Debug, Clone)] +#[derive(Clone)] struct State { + database: db::Database, client: Arc, - private_key: PrivateKey, - public_key: PublicKey, + hostname: Arc, + priv_key: PrivateKey, + pub_key: PublicKey, } @@ -39,30 +41,101 @@ impl FromRef for Arc { } } -async fn actor(axum::extract::State(state): axum::extract::State) -> Response { - let id = ACTOR_ID.to_string(); - ([("content-type", "application/activity+json")], - Json(activitypub::Actor { - jsonld_context: json!([ - "https://www.w3.org/ns/activitystreams", - "https://w3id.org/security/v1", - ]), - actor_type: "Service".to_string(), - id: id.clone(), - inbox: "https://relay.fedi.buzz/relay".to_string(), - // outbox: "https://relay.fedi.buzz/outbox".to_string(), - public_key: activitypub::ActorPublicKey { - id: ACTOR_KEY.to_string(), - owner: Some(id.clone()), - pem: state.public_key.to_pem().unwrap(), - }, - preferredUsername: Some("buzzrelay".to_string()), - })).into_response() +async fn webfinger( + axum::extract::State(state): axum::extract::State, + Query(params): Query>, +) -> Response { + let resource = match params.get("resource") { + Some(resource) => resource, + None => return StatusCode::NOT_FOUND.into_response(), + }; + let (target_kind, target_host) = + if resource.starts_with("acct:tag-") { + let off = "acct:tag-".len(); + let at = resource.find('@'); + (actor::ActorKind::TagRelay(resource[off..at.unwrap_or(resource.len())].to_string()), + at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) + } else if resource.starts_with("acct:instance-") { + let off = "acct:instance-".len(); + let at = resource.find('@'); + (actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()), + at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) + } else { + return StatusCode::NOT_FOUND.into_response(); + }; + let target = actor::Actor { + host: target_host, + kind: target_kind, + }; + Json(json!({ + "subject": &resource, + "aliases": &[ + target.uri(), + ], + "links": &[json!({ + "rel": "self", + "type": "application/activity+json", + "href": target.uri(), + })], + })).into_response() } -async fn handler( +async fn get_tag_actor( axum::extract::State(state): axum::extract::State, + Path(tag): Path +) -> Response { + // TODO: downcase + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::TagRelay(tag), + }; + target.as_activitypub(&state.pub_key) + .into_response() +} + +async fn get_instance_actor( + axum::extract::State(state): axum::extract::State, + Path(instance): Path +) -> Response { + // TODO: downcase + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::InstanceRelay(instance), + }; + target.as_activitypub(&state.pub_key) + .into_response() +} + +async fn post_tag_relay( + axum::extract::State(state): axum::extract::State, + Path(tag): Path, + endpoint: endpoint::Endpoint +) -> Response { + // TODO: downcase + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::TagRelay(tag), + }; + post_relay(state, endpoint, target).await +} + +async fn post_instance_relay( + axum::extract::State(state): axum::extract::State, + Path(instance): Path, + endpoint: endpoint::Endpoint +) -> Response { + // TODO: downcase + let target = actor::Actor { + host: state.hostname.clone(), + kind: actor::ActorKind::InstanceRelay(instance), + }; + post_relay(state, endpoint, target).await +} + +async fn post_relay( + state: State, endpoint: endpoint::Endpoint, + target: actor::Actor ) -> Response { dbg!(&endpoint); let action = match serde_json::from_value::>(endpoint.payload.clone()) { @@ -72,26 +145,39 @@ async fn handler( format!("Bad action: {:?}", e) ).into_response(), }; + // endpoint.actor.inbox + // endpoint.actor.id if action.action_type == "Follow" { - let private_key = state.private_key.clone(); + let priv_key = state.priv_key.clone(); let client = state.client.clone(); tokio::spawn(async move { let accept = activitypub::Action { jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()), action_type: "Accept".to_string(), - actor: ACTOR_ID.to_string(), + actor: target.uri(), to: Some(endpoint.actor.id.clone()), id: action.id, object: Some(endpoint.payload), }; - send::send( + let result = send::send( client.as_ref(), &endpoint.actor.inbox, - ACTOR_KEY, - &private_key, - accept, - ).await - .map_err(|e| tracing::error!("post accept: {}", e)); + &target.key_id(), + &priv_key, + &accept, + ).await; + match result { + Ok(()) => { + state.database.add_follow( + &endpoint.actor.id, + &endpoint.actor.inbox, + &target.uri(), + ).await.unwrap(); + } + Err(e) => { + tracing::error!("post accept: {}", e); + } + } }); (StatusCode::ACCEPTED, @@ -99,16 +185,15 @@ async fn handler( "{}" ).into_response() } else { + // TODO: Undo Follow (StatusCode::BAD_REQUEST, "Not a recognized request").into_response() } } -async fn inbox() -> impl IntoResponse { - StatusCode::OK -} - #[tokio::main] async fn main() { + exit_on_panic(); + tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { @@ -118,8 +203,15 @@ async fn main() { .with(tracing_subscriber::fmt::layer()) .init(); - let (private_key, public_key) = RsaSha256.generate_keys().unwrap(); - let stream_rx = stream::spawn("fedi.buzz"); + let config = config::Config::load( + &std::env::args() + .skip(1) + .next() + .expect("Call with config.yaml") + ); + let database = db::Database::connect(&config.db).await; + + let stream_rx = stream::spawn(config.upstream.clone()); let client = Arc::new( reqwest::Client::builder() .timeout(Duration::from_secs(5)) @@ -133,43 +225,34 @@ async fn main() { .build() .unwrap() ); - relay::spawn(client.clone(), ACTOR_KEY.to_string(), private_key.clone(), stream_rx); - - let relay_url = "https://relay.dresden.network/inbox"; - let client_ = client.clone(); - let private_key_ = private_key.clone(); - tokio::spawn(async move { - let follow = activitypub::Action::<()> { - jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()), - action_type: "Follow".to_string(), - actor: ACTOR_ID.to_string(), - to: Some(relay_url.to_string()), - id: "fnord".to_string(), - object: None, - }; - send::send( - client_.as_ref(), relay_url, - ACTOR_KEY, - &private_key_, - follow, - ).await - .map_err(|e| tracing::error!("post accept: {}", e)); - }); + let hostname = Arc::new(config.hostname.clone()); + relay::spawn(client.clone(), hostname.clone(), database.clone(), config.priv_key(), stream_rx); let app = Router::new() - .route("/actor", get(actor)) - .route("/relay", post(handler)) - .route("/inbox", post(inbox)) - .route("/.well-known/webfinger", get(webfinger::webfinger)) + .route("/tag/:tag", get(get_tag_actor).post(post_tag_relay)) + .route("/instance/:instance", get(get_instance_actor).post(post_instance_relay)) + .route("/.well-known/webfinger", get(webfinger)) .with_state(State { + database, client, - private_key, public_key, + hostname, + priv_key: config.priv_key(), + pub_key: config.pub_key(), }); - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - tracing::debug!("listening on {}", addr); + let addr = SocketAddr::from(([127, 0, 0, 1], config.listen_port)); + tracing::info!("serving on {}", addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); } + +fn exit_on_panic() { + let orig_hook = panic::take_hook(); + panic::set_hook(Box::new(move |panic_info| { + // invoke the default handler and exit the process + orig_hook(panic_info); + process::exit(1); + })); +} diff --git a/src/relay.rs b/src/relay.rs index 04b86ab..19faaf2 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,49 +1,114 @@ -use std::sync::Arc; +use std::{sync::Arc, collections::HashSet}; +use serde::Deserialize; use serde_json::json; use sigh::PrivateKey; use tokio::{ sync::mpsc::Receiver, }; -use crate::send::send; +use crate::{db::Database, send, actor}; + +#[derive(Deserialize)] +struct Post<'a> { + // pub url: &'a str, + pub uri: &'a str, + pub tags: Option>>, +} + +impl Post<'_> { + pub fn host(&self) -> Option { + reqwest::Url::parse(&self.uri) + .ok() + .and_then(|url| url.domain() + .map(|s| s.to_lowercase()) + ) + } + + pub fn tags(&self) -> Vec { + match &self.tags { + None => + vec![], + Some(tags) => + tags.iter() + .map(|tag| tag.name.to_lowercase()) + .collect() + } + } + + fn relay_target_kinds(&self) -> impl Iterator { + self.host() + .into_iter() + .map(|host| actor::ActorKind::InstanceRelay(host.clone())) + .chain( + self.tags() + .into_iter() + .map(|tag| actor::ActorKind::TagRelay(tag)) + ) + } + + pub fn relay_targets(&self, hostname: Arc) -> impl Iterator { + self.relay_target_kinds() + .map(move |kind| actor::Actor { + host: hostname.clone(), + kind, + }) + } +} + +#[derive(Deserialize)] +struct Tag<'a> { + pub name: &'a str, +} pub fn spawn( client: Arc, - key_id: String, + hostname: Arc, + database: Database, private_key: PrivateKey, - mut stream_rx: Receiver + mut stream_rx: Receiver ) { tokio::spawn(async move { - while let Some(post) = stream_rx.recv().await { - dbg!(&post); - let url = if let Some(serde_json::Value::String(url)) = post.get("url") { - url - } else { - continue; + while let Some(data) = stream_rx.recv().await { + // dbg!(&data); + let post: Post = match serde_json::from_str(&data) { + Ok(post) => post, + Err(e) => { + tracing::error!("parse error: {}", e); + tracing::trace!("data: {}", data); + continue; + } }; - let uri = if let Some(serde_json::Value::String(uri)) = post.get("uri") { - uri - } else { - continue; - }; - let account = if let Some(serde_json::Value::String(account)) = post.get("account").and_then(|a| a.get("url")) { - account - } else { - continue; - }; - // {"@context": "https://www.w3.org/ns/activitystreams", "type": "Announce", "to": ["https://relay.dresden.network/followers"], "actor": "https://relay.dresden.network/actor", "object": "https://mastodon.online/users/evangreer/statuses/109521063161210607", "id": "https://relay.dresden.network/activities/5e41fd9c-bc51-408c-94ca-96a7bf9ce412"} - let body = json!({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Announce", - "actor": "https://relay.fedi.buzz/actor", - "to": ["https://www.w3.org/ns/activitystreams#Public"], - "object": &uri, - "id": &url, - }); - dbg!(&body); - send(&client, "https://c3d2.social/inbox", - &key_id, &private_key, body).await - .map_err(|e| tracing::error!("relay::send {:?}", e)); + // TODO: queue by target? + let mut seen = HashSet::new(); + for actor in post.relay_targets(hostname.clone()) { + if seen.contains(&actor) { + continue; + } + + let actor_id = actor.uri(); + let body = json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Announce", + "actor": &actor_id, + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "object": &post.uri, + "id": &post.uri, + }); + let body = Arc::new( + serde_json::to_vec(&body) + .unwrap() + ); + for inbox in database.get_following_inboxes(&actor_id).await.unwrap() { + if let Err(e) = send::send_raw( + &client, &inbox, + &actor.key_id(), &private_key, body.clone() + ).await { + tracing::error!("relay::send {:?}", e); + } + } + + seen.insert(actor); + } } }); } diff --git a/src/send.rs b/src/send.rs index 433f575..b695f9f 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,3 +1,5 @@ +use std::{sync::Arc, ops::Deref}; + use futures::StreamExt; use http::StatusCode; use http_digest_headers::{DigestHeader, DigestMethod}; @@ -28,10 +30,22 @@ pub async fn send( uri: &str, key_id: &str, private_key: &PrivateKey, - body: T, + body: &T, +) -> Result<(), SendError> { + let body = Arc::new( + serde_json::to_vec(body) + .map_err(SendError::Json)? + ); + send_raw(client, uri, key_id, private_key, body).await +} + +pub async fn send_raw( + client: &reqwest::Client, + uri: &str, + key_id: &str, + private_key: &PrivateKey, + body: Arc>, ) -> Result<(), SendError> { - let body = serde_json::to_vec(&body) - .map_err(SendError::Json)?; let mut digest_header = DigestHeader::new() .with_method(DigestMethod::SHA256, &body) .map(|h| format!("{}", h)) @@ -55,7 +69,7 @@ pub async fn send( .header("date", chrono::Utc::now().to_rfc2822() .replace("+0000", "GMT")) .header("digest", digest_header) - .body(body) + .body(body.as_ref().clone()) .map_err(SendError::HttpReq)?; SigningConfig::new(RsaSha256, private_key, key_id) .sign(&mut req)?; diff --git a/src/stream.rs b/src/stream.rs index 4f235ca..8a8ae09 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -13,7 +13,7 @@ pub enum StreamError { InvalidContentType, } -async fn run(host: &str) -> Result, StreamError> { +async fn run(host: &str) -> Result, StreamError> { let url = format!("https://{}/api/v1/streaming/public", host); let client = reqwest::Client::new(); let res = client.get(url) @@ -40,19 +40,11 @@ async fn run(host: &str) -> Result, Stream None } }) - .filter_map(|event| async move { - match serde_json::from_str(&event.data) { - Ok(post) => Some(post), - Err(e) => { - tracing::error!("Decode stream: {}", e); - None - } - } - }); + .map(|event| event.data); Ok(src) } -pub fn spawn>(host: H) -> Receiver { +pub fn spawn>(host: H) -> Receiver { let host = host.into(); let (tx, rx) = channel(1024); tokio::spawn(async move { diff --git a/src/webfinger.rs b/src/webfinger.rs deleted file mode 100644 index c8b00c4..0000000 --- a/src/webfinger.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::collections::HashMap; - -use axum::{ - async_trait, - body::{Bytes, HttpBody}, - extract::{Query}, - http::{header::CONTENT_TYPE, Request, StatusCode}, - Json, - response::{IntoResponse, Response}, - routing::post, - Form, RequestExt, Router, BoxError, -}; -use serde_json::json; - -pub async fn webfinger(Query(params): Query>) -> Response { - let resource = match params.get("resource") { - Some(resource) => resource, - None => return StatusCode::NOT_FOUND.into_response(), - }; - Json(json!({ - "subject": &resource, - "aliases": &[ - "https://relay.fedi.buzz/actor", - ], - "links": &[json!({ - "rel": "self", - "type": "application/activity+json", - "href": "https://relay.fedi.buzz/actor", - })], - })).into_response() -}