diff --git a/Cargo.lock b/Cargo.lock index f7e33ec..694caf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,12 +299,14 @@ dependencies = [ "dotenv", "futures-util", "http-signature-normalization-actix", + "indexmap", "lru", "metrics", "metrics-util", "mime", "opentelemetry", "opentelemetry-otlp", + "quanta", "rand", "rsa", "rsa-magic-public-key", diff --git a/Cargo.toml b/Cargo.toml index 334600f..05753fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,12 +37,14 @@ console-subscriber = { version = "0.1", optional = true } dashmap = "5.1.0" dotenv = "0.15.0" futures-util = "0.3.17" +indexmap = "1.9.2" lru = "0.8.0" metrics = "0.20.1" metrics-util = "0.14.0" mime = "0.3.16" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" +quanta = "0.10.1" rand = "0.8" rsa = "0.7" rsa-magic-public-key = "0.6.0" diff --git a/src/admin/routes.rs b/src/admin/routes.rs index c33efca..6578bfd 100644 --- a/src/admin/routes.rs +++ b/src/admin/routes.rs @@ -1,9 +1,13 @@ use crate::{ admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, + collector::{MemoryCollector, Snapshot}, error::Error, extractors::Admin, }; -use actix_web::{web::Json, HttpResponse}; +use actix_web::{ + web::{Data, Json}, + HttpResponse, +}; pub(crate) async fn allow( admin: Admin, @@ -58,3 +62,10 @@ pub(crate) async fn connected(admin: Admin) -> Result, Err Ok(Json(ConnectedActors { connected_actors })) } + +pub(crate) async fn stats( + _admin: Admin, + collector: Data, +) -> Result, Error> { + Ok(Json(collector.snapshot())) +} diff --git a/src/collector.rs b/src/collector.rs new file mode 100644 index 0000000..401307b --- /dev/null +++ b/src/collector.rs @@ -0,0 +1,369 @@ +use dashmap::DashMap; +use indexmap::IndexMap; +use metrics::{Key, Recorder, SetRecorderError}; +use metrics_util::{ + registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, + MetricKindMask, Summary, +}; +use quanta::Clock; +use std::{ + collections::HashMap, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; + +const SECONDS: u64 = 1; +const MINUTES: u64 = 60 * SECONDS; +const HOURS: u64 = 60 * MINUTES; +const DAYS: u64 = 24 * HOURS; + +#[derive(Clone)] +pub struct MemoryCollector { + inner: Arc, +} + +struct Inner { + descriptions: DashMap, + distributions: DashMap, Summary>>, + recency: Recency, + registry: Registry>, +} + +#[derive(serde::Serialize)] +struct Counter { + labels: Vec<(String, String)>, + value: u64, +} + +#[derive(serde::Serialize)] +struct Gauge { + labels: Vec<(String, String)>, + value: f64, +} + +#[derive(serde::Serialize)] +struct Histogram { + labels: Vec<(String, String)>, + value: Vec<(f64, Option)>, +} + +#[derive(serde::Serialize)] +pub(crate) struct Snapshot { + counters: HashMap>, + gauges: HashMap>, + histograms: HashMap>, +} + +fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { + let labels = key + .labels() + .into_iter() + .map(|label| (label.key().to_string(), label.value().to_string())) + .collect(); + let name = key.name().to_string(); + (name, labels) +} + +impl Inner { + fn snapshot_counters(&self) -> HashMap> { + let mut counters = HashMap::new(); + + for (key, counter) in self.registry.get_counter_handles() { + let gen = counter.get_generation(); + if !self.recency.should_store_counter(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = counter.get_inner().load(Ordering::Acquire); + counters + .entry(name) + .or_insert_with(Vec::new) + .push(Counter { labels, value }); + } + + counters + } + + fn snapshot_gauges(&self) -> HashMap> { + let mut gauges = HashMap::new(); + + for (key, gauge) in self.registry.get_gauge_handles() { + let gen = gauge.get_generation(); + if !self.recency.should_store_gauge(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); + gauges + .entry(name) + .or_insert_with(Vec::new) + .push(Gauge { labels, value }) + } + + gauges + } + + fn snapshot_histograms(&self) -> HashMap> { + for (key, histogram) in self.registry.get_histogram_handles() { + let gen = histogram.get_generation(); + let (name, labels) = key_to_parts(&key); + + if !self + .recency + .should_store_histogram(&key, gen, &self.registry) + { + let delete_by_name = if let Some(mut by_name) = self.distributions.get_mut(&name) { + by_name.remove(&labels); + by_name.is_empty() + } else { + false + }; + + if delete_by_name { + self.descriptions.remove(&name); + } + + continue; + } + + let mut outer_entry = self + .distributions + .entry(name.clone()) + .or_insert_with(IndexMap::new); + + let entry = outer_entry + .entry(labels) + .or_insert_with(Summary::with_defaults); + + histogram.get_inner().clear_with(|samples| { + for sample in samples { + entry.add(*sample); + } + }) + } + + self.distributions + .iter() + .map(|entry| { + ( + entry.key().clone(), + entry + .value() + .iter() + .map(|(labels, summary)| Histogram { + labels: labels.clone(), + value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] + .into_iter() + .map(|q| (q, summary.quantile(q))) + .collect(), + }) + .collect(), + ) + }) + .collect() + } + + fn snapshot(&self) -> Snapshot { + Snapshot { + counters: self.snapshot_counters(), + gauges: self.snapshot_gauges(), + histograms: self.snapshot_histograms(), + } + } +} + +impl MemoryCollector { + pub(crate) fn new() -> Self { + MemoryCollector { + inner: Arc::new(Inner { + descriptions: Default::default(), + distributions: Default::default(), + recency: Recency::new( + Clock::new(), + MetricKindMask::ALL, + Some(Duration::from_secs(5 * DAYS)), + ), + registry: Registry::new(GenerationalStorage::atomic()), + }), + } + } + + pub(crate) fn install(&self) -> Result<(), SetRecorderError> { + metrics::set_boxed_recorder(Box::new(self.clone())) + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.inner.snapshot() + } + + fn add_description_if_missing( + &self, + key: &metrics::KeyName, + description: metrics::SharedString, + ) { + self.inner + .descriptions + .entry(key.as_str().to_owned()) + .or_insert(description); + } +} + +impl Recorder for MemoryCollector { + fn describe_counter( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_gauge( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_histogram( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn register_counter(&self, key: &Key) -> metrics::Counter { + self.inner + .registry + .get_or_create_counter(key, |c| c.clone().into()) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + self.inner + .registry + .get_or_create_gauge(key, |c| c.clone().into()) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + self.inner + .registry + .get_or_create_histogram(key, |c| c.clone().into()) + } +} + +/* +struct Bucket { + begin: Instant, + summary: Summary, +} + +pub(crate) struct RollingSummary { + buckets: Vec, + bucket_duration: Duration, + expire_after: Duration, + count: usize, +} + +impl Default for RollingSummary { + fn default() -> Self { + Self::new( + Duration::from_secs(5 * MINUTES), + Duration::from_secs(1 * DAYS), + ) + } +} + +impl RollingSummary { + fn new(bucket_duration: Duration, expire_after: Duration) -> Self { + Self { + buckets: Vec::new(), + bucket_duration, + expire_after, + count: 0, + } + } + + fn add(&mut self, value: f64, now: Instant) { + self.count += 1; + + // try adding to existing bucket + for bucket in &mut self.buckets { + let end = bucket.begin + self.bucket_duration; + + if now >= end { + break; + } + + if now >= bucket.begin { + bucket.summary.add(value); + return; + } + } + + // if we're adding a new bucket, clean old buckets first + if let Some(cutoff) = now.checked_sub(self.expire_after) { + self.buckets.retain(|b| b.begin > cutoff); + } + + let mut summary = Summary::with_defaults(); + summary.add(value); + + // if there's no buckets, make one and return + if self.buckets.is_empty() { + self.buckets.push(Bucket { + summary, + begin: now, + }); + return; + } + + let mut begin = self.buckets[0].begin; + + // there are buckets, but none can hold our value, see why + if now < self.buckets[0].begin { + // create an old bucket + + while now < begin { + begin -= self.bucket_duration; + } + + self.buckets.push(Bucket { begin, summary }); + self.buckets.sort_unstable_by(|a, b| b.begin.cmp(&a.begin)); + } else { + // create a new bucket + let mut end = self.buckets[0].begin + self.bucket_duration; + + while now >= end { + begin += self.bucket_duration; + end += self.bucket_duration; + } + + self.buckets.insert(0, Bucket { begin, summary }); + } + } + + fn snapshot(&self, now: Instant) -> Summary { + let cutoff = now.checked_sub(self.expire_after); + let mut acc = Summary::with_defaults(); + + let summaries = self + .buckets + .iter() + .filter(|b| cutoff.map(|c| b.begin > c).unwrap_or(true)) + .map(|b| &b.summary); + + for item in summaries { + acc.merge(item) + .expect("All summaries are created with default settings"); + } + + acc + } +} +*/ diff --git a/src/main.rs b/src/main.rs index 1d80a0d..9aebdb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::Subscribe mod admin; mod apub; mod args; +mod collector; mod config; mod data; mod db; @@ -98,6 +99,8 @@ async fn main() -> Result<(), anyhow::Error> { let config = Config::build()?; init_subscriber(Config::software_name(), config.opentelemetry_url())?; + let collector = collector::MemoryCollector::new(); + collector.install()?; let args = Args::new(); @@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(job_server.clone())) - .app_data(web::Data::new(media.clone())); + .app_data(web::Data::new(media.clone())) + .app_data(web::Data::new(collector.clone())); let app = if let Some(data) = config.admin_config() { app.app_data(data) @@ -203,7 +207,8 @@ async fn main() -> Result<(), anyhow::Error> { .route("/unblock", web::post().to(admin::routes::unblock)) .route("/allowed", web::get().to(admin::routes::allowed)) .route("/blocked", web::get().to(admin::routes::blocked)) - .route("/connected", web::get().to(admin::routes::connected)), + .route("/connected", web::get().to(admin::routes::connected)) + .route("/stats", web::get().to(admin::routes::stats)), ), ) })