diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock index 6130d6f..1ad9f16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,8 @@ dependencies = [ "axum-extra", "axum-macros", "chrono", + "eventsource-stream", + "futures", "http", "http_digest_headers", "reqwest", @@ -320,6 +322,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -359,6 +372,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.25" @@ -366,6 +394,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -374,6 +403,34 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +[[package]] +name = "futures-executor" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.25" @@ -392,10 +449,16 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -982,6 +1045,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -1110,7 +1174,7 @@ dependencies = [ [[package]] name = "sigh" -version = "0.1.0" +version = "1.0.1" dependencies = [ "base64 0.20.0", "http", diff --git a/Cargo.toml b/Cargo.toml index deb6ce4..65820f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,14 +8,16 @@ axum = "0.6" axum-macros = "0.3" axum-extra = { version = "0.4", features = ["spa"] } askama = "0.11" -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["full", "time"] } tracing = "*" tracing-subscriber = { version = "0.3", features = ["env-filter"] } serde = "1" serde_json = "1" -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["json", "stream"] } sigh = { path = "../rust-sigh" } http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] } thiserror = "1" http = "0.2" chrono = "0.4" +eventsource-stream = "0.2" +futures = "0.3" diff --git a/src/main.rs b/src/main.rs index 960adba..6f447a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,15 @@ use axum::{ use serde::{Deserialize, Serialize}; use serde_json::json; use sigh::{PrivateKey, PublicKey, alg::{RsaSha256, Algorithm}, Key}; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod fetch; pub use fetch::fetch; mod send; pub use send::send; +mod stream; +mod relay; mod activitypub; mod webfinger; mod endpoint; @@ -47,7 +49,7 @@ async fn actor(axum::extract::State(state): axum::extract::State) -> Resp ]), actor_type: "Service".to_string(), id: id.clone(), - inbox: "https://relay.fedi.buzz/inbox".to_string(), + inbox: "https://relay.fedi.buzz/relay".to_string(), // outbox: "https://relay.fedi.buzz/outbox".to_string(), public_key: activitypub::ActorPublicKey { id: ACTOR_KEY.to_string(), @@ -70,7 +72,7 @@ async fn handler( format!("Bad action: {:?}", e) ).into_response(), }; - + if action.action_type == "Follow" { let private_key = state.private_key.clone(); let client = state.client.clone(); @@ -91,7 +93,7 @@ async fn handler( ).await .map_err(|e| tracing::error!("post accept: {}", e)); }); - + (StatusCode::ACCEPTED, [("content-type", "application/activity+json")], "{}" @@ -101,6 +103,10 @@ async fn handler( } } +async fn inbox() -> impl IntoResponse { + StatusCode::OK +} + #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -113,13 +119,50 @@ async fn main() { .init(); let (private_key, public_key) = RsaSha256.generate_keys().unwrap(); + let stream_rx = stream::spawn("fedi.buzz"); + let client = Arc::new( + reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .user_agent(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + )) + .pool_max_idle_per_host(1) + .pool_idle_timeout(Some(Duration::from_secs(5))) + .build() + .unwrap() + ); + relay::spawn(client.clone(), ACTOR_KEY.to_string(), private_key.clone(), stream_rx); + + let relay_url = "https://relay.dresden.network/inbox"; + let client_ = client.clone(); + let private_key_ = private_key.clone(); + tokio::spawn(async move { + let follow = activitypub::Action::<()> { + jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()), + action_type: "Follow".to_string(), + actor: ACTOR_ID.to_string(), + to: Some(relay_url.to_string()), + id: "fnord".to_string(), + object: None, + }; + send::send( + client_.as_ref(), relay_url, + ACTOR_KEY, + &private_key_, + follow, + ).await + .map_err(|e| tracing::error!("post accept: {}", e)); + }); let app = Router::new() .route("/actor", get(actor)) .route("/relay", post(handler)) + .route("/inbox", post(inbox)) .route("/.well-known/webfinger", get(webfinger::webfinger)) .with_state(State { - client: Arc::new(reqwest::Client::new()), + client, private_key, public_key, }); diff --git a/src/relay.rs b/src/relay.rs new file mode 100644 index 0000000..04b86ab --- /dev/null +++ b/src/relay.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use serde_json::json; +use sigh::PrivateKey; +use tokio::{ + sync::mpsc::Receiver, +}; +use crate::send::send; + +pub fn spawn( + client: Arc, + key_id: String, + private_key: PrivateKey, + mut stream_rx: Receiver +) { + tokio::spawn(async move { + while let Some(post) = stream_rx.recv().await { + dbg!(&post); + let url = if let Some(serde_json::Value::String(url)) = post.get("url") { + url + } else { + continue; + }; + let uri = if let Some(serde_json::Value::String(uri)) = post.get("uri") { + uri + } else { + continue; + }; + let account = if let Some(serde_json::Value::String(account)) = post.get("account").and_then(|a| a.get("url")) { + account + } else { + continue; + }; + // {"@context": "https://www.w3.org/ns/activitystreams", "type": "Announce", "to": ["https://relay.dresden.network/followers"], "actor": "https://relay.dresden.network/actor", "object": "https://mastodon.online/users/evangreer/statuses/109521063161210607", "id": "https://relay.dresden.network/activities/5e41fd9c-bc51-408c-94ca-96a7bf9ce412"} + let body = json!({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Announce", + "actor": "https://relay.fedi.buzz/actor", + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "object": &uri, + "id": &url, + }); + dbg!(&body); + send(&client, "https://c3d2.social/inbox", + &key_id, &private_key, body).await + .map_err(|e| tracing::error!("relay::send {:?}", e)); + } + }); +} diff --git a/src/send.rs b/src/send.rs index d4ff177..433f575 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,5 +1,7 @@ +use futures::StreamExt; use http::StatusCode; use http_digest_headers::{DigestHeader, DigestMethod}; +use reqwest::Body; use serde::Serialize; use sigh::{PrivateKey, SigningConfig, alg::RsaSha256}; @@ -57,7 +59,8 @@ pub async fn send( .map_err(SendError::HttpReq)?; SigningConfig::new(RsaSha256, private_key, key_id) .sign(&mut req)?; - let res = client.execute(req.try_into()?) + let req: reqwest::Request = req.try_into()?; + let res = client.execute(req) .await?; if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES { Ok(()) diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..4f235ca --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,73 @@ +use std::time::Duration; +use futures::{Stream, StreamExt}; +use eventsource_stream::Eventsource; +use tokio::{ + sync::mpsc::{channel, Receiver}, + time::sleep, +}; + +#[derive(Debug)] +pub enum StreamError { + Http(reqwest::Error), + HttpStatus(reqwest::StatusCode), + InvalidContentType, +} + +async fn run(host: &str) -> Result, StreamError> { + let url = format!("https://{}/api/v1/streaming/public", host); + let client = reqwest::Client::new(); + let res = client.get(url) + .timeout(Duration::MAX) + .send() + .await + .map_err(StreamError::Http)?; + if res.status() != 200 { + return Err(StreamError::HttpStatus(res.status())); + } + let ct = res.headers().get("content-type") + .and_then(|c| c.to_str().ok()); + if ct.map_or(true, |ct| ct != "text/event-stream") { + return Err(StreamError::InvalidContentType); + } + + let src = res.bytes_stream() + .eventsource() + .filter_map(|result| async { + let result = result.ok()?; + if result.event == "update" { + Some(result) + } else { + None + } + }) + .filter_map(|event| async move { + match serde_json::from_str(&event.data) { + Ok(post) => Some(post), + Err(e) => { + tracing::error!("Decode stream: {}", e); + None + } + } + }); + Ok(src) +} + +pub fn spawn>(host: H) -> Receiver { + let host = host.into(); + let (tx, rx) = channel(1024); + tokio::spawn(async move { + loop { + match run(&host).await { + Ok(stream) => + stream.for_each(|post| async { + tx.send(post).await.unwrap(); + }).await, + Err(e) => + tracing::error!("stream: {:?}", e), + } + + sleep(Duration::from_secs(1)).await; + } + }); + rx +}