Keep track of when servers were last seen

This commit is contained in:
asonix 2022-12-21 16:51:17 -06:00
parent b49eeaf822
commit 88b0383084
15 changed files with 228 additions and 34 deletions

1
Cargo.lock generated
View file

@ -340,6 +340,7 @@ dependencies = [
"sled", "sled",
"teloxide", "teloxide",
"thiserror", "thiserror",
"time",
"tokio", "tokio",
"toml", "toml",
"tracing", "tracing",

View file

@ -66,6 +66,7 @@ teloxide = { version = "0.11.1", default-features = false, features = [
"rustls", "rustls",
] } ] }
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1" tracing = "0.1"
tracing-awc = "0.1.6" tracing-awc = "0.1.6"
tracing-error = "0.2" tracing-error = "0.2"

View file

@ -1,4 +1,6 @@
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub mod client; pub mod client;
pub mod routes; pub mod routes;
@ -22,3 +24,9 @@ pub(crate) struct BlockedDomains {
pub(crate) struct ConnectedActors { pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>, pub(crate) connected_actors: Vec<IriString>,
} }
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct LastSeen {
pub(crate) last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>>,
pub(crate) never: Vec<String>,
}

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::Snapshot, collector::Snapshot,
config::{AdminUrlKind, Config}, config::{AdminUrlKind, Config},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
@ -55,6 +55,10 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result<Snapshot,
get_results(client, config, AdminUrlKind::Stats).await get_results(client, config, AdminUrlKind::Stats).await
} }
pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result<LastSeen, Error> {
get_results(client, config, AdminUrlKind::LastSeen).await
}
async fn get_results<T: DeserializeOwned>( async fn get_results<T: DeserializeOwned>(
client: &Client, client: &Client,
config: &Config, config: &Config,

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::{MemoryCollector, Snapshot}, collector::{MemoryCollector, Snapshot},
error::Error, error::Error,
extractors::Admin, extractors::Admin,
@ -8,6 +8,8 @@ use actix_web::{
web::{Data, Json}, web::{Data, Json},
HttpResponse, HttpResponse,
}; };
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub(crate) async fn allow( pub(crate) async fn allow(
admin: Admin, admin: Admin,
@ -69,3 +71,20 @@ pub(crate) async fn stats(
) -> Result<Json<Snapshot>, Error> { ) -> Result<Json<Snapshot>, Error> {
Ok(Json(collector.snapshot())) Ok(Json(collector.snapshot()))
} }
pub(crate) async fn last_seen(admin: Admin) -> Result<Json<LastSeen>, Error> {
let nodes = admin.db_ref().last_seen().await?;
let mut last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>> = BTreeMap::new();
let mut never = Vec::new();
for (domain, datetime) in nodes {
if let Some(datetime) = datetime {
last_seen.entry(datetime).or_default().insert(domain);
} else {
never.push(domain);
}
}
Ok(Json(LastSeen { last_seen, never }))
}

View file

@ -17,11 +17,22 @@ pub(crate) struct Args {
#[arg(short, long, help = "Get statistics from the server")] #[arg(short, long, help = "Get statistics from the server")]
stats: bool, stats: bool,
#[arg(
short,
long,
help = "List domains by when they were last succesfully contacted"
)]
contacted: bool,
} }
impl Args { impl Args {
pub(crate) fn any(&self) -> bool { pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats !self.blocks.is_empty()
|| !self.allowed.is_empty()
|| self.list
|| self.stats
|| self.contacted
} }
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
@ -47,4 +58,8 @@ impl Args {
pub(crate) fn stats(&self) -> bool { pub(crate) fn stats(&self) -> bool {
self.stats self.stats
} }
pub(crate) fn contacted(&self) -> bool {
self.contacted
}
} }

View file

@ -92,6 +92,7 @@ pub enum AdminUrlKind {
Blocked, Blocked,
Connected, Connected,
Stats, Stats,
LastSeen,
} }
impl std::fmt::Debug for Config { impl std::fmt::Debug for Config {
@ -429,33 +430,22 @@ impl Config {
} }
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> { fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let iri = match kind { let path = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Allow => "api/v1/admin/allow",
.resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref()) AdminUrlKind::Disallow => "api/v1/admin/disallow",
.try_to_dedicated_string()?, AdminUrlKind::Block => "api/v1/admin/block",
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Unblock => "api/v1/admin/unblock",
.resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref()) AdminUrlKind::Allowed => "api/v1/admin/allowed",
.try_to_dedicated_string()?, AdminUrlKind::Blocked => "api/v1/admin/blocked",
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Connected => "api/v1/admin/connected",
.resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref()) AdminUrlKind::Stats => "api/v1/admin/stats",
.try_to_dedicated_string()?, AdminUrlKind::LastSeen => "api/v1/admin/last_seen",
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())
.try_to_dedicated_string()?,
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())
.try_to_dedicated_string()?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())
.try_to_dedicated_string()?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())
.try_to_dedicated_string()?,
AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())
.try_to_dedicated_string()?,
}; };
let iri = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(path)?.as_ref())
.try_to_dedicated_string()?;
Ok(iri) Ok(iri)
} }
} }

View file

@ -1,9 +1,11 @@
mod actor; mod actor;
mod last_online;
mod media; mod media;
mod node; mod node;
mod state; mod state;
pub(crate) use actor::ActorCache; pub(crate) use actor::ActorCache;
pub(crate) use last_online::LastOnline;
pub(crate) use media::MediaCache; pub(crate) use media::MediaCache;
pub(crate) use node::{Node, NodeCache}; pub(crate) use node::{Node, NodeCache};
pub(crate) use state::State; pub(crate) use state::State;

28
src/data/last_online.rs Normal file
View file

@ -0,0 +1,28 @@
use activitystreams::iri_string::types::IriStr;
use std::{collections::HashMap, sync::Mutex};
use time::OffsetDateTime;
pub(crate) struct LastOnline {
domains: Mutex<HashMap<String, OffsetDateTime>>,
}
impl LastOnline {
pub(crate) fn mark_seen(&self, iri: &IriStr) {
if let Some(authority) = iri.authority_str() {
self.domains
.lock()
.unwrap()
.insert(authority.to_string(), OffsetDateTime::now_utc());
}
}
pub(crate) fn take(&self) -> HashMap<String, OffsetDateTime> {
std::mem::take(&mut *self.domains.lock().unwrap())
}
pub(crate) fn empty() -> Self {
Self {
domains: Mutex::new(HashMap::default()),
}
}
}

View file

@ -12,6 +12,8 @@ use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey}; use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use super::LastOnline;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub(crate) public_key: RsaPublicKey, pub(crate) public_key: RsaPublicKey,
@ -19,6 +21,7 @@ pub struct State {
object_cache: Arc<RwLock<LruCache<IriString, IriString>>>, object_cache: Arc<RwLock<LruCache<IriString, IriString>>>,
node_cache: NodeCache, node_cache: NodeCache,
breakers: Breakers, breakers: Breakers,
pub(crate) last_online: Arc<LastOnline>,
pub(crate) db: Db, pub(crate) db: Db,
} }
@ -43,6 +46,7 @@ impl State {
self.private_key.clone(), self.private_key.clone(),
config.user_agent(), config.user_agent(),
self.breakers.clone(), self.breakers.clone(),
self.last_online.clone(),
) )
} }
@ -114,6 +118,7 @@ impl State {
node_cache: NodeCache::new(db.clone()), node_cache: NodeCache::new(db.clone()),
breakers: Breakers::default(), breakers: Breakers::default(),
db, db,
last_online: Arc::new(LastOnline::empty()),
}; };
Ok(state) Ok(state)

View file

@ -7,8 +7,13 @@ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey}, pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey, RsaPrivateKey,
}; };
use sled::Tree; use sled::{Batch, Tree};
use std::{collections::HashMap, sync::Arc, time::SystemTime}; use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::SystemTime,
};
use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -28,6 +33,7 @@ struct Inner {
actor_id_info: Tree, actor_id_info: Tree,
actor_id_instance: Tree, actor_id_instance: Tree,
actor_id_contact: Tree, actor_id_contact: Tree,
last_seen: Tree,
restricted_mode: bool, restricted_mode: bool,
} }
@ -247,6 +253,7 @@ impl Db {
actor_id_info: db.open_tree("actor-id-info")?, actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?, actor_id_contact: db.open_tree("actor-id-contact")?,
last_seen: db.open_tree("last-seen")?,
restricted_mode, restricted_mode,
}), }),
}) })
@ -254,7 +261,7 @@ impl Db {
async fn unblock<T>( async fn unblock<T>(
&self, &self,
f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static, f: impl FnOnce(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error> ) -> Result<T, Error>
where where
T: Send + 'static, T: Send + 'static,
@ -266,6 +273,48 @@ impl Db {
Ok(t) Ok(t)
} }
pub(crate) async fn mark_last_seen(
&self,
nodes: HashMap<String, OffsetDateTime>,
) -> Result<(), Error> {
let mut batch = Batch::default();
for (domain, datetime) in nodes {
let datetime_string = serde_json::to_vec(&datetime)?;
batch.insert(domain.as_bytes(), datetime_string);
}
self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from))
.await
}
pub(crate) async fn last_seen(
&self,
) -> Result<BTreeMap<String, Option<OffsetDateTime>>, Error> {
self.unblock(|inner| {
let mut map = BTreeMap::new();
for iri in inner.connected() {
let Some(authority_str) = iri.authority_str() else {
continue;
};
if let Some(datetime) = inner.last_seen.get(authority_str)? {
map.insert(
authority_str.to_string(),
Some(serde_json::from_slice(&datetime)?),
);
} else {
map.insert(authority_str.to_string(), None);
}
}
Ok(map)
})
.await
}
pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> { pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await self.unblock(|inner| Ok(inner.connected().collect())).await
} }

View file

@ -5,6 +5,7 @@ mod deliver_many;
mod instance; mod instance;
mod nodeinfo; mod nodeinfo;
mod process_listeners; mod process_listeners;
mod record_last_online;
pub(crate) use self::{ pub(crate) use self::{
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@ -15,7 +16,7 @@ use crate::{
config::Config, config::Config,
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::process_listeners::Listeners, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::Requests, requests::Requests,
}; };
use background_jobs::{ use background_jobs::{
@ -62,6 +63,7 @@ pub(crate) fn create_workers(
.register::<QueryInstance>() .register::<QueryInstance>()
.register::<Listeners>() .register::<Listeners>()
.register::<QueryContact>() .register::<QueryContact>()
.register::<RecordLastOnline>()
.register::<apub::Announce>() .register::<apub::Announce>()
.register::<apub::Follow>() .register::<apub::Follow>()
.register::<apub::Forward>() .register::<apub::Forward>()
@ -73,6 +75,7 @@ pub(crate) fn create_workers(
.start_with_threads(parallelism); .start_with_threads(parallelism);
shared.every(Duration::from_secs(60 * 5), Listeners); shared.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone()); let job_server = JobServer::new(shared.queue_handle().clone());

View file

@ -0,0 +1,28 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
impl RecordLastOnline {
#[tracing::instrument(skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
let nodes = state.state.last_online.take();
state.state.db.mark_last_seen(nodes).await
}
}
impl ActixJob for RecordLastOnline {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View file

@ -144,6 +144,39 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
println!("Updated lists"); println!("Updated lists");
} }
if args.contacted() {
let last_seen = admin::client::last_seen(&client, &config).await?;
let mut report = String::from("Contacted:");
if !last_seen.never.is_empty() {
report += "\nNever seen:\n";
}
for domain in last_seen.never {
report += "\t";
report += &domain;
report += "\n";
}
if !last_seen.last_seen.is_empty() {
report += "\nSeen:\n";
}
for (datetime, domains) in last_seen.last_seen {
for domain in domains {
report += "\t";
report += &datetime.to_string();
report += " - ";
report += &domain;
report += "\n";
}
}
report += "\n";
println!("{report}");
}
if args.list() { if args.list() {
let (blocked, allowed, connected) = tokio::try_join!( let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config), admin::client::blocked(&client, &config),
@ -258,7 +291,8 @@ async fn do_server_main(
.route("/allowed", web::get().to(admin::routes::allowed)) .route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked)) .route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected)) .route("/connected", web::get().to(admin::routes::connected))
.route("/stats", web::get().to(admin::routes::stats)), .route("/stats", web::get().to(admin::routes::stats))
.route("/last_seen", web::get().to(admin::routes::last_seen)),
), ),
) )
}); });

View file

@ -1,4 +1,7 @@
use crate::error::{Error, ErrorKind}; use crate::{
data::LastOnline,
error::{Error, ErrorKind},
};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date; use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse}; use awc::{error::SendRequestError, Client, ClientResponse};
@ -146,6 +149,7 @@ pub(crate) struct Requests {
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
config: Config, config: Config,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
} }
impl std::fmt::Debug for Requests { impl std::fmt::Debug for Requests {
@ -174,6 +178,7 @@ impl Requests {
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
user_agent: String, user_agent: String,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
) -> Self { ) -> Self {
Requests { Requests {
client: Rc::new(RefCell::new(build_client(&user_agent))), client: Rc::new(RefCell::new(build_client(&user_agent))),
@ -184,6 +189,7 @@ impl Requests {
private_key, private_key,
config: Config::default().mastodon_compat(), config: Config::default().mastodon_compat(),
breakers, breakers,
last_online,
} }
} }
@ -233,6 +239,7 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
} }
self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url); self.breakers.succeed(&parsed_url);
Ok(res) Ok(res)