From 78a359c4035658b5f7b1dc71c8e6e8b2d1947693 Mon Sep 17 00:00:00 2001
From: asonix
Date: Wed, 25 Mar 2020 17:10:10 -0500
Subject: [PATCH] Move notify to registration-based triggers, store nodes in db

---
 .../down.sql                  |   3 +
 .../up.sql                    |  37 ++
 .../down.sql                  |   2 +
 .../up.sql                    |   2 +
 src/data/node.rs              | 267 +++++++++++--
 src/data/state.rs             |   2 +-
 src/db.rs                     |   4 +-
 src/jobs/instance.rs          |  18 +-
 src/jobs/mod.rs               |  29 +-
 src/jobs/nodeinfo.rs          |   8 +-
 src/jobs/process_listeners.rs |   4 +-
 src/main.rs                   |  13 +-
 src/notify.rs                 | 365 ++++++++++++------
 13 files changed, 576 insertions(+), 178 deletions(-)
 create mode 100644 migrations/2020-03-25-211630_add-node-notifications/down.sql
 create mode 100644 migrations/2020-03-25-211630_add-node-notifications/up.sql
 create mode 100644 migrations/2020-03-25-214244_make-nodes-listener-id-unique/down.sql
 create mode 100644 migrations/2020-03-25-214244_make-nodes-listener-id-unique/up.sql

diff --git a/migrations/2020-03-25-211630_add-node-notifications/down.sql b/migrations/2020-03-25-211630_add-node-notifications/down.sql
new file mode 100644
index 0000000..b990082
--- /dev/null
+++ b/migrations/2020-03-25-211630_add-node-notifications/down.sql
@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+DROP TRIGGER IF EXISTS nodes_notify ON nodes;
+DROP FUNCTION IF EXISTS invoke_nodes_trigger();
diff --git a/migrations/2020-03-25-211630_add-node-notifications/up.sql b/migrations/2020-03-25-211630_add-node-notifications/up.sql
new file mode 100644
index 0000000..018a89f
--- /dev/null
+++ b/migrations/2020-03-25-211630_add-node-notifications/up.sql
@@ -0,0 +1,37 @@
+-- Your SQL goes here
+CREATE OR REPLACE FUNCTION invoke_nodes_trigger ()
+    RETURNS TRIGGER
+    LANGUAGE plpgsql
+AS $$
+DECLARE
+    rec RECORD;
+    channel TEXT;
+    payload TEXT;
+BEGIN
+    CASE TG_OP
+    WHEN 'INSERT' THEN
+        rec := NEW;
+        channel := 'new_nodes';
+        payload := NEW.listener_id;
+    WHEN 'UPDATE' THEN
+        rec := NEW;
+        channel := 'new_nodes';
+        payload := NEW.listener_id;
+    WHEN 'DELETE' THEN
+        rec := OLD;
+        channel := 'rm_nodes';
+        payload := OLD.listener_id;
+    ELSE
+        RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
+    END CASE;
+
+    PERFORM pg_notify(channel, payload::TEXT);
+    RETURN rec;
+END;
+$$;
+
+CREATE TRIGGER nodes_notify
+    AFTER INSERT OR UPDATE OR DELETE
+    ON nodes
+FOR EACH ROW
+    EXECUTE PROCEDURE invoke_nodes_trigger();
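[Review note: both INSERT and UPDATE publish on the same `new_nodes` channel, so a first
insert and a later cache refresh share one code path; only DELETE uses `rm_nodes`, and the
payload is always the row's `listener_id` UUID rendered as text. A minimal sketch of
observing these notifications from Rust with tokio_postgres follows — the connection string
is an assumption, not part of this patch:

    use futures::{future::ready, stream::poll_fn, StreamExt};
    use tokio_postgres::{AsyncMessage, NoTls};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumed connection string; substitute your own database.
        let (client, mut conn) =
            tokio_postgres::connect("postgres://localhost/relay", NoTls).await?;

        // The connection must be polled for AsyncMessage values
        // or NOTIFY payloads are never surfaced.
        let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m {
            Ok(AsyncMessage::Notification(n)) => ready(Some(n)),
            _ => ready(None),
        });

        client.batch_execute("LISTEN new_nodes; LISTEN rm_nodes;").await?;

        // Any INSERT/UPDATE/DELETE on `nodes` now shows up here.
        while let Some(n) = stream.next().await {
            println!("{}: {}", n.channel(), n.payload());
        }
        Ok(())
    }
]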
diff --git a/migrations/2020-03-25-214244_make-nodes-listener-id-unique/down.sql b/migrations/2020-03-25-214244_make-nodes-listener-id-unique/down.sql
new file mode 100644
index 0000000..4d8057e
--- /dev/null
+++ b/migrations/2020-03-25-214244_make-nodes-listener-id-unique/down.sql
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+ALTER TABLE nodes DROP CONSTRAINT nodes_listener_ids_unique;
diff --git a/migrations/2020-03-25-214244_make-nodes-listener-id-unique/up.sql b/migrations/2020-03-25-214244_make-nodes-listener-id-unique/up.sql
new file mode 100644
index 0000000..2cd4967
--- /dev/null
+++ b/migrations/2020-03-25-214244_make-nodes-listener-id-unique/up.sql
@@ -0,0 +1,2 @@
+-- Your SQL goes here
+ALTER TABLE nodes ADD CONSTRAINT nodes_listener_ids_unique UNIQUE (listener_id);
diff --git a/src/data/node.rs b/src/data/node.rs
index b5f5ff7..cf7d428 100644
--- a/src/data/node.rs
+++ b/src/data/node.rs
@@ -1,21 +1,28 @@
+use crate::{db::Db, error::MyError};
 use activitystreams::primitives::XsdAnyUri;
+use bb8_postgres::tokio_postgres::types::Json;
+use log::error;
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
+    time::{Duration, SystemTime},
 };
 use tokio::sync::RwLock;
+use uuid::Uuid;
 
 pub type ListenersCache = Arc<RwLock<HashSet<XsdAnyUri>>>;
 
 #[derive(Clone)]
 pub struct NodeCache {
+    db: Db,
     listeners: ListenersCache,
     nodes: Arc<RwLock<HashMap<XsdAnyUri, Node>>>,
 }
 
 impl NodeCache {
-    pub fn new(listeners: ListenersCache) -> Self {
+    pub fn new(db: Db, listeners: ListenersCache) -> Self {
         NodeCache {
+            db,
             listeners,
             nodes: Arc::new(RwLock::new(HashMap::new())),
         }
@@ -38,67 +45,247 @@ impl NodeCache {
             .collect()
     }
 
-    pub async fn set_info(
-        &self,
-        listener: XsdAnyUri,
-        software: String,
-        version: String,
-        reg: bool,
-    ) {
-        if !self.listeners.read().await.contains(&listener) {
-            let mut nodes = self.nodes.write().await;
-            nodes.remove(&listener);
-            return;
+    pub async fn is_nodeinfo_outdated(&self, listener: &XsdAnyUri) -> bool {
+        let read_guard = self.nodes.read().await;
+
+        let node = match read_guard.get(listener) {
+            None => return true,
+            Some(node) => node,
+        };
+
+        match node.info.as_ref() {
+            Some(nodeinfo) => nodeinfo.outdated(),
+            None => true,
         }
+    }
+
+    pub async fn is_contact_outdated(&self, listener: &XsdAnyUri) -> bool {
+        let read_guard = self.nodes.read().await;
+
+        let node = match read_guard.get(listener) {
+            None => return true,
+            Some(node) => node,
+        };
+
+        match node.contact.as_ref() {
+            Some(contact) => contact.outdated(),
+            None => true,
+        }
+    }
+
+    pub async fn is_instance_outdated(&self, listener: &XsdAnyUri) -> bool {
+        let read_guard = self.nodes.read().await;
+
+        let node = match read_guard.get(listener) {
+            None => return true,
+            Some(node) => node,
+        };
+
+        match node.instance.as_ref() {
+            Some(instance) => instance.outdated(),
+            None => true,
+        }
+    }
+
+    pub async fn cache_by_id(&self, id: Uuid) {
+        if let Err(e) = self.do_cache_by_id(id).await {
+            error!("Error loading node into cache, {}", e);
+        }
+    }
+
+    pub async fn bust_by_id(&self, id: Uuid) {
+        if let Err(e) = self.do_bust_by_id(id).await {
+            error!("Error busting node cache, {}", e);
+        }
+    }
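[Review note: the three is_*_outdated probes above share one shape — look up the node under
a read lock, then treat a missing node or missing sub-record as stale. An equivalent, more
compact form using Option combinators, as a sketch only (`info` stands in for whichever
field is being checked):

    pub async fn is_info_outdated(&self, listener: &XsdAnyUri) -> bool {
        self.nodes
            .read()
            .await
            .get(listener)
            .and_then(|node| node.info.as_ref())
            .map_or(true, Info::outdated)
    }
]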
+
+    async fn do_bust_by_id(&self, id: Uuid) -> Result<(), MyError> {
+        let conn = self.db.pool().get().await?;
+
+        let row_opt = conn
+            .query_opt(
+                "SELECT ls.actor_id
+                 FROM listeners AS ls
+                 INNER JOIN nodes AS nd ON nd.listener_id = ls.id
+                 WHERE nd.id = $1::UUID
+                 LIMIT 1;",
+                &[&id],
+            )
+            .await?;
+
+        let row = if let Some(row) = row_opt {
+            row
+        } else {
+            return Ok(());
+        };
+
+        let listener: String = row.try_get(0)?;
+        let listener: XsdAnyUri = listener.parse()?;
+
+        let mut write_guard = self.nodes.write().await;
+        write_guard.remove(&listener);
+
+        Ok(())
+    }
+
+    async fn do_cache_by_id(&self, id: Uuid) -> Result<(), MyError> {
+        let conn = self.db.pool().get().await?;
+
+        let row_opt = conn
+            .query_opt(
+                "SELECT ls.actor_id, nd.nodeinfo, nd.instance, nd.contact
+                 FROM nodes AS nd
+                 INNER JOIN listeners AS ls ON nd.listener_id = ls.id
+                 WHERE nd.id = $1::UUID
+                 LIMIT 1;",
+                &[&id],
+            )
+            .await?;
+
+        let row = if let Some(row) = row_opt {
+            row
+        } else {
+            return Ok(());
+        };
+
+        let listener: String = row.try_get(0)?;
+        let listener: XsdAnyUri = listener.parse()?;
+        let info: Option<Json<Info>> = row.try_get(1)?;
+        let instance: Option<Json<Instance>> = row.try_get(2)?;
+        let contact: Option<Json<Contact>> = row.try_get(3)?;
 
         let mut write_guard = self.nodes.write().await;
         let node = write_guard
             .entry(listener.clone())
             .or_insert(Node::new(listener));
+
+        if let Some(info) = info {
+            node.info = Some(info.0);
+        }
+        if let Some(instance) = instance {
+            node.instance = Some(instance.0);
+        }
+        if let Some(contact) = contact {
+            node.contact = Some(contact.0);
+        }
+
+        Ok(())
+    }
+
+    pub async fn set_info(
+        &self,
+        listener: &XsdAnyUri,
+        software: String,
+        version: String,
+        reg: bool,
+    ) -> Result<(), MyError> {
+        if !self.listeners.read().await.contains(listener) {
+            let mut nodes = self.nodes.write().await;
+            nodes.remove(listener);
+            return Ok(());
+        }
+
+        let mut write_guard = self.nodes.write().await;
+        let node = write_guard
+            .entry(listener.clone())
+            .or_insert(Node::new(listener.clone()));
         node.set_info(software, version, reg);
+        self.save(listener, &*node).await?;
+        Ok(())
     }
 
     pub async fn set_instance(
         &self,
-        listener: XsdAnyUri,
+        listener: &XsdAnyUri,
         title: String,
         description: String,
         version: String,
         reg: bool,
         requires_approval: bool,
-    ) {
-        if !self.listeners.read().await.contains(&listener) {
+    ) -> Result<(), MyError> {
+        if !self.listeners.read().await.contains(listener) {
             let mut nodes = self.nodes.write().await;
-            nodes.remove(&listener);
-            return;
+            nodes.remove(listener);
+            return Ok(());
         }
 
         let mut write_guard = self.nodes.write().await;
         let node = write_guard
             .entry(listener.clone())
-            .or_insert(Node::new(listener));
+            .or_insert(Node::new(listener.clone()));
         node.set_instance(title, description, version, reg, requires_approval);
+        self.save(listener, &*node).await?;
+        Ok(())
     }
 
     pub async fn set_contact(
         &self,
-        listener: XsdAnyUri,
+        listener: &XsdAnyUri,
         username: String,
         display_name: String,
         url: XsdAnyUri,
         avatar: XsdAnyUri,
-    ) {
-        if !self.listeners.read().await.contains(&listener) {
+    ) -> Result<(), MyError> {
+        if !self.listeners.read().await.contains(listener) {
             let mut nodes = self.nodes.write().await;
-            nodes.remove(&listener);
-            return;
+            nodes.remove(listener);
+            return Ok(());
         }
 
         let mut write_guard = self.nodes.write().await;
         let node = write_guard
             .entry(listener.clone())
-            .or_insert(Node::new(listener));
+            .or_insert(Node::new(listener.clone()));
         node.set_contact(username, display_name, url, avatar);
+        self.save(listener, &*node).await?;
+        Ok(())
     }
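[Review note: do_cache_by_id pulls the three JSONB columns back through tokio_postgres's
Json wrapper and unwraps with `.0`; the set_* methods write through save() below. A hedged
sketch of the JSONB round trip in isolation — imagine it written inside src/data/node.rs so
Info is in scope, with an already-open client; the query text mirrors this patch, the rest
is assumed:

    use bb8_postgres::tokio_postgres::{types::Json, Client, Error};

    async fn round_trip(client: &Client, id: uuid::Uuid, info: &Info) -> Result<(), Error> {
        // Json<T> serializes via serde on the way in...
        client
            .execute(
                "UPDATE nodes SET nodeinfo = $2::JSONB WHERE id = $1::UUID",
                &[&id, &Json(info)],
            )
            .await?;

        // ...and deserializes on the way out; `.0` unwraps the value.
        let row = client
            .query_one("SELECT nodeinfo FROM nodes WHERE id = $1::UUID", &[&id])
            .await?;
        let back: Option<Json<Info>> = row.try_get(0)?;
        assert_eq!(back.map(|j| j.0.software), Some(info.software.clone()));
        Ok(())
    }
]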
actor_id = $1::TEXT LIMIT 1;", + &[&listener.as_str()], + ) + .await?; + + let id: Uuid = if let Some(row) = row_opt { + row.try_get(0)? + } else { + return Err(MyError::NotSubscribed(listener.as_str().to_owned())); + }; + + conn.execute( + "INSERT INTO nodes ( + listener_id, + nodeinfo, + instance, + contact, + created_at, + updated_at + ) VALUES ( + $1::UUID, + $2::JSONB, + $3::JSONB, + $4::JSONB, + 'now', + 'now' + ) ON CONFLICT (listener_id) + DO UPDATE SET + nodeinfo = $2::JSONB, + instance = $3::JSONB, + contact = $4::JSONB;", + &[ + &id, + &Json(&node.info), + &Json(&node.instance), + &Json(&node.contact), + ], + ) + .await?; + Ok(()) } } @@ -130,6 +317,7 @@ impl Node { software, version, reg, + updated: SystemTime::now(), }); self } @@ -148,6 +336,7 @@ impl Node { version, reg, requires_approval, + updated: SystemTime::now(), }); self } @@ -164,31 +353,55 @@ impl Node { display_name, url, avatar, + updated: SystemTime::now(), }); self } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Info { pub software: String, pub version: String, pub reg: bool, + pub updated: SystemTime, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Instance { pub title: String, pub description: String, pub version: String, pub reg: bool, pub requires_approval: bool, + pub updated: SystemTime, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Contact { pub username: String, pub display_name: String, pub url: XsdAnyUri, pub avatar: XsdAnyUri, + pub updated: SystemTime, +} + +static TEN_MINUTES: Duration = Duration::from_secs(60 * 10); + +impl Info { + pub fn outdated(&self) -> bool { + self.updated + TEN_MINUTES < SystemTime::now() + } +} + +impl Instance { + pub fn outdated(&self) -> bool { + self.updated + TEN_MINUTES < SystemTime::now() + } +} + +impl Contact { + pub fn outdated(&self) -> bool { + self.updated + TEN_MINUTES < SystemTime::now() + } } diff --git a/src/data/state.rs b/src/data/state.rs index 7297fe4..730357d 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -202,7 +202,7 @@ impl State { blocks: Arc::new(RwLock::new(blocks)), whitelists: Arc::new(RwLock::new(whitelists)), listeners: listeners.clone(), - node_cache: NodeCache::new(listeners), + node_cache: NodeCache::new(db.clone(), listeners), }; state.spawn_rehydrate(db.clone()); diff --git a/src/db.rs b/src/db.rs index 5ce5590..1ac300f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -136,10 +136,12 @@ pub async fn listen(client: &Client) -> Result<(), Error> { LISTEN new_whitelists; LISTEN new_listeners; LISTEN new_actors; + LISTEN new_nodes; LISTEN rm_blocks; LISTEN rm_whitelists; LISTEN rm_listeners; - LISTEN rm_actors;", + LISTEN rm_actors; + LISTEN rm_nodes", ) .await?; diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 965fa0d..e6f34cf 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -2,6 +2,7 @@ use crate::jobs::JobState; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; use background_jobs::{Job, Processor}; +use futures::join; use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; @@ -18,6 +19,15 @@ impl QueryInstance { async fn perform(mut self, state: JobState) -> Result<(), Error> { let listener = self.listener.clone(); + let (o1, o2) = join!( + state.node_cache.is_contact_outdated(&listener), + state.node_cache.is_instance_outdated(&listener), + ); + + if !(o1 || o2) { + return Ok(()); + } + let url = self.listener.as_url_mut(); 
diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs
index 965fa0d..e6f34cf 100644
--- a/src/jobs/instance.rs
+++ b/src/jobs/instance.rs
@@ -2,6 +2,7 @@ use crate::jobs::JobState;
 use activitystreams::primitives::XsdAnyUri;
 use anyhow::Error;
 use background_jobs::{Job, Processor};
+use futures::join;
 use std::{future::Future, pin::Pin};
 use tokio::sync::oneshot;
 
@@ -18,6 +19,15 @@ impl QueryInstance {
     async fn perform(mut self, state: JobState) -> Result<(), Error> {
         let listener = self.listener.clone();
 
+        let (o1, o2) = join!(
+            state.node_cache.is_contact_outdated(&listener),
+            state.node_cache.is_instance_outdated(&listener),
+        );
+
+        if !(o1 || o2) {
+            return Ok(());
+        }
+
         let url = self.listener.as_url_mut();
         url.set_fragment(None);
         url.set_query(None);
@@ -38,26 +48,26 @@ impl QueryInstance {
             state
                 .node_cache
                 .set_contact(
-                    listener.clone(),
+                    &listener,
                     contact.username,
                     contact.display_name,
                     contact.url,
                     contact.avatar,
                 )
-                .await;
+                .await?;
         }
 
         state
             .node_cache
             .set_instance(
-                listener,
+                &listener,
                 instance.title,
                 description,
                 instance.version,
                 instance.registrations,
                 instance.approval_required,
             )
-            .await;
+            .await?;
 
         Ok(())
     }
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
index 78f7104..19cc788 100644
--- a/src/jobs/mod.rs
+++ b/src/jobs/mod.rs
@@ -22,38 +22,28 @@ use crate::{
     },
     requests::Requests,
 };
-use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig};
+use background_jobs::{Job, QueueHandle, WorkerConfig};
 use std::time::Duration;
 
 pub fn create_server(db: Db) -> JobServer {
-    let local = background_jobs::create_server(MemoryStorage::new());
     let shared = background_jobs::create_server(Storage::new(db));
 
-    local.every(Duration::from_secs(60 * 5), Listeners);
+    shared.every(Duration::from_secs(60 * 5), Listeners);
 
-    JobServer::new(shared, local)
+    JobServer::new(shared)
 }
 
 pub fn create_workers(state: State, actors: ActorCache, job_server: JobServer) {
-    let state2 = state.clone();
-    let actors2 = actors.clone();
-    let job_server2 = job_server.clone();
-
     let remote_handle = job_server.remote.clone();
-    let local_handle = job_server.local.clone();
 
     WorkerConfig::new(move || JobState::new(state.clone(), actors.clone(), job_server.clone()))
         .register(DeliverProcessor)
         .register(DeliverManyProcessor)
-        .set_processor_count("default", 4)
-        .start(remote_handle);
-
-    WorkerConfig::new(move || JobState::new(state2.clone(), actors2.clone(), job_server2.clone()))
         .register(NodeinfoProcessor)
         .register(InstanceProcessor)
         .register(ListenersProcessor)
         .set_processor_count("default", 4)
-        .start(local_handle);
+        .start(remote_handle);
 }
 
@@ -68,7 +58,6 @@ pub struct JobState {
 
 #[derive(Clone)]
 pub struct JobServer {
     remote: QueueHandle,
-    local: QueueHandle,
 }
 
@@ -84,10 +73,9 @@ impl JobState {
 }
 
 impl JobServer {
-    fn new(remote_handle: QueueHandle, local_handle: QueueHandle) -> Self {
+    fn new(remote_handle: QueueHandle) -> Self {
         JobServer {
             remote: remote_handle,
-            local: local_handle,
         }
     }
 
@@ -97,11 +85,4 @@ impl JobServer {
     pub fn queue<J>(&self, job: J) -> Result<(), MyError>
     where
         J: Job,
     {
         self.remote.queue(job).map_err(MyError::Queue)
     }
-
-    pub fn queue_local<J>(&self, job: J) -> Result<(), MyError>
-    where
-        J: Job,
-    {
-        self.local.queue(job).map_err(MyError::Queue)
-    }
 }
diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs
index d60c144..ef138cf 100644
--- a/src/jobs/nodeinfo.rs
+++ b/src/jobs/nodeinfo.rs
@@ -18,6 +18,10 @@ impl QueryNodeinfo {
     async fn perform(mut self, state: JobState) -> Result<(), Error> {
         let listener = self.listener.clone();
 
+        if !state.node_cache.is_nodeinfo_outdated(&listener).await {
+            return Ok(());
+        }
+
         let url = self.listener.as_url_mut();
         url.set_fragment(None);
         url.set_query(None);
@@ -39,12 +43,12 @@ impl QueryNodeinfo {
         state
             .node_cache
             .set_info(
-                listener,
+                &listener,
                 nodeinfo.software.name,
                 nodeinfo.software.version,
                 nodeinfo.open_registrations,
             )
-            .await;
+            .await?;
         Ok(())
     }
 }
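[Review note: both query jobs now exit early while the cached records are fresh, so the
five-minute Listeners schedule no longer hammers remote instances. In QueryInstance the two
staleness probes are awaited concurrently with futures::join!; since both only take read
locks they don't contend. A hypothetical helper distilling that gate (imports as in
src/jobs/instance.rs; not part of this patch):

    use futures::join;

    async fn needs_refresh(cache: &NodeCache, listener: &XsdAnyUri) -> bool {
        // join! polls both futures concurrently and yields both outputs.
        let (contact_stale, instance_stale) = join!(
            cache.is_contact_outdated(listener),
            cache.is_instance_outdated(listener),
        );
        contact_stale || instance_stale
    }
]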
diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs
index 2969457..449ad01 100644
--- a/src/jobs/process_listeners.rs
+++ b/src/jobs/process_listeners.rs
@@ -15,8 +15,8 @@ impl Listeners {
         for listener in state.state.listeners().await {
             state
                 .job_server
-                .queue_local(QueryInstance::new(listener.clone()))?;
-            state.job_server.queue_local(QueryNodeinfo::new(listener))?;
+                .queue(QueryInstance::new(listener.clone()))?;
+            state.job_server.queue(QueryNodeinfo::new(listener))?;
         }
 
         Ok(())
diff --git a/src/main.rs b/src/main.rs
index 9883404..ca39a7f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -66,7 +66,18 @@ async fn main() -> Result<(), anyhow::Error> {
     let actors = ActorCache::new(db.clone());
     let job_server = create_server(db.clone());
 
-    notify::spawn(state.clone(), actors.clone(), job_server.clone(), &config)?;
+    notify::Notifier::new(config.database_url().parse()?)
+        .register(notify::NewBlocks(state.clone()))
+        .register(notify::NewWhitelists(state.clone()))
+        .register(notify::NewListeners(state.clone(), job_server.clone()))
+        .register(notify::NewActors(actors.clone()))
+        .register(notify::NewNodes(state.node_cache()))
+        .register(notify::RmBlocks(state.clone()))
+        .register(notify::RmWhitelists(state.clone()))
+        .register(notify::RmListeners(state.clone()))
+        .register(notify::RmActors(actors.clone()))
+        .register(notify::RmNodes(state.node_cache()))
+        .start();
 
     if args.jobs_only() {
         for _ in 0..num_cpus::get() {
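[Review note: the Notifier built here is a channel-to-handler registry, so wiring a new
notification is one Listener impl plus one .register call. A hedged sketch with a
hypothetical channel — `my_channel` and LogPayloads are illustrative and would also need a
LISTEN statement in src/db.rs and a trigger publishing on that channel:

    pub struct LogPayloads;

    impl Listener for LogPayloads {
        fn key(&self) -> &str {
            "my_channel"
        }

        fn execute(&self, payload: &str) {
            log::info!("my_channel says {}", payload);
        }
    }

It would be registered with `.register(notify::LogPayloads)` before `.start()`.]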
diff --git a/src/notify.rs b/src/notify.rs
index 95539eb..c329630 100644
--- a/src/notify.rs
+++ b/src/notify.rs
@@ -1,131 +1,264 @@
 use crate::{
-    data::{ActorCache, State},
+    data::{ActorCache, NodeCache, State},
     db::listen,
-    error::MyError,
     jobs::{JobServer, QueryInstance, QueryNodeinfo},
 };
 use activitystreams::primitives::XsdAnyUri;
 use actix::clock::{delay_for, Duration};
-use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification};
+use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config};
 use futures::{
     future::ready,
     stream::{poll_fn, StreamExt},
 };
 use log::{debug, error, info, warn};
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
+use uuid::Uuid;
 
-async fn handle_notification(
-    state: State,
-    actors: ActorCache,
-    job_server: JobServer,
-    notif: Notification,
-) {
-    match notif.channel() {
-        "new_blocks" => {
-            info!("Caching block of {}", notif.payload());
-            state.cache_block(notif.payload().to_owned()).await;
-        }
-        "new_whitelists" => {
-            info!("Caching whitelist of {}", notif.payload());
-            state.cache_whitelist(notif.payload().to_owned()).await;
-        }
-        "new_listeners" => {
-            if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
-                info!("Caching listener {}", uri);
-                state.cache_listener(uri.clone()).await;
-                let _ = job_server.queue_local(QueryInstance::new(uri.clone()));
-                let _ = job_server.queue_local(QueryNodeinfo::new(uri));
-            }
-        }
-        "new_actors" => {
-            if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
-                info!("Caching follower {}", uri);
-                actors.cache_follower(uri).await;
-            }
-        }
-        "rm_blocks" => {
-            info!("Busting block cache for {}", notif.payload());
-            state.bust_block(notif.payload()).await;
-        }
-        "rm_whitelists" => {
-            info!("Busting whitelist cache for {}", notif.payload());
-            state.bust_whitelist(notif.payload()).await;
-        }
-        "rm_listeners" => {
-            if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
-                info!("Busting listener cache for {}", uri);
-                state.bust_listener(&uri).await;
-            }
-        }
-        "rm_actors" => {
-            if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
-                info!("Busting follower cache for {}", uri);
-                actors.bust_follower(&uri).await;
-            }
-        }
-        _ => (),
-    };
+pub trait Listener {
+    fn key(&self) -> &str;
+
+    fn execute(&self, payload: &str);
 }
 
-pub fn spawn(
-    state: State,
-    actors: ActorCache,
-    job_server: JobServer,
-    config: &crate::config::Config,
-) -> Result<(), MyError> {
-    let config: Config = config.database_url().parse()?;
-
-    actix::spawn(async move {
-        loop {
-            let (new_client, mut conn) = match config.connect(NoTls).await {
-                Ok((client, conn)) => (client, conn),
-                Err(e) => {
-                    error!("Error establishing DB Connection, {}", e);
-                    delay_for(Duration::new(5, 0)).await;
-                    continue;
-                }
-            };
-
-            let client = Arc::new(new_client);
-            let new_client = client.clone();
-
-            actix::spawn(async move {
-                if let Err(e) = listen(&new_client).await {
-                    error!("Error listening for updates, {}", e);
-                }
-            });
-
-            let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m {
-                Ok(AsyncMessage::Notification(n)) => {
-                    debug!("Handling Notification, {:?}", n);
-                    ready(Some(n))
-                }
-                Ok(AsyncMessage::Notice(e)) => {
-                    debug!("Handling Notice, {:?}", e);
-                    ready(None)
-                }
-                Err(e) => {
-                    debug!("Handling Error, {:?}", e);
-                    ready(None)
-                }
-                _ => {
-                    debug!("Handling rest");
-                    ready(None)
-                }
-            });
-
-            while let Some(n) = stream.next().await {
-                actix::spawn(handle_notification(
-                    state.clone(),
-                    actors.clone(),
-                    job_server.clone(),
-                    n,
-                ));
-            }
-
-            drop(client);
-            warn!("Restarting listener task");
-        }
-    });
-    Ok(())
+pub struct Notifier {
+    config: Config,
+    listeners: HashMap<String, Vec<Box<dyn Listener + Send + Sync>>>,
+}
+
+impl Notifier {
+    pub fn new(config: Config) -> Self {
+        Notifier {
+            config,
+            listeners: HashMap::new(),
+        }
+    }
+
+    pub fn register<L>(mut self, l: L) -> Self
+    where
+        L: Listener + Send + Sync + 'static,
+    {
+        let v = self
+            .listeners
+            .entry(l.key().to_owned())
+            .or_insert(Vec::new());
+        v.push(Box::new(l));
+        self
+    }
+
+    pub fn start(self) {
+        actix::spawn(async move {
+            let Notifier { config, listeners } = self;
+
+            loop {
+                let (new_client, mut conn) = match config.connect(NoTls).await {
+                    Ok((client, conn)) => (client, conn),
+                    Err(e) => {
+                        error!("Error establishing DB Connection, {}", e);
+                        delay_for(Duration::new(5, 0)).await;
+                        continue;
+                    }
+                };
+
+                let client = Arc::new(new_client);
+                let new_client = client.clone();
+
+                actix::spawn(async move {
+                    if let Err(e) = listen(&new_client).await {
+                        error!("Error listening for updates, {}", e);
+                    }
+                });
+
+                let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m {
+                    Ok(AsyncMessage::Notification(n)) => {
+                        debug!("Handling Notification, {:?}", n);
+                        ready(Some(n))
+                    }
+                    Ok(AsyncMessage::Notice(e)) => {
+                        debug!("Handling Notice, {:?}", e);
+                        ready(None)
+                    }
+                    Err(e) => {
+                        debug!("Handling Error, {:?}", e);
+                        ready(None)
+                    }
+                    _ => {
+                        debug!("Handling rest");
+                        ready(None)
+                    }
+                });
+
+                while let Some(n) = stream.next().await {
+                    if let Some(v) = listeners.get(n.channel()) {
+                        for l in v {
+                            l.execute(n.payload());
+                        }
+                    }
+                }
+
+                drop(client);
+                warn!("Restarting listener task");
+            }
+        });
+    }
+}
+
+pub struct NewBlocks(pub State);
+pub struct NewWhitelists(pub State);
+pub struct NewListeners(pub State, pub JobServer);
+pub struct NewActors(pub ActorCache);
+pub struct NewNodes(pub NodeCache);
+pub struct RmBlocks(pub State);
+pub struct RmWhitelists(pub State);
+pub struct RmListeners(pub State);
+pub struct RmActors(pub ActorCache);
+pub struct RmNodes(pub NodeCache);
+
+impl Listener for NewBlocks {
+    fn key(&self) -> &str {
+        "new_blocks"
+    }
+
+    fn execute(&self, payload: &str) {
+        info!("Caching block of {}", payload);
+        let state = self.0.clone();
+        let payload = payload.to_owned();
+        actix::spawn(async move { state.cache_block(payload).await });
+    }
+}
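[Review note: on any connection failure the loop above sleeps a flat five seconds and
reconnects, re-running listen() so every LISTEN is re-established on the fresh session.
If reconnect storms are a concern, a capped exponential backoff is a drop-in variant —
a sketch, not part of this patch:

    // Hypothetical variant: capped exponential backoff between reconnects.
    let mut wait = Duration::new(1, 0);
    loop {
        match config.connect(NoTls).await {
            Ok(pair) => {
                wait = Duration::new(1, 0); // reset once connected
                /* ... use pair as above ... */
            }
            Err(e) => {
                error!("Error establishing DB Connection, {}", e);
                delay_for(wait).await;
                wait = std::cmp::min(wait * 2, Duration::new(60, 0));
                continue;
            }
        }
    }
]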
info!("Caching whitelist of {}", payload); + let state = self.0.clone(); + let payload = payload.to_owned(); + actix::spawn(async move { state.cache_whitelist(payload.to_owned()).await }); + } +} + +impl Listener for NewListeners { + fn key(&self) -> &str { + "new_listeners" + } + + fn execute(&self, payload: &str) { + if let Ok(uri) = payload.parse::() { + info!("Caching listener {}", uri); + let state = self.0.clone(); + let _ = self.1.queue(QueryInstance::new(uri.clone())); + let _ = self.1.queue(QueryNodeinfo::new(uri.clone())); + actix::spawn(async move { state.cache_listener(uri).await }); + } else { + warn!("Not caching listener {}, parse error", payload); + } + } +} + +impl Listener for NewActors { + fn key(&self) -> &str { + "new_actors" + } + + fn execute(&self, payload: &str) { + if let Ok(uri) = payload.parse::() { + info!("Caching actor {}", uri); + let actors = self.0.clone(); + actix::spawn(async move { actors.cache_follower(uri).await }); + } else { + warn!("Not caching actor {}, parse error", payload); + } + } +} + +impl Listener for NewNodes { + fn key(&self) -> &str { + "new_nodes" + } + + fn execute(&self, payload: &str) { + if let Ok(uuid) = payload.parse::() { + info!("Caching node {}", uuid); + let nodes = self.0.clone(); + actix::spawn(async move { nodes.cache_by_id(uuid).await }); + } else { + warn!("Not caching node {}, parse error", payload); + } + } +} + +impl Listener for RmBlocks { + fn key(&self) -> &str { + "rm_blocks" + } + + fn execute(&self, payload: &str) { + info!("Busting block cache for {}", payload); + let state = self.0.clone(); + let payload = payload.to_owned(); + actix::spawn(async move { state.bust_block(&payload).await }); + } +} + +impl Listener for RmWhitelists { + fn key(&self) -> &str { + "rm_whitelists" + } + + fn execute(&self, payload: &str) { + info!("Busting whitelist cache for {}", payload); + let state = self.0.clone(); + let payload = payload.to_owned(); + actix::spawn(async move { state.bust_whitelist(&payload).await }); + } +} + +impl Listener for RmListeners { + fn key(&self) -> &str { + "rm_listeners" + } + + fn execute(&self, payload: &str) { + if let Ok(uri) = payload.parse::() { + info!("Busting listener cache for {}", uri); + let state = self.0.clone(); + actix::spawn(async move { state.bust_listener(&uri).await }); + } else { + warn!("Not busting listener cache for {}", payload); + } + } +} + +impl Listener for RmActors { + fn key(&self) -> &str { + "rm_actors" + } + + fn execute(&self, payload: &str) { + if let Ok(uri) = payload.parse::() { + info!("Busting actor cache for {}", uri); + let actors = self.0.clone(); + actix::spawn(async move { actors.bust_follower(&uri).await }); + } else { + warn!("Not busting actor cache for {}", payload); + } + } +} + +impl Listener for RmNodes { + fn key(&self) -> &str { + "rm_nodes" + } + + fn execute(&self, payload: &str) { + if let Ok(uuid) = payload.parse::() { + info!("Caching node {}", uuid); + let nodes = self.0.clone(); + actix::spawn(async move { nodes.bust_by_id(uuid).await }); + } else { + warn!("Not caching node {}, parse error", payload); + } + } }