Move notify to registration-based triggers, store nodes in db

This commit is contained in:
asonix 2020-03-25 17:10:10 -05:00
parent 0a43fd3a22
commit 78a359c403
13 changed files with 576 additions and 178 deletions

View file

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
-- Drop the trigger first: it depends on invoke_nodes_trigger(), so the
-- function can only be removed once no trigger references it.
DROP TRIGGER IF EXISTS nodes_notify ON nodes;
DROP FUNCTION IF EXISTS invoke_nodes_trigger();

View file

@ -0,0 +1,37 @@
-- Your SQL goes here
-- Trigger function that publishes row changes on the `nodes` table via
-- LISTEN/NOTIFY so the application can keep its in-memory node cache in
-- sync with the database.
--
-- Channels:
--   * INSERT / UPDATE -> 'new_nodes' (cache this node)
--   * DELETE          -> 'rm_nodes'  (bust this node from the cache)
--
-- NOTE(review): the payload is the row's listener_id, but the Rust
-- NewNodes/RmNodes handlers appear to look rows up by nodes.id — confirm
-- both sides agree on which UUID is sent.
CREATE OR REPLACE FUNCTION invoke_nodes_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_nodes';
payload := NEW.listener_id;
WHEN 'UPDATE' THEN
rec := NEW;
channel := 'new_nodes';
payload := NEW.listener_id;
WHEN 'DELETE' THEN
-- On DELETE only OLD is populated; NEW is NULL.
rec := OLD;
channel := 'rm_nodes';
payload := OLD.listener_id;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Broadcast the UUID (as text) on the chosen channel.
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
-- Fire once per affected row, after the change is committed to the row.
CREATE TRIGGER nodes_notify
AFTER INSERT OR UPDATE OR DELETE
ON nodes
FOR EACH ROW
EXECUTE PROCEDURE invoke_nodes_trigger();

View file

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
-- Remove the one-node-per-listener uniqueness guarantee added by `up.sql`.
ALTER TABLE nodes DROP CONSTRAINT nodes_listener_ids_unique;

View file

@ -0,0 +1,2 @@
-- Your SQL goes here
-- Each listener may have at most one nodes row; this constraint is also
-- what makes `ON CONFLICT (listener_id)` upserts possible in the app.
ALTER TABLE nodes ADD CONSTRAINT nodes_listener_ids_unique UNIQUE (listener_id);

View file

@ -1,21 +1,28 @@
use crate::{db::Db, error::MyError};
use activitystreams::primitives::XsdAnyUri;
use bb8_postgres::tokio_postgres::types::Json;
use log::error;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use uuid::Uuid;
pub type ListenersCache = Arc<RwLock<HashSet<XsdAnyUri>>>;
#[derive(Clone)]
pub struct NodeCache {
db: Db,
listeners: ListenersCache,
nodes: Arc<RwLock<HashMap<XsdAnyUri, Node>>>,
}
impl NodeCache {
pub fn new(listeners: ListenersCache) -> Self {
pub fn new(db: Db, listeners: ListenersCache) -> Self {
NodeCache {
db,
listeners,
nodes: Arc::new(RwLock::new(HashMap::new())),
}
@ -38,67 +45,247 @@ impl NodeCache {
.collect()
}
pub async fn set_info(
&self,
listener: XsdAnyUri,
software: String,
version: String,
reg: bool,
) {
if !self.listeners.read().await.contains(&listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
/// Returns `true` when the cached nodeinfo for `listener` should be
/// refetched: the listener is not cached at all, has no nodeinfo yet,
/// or its nodeinfo is older than the staleness threshold.
pub async fn is_nodeinfo_outdated(&self, listener: &XsdAnyUri) -> bool {
let read_guard = self.nodes.read().await;
let node = match read_guard.get(listener) {
None => return true,
Some(node) => node,
};
match node.info.as_ref() {
Some(nodeinfo) => nodeinfo.outdated(),
None => true,
}
}
/// Returns `true` when the cached contact info for `listener` should be
/// refetched: the listener is not cached, has no contact info yet, or
/// the contact info is older than the staleness threshold.
pub async fn is_contact_outdated(&self, listener: &XsdAnyUri) -> bool {
let read_guard = self.nodes.read().await;
let node = match read_guard.get(listener) {
None => return true,
Some(node) => node,
};
match node.contact.as_ref() {
Some(contact) => contact.outdated(),
None => true,
}
}
/// Returns `true` when the cached instance info for `listener` should be
/// refetched: the listener is not cached, has no instance info yet, or
/// the instance info is older than the staleness threshold.
pub async fn is_instance_outdated(&self, listener: &XsdAnyUri) -> bool {
let read_guard = self.nodes.read().await;
let node = match read_guard.get(listener) {
None => return true,
Some(node) => node,
};
match node.instance.as_ref() {
Some(instance) => instance.outdated(),
None => true,
}
}
/// Loads the node with the given database id into the in-memory cache.
/// Errors are logged rather than propagated — this runs from a NOTIFY
/// handler where there is no caller to report failure to.
pub async fn cache_by_id(&self, id: Uuid) {
if let Err(e) = self.do_cache_by_id(id).await {
error!("Error loading node into cache, {}", e);
}
}
/// Evicts the node with the given database id from the in-memory cache.
/// Errors are logged rather than propagated — this runs from a NOTIFY
/// handler where there is no caller to report failure to.
pub async fn bust_by_id(&self, id: Uuid) {
if let Err(e) = self.do_bust_by_id(id).await {
error!("Error busting node cache, {}", e);
}
}
/// Looks up the listener URI for the node row `id` and removes that
/// listener's entry from the in-memory cache. A missing row is treated
/// as success (nothing to evict).
async fn do_bust_by_id(&self, id: Uuid) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
let row_opt = conn
.query_opt(
"SELECT ls.actor_id
FROM listeners AS ls
INNER JOIN nodes AS nd ON nd.listener_id = ls.id
WHERE nd.id = $1::UUID
LIMIT 1;",
&[&id],
)
.await?;
// No matching row: the node is already gone; nothing to do.
let row = if let Some(row) = row_opt {
row
} else {
return Ok(());
};
// actor_id is stored as text; parse it back into a URI, which is the
// key of the in-memory node map.
let listener: String = row.try_get(0)?;
let listener: XsdAnyUri = listener.parse()?;
let mut write_guard = self.nodes.write().await;
write_guard.remove(&listener);
Ok(())
}
/// Loads the node row `id` (joined with its listener's actor_id) from the
/// database and merges its nodeinfo/instance/contact JSON columns into the
/// in-memory cache entry for that listener. A missing row is treated as
/// success (nothing to cache).
///
/// NOTE(review): the NOTIFY trigger sends listener_id as the payload while
/// this query filters on nodes.id — confirm the two agree.
async fn do_cache_by_id(&self, id: Uuid) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
let row_opt = conn
.query_opt(
"SELECT ls.actor_id, nd.nodeinfo, nd.instance, nd.contact
FROM nodes AS nd
INNER JOIN listeners AS ls ON nd.listener_id = ls.id
WHERE nd.id = $1::UUID
LIMIT 1;",
&[&id],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(());
};
// The listener URI (stored as text) keys the in-memory node map.
let listener: String = row.try_get(0)?;
let listener: XsdAnyUri = listener.parse()?;
// JSONB columns are nullable; each deserializes via the Json wrapper.
let info: Option<Json<Info>> = row.try_get(1)?;
let instance: Option<Json<Instance>> = row.try_get(2)?;
let contact: Option<Json<Contact>> = row.try_get(3)?;
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
// Merge: only overwrite fields that are present in the database so a
// partially-populated row doesn't clear existing cached data.
if let Some(info) = info {
node.info = Some(info.0);
}
if let Some(instance) = instance {
node.instance = Some(instance.0);
}
if let Some(contact) = contact {
node.contact = Some(contact.0);
}
Ok(())
}
/// Records fetched nodeinfo (software name/version, open registrations)
/// for `listener`, both in the in-memory cache and in the database.
///
/// If the listener is no longer subscribed, its cache entry is dropped
/// instead and nothing is persisted.
pub async fn set_info(
&self,
listener: &XsdAnyUri,
software: String,
version: String,
reg: bool,
) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) {
// Listener unsubscribed since the fetch was queued: evict and bail.
let mut nodes = self.nodes.write().await;
nodes.remove(listener);
return Ok(());
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener.clone()));
node.set_info(software, version, reg);
// Persist so the cache can be rebuilt from the database on restart.
self.save(listener, &*node).await?;
Ok(())
}
pub async fn set_instance(
&self,
listener: XsdAnyUri,
listener: &XsdAnyUri,
title: String,
description: String,
version: String,
reg: bool,
requires_approval: bool,
) {
if !self.listeners.read().await.contains(&listener) {
) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
nodes.remove(listener);
return Ok(());
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
.or_insert(Node::new(listener.clone()));
node.set_instance(title, description, version, reg, requires_approval);
self.save(listener, &*node).await?;
Ok(())
}
pub async fn set_contact(
&self,
listener: XsdAnyUri,
listener: &XsdAnyUri,
username: String,
display_name: String,
url: XsdAnyUri,
avatar: XsdAnyUri,
) {
if !self.listeners.read().await.contains(&listener) {
) -> Result<(), MyError> {
if !self.listeners.read().await.contains(listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
nodes.remove(listener);
return Ok(());
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
.or_insert(Node::new(listener.clone()));
node.set_contact(username, display_name, url, avatar);
self.save(listener, &*node).await?;
Ok(())
}
/// Upserts `node`'s nodeinfo/instance/contact JSON into the `nodes` table,
/// keyed by the listener's row in `listeners`.
///
/// Returns `MyError::NotSubscribed` when `listener` has no row in the
/// `listeners` table — nodes are only stored for subscribed listeners.
/// The upsert relies on the UNIQUE constraint on nodes.listener_id.
pub async fn save(&self, listener: &XsdAnyUri, node: &Node) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
// Resolve the listener URI to its listeners.id foreign key.
let row_opt = conn
.query_opt(
"SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;",
&[&listener.as_str()],
)
.await?;
let id: Uuid = if let Some(row) = row_opt {
row.try_get(0)?
} else {
return Err(MyError::NotSubscribed(listener.as_str().to_owned()));
};
// Insert-or-update; the conflict target matches the
// nodes_listener_ids_unique constraint.
conn.execute(
"INSERT INTO nodes (
listener_id,
nodeinfo,
instance,
contact,
created_at,
updated_at
) VALUES (
$1::UUID,
$2::JSONB,
$3::JSONB,
$4::JSONB,
'now',
'now'
) ON CONFLICT (listener_id)
DO UPDATE SET
nodeinfo = $2::JSONB,
instance = $3::JSONB,
contact = $4::JSONB;",
&[
&id,
&Json(&node.info),
&Json(&node.instance),
&Json(&node.contact),
],
)
.await?;
Ok(())
}
}
@ -130,6 +317,7 @@ impl Node {
software,
version,
reg,
updated: SystemTime::now(),
});
self
}
@ -148,6 +336,7 @@ impl Node {
version,
reg,
requires_approval,
updated: SystemTime::now(),
});
self
}
@ -164,31 +353,55 @@ impl Node {
display_name,
url,
avatar,
updated: SystemTime::now(),
});
self
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Info {
pub software: String,
pub version: String,
pub reg: bool,
pub updated: SystemTime,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Instance {
pub title: String,
pub description: String,
pub version: String,
pub reg: bool,
pub requires_approval: bool,
pub updated: SystemTime,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Contact {
pub username: String,
pub display_name: String,
pub url: XsdAnyUri,
pub avatar: XsdAnyUri,
pub updated: SystemTime,
}
// Staleness threshold shared by all cached node metadata.
static TEN_MINUTES: Duration = Duration::from_secs(60 * 10);
impl Info {
// True when this nodeinfo was recorded more than TEN_MINUTES ago.
pub fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now()
}
}
impl Instance {
// True when this instance info was recorded more than TEN_MINUTES ago.
pub fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now()
}
}
impl Contact {
// True when this contact info was recorded more than TEN_MINUTES ago.
pub fn outdated(&self) -> bool {
self.updated + TEN_MINUTES < SystemTime::now()
}
}

View file

@ -202,7 +202,7 @@ impl State {
blocks: Arc::new(RwLock::new(blocks)),
whitelists: Arc::new(RwLock::new(whitelists)),
listeners: listeners.clone(),
node_cache: NodeCache::new(listeners),
node_cache: NodeCache::new(db.clone(), listeners),
};
state.spawn_rehydrate(db.clone());

View file

@ -136,10 +136,12 @@ pub async fn listen(client: &Client) -> Result<(), Error> {
LISTEN new_whitelists;
LISTEN new_listeners;
LISTEN new_actors;
LISTEN new_nodes;
LISTEN rm_blocks;
LISTEN rm_whitelists;
LISTEN rm_listeners;
LISTEN rm_actors;",
LISTEN rm_actors;
LISTEN rm_nodes",
)
.await?;

View file

@ -2,6 +2,7 @@ use crate::jobs::JobState;
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use background_jobs::{Job, Processor};
use futures::join;
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
@ -18,6 +19,15 @@ impl QueryInstance {
async fn perform(mut self, state: JobState) -> Result<(), Error> {
let listener = self.listener.clone();
let (o1, o2) = join!(
state.node_cache.is_contact_outdated(&listener),
state.node_cache.is_instance_outdated(&listener),
);
if !(o1 || o2) {
return Ok(());
}
let url = self.listener.as_url_mut();
url.set_fragment(None);
url.set_query(None);
@ -38,26 +48,26 @@ impl QueryInstance {
state
.node_cache
.set_contact(
listener.clone(),
&listener,
contact.username,
contact.display_name,
contact.url,
contact.avatar,
)
.await;
.await?;
}
state
.node_cache
.set_instance(
listener,
&listener,
instance.title,
description,
instance.version,
instance.registrations,
instance.approval_required,
)
.await;
.await?;
Ok(())
}

View file

@ -22,38 +22,28 @@ use crate::{
},
requests::Requests,
};
use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig};
use background_jobs::{Job, QueueHandle, WorkerConfig};
use std::time::Duration;
pub fn create_server(db: Db) -> JobServer {
let local = background_jobs::create_server(MemoryStorage::new());
let shared = background_jobs::create_server(Storage::new(db));
local.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 5), Listeners);
JobServer::new(shared, local)
JobServer::new(shared)
}
pub fn create_workers(state: State, actors: ActorCache, job_server: JobServer) {
let state2 = state.clone();
let actors2 = actors.clone();
let job_server2 = job_server.clone();
let remote_handle = job_server.remote.clone();
let local_handle = job_server.local.clone();
WorkerConfig::new(move || JobState::new(state.clone(), actors.clone(), job_server.clone()))
.register(DeliverProcessor)
.register(DeliverManyProcessor)
.set_processor_count("default", 4)
.start(remote_handle);
WorkerConfig::new(move || JobState::new(state2.clone(), actors2.clone(), job_server2.clone()))
.register(NodeinfoProcessor)
.register(InstanceProcessor)
.register(ListenersProcessor)
.set_processor_count("default", 4)
.start(local_handle);
.start(remote_handle);
}
#[derive(Clone)]
@ -68,7 +58,6 @@ pub struct JobState {
#[derive(Clone)]
pub struct JobServer {
remote: QueueHandle,
local: QueueHandle,
}
impl JobState {
@ -84,10 +73,9 @@ impl JobState {
}
impl JobServer {
fn new(remote_handle: QueueHandle, local_handle: QueueHandle) -> Self {
fn new(remote_handle: QueueHandle) -> Self {
JobServer {
remote: remote_handle,
local: local_handle,
}
}
@ -97,11 +85,4 @@ impl JobServer {
{
self.remote.queue(job).map_err(MyError::Queue)
}
pub fn queue_local<J>(&self, job: J) -> Result<(), MyError>
where
J: Job,
{
self.local.queue(job).map_err(MyError::Queue)
}
}

View file

@ -18,6 +18,10 @@ impl QueryNodeinfo {
async fn perform(mut self, state: JobState) -> Result<(), Error> {
let listener = self.listener.clone();
if !state.node_cache.is_nodeinfo_outdated(&listener).await {
return Ok(());
}
let url = self.listener.as_url_mut();
url.set_fragment(None);
url.set_query(None);
@ -39,12 +43,12 @@ impl QueryNodeinfo {
state
.node_cache
.set_info(
listener,
&listener,
nodeinfo.software.name,
nodeinfo.software.version,
nodeinfo.open_registrations,
)
.await;
.await?;
Ok(())
}
}

View file

@ -15,8 +15,8 @@ impl Listeners {
for listener in state.state.listeners().await {
state
.job_server
.queue_local(QueryInstance::new(listener.clone()))?;
state.job_server.queue_local(QueryNodeinfo::new(listener))?;
.queue(QueryInstance::new(listener.clone()))?;
state.job_server.queue(QueryNodeinfo::new(listener))?;
}
Ok(())

View file

@ -66,7 +66,18 @@ async fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone());
let job_server = create_server(db.clone());
notify::spawn(state.clone(), actors.clone(), job_server.clone(), &config)?;
notify::Notifier::new(config.database_url().parse()?)
.register(notify::NewBlocks(state.clone()))
.register(notify::NewWhitelists(state.clone()))
.register(notify::NewListeners(state.clone(), job_server.clone()))
.register(notify::NewActors(actors.clone()))
.register(notify::NewNodes(state.node_cache()))
.register(notify::RmBlocks(state.clone()))
.register(notify::RmWhitelists(state.clone()))
.register(notify::RmListeners(state.clone()))
.register(notify::RmActors(actors.clone()))
.register(notify::RmNodes(state.node_cache()))
.start();
if args.jobs_only() {
for _ in 0..num_cpus::get() {

View file

@ -1,81 +1,54 @@
use crate::{
data::{ActorCache, State},
data::{ActorCache, NodeCache, State},
db::listen,
error::MyError,
jobs::{JobServer, QueryInstance, QueryNodeinfo},
};
use activitystreams::primitives::XsdAnyUri;
use actix::clock::{delay_for, Duration};
use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification};
use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config};
use futures::{
future::ready,
stream::{poll_fn, StreamExt},
};
use log::{debug, error, info, warn};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
async fn handle_notification(
state: State,
actors: ActorCache,
job_server: JobServer,
notif: Notification,
) {
match notif.channel() {
"new_blocks" => {
info!("Caching block of {}", notif.payload());
state.cache_block(notif.payload().to_owned()).await;
}
"new_whitelists" => {
info!("Caching whitelist of {}", notif.payload());
state.cache_whitelist(notif.payload().to_owned()).await;
}
"new_listeners" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Caching listener {}", uri);
state.cache_listener(uri.clone()).await;
let _ = job_server.queue_local(QueryInstance::new(uri.clone()));
let _ = job_server.queue_local(QueryNodeinfo::new(uri));
}
}
"new_actors" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Caching follower {}", uri);
actors.cache_follower(uri).await;
}
}
"rm_blocks" => {
info!("Busting block cache for {}", notif.payload());
state.bust_block(notif.payload()).await;
}
"rm_whitelists" => {
info!("Busting whitelist cache for {}", notif.payload());
state.bust_whitelist(notif.payload()).await;
}
"rm_listeners" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Busting listener cache for {}", uri);
state.bust_listener(&uri).await;
}
}
"rm_actors" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Busting follower cache for {}", uri);
actors.bust_follower(&uri).await;
}
}
_ => (),
};
/// A handler for one Postgres NOTIFY channel.
///
/// `key` names the channel this handler subscribes to; `execute` is called
/// with each notification's payload. Implementations spawn their own async
/// work, so `execute` itself is synchronous and must not block.
pub trait Listener {
fn key(&self) -> &str;
fn execute(&self, payload: &str);
}
pub fn spawn(
state: State,
actors: ActorCache,
job_server: JobServer,
config: &crate::config::Config,
) -> Result<(), MyError> {
let config: Config = config.database_url().parse()?;
pub struct Notifier {
config: Config,
listeners: HashMap<String, Vec<Box<dyn Listener + Send + Sync + 'static>>>,
}
impl Notifier {
pub fn new(config: Config) -> Self {
Notifier {
config,
listeners: HashMap::new(),
}
}
pub fn register<L>(mut self, l: L) -> Self
where
L: Listener + Send + Sync + 'static,
{
let v = self
.listeners
.entry(l.key().to_owned())
.or_insert(Vec::new());
v.push(Box::new(l));
self
}
pub fn start(self) {
actix::spawn(async move {
let Notifier { config, listeners } = self;
loop {
let (new_client, mut conn) = match config.connect(NoTls).await {
Ok((client, conn)) => (client, conn),
@ -115,17 +88,177 @@ pub fn spawn(
});
while let Some(n) = stream.next().await {
actix::spawn(handle_notification(
state.clone(),
actors.clone(),
job_server.clone(),
n,
));
if let Some(v) = listeners.get(n.channel()) {
for l in v {
l.execute(n.payload());
}
}
}
drop(client);
warn!("Restarting listener task");
}
});
Ok(())
}
}
// One newtype per NOTIFY channel; each wraps the cache/handle its
// Listener impl needs. "New*" handlers populate caches, "Rm*" handlers
// evict from them.
pub struct NewBlocks(pub State);
pub struct NewWhitelists(pub State);
// NewListeners also queues instance/nodeinfo fetch jobs, hence JobServer.
pub struct NewListeners(pub State, pub JobServer);
pub struct NewActors(pub ActorCache);
pub struct NewNodes(pub NodeCache);
pub struct RmBlocks(pub State);
pub struct RmWhitelists(pub State);
pub struct RmListeners(pub State);
pub struct RmActors(pub ActorCache);
pub struct RmNodes(pub NodeCache);
/// Handles the `new_blocks` channel: caches a newly blocked domain.
impl Listener for NewBlocks {
    fn key(&self) -> &str {
        "new_blocks"
    }

    fn execute(&self, payload: &str) {
        info!("Caching block of {}", payload);
        // Copy the payload and clone the handle so the task owns its data.
        let blocked = payload.to_owned();
        let state = self.0.clone();
        actix::spawn(async move { state.cache_block(blocked).await });
    }
}
/// Handles the `new_whitelists` channel: caches a newly whitelisted domain.
impl Listener for NewWhitelists {
    fn key(&self) -> &str {
        "new_whitelists"
    }

    fn execute(&self, payload: &str) {
        info!("Caching whitelist of {}", payload);
        let state = self.0.clone();
        // Convert to an owned String once so the spawned task can own it;
        // the previous extra `.to_owned()` inside the async block cloned
        // the already-owned String a second time for no benefit.
        let payload = payload.to_owned();
        actix::spawn(async move { state.cache_whitelist(payload).await });
    }
}
impl Listener for NewListeners {
fn key(&self) -> &str {
"new_listeners"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<XsdAnyUri>() {
info!("Caching listener {}", uri);
let state = self.0.clone();
let _ = self.1.queue(QueryInstance::new(uri.clone()));
let _ = self.1.queue(QueryNodeinfo::new(uri.clone()));
actix::spawn(async move { state.cache_listener(uri).await });
} else {
warn!("Not caching listener {}, parse error", payload);
}
}
}
impl Listener for NewActors {
fn key(&self) -> &str {
"new_actors"
}
fn execute(&self, payload: &str) {
if let Ok(uri) = payload.parse::<XsdAnyUri>() {
info!("Caching actor {}", uri);
let actors = self.0.clone();
actix::spawn(async move { actors.cache_follower(uri).await });
} else {
warn!("Not caching actor {}, parse error", payload);
}
}
}
impl Listener for NewNodes {
fn key(&self) -> &str {
"new_nodes"
}
fn execute(&self, payload: &str) {
if let Ok(uuid) = payload.parse::<Uuid>() {
info!("Caching node {}", uuid);
let nodes = self.0.clone();
actix::spawn(async move { nodes.cache_by_id(uuid).await });
} else {
warn!("Not caching node {}, parse error", payload);
}
}
}
/// Handles the `rm_blocks` channel: evicts a domain from the block cache.
impl Listener for RmBlocks {
    fn key(&self) -> &str {
        "rm_blocks"
    }

    fn execute(&self, payload: &str) {
        info!("Busting block cache for {}", payload);
        // Copy the payload and clone the handle so the task owns its data.
        let domain = payload.to_owned();
        let state = self.0.clone();
        actix::spawn(async move { state.bust_block(&domain).await });
    }
}
/// Handles the `rm_whitelists` channel: evicts a domain from the
/// whitelist cache.
impl Listener for RmWhitelists {
    fn key(&self) -> &str {
        "rm_whitelists"
    }

    fn execute(&self, payload: &str) {
        info!("Busting whitelist cache for {}", payload);
        // Copy the payload and clone the handle so the task owns its data.
        let domain = payload.to_owned();
        let state = self.0.clone();
        actix::spawn(async move { state.bust_whitelist(&domain).await });
    }
}
/// Handles the `rm_listeners` channel: evicts a listener URI from the
/// listener cache.
impl Listener for RmListeners {
    fn key(&self) -> &str {
        "rm_listeners"
    }

    fn execute(&self, payload: &str) {
        match payload.parse::<XsdAnyUri>() {
            Ok(uri) => {
                info!("Busting listener cache for {}", uri);
                let state = self.0.clone();
                actix::spawn(async move { state.bust_listener(&uri).await });
            }
            Err(_) => warn!("Not busting listener cache for {}", payload),
        }
    }
}
/// Handles the `rm_actors` channel: evicts a follower's actor URI from
/// the actor cache.
impl Listener for RmActors {
    fn key(&self) -> &str {
        "rm_actors"
    }

    fn execute(&self, payload: &str) {
        match payload.parse::<XsdAnyUri>() {
            Ok(uri) => {
                info!("Busting actor cache for {}", uri);
                let cache = self.0.clone();
                actix::spawn(async move { cache.bust_follower(&uri).await });
            }
            Err(_) => warn!("Not busting actor cache for {}", payload),
        }
    }
}
impl Listener for RmNodes {
fn key(&self) -> &str {
"rm_nodes"
}
fn execute(&self, payload: &str) {
if let Ok(uuid) = payload.parse::<Uuid>() {
info!("Caching node {}", uuid);
let nodes = self.0.clone();
actix::spawn(async move { nodes.bust_by_id(uuid).await });
} else {
warn!("Not caching node {}, parse error", payload);
}
}
}