From ef13e93140ba80db54e228af8f397b31a493abc8 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 20 Mar 2020 10:22:56 -0500 Subject: [PATCH] Listen isn't always reliable, rehydrate every 10 minutes --- src/main.rs | 3 +++ src/rehydrate.rs | 24 ++++++++++++++++++++++++ src/state.rs | 27 ++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 src/rehydrate.rs diff --git a/src/main.rs b/src/main.rs index 6932d30..74e0b49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod error; mod inbox; mod nodeinfo; mod notify; +mod rehydrate; mod requests; mod responses; mod state; @@ -80,6 +81,8 @@ async fn main() -> Result<(), anyhow::Error> { let state = State::hydrate(config.clone(), &db).await?; + rehydrate::spawn(db.clone(), state.clone()); + let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); let bind_address = config.bind_address(); diff --git a/src/rehydrate.rs b/src/rehydrate.rs new file mode 100644 index 0000000..28dd2a6 --- /dev/null +++ b/src/rehydrate.rs @@ -0,0 +1,24 @@ +use crate::{db::Db, state::State}; +use actix::{ + clock::{interval_at, Duration, Instant}, + Arbiter, +}; +use log::error; + +pub fn spawn(db: Db, state: State) { + Arbiter::spawn(async move { + let start = Instant::now(); + let duration = Duration::from_secs(60 * 10); + + let mut interval = interval_at(start, duration); + + loop { + interval.tick().await; + + match state.rehydrate(&db).await { + Err(e) => error!("Error rehydrating, {}", e), + _ => (), + } + } + }); +} diff --git a/src/state.rs b/src/state.rs index 7f985e0..38ffdd5 100644 --- a/src/state.rs +++ b/src/state.rs @@ -7,7 +7,7 @@ use crate::{ }; use activitystreams::primitives::XsdAnyUri; use actix_web::web; -use futures::try_join; +use futures::{join, try_join}; use log::info; use lru::LruCache; use rand::thread_rng; @@ -135,6 +135,31 @@ impl State { write_guard.insert(listener); } + pub async fn rehydrate(&self, db: &Db) -> Result<(), MyError> { + let f1 = db.hydrate_blocks(); + let f2 = db.hydrate_whitelists(); + let f3 = db.hydrate_listeners(); + + let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?; + + join!( + async move { + let mut write_guard = self.listeners.write().await; + *write_guard = listeners; + }, + async move { + let mut write_guard = self.whitelists.write().await; + *write_guard = whitelists; + }, + async move { + let mut write_guard = self.blocks.write().await; + *write_guard = blocks; + } + ); + + Ok(()) + } + pub async fn hydrate(config: Config, db: &Db) -> Result { let f1 = db.hydrate_blocks(); let f2 = db.hydrate_whitelists();