From aa6b5daaf05de4869c3d093258e42be1fee63356 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 15 Mar 2020 12:49:27 -0500
Subject: [PATCH] Follow 'works'

---
 Cargo.lock                                     |   4 +-
 Cargo.toml                                     |   2 +-
 .../2020-03-14-213217_create_blocks/down.sql   |   2 +-
 .../2020-03-14-213217_create_blocks/up.sql     |   4 +-
 .../down.sql                                   |   2 +-
 .../up.sql                                     |   4 +-
 src/apub.rs                                    |  20 ++--
 src/db_actor.rs                                |   4 +-
 src/inbox.rs                                   | 108 ++++++++++++++++++--
 src/main.rs                                    |   2 +-
 src/schema.rs                                  |   4 +-
 src/state.rs                                   |  84 +++++++++++---
 12 files changed, 188 insertions(+), 52 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6b7231a..8f862ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,9 +2,9 @@
 # It is not intended for manual editing.
 [[package]]
 name = "activitystreams"
-version = "0.5.0-alpha.5"
+version = "0.5.0-alpha.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7cb2b3d9e17c12a21669b061015ca54c777ace07f824b4f275ef3c3a9c736d9e"
+checksum = "9057510df06a864f3a4da22c393373ae13850f001655a102acb4a8ada0895d9d"
 dependencies = [
  "activitystreams-derive",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 2864410..6b979e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ anyhow = "1.0"
 actix = "0.10.0-alpha.2"
 actix-web = { version = "3.0.0-alpha.1", features = ["openssl"] }
 actix-rt = "1.0.0"
-activitystreams = "0.5.0-alpha.5"
+activitystreams = "0.5.0-alpha.6"
 bb8-postgres = "0.4.0"
 dotenv = "0.15.0"
 futures = "0.3.4"
diff --git a/migrations/2020-03-14-213217_create_blocks/down.sql b/migrations/2020-03-14-213217_create_blocks/down.sql
index 79e5e3b..c25fc5c 100644
--- a/migrations/2020-03-14-213217_create_blocks/down.sql
+++ b/migrations/2020-03-14-213217_create_blocks/down.sql
@@ -1,3 +1,3 @@
 -- This file should undo anything in `up.sql`
-DROP INDEX blocks_actor_id_index;
+DROP INDEX blocks_domain_name_index;
 DROP TABLE blocks;
diff --git a/migrations/2020-03-14-213217_create_blocks/up.sql b/migrations/2020-03-14-213217_create_blocks/up.sql
index 085a82c..75b5871 100644
--- a/migrations/2020-03-14-213217_create_blocks/up.sql
+++ b/migrations/2020-03-14-213217_create_blocks/up.sql
@@ -1,11 +1,11 @@
 -- Your SQL goes here
 CREATE TABLE blocks (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-    actor_id TEXT UNIQUE NOT NULL,
+    domain_name TEXT UNIQUE NOT NULL,
     created_at TIMESTAMP NOT NULL,
     updated_at TIMESTAMP NOT NULL
 );
 
-CREATE INDEX blocks_actor_id_index ON blocks(actor_id);
+CREATE INDEX blocks_domain_name_index ON blocks(domain_name);
 
 SELECT diesel_manage_updated_at('blocks');
diff --git a/migrations/2020-03-14-213511_create_whitelists/down.sql b/migrations/2020-03-14-213511_create_whitelists/down.sql
index 2af899d..ee24697 100644
--- a/migrations/2020-03-14-213511_create_whitelists/down.sql
+++ b/migrations/2020-03-14-213511_create_whitelists/down.sql
@@ -1,3 +1,3 @@
 -- This file should undo anything in `up.sql`
-DROP INDEX whitelists_actor_id_index;
+DROP INDEX whitelists_domain_name_index;
 DROP TABLE whitelists;
diff --git a/migrations/2020-03-14-213511_create_whitelists/up.sql b/migrations/2020-03-14-213511_create_whitelists/up.sql
index 7c0e4e0..7a90398 100644
--- a/migrations/2020-03-14-213511_create_whitelists/up.sql
+++ b/migrations/2020-03-14-213511_create_whitelists/up.sql
@@ -1,11 +1,11 @@
 -- Your SQL goes here
 CREATE TABLE whitelists (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-    actor_id TEXT UNIQUE NOT NULL,
+    domain_name TEXT UNIQUE NOT NULL,
     created_at TIMESTAMP NOT NULL,
     updated_at TIMESTAMP NOT NULL
 );
 
-CREATE INDEX whitelists_actor_id_index ON whitelists(actor_id);
+CREATE INDEX whitelists_domain_name_index ON whitelists(domain_name);
 
 SELECT diesel_manage_updated_at('whitelists');
diff --git a/src/apub.rs b/src/apub.rs
index 6592392..687b508 100644
--- a/src/apub.rs
+++ b/src/apub.rs
@@ -24,12 +24,6 @@ pub enum ValidTypes {
     Undo,
 }
 
-impl Default for ValidTypes {
-    fn default() -> Self {
-        ValidTypes::Create
-    }
-}
-
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(untagged)]
 #[serde(rename_all = "camelCase")]
@@ -38,13 +32,7 @@ pub enum ValidObjects {
     Object(AnyExistingObject),
 }
 
-impl Default for ValidObjects {
-    fn default() -> Self {
-        ValidObjects::Id(Default::default())
-    }
-}
-
-#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)]
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct AcceptedObjects {
     pub id: XsdAnyUri,
@@ -84,3 +72,9 @@ impl ValidObjects {
         }
     }
 }
+
+impl AcceptedActors {
+    pub fn inbox(&self) -> &XsdAnyUri {
+        self.endpoints.shared_inbox.as_ref().unwrap_or(&self.inbox)
+    }
+}
diff --git a/src/db_actor.rs b/src/db_actor.rs
index e0827ca..f98a195 100644
--- a/src/db_actor.rs
+++ b/src/db_actor.rs
@@ -76,7 +76,7 @@ impl Supervised for DbActor {}
 
 impl<F, Fut, R> Handler<DbQuery<F>> for DbActor
 where
-    F: Fn(Pool) -> Fut + 'static,
+    F: FnOnce(Pool) -> Fut + 'static,
     Fut: Future<Output = R>,
     R: Send + 'static,
 {
@@ -107,7 +107,7 @@ where
 
 impl<F, Fut, R> Message for DbQuery<F>
 where
-    F: Fn(Pool) -> Fut,
+    F: FnOnce(Pool) -> Fut,
     Fut: Future<Output = R>,
     R: Send + 'static,
 {
diff --git a/src/inbox.rs b/src/inbox.rs
index 8b3a234..ab0db0b 100644
--- a/src/inbox.rs
+++ b/src/inbox.rs
@@ -1,11 +1,15 @@
-use activitystreams::primitives::XsdAnyUri;
+use activitystreams::{
+    activity::apub::{Accept, Follow},
+    primitives::XsdAnyUri,
+};
 use actix::Addr;
 use actix_web::{client::Client, web, Responder};
-use log::info;
+use futures::join;
+use log::{error, info};
 
 use crate::{
     apub::{AcceptedActors, AcceptedObjects, ValidTypes},
-    db_actor::DbActor,
+    db_actor::{DbActor, DbQuery, Pool},
     state::State,
 };
 
@@ -21,19 +25,75 @@ pub async fn inbox(
 ) -> Result<impl Responder, MyError> {
     let input = input.into_inner();
 
-    info!("Relaying {} for {}", input.object.id(), input.actor);
-    let actor = fetch_actor(state, client, &input.actor).await?;
-    info!("Actor, {:#?}", actor);
+    let actor = fetch_actor(state.clone(), client, &input.actor).await?;
 
     match input.kind {
         ValidTypes::Announce => (),
         ValidTypes::Create => (),
        ValidTypes::Delete => (),
-        ValidTypes::Follow => (),
+        ValidTypes::Follow => return handle_follow(db_actor, state, input, actor).await,
         ValidTypes::Undo => (),
     }
 
-    Ok("{}")
+    Err(MyError)
+}
+
+async fn handle_follow(
+    db_actor: web::Data<Addr<DbActor>>,
+    state: web::Data<State>,
+    input: AcceptedObjects,
+    actor: AcceptedActors,
+) -> Result<web::Json<Accept>, MyError> {
+    let (is_listener, is_blocked, is_whitelisted) = join!(
+        state.is_listener(&actor.id),
+        state.is_blocked(&actor.id),
+        state.is_whitelisted(&actor.id)
+    );
+
+    if is_blocked {
+        error!("Follow from blocked listener, {}", actor.id);
+        return Err(MyError);
+    }
+
+    if !is_whitelisted {
+        error!("Follow from non-whitelisted listener, {}", actor.id);
+        return Err(MyError);
+    }
+
+    if !is_listener {
+        let state = state.into_inner();
+
+        let actor = actor.clone();
+        db_actor.do_send(DbQuery(move |pool: Pool| {
+            let actor_id = actor.id.clone();
+            let state = state.clone();
+
+            async move {
+                let conn = pool.get().await?;
+
+                state.add_listener(&conn, actor_id).await
+            }
+        }));
+    }
+
+    let mut accept = Accept::default();
+    let mut follow = Follow::default();
+    follow.object_props.set_id(input.id)?;
+    follow
+        .follow_props
+        .set_object_xsd_any_uri(format!("https://{}/actor", "localhost"))?
+        .set_actor_xsd_any_uri(actor.id.clone())?;
+
+    accept
+        .object_props
+        .set_id(format!("https://{}/activities/{}", "localhost", "1"))?
+        .set_many_to_xsd_any_uris(vec![actor.id])?;
+    accept
+        .accept_props
+        .set_object_object_box(follow)?
+        .set_actor_xsd_any_uri(format!("https://{}/actor", "localhost"))?;
+
+    Ok(web::Json(accept))
 }
 
 async fn fetch_actor(
@@ -46,14 +106,20 @@ async fn fetch_actor(
     }
 
     let actor: AcceptedActors = client
-        .get(actor_id.as_ref())
+        .get(actor_id.as_str())
         .header("Accept", "application/activity+json")
         .send()
         .await
-        .map_err(|_| MyError)?
+        .map_err(|e| {
+            error!("Couldn't send request for actor, {}", e);
+            MyError
+        })?
         .json()
         .await
-        .map_err(|_| MyError)?;
+        .map_err(|e| {
+            error!("Couldn't fetch actor, {}", e);
+            MyError
+        })?;
 
     state.cache_actor(actor_id.to_owned(), actor.clone()).await;
 
@@ -61,3 +127,23 @@ async fn fetch_actor(
 }
 
 impl actix_web::error::ResponseError for MyError {}
+
+impl From<std::convert::Infallible> for MyError {
+    fn from(_: std::convert::Infallible) -> Self {
+        MyError
+    }
+}
+
+impl From<activitystreams::primitives::XsdAnyUriError> for MyError {
+    fn from(_: activitystreams::primitives::XsdAnyUriError) -> Self {
+        error!("Error parsing URI");
+        MyError
+    }
+}
+
+impl From<std::io::Error> for MyError {
+    fn from(e: std::io::Error) -> Self {
+        error!("JSON Error, {}", e);
+        MyError
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 96cb81e..53a7cf2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<(), anyhow::Error> {
     arbiter_labeler.clone().set_label();
 
     let state: State = db_actor
-        .send(db_actor::DbQuery(State::hydrate))
+        .send(db_actor::DbQuery(|pool| State::hydrate(false, pool)))
         .await?
         .await??;
 
diff --git a/src/schema.rs b/src/schema.rs
index e8c0d52..6c62165 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,7 +1,7 @@
 table! {
     blocks (id) {
         id -> Uuid,
-        actor_id -> Text,
+        domain_name -> Text,
         created_at -> Timestamp,
         updated_at -> Timestamp,
     }
@@ -19,7 +19,7 @@
 table! {
     whitelists (id) {
         id -> Uuid,
-        actor_id -> Text,
+        domain_name -> Text,
         created_at -> Timestamp,
         updated_at -> Timestamp,
     }
diff --git a/src/state.rs b/src/state.rs
index 0b716a5..11d59e0 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -11,14 +11,52 @@ use crate::{apub::AcceptedActors, db_actor::Pool};
 
 #[derive(Clone)]
 pub struct State {
+    whitelist_enabled: bool,
     actor_cache: Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>,
     actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>,
-    blocks: Arc<RwLock<HashSet<XsdAnyUri>>>,
-    whitelists: Arc<RwLock<HashSet<XsdAnyUri>>>,
+    blocks: Arc<RwLock<HashSet<String>>>,
+    whitelists: Arc<RwLock<HashSet<String>>>,
     listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
 }
 
+#[derive(Clone, Debug, thiserror::Error)]
+#[error("No host present in URI")]
+pub struct HostError;
+
 impl State {
+    pub async fn is_whitelisted(&self, actor_id: &XsdAnyUri) -> bool {
+        if !self.whitelist_enabled {
+            return true;
+        }
+
+        let hs = self.whitelists.clone();
+
+        if let Some(host) = actor_id.as_url().host() {
+            let read_guard = hs.read().await;
+            return read_guard.contains(&host.to_string());
+        }
+
+        false
+    }
+
+    pub async fn is_blocked(&self, actor_id: &XsdAnyUri) -> bool {
+        let hs = self.blocks.clone();
+
+        if let Some(host) = actor_id.as_url().host() {
+            let read_guard = hs.read().await;
+            return read_guard.contains(&host.to_string());
+        }
+
+        true
+    }
+
+    pub async fn is_listener(&self, actor_id: &XsdAnyUri) -> bool {
+        let hs = self.listeners.clone();
+
+        let read_guard = hs.read().await;
+        read_guard.contains(actor_id)
+    }
+
     pub async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option<AcceptedActors> {
         let cache = self.actor_cache.clone();
 
@@ -50,15 +88,21 @@ impl State {
     pub async fn add_block(&self, client: &Client, block: XsdAnyUri) -> Result<(), Error> {
         let blocks = self.blocks.clone();
 
+        let host = if let Some(host) = block.as_url().host() {
+            host
+        } else {
+            return Err(HostError.into());
+        };
+
         client
             .execute(
                 "INSERT INTO blocks (actor_id, created_at) VALUES ($1::TEXT, now);",
-                &[&block.as_ref()],
+                &[&host.to_string()],
             )
             .await?;
 
         let mut write_guard = blocks.write().await;
-        write_guard.insert(block);
+        write_guard.insert(host.to_string());
 
         Ok(())
     }
@@ -66,15 +110,21 @@ impl State {
     pub async fn add_whitelist(&self, client: &Client, whitelist: XsdAnyUri) -> Result<(), Error> {
         let whitelists = self.whitelists.clone();
 
+        let host = if let Some(host) = whitelist.as_url().host() {
+            host
+        } else {
+            return Err(HostError.into());
+        };
+
         client
             .execute(
                 "INSERT INTO whitelists (actor_id, created_at) VALUES ($1::TEXT, now);",
-                &[&whitelist.as_ref()],
+                &[&host.to_string()],
             )
             .await?;
 
         let mut write_guard = whitelists.write().await;
-        write_guard.insert(whitelist);
+        write_guard.insert(host.to_string());
 
         Ok(())
     }
@@ -85,7 +135,7 @@ impl State {
         client
             .execute(
                 "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, now);",
-                &[&listener.as_ref()],
+                &[&listener.as_str()],
             )
             .await?;
 
@@ -95,7 +145,7 @@ impl State {
         Ok(())
     }
 
-    pub async fn hydrate(pool: Pool) -> Result<Self, Error> {
+    pub async fn hydrate(whitelist_enabled: bool, pool: Pool) -> Result<Self, Error> {
         let pool1 = pool.clone();
         let pool2 = pool.clone();
 
@@ -120,6 +170,7 @@ impl State {
         let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?;
 
         Ok(State {
+            whitelist_enabled,
            actor_cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
             actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
             blocks: Arc::new(RwLock::new(blocks)),
@@ -129,14 +180,16 @@ impl State {
     }
 }
 
-pub async fn hydrate_blocks(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
-    let rows = client.query("SELECT actor_id FROM blocks", &[]).await?;
+pub async fn hydrate_blocks(client: &Client) -> Result<HashSet<String>, Error> {
+    let rows = client.query("SELECT domain_name FROM blocks", &[]).await?;
 
     parse_rows(rows)
 }
 
-pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
-    let rows = client.query("SELECT actor_id FROM whitelists", &[]).await?;
+pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Error> {
+    let rows = client
+        .query("SELECT domain_name FROM whitelists", &[])
+        .await?;
 
     parse_rows(rows)
 }
@@ -147,11 +200,14 @@ pub async fn hydrate_listeners(client: &Client) -> Result<HashSet<XsdAnyUri>, Er
     parse_rows(rows)
 }
 
-pub fn parse_rows(rows: Vec<Row>) -> Result<HashSet<XsdAnyUri>, Error> {
+pub fn parse_rows<T>(rows: Vec<Row>) -> Result<HashSet<T>, Error>
+where
+    T: std::str::FromStr + Eq + std::hash::Hash,
+{
     let hs = rows
         .into_iter()
         .filter_map(move |row| {
-            let s: String = row.try_get("actor_id").ok()?;
+            let s: String = row.try_get(0).ok()?;
             s.parse().ok()
         })
         .collect();