From 88b03830846c521c0a40efb1d2e5ab9dd5e14f40 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 21 Dec 2022 16:51:17 -0600 Subject: [PATCH] Keep track of when servers were last seen --- Cargo.lock | 1 + Cargo.toml | 1 + src/admin.rs | 8 +++++ src/admin/client.rs | 6 +++- src/admin/routes.rs | 21 ++++++++++++- src/args.rs | 17 ++++++++++- src/config.rs | 40 ++++++++++--------------- src/data.rs | 2 ++ src/data/last_online.rs | 28 +++++++++++++++++ src/data/state.rs | 5 ++++ src/db.rs | 55 ++++++++++++++++++++++++++++++++-- src/jobs.rs | 5 +++- src/jobs/record_last_online.rs | 28 +++++++++++++++++ src/main.rs | 36 +++++++++++++++++++++- src/requests.rs | 9 +++++- 15 files changed, 228 insertions(+), 34 deletions(-) create mode 100644 src/data/last_online.rs create mode 100644 src/jobs/record_last_online.rs diff --git a/Cargo.lock b/Cargo.lock index ed762ec..8aa2297 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,7 @@ dependencies = [ "sled", "teloxide", "thiserror", + "time", "tokio", "toml", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 73346c9..4422028 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ teloxide = { version = "0.11.1", default-features = false, features = [ "rustls", ] } thiserror = "1.0" +time = { version = "0.3.17", features = ["serde"] } tracing = "0.1" tracing-awc = "0.1.6" tracing-error = "0.2" diff --git a/src/admin.rs b/src/admin.rs index e7fc665..156a5d9 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,6 @@ use activitystreams::iri_string::types::IriString; +use std::collections::{BTreeMap, BTreeSet}; +use time::OffsetDateTime; pub mod client; pub mod routes; @@ -22,3 +24,9 @@ pub(crate) struct BlockedDomains { pub(crate) struct ConnectedActors { pub(crate) connected_actors: Vec, } + +#[derive(serde::Deserialize, serde::Serialize)] +pub(crate) struct LastSeen { + pub(crate) last_seen: BTreeMap>, + pub(crate) never: Vec, +} diff --git a/src/admin/client.rs b/src/admin/client.rs index 3602487..fdb1687 100644 --- a/src/admin/client.rs +++ b/src/admin/client.rs @@ -1,5 +1,5 @@ use crate::{ - admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, + admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen}, collector::Snapshot, config::{AdminUrlKind, Config}, error::{Error, ErrorKind}, @@ -55,6 +55,10 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result Result { + get_results(client, config, AdminUrlKind::LastSeen).await +} + async fn get_results( client: &Client, config: &Config, diff --git a/src/admin/routes.rs b/src/admin/routes.rs index 6578bfd..c13a6e3 100644 --- a/src/admin/routes.rs +++ b/src/admin/routes.rs @@ -1,5 +1,5 @@ use crate::{ - admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, + admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen}, collector::{MemoryCollector, Snapshot}, error::Error, extractors::Admin, @@ -8,6 +8,8 @@ use actix_web::{ web::{Data, Json}, HttpResponse, }; +use std::collections::{BTreeMap, BTreeSet}; +use time::OffsetDateTime; pub(crate) async fn allow( admin: Admin, @@ -69,3 +71,20 @@ pub(crate) async fn stats( ) -> Result, Error> { Ok(Json(collector.snapshot())) } + +pub(crate) async fn last_seen(admin: Admin) -> Result, Error> { + let nodes = admin.db_ref().last_seen().await?; + + let mut last_seen: BTreeMap> = BTreeMap::new(); + let mut never = Vec::new(); + + for (domain, datetime) in nodes { + if let Some(datetime) = datetime { + last_seen.entry(datetime).or_default().insert(domain); + } else { + never.push(domain); + } + } + + Ok(Json(LastSeen { last_seen, never })) +} diff --git a/src/args.rs b/src/args.rs index 18a4059..155b296 100644 --- a/src/args.rs +++ b/src/args.rs @@ -17,11 +17,22 @@ pub(crate) struct Args { #[arg(short, long, help = "Get statistics from the server")] stats: bool, + + #[arg( + short, + long, + help = "List domains by when they were last succesfully contacted" + )] + contacted: bool, } impl Args { pub(crate) fn any(&self) -> bool { - !self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats + !self.blocks.is_empty() + || !self.allowed.is_empty() + || self.list + || self.stats + || self.contacted } pub(crate) fn new() -> Self { @@ -47,4 +58,8 @@ impl Args { pub(crate) fn stats(&self) -> bool { self.stats } + + pub(crate) fn contacted(&self) -> bool { + self.contacted + } } diff --git a/src/config.rs b/src/config.rs index 3a46833..bd70162 100644 --- a/src/config.rs +++ b/src/config.rs @@ -92,6 +92,7 @@ pub enum AdminUrlKind { Blocked, Connected, Stats, + LastSeen, } impl std::fmt::Debug for Config { @@ -429,33 +430,22 @@ impl Config { } fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result { - let iri = match kind { - AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref()) - .try_to_dedicated_string()?, - AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref()) - .resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref()) - .try_to_dedicated_string()?, + let path = match kind { + AdminUrlKind::Allow => "api/v1/admin/allow", + AdminUrlKind::Disallow => "api/v1/admin/disallow", + AdminUrlKind::Block => "api/v1/admin/block", + AdminUrlKind::Unblock => "api/v1/admin/unblock", + AdminUrlKind::Allowed => "api/v1/admin/allowed", + AdminUrlKind::Blocked => "api/v1/admin/blocked", + AdminUrlKind::Connected => "api/v1/admin/connected", + AdminUrlKind::Stats => "api/v1/admin/stats", + AdminUrlKind::LastSeen => "api/v1/admin/last_seen", }; + let iri = FixedBaseResolver::new(self.base_uri.as_ref()) + .resolve(IriRelativeStr::new(path)?.as_ref()) + .try_to_dedicated_string()?; + Ok(iri) } } diff --git a/src/data.rs b/src/data.rs index 918bdf7..1125149 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,9 +1,11 @@ mod actor; +mod last_online; mod media; mod node; mod state; pub(crate) use actor::ActorCache; +pub(crate) use last_online::LastOnline; pub(crate) use media::MediaCache; pub(crate) use node::{Node, NodeCache}; pub(crate) use state::State; diff --git a/src/data/last_online.rs b/src/data/last_online.rs new file mode 100644 index 0000000..889d804 --- /dev/null +++ b/src/data/last_online.rs @@ -0,0 +1,28 @@ +use activitystreams::iri_string::types::IriStr; +use std::{collections::HashMap, sync::Mutex}; +use time::OffsetDateTime; + +pub(crate) struct LastOnline { + domains: Mutex>, +} + +impl LastOnline { + pub(crate) fn mark_seen(&self, iri: &IriStr) { + if let Some(authority) = iri.authority_str() { + self.domains + .lock() + .unwrap() + .insert(authority.to_string(), OffsetDateTime::now_utc()); + } + } + + pub(crate) fn take(&self) -> HashMap { + std::mem::take(&mut *self.domains.lock().unwrap()) + } + + pub(crate) fn empty() -> Self { + Self { + domains: Mutex::new(HashMap::default()), + } + } +} diff --git a/src/data/state.rs b/src/data/state.rs index 31f0811..4387974 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -12,6 +12,8 @@ use rand::thread_rng; use rsa::{RsaPrivateKey, RsaPublicKey}; use std::sync::{Arc, RwLock}; +use super::LastOnline; + #[derive(Clone)] pub struct State { pub(crate) public_key: RsaPublicKey, @@ -19,6 +21,7 @@ pub struct State { object_cache: Arc>>, node_cache: NodeCache, breakers: Breakers, + pub(crate) last_online: Arc, pub(crate) db: Db, } @@ -43,6 +46,7 @@ impl State { self.private_key.clone(), config.user_agent(), self.breakers.clone(), + self.last_online.clone(), ) } @@ -114,6 +118,7 @@ impl State { node_cache: NodeCache::new(db.clone()), breakers: Breakers::default(), db, + last_online: Arc::new(LastOnline::empty()), }; Ok(state) diff --git a/src/db.rs b/src/db.rs index 685405f..6d3064f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -7,8 +7,13 @@ use rsa::{ pkcs8::{DecodePrivateKey, EncodePrivateKey}, RsaPrivateKey, }; -use sled::Tree; -use std::{collections::HashMap, sync::Arc, time::SystemTime}; +use sled::{Batch, Tree}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + time::SystemTime, +}; +use time::OffsetDateTime; use uuid::Uuid; #[derive(Clone, Debug)] @@ -28,6 +33,7 @@ struct Inner { actor_id_info: Tree, actor_id_instance: Tree, actor_id_contact: Tree, + last_seen: Tree, restricted_mode: bool, } @@ -247,6 +253,7 @@ impl Db { actor_id_info: db.open_tree("actor-id-info")?, actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_contact: db.open_tree("actor-id-contact")?, + last_seen: db.open_tree("last-seen")?, restricted_mode, }), }) @@ -254,7 +261,7 @@ impl Db { async fn unblock( &self, - f: impl Fn(&Inner) -> Result + Send + 'static, + f: impl FnOnce(&Inner) -> Result + Send + 'static, ) -> Result where T: Send + 'static, @@ -266,6 +273,48 @@ impl Db { Ok(t) } + pub(crate) async fn mark_last_seen( + &self, + nodes: HashMap, + ) -> Result<(), Error> { + let mut batch = Batch::default(); + + for (domain, datetime) in nodes { + let datetime_string = serde_json::to_vec(&datetime)?; + + batch.insert(domain.as_bytes(), datetime_string); + } + + self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from)) + .await + } + + pub(crate) async fn last_seen( + &self, + ) -> Result>, Error> { + self.unblock(|inner| { + let mut map = BTreeMap::new(); + + for iri in inner.connected() { + let Some(authority_str) = iri.authority_str() else { + continue; + }; + + if let Some(datetime) = inner.last_seen.get(authority_str)? { + map.insert( + authority_str.to_string(), + Some(serde_json::from_slice(&datetime)?), + ); + } else { + map.insert(authority_str.to_string(), None); + } + } + + Ok(map) + }) + .await + } + pub(crate) async fn connected_ids(&self) -> Result, Error> { self.unblock(|inner| Ok(inner.connected().collect())).await } diff --git a/src/jobs.rs b/src/jobs.rs index 014bd72..0bad1ac 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -5,6 +5,7 @@ mod deliver_many; mod instance; mod nodeinfo; mod process_listeners; +mod record_last_online; pub(crate) use self::{ contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, @@ -15,7 +16,7 @@ use crate::{ config::Config, data::{ActorCache, MediaCache, NodeCache, State}, error::{Error, ErrorKind}, - jobs::process_listeners::Listeners, + jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, requests::Requests, }; use background_jobs::{ @@ -62,6 +63,7 @@ pub(crate) fn create_workers( .register::() .register::() .register::() + .register::() .register::() .register::() .register::() @@ -73,6 +75,7 @@ pub(crate) fn create_workers( .start_with_threads(parallelism); shared.every(Duration::from_secs(60 * 5), Listeners); + shared.every(Duration::from_secs(60 * 10), RecordLastOnline); let job_server = JobServer::new(shared.queue_handle().clone()); diff --git a/src/jobs/record_last_online.rs b/src/jobs/record_last_online.rs new file mode 100644 index 0000000..3c81b31 --- /dev/null +++ b/src/jobs/record_last_online.rs @@ -0,0 +1,28 @@ +use crate::{error::Error, jobs::JobState}; +use background_jobs::{ActixJob, Backoff}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub(crate) struct RecordLastOnline; + +impl RecordLastOnline { + #[tracing::instrument(skip(state))] + async fn perform(self, state: JobState) -> Result<(), Error> { + let nodes = state.state.last_online.take(); + + state.state.db.mark_last_seen(nodes).await + } +} + +impl ActixJob for RecordLastOnline { + type State = JobState; + type Future = Pin>>>; + + const NAME: &'static str = "relay::jobs::RecordLastOnline"; + const QUEUE: &'static str = "maintenance"; + const BACKOFF: Backoff = Backoff::Linear(1); + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(async move { self.perform(state).await.map_err(Into::into) }) + } +} diff --git a/src/main.rs b/src/main.rs index 5605a48..dccd8bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,6 +144,39 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> println!("Updated lists"); } + if args.contacted() { + let last_seen = admin::client::last_seen(&client, &config).await?; + + let mut report = String::from("Contacted:"); + + if !last_seen.never.is_empty() { + report += "\nNever seen:\n"; + } + + for domain in last_seen.never { + report += "\t"; + report += &domain; + report += "\n"; + } + + if !last_seen.last_seen.is_empty() { + report += "\nSeen:\n"; + } + + for (datetime, domains) in last_seen.last_seen { + for domain in domains { + report += "\t"; + report += &datetime.to_string(); + report += " - "; + report += &domain; + report += "\n"; + } + } + + report += "\n"; + println!("{report}"); + } + if args.list() { let (blocked, allowed, connected) = tokio::try_join!( admin::client::blocked(&client, &config), @@ -258,7 +291,8 @@ async fn do_server_main( .route("/allowed", web::get().to(admin::routes::allowed)) .route("/blocked", web::get().to(admin::routes::blocked)) .route("/connected", web::get().to(admin::routes::connected)) - .route("/stats", web::get().to(admin::routes::stats)), + .route("/stats", web::get().to(admin::routes::stats)) + .route("/last_seen", web::get().to(admin::routes::last_seen)), ), ) }); diff --git a/src/requests.rs b/src/requests.rs index 332024a..9a1bc50 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,4 +1,7 @@ -use crate::error::{Error, ErrorKind}; +use crate::{ + data::LastOnline, + error::{Error, ErrorKind}, +}; use activitystreams::iri_string::types::IriString; use actix_web::http::header::Date; use awc::{error::SendRequestError, Client, ClientResponse}; @@ -146,6 +149,7 @@ pub(crate) struct Requests { private_key: RsaPrivateKey, config: Config, breakers: Breakers, + last_online: Arc, } impl std::fmt::Debug for Requests { @@ -174,6 +178,7 @@ impl Requests { private_key: RsaPrivateKey, user_agent: String, breakers: Breakers, + last_online: Arc, ) -> Self { Requests { client: Rc::new(RefCell::new(build_client(&user_agent))), @@ -184,6 +189,7 @@ impl Requests { private_key, config: Config::default().mastodon_compat(), breakers, + last_online, } } @@ -233,6 +239,7 @@ impl Requests { return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); } + self.last_online.mark_seen(&parsed_url); self.breakers.succeed(&parsed_url); Ok(res)