mirror of
https://git.asonix.dog/asonix/relay.git
synced 2024-11-25 19:11:08 +00:00
Add metrics collector, admin route
This commit is contained in:
parent
c8b81bb9aa
commit
c322798ba3
5 changed files with 392 additions and 3 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -299,12 +299,14 @@ dependencies = [
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"http-signature-normalization-actix",
|
"http-signature-normalization-actix",
|
||||||
|
"indexmap",
|
||||||
"lru",
|
"lru",
|
||||||
"metrics",
|
"metrics",
|
||||||
"metrics-util",
|
"metrics-util",
|
||||||
"mime",
|
"mime",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"opentelemetry-otlp",
|
"opentelemetry-otlp",
|
||||||
|
"quanta",
|
||||||
"rand",
|
"rand",
|
||||||
"rsa",
|
"rsa",
|
||||||
"rsa-magic-public-key",
|
"rsa-magic-public-key",
|
||||||
|
|
|
@ -37,12 +37,14 @@ console-subscriber = { version = "0.1", optional = true }
|
||||||
dashmap = "5.1.0"
|
dashmap = "5.1.0"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
futures-util = "0.3.17"
|
futures-util = "0.3.17"
|
||||||
|
indexmap = "1.9.2"
|
||||||
lru = "0.8.0"
|
lru = "0.8.0"
|
||||||
metrics = "0.20.1"
|
metrics = "0.20.1"
|
||||||
metrics-util = "0.14.0"
|
metrics-util = "0.14.0"
|
||||||
mime = "0.3.16"
|
mime = "0.3.16"
|
||||||
opentelemetry = { version = "0.18", features = ["rt-tokio"] }
|
opentelemetry = { version = "0.18", features = ["rt-tokio"] }
|
||||||
opentelemetry-otlp = "0.11"
|
opentelemetry-otlp = "0.11"
|
||||||
|
quanta = "0.10.1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
rsa = "0.7"
|
rsa = "0.7"
|
||||||
rsa-magic-public-key = "0.6.0"
|
rsa-magic-public-key = "0.6.0"
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
|
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
|
||||||
|
collector::{MemoryCollector, Snapshot},
|
||||||
error::Error,
|
error::Error,
|
||||||
extractors::Admin,
|
extractors::Admin,
|
||||||
};
|
};
|
||||||
use actix_web::{web::Json, HttpResponse};
|
use actix_web::{
|
||||||
|
web::{Data, Json},
|
||||||
|
HttpResponse,
|
||||||
|
};
|
||||||
|
|
||||||
pub(crate) async fn allow(
|
pub(crate) async fn allow(
|
||||||
admin: Admin,
|
admin: Admin,
|
||||||
|
@ -58,3 +62,10 @@ pub(crate) async fn connected(admin: Admin) -> Result<Json<ConnectedActors>, Err
|
||||||
|
|
||||||
Ok(Json(ConnectedActors { connected_actors }))
|
Ok(Json(ConnectedActors { connected_actors }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn stats(
|
||||||
|
_admin: Admin,
|
||||||
|
collector: Data<MemoryCollector>,
|
||||||
|
) -> Result<Json<Snapshot>, Error> {
|
||||||
|
Ok(Json(collector.snapshot()))
|
||||||
|
}
|
||||||
|
|
369
src/collector.rs
Normal file
369
src/collector.rs
Normal file
|
@ -0,0 +1,369 @@
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use indexmap::IndexMap;
|
||||||
|
use metrics::{Key, Recorder, SetRecorderError};
|
||||||
|
use metrics_util::{
|
||||||
|
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
||||||
|
MetricKindMask, Summary,
|
||||||
|
};
|
||||||
|
use quanta::Clock;
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
sync::{atomic::Ordering, Arc},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
const SECONDS: u64 = 1;
|
||||||
|
const MINUTES: u64 = 60 * SECONDS;
|
||||||
|
const HOURS: u64 = 60 * MINUTES;
|
||||||
|
const DAYS: u64 = 24 * HOURS;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MemoryCollector {
|
||||||
|
inner: Arc<Inner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
descriptions: DashMap<String, metrics::SharedString>,
|
||||||
|
distributions: DashMap<String, IndexMap<Vec<(String, String)>, Summary>>,
|
||||||
|
recency: Recency<Key>,
|
||||||
|
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct Counter {
|
||||||
|
labels: Vec<(String, String)>,
|
||||||
|
value: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct Gauge {
|
||||||
|
labels: Vec<(String, String)>,
|
||||||
|
value: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct Histogram {
|
||||||
|
labels: Vec<(String, String)>,
|
||||||
|
value: Vec<(f64, Option<f64>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
pub(crate) struct Snapshot {
|
||||||
|
counters: HashMap<String, Vec<Counter>>,
|
||||||
|
gauges: HashMap<String, Vec<Gauge>>,
|
||||||
|
histograms: HashMap<String, Vec<Histogram>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
||||||
|
let labels = key
|
||||||
|
.labels()
|
||||||
|
.into_iter()
|
||||||
|
.map(|label| (label.key().to_string(), label.value().to_string()))
|
||||||
|
.collect();
|
||||||
|
let name = key.name().to_string();
|
||||||
|
(name, labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inner {
|
||||||
|
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
|
||||||
|
let mut counters = HashMap::new();
|
||||||
|
|
||||||
|
for (key, counter) in self.registry.get_counter_handles() {
|
||||||
|
let gen = counter.get_generation();
|
||||||
|
if !self.recency.should_store_counter(&key, gen, &self.registry) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
let value = counter.get_inner().load(Ordering::Acquire);
|
||||||
|
counters
|
||||||
|
.entry(name)
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push(Counter { labels, value });
|
||||||
|
}
|
||||||
|
|
||||||
|
counters
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
|
||||||
|
let mut gauges = HashMap::new();
|
||||||
|
|
||||||
|
for (key, gauge) in self.registry.get_gauge_handles() {
|
||||||
|
let gen = gauge.get_generation();
|
||||||
|
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
||||||
|
gauges
|
||||||
|
.entry(name)
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push(Gauge { labels, value })
|
||||||
|
}
|
||||||
|
|
||||||
|
gauges
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
|
||||||
|
for (key, histogram) in self.registry.get_histogram_handles() {
|
||||||
|
let gen = histogram.get_generation();
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
|
||||||
|
if !self
|
||||||
|
.recency
|
||||||
|
.should_store_histogram(&key, gen, &self.registry)
|
||||||
|
{
|
||||||
|
let delete_by_name = if let Some(mut by_name) = self.distributions.get_mut(&name) {
|
||||||
|
by_name.remove(&labels);
|
||||||
|
by_name.is_empty()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
if delete_by_name {
|
||||||
|
self.descriptions.remove(&name);
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut outer_entry = self
|
||||||
|
.distributions
|
||||||
|
.entry(name.clone())
|
||||||
|
.or_insert_with(IndexMap::new);
|
||||||
|
|
||||||
|
let entry = outer_entry
|
||||||
|
.entry(labels)
|
||||||
|
.or_insert_with(Summary::with_defaults);
|
||||||
|
|
||||||
|
histogram.get_inner().clear_with(|samples| {
|
||||||
|
for sample in samples {
|
||||||
|
entry.add(*sample);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
self.distributions
|
||||||
|
.iter()
|
||||||
|
.map(|entry| {
|
||||||
|
(
|
||||||
|
entry.key().clone(),
|
||||||
|
entry
|
||||||
|
.value()
|
||||||
|
.iter()
|
||||||
|
.map(|(labels, summary)| Histogram {
|
||||||
|
labels: labels.clone(),
|
||||||
|
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
||||||
|
.into_iter()
|
||||||
|
.map(|q| (q, summary.quantile(q)))
|
||||||
|
.collect(),
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self) -> Snapshot {
|
||||||
|
Snapshot {
|
||||||
|
counters: self.snapshot_counters(),
|
||||||
|
gauges: self.snapshot_gauges(),
|
||||||
|
histograms: self.snapshot_histograms(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MemoryCollector {
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
MemoryCollector {
|
||||||
|
inner: Arc::new(Inner {
|
||||||
|
descriptions: Default::default(),
|
||||||
|
distributions: Default::default(),
|
||||||
|
recency: Recency::new(
|
||||||
|
Clock::new(),
|
||||||
|
MetricKindMask::ALL,
|
||||||
|
Some(Duration::from_secs(5 * DAYS)),
|
||||||
|
),
|
||||||
|
registry: Registry::new(GenerationalStorage::atomic()),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
||||||
|
metrics::set_boxed_recorder(Box::new(self.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn snapshot(&self) -> Snapshot {
|
||||||
|
self.inner.snapshot()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_description_if_missing(
|
||||||
|
&self,
|
||||||
|
key: &metrics::KeyName,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.inner
|
||||||
|
.descriptions
|
||||||
|
.entry(key.as_str().to_owned())
|
||||||
|
.or_insert(description);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Recorder for MemoryCollector {
|
||||||
|
fn describe_counter(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_gauge(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_histogram(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_counter(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_gauge(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_histogram(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
struct Bucket {
|
||||||
|
begin: Instant,
|
||||||
|
summary: Summary,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct RollingSummary {
|
||||||
|
buckets: Vec<Bucket>,
|
||||||
|
bucket_duration: Duration,
|
||||||
|
expire_after: Duration,
|
||||||
|
count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RollingSummary {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(
|
||||||
|
Duration::from_secs(5 * MINUTES),
|
||||||
|
Duration::from_secs(1 * DAYS),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RollingSummary {
|
||||||
|
fn new(bucket_duration: Duration, expire_after: Duration) -> Self {
|
||||||
|
Self {
|
||||||
|
buckets: Vec::new(),
|
||||||
|
bucket_duration,
|
||||||
|
expire_after,
|
||||||
|
count: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add(&mut self, value: f64, now: Instant) {
|
||||||
|
self.count += 1;
|
||||||
|
|
||||||
|
// try adding to existing bucket
|
||||||
|
for bucket in &mut self.buckets {
|
||||||
|
let end = bucket.begin + self.bucket_duration;
|
||||||
|
|
||||||
|
if now >= end {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if now >= bucket.begin {
|
||||||
|
bucket.summary.add(value);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we're adding a new bucket, clean old buckets first
|
||||||
|
if let Some(cutoff) = now.checked_sub(self.expire_after) {
|
||||||
|
self.buckets.retain(|b| b.begin > cutoff);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut summary = Summary::with_defaults();
|
||||||
|
summary.add(value);
|
||||||
|
|
||||||
|
// if there's no buckets, make one and return
|
||||||
|
if self.buckets.is_empty() {
|
||||||
|
self.buckets.push(Bucket {
|
||||||
|
summary,
|
||||||
|
begin: now,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut begin = self.buckets[0].begin;
|
||||||
|
|
||||||
|
// there are buckets, but none can hold our value, see why
|
||||||
|
if now < self.buckets[0].begin {
|
||||||
|
// create an old bucket
|
||||||
|
|
||||||
|
while now < begin {
|
||||||
|
begin -= self.bucket_duration;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.buckets.push(Bucket { begin, summary });
|
||||||
|
self.buckets.sort_unstable_by(|a, b| b.begin.cmp(&a.begin));
|
||||||
|
} else {
|
||||||
|
// create a new bucket
|
||||||
|
let mut end = self.buckets[0].begin + self.bucket_duration;
|
||||||
|
|
||||||
|
while now >= end {
|
||||||
|
begin += self.bucket_duration;
|
||||||
|
end += self.bucket_duration;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.buckets.insert(0, Bucket { begin, summary });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self, now: Instant) -> Summary {
|
||||||
|
let cutoff = now.checked_sub(self.expire_after);
|
||||||
|
let mut acc = Summary::with_defaults();
|
||||||
|
|
||||||
|
let summaries = self
|
||||||
|
.buckets
|
||||||
|
.iter()
|
||||||
|
.filter(|b| cutoff.map(|c| b.begin > c).unwrap_or(true))
|
||||||
|
.map(|b| &b.summary);
|
||||||
|
|
||||||
|
for item in summaries {
|
||||||
|
acc.merge(item)
|
||||||
|
.expect("All summaries are created with default settings");
|
||||||
|
}
|
||||||
|
|
||||||
|
acc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
|
@ -15,6 +15,7 @@ use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::Subscribe
|
||||||
mod admin;
|
mod admin;
|
||||||
mod apub;
|
mod apub;
|
||||||
mod args;
|
mod args;
|
||||||
|
mod collector;
|
||||||
mod config;
|
mod config;
|
||||||
mod data;
|
mod data;
|
||||||
mod db;
|
mod db;
|
||||||
|
@ -98,6 +99,8 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
let config = Config::build()?;
|
let config = Config::build()?;
|
||||||
|
|
||||||
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
|
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
|
||||||
|
let collector = collector::MemoryCollector::new();
|
||||||
|
collector.install()?;
|
||||||
|
|
||||||
let args = Args::new();
|
let args = Args::new();
|
||||||
|
|
||||||
|
@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
.app_data(web::Data::new(actors.clone()))
|
.app_data(web::Data::new(actors.clone()))
|
||||||
.app_data(web::Data::new(config.clone()))
|
.app_data(web::Data::new(config.clone()))
|
||||||
.app_data(web::Data::new(job_server.clone()))
|
.app_data(web::Data::new(job_server.clone()))
|
||||||
.app_data(web::Data::new(media.clone()));
|
.app_data(web::Data::new(media.clone()))
|
||||||
|
.app_data(web::Data::new(collector.clone()));
|
||||||
|
|
||||||
let app = if let Some(data) = config.admin_config() {
|
let app = if let Some(data) = config.admin_config() {
|
||||||
app.app_data(data)
|
app.app_data(data)
|
||||||
|
@ -203,7 +207,8 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
.route("/unblock", web::post().to(admin::routes::unblock))
|
.route("/unblock", web::post().to(admin::routes::unblock))
|
||||||
.route("/allowed", web::get().to(admin::routes::allowed))
|
.route("/allowed", web::get().to(admin::routes::allowed))
|
||||||
.route("/blocked", web::get().to(admin::routes::blocked))
|
.route("/blocked", web::get().to(admin::routes::blocked))
|
||||||
.route("/connected", web::get().to(admin::routes::connected)),
|
.route("/connected", web::get().to(admin::routes::connected))
|
||||||
|
.route("/stats", web::get().to(admin::routes::stats)),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue