bro we are sledding

This commit is contained in:
asonix 2021-02-09 22:05:06 -06:00
parent d7e68190c4
commit 50d2b5b21c
60 changed files with 1261 additions and 2460 deletions

1
.env
View file

@ -1,2 +1 @@
OUT_DIR="compiled_templates"
DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix

562
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,7 @@
[package] [package]
name = "relay" name = "relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.1.0" version = "0.2.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license-file = "LICENSE" license-file = "LICENSE"
readme = "README.md" readme = "README.md"
@ -15,9 +15,9 @@ build = "src/build.rs"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-web = { version = "3.0.1", default-features = false, features = ["rustls", "compress"] } actix-web = { version = "3.3.2", default-features = false, features = ["rustls", "compress"] }
actix-webfinger = "0.3.0" actix-webfinger = "0.3.0"
activitystreams = "0.7.0-alpha.4" activitystreams = "0.7.0-alpha.9"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.2"
ammonia = "3.1.0" ammonia = "3.1.0"
async-mutex = "1.0.1" async-mutex = "1.0.1"
@ -27,8 +27,6 @@ background-jobs = "0.8.0"
base64 = "0.13" base64 = "0.13"
chrono = "0.4.19" chrono = "0.4.19"
config = "0.10.1" config = "0.10.1"
deadpool = "0.5.1"
deadpool-postgres = "0.5.5"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.8.2" env_logger = "0.8.2"
futures = "0.3.4" futures = "0.3.4"
@ -45,9 +43,9 @@ rsa-pem = "0.2.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.9" sha2 = "0.9"
sled = "0.34.6"
structopt = "0.3.12" structopt = "0.3.12"
thiserror = "1.0" thiserror = "1.0"
tokio-postgres = { version = "0.5.1", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] }
ttl_cache = "0.5.1" ttl_cache = "0.5.1"
uuid = { version = "0.8", features = ["v4", "serde"] } uuid = { version = "0.8", features = ["v4", "serde"] }

View file

@ -1,7 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
TAG=$1 TAG=$1
MIGRATIONS=$2
function require() { function require() {
if [ "$1" = "" ]; then if [ "$1" = "" ]; then
@ -15,11 +14,10 @@ function print_help() {
echo "build.sh" echo "build.sh"
echo "" echo ""
echo "Usage:" echo "Usage:"
echo " build.sh [tag] [migrations]" echo " build.sh [tag]"
echo "" echo ""
echo "Args:" echo "Args:"
echo " tag: The git tag to create and publish" echo " tag: The git tag to create and publish"
echo " migrations: (optional) Whether to build the migrations container as well"
} }
function build_image() { function build_image() {
@ -61,12 +59,3 @@ build_image "asonix/relay" "$TAG" "amd64"
./manifest.sh "asonix/relay" "$TAG" ./manifest.sh "asonix/relay" "$TAG"
./manifest.sh "asonix/relay" "latest" ./manifest.sh "asonix/relay" "latest"
if [ "${MIGRATIONS}" = "migrations" ]; then
build_image "asonix/relay-migrations" "$TAG" arm64v8
build_image "asonix/relay-migrations" "$TAG" arm32v7
build_image "asonix/relay-migrations" "$TAG" amd64
./manifest.sh "asonix/relay-migrations" "$TAG"
./manifest.sh "asonix/relay-migrations" "latest"
fi

View file

View file

@ -1,6 +0,0 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

View file

@ -1,36 +0,0 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX listeners_actor_id_index;
DROP TABLE listeners;

View file

@ -1,11 +0,0 @@
-- Your SQL goes here
CREATE TABLE listeners (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP
);
CREATE INDEX listeners_actor_id_index ON listeners(actor_id);
SELECT diesel_manage_updated_at('listeners');

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX blocks_domain_name_index;
DROP TABLE blocks;

View file

@ -1,11 +0,0 @@
-- Your SQL goes here
CREATE TABLE blocks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain_name TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP
);
CREATE INDEX blocks_domain_name_index ON blocks(domain_name);
SELECT diesel_manage_updated_at('blocks');

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX whitelists_domain_name_index;
DROP TABLE whitelists;

View file

@ -1,11 +0,0 @@
-- Your SQL goes here
CREATE TABLE whitelists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain_name TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP
);
CREATE INDEX whitelists_domain_name_index ON whitelists(domain_name);
SELECT diesel_manage_updated_at('whitelists');

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX settings_key_index;
DROP TABLE settings;

View file

@ -1,12 +0,0 @@
-- Your SQL goes here
CREATE TABLE settings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT UNIQUE NOT NULL,
value TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP
);
CREATE INDEX settings_key_index ON settings(key);
SELECT diesel_manage_updated_at('settings');

View file

@ -1,8 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TRIGGER IF EXISTS whitelists_notify ON whitelists;
DROP TRIGGER IF EXISTS blocks_notify ON blocks;
DROP TRIGGER IF EXISTS listeners_notify ON listeners;
DROP FUNCTION IF EXISTS invoke_whitelists_trigger();
DROP FUNCTION IF EXISTS invoke_blocks_trigger();
DROP FUNCTION IF EXISTS invoke_listeners_trigger();

View file

@ -1,99 +0,0 @@
-- Your SQL goes here
CREATE OR REPLACE FUNCTION invoke_listeners_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_listeners';
payload := NEW.actor_id;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_listeners';
payload := OLD.actor_id;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
CREATE OR REPLACE FUNCTION invoke_blocks_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_blocks';
payload := NEW.domain_name;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_blocks';
payload := OLD.domain_name;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN NULL;
END;
$$;
CREATE OR REPLACE FUNCTION invoke_whitelists_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_whitelists';
payload := NEW.domain_name;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_whitelists';
payload := OLD.domain_name;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
CREATE TRIGGER listeners_notify
AFTER INSERT OR UPDATE OR DELETE
ON listeners
FOR EACH ROW
EXECUTE PROCEDURE invoke_listeners_trigger();
CREATE TRIGGER blocks_notify
AFTER INSERT OR UPDATE OR DELETE
ON blocks
FOR EACH ROW
EXECUTE PROCEDURE invoke_blocks_trigger();
CREATE TRIGGER whitelists_notify
AFTER INSERT OR UPDATE OR DELETE
ON whitelists
FOR EACH ROW
EXECUTE PROCEDURE invoke_whitelists_trigger();

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP INDEX jobs_queue_status_index;
DROP TABLE jobs;

View file

@ -1,17 +0,0 @@
-- Your SQL goes here
CREATE TABLE jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
job_id UUID UNIQUE NOT NULL,
job_queue TEXT NOT NULL,
job_timeout BIGINT NOT NULL,
job_updated TIMESTAMP NOT NULL,
job_status TEXT NOT NULL,
job_value JSONB NOT NULL,
job_next_run TIMESTAMP,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX jobs_queue_status_index ON jobs(job_queue, job_status);
SELECT diesel_manage_updated_at('jobs');

View file

@ -1,4 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TRIGGER IF EXISTS actors_notify ON actors;
DROP FUNCTION IF EXISTS invoke_actors_trigger();
DROP TABLE actors;

View file

@ -1,49 +0,0 @@
-- Your SQL goes here
CREATE TABLE actors (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
public_key TEXT NOT NULL,
public_key_id TEXT UNIQUE NOT NULL,
listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
SELECT diesel_manage_updated_at('actors');
CREATE OR REPLACE FUNCTION invoke_actors_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_actors';
payload := NEW.actor_id;
WHEN 'UPDATE' THEN
rec := NEW;
channel := 'new_actors';
payload := NEW.actor_id;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_actors';
payload := OLD.actor_id;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
CREATE TRIGGER actors_notify
AFTER INSERT OR UPDATE OR DELETE
ON actors
FOR EACH ROW
EXECUTE PROCEDURE invoke_actors_trigger();

View file

@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE nodes;

View file

@ -1,12 +0,0 @@
-- Your SQL goes here
CREATE TABLE nodes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE,
nodeinfo JSONB,
instance JSONB,
contact JSONB,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
SELECT diesel_manage_updated_at('nodes');

View file

@ -1,3 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TRIGGER IF EXISTS nodes_notify ON nodes;
DROP FUNCTION IF EXISTS invoke_nodes_trigger();

View file

@ -1,37 +0,0 @@
-- Your SQL goes here
CREATE OR REPLACE FUNCTION invoke_nodes_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_nodes';
payload := NEW.listener_id;
WHEN 'UPDATE' THEN
rec := NEW;
channel := 'new_nodes';
payload := NEW.listener_id;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_nodes';
payload := OLD.listener_id;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
CREATE TRIGGER nodes_notify
AFTER INSERT OR UPDATE OR DELETE
ON nodes
FOR EACH ROW
EXECUTE PROCEDURE invoke_nodes_trigger();

View file

@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
ALTER TABLE nodes DROP CONSTRAINT nodes_listener_ids_unique;

View file

@ -1,2 +0,0 @@
-- Your SQL goes here
ALTER TABLE nodes ADD CONSTRAINT nodes_listener_ids_unique UNIQUE (listener_id);

View file

@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE media;

View file

@ -1,10 +0,0 @@
-- Your SQL goes here
CREATE TABLE media (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
media_id UUID UNIQUE NOT NULL,
url TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
SELECT diesel_manage_updated_at('media');

View file

@ -6,25 +6,11 @@ pub struct Args {
#[structopt(short, help = "A list of domains that should be blocked")] #[structopt(short, help = "A list of domains that should be blocked")]
blocks: Vec<String>, blocks: Vec<String>,
#[structopt(short, help = "A list of domains that should be whitelisted")] #[structopt(short, help = "A list of domains that should be allowed")]
whitelists: Vec<String>, allowed: Vec<String>,
#[structopt(short, long, help = "Undo whitelisting or blocking domains")] #[structopt(short, long, help = "Undo allowing or blocking domains")]
undo: bool, undo: bool,
#[structopt(
short,
long,
help = "Only process background jobs, do not start the relay server"
)]
jobs_only: bool,
#[structopt(
short,
long,
help = "Only run the relay server, do not process background jobs"
)]
no_jobs: bool,
} }
impl Args { impl Args {
@ -36,19 +22,11 @@ impl Args {
&self.blocks &self.blocks
} }
pub fn whitelists(&self) -> &[String] { pub fn allowed(&self) -> &[String] {
&self.whitelists &self.allowed
} }
pub fn undo(&self) -> bool { pub fn undo(&self) -> bool {
self.undo self.undo
} }
pub fn jobs_only(&self) -> bool {
self.jobs_only
}
pub fn no_jobs(&self) -> bool {
self.no_jobs
}
} }

View file

@ -8,7 +8,7 @@ use activitystreams::{uri, url::Url};
use config::Environment; use config::Environment;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::net::IpAddr; use std::{net::IpAddr, path::PathBuf};
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize)] #[derive(Clone, Debug, serde::Deserialize)]
@ -17,13 +17,14 @@ pub struct ParsedConfig {
addr: IpAddr, addr: IpAddr,
port: u16, port: u16,
debug: bool, debug: bool,
whitelist_mode: bool, restricted_mode: bool,
validate_signatures: bool, validate_signatures: bool,
https: bool, https: bool,
database_url: String, database_url: String,
pretty_log: bool, pretty_log: bool,
publish_blocks: bool, publish_blocks: bool,
max_connections: usize, max_connections: usize,
sled_path: PathBuf,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -32,13 +33,14 @@ pub struct Config {
addr: IpAddr, addr: IpAddr,
port: u16, port: u16,
debug: bool, debug: bool,
whitelist_mode: bool, restricted_mode: bool,
validate_signatures: bool, validate_signatures: bool,
database_url: String, database_url: String,
pretty_log: bool, pretty_log: bool,
publish_blocks: bool, publish_blocks: bool,
max_connections: usize, max_connections: usize,
base_uri: Url, base_uri: Url,
sled_path: PathBuf,
} }
pub enum UrlKind { pub enum UrlKind {
@ -62,12 +64,13 @@ impl Config {
.set_default("addr", "127.0.0.1")? .set_default("addr", "127.0.0.1")?
.set_default("port", 8080)? .set_default("port", 8080)?
.set_default("debug", true)? .set_default("debug", true)?
.set_default("whitelist_mode", false)? .set_default("restricted_mode", false)?
.set_default("validate_signatures", false)? .set_default("validate_signatures", false)?
.set_default("https", false)? .set_default("https", false)?
.set_default("pretty_log", true)? .set_default("pretty_log", true)?
.set_default("publish_blocks", false)? .set_default("publish_blocks", false)?
.set_default("max_connections", 2)? .set_default("max_connections", 2)?
.set_default("sled_path", "./sled/db-0-34")?
.merge(Environment::new())?; .merge(Environment::new())?;
let config: ParsedConfig = config.try_into()?; let config: ParsedConfig = config.try_into()?;
@ -80,16 +83,21 @@ impl Config {
addr: config.addr, addr: config.addr,
port: config.port, port: config.port,
debug: config.debug, debug: config.debug,
whitelist_mode: config.whitelist_mode, restricted_mode: config.restricted_mode,
validate_signatures: config.validate_signatures, validate_signatures: config.validate_signatures,
database_url: config.database_url, database_url: config.database_url,
pretty_log: config.pretty_log, pretty_log: config.pretty_log,
publish_blocks: config.publish_blocks, publish_blocks: config.publish_blocks,
max_connections: config.max_connections, max_connections: config.max_connections,
base_uri, base_uri,
sled_path: config.sled_path,
}) })
} }
pub fn sled_path(&self) -> &PathBuf {
&self.sled_path
}
pub fn pretty_log(&self) -> bool { pub fn pretty_log(&self) -> bool {
self.pretty_log self.pretty_log
} }
@ -135,8 +143,8 @@ impl Config {
self.publish_blocks self.publish_blocks
} }
pub fn whitelist_mode(&self) -> bool { pub fn restricted_mode(&self) -> bool {
self.whitelist_mode self.restricted_mode
} }
pub fn database_url(&self) -> &str { pub fn database_url(&self) -> &str {
@ -156,7 +164,7 @@ impl Config {
} }
pub fn software_version(&self) -> String { pub fn software_version(&self) -> String {
"v0.1.0-main".to_owned() "v0.2.0-main".to_owned()
} }
pub fn source_code(&self) -> String { pub fn source_code(&self) -> String {

View file

@ -1,10 +1,11 @@
use crate::{apub::AcceptedActors, db::Db, error::MyError, requests::Requests}; use crate::{
use activitystreams::{prelude::*, uri, url::Url}; apub::AcceptedActors,
use async_rwlock::RwLock; db::{Actor, Db},
use log::error; error::MyError,
use std::{collections::HashSet, sync::Arc, time::Duration}; requests::Requests,
use ttl_cache::TtlCache; };
use uuid::Uuid; use activitystreams::{prelude::*, url::Url};
use std::time::{Duration, SystemTime};
const REFETCH_DURATION: Duration = Duration::from_secs(60 * 30); const REFETCH_DURATION: Duration = Duration::from_secs(60 * 30);
@ -15,14 +16,14 @@ pub enum MaybeCached<T> {
} }
impl<T> MaybeCached<T> { impl<T> MaybeCached<T> {
pub fn is_cached(&self) -> bool { pub(crate) fn is_cached(&self) -> bool {
match self { match self {
MaybeCached::Cached(_) => true, MaybeCached::Cached(_) => true,
_ => false, _ => false,
} }
} }
pub fn into_inner(self) -> T { pub(crate) fn into_inner(self) -> T {
match self { match self {
MaybeCached::Cached(t) | MaybeCached::Fetched(t) => t, MaybeCached::Cached(t) | MaybeCached::Fetched(t) => t,
} }
@ -32,28 +33,43 @@ impl<T> MaybeCached<T> {
#[derive(Clone)] #[derive(Clone)]
pub struct ActorCache { pub struct ActorCache {
db: Db, db: Db,
cache: Arc<RwLock<TtlCache<Url, Actor>>>,
following: Arc<RwLock<HashSet<Url>>>,
} }
impl ActorCache { impl ActorCache {
pub fn new(db: Db) -> Self { pub(crate) fn new(db: Db) -> Self {
let cache = ActorCache { ActorCache { db }
db,
cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
following: Arc::new(RwLock::new(HashSet::new())),
};
cache.spawn_rehydrate();
cache
} }
pub async fn is_following(&self, id: &Url) -> bool { pub(crate) async fn get(
self.following.read().await.contains(id) &self,
id: &Url,
requests: &Requests,
) -> Result<MaybeCached<Actor>, MyError> {
if let Some(actor) = self.db.actor(id.clone()).await? {
if actor.saved_at + REFETCH_DURATION > SystemTime::now() {
return Ok(MaybeCached::Cached(actor));
}
}
self.get_no_cache(id, requests)
.await
.map(MaybeCached::Fetched)
} }
pub async fn get_no_cache(&self, id: &Url, requests: &Requests) -> Result<Actor, MyError> { pub(crate) async fn follower(&self, actor: Actor) -> Result<(), MyError> {
self.db.add_listener(actor.id.clone()).await?;
self.db.save_actor(actor).await
}
pub(crate) async fn unfollower(&self, actor: &Actor) -> Result<(), MyError> {
self.db.remove_listener(actor.id.clone()).await
}
pub(crate) async fn get_no_cache(
&self,
id: &Url,
requests: &Requests,
) -> Result<Actor, MyError> {
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?; let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
let input_domain = id.domain().ok_or(MyError::MissingDomain)?; let input_domain = id.domain().ok_or(MyError::MissingDomain)?;
@ -68,244 +84,13 @@ impl ActorCache {
public_key: accepted_actor.ext_one.public_key.public_key_pem, public_key: accepted_actor.ext_one.public_key.public_key_pem,
public_key_id: accepted_actor.ext_one.public_key.id, public_key_id: accepted_actor.ext_one.public_key.id,
inbox: inbox.into(), inbox: inbox.into(),
saved_at: SystemTime::now(),
}; };
self.cache self.db.save_actor(actor.clone()).await?;
.write()
.await
.insert(id.clone(), actor.clone(), REFETCH_DURATION);
self.update(id, &actor.public_key, &actor.public_key_id)
.await?;
Ok(actor) Ok(actor)
} }
pub async fn get(&self, id: &Url, requests: &Requests) -> Result<MaybeCached<Actor>, MyError> {
if let Some(actor) = self.cache.read().await.get(id) {
return Ok(MaybeCached::Cached(actor.clone()));
}
if let Some(actor) = self.lookup(id).await? {
self.cache
.write()
.await
.insert(id.clone(), actor.clone(), REFETCH_DURATION);
return Ok(MaybeCached::Cached(actor));
}
self.get_no_cache(id, requests)
.await
.map(MaybeCached::Fetched)
}
pub async fn follower(&self, actor: &Actor) -> Result<(), MyError> {
self.save(actor.clone()).await
}
pub async fn cache_follower(&self, id: Url) {
self.following.write().await.insert(id);
}
pub async fn bust_follower(&self, id: &Url) {
self.following.write().await.remove(id);
}
pub async fn unfollower(&self, actor: &Actor) -> Result<Option<Uuid>, MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"DELETE FROM actors
WHERE actor_id = $1::TEXT
RETURNING listener_id;",
&[&actor.id.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(None);
};
let listener_id: Uuid = row.try_get(0)?;
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT FROM actors
WHERE listener_id = $1::UUID;",
&[&listener_id],
)
.await?;
if row_opt.is_none() {
return Ok(Some(listener_id));
}
Ok(None)
}
async fn lookup(&self, id: &Url) -> Result<Option<Actor>, MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT listeners.actor_id, actors.public_key, actors.public_key_id
FROM listeners
INNER JOIN actors ON actors.listener_id = listeners.id
WHERE
actors.actor_id = $1::TEXT
AND
actors.updated_at + INTERVAL '120 seconds' < NOW()
LIMIT 1;",
&[&id.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(None);
};
let inbox: String = row.try_get(0)?;
let public_key_id: String = row.try_get(2)?;
Ok(Some(Actor {
id: id.clone().into(),
inbox: uri!(inbox).into(),
public_key: row.try_get(1)?,
public_key_id: uri!(public_key_id).into(),
}))
}
async fn save(&self, actor: Actor) -> Result<(), MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;",
&[&actor.inbox.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Err(MyError::NotSubscribed(actor.id.as_str().to_owned()));
};
let listener_id: Uuid = row.try_get(0)?;
self.db
.pool()
.get()
.await?
.execute(
"INSERT INTO actors (
actor_id,
public_key,
public_key_id,
listener_id,
created_at,
updated_at
) VALUES (
$1::TEXT,
$2::TEXT,
$3::TEXT,
$4::UUID,
'now',
'now'
) ON CONFLICT (actor_id)
DO UPDATE SET public_key = $2::TEXT;",
&[
&actor.id.as_str(),
&actor.public_key,
&actor.public_key_id.as_str(),
&listener_id,
],
)
.await?;
Ok(())
}
async fn update(&self, id: &Url, public_key: &str, public_key_id: &Url) -> Result<(), MyError> {
self.db
.pool()
.get()
.await?
.execute(
"UPDATE actors
SET public_key = $2::TEXT, public_key_id = $3::TEXT
WHERE actor_id = $1::TEXT;",
&[&id.as_str(), &public_key, &public_key_id.as_str()],
)
.await?;
Ok(())
}
fn spawn_rehydrate(&self) {
use actix_rt::time::{interval_at, Instant};
let this = self.clone();
actix_rt::spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(60 * 10));
loop {
if let Err(e) = this.rehydrate().await {
error!("Error rehydrating follows, {}", e);
}
interval.tick().await;
}
});
}
async fn rehydrate(&self) -> Result<(), MyError> {
let rows = self
.db
.pool()
.get()
.await?
.query("SELECT actor_id FROM actors;", &[])
.await?;
let actor_ids = rows
.into_iter()
.filter_map(|row| match row.try_get(0) {
Ok(s) => {
let s: String = s;
match s.parse() {
Ok(s) => Some(s),
Err(e) => {
error!("Error parsing actor id, {}", e);
None
}
}
}
Err(e) => {
error!("Error getting actor id from row, {}", e);
None
}
})
.collect();
let mut write_guard = self.following.write().await;
*write_guard = actor_ids;
Ok(())
}
} }
fn get_inbox(actor: &AcceptedActors) -> Result<&Url, MyError> { fn get_inbox(actor: &AcceptedActors) -> Result<&Url, MyError> {
@ -314,11 +99,3 @@ fn get_inbox(actor: &AcceptedActors) -> Result<&Url, MyError> {
.and_then(|e| e.shared_inbox) .and_then(|e| e.shared_inbox)
.unwrap_or(actor.inbox()?)) .unwrap_or(actor.inbox()?))
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Actor {
pub id: Url,
pub public_key: String,
pub public_key_id: Url,
pub inbox: Url,
}

View file

@ -1,171 +1,79 @@
use crate::{db::Db, error::MyError}; use crate::{
db::{Db, MediaMeta},
error::MyError,
};
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use async_mutex::Mutex; use std::time::{Duration, SystemTime};
use async_rwlock::RwLock;
use futures::join;
use lru::LruCache;
use std::{collections::HashMap, sync::Arc, time::Duration};
use ttl_cache::TtlCache;
use uuid::Uuid; use uuid::Uuid;
static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2); static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2);
#[derive(Clone)] #[derive(Clone)]
pub struct Media { pub struct MediaCache {
db: Db, db: Db,
inverse: Arc<Mutex<HashMap<Url, Uuid>>>,
url_cache: Arc<Mutex<LruCache<Uuid, Url>>>,
byte_cache: Arc<RwLock<TtlCache<Uuid, (String, Bytes)>>>,
} }
impl Media { impl MediaCache {
pub fn new(db: Db) -> Self { pub fn new(db: Db) -> Self {
Media { MediaCache { db }
db,
inverse: Arc::new(Mutex::new(HashMap::new())),
url_cache: Arc::new(Mutex::new(LruCache::new(128))),
byte_cache: Arc::new(RwLock::new(TtlCache::new(128))),
}
} }
pub async fn get_uuid(&self, url: &Url) -> Result<Option<Uuid>, MyError> { pub async fn get_uuid(&self, url: Url) -> Result<Option<Uuid>, MyError> {
let res = self.inverse.lock().await.get(url).cloned(); self.db.media_id(url).await
let uuid = match res {
Some(uuid) => uuid,
_ => {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT media_id
FROM media
WHERE url = $1::TEXT
LIMIT 1;",
&[&url.as_str()],
)
.await?;
if let Some(row) = row_opt {
let uuid: Uuid = row.try_get(0)?;
self.inverse.lock().await.insert(url.clone(), uuid);
uuid
} else {
return Ok(None);
}
}
};
if self.url_cache.lock().await.contains(&uuid) {
return Ok(Some(uuid));
}
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT id
FROM media
WHERE
url = $1::TEXT
AND
media_id = $2::UUID
LIMIT 1;",
&[&url.as_str(), &uuid],
)
.await?;
if row_opt.is_some() {
self.url_cache.lock().await.put(uuid, url.clone());
return Ok(Some(uuid));
}
self.inverse.lock().await.remove(url);
Ok(None)
} }
pub async fn get_url(&self, uuid: Uuid) -> Result<Option<Url>, MyError> { pub async fn get_url(&self, uuid: Uuid) -> Result<Option<Url>, MyError> {
if let Some(url) = self.url_cache.lock().await.get(&uuid).cloned() { self.db.media_url(uuid).await
return Ok(Some(url)); }
pub async fn is_outdated(&self, uuid: Uuid) -> Result<bool, MyError> {
if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return Ok(false);
}
} }
let row_opt = self Ok(true)
.db }
.pool()
.get()
.await?
.query_opt(
"SELECT url
FROM media
WHERE media_id = $1::UUID
LIMIT 1;",
&[&uuid],
)
.await?;
if let Some(row) = row_opt { pub async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, MyError> {
let url: String = row.try_get(0)?; if let Some(meta) = self.db.media_meta(uuid).await? {
let url: Url = url.parse()?; if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return Ok(Some(url)); return self
.db
.media_bytes(uuid)
.await
.map(|opt| opt.map(|bytes| (meta.media_type, bytes)));
}
} }
Ok(None) Ok(None)
} }
pub async fn get_bytes(&self, uuid: Uuid) -> Option<(String, Bytes)> { pub async fn store_url(&self, url: Url) -> Result<Uuid, MyError> {
self.byte_cache.read().await.get(&uuid).cloned()
}
pub async fn store_url(&self, url: &Url) -> Result<Uuid, MyError> {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let (_, _, res) = join!( self.db.save_url(url, uuid).await?;
async {
self.inverse.lock().await.insert(url.clone(), uuid);
},
async {
self.url_cache.lock().await.put(uuid, url.clone());
},
async {
self.db
.pool()
.get()
.await?
.execute(
"INSERT INTO media (
media_id,
url,
created_at,
updated_at
) VALUES (
$1::UUID,
$2::TEXT,
'now',
'now'
) ON CONFLICT (media_id)
DO UPDATE SET url = $2::TEXT;",
&[&uuid, &url.as_str()],
)
.await?;
Ok(()) as Result<(), MyError>
}
);
res?;
Ok(uuid) Ok(uuid)
} }
pub async fn store_bytes(&self, uuid: Uuid, content_type: String, bytes: Bytes) { pub async fn store_bytes(
self.byte_cache &self,
.write() uuid: Uuid,
media_type: String,
bytes: Bytes,
) -> Result<(), MyError> {
self.db
.save_bytes(
uuid,
MediaMeta {
media_type,
saved_at: SystemTime::now(),
},
bytes,
)
.await .await
.insert(uuid, (content_type, bytes), MEDIA_DURATION);
} }
} }

View file

@ -3,9 +3,7 @@ mod media;
mod node; mod node;
mod state; mod state;
pub use self::{ pub(crate) use actor::ActorCache;
actor::{Actor, ActorCache}, pub(crate) use media::MediaCache;
media::Media, pub(crate) use node::{Node, NodeCache};
node::{Contact, Info, Instance, Node, NodeCache}, pub(crate) use state::State;
state::State,
};

View file

@ -1,341 +1,146 @@
use crate::{db::Db, error::MyError}; use crate::{
use activitystreams::{uri, url::Url}; db::{Contact, Db, Info, Instance},
use async_rwlock::RwLock; error::MyError,
use log::{debug, error};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, SystemTime},
}; };
use tokio_postgres::types::Json; use activitystreams::url::Url;
use uuid::Uuid; use std::time::{Duration, SystemTime};
pub type ListenersCache = Arc<RwLock<HashSet<Url>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct NodeCache { pub struct NodeCache {
db: Db, db: Db,
listeners: ListenersCache, }
nodes: Arc<RwLock<HashMap<Url, Node>>>,
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Node {
pub(crate) base: Url,
pub(crate) info: Option<Info>,
pub(crate) instance: Option<Instance>,
pub(crate) contact: Option<Contact>,
} }
impl NodeCache { impl NodeCache {
pub fn new(db: Db, listeners: ListenersCache) -> Self { pub(crate) fn new(db: Db) -> Self {
NodeCache { NodeCache { db }
db,
listeners,
nodes: Arc::new(RwLock::new(HashMap::new())),
}
} }
pub async fn nodes(&self) -> Vec<Node> { pub(crate) async fn nodes(&self) -> Result<Vec<Node>, MyError> {
let listeners: HashSet<_> = self.listeners.read().await.clone(); let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact().await?;
self.nodes let vec = self
.read() .db
.await .connected_ids()
.iter() .await?
.filter_map(|(k, v)| { .into_iter()
if listeners.contains(k) { .map(move |actor_id| {
Some(v.clone()) let info = infos.get(&actor_id).map(|info| info.clone());
} else { let instance = instances.get(&actor_id).map(|instance| instance.clone());
None let contact = contacts.get(&actor_id).map(|contact| contact.clone());
}
Node::new(actor_id)
.info(info)
.instance(instance)
.contact(contact)
}) })
.collect() .collect();
Ok(vec)
} }
pub async fn is_nodeinfo_outdated(&self, listener: &Url) -> bool { pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: Url) -> bool {
let read_guard = self.nodes.read().await; self.db
.info(actor_id)
let node = match read_guard.get(listener) { .await
None => { .map(|opt| opt.map(|info| info.outdated()).unwrap_or(true))
debug!("No node for listener {}", listener); .unwrap_or(true)
return true;
}
Some(node) => node,
};
match node.info.as_ref() {
Some(nodeinfo) => nodeinfo.outdated(),
None => {
debug!("No info for node {}", node.base);
true
}
}
} }
pub async fn is_contact_outdated(&self, listener: &Url) -> bool { pub(crate) async fn is_contact_outdated(&self, actor_id: Url) -> bool {
let read_guard = self.nodes.read().await; self.db
.contact(actor_id)
let node = match read_guard.get(listener) { .await
None => { .map(|opt| opt.map(|contact| contact.outdated()).unwrap_or(true))
debug!("No node for listener {}", listener); .unwrap_or(true)
return true;
}
Some(node) => node,
};
match node.contact.as_ref() {
Some(contact) => contact.outdated(),
None => {
debug!("No contact for node {}", node.base);
true
}
}
} }
pub async fn is_instance_outdated(&self, listener: &Url) -> bool { pub(crate) async fn is_instance_outdated(&self, actor_id: Url) -> bool {
let read_guard = self.nodes.read().await; self.db
.instance(actor_id)
let node = match read_guard.get(listener) { .await
None => { .map(|opt| opt.map(|instance| instance.outdated()).unwrap_or(true))
debug!("No node for listener {}", listener); .unwrap_or(true)
return true;
}
Some(node) => node,
};
match node.instance.as_ref() {
Some(instance) => instance.outdated(),
None => {
debug!("No instance for node {}", node.base);
true
}
}
} }
pub async fn cache_by_id(&self, id: Uuid) { pub(crate) async fn set_info(
if let Err(e) = self.do_cache_by_id(id).await {
error!("Error loading node into cache, {}", e);
}
}
pub async fn bust_by_id(&self, id: Uuid) {
if let Err(e) = self.do_bust_by_id(id).await {
error!("Error busting node cache, {}", e);
}
}
async fn do_bust_by_id(&self, id: Uuid) -> Result<(), MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT ls.actor_id
FROM listeners AS ls
INNER JOIN nodes AS nd ON nd.listener_id = ls.id
WHERE nd.id = $1::UUID
LIMIT 1;",
&[&id],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(());
};
let listener: String = row.try_get(0)?;
self.nodes.write().await.remove(&uri!(listener));
Ok(())
}
async fn do_cache_by_id(&self, id: Uuid) -> Result<(), MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT ls.actor_id, nd.nodeinfo, nd.instance, nd.contact
FROM nodes AS nd
INNER JOIN listeners AS ls ON nd.listener_id = ls.id
WHERE nd.id = $1::UUID
LIMIT 1;",
&[&id],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(());
};
let listener: String = row.try_get(0)?;
let listener = uri!(listener);
let info: Option<Json<Info>> = row.try_get(1)?;
let instance: Option<Json<Instance>> = row.try_get(2)?;
let contact: Option<Json<Contact>> = row.try_get(3)?;
{
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert_with(|| Node::new(listener));
if let Some(info) = info {
node.info = Some(info.0);
}
if let Some(instance) = instance {
node.instance = Some(instance.0);
}
if let Some(contact) = contact {
node.contact = Some(contact.0);
}
}
Ok(())
}
pub async fn set_info(
&self, &self,
listener: &Url, actor_id: Url,
software: String, software: String,
version: String, version: String,
reg: bool, reg: bool,
) -> Result<(), MyError> { ) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) { self.db
let mut nodes = self.nodes.write().await; .save_info(
nodes.remove(listener); actor_id,
return Ok(()); Info {
} software,
version,
let node = { reg,
let mut write_guard = self.nodes.write().await; updated: SystemTime::now(),
let node = write_guard },
.entry(listener.clone()) )
.or_insert_with(|| Node::new(listener.clone())); .await
node.set_info(software, version, reg);
node.clone()
};
self.save(listener, &node).await?;
Ok(())
} }
pub async fn set_instance( pub(crate) async fn set_instance(
&self, &self,
listener: &Url, actor_id: Url,
title: String, title: String,
description: String, description: String,
version: String, version: String,
reg: bool, reg: bool,
requires_approval: bool, requires_approval: bool,
) -> Result<(), MyError> { ) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) { self.db
let mut nodes = self.nodes.write().await; .save_instance(
nodes.remove(listener); actor_id,
return Ok(()); Instance {
} title,
description,
let node = { version,
let mut write_guard = self.nodes.write().await; reg,
let node = write_guard requires_approval,
.entry(listener.clone()) updated: SystemTime::now(),
.or_insert_with(|| Node::new(listener.clone())); },
node.set_instance(title, description, version, reg, requires_approval); )
node.clone() .await
};
self.save(listener, &node).await?;
Ok(())
} }
pub async fn set_contact( pub(crate) async fn set_contact(
&self, &self,
listener: &Url, actor_id: Url,
username: String, username: String,
display_name: String, display_name: String,
url: Url, url: Url,
avatar: Url, avatar: Url,
) -> Result<(), MyError> { ) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(listener);
return Ok(());
}
let node = {
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert_with(|| Node::new(listener.clone()));
node.set_contact(username, display_name, url, avatar);
node.clone()
};
self.save(listener, &node).await?;
Ok(())
}
pub async fn save(&self, listener: &Url, node: &Node) -> Result<(), MyError> {
let row_opt = self
.db
.pool()
.get()
.await?
.query_opt(
"SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;",
&[&listener.as_str()],
)
.await?;
let id: Uuid = if let Some(row) = row_opt {
row.try_get(0)?
} else {
return Err(MyError::NotSubscribed(listener.as_str().to_owned()));
};
self.db self.db
.pool() .save_contact(
.get() actor_id,
.await? Contact {
.execute( username,
"INSERT INTO nodes ( display_name,
listener_id, url,
nodeinfo, avatar,
instance, updated: SystemTime::now(),
contact, },
created_at,
updated_at
) VALUES (
$1::UUID,
$2::JSONB,
$3::JSONB,
$4::JSONB,
'now',
'now'
) ON CONFLICT (listener_id)
DO UPDATE SET
nodeinfo = $2::JSONB,
instance = $3::JSONB,
contact = $4::JSONB;",
&[
&id,
&Json(&node.info),
&Json(&node.instance),
&Json(&node.contact),
],
) )
.await?; .await
Ok(())
} }
} }
#[derive(Clone, Debug)]
pub struct Node {
pub base: Url,
pub info: Option<Info>,
pub instance: Option<Instance>,
pub contact: Option<Contact>,
}
impl Node { impl Node {
pub fn new(mut url: Url) -> Self { fn new(mut url: Url) -> Self {
url.set_fragment(None); url.set_fragment(None);
url.set_query(None); url.set_query(None);
url.set_path(""); url.set_path("");
@ -348,96 +153,38 @@ impl Node {
} }
} }
fn set_info(&mut self, software: String, version: String, reg: bool) -> &mut Self { fn info(mut self, info: Option<Info>) -> Self {
self.info = Some(Info { self.info = info;
software,
version,
reg,
updated: SystemTime::now(),
});
self self
} }
fn set_instance( fn instance(mut self, instance: Option<Instance>) -> Self {
&mut self, self.instance = instance;
title: String,
description: String,
version: String,
reg: bool,
requires_approval: bool,
) -> &mut Self {
self.instance = Some(Instance {
title,
description,
version,
reg,
requires_approval,
updated: SystemTime::now(),
});
self self
} }
fn set_contact( fn contact(mut self, contact: Option<Contact>) -> Self {
&mut self, self.contact = contact;
username: String,
display_name: String,
url: Url,
avatar: Url,
) -> &mut Self {
self.contact = Some(Contact {
username,
display_name,
url: url.into(),
avatar: avatar.into(),
updated: SystemTime::now(),
});
self self
} }
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Info {
pub software: String,
pub version: String,
pub reg: bool,
pub updated: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Instance {
pub title: String,
pub description: String,
pub version: String,
pub reg: bool,
pub requires_approval: bool,
pub updated: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Contact {
pub username: String,
pub display_name: String,
pub url: Url,
pub avatar: Url,
pub updated: SystemTime,
}
static TEN_MINUTES: Duration = Duration::from_secs(60 * 10); static TEN_MINUTES: Duration = Duration::from_secs(60 * 10);
impl Info { impl Info {
pub fn outdated(&self) -> bool { pub(crate) fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now() self.updated + TEN_MINUTES < SystemTime::now()
} }
} }
impl Instance { impl Instance {
pub fn outdated(&self) -> bool { pub(crate) fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now() self.updated + TEN_MINUTES < SystemTime::now()
} }
} }
impl Contact { impl Contact {
pub fn outdated(&self) -> bool { pub(crate) fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now() self.updated + TEN_MINUTES < SystemTime::now()
} }
} }

View file

@ -6,38 +6,31 @@ use crate::{
requests::{Breakers, Requests}, requests::{Breakers, Requests},
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_rt::{
spawn,
time::{interval_at, Instant},
};
use actix_web::web; use actix_web::web;
use async_rwlock::RwLock; use async_rwlock::RwLock;
use futures::{join, try_join}; use log::info;
use log::{error, info};
use lru::LruCache; use lru::LruCache;
use rand::thread_rng; use rand::thread_rng;
use rsa::{RSAPrivateKey, RSAPublicKey}; use rsa::{RSAPrivateKey, RSAPublicKey};
use std::{collections::HashSet, sync::Arc, time::Duration}; use std::sync::Arc;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub public_key: RSAPublicKey, pub(crate) public_key: RSAPublicKey,
private_key: RSAPrivateKey, private_key: RSAPrivateKey,
config: Config, config: Config,
actor_id_cache: Arc<RwLock<LruCache<Url, Url>>>, object_cache: Arc<RwLock<LruCache<Url, Url>>>,
blocks: Arc<RwLock<HashSet<String>>>,
whitelists: Arc<RwLock<HashSet<String>>>,
listeners: Arc<RwLock<HashSet<Url>>>,
node_cache: NodeCache, node_cache: NodeCache,
breakers: Breakers, breakers: Breakers,
pub(crate) db: Db,
} }
impl State { impl State {
pub fn node_cache(&self) -> NodeCache { pub(crate) fn node_cache(&self) -> NodeCache {
self.node_cache.clone() self.node_cache.clone()
} }
pub fn requests(&self) -> Requests { pub(crate) fn requests(&self) -> Requests {
Requests::new( Requests::new(
self.config.generate_url(UrlKind::MainKey).to_string(), self.config.generate_url(UrlKind::MainKey).to_string(),
self.private_key.clone(), self.private_key.clone(),
@ -51,168 +44,64 @@ impl State {
) )
} }
pub async fn bust_whitelist(&self, whitelist: &str) { pub(crate) async fn inboxes_without(
self.whitelists.write().await.remove(whitelist); &self,
} existing_inbox: &Url,
domain: &str,
pub async fn bust_block(&self, block: &str) { ) -> Result<Vec<Url>, MyError> {
self.blocks.write().await.remove(block); Ok(self
} .db
.inboxes()
pub async fn bust_listener(&self, inbox: &Url) { .await?
self.listeners.write().await.remove(inbox);
}
pub async fn listeners(&self) -> Vec<Url> {
self.listeners.read().await.iter().cloned().collect()
}
pub async fn blocks(&self) -> Vec<String> {
self.blocks.read().await.iter().cloned().collect()
}
pub async fn listeners_without(&self, inbox: &Url, domain: &str) -> Vec<Url> {
self.listeners
.read()
.await
.iter() .iter()
.filter_map(|listener| { .filter_map(|inbox| {
if let Some(dom) = listener.domain() { if let Some(dom) = inbox.domain() {
if listener != inbox && dom != domain { if inbox != existing_inbox && dom != domain {
return Some(listener.clone()); return Some(inbox.clone());
} }
} }
None None
}) })
.collect() .collect())
} }
pub async fn is_whitelisted(&self, actor_id: &Url) -> bool { pub(crate) async fn is_cached(&self, object_id: &Url) -> bool {
if !self.config.whitelist_mode() { self.object_cache.read().await.contains(object_id)
return true;
}
if let Some(domain) = actor_id.domain() {
return self.whitelists.read().await.contains(domain);
}
false
} }
pub async fn is_blocked(&self, actor_id: &Url) -> bool { pub(crate) async fn cache(&self, object_id: Url, actor_id: Url) {
if let Some(domain) = actor_id.domain() { self.object_cache.write().await.put(object_id, actor_id);
return self.blocks.read().await.contains(domain);
}
true
} }
pub async fn is_listener(&self, actor_id: &Url) -> bool { pub(crate) async fn build(config: Config, db: Db) -> Result<Self, MyError> {
self.listeners.read().await.contains(actor_id) let private_key = if let Ok(Some(key)) = db.private_key().await {
} key
} else {
info!("Generating new keys");
let key = web::block(move || {
let mut rng = thread_rng();
RSAPrivateKey::new(&mut rng, 4096)
})
.await?;
pub async fn is_cached(&self, object_id: &Url) -> bool { db.update_private_key(&key).await?;
self.actor_id_cache.read().await.contains(object_id)
}
pub async fn cache(&self, object_id: Url, actor_id: Url) { key
self.actor_id_cache.write().await.put(object_id, actor_id);
}
pub async fn cache_block(&self, host: String) {
self.blocks.write().await.insert(host);
}
pub async fn cache_whitelist(&self, host: String) {
self.whitelists.write().await.insert(host);
}
pub async fn cache_listener(&self, listener: Url) {
self.listeners.write().await.insert(listener);
}
pub async fn rehydrate(&self, db: &Db) -> Result<(), MyError> {
let f1 = db.hydrate_blocks();
let f2 = db.hydrate_whitelists();
let f3 = db.hydrate_listeners();
let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?;
join!(
async move {
*self.listeners.write().await = listeners;
},
async move {
*self.whitelists.write().await = whitelists;
},
async move {
*self.blocks.write().await = blocks;
}
);
Ok(())
}
pub async fn hydrate(config: Config, db: &Db) -> Result<Self, MyError> {
let f1 = db.hydrate_blocks();
let f2 = db.hydrate_whitelists();
let f3 = db.hydrate_listeners();
let f4 = async move {
if let Ok(Some(key)) = db.hydrate_private_key().await {
Ok(key)
} else {
info!("Generating new keys");
let key = web::block(move || {
let mut rng = thread_rng();
RSAPrivateKey::new(&mut rng, 4096)
})
.await?;
db.update_private_key(&key).await?;
Ok(key)
}
}; };
let (blocks, whitelists, listeners, private_key) = try_join!(f1, f2, f3, f4)?;
let public_key = private_key.to_public_key(); let public_key = private_key.to_public_key();
let listeners = Arc::new(RwLock::new(listeners));
let state = State { let state = State {
public_key, public_key,
private_key, private_key,
config, config,
actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))), object_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
blocks: Arc::new(RwLock::new(blocks)), node_cache: NodeCache::new(db.clone()),
whitelists: Arc::new(RwLock::new(whitelists)),
listeners: listeners.clone(),
node_cache: NodeCache::new(db.clone(), listeners),
breakers: Breakers::default(), breakers: Breakers::default(),
db,
}; };
state.spawn_rehydrate(db.clone());
Ok(state) Ok(state)
} }
fn spawn_rehydrate(&self, db: Db) {
let state = self.clone();
spawn(async move {
let start = Instant::now();
let duration = Duration::from_secs(60 * 10);
let mut interval = interval_at(start, duration);
loop {
interval.tick().await;
if let Err(e) = state.rehydrate(&db).await {
error!("Error rehydrating, {}", e);
}
}
});
}
} }

863
src/db.rs
View file

@ -1,294 +1,619 @@
use crate::error::MyError; use crate::{config::Config, error::MyError};
use activitystreams::url::Url; use activitystreams::url::Url;
use deadpool_postgres::{Manager, Pool}; use actix_web::web::Bytes;
use log::{info, warn};
use rsa::RSAPrivateKey; use rsa::RSAPrivateKey;
use rsa_pem::KeyExt; use rsa_pem::KeyExt;
use std::collections::HashSet; use sled::Tree;
use tokio_postgres::{ use std::{collections::HashMap, sync::Arc, time::SystemTime};
error::{Error, SqlState}, use uuid::Uuid;
row::Row,
Client, Config, NoTls,
};
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db {
pool: Pool, inner: Arc<Inner>,
}
struct Inner {
actor_id_actor: Tree,
public_key_id_actor_id: Tree,
connected_actor_ids: Tree,
allowed_domains: Tree,
blocked_domains: Tree,
settings: Tree,
media_url_media_id: Tree,
media_id_media_url: Tree,
media_id_media_bytes: Tree,
media_id_media_meta: Tree,
actor_id_info: Tree,
actor_id_instance: Tree,
actor_id_contact: Tree,
restricted_mode: bool,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Actor {
pub(crate) id: Url,
pub(crate) public_key: String,
pub(crate) public_key_id: Url,
pub(crate) inbox: Url,
pub(crate) saved_at: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MediaMeta {
pub(crate) media_type: String,
pub(crate) saved_at: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Info {
pub(crate) software: String,
pub(crate) version: String,
pub(crate) reg: bool,
pub(crate) updated: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Instance {
pub(crate) title: String,
pub(crate) description: String,
pub(crate) version: String,
pub(crate) reg: bool,
pub(crate) requires_approval: bool,
pub(crate) updated: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Contact {
pub(crate) username: String,
pub(crate) display_name: String,
pub(crate) url: Url,
pub(crate) avatar: Url,
pub(crate) updated: SystemTime,
}
impl Inner {
fn connected_by_domain(&self, domains: &[String]) -> impl DoubleEndedIterator<Item = Url> {
let reversed: Vec<_> = domains
.into_iter()
.map(|s| domain_key(s.as_str()))
.collect();
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(url_from_ivec)
.filter_map(move |url| {
let connected_domain = url.domain()?;
let connected_rdnn = domain_key(connected_domain);
for rdnn in &reversed {
if connected_rdnn.starts_with(rdnn) {
return Some(url);
}
}
None
})
}
fn blocks(&self) -> impl DoubleEndedIterator<Item = String> {
self.blocked_domains
.iter()
.values()
.filter_map(|res| res.ok())
.map(|s| String::from_utf8_lossy(&s).to_string())
}
fn connected(&self) -> impl DoubleEndedIterator<Item = Url> {
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(url_from_ivec)
}
fn connected_actors<'a>(&'a self) -> impl DoubleEndedIterator<Item = Actor> + 'a {
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(move |actor_id| {
let actor_ivec = self.actor_id_actor.get(actor_id).ok()??;
serde_json::from_slice::<Actor>(&actor_ivec).ok()
})
}
fn connected_info<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Info)> + 'a {
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(move |actor_id_ivec| {
let actor_id = url_from_ivec(actor_id_ivec.clone())?;
let ivec = self.actor_id_info.get(actor_id_ivec).ok()??;
let info = serde_json::from_slice(&ivec).ok()?;
Some((actor_id, info))
})
}
fn connected_instance<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Instance)> + 'a {
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(move |actor_id_ivec| {
let actor_id = url_from_ivec(actor_id_ivec.clone())?;
let ivec = self.actor_id_instance.get(actor_id_ivec).ok()??;
let instance = serde_json::from_slice(&ivec).ok()?;
Some((actor_id, instance))
})
}
fn connected_contact<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Contact)> + 'a {
self.connected_actor_ids
.iter()
.values()
.filter_map(|res| res.ok())
.filter_map(move |actor_id_ivec| {
let actor_id = url_from_ivec(actor_id_ivec.clone())?;
let ivec = self.actor_id_contact.get(actor_id_ivec).ok()??;
let contact = serde_json::from_slice(&ivec).ok()?;
Some((actor_id, contact))
})
}
fn is_allowed(&self, domain: &str) -> bool {
let prefix = domain_prefix(domain);
if self.restricted_mode {
self.allowed_domains
.scan_prefix(prefix)
.keys()
.filter_map(|res| res.ok())
.any(|rdnn| domain.starts_with(String::from_utf8_lossy(&rdnn).as_ref()))
} else {
!self
.blocked_domains
.scan_prefix(prefix)
.keys()
.filter_map(|res| res.ok())
.any(|rdnn| domain.starts_with(String::from_utf8_lossy(&rdnn).as_ref()))
}
}
} }
impl Db { impl Db {
pub fn build(config: &crate::config::Config) -> Result<Self, MyError> { pub(crate) fn build(config: &Config) -> Result<Self, MyError> {
let max_conns = config.max_connections(); let db = sled::open(config.sled_path())?;
let config: Config = config.database_url().parse()?; let restricted_mode = config.restricted_mode();
let manager = Manager::new(config, NoTls);
Ok(Db { Ok(Db {
pool: Pool::new(manager, max_conns), inner: Arc::new(Inner {
actor_id_actor: db.open_tree("actor-id-actor")?,
public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?,
connected_actor_ids: db.open_tree("connected-actor-ids")?,
allowed_domains: db.open_tree("allowed-actor-ids")?,
blocked_domains: db.open_tree("blocked-actor-ids")?,
settings: db.open_tree("settings")?,
media_url_media_id: db.open_tree("media-url-media-id")?,
media_id_media_url: db.open_tree("media-id-media-url")?,
media_id_media_bytes: db.open_tree("media-id-media-bytes")?,
media_id_media_meta: db.open_tree("media-id-media-meta")?,
actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?,
restricted_mode,
}),
}) })
} }
pub fn pool(&self) -> &Pool { async fn unblock<T>(
&self.pool &self,
f: impl Fn(&Inner) -> Result<T, MyError> + Send + 'static,
) -> Result<T, MyError>
where
T: Send + 'static,
{
let inner = self.inner.clone();
let t = actix_web::web::block(move || (f)(&inner)).await?;
Ok(t)
} }
pub async fn remove_listener(&self, inbox: Url) -> Result<(), MyError> { pub(crate) async fn connected_ids(&self) -> Result<Vec<Url>, MyError> {
info!("DELETE FROM listeners WHERE actor_id = {};", inbox.as_str()); self.unblock(|inner| Ok(inner.connected().collect())).await
self.pool
.get()
.await?
.execute(
"DELETE FROM listeners WHERE actor_id = $1::TEXT;",
&[&inbox.as_str()],
)
.await?;
Ok(())
} }
pub async fn add_listener(&self, inbox: Url) -> Result<(), MyError> { pub(crate) async fn save_info(&self, actor_id: Url, info: Info) -> Result<(), MyError> {
info!( self.unblock(move |inner| {
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]", let vec = serde_json::to_vec(&info)?;
inbox.as_str(),
);
self.pool
.get()
.await?
.execute(
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');",
&[&inbox.as_str()],
)
.await?;
Ok(()) inner
.actor_id_info
.insert(actor_id.as_str().as_bytes(), vec)?;
Ok(())
})
.await
} }
pub async fn add_blocks(&self, domains: &[String]) -> Result<(), MyError> { pub(crate) async fn info(&self, actor_id: Url) -> Result<Option<Info>, MyError> {
let conn = self.pool.get().await?; self.unblock(move |inner| {
for domain in domains { if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? {
match add_block(&conn, domain.as_str()).await { let info = serde_json::from_slice(&ivec)?;
Err(e) if e.code() != Some(&SqlState::UNIQUE_VIOLATION) => { Ok(Some(info))
return Err(e.into()); } else {
} Ok(None)
_ => (),
};
}
Ok(())
}
pub async fn remove_blocks(&self, domains: &[String]) -> Result<(), MyError> {
let conn = self.pool.get().await?;
for domain in domains {
remove_block(&conn, domain.as_str()).await?
}
Ok(())
}
pub async fn add_whitelists(&self, domains: &[String]) -> Result<(), MyError> {
let conn = self.pool.get().await?;
for domain in domains {
match add_whitelist(&conn, domain.as_str()).await {
Err(e) if e.code() != Some(&SqlState::UNIQUE_VIOLATION) => {
return Err(e.into());
}
_ => (),
};
}
Ok(())
}
pub async fn remove_whitelists(&self, domains: &[String]) -> Result<(), MyError> {
let conn = self.pool.get().await?;
for domain in domains {
remove_whitelist(&conn, domain.as_str()).await?
}
Ok(())
}
pub async fn hydrate_blocks(&self) -> Result<HashSet<String>, MyError> {
info!("SELECT domain_name FROM blocks");
let rows = self
.pool
.get()
.await?
.query("SELECT domain_name FROM blocks", &[])
.await?;
parse_rows(rows)
}
pub async fn hydrate_whitelists(&self) -> Result<HashSet<String>, MyError> {
info!("SELECT domain_name FROM whitelists");
let rows = self
.pool
.get()
.await?
.query("SELECT domain_name FROM whitelists", &[])
.await?;
parse_rows(rows)
}
pub async fn hydrate_listeners(&self) -> Result<HashSet<Url>, MyError> {
info!("SELECT actor_id FROM listeners");
let rows = self
.pool
.get()
.await?
.query("SELECT actor_id FROM listeners", &[])
.await?;
parse_rows(rows)
}
pub async fn hydrate_private_key(&self) -> Result<Option<RSAPrivateKey>, MyError> {
info!("SELECT value FROM settings WHERE key = 'private_key'");
let rows = self
.pool
.get()
.await?
.query("SELECT value FROM settings WHERE key = 'private_key'", &[])
.await?;
if let Some(row) = rows.into_iter().next() {
let key_str: String = row.get(0);
// precomputation happens when constructing a private key, so it should be on the
// threadpool
let key = actix_web::web::block(move || KeyExt::from_pem_pkcs8(&key_str)).await?;
return Ok(Some(key));
}
Ok(None)
}
pub async fn update_private_key(&self, private_key: &RSAPrivateKey) -> Result<(), MyError> {
let pem_pkcs8 = private_key.to_pem_pkcs8()?;
info!(
"INSERT INTO settings (key, value, created_at)
VALUES ('private_key', $1::TEXT, 'now')
ON CONFLICT (key)
DO UPDATE
SET value = $1::TEXT;"
);
self.pool
.get()
.await?
.execute(
"INSERT INTO settings (key, value, created_at)
VALUES ('private_key', $1::TEXT, 'now')
ON CONFLICT (key)
DO UPDATE
SET value = $1::TEXT;",
&[&pem_pkcs8],
)
.await?;
Ok(())
}
}
pub async fn listen(client: &Client) -> Result<(), Error> {
info!("LISTEN new_blocks, new_whitelists, new_listeners, new_actors, rm_blocks, rm_whitelists, rm_listeners, rm_actors");
client
.batch_execute(
"LISTEN new_blocks;
LISTEN new_whitelists;
LISTEN new_listeners;
LISTEN new_actors;
LISTEN new_nodes;
LISTEN rm_blocks;
LISTEN rm_whitelists;
LISTEN rm_listeners;
LISTEN rm_actors;
LISTEN rm_nodes",
)
.await?;
Ok(())
}
async fn add_block(client: &Client, domain: &str) -> Result<(), Error> {
info!(
"INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
domain,
);
client
.execute(
"INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now');",
&[&domain],
)
.await?;
Ok(())
}
async fn remove_block(client: &Client, domain: &str) -> Result<(), Error> {
info!(
"DELETE FROM blocks WHERE domain_name = $1::TEXT; [{}]",
domain,
);
client
.execute(
"DELETE FROM blocks WHERE domain_name = $1::TEXT;",
&[&domain],
)
.await?;
Ok(())
}
async fn add_whitelist(client: &Client, domain: &str) -> Result<(), Error> {
info!(
"INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
domain,
);
client
.execute(
"INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now');",
&[&domain],
)
.await?;
Ok(())
}
async fn remove_whitelist(client: &Client, domain: &str) -> Result<(), Error> {
info!(
"DELETE FROM whitelists WHERE domain_name = $1::TEXT; [{}]",
domain,
);
client
.execute(
"DELETE FROM whitelists WHERE domain_name = $1::TEXT;",
&[&domain],
)
.await?;
Ok(())
}
fn parse_rows<T, E>(rows: Vec<Row>) -> Result<HashSet<T>, MyError>
where
T: std::str::FromStr<Err = E> + Eq + std::hash::Hash,
E: std::fmt::Display,
{
let hs = rows
.into_iter()
.filter_map(move |row| match row.try_get::<_, String>(0) {
Ok(s) => match s.parse() {
Ok(t) => Some(t),
Err(e) => {
warn!("Couln't parse row, '{}', {}", s, e);
None
}
},
Err(e) => {
warn!("Couldn't get column, {}", e);
None
} }
}) })
.collect(); .await
}
Ok(hs) pub(crate) async fn connected_info(&self) -> Result<HashMap<Url, Info>, MyError> {
self.unblock(|inner| Ok(inner.connected_info().collect()))
.await
}
pub(crate) async fn save_instance(
&self,
actor_id: Url,
instance: Instance,
) -> Result<(), MyError> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&instance)?;
inner
.actor_id_instance
.insert(actor_id.as_str().as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn instance(&self, actor_id: Url) -> Result<Option<Instance>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? {
let instance = serde_json::from_slice(&ivec)?;
Ok(Some(instance))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn connected_instance(&self) -> Result<HashMap<Url, Instance>, MyError> {
self.unblock(|inner| Ok(inner.connected_instance().collect()))
.await
}
pub(crate) async fn save_contact(
&self,
actor_id: Url,
contact: Contact,
) -> Result<(), MyError> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&contact)?;
inner
.actor_id_contact
.insert(actor_id.as_str().as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn contact(&self, actor_id: Url) -> Result<Option<Contact>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? {
let contact = serde_json::from_slice(&ivec)?;
Ok(Some(contact))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn connected_contact(&self) -> Result<HashMap<Url, Contact>, MyError> {
self.unblock(|inner| Ok(inner.connected_contact().collect()))
.await
}
pub(crate) async fn save_url(&self, url: Url, id: Uuid) -> Result<(), MyError> {
self.unblock(move |inner| {
inner
.media_id_media_url
.insert(id.as_bytes(), url.as_str().as_bytes())?;
inner
.media_url_media_id
.insert(url.as_str().as_bytes(), id.as_bytes())?;
Ok(())
})
.await
}
pub(crate) async fn save_bytes(
&self,
id: Uuid,
meta: MediaMeta,
bytes: Bytes,
) -> Result<(), MyError> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&meta)?;
inner
.media_id_media_bytes
.insert(id.as_bytes(), bytes.as_ref())?;
inner.media_id_media_meta.insert(id.as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn media_id(&self, url: Url) -> Result<Option<Uuid>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
Ok(uuid_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<Url>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? {
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_bytes(&self, id: Uuid) -> Result<Option<Bytes>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? {
Ok(Some(Bytes::copy_from_slice(&ivec)))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_meta(&self, id: Uuid) -> Result<Option<MediaMeta>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? {
let meta = serde_json::from_slice(&ivec)?;
Ok(Some(meta))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn blocks(&self) -> Result<Vec<String>, MyError> {
self.unblock(|inner| Ok(inner.blocks().collect())).await
}
pub(crate) async fn inboxes(&self) -> Result<Vec<Url>, MyError> {
self.unblock(|inner| Ok(inner.connected_actors().map(|actor| actor.inbox).collect()))
.await
}
pub(crate) async fn is_connected(&self, id: Url) -> Result<bool, MyError> {
self.unblock(move |inner| {
let connected = inner
.connected_actor_ids
.contains_key(id.as_str().as_bytes())?;
Ok(connected)
})
.await
}
pub(crate) async fn actor_id_from_public_key_id(
&self,
public_key_id: Url,
) -> Result<Option<Url>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner
.public_key_id_actor_id
.get(public_key_id.as_str().as_bytes())?
{
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn actor(&self, actor_id: Url) -> Result<Option<Actor>, MyError> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? {
let actor = serde_json::from_slice(&ivec)?;
Ok(Some(actor))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn save_actor(&self, actor: Actor) -> Result<(), MyError> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&actor)?;
inner.public_key_id_actor_id.insert(
actor.public_key_id.as_str().as_bytes(),
actor.id.as_str().as_bytes(),
)?;
inner
.actor_id_actor
.insert(actor.id.as_str().as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn remove_listener(&self, actor_id: Url) -> Result<(), MyError> {
self.unblock(move |inner| {
inner
.connected_actor_ids
.remove(actor_id.as_str().as_bytes())?;
Ok(())
})
.await
}
pub(crate) async fn add_listener(&self, actor_id: Url) -> Result<(), MyError> {
self.unblock(move |inner| {
inner
.connected_actor_ids
.insert(actor_id.as_str().as_bytes(), actor_id.as_str().as_bytes())?;
Ok(())
})
.await
}
pub(crate) async fn add_blocks(&self, domains: Vec<String>) -> Result<(), MyError> {
self.unblock(move |inner| {
for connected in inner.connected_by_domain(&domains) {
inner
.connected_actor_ids
.remove(connected.as_str().as_bytes())?;
}
for domain in &domains {
inner
.blocked_domains
.insert(domain_key(domain), domain.as_bytes())?;
inner.allowed_domains.remove(domain_key(domain))?;
}
Ok(())
})
.await
}
pub(crate) async fn remove_blocks(&self, domains: Vec<String>) -> Result<(), MyError> {
self.unblock(move |inner| {
for domain in &domains {
inner.blocked_domains.remove(domain_key(domain))?;
}
Ok(())
})
.await
}
pub(crate) async fn add_allows(&self, domains: Vec<String>) -> Result<(), MyError> {
self.unblock(move |inner| {
for domain in &domains {
inner
.allowed_domains
.insert(domain_key(domain), domain.as_bytes())?;
}
Ok(())
})
.await
}
pub(crate) async fn remove_allows(&self, domains: Vec<String>) -> Result<(), MyError> {
self.unblock(move |inner| {
if inner.restricted_mode {
for connected in inner.connected_by_domain(&domains) {
inner
.connected_actor_ids
.remove(connected.as_str().as_bytes())?;
}
}
for domain in &domains {
inner.allowed_domains.remove(domain_key(domain))?;
}
Ok(())
})
.await
}
pub(crate) async fn is_allowed(&self, url: Url) -> Result<bool, MyError> {
self.unblock(move |inner| {
if let Some(domain) = url.domain() {
Ok(inner.is_allowed(domain))
} else {
Ok(false)
}
})
.await
}
pub(crate) async fn private_key(&self) -> Result<Option<RSAPrivateKey>, MyError> {
self.unblock(|inner| {
if let Some(ivec) = inner.settings.get("private-key")? {
let key_str = String::from_utf8_lossy(&ivec);
let key = RSAPrivateKey::from_pem_pkcs8(&key_str)?;
Ok(Some(key))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn update_private_key(
&self,
private_key: &RSAPrivateKey,
) -> Result<(), MyError> {
let pem_pkcs8 = private_key.to_pem_pkcs8()?;
self.unblock(move |inner| {
inner
.settings
.insert("private-key".as_bytes(), pem_pkcs8.as_bytes())?;
Ok(())
})
.await
}
}
fn domain_key(domain: &str) -> String {
domain.split('.').rev().collect::<Vec<_>>().join(".") + "."
}
fn domain_prefix(domain: &str) -> String {
domain
.split('.')
.rev()
.take(2)
.collect::<Vec<_>>()
.join(".")
+ "."
}
fn url_from_ivec(ivec: sled::IVec) -> Option<Url> {
String::from_utf8_lossy(&ivec).parse::<Url>().ok()
}
fn uuid_from_ivec(ivec: sled::IVec) -> Option<Uuid> {
Uuid::from_slice(&ivec).ok()
} }

View file

@ -4,7 +4,6 @@ use actix_web::{
http::StatusCode, http::StatusCode,
HttpResponse, HttpResponse,
}; };
use deadpool::managed::{PoolError, TimeoutType};
use http_signature_normalization_actix::PrepareSignError; use http_signature_normalization_actix::PrepareSignError;
use log::error; use log::error;
use rsa_pem::KeyError; use rsa_pem::KeyError;
@ -18,9 +17,6 @@ pub enum MyError {
#[error("Error in configuration, {0}")] #[error("Error in configuration, {0}")]
Config(#[from] config::ConfigError), Config(#[from] config::ConfigError),
#[error("Error in db, {0}")]
DbError(#[from] tokio_postgres::error::Error),
#[error("Couldn't parse key, {0}")] #[error("Couldn't parse key, {0}")]
Key(#[from] KeyError), Key(#[from] KeyError),
@ -33,6 +29,9 @@ pub enum MyError {
#[error("Couldn't sign string, {0}")] #[error("Couldn't sign string, {0}")]
Rsa(rsa::errors::Error), Rsa(rsa::errors::Error),
#[error("Couldn't use db, {0}")]
Sled(#[from] sled::Error),
#[error("Couldn't do the json thing, {0}")] #[error("Couldn't do the json thing, {0}")]
Json(#[from] serde_json::Error), Json(#[from] serde_json::Error),
@ -48,11 +47,8 @@ pub enum MyError {
#[error("Actor ({0}), or Actor's server, is not subscribed")] #[error("Actor ({0}), or Actor's server, is not subscribed")]
NotSubscribed(String), NotSubscribed(String),
#[error("Actor is blocked, {0}")] #[error("Actor is not allowed, {0}")]
Blocked(String), NotAllowed(String),
#[error("Actor is not whitelisted, {0}")]
Whitelist(String),
#[error("Cannot make decisions for foreign actor, {0}")] #[error("Cannot make decisions for foreign actor, {0}")]
WrongActor(String), WrongActor(String),
@ -78,9 +74,6 @@ pub enum MyError {
#[error("Couldn't flush buffer")] #[error("Couldn't flush buffer")]
FlushBuffer, FlushBuffer,
#[error("Timed out while waiting on db pool, {0:?}")]
DbTimeout(TimeoutType),
#[error("Invalid algorithm provided to verifier, {0}")] #[error("Invalid algorithm provided to verifier, {0}")]
Algorithm(String), Algorithm(String),
@ -127,10 +120,9 @@ pub enum MyError {
impl ResponseError for MyError { impl ResponseError for MyError {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match self { match self {
MyError::Blocked(_) MyError::NotAllowed(_) | MyError::WrongActor(_) | MyError::BadActor(_, _) => {
| MyError::Whitelist(_) StatusCode::FORBIDDEN
| MyError::WrongActor(_) }
| MyError::BadActor(_, _) => StatusCode::FORBIDDEN,
MyError::NotSubscribed(_) => StatusCode::UNAUTHORIZED, MyError::NotSubscribed(_) => StatusCode::UNAUTHORIZED,
MyError::Duplicate => StatusCode::ACCEPTED, MyError::Duplicate => StatusCode::ACCEPTED,
MyError::Kind(_) | MyError::MissingKind | MyError::MissingId | MyError::ObjectCount => { MyError::Kind(_) | MyError::MissingKind | MyError::MissingId | MyError::ObjectCount => {
@ -161,18 +153,6 @@ where
} }
} }
impl<T> From<PoolError<T>> for MyError
where
T: Into<MyError>,
{
fn from(e: PoolError<T>) -> Self {
match e {
PoolError::Backend(e) => e.into(),
PoolError::Timeout(t) => MyError::DbTimeout(t),
}
}
}
impl From<Infallible> for MyError { impl From<Infallible> for MyError {
fn from(i: Infallible) -> Self { fn from(i: Infallible) -> Self {
match i {} match i {}

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::Actor, db::Actor,
error::MyError, error::MyError,
jobs::{ jobs::{
apub::{get_inboxes, prepare_activity}, apub::{get_inboxes, prepare_activity},

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
config::{Config, UrlKind}, config::{Config, UrlKind},
data::Actor, db::Actor,
error::MyError, error::MyError,
jobs::{apub::prepare_activity, Deliver, JobState}, jobs::{apub::prepare_activity, Deliver, JobState},
}; };
@ -36,14 +36,16 @@ impl Follow {
let my_id = state.config.generate_url(UrlKind::Actor); let my_id = state.config.generate_url(UrlKind::Actor);
// if following relay directly, not just following 'public', followback // if following relay directly, not just following 'public', followback
if self.input.object_is(&my_id) && !state.actors.is_following(&self.actor.id).await { if self.input.object_is(&my_id)
&& !state.state.db.is_connected(self.actor.id.clone()).await?
{
let follow = generate_follow(&state.config, &self.actor.id, &my_id)?; let follow = generate_follow(&state.config, &self.actor.id, &my_id)?;
state state
.job_server .job_server
.queue(Deliver::new(self.actor.inbox.clone(), follow)?)?; .queue(Deliver::new(self.actor.inbox.clone(), follow)?)?;
} }
state.actors.follower(&self.actor).await?; state.actors.follower(self.actor.clone()).await?;
let accept = generate_accept_follow( let accept = generate_accept_follow(
&state.config, &state.config,

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
data::Actor, db::Actor,
error::MyError, error::MyError,
jobs::{apub::get_inboxes, DeliverMany, JobState}, jobs::{apub::get_inboxes, DeliverMany, JobState},
}; };

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::{Actor, State}, data::State,
db::Actor,
error::MyError, error::MyError,
}; };
use activitystreams::{ use activitystreams::{
@ -23,7 +24,7 @@ pub use self::{announce::Announce, follow::Follow, forward::Forward, reject::Rej
async fn get_inboxes(state: &State, actor: &Actor, object_id: &Url) -> Result<Vec<Url>, MyError> { async fn get_inboxes(state: &State, actor: &Actor, object_id: &Url) -> Result<Vec<Url>, MyError> {
let domain = object_id.host().ok_or(MyError::Domain)?.to_string(); let domain = object_id.host().ok_or(MyError::Domain)?.to_string();
Ok(state.listeners_without(&actor.inbox, &domain).await) state.inboxes_without(&actor.inbox, &domain).await
} }
fn prepare_activity<T, U, V, Kind>( fn prepare_activity<T, U, V, Kind>(

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
config::UrlKind, config::UrlKind,
data::Actor, db::Actor,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -11,9 +11,7 @@ pub struct Reject(pub Actor);
impl Reject { impl Reject {
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
if state.actors.unfollower(&self.0).await?.is_some() { state.actors.unfollower(&self.0).await?;
state.db.remove_listener(self.0.inbox.clone()).await?;
}
let my_id = state.config.generate_url(UrlKind::Actor); let my_id = state.config.generate_url(UrlKind::Actor);
let undo = generate_undo_follow(&state.config, &self.0.id, &my_id)?; let undo = generate_undo_follow(&state.config, &self.0.id, &my_id)?;

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
config::UrlKind, config::UrlKind,
data::Actor, db::Actor,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -19,11 +19,9 @@ impl Undo {
} }
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
let was_following = state.actors.is_following(&self.actor.id).await; let was_following = state.state.db.is_connected(self.actor.id.clone()).await?;
if state.actors.unfollower(&self.actor).await?.is_some() { state.actors.unfollower(&self.actor).await?;
state.db.remove_listener(self.actor.inbox.clone()).await?;
}
if was_following { if was_following {
let my_id = state.config.generate_url(UrlKind::Actor); let my_id = state.config.generate_url(UrlKind::Actor);

View file

@ -15,7 +15,7 @@ impl CacheMedia {
} }
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
if state.media.get_bytes(self.uuid).await.is_some() { if !state.media.is_outdated(self.uuid).await? {
return Ok(()); return Ok(());
} }
@ -25,7 +25,7 @@ impl CacheMedia {
state state
.media .media
.store_bytes(self.uuid, content_type, bytes) .store_bytes(self.uuid, content_type, bytes)
.await; .await?;
} }
Ok(()) Ok(())

View file

@ -5,32 +5,35 @@ use crate::{
use activitystreams::url::Url; use activitystreams::url::Url;
use anyhow::Error; use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use futures::join;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryInstance { pub struct QueryInstance {
listener: Url, actor_id: Url,
} }
impl QueryInstance { impl QueryInstance {
pub fn new(listener: Url) -> Self { pub fn new(actor_id: Url) -> Self {
QueryInstance { QueryInstance {
listener: listener.into(), actor_id: actor_id.into(),
} }
} }
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
let (o1, o2) = join!( let contact_outdated = state
state.node_cache.is_contact_outdated(&self.listener), .node_cache
state.node_cache.is_instance_outdated(&self.listener), .is_contact_outdated(self.actor_id.clone())
); .await;
let instance_outdated = state
.node_cache
.is_instance_outdated(self.actor_id.clone())
.await;
if !(o1 || o2) { if !(contact_outdated || instance_outdated) {
return Ok(()); return Ok(());
} }
let mut instance_uri = self.listener.clone(); let mut instance_uri = self.actor_id.clone();
instance_uri.set_fragment(None); instance_uri.set_fragment(None);
instance_uri.set_query(None); instance_uri.set_query(None);
instance_uri.set_path("api/v1/instance"); instance_uri.set_path("api/v1/instance");
@ -47,11 +50,11 @@ impl QueryInstance {
}; };
if let Some(mut contact) = instance.contact { if let Some(mut contact) = instance.contact {
let uuid = if let Some(uuid) = state.media.get_uuid(&contact.avatar).await? { let uuid = if let Some(uuid) = state.media.get_uuid(contact.avatar.clone()).await? {
contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into();
uuid uuid
} else { } else {
let uuid = state.media.store_url(&contact.avatar).await?; let uuid = state.media.store_url(contact.avatar.clone()).await?;
contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into();
uuid uuid
}; };
@ -61,7 +64,7 @@ impl QueryInstance {
state state
.node_cache .node_cache
.set_contact( .set_contact(
&self.listener, self.actor_id.clone(),
contact.username, contact.username,
contact.display_name, contact.display_name,
contact.url, contact.url,
@ -75,7 +78,7 @@ impl QueryInstance {
state state
.node_cache .node_cache
.set_instance( .set_instance(
&self.listener, self.actor_id.clone(),
instance.title, instance.title,
description, description,
instance.version, instance.version,

View file

@ -13,7 +13,7 @@ pub use self::{
use crate::{ use crate::{
config::Config, config::Config,
data::{ActorCache, Media, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
db::Db, db::Db,
error::MyError, error::MyError,
jobs::process_listeners::Listeners, jobs::process_listeners::Listeners,
@ -35,7 +35,7 @@ pub fn create_workers(
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer, job_server: JobServer,
media: Media, media: MediaCache,
config: Config, config: Config,
) { ) {
let remote_handle = job_server.remote.clone(); let remote_handle = job_server.remote.clone();
@ -72,7 +72,7 @@ pub struct JobState {
state: State, state: State,
actors: ActorCache, actors: ActorCache,
config: Config, config: Config,
media: Media, media: MediaCache,
node_cache: NodeCache, node_cache: NodeCache,
job_server: JobServer, job_server: JobServer,
} }
@ -88,7 +88,7 @@ impl JobState {
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer, job_server: JobServer,
media: Media, media: MediaCache,
config: Config, config: Config,
) -> Self { ) -> Self {
JobState { JobState {

View file

@ -6,20 +6,24 @@ use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryNodeinfo { pub struct QueryNodeinfo {
listener: Url, actor_id: Url,
} }
impl QueryNodeinfo { impl QueryNodeinfo {
pub fn new(listener: Url) -> Self { pub fn new(actor_id: Url) -> Self {
QueryNodeinfo { listener } QueryNodeinfo { actor_id }
} }
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
if !state.node_cache.is_nodeinfo_outdated(&self.listener).await { if !state
.node_cache
.is_nodeinfo_outdated(self.actor_id.clone())
.await
{
return Ok(()); return Ok(());
} }
let mut well_known_uri = self.listener.clone(); let mut well_known_uri = self.actor_id.clone();
well_known_uri.set_fragment(None); well_known_uri.set_fragment(None);
well_known_uri.set_query(None); well_known_uri.set_query(None);
well_known_uri.set_path(".well-known/nodeinfo"); well_known_uri.set_path(".well-known/nodeinfo");
@ -40,7 +44,7 @@ impl QueryNodeinfo {
state state
.node_cache .node_cache
.set_info( .set_info(
&self.listener, self.actor_id.clone(),
nodeinfo.software.name, nodeinfo.software.name,
nodeinfo.software.version, nodeinfo.software.version,
nodeinfo.open_registrations, nodeinfo.open_registrations,

View file

@ -8,11 +8,11 @@ pub struct Listeners;
impl Listeners { impl Listeners {
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
for listener in state.state.listeners().await { for actor_id in state.state.db.connected_ids().await? {
state state
.job_server .job_server
.queue(QueryInstance::new(listener.clone()))?; .queue(QueryInstance::new(actor_id.clone()))?;
state.job_server.queue(QueryNodeinfo::new(listener))?; state.job_server.queue(QueryNodeinfo::new(actor_id))?;
} }
Ok(()) Ok(())

View file

@ -1,4 +1,3 @@
use actix_rt::Arbiter;
use actix_web::{ use actix_web::{
middleware::{Compress, Logger}, middleware::{Compress, Logger},
web, App, HttpServer, web, App, HttpServer,
@ -12,14 +11,13 @@ mod db;
mod error; mod error;
mod jobs; mod jobs;
mod middleware; mod middleware;
mod notify;
mod requests; mod requests;
mod routes; mod routes;
use self::{ use self::{
args::Args, args::Args,
config::Config, config::Config,
data::{ActorCache, Media, State}, data::{ActorCache, MediaCache, State},
db::Db, db::Db,
jobs::{create_server, create_workers}, jobs::{create_server, create_workers},
middleware::{DebugPayload, RelayResolver}, middleware::{DebugPayload, RelayResolver},
@ -35,7 +33,7 @@ async fn main() -> Result<(), anyhow::Error> {
if config.debug() { if config.debug() {
std::env::set_var( std::env::set_var(
"RUST_LOG", "RUST_LOG",
"debug,tokio_postgres=info,h2=info,trust_dns_resolver=info,trust_dns_proto=info,rustls=info,html5ever=info", "debug,h2=info,trust_dns_resolver=info,trust_dns_proto=info,rustls=info,html5ever=info",
) )
} else { } else {
std::env::set_var("RUST_LOG", "info") std::env::set_var("RUST_LOG", "info")
@ -51,73 +49,33 @@ async fn main() -> Result<(), anyhow::Error> {
let args = Args::new(); let args = Args::new();
if args.jobs_only() && args.no_jobs() { if !args.blocks().is_empty() || !args.allowed().is_empty() {
return Err(anyhow::Error::msg(
"Either the server or the jobs must be run",
));
}
if !args.blocks().is_empty() || !args.whitelists().is_empty() {
if args.undo() { if args.undo() {
db.remove_blocks(args.blocks()).await?; db.remove_blocks(args.blocks().to_vec()).await?;
db.remove_whitelists(args.whitelists()).await?; db.remove_allows(args.allowed().to_vec()).await?;
} else { } else {
db.add_blocks(args.blocks()).await?; db.add_blocks(args.blocks().to_vec()).await?;
db.add_whitelists(args.whitelists()).await?; db.add_allows(args.allowed().to_vec()).await?;
} }
return Ok(()); return Ok(());
} }
let media = Media::new(db.clone()); let media = MediaCache::new(db.clone());
let state = State::hydrate(config.clone(), &db).await?; let state = State::build(config.clone(), db.clone()).await?;
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let job_server = create_server(); let job_server = create_server();
notify::Notifier::new(config.database_url().parse()?) create_workers(
.register(notify::NewBlocks(state.clone())) db.clone(),
.register(notify::NewWhitelists(state.clone())) state.clone(),
.register(notify::NewListeners(state.clone(), job_server.clone())) actors.clone(),
.register(notify::NewActors(actors.clone())) job_server.clone(),
.register(notify::NewNodes(state.node_cache())) media.clone(),
.register(notify::RmBlocks(state.clone())) config.clone(),
.register(notify::RmWhitelists(state.clone())) );
.register(notify::RmListeners(state.clone()))
.register(notify::RmActors(actors.clone()))
.register(notify::RmNodes(state.node_cache()))
.start();
if args.jobs_only() {
for _ in 0..num_cpus::get() {
let state = state.clone();
let actors = actors.clone();
let job_server = job_server.clone();
let media = media.clone();
let config = config.clone();
let db = db.clone();
Arbiter::new().exec_fn(move || {
create_workers(db, state, actors, job_server, media, config);
});
}
actix_rt::signal::ctrl_c().await?;
return Ok(());
}
let no_jobs = args.no_jobs();
let bind_address = config.bind_address(); let bind_address = config.bind_address();
HttpServer::new(move || { HttpServer::new(move || {
if !no_jobs {
create_workers(
db.clone(),
state.clone(),
actors.clone(),
job_server.clone(),
media.clone(),
config.clone(),
);
}
App::new() App::new()
.wrap(Compress::default()) .wrap(Compress::default())
.wrap(Logger::default()) .wrap(Logger::default())

View file

@ -1,11 +1,11 @@
use crate::{ use crate::{
apub::AcceptedActors,
data::{ActorCache, State}, data::{ActorCache, State},
error::MyError, error::MyError,
requests::Requests, requests::Requests,
}; };
use activitystreams::uri; use activitystreams::{base::BaseExt, uri, url::Url};
use actix_web::web; use actix_web::web;
use futures::join;
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use log::error; use log::error;
use rsa::{hash::Hash, padding::PaddingScheme, PublicKey, RSAPublicKey}; use rsa::{hash::Hash, padding::PaddingScheme, PublicKey, RSAPublicKey};
@ -24,47 +24,55 @@ impl MyVerify {
signature: String, signature: String,
signing_string: String, signing_string: String,
) -> Result<bool, MyError> { ) -> Result<bool, MyError> {
let mut uri = uri!(key_id); let public_key_id = uri!(key_id);
let (is_blocked, is_whitelisted) = let actor_id = if let Some(mut actor_id) = self
join!(self.2.is_blocked(&uri), self.2.is_whitelisted(&uri)); .2
.db
if is_blocked { .actor_id_from_public_key_id(public_key_id.clone())
return Err(MyError::Blocked(key_id)); .await?
} {
if !self.2.db.is_allowed(actor_id.clone()).await? {
if !is_whitelisted { return Err(MyError::NotAllowed(key_id));
return Err(MyError::Whitelist(key_id));
}
uri.set_fragment(None);
let actor = self.1.get(&uri, &self.0).await?;
let was_cached = actor.is_cached();
let actor = actor.into_inner();
match algorithm {
Some(Algorithm::Hs2019) => (),
Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (),
Some(other) => {
return Err(MyError::Algorithm(other.to_string()));
} }
None => (),
};
let res = do_verify(&actor.public_key, signature.clone(), signing_string.clone()).await; actor_id.set_fragment(None);
let actor = self.1.get(&actor_id, &self.0).await?;
let was_cached = actor.is_cached();
let actor = actor.into_inner();
if let Err(e) = res { match algorithm {
if !was_cached { Some(Algorithm::Hs2019) => (),
return Err(e); Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (),
Some(other) => {
return Err(MyError::Algorithm(other.to_string()));
}
None => (),
};
let res = do_verify(&actor.public_key, signature.clone(), signing_string.clone()).await;
if let Err(e) = res {
if !was_cached {
return Err(e);
}
} else {
return Ok(true);
} }
actor_id
} else { } else {
return Ok(true); self.0
} .fetch_json::<PublicKeyResponse>(public_key_id.as_str())
.await?
.actor_id()
.ok_or_else(|| MyError::MissingId)?
};
// Previously we verified the sig from an actor's local cache // Previously we verified the sig from an actor's local cache
// //
// Now we make sure we fetch an updated actor // Now we make sure we fetch an updated actor
let actor = self.1.get_no_cache(&uri, &self.0).await?; let actor = self.1.get_no_cache(&actor_id, &self.0).await?;
do_verify(&actor.public_key, signature, signing_string).await?; do_verify(&actor.public_key, signature, signing_string).await?;
@ -72,6 +80,29 @@ impl MyVerify {
} }
} }
#[derive(serde::Deserialize)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
enum PublicKeyResponse {
PublicKey {
#[allow(dead_code)]
id: Url,
owner: Url,
#[allow(dead_code)]
public_key_pem: String,
},
Actor(AcceptedActors),
}
impl PublicKeyResponse {
fn actor_id(&self) -> Option<Url> {
match self {
PublicKeyResponse::PublicKey { owner, .. } => Some(owner.clone()),
PublicKeyResponse::Actor(actor) => actor.id_unchecked().map(|url| url.clone()),
}
}
}
async fn do_verify( async fn do_verify(
public_key: &str, public_key: &str,
signature: String, signature: String,

View file

@ -1,263 +0,0 @@
use crate::{
data::{ActorCache, NodeCache, State},
db::listen,
jobs::{JobServer, QueryInstance, QueryNodeinfo},
};
use activitystreams::url::Url;
use actix_rt::{spawn, time::delay_for};
use futures::stream::{poll_fn, StreamExt};
use log::{debug, error, warn};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio_postgres::{tls::NoTls, AsyncMessage, Config};
use uuid::Uuid;
pub trait Listener {
fn key(&self) -> &str;
fn execute(&self, payload: &str);
}
pub struct Notifier {
config: Config,
listeners: HashMap<String, Vec<Box<dyn Listener + Send + Sync + 'static>>>,
}
impl Notifier {
pub fn new(config: Config) -> Self {
Notifier {
config,
listeners: HashMap::new(),
}
}
pub fn register<L>(mut self, l: L) -> Self
where
L: Listener + Send + Sync + 'static,
{
let v = self
.listeners
.entry(l.key().to_owned())
.or_insert_with(Vec::new);
v.push(Box::new(l));
self
}
pub fn start(self) {
spawn(async move {
let Notifier { config, listeners } = self;
loop {
let (new_client, mut conn) = match config.connect(NoTls).await {
Ok((client, conn)) => (client, conn),
Err(e) => {
error!("Error establishing DB Connection, {}", e);
delay_for(Duration::new(5, 0)).await;
continue;
}
};
let client = Arc::new(new_client);
let new_client = client.clone();
spawn(async move {
if let Err(e) = listen(&new_client).await {
error!("Error listening for updates, {}", e);
}
});
let mut stream = poll_fn(move |cx| conn.poll_message(cx));
loop {
match stream.next().await {
Some(Ok(AsyncMessage::Notification(n))) => {
debug!("Handling Notification, {:?}", n);
if let Some(v) = listeners.get(n.channel()) {
for l in v {
l.execute(n.payload());
}
}
}
Some(Ok(AsyncMessage::Notice(e))) => {
debug!("Handling Notice, {:?}", e);
}
Some(Ok(_)) => {
debug!("Handling rest");
}
Some(Err(e)) => {
debug!("Breaking loop due to error Error, {:?}", e);
break;
}
None => {
debug!("End of stream, breaking loop");
break;
}
}
}
drop(client);
warn!("Restarting listener task");
}
});
}
}
pub struct NewBlocks(pub State);
pub struct NewWhitelists(pub State);
pub struct NewListeners(pub State, pub JobServer);
pub struct NewActors(pub ActorCache);
pub struct NewNodes(pub NodeCache);
pub struct RmBlocks(pub State);
pub struct RmWhitelists(pub State);
pub struct RmListeners(pub State);
pub struct RmActors(pub ActorCache);
pub struct RmNodes(pub NodeCache);
impl Listener for NewBlocks {
fn key(&self) -> &str {
"new_blocks"
}
fn execute(&self, payload: &str) {
debug!("Caching block of {}", payload);
let state = self.0.clone();
let payload = payload.to_owned();
spawn(async move { state.cache_block(payload).await });
}
}
impl Listener for NewWhitelists {
fn key(&self) -> &str {
"new_whitelists"
}
fn execute(&self, payload: &str) {
debug!("Caching whitelist of {}", payload);
let state = self.0.clone();
let payload = payload.to_owned();
spawn(async move { state.cache_whitelist(payload.to_owned()).await });
}
}
impl Listener for NewListeners {
fn key(&self) -> &str {
"new_listeners"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<Url>() {
debug!("Caching listener {}", uri);
let state = self.0.clone();
let _ = self.1.queue(QueryInstance::new(uri.clone()));
let _ = self.1.queue(QueryNodeinfo::new(uri.clone()));
spawn(async move { state.cache_listener(uri).await });
} else {
warn!("Not caching listener {}, parse error", payload);
}
}
}
impl Listener for NewActors {
fn key(&self) -> &str {
"new_actors"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<Url>() {
debug!("Caching actor {}", uri);
let actors = self.0.clone();
spawn(async move { actors.cache_follower(uri).await });
} else {
warn!("Not caching actor {}, parse error", payload);
}
}
}
impl Listener for NewNodes {
fn key(&self) -> &str {
"new_nodes"
}
fn execute(&self, payload: &str) {
if let Ok(uuid) = payload.parse::<Uuid>() {
debug!("Caching node {}", uuid);
let nodes = self.0.clone();
spawn(async move { nodes.cache_by_id(uuid).await });
} else {
warn!("Not caching node {}, parse error", payload);
}
}
}
impl Listener for RmBlocks {
fn key(&self) -> &str {
"rm_blocks"
}
fn execute(&self, payload: &str) {
debug!("Busting block cache for {}", payload);
let state = self.0.clone();
let payload = payload.to_owned();
spawn(async move { state.bust_block(&payload).await });
}
}
impl Listener for RmWhitelists {
fn key(&self) -> &str {
"rm_whitelists"
}
fn execute(&self, payload: &str) {
debug!("Busting whitelist cache for {}", payload);
let state = self.0.clone();
let payload = payload.to_owned();
spawn(async move { state.bust_whitelist(&payload).await });
}
}
impl Listener for RmListeners {
fn key(&self) -> &str {
"rm_listeners"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<Url>() {
debug!("Busting listener cache for {}", uri);
let state = self.0.clone();
spawn(async move { state.bust_listener(&uri).await });
} else {
warn!("Not busting listener cache for {}", payload);
}
}
}
impl Listener for RmActors {
fn key(&self) -> &str {
"rm_actors"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<Url>() {
debug!("Busting actor cache for {}", uri);
let actors = self.0.clone();
spawn(async move { actors.bust_follower(&uri).await });
} else {
warn!("Not busting actor cache for {}", payload);
}
}
}
impl Listener for RmNodes {
fn key(&self) -> &str {
"rm_nodes"
}
fn execute(&self, payload: &str) {
if let Ok(uuid) = payload.parse::<Uuid>() {
debug!("Caching node {}", uuid);
let nodes = self.0.clone();
spawn(async move { nodes.bust_by_id(uuid).await });
} else {
warn!("Not caching node {}, parse error", payload);
}
}
}

View file

@ -1,7 +1,8 @@
use crate::{ use crate::{
apub::{AcceptedActivities, AcceptedUndoObjects, UndoTypes, ValidTypes}, apub::{AcceptedActivities, AcceptedUndoObjects, UndoTypes, ValidTypes},
config::{Config, UrlKind}, config::{Config, UrlKind},
data::{Actor, ActorCache, State}, data::{ActorCache, State},
db::Actor,
error::MyError, error::MyError,
jobs::apub::{Announce, Follow, Forward, Reject, Undo}, jobs::apub::{Announce, Follow, Forward, Reject, Undo},
jobs::JobServer, jobs::JobServer,
@ -12,7 +13,6 @@ use activitystreams::{
activity, base::AnyBase, prelude::*, primitives::OneOrMany, public, url::Url, activity, base::AnyBase, prelude::*, primitives::OneOrMany, public, url::Url,
}; };
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use futures::join;
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
use log::error; use log::error;
@ -35,21 +35,14 @@ pub async fn route(
.await? .await?
.into_inner(); .into_inner();
let (is_blocked, is_whitelisted, is_listener) = join!( let is_allowed = state.db.is_allowed(actor.id.clone()).await?;
state.is_blocked(&actor.id), let is_connected = state.db.is_connected(actor.id.clone()).await?;
state.is_whitelisted(&actor.id),
state.is_listener(&actor.inbox)
);
if is_blocked { if !is_allowed {
return Err(MyError::Blocked(actor.id.to_string())); return Err(MyError::NotAllowed(actor.id.to_string()));
} }
if !is_whitelisted { if !is_connected && !valid_without_listener(&input)? {
return Err(MyError::Whitelist(actor.id.to_string()));
}
if !is_listener && !valid_without_listener(&input)? {
return Err(MyError::NotSubscribed(actor.inbox.to_string())); return Err(MyError::NotSubscribed(actor.inbox.to_string()));
} }
@ -73,9 +66,9 @@ pub async fn route(
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &jobs, input, actor).await? handle_announce(&state, &jobs, input, actor).await?
} }
ValidTypes::Follow => handle_follow(&config, &jobs, input, actor, is_listener).await?, ValidTypes::Follow => handle_follow(&config, &jobs, input, actor, is_connected).await?,
ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?, ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?,
ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_listener).await?, ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?,
}; };
Ok(accepted(serde_json::json!({}))) Ok(accepted(serde_json::json!({})))

View file

@ -8,7 +8,7 @@ pub async fn route(
state: web::Data<State>, state: web::Data<State>,
config: web::Data<Config>, config: web::Data<Config>,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
let mut nodes = state.node_cache().nodes().await; let mut nodes = state.node_cache().nodes().await?;
nodes.shuffle(&mut thread_rng()); nodes.shuffle(&mut thread_rng());
let mut buf = BufWriter::new(Vec::new()); let mut buf = BufWriter::new(Vec::new());

View file

@ -1,4 +1,4 @@
use crate::{data::Media, error::MyError, requests::Requests}; use crate::{data::MediaCache, error::MyError, requests::Requests};
use actix_web::{ use actix_web::{
http::header::{CacheControl, CacheDirective}, http::header::{CacheControl, CacheDirective},
web, HttpResponse, web, HttpResponse,
@ -6,13 +6,13 @@ use actix_web::{
use uuid::Uuid; use uuid::Uuid;
pub async fn route( pub async fn route(
media: web::Data<Media>, media: web::Data<MediaCache>,
requests: web::Data<Requests>, requests: web::Data<Requests>,
uuid: web::Path<Uuid>, uuid: web::Path<Uuid>,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some((content_type, bytes)) = media.get_bytes(uuid).await { if let Some((content_type, bytes)) = media.get_bytes(uuid).await? {
return Ok(cached(content_type, bytes)); return Ok(cached(content_type, bytes));
} }
@ -21,7 +21,7 @@ pub async fn route(
media media
.store_bytes(uuid, content_type.clone(), bytes.clone()) .store_bytes(uuid, content_type.clone(), bytes.clone())
.await; .await?;
return Ok(cached(content_type, bytes)); return Ok(cached(content_type, bytes));
} }

View file

@ -46,14 +46,16 @@ pub async fn route(config: web::Data<Config>, state: web::Data<State>) -> web::J
}, },
metadata: Metadata { metadata: Metadata {
peers: state peers: state
.listeners() .db
.inboxes()
.await .await
.unwrap_or(vec![])
.iter() .iter()
.filter_map(|listener| listener.domain()) .filter_map(|listener| listener.domain())
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect(), .collect(),
blocks: if config.publish_blocks() { blocks: if config.publish_blocks() {
Some(state.blocks().await) Some(state.db.blocks().await.unwrap_or(vec![]))
} else { } else {
None None
}, },

View file

@ -1,99 +0,0 @@
table! {
actors (id) {
id -> Uuid,
actor_id -> Text,
public_key -> Text,
public_key_id -> Text,
listener_id -> Uuid,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
blocks (id) {
id -> Uuid,
domain_name -> Text,
created_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
table! {
jobs (id) {
id -> Uuid,
job_id -> Uuid,
job_queue -> Text,
job_timeout -> Int8,
job_updated -> Timestamp,
job_status -> Text,
job_value -> Jsonb,
job_next_run -> Nullable<Timestamp>,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
listeners (id) {
id -> Uuid,
actor_id -> Text,
created_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
table! {
media (id) {
id -> Uuid,
media_id -> Uuid,
url -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
nodes (id) {
id -> Uuid,
listener_id -> Uuid,
nodeinfo -> Nullable<Jsonb>,
instance -> Nullable<Jsonb>,
contact -> Nullable<Jsonb>,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
settings (id) {
id -> Uuid,
key -> Text,
value -> Text,
created_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
table! {
whitelists (id) {
id -> Uuid,
domain_name -> Text,
created_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
joinable!(actors -> listeners (listener_id));
joinable!(nodes -> listeners (listener_id));
allow_tables_to_appear_in_same_query!(
actors,
blocks,
jobs,
listeners,
media,
nodes,
settings,
whitelists,
);

View file

@ -1,4 +1,4 @@
@use crate::data::Contact; @use crate::db::Contact;
@use activitystreams::url::Url; @use activitystreams::url::Url;
@(contact: &Contact, base: &Url) @(contact: &Contact, base: &Url)

View file

@ -1,4 +1,4 @@
@use crate::data::Info; @use crate::db::Info;
@use activitystreams::url::Url; @use activitystreams::url::Url;
@(info: &Info, base: &Url) @(info: &Info, base: &Url)

View file

@ -1,4 +1,4 @@
@use crate::{data::{Contact, Instance}, templates::admin}; @use crate::{db::{Contact, Instance}, templates::admin};
@use activitystreams::url::Url; @use activitystreams::url::Url;
@(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &Url) @(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &Url)