Basic relay functionality

This commit is contained in:
asonix 2020-03-15 17:37:53 -05:00
parent aa6b5daaf0
commit 0cbe679e65
11 changed files with 391 additions and 37 deletions

1
.env
View file

@ -1 +1,2 @@
DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix
HOSTNAME=localhost:8080

10
Cargo.lock generated
View file

@ -374,6 +374,7 @@ dependencies = [
"thiserror",
"tokio",
"ttl_cache",
"uuid",
]
[[package]]
@ -2117,6 +2118,15 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
dependencies = [
"rand",
]
[[package]]
name = "vcpkg"
version = "0.2.8"

View file

@ -23,3 +23,4 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "0.2.13", features = ["sync"] }
ttl_cache = "0.5.1"
uuid = { version = "0.8", features = ["v4"] }

View file

@ -3,7 +3,7 @@ CREATE TABLE listeners (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
updated_at TIMESTAMP
);
CREATE INDEX listeners_actor_id_index ON listeners(actor_id);

View file

@ -3,7 +3,7 @@ CREATE TABLE blocks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain_name TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
updated_at TIMESTAMP
);
CREATE INDEX blocks_domain_name_index ON blocks(domain_name);

View file

@ -3,7 +3,7 @@ CREATE TABLE whitelists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain_name TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
updated_at TIMESTAMP
);
CREATE INDEX whitelists_domain_name_index ON whitelists(domain_name);

View file

@ -3,6 +3,7 @@ use activitystreams::{
primitives::XsdAnyUri,
PropRefs,
};
use std::collections::HashMap;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PropRefs)]
#[serde(rename_all = "camelCase")]
@ -12,6 +13,9 @@ pub struct AnyExistingObject {
#[serde(rename = "type")]
pub kind: String,
#[serde(flatten)]
ext: HashMap<String, serde_json::Value>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -22,6 +26,7 @@ pub enum ValidTypes {
Delete,
Follow,
Undo,
Update,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -43,6 +48,9 @@ pub struct AcceptedObjects {
pub actor: XsdAnyUri,
pub object: ValidObjects,
#[serde(flatten)]
ext: HashMap<String, serde_json::Value>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -71,6 +79,28 @@ impl ValidObjects {
ValidObjects::Object(ref obj) => &obj.id,
}
}
pub fn is_kind(&self, query_kind: &str) -> bool {
match self {
ValidObjects::Id(_) => false,
ValidObjects::Object(AnyExistingObject { kind, .. }) => kind == query_kind,
}
}
pub fn child_object_is_actor(&self) -> bool {
match self {
ValidObjects::Id(_) => false,
ValidObjects::Object(AnyExistingObject { ext, .. }) => {
if let Some(o) = ext.get("object") {
if let Ok(s) = serde_json::from_value::<String>(o.clone()) {
return s.ends_with("/actor");
}
}
false
}
}
}
}
impl AcceptedActors {

View file

@ -1,16 +1,18 @@
use activitystreams::{
activity::apub::{Accept, Follow},
activity::apub::{Accept, Announce, Follow, Undo},
context,
primitives::XsdAnyUri,
};
use actix::Addr;
use actix_web::{client::Client, web, Responder};
use actix_web::{client::Client, web, HttpResponse};
use futures::join;
use log::{error, info};
use log::error;
use std::collections::HashMap;
use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes},
db_actor::{DbActor, DbQuery, Pool},
state::State,
state::{State, UrlKind},
};
#[derive(Clone, Debug, thiserror::Error)]
@ -22,28 +24,146 @@ pub async fn inbox(
state: web::Data<State>,
client: web::Data<Client>,
input: web::Json<AcceptedObjects>,
) -> Result<impl Responder, MyError> {
) -> Result<HttpResponse, MyError> {
let input = input.into_inner();
let actor = fetch_actor(state.clone(), client, &input.actor).await?;
let actor = fetch_actor(state.clone(), &client, &input.actor).await?;
match input.kind {
ValidTypes::Announce => (),
ValidTypes::Create => (),
ValidTypes::Delete => (),
ValidTypes::Follow => return handle_follow(db_actor, state, input, actor).await,
ValidTypes::Undo => (),
ValidTypes::Announce | ValidTypes::Create => {
handle_relay(state, client, input, actor).await
}
ValidTypes::Follow => handle_follow(db_actor, state, client, input, actor).await,
ValidTypes::Delete | ValidTypes::Update => {
handle_forward(state, client, input, actor).await
}
ValidTypes::Undo => handle_undo(db_actor, state, client, input, actor).await,
}
}
Err(MyError)
pub fn response<T>(item: T) -> HttpResponse
where
T: serde::ser::Serialize,
{
HttpResponse::Accepted()
.content_type("application/activity+json")
.json(item)
}
async fn handle_undo(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") {
return Err(MyError);
}
let inbox = actor.inbox().to_owned();
let state2 = state.clone().into_inner();
db_actor.do_send(DbQuery(move |pool: Pool| {
let inbox = inbox.clone();
async move {
let conn = pool.get().await?;
state2.remove_listener(&conn, &inbox).await.map_err(|e| {
error!("Error removing listener, {}", e);
e
})
}
}));
let mut undo = Undo::default();
let mut follow = Follow::default();
follow
.object_props
.set_id(state.generate_url(UrlKind::Activity))?;
follow
.follow_props
.set_actor_xsd_any_uri(actor.id.clone())?
.set_object_xsd_any_uri(actor.id.clone())?;
undo.object_props
.set_id(state.generate_url(UrlKind::Activity))?
.set_many_to_xsd_any_uris(vec![actor.id.clone()])?
.set_context_xsd_any_uri(context())?;
undo.undo_props
.set_object_object_box(follow)?
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
if input.object.child_object_is_actor() {
let undo2 = undo.clone();
let client = client.into_inner();
actix::Arbiter::spawn(async move {
let _ = deliver(&client, actor.id, &undo2).await;
});
}
Ok(response(undo))
}
async fn handle_forward(
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
let inboxes = get_inboxes(&state, &actor, &object_id).await?;
deliver_many(client, inboxes, input);
Ok(response(HashMap::<(), ()>::new()))
}
async fn handle_relay(
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
if state.is_cached(object_id).await {
return Err(MyError);
}
let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?;
let mut announce = Announce::default();
announce
.object_props
.set_context_xsd_any_uri(context())?
.set_many_to_xsd_any_uris(vec![state.generate_url(UrlKind::Followers)])?
.set_id(activity_id.clone())?;
announce
.announce_props
.set_object_xsd_any_uri(object_id.clone())?
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
let inboxes = get_inboxes(&state, &actor, &object_id).await?;
deliver_many(client, inboxes, announce);
state.cache(object_id.to_owned(), activity_id).await;
Ok(response(HashMap::<(), ()>::new()))
}
async fn handle_follow(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<web::Json<Accept>, MyError> {
) -> Result<HttpResponse, MyError> {
let (is_listener, is_blocked, is_whitelisted) = join!(
state.is_listener(&actor.id),
state.is_blocked(&actor.id),
@ -61,44 +181,55 @@ async fn handle_follow(
}
if !is_listener {
let state = state.into_inner();
let state = state.clone().into_inner();
let actor = actor.clone();
let inbox = actor.inbox().to_owned();
db_actor.do_send(DbQuery(move |pool: Pool| {
let actor_id = actor.id.clone();
let inbox = inbox.clone();
let state = state.clone();
async move {
let conn = pool.get().await?;
state.add_listener(&conn, actor_id).await
state.add_listener(&conn, inbox).await.map_err(|e| {
error!("Error adding listener, {}", e);
e
})
}
}));
}
let actor_inbox = actor.inbox().clone();
let mut accept = Accept::default();
let mut follow = Follow::default();
follow.object_props.set_id(input.id)?;
follow
.follow_props
.set_object_xsd_any_uri(format!("https://{}/actor", "localhost"))?
.set_object_xsd_any_uri(state.generate_url(UrlKind::Actor))?
.set_actor_xsd_any_uri(actor.id.clone())?;
accept
.object_props
.set_id(format!("https://{}/activities/{}", "localhost", "1"))?
.set_id(state.generate_url(UrlKind::Activity))?
.set_many_to_xsd_any_uris(vec![actor.id])?;
accept
.accept_props
.set_object_object_box(follow)?
.set_actor_xsd_any_uri(format!("https://{}/actor", "localhost"))?;
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
Ok(web::Json(accept))
let client = client.into_inner();
let accept2 = accept.clone();
actix::Arbiter::spawn(async move {
let _ = deliver(&client, actor_inbox, &accept2).await;
});
Ok(response(accept))
}
async fn fetch_actor(
state: web::Data<State>,
client: web::Data<Client>,
client: &web::Data<Client>,
actor_id: &XsdAnyUri,
) -> Result<AcceptedActors, MyError> {
if let Some(actor) = state.get_actor(actor_id).await {
@ -111,13 +242,13 @@ async fn fetch_actor(
.send()
.await
.map_err(|e| {
error!("Couldn't send request for actor, {}", e);
error!("Couldn't send request to {} for actor, {}", actor_id, e);
MyError
})?
.json()
.await
.map_err(|e| {
error!("Coudn't fetch actor, {}", e);
error!("Coudn't fetch actor from {}, {}", actor_id, e);
MyError
})?;
@ -126,6 +257,65 @@ async fn fetch_actor(
Ok(actor)
}
fn deliver_many<T>(client: web::Data<Client>, inboxes: Vec<XsdAnyUri>, item: T)
where
T: serde::ser::Serialize + 'static,
{
let client = client.into_inner();
actix::Arbiter::spawn(async move {
use futures::stream::StreamExt;
let client = client.clone();
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(deliver(&client, inbox, &item));
}
while let Some(_) = unordered.next().await {}
});
}
async fn deliver<T>(
client: &std::sync::Arc<Client>,
inbox: XsdAnyUri,
item: &T,
) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
let res = client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.send_json(item)
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
MyError
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
return Err(MyError);
}
Ok(())
}
async fn get_inboxes(
state: &web::Data<State>,
actor: &AcceptedActors,
object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> {
let domain = object_id.as_url().host().ok_or(MyError)?.to_string();
let inbox = actor.inbox();
Ok(state.listeners_without(&inbox, &domain).await)
}
impl actix_web::error::ResponseError for MyError {}
impl From<std::convert::Infallible> for MyError {

View file

@ -1,5 +1,6 @@
#![feature(drain_filter)]
use actix_web::{client::Client, web, App, HttpServer, Responder};
use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties};
use actix_web::{client::Client, middleware::Logger, web, App, HttpServer, Responder};
use bb8_postgres::tokio_postgres;
mod apub;
@ -8,12 +9,42 @@ mod inbox;
mod label;
mod state;
use self::{db_actor::DbActor, label::ArbiterLabelFactory, state::State};
use self::{
db_actor::DbActor,
inbox::MyError,
label::ArbiterLabelFactory,
state::{State, UrlKind},
};
async fn index() -> impl Responder {
"hewwo, mr obama"
}
async fn actor_route(state: web::Data<State>) -> Result<impl Responder, MyError> {
let mut application = Application::default();
let mut endpoint = EndpointProperties::default();
endpoint.set_shared_inbox(format!("https://{}/inbox", "localhost"))?;
application
.object_props
.set_id(state.generate_url(UrlKind::Actor))?
.set_summary_xsd_string("AodeRelay bot")?
.set_name_xsd_string("AodeRelay")?
.set_url_xsd_any_uri(state.generate_url(UrlKind::Actor))?
.set_context_xsd_any_uri(context())?;
application
.ap_actor_props
.set_preferred_username("relay")?
.set_followers(state.generate_url(UrlKind::Followers))?
.set_following(state.generate_url(UrlKind::Following))?
.set_inbox(state.generate_url(UrlKind::Inbox))?
.set_endpoints(endpoint)?;
Ok(inbox::response(application))
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
@ -21,13 +52,19 @@ async fn main() -> Result<(), anyhow::Error> {
pretty_env_logger::init();
let pg_config: tokio_postgres::Config = std::env::var("DATABASE_URL")?.parse()?;
let hostname: String = std::env::var("HOSTNAME")?;
let use_whitelist = std::env::var("USE_WHITELIST").is_ok();
let use_https = std::env::var("USE_HTTPS").is_ok();
let arbiter_labeler = ArbiterLabelFactory::new();
let db_actor = DbActor::new(pg_config.clone());
arbiter_labeler.clone().set_label();
let state: State = db_actor
.send(db_actor::DbQuery(|pool| State::hydrate(false, pool)))
.send(db_actor::DbQuery(move |pool| {
State::hydrate(use_https, use_whitelist, hostname, pool)
}))
.await?
.await??;
@ -37,11 +74,13 @@ async fn main() -> Result<(), anyhow::Error> {
let client = Client::default();
App::new()
.wrap(Logger::default())
.data(actor)
.data(state.clone())
.data(client)
.service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/inbox").route(web::post().to(inbox::inbox)))
.service(web::resource("/actor").route(web::get().to(actor_route)))
})
.bind("127.0.0.1:8080")?
.run()

View file

@ -3,7 +3,7 @@ table! {
id -> Uuid,
domain_name -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
@ -12,7 +12,7 @@ table! {
id -> Uuid,
actor_id -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}
@ -21,7 +21,7 @@ table! {
id -> Uuid,
domain_name -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
updated_at -> Nullable<Timestamp>,
}
}

View file

@ -6,11 +6,14 @@ use lru::LruCache;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
use ttl_cache::TtlCache;
use uuid::Uuid;
use crate::{apub::AcceptedActors, db_actor::Pool};
#[derive(Clone)]
pub struct State {
use_https: bool,
hostname: String,
whitelist_enabled: bool,
actor_cache: Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>,
actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>,
@ -19,11 +22,69 @@ pub struct State {
listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
}
pub enum UrlKind {
Activity,
Actor,
Followers,
Following,
Inbox,
}
#[derive(Clone, Debug, thiserror::Error)]
#[error("No host present in URI")]
pub struct HostError;
impl State {
pub fn generate_url(&self, kind: UrlKind) -> String {
let scheme = if self.use_https { "https" } else { "http" };
match kind {
UrlKind::Activity => {
format!("{}://{}/activity/{}", scheme, self.hostname, Uuid::new_v4())
}
UrlKind::Actor => format!("{}://{}/actor", scheme, self.hostname),
UrlKind::Followers => format!("{}://{}/followers", scheme, self.hostname),
UrlKind::Following => format!("{}://{}/following", scheme, self.hostname),
UrlKind::Inbox => format!("{}://{}/inbox", scheme, self.hostname),
}
}
pub async fn remove_listener(&self, client: &Client, inbox: &XsdAnyUri) -> Result<(), Error> {
let hs = self.listeners.clone();
log::info!("DELETE FROM listeners WHERE actor_id = {};", inbox.as_str());
client
.execute(
"DELETE FROM listeners WHERE actor_id = $1::TEXT;",
&[&inbox.as_str()],
)
.await?;
let mut write_guard = hs.write().await;
write_guard.remove(inbox);
Ok(())
}
pub async fn listeners_without(&self, inbox: &XsdAnyUri, domain: &str) -> Vec<XsdAnyUri> {
let hs = self.listeners.clone();
let read_guard = hs.read().await;
read_guard
.iter()
.filter_map(|listener| {
if let Some(host) = listener.as_url().host() {
if listener != inbox && host.to_string() != domain {
return Some(listener.clone());
}
}
None
})
.collect()
}
pub async fn is_whitelisted(&self, actor_id: &XsdAnyUri) -> bool {
if !self.whitelist_enabled {
return true;
@ -94,9 +155,13 @@ impl State {
return Err(HostError.into());
};
log::info!(
"INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
host.to_string()
);
client
.execute(
"INSERT INTO blocks (actor_id, created_at) VALUES ($1::TEXT, now);",
"INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now');",
&[&host.to_string()],
)
.await?;
@ -116,9 +181,13 @@ impl State {
return Err(HostError.into());
};
log::info!(
"INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
host.to_string()
);
client
.execute(
"INSERT INTO whitelists (actor_id, created_at) VALUES ($1::TEXT, now);",
"INSERT INTO whitelists (domain_name, created_at) VALUES ($1::TEXT, 'now');",
&[&host.to_string()],
)
.await?;
@ -132,9 +201,13 @@ impl State {
pub async fn add_listener(&self, client: &Client, listener: XsdAnyUri) -> Result<(), Error> {
let listeners = self.listeners.clone();
log::info!(
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]",
listener.as_str(),
);
client
.execute(
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, now);",
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');",
&[&listener.as_str()],
)
.await?;
@ -145,7 +218,12 @@ impl State {
Ok(())
}
pub async fn hydrate(whitelist_enabled: bool, pool: Pool) -> Result<Self, Error> {
pub async fn hydrate(
use_https: bool,
whitelist_enabled: bool,
hostname: String,
pool: Pool,
) -> Result<Self, Error> {
let pool1 = pool.clone();
let pool2 = pool.clone();
@ -170,7 +248,9 @@ impl State {
let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?;
Ok(State {
use_https,
whitelist_enabled,
hostname,
actor_cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
blocks: Arc::new(RwLock::new(blocks)),
@ -181,12 +261,14 @@ impl State {
}
pub async fn hydrate_blocks(client: &Client) -> Result<HashSet<String>, Error> {
log::info!("SELECT domain_name FROM blocks");
let rows = client.query("SELECT domain_name FROM blocks", &[]).await?;
parse_rows(rows)
}
pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Error> {
log::info!("SELECT domain_name FROM whitelists");
let rows = client
.query("SELECT domain_name FROM whitelists", &[])
.await?;
@ -195,6 +277,7 @@ pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Erro
}
pub async fn hydrate_listeners(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
log::info!("SELECT actor_id FROM listeners");
let rows = client.query("SELECT actor_id FROM listeners", &[]).await?;
parse_rows(rows)