From 0cbe679e653895a140501b244c5846af08a15733 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 15 Mar 2020 17:37:53 -0500
Subject: [PATCH] Basic relay functionality

---
 .env                                          |   1 +
 Cargo.lock                                    |  10 +
 Cargo.toml                                    |   1 +
 .../2020-03-14-211045_create_listeners/up.sql |   2 +-
 .../2020-03-14-213217_create_blocks/up.sql    |   2 +-
 .../up.sql                                    |   2 +-
 src/apub.rs                                   |  30 +++
 src/inbox.rs                                  | 238 ++++++++++++++++--
 src/main.rs                                   |  45 +++-
 src/schema.rs                                 |   6 +-
 src/state.rs                                  |  91 ++++++-
 11 files changed, 391 insertions(+), 37 deletions(-)

diff --git a/.env b/.env
index c60e236..8d33ab5 100644
--- a/.env
+++ b/.env
@@ -1 +1,2 @@
 DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix
+HOSTNAME=localhost:8080
diff --git a/Cargo.lock b/Cargo.lock
index 8f862ee..0ab0b95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -374,6 +374,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "ttl_cache",
+ "uuid",
 ]
 
 [[package]]
@@ -2117,6 +2118,15 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "uuid"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
+dependencies = [
+ "rand",
+]
+
 [[package]]
 name = "vcpkg"
 version = "0.2.8"
diff --git a/Cargo.toml b/Cargo.toml
index 6b979e2..7fc43a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,3 +23,4 @@ serde_json = "1.0"
 thiserror = "1.0"
 tokio = { version = "0.2.13", features = ["sync"] }
 ttl_cache = "0.5.1"
+uuid = { version = "0.8", features = ["v4"] }
diff --git a/migrations/2020-03-14-211045_create_listeners/up.sql b/migrations/2020-03-14-211045_create_listeners/up.sql
index e1fd942..ee6324e 100644
--- a/migrations/2020-03-14-211045_create_listeners/up.sql
+++ b/migrations/2020-03-14-211045_create_listeners/up.sql
@@ -3,7 +3,7 @@ CREATE TABLE listeners (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
     actor_id TEXT UNIQUE NOT NULL,
     created_at TIMESTAMP NOT NULL,
-    updated_at TIMESTAMP NOT NULL
+    updated_at TIMESTAMP
 );
 
 CREATE INDEX listeners_actor_id_index ON listeners(actor_id);
diff --git a/migrations/2020-03-14-213217_create_blocks/up.sql b/migrations/2020-03-14-213217_create_blocks/up.sql
index 75b5871..364816b 100644
--- a/migrations/2020-03-14-213217_create_blocks/up.sql
+++ b/migrations/2020-03-14-213217_create_blocks/up.sql
@@ -3,7 +3,7 @@ CREATE TABLE blocks (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
     domain_name TEXT UNIQUE NOT NULL,
     created_at TIMESTAMP NOT NULL,
-    updated_at TIMESTAMP NOT NULL
+    updated_at TIMESTAMP
 );
 
 CREATE INDEX blocks_domain_name_index ON blocks(domain_name);
diff --git a/migrations/2020-03-14-213511_create_whitelists/up.sql b/migrations/2020-03-14-213511_create_whitelists/up.sql
index 7a90398..5a2ab11 100644
--- a/migrations/2020-03-14-213511_create_whitelists/up.sql
+++ b/migrations/2020-03-14-213511_create_whitelists/up.sql
@@ -3,7 +3,7 @@ CREATE TABLE whitelists (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
     domain_name TEXT UNIQUE NOT NULL,
     created_at TIMESTAMP NOT NULL,
-    updated_at TIMESTAMP NOT NULL
+    updated_at TIMESTAMP
 );
 
 CREATE INDEX whitelists_domain_name_index ON whitelists(domain_name);
diff --git a/src/apub.rs b/src/apub.rs
index 687b508..8757e02 100644
--- a/src/apub.rs
+++ b/src/apub.rs
@@ -3,6 +3,7 @@ use activitystreams::{
     primitives::XsdAnyUri,
     PropRefs,
 };
+use std::collections::HashMap;
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PropRefs)]
 #[serde(rename_all = "camelCase")]
@@ -12,6 +13,9 @@ pub struct AnyExistingObject {
 
     #[serde(rename = "type")]
     pub kind: String,
+
+    #[serde(flatten)]
+    ext: HashMap<String, serde_json::Value>,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -22,6 +26,7 @@ pub enum ValidTypes {
     Delete,
     Follow,
     Undo,
+    Update,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -43,6 +48,9 @@ pub struct AcceptedObjects {
     pub actor: XsdAnyUri,
 
     pub object: ValidObjects,
+
+    #[serde(flatten)]
+    ext: HashMap<String, serde_json::Value>,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -71,6 +79,28 @@ impl ValidObjects {
             ValidObjects::Object(ref obj) => &obj.id,
         }
     }
+
+    pub fn is_kind(&self, query_kind: &str) -> bool {
+        match self {
+            ValidObjects::Id(_) => false,
+            ValidObjects::Object(AnyExistingObject { kind, .. }) => kind == query_kind,
+        }
+    }
+
+    pub fn child_object_is_actor(&self) -> bool {
+        match self {
+            ValidObjects::Id(_) => false,
+            ValidObjects::Object(AnyExistingObject { ext, .. }) => {
+                if let Some(o) = ext.get("object") {
+                    if let Ok(s) = serde_json::from_value::<String>(o.clone()) {
+                        return s.ends_with("/actor");
+                    }
+                }
+
+                false
+            }
+        }
+    }
 }
 
 impl AcceptedActors {
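
Note on the `ext` fields above: `#[serde(flatten)]` into a `HashMap<String, serde_json::Value>` keeps any JSON members the structs do not model explicitly, which is what lets `child_object_is_actor` peek at the `object` of an embedded activity. A minimal, self-contained sketch of that behaviour (the struct, ids, and hostnames here are illustrative, not the relay's real types):

    use std::collections::HashMap;

    #[derive(Debug, serde::Deserialize)]
    struct Activity {
        id: String,
        #[serde(rename = "type")]
        kind: String,
        // Any members not named above ("actor", "object", ...) land here.
        #[serde(flatten)]
        ext: HashMap<String, serde_json::Value>,
    }

    fn main() -> Result<(), serde_json::Error> {
        // The embedded Follow taken from an Undo; its object is the relay actor.
        let json = r#"{
            "id": "https://social.example/activities/1",
            "type": "Follow",
            "actor": "https://social.example/users/alice",
            "object": "https://relay.example/actor"
        }"#;

        let follow: Activity = serde_json::from_str(json)?;

        let object_is_actor = follow
            .ext
            .get("object")
            .and_then(|v| v.as_str())
            .map(|s| s.ends_with("/actor"))
            .unwrap_or(false);

        assert_eq!(follow.kind, "Follow");
        assert!(object_is_actor);
        Ok(())
    }
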
diff --git a/src/inbox.rs b/src/inbox.rs
index ab0db0b..059dfe4 100644
--- a/src/inbox.rs
+++ b/src/inbox.rs
@@ -1,16 +1,18 @@
 use activitystreams::{
-    activity::apub::{Accept, Follow},
+    activity::apub::{Accept, Announce, Follow, Undo},
+    context,
     primitives::XsdAnyUri,
 };
 use actix::Addr;
-use actix_web::{client::Client, web, Responder};
+use actix_web::{client::Client, web, HttpResponse};
 use futures::join;
-use log::{error, info};
+use log::error;
+use std::collections::HashMap;
 
 use crate::{
     apub::{AcceptedActors, AcceptedObjects, ValidTypes},
     db_actor::{DbActor, DbQuery, Pool},
-    state::State,
+    state::{State, UrlKind},
 };
 
 #[derive(Clone, Debug, thiserror::Error)]
@@ -22,28 +24,146 @@ pub async fn inbox(
     state: web::Data<State>,
     client: web::Data<Client>,
     input: web::Json<AcceptedObjects>,
-) -> Result<impl Responder, MyError> {
+) -> Result<HttpResponse, MyError> {
     let input = input.into_inner();
 
-    let actor = fetch_actor(state.clone(), client, &input.actor).await?;
+    let actor = fetch_actor(state.clone(), &client, &input.actor).await?;
 
     match input.kind {
-        ValidTypes::Announce => (),
-        ValidTypes::Create => (),
-        ValidTypes::Delete => (),
-        ValidTypes::Follow => return handle_follow(db_actor, state, input, actor).await,
-        ValidTypes::Undo => (),
+        ValidTypes::Announce | ValidTypes::Create => {
+            handle_relay(state, client, input, actor).await
+        }
+        ValidTypes::Follow => handle_follow(db_actor, state, client, input, actor).await,
+        ValidTypes::Delete | ValidTypes::Update => {
+            handle_forward(state, client, input, actor).await
+        }
+        ValidTypes::Undo => handle_undo(db_actor, state, client, input, actor).await,
+    }
+}
+
+pub fn response<T>(item: T) -> HttpResponse
+where
+    T: serde::ser::Serialize,
+{
+    HttpResponse::Accepted()
+        .content_type("application/activity+json")
+        .json(item)
+}
+
+async fn handle_undo(
+    db_actor: web::Data<Addr<DbActor>>,
+    state: web::Data<State>,
+    client: web::Data<Client>,
+    input: AcceptedObjects,
+    actor: AcceptedActors,
+) -> Result<HttpResponse, MyError> {
+    if !input.object.is_kind("Follow") {
+        return Err(MyError);
     }
 
-    Err(MyError)
+    let inbox = actor.inbox().to_owned();
+
+    let state2 = state.clone().into_inner();
+    db_actor.do_send(DbQuery(move |pool: Pool| {
+        let inbox = inbox.clone();
+
+        async move {
+            let conn = pool.get().await?;
+
+            state2.remove_listener(&conn, &inbox).await.map_err(|e| {
+                error!("Error removing listener, {}", e);
+                e
+            })
+        }
+    }));
+
+    let mut undo = Undo::default();
+    let mut follow = Follow::default();
+
+    follow
+        .object_props
+        .set_id(state.generate_url(UrlKind::Activity))?;
+    follow
+        .follow_props
+        .set_actor_xsd_any_uri(actor.id.clone())?
+        .set_object_xsd_any_uri(actor.id.clone())?;
+
+    undo.object_props
+        .set_id(state.generate_url(UrlKind::Activity))?
+        .set_many_to_xsd_any_uris(vec![actor.id.clone()])?
+        .set_context_xsd_any_uri(context())?;
+    undo.undo_props
+        .set_object_object_box(follow)?
+        .set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
+
+    if input.object.child_object_is_actor() {
+        let undo2 = undo.clone();
+        let client = client.into_inner();
+        actix::Arbiter::spawn(async move {
+            let _ = deliver(&client, actor.id, &undo2).await;
+        });
+    }
+
+    Ok(response(undo))
+}
+
+async fn handle_forward(
+    state: web::Data<State>,
+    client: web::Data<Client>,
+    input: AcceptedObjects,
+    actor: AcceptedActors,
+) -> Result<HttpResponse, MyError> {
+    let object_id = input.object.id();
+
+    let inboxes = get_inboxes(&state, &actor, &object_id).await?;
+
+    deliver_many(client, inboxes, input);
+
+    Ok(response(HashMap::<(), ()>::new()))
+}
+
+async fn handle_relay(
+    state: web::Data<State>,
+    client: web::Data<Client>,
+    input: AcceptedObjects,
+    actor: AcceptedActors,
+) -> Result<HttpResponse, MyError> {
+    let object_id = input.object.id();
+
+    if state.is_cached(object_id).await {
+        return Err(MyError);
+    }
+
+    let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?;
+
+    let mut announce = Announce::default();
+    announce
+        .object_props
+        .set_context_xsd_any_uri(context())?
+        .set_many_to_xsd_any_uris(vec![state.generate_url(UrlKind::Followers)])?
+        .set_id(activity_id.clone())?;
+
+    announce
+        .announce_props
+        .set_object_xsd_any_uri(object_id.clone())?
+        .set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
+
+    let inboxes = get_inboxes(&state, &actor, &object_id).await?;
+
+    deliver_many(client, inboxes, announce);
+
+    state.cache(object_id.to_owned(), activity_id).await;
+
+    Ok(response(HashMap::<(), ()>::new()))
 }
 
 async fn handle_follow(
     db_actor: web::Data<Addr<DbActor>>,
     state: web::Data<State>,
+    client: web::Data<Client>,
     input: AcceptedObjects,
     actor: AcceptedActors,
-) -> Result<web::Json<Accept>, MyError> {
+) -> Result<HttpResponse, MyError> {
     let (is_listener, is_blocked, is_whitelisted) = join!(
         state.is_listener(&actor.id),
         state.is_blocked(&actor.id),
@@ -61,44 +181,55 @@
     }
 
     if !is_listener {
-        let state = state.into_inner();
+        let state = state.clone().into_inner();
 
-        let actor = actor.clone();
+        let inbox = actor.inbox().to_owned();
         db_actor.do_send(DbQuery(move |pool: Pool| {
-            let actor_id = actor.id.clone();
+            let inbox = inbox.clone();
             let state = state.clone();
 
             async move {
                 let conn = pool.get().await?;
 
-                state.add_listener(&conn, actor_id).await
+                state.add_listener(&conn, inbox).await.map_err(|e| {
+                    error!("Error adding listener, {}", e);
+                    e
+                })
             }
         }));
     }
 
+    let actor_inbox = actor.inbox().clone();
+
     let mut accept = Accept::default();
     let mut follow = Follow::default();
     follow.object_props.set_id(input.id)?;
     follow
         .follow_props
-        .set_object_xsd_any_uri(format!("https://{}/actor", "localhost"))?
+        .set_object_xsd_any_uri(state.generate_url(UrlKind::Actor))?
         .set_actor_xsd_any_uri(actor.id.clone())?;
 
     accept
         .object_props
-        .set_id(format!("https://{}/activities/{}", "localhost", "1"))?
+        .set_id(state.generate_url(UrlKind::Activity))?
        .set_many_to_xsd_any_uris(vec![actor.id])?;
     accept
         .accept_props
         .set_object_object_box(follow)?
-        .set_actor_xsd_any_uri(format!("https://{}/actor", "localhost"))?;
+        .set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
 
-    Ok(web::Json(accept))
+    let client = client.into_inner();
+    let accept2 = accept.clone();
+    actix::Arbiter::spawn(async move {
+        let _ = deliver(&client, actor_inbox, &accept2).await;
+    });
+
+    Ok(response(accept))
 }
 
 async fn fetch_actor(
     state: web::Data<State>,
-    client: web::Data<Client>,
+    client: &web::Data<Client>,
     actor_id: &XsdAnyUri,
 ) -> Result<AcceptedActors, MyError> {
     if let Some(actor) = state.get_actor(actor_id).await {
@@ -111,13 +242,13 @@
         .send()
         .await
         .map_err(|e| {
-            error!("Couldn't send request for actor, {}", e);
+            error!("Couldn't send request to {} for actor, {}", actor_id, e);
             MyError
         })?
         .json()
         .await
         .map_err(|e| {
-            error!("Coudn't fetch actor, {}", e);
+            error!("Couldn't fetch actor from {}, {}", actor_id, e);
             MyError
         })?;
 
@@ -126,6 +257,65 @@
     Ok(actor)
 }
 
+fn deliver_many<T>(client: web::Data<Client>, inboxes: Vec<XsdAnyUri>, item: T)
+where
+    T: serde::ser::Serialize + 'static,
+{
+    let client = client.into_inner();
+
+    actix::Arbiter::spawn(async move {
+        use futures::stream::StreamExt;
+
+        let client = client.clone();
+        let mut unordered = futures::stream::FuturesUnordered::new();
+
+        for inbox in inboxes {
+            unordered.push(deliver(&client, inbox, &item));
+        }
+
+        while let Some(_) = unordered.next().await {}
+    });
+}
+
+async fn deliver<T>(
+    client: &std::sync::Arc<Client>,
+    inbox: XsdAnyUri,
+    item: &T,
+) -> Result<(), MyError>
+where
+    T: serde::ser::Serialize,
+{
+    let res = client
+        .post(inbox.as_str())
+        .header("Accept", "application/activity+json")
+        .header("Content-Type", "application/activity+json")
+        .send_json(item)
+        .await
+        .map_err(|e| {
+            error!("Couldn't send deliver request to {}, {}", inbox, e);
+            MyError
+        })?;
+
+    if !res.status().is_success() {
+        error!("Invalid response status from {}, {}", inbox, res.status());
+        return Err(MyError);
+    }
+
+    Ok(())
+}
+
+async fn get_inboxes(
+    state: &web::Data<State>,
+    actor: &AcceptedActors,
+    object_id: &XsdAnyUri,
+) -> Result<Vec<XsdAnyUri>, MyError> {
+    let domain = object_id.as_url().host().ok_or(MyError)?.to_string();
+
+    let inbox = actor.inbox();
+
+    Ok(state.listeners_without(&inbox, &domain).await)
+}
+
 impl actix_web::error::ResponseError for MyError {}
 
 impl From for MyError {
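
Note: for Announce/Create the relay wraps the incoming object id in an Announce of its own (handle_relay above) and fans it out with deliver_many. As a rough illustration of the body that ends up at each listener inbox — the exact layout depends on how the activitystreams types serialize, and the hostnames and UUID are example values only:

    fn example_announce(object_id: &str) -> serde_json::Value {
        // Approximate shape only; the real body comes from serializing the
        // `Announce` built in handle_relay().
        serde_json::json!({
            "@context": "https://www.w3.org/ns/activitystreams",
            "type": "Announce",
            "id": "https://relay.example/activity/0a1b2c3d-4e5f-6789-abcd-ef0123456789",
            "to": ["https://relay.example/followers"],
            "actor": "https://relay.example/actor",
            "object": object_id
        })
    }

    fn main() {
        let body = example_announce("https://social.example/notes/42");
        println!("{}", serde_json::to_string_pretty(&body).unwrap());
    }
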
diff --git a/src/main.rs b/src/main.rs
index 53a7cf2..69e31b9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
 #![feature(drain_filter)]
-use actix_web::{client::Client, web, App, HttpServer, Responder};
+use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties};
+use actix_web::{client::Client, middleware::Logger, web, App, HttpServer, Responder};
 use bb8_postgres::tokio_postgres;
 
 mod apub;
@@ -8,12 +9,42 @@ mod inbox;
 mod label;
 mod state;
 
-use self::{db_actor::DbActor, label::ArbiterLabelFactory, state::State};
+use self::{
+    db_actor::DbActor,
+    inbox::MyError,
+    label::ArbiterLabelFactory,
+    state::{State, UrlKind},
+};
 
 async fn index() -> impl Responder {
     "hewwo, mr obama"
 }
 
+async fn actor_route(state: web::Data<State>) -> Result<impl Responder, MyError> {
+    let mut application = Application::default();
+    let mut endpoint = EndpointProperties::default();
+
+    endpoint.set_shared_inbox(format!("https://{}/inbox", "localhost"))?;
+
+    application
+        .object_props
+        .set_id(state.generate_url(UrlKind::Actor))?
+        .set_summary_xsd_string("AodeRelay bot")?
+        .set_name_xsd_string("AodeRelay")?
+        .set_url_xsd_any_uri(state.generate_url(UrlKind::Actor))?
+        .set_context_xsd_any_uri(context())?;
+
+    application
+        .ap_actor_props
+        .set_preferred_username("relay")?
+        .set_followers(state.generate_url(UrlKind::Followers))?
+        .set_following(state.generate_url(UrlKind::Following))?
+        .set_inbox(state.generate_url(UrlKind::Inbox))?
+        .set_endpoints(endpoint)?;
+
+    Ok(inbox::response(application))
+}
+
 #[actix_rt::main]
 async fn main() -> Result<(), anyhow::Error> {
     dotenv::dotenv().ok();
@@ -21,13 +52,19 @@ async fn main() -> Result<(), anyhow::Error> {
     pretty_env_logger::init();
 
     let pg_config: tokio_postgres::Config = std::env::var("DATABASE_URL")?.parse()?;
+    let hostname: String = std::env::var("HOSTNAME")?;
+    let use_whitelist = std::env::var("USE_WHITELIST").is_ok();
+    let use_https = std::env::var("USE_HTTPS").is_ok();
+
     let arbiter_labeler = ArbiterLabelFactory::new();
     let db_actor = DbActor::new(pg_config.clone());
 
     arbiter_labeler.clone().set_label();
 
     let state: State = db_actor
-        .send(db_actor::DbQuery(|pool| State::hydrate(false, pool)))
+        .send(db_actor::DbQuery(move |pool| {
+            State::hydrate(use_https, use_whitelist, hostname, pool)
+        }))
         .await?
         .await??;
 
@@ -37,11 +74,13 @@
         let client = Client::default();
 
         App::new()
+            .wrap(Logger::default())
            .data(actor)
            .data(state.clone())
            .data(client)
            .service(web::resource("/").route(web::get().to(index)))
            .service(web::resource("/inbox").route(web::post().to(inbox::inbox)))
+           .service(web::resource("/actor").route(web::get().to(actor_route)))
     })
     .bind("127.0.0.1:8080")?
     .run()
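
Configuration note: with this change the binary reads HOSTNAME (required) plus the optional USE_WHITELIST and USE_HTTPS variables; the latter two are presence-checked with std::env::var(...).is_ok(), so any value enables them. An example .env in the same format as the file at the top of this patch (values are illustrative):

    DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix
    HOSTNAME=relay.example
    # Uncomment to serve https:// URLs and to require whitelisting:
    # USE_HTTPS=true
    # USE_WHITELIST=true
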
diff --git a/src/schema.rs b/src/schema.rs
index 6c62165..9222afd 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -3,7 +3,7 @@ table! {
         id -> Uuid,
         domain_name -> Text,
         created_at -> Timestamp,
-        updated_at -> Timestamp,
+        updated_at -> Nullable<Timestamp>,
     }
 }
 
@@ -12,7 +12,7 @@ table! {
         id -> Uuid,
         actor_id -> Text,
         created_at -> Timestamp,
-        updated_at -> Timestamp,
+        updated_at -> Nullable<Timestamp>,
     }
 }
 
@@ -21,7 +21,7 @@ table! {
         id -> Uuid,
         domain_name -> Text,
         created_at -> Timestamp,
-        updated_at -> Timestamp,
+        updated_at -> Nullable<Timestamp>,
     }
 }
diff --git a/src/state.rs b/src/state.rs
index 11d59e0..6558566 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -6,11 +6,14 @@ use lru::LruCache;
 use std::{collections::HashSet, sync::Arc};
 use tokio::sync::RwLock;
 use ttl_cache::TtlCache;
+use uuid::Uuid;
 
 use crate::{apub::AcceptedActors, db_actor::Pool};
 
 #[derive(Clone)]
 pub struct State {
+    use_https: bool,
+    hostname: String,
     whitelist_enabled: bool,
     actor_cache: Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>,
     actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>,
@@ -19,11 +22,69 @@ pub struct State {
     listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
 }
 
+pub enum UrlKind {
+    Activity,
+    Actor,
+    Followers,
+    Following,
+    Inbox,
+}
+
 #[derive(Clone, Debug, thiserror::Error)]
 #[error("No host present in URI")]
 pub struct HostError;
 
 impl State {
+    pub fn generate_url(&self, kind: UrlKind) -> String {
+        let scheme = if self.use_https { "https" } else { "http" };
+
+        match kind {
+            UrlKind::Activity => {
+                format!("{}://{}/activity/{}", scheme, self.hostname, Uuid::new_v4())
+            }
+            UrlKind::Actor => format!("{}://{}/actor", scheme, self.hostname),
+            UrlKind::Followers => format!("{}://{}/followers", scheme, self.hostname),
+            UrlKind::Following => format!("{}://{}/following", scheme, self.hostname),
+            UrlKind::Inbox => format!("{}://{}/inbox", scheme, self.hostname),
+        }
+    }
+
+    pub async fn remove_listener(&self, client: &Client, inbox: &XsdAnyUri) -> Result<(), Error> {
+        let hs = self.listeners.clone();
+
+        log::info!("DELETE FROM listeners WHERE actor_id = {};", inbox.as_str());
+        client
+            .execute(
+                "DELETE FROM listeners WHERE actor_id = $1::TEXT;",
+                &[&inbox.as_str()],
+            )
+            .await?;
+
+        let mut write_guard = hs.write().await;
+        write_guard.remove(inbox);
+
+        Ok(())
+    }
+
+    pub async fn listeners_without(&self, inbox: &XsdAnyUri, domain: &str) -> Vec<XsdAnyUri> {
+        let hs = self.listeners.clone();
+
+        let read_guard = hs.read().await;
+
+        read_guard
+            .iter()
+            .filter_map(|listener| {
+                if let Some(host) = listener.as_url().host() {
+                    if listener != inbox && host.to_string() != domain {
+                        return Some(listener.clone());
+                    }
+                }
+
+                None
+            })
+            .collect()
+    }
+
     pub async fn is_whitelisted(&self, actor_id: &XsdAnyUri) -> bool {
         if !self.whitelist_enabled {
             return true;
@@ -94,9 +155,13 @@ impl State {
             return Err(HostError.into());
         };
 
+        log::info!(
+            "INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
+            host.to_string()
+        );
         client
             .execute(
-                "INSERT INTO blocks (actor_id, created_at) VALUES ($1::TEXT, now);",
+                "INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now');",
                 &[&host.to_string()],
             )
             .await?;
@@ -116,9 +181,13 @@ impl State {
             return Err(HostError.into());
         };
 
+        log::info!(
+            "INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
+            host.to_string()
+        );
         client
             .execute(
-                "INSERT INTO whitelists (actor_id, created_at) VALUES ($1::TEXT, now);",
+                "INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now');",
                 &[&host.to_string()],
             )
             .await?;
@@ -132,9 +201,13 @@
     pub async fn add_listener(&self, client: &Client, listener: XsdAnyUri) -> Result<(), Error> {
         let listeners = self.listeners.clone();
 
+        log::info!(
+            "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]",
+            listener.as_str(),
+        );
         client
             .execute(
-                "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, now);",
+                "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');",
                 &[&listener.as_str()],
             )
             .await?;
@@ -145,7 +218,12 @@ impl State {
         Ok(())
     }
 
-    pub async fn hydrate(whitelist_enabled: bool, pool: Pool) -> Result<Self, Error> {
+    pub async fn hydrate(
+        use_https: bool,
+        whitelist_enabled: bool,
+        hostname: String,
+        pool: Pool,
+    ) -> Result<Self, Error> {
         let pool1 = pool.clone();
         let pool2 = pool.clone();
 
@@ -170,7 +248,9 @@ impl State {
         let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?;
 
         Ok(State {
+            use_https,
             whitelist_enabled,
+            hostname,
             actor_cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
             actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
             blocks: Arc::new(RwLock::new(blocks)),
@@ -181,12 +261,14 @@ impl State {
 }
 
 pub async fn hydrate_blocks(client: &Client) -> Result<HashSet<String>, Error> {
+    log::info!("SELECT domain_name FROM blocks");
     let rows = client.query("SELECT domain_name FROM blocks", &[]).await?;
 
     parse_rows(rows)
 }
 
 pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Error> {
+    log::info!("SELECT domain_name FROM whitelists");
     let rows = client
         .query("SELECT domain_name FROM whitelists", &[])
         .await?;
@@ -195,6 +277,7 @@ pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Erro
 }
 
 pub async fn hydrate_listeners(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
+    log::info!("SELECT actor_id FROM listeners");
    let rows = client.query("SELECT actor_id FROM listeners", &[]).await?;
 
    parse_rows(rows)
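
Note: every URL the relay now hands out goes through State::generate_url, so the public routes all hang off the configured HOSTNAME. For reference, a standalone sketch of the URL shapes it produces (this is a simplified stand-in, not the method above; hostname and scheme are example inputs, and activity ids get a fresh UUIDv4):

    use uuid::Uuid;

    // Mirrors the match in State::generate_url, for illustration only.
    fn generate_url(use_https: bool, hostname: &str, kind: &str) -> String {
        let scheme = if use_https { "https" } else { "http" };
        match kind {
            "activity" => format!("{}://{}/activity/{}", scheme, hostname, Uuid::new_v4()),
            other => format!("{}://{}/{}", scheme, hostname, other),
        }
    }

    fn main() {
        assert_eq!(
            generate_url(true, "relay.example", "actor"),
            "https://relay.example/actor"
        );
        // e.g. http://localhost:8080/activity/<random-uuid>
        println!("{}", generate_url(false, "localhost:8080", "activity"));
    }
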