From 38f5b2cb139031703fbbddf8697e4756e2601bcb Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 12 Oct 2023 22:09:09 +0200 Subject: [PATCH] publish Create/Announce on redis --- Cargo.lock | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 3 ++- config.yaml | 4 ++++ src/config.rs | 7 ++++++ src/endpoint.rs | 2 +- src/main.rs | 34 ++++++++++++++++++++++++--- src/state.rs | 4 +++- 7 files changed, 110 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e89b4f..0fc5878 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,12 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "askama" version = "0.11.1" @@ -287,6 +293,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "metrics-util", + "redis", "reqwest", "serde", "serde_json", @@ -342,6 +349,20 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1433,6 +1454,30 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redis" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.9", + "tokio", + "tokio-retry", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1661,6 +1706,12 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" @@ -1955,6 +2006,17 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.9" diff --git a/Cargo.toml b/Cargo.toml index 3888675..a719116 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ serde_json = "1" serde_yaml = "0.9" reqwest = { version = "0.11", features = ["json", "stream"] } sigh = "1.0" -http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] } +http_digest_headers = { version = "0.1.0", default-features = false, features = ["use_openssl"] } thiserror = "1" http = "0.2" chrono = "0.4" @@ -32,3 +32,4 @@ metrics-exporter-prometheus = "0.12" deunicode = "1.3" urlencoding = "2" httpdate = "1" +redis = { version = "0.23", features = ["tokio-comp", "connection-manager"] } diff --git a/config.yaml b/config.yaml index 8c8a80a..f844030 100644 --- a/config.yaml +++ b/config.yaml @@ -18,3 +18,7 @@ priv_key_file: private-key.pem pub_key_file: public-key.pem # PostgreSQL db: "host=localhost user=relay password=xyz dbname=buzzrelay" +# Optional Redis +redis: + connection: "redis://127.0.0.1:6378/" + in_topic: "relay-in" diff --git a/src/config.rs b/src/config.rs index 5dfb18c..383e4db 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,19 @@ use serde::Deserialize; use sigh::{PrivateKey, PublicKey, Key}; +#[derive(Clone, Deserialize)] +pub struct RedisConfig { + pub connection: String, + pub in_topic: String, +} + #[derive(Clone, Deserialize)] pub struct Config { pub streams: Vec, pub db: String, pub hostname: String, pub listen_port: u16, + pub redis: Option, priv_key_file: String, pub_key_file: String, } diff --git a/src/endpoint.rs b/src/endpoint.rs index c0b2b00..5450bad 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -25,7 +25,7 @@ const SIGNATURE_HEADERS_REQUIRED: &[&str] = &[ pub struct Endpoint<'a> { pub payload: serde_json::Value, signature: Signature<'a>, - remote_actor_uri: String, + pub remote_actor_uri: String, } #[async_trait] diff --git a/src/main.rs b/src/main.rs index fd4e32a..d1ee929 100644 --- a/src/main.rs +++ b/src/main.rs @@ -152,7 +152,7 @@ async fn post_relay( ).into_response(); } }; - let object_type = action.object + let object_type = action.object.as_ref() .and_then(|object| object.get("type").cloned()) .and_then(|object_type| object_type.as_str().map(std::string::ToString::to_string)); @@ -228,7 +228,27 @@ async fn post_relay( ).into_response() } } + } else if action.action_type == "Create" || action.action_type == "Announce" { + tracing::trace!("Received on {} from {}: {} {:?}", target.uri(), endpoint.remote_actor_uri, action.action_type, action.object); + if let Some((redis, in_topic)) = &state.redis { + if let Some(data) = &action.object + .and_then(|o| serde_json::to_vec(&o).ok()) + { + if let Err(e) = redis::Cmd::publish(in_topic.as_ref(), data) + .query_async::<_, redis::Value>(&mut redis.clone()) + .await + { + tracing::error!("redis publish: {}", e); + } + } + } + + (StatusCode::ACCEPTED, + [("content-type", "application/activity+json")], + "{}" + ).into_response() } else { + tracing::error!("Unrecognized action type {:?}", action.action_type); track_request("POST", "relay", "unrecognized"); (StatusCode::BAD_REQUEST, "Not a recognized request").into_response() } @@ -312,7 +332,15 @@ async fn main() { .expect("Call with config.yaml") ); let database = db::Database::connect(&config.db).await; - + let mut redis = None; + if let Some(redis_config) = config.redis.clone() { + let client = redis::Client::open(redis_config.connection) + .expect("redis::Client"); + let manager = redis::aio::ConnectionManager::new(client) + .await + .expect("redis::Client"); + redis = Some((manager, redis_config.in_topic)); + } let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .user_agent(concat!( @@ -324,7 +352,7 @@ async fn main() { .pool_idle_timeout(Some(Duration::from_secs(5))) .build() .unwrap(); - let state = State::new(config.clone(), database, client); + let state = State::new(config.clone(), database, redis, client); let stream_rx = stream::spawn(config.streams.clone().into_iter()); relay::spawn(state.clone(), stream_rx); diff --git a/src/state.rs b/src/state.rs index 08bb87a..5559725 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,6 +8,7 @@ use crate::{config::Config, db::Database}; #[derive(Clone)] pub struct State { pub database: Database, + pub redis: Option<(redis::aio::ConnectionManager, Arc)>, pub client: Arc, pub hostname: Arc, pub priv_key: Arc, @@ -22,11 +23,12 @@ impl FromRef for Arc { } impl State { - pub fn new(config: Config, database: Database, client: reqwest::Client) -> Self { + pub fn new(config: Config, database: Database, redis: Option<(redis::aio::ConnectionManager, String)>, client: reqwest::Client) -> Self { let priv_key = Arc::new(config.priv_key()); let pub_key = Arc::new(config.pub_key()); State { database, + redis: redis.map(|(connection, in_topic)| (connection, Arc::new(in_topic))), client: Arc::new(client), hostname: Arc::new(config.hostname), priv_key,