diff --git a/src/collector.rs b/src/collector.rs index b2758b0..91f235f 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -1,5 +1,414 @@ -mod double; -mod stats; +use metrics::{Key, Recorder, SetRecorderError}; +use metrics_util::{ + registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, + MetricKindMask, Summary, +}; +use quanta::Clock; +use std::{ + collections::{BTreeMap, HashMap}, + sync::{atomic::Ordering, Arc, RwLock}, + time::Duration, +}; -pub(crate) use double::DoubleRecorder; -pub(crate) use stats::{MemoryCollector, Snapshot}; +const SECONDS: u64 = 1; +const MINUTES: u64 = 60 * SECONDS; +const HOURS: u64 = 60 * MINUTES; +const DAYS: u64 = 24 * HOURS; + +type DistributionMap = BTreeMap, Summary>; + +#[derive(Clone)] +pub struct MemoryCollector { + inner: Arc, +} + +struct Inner { + descriptions: RwLock>, + distributions: RwLock>, + recency: Recency, + registry: Registry>, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Counter { + labels: BTreeMap, + value: u64, +} + +impl std::fmt::Display for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{k}: {v}")) + .collect::>() + .join(", "); + + write!(f, "{labels} - {}", self.value) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Gauge { + labels: BTreeMap, + value: f64, +} + +impl std::fmt::Display for Gauge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{k}: {v}")) + .collect::>() + .join(", "); + + write!(f, "{labels} - {}", self.value) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Histogram { + labels: BTreeMap, + value: Vec<(f64, Option)>, +} + +impl std::fmt::Display for Histogram { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{k}: {v}")) + .collect::>() + .join(", "); + + let value = self + .value + .iter() + .map(|(k, v)| { + if let Some(v) = v { + format!("{k}: {v:.6}") + } else { + format!("{k}: None,") + } + }) + .collect::>() + .join(", "); + + write!(f, "{labels} - {value}") + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub(crate) struct Snapshot { + counters: HashMap>, + gauges: HashMap>, + histograms: HashMap>, +} + +const PAIRS: [((&str, &str), &str); 2] = [ + ( + ( + "background-jobs.worker.started", + "background-jobs.worker.finished", + ), + "background-jobs.worker.running", + ), + ( + ( + "background-jobs.job.started", + "background-jobs.job.finished", + ), + "background-jobs.job.running", + ), +]; + +#[derive(Default)] +struct MergeCounter { + start: Option, + finish: Option, +} + +impl MergeCounter { + fn merge(self) -> Option { + match (self.start, self.finish) { + (Some(start), Some(end)) => Some(Counter { + labels: start.labels, + value: start.value.saturating_sub(end.value), + }), + (Some(only), None) => Some(only), + (None, Some(only)) => Some(Counter { + labels: only.labels, + value: 0, + }), + (None, None) => None, + } + } +} + +impl Snapshot { + pub(crate) fn present(self) { + if !self.counters.is_empty() { + println!("Counters"); + let mut merging = HashMap::new(); + for (key, counters) in self.counters { + if let Some(((start, _), name)) = PAIRS + .iter() + .find(|((start, finish), _)| *start == key || *finish == key) + { + let entry = merging.entry(name).or_insert_with(HashMap::new); + + for counter in counters { + let mut merge_counter = entry + .entry(counter.labels.clone()) + .or_insert_with(MergeCounter::default); + if key == *start { + merge_counter.start = Some(counter); + } else { + merge_counter.finish = Some(counter); + } + } + + continue; + } + + println!("\t{key}"); + for counter in counters { + println!("\t\t{counter}"); + } + } + + for (key, counters) in merging { + println!("\t{key}"); + + for (_, counter) in counters { + if let Some(counter) = counter.merge() { + println!("\t\t{counter}"); + } + } + } + } + + if !self.gauges.is_empty() { + println!("Gauges"); + for (key, gauges) in self.gauges { + println!("\t{key}"); + + for gauge in gauges { + println!("\t\t{gauge}"); + } + } + } + + if !self.histograms.is_empty() { + println!("Histograms"); + for (key, histograms) in self.histograms { + println!("\t{key}"); + + for histogram in histograms { + println!("\t\t{histogram}"); + } + } + } + } +} + +fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { + let labels = key + .labels() + .into_iter() + .map(|label| (label.key().to_string(), label.value().to_string())) + .collect(); + let name = key.name().to_string(); + (name, labels) +} + +impl Inner { + fn snapshot_counters(&self) -> HashMap> { + let mut counters = HashMap::new(); + + for (key, counter) in self.registry.get_counter_handles() { + let gen = counter.get_generation(); + if !self.recency.should_store_counter(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = counter.get_inner().load(Ordering::Acquire); + counters.entry(name).or_insert_with(Vec::new).push(Counter { + labels: labels.into_iter().collect(), + value, + }); + } + + counters + } + + fn snapshot_gauges(&self) -> HashMap> { + let mut gauges = HashMap::new(); + + for (key, gauge) in self.registry.get_gauge_handles() { + let gen = gauge.get_generation(); + if !self.recency.should_store_gauge(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); + gauges.entry(name).or_insert_with(Vec::new).push(Gauge { + labels: labels.into_iter().collect(), + value, + }) + } + + gauges + } + + fn snapshot_histograms(&self) -> HashMap> { + for (key, histogram) in self.registry.get_histogram_handles() { + let gen = histogram.get_generation(); + let (name, labels) = key_to_parts(&key); + + if !self + .recency + .should_store_histogram(&key, gen, &self.registry) + { + let mut d = self.distributions.write().unwrap(); + let delete_by_name = if let Some(by_name) = d.get_mut(&name) { + by_name.remove(&labels); + by_name.is_empty() + } else { + false + }; + drop(d); + + if delete_by_name { + self.descriptions.write().unwrap().remove(&name); + } + + continue; + } + + let mut d = self.distributions.write().unwrap(); + let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new); + + let entry = outer_entry + .entry(labels) + .or_insert_with(Summary::with_defaults); + + histogram.get_inner().clear_with(|samples| { + for sample in samples { + entry.add(*sample); + } + }) + } + + let d = self.distributions.read().unwrap().clone(); + d.into_iter() + .map(|(key, value)| { + ( + key, + value + .into_iter() + .map(|(labels, summary)| Histogram { + labels: labels.into_iter().collect(), + value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] + .into_iter() + .map(|q| (q, summary.quantile(q))) + .collect(), + }) + .collect(), + ) + }) + .collect() + } + + fn snapshot(&self) -> Snapshot { + Snapshot { + counters: self.snapshot_counters(), + gauges: self.snapshot_gauges(), + histograms: self.snapshot_histograms(), + } + } +} + +impl MemoryCollector { + pub(crate) fn new() -> Self { + MemoryCollector { + inner: Arc::new(Inner { + descriptions: Default::default(), + distributions: Default::default(), + recency: Recency::new( + Clock::new(), + MetricKindMask::ALL, + Some(Duration::from_secs(5 * DAYS)), + ), + registry: Registry::new(GenerationalStorage::atomic()), + }), + } + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.inner.snapshot() + } + + fn add_description_if_missing( + &self, + key: &metrics::KeyName, + description: metrics::SharedString, + ) { + let mut d = self.inner.descriptions.write().unwrap(); + d.entry(key.as_str().to_owned()).or_insert(description); + } + + pub(crate) fn install(&self) -> Result<(), SetRecorderError> { + metrics::set_boxed_recorder(Box::new(self.clone())) + } +} + +impl Recorder for MemoryCollector { + fn describe_counter( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_gauge( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_histogram( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn register_counter(&self, key: &Key) -> metrics::Counter { + self.inner + .registry + .get_or_create_counter(key, |c| c.clone().into()) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + self.inner + .registry + .get_or_create_gauge(key, |c| c.clone().into()) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + self.inner + .registry + .get_or_create_histogram(key, |c| c.clone().into()) + } +} diff --git a/src/collector/double.rs b/src/collector/double.rs deleted file mode 100644 index b78dbff..0000000 --- a/src/collector/double.rs +++ /dev/null @@ -1,133 +0,0 @@ -use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError}; -use std::sync::Arc; - -#[derive(Clone)] -pub(crate) struct DoubleRecorder { - first: R, - second: S, -} - -struct DoubleCounter { - first: metrics::Counter, - second: metrics::Counter, -} - -struct DoubleGauge { - first: metrics::Gauge, - second: metrics::Gauge, -} - -struct DoubleHistogram { - first: metrics::Histogram, - second: metrics::Histogram, -} - -impl DoubleRecorder { - pub(crate) fn new(first: R, second: S) -> Self { - DoubleRecorder { first, second } - } - - pub(crate) fn install(self) -> Result<(), SetRecorderError> - where - R: Recorder + 'static, - S: Recorder + 'static, - { - metrics::set_boxed_recorder(Box::new(self)) - } -} - -impl Recorder for DoubleRecorder -where - R: Recorder, - S: Recorder, -{ - fn describe_counter( - &self, - key: metrics::KeyName, - unit: Option, - description: metrics::SharedString, - ) { - self.first - .describe_counter(key.clone(), unit, description.clone()); - self.second.describe_counter(key, unit, description); - } - - fn describe_gauge( - &self, - key: metrics::KeyName, - unit: Option, - description: metrics::SharedString, - ) { - self.first - .describe_gauge(key.clone(), unit, description.clone()); - self.second.describe_gauge(key, unit, description); - } - - fn describe_histogram( - &self, - key: metrics::KeyName, - unit: Option, - description: metrics::SharedString, - ) { - self.first - .describe_histogram(key.clone(), unit, description.clone()); - self.second.describe_histogram(key, unit, description); - } - - fn register_counter(&self, key: &Key) -> metrics::Counter { - let first = self.first.register_counter(key); - let second = self.second.register_counter(key); - - metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second })) - } - - fn register_gauge(&self, key: &Key) -> metrics::Gauge { - let first = self.first.register_gauge(key); - let second = self.second.register_gauge(key); - - metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second })) - } - - fn register_histogram(&self, key: &Key) -> metrics::Histogram { - let first = self.first.register_histogram(key); - let second = self.second.register_histogram(key); - - metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second })) - } -} - -impl CounterFn for DoubleCounter { - fn increment(&self, value: u64) { - self.first.increment(value); - self.second.increment(value); - } - - fn absolute(&self, value: u64) { - self.first.absolute(value); - self.second.absolute(value); - } -} - -impl GaugeFn for DoubleGauge { - fn increment(&self, value: f64) { - self.first.increment(value); - self.second.increment(value); - } - - fn decrement(&self, value: f64) { - self.first.decrement(value); - self.second.decrement(value); - } - - fn set(&self, value: f64) { - self.first.set(value); - self.second.set(value); - } -} - -impl HistogramFn for DoubleHistogram { - fn record(&self, value: f64) { - self.first.record(value); - self.second.record(value); - } -} diff --git a/src/collector/stats.rs b/src/collector/stats.rs deleted file mode 100644 index 91f235f..0000000 --- a/src/collector/stats.rs +++ /dev/null @@ -1,414 +0,0 @@ -use metrics::{Key, Recorder, SetRecorderError}; -use metrics_util::{ - registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, - MetricKindMask, Summary, -}; -use quanta::Clock; -use std::{ - collections::{BTreeMap, HashMap}, - sync::{atomic::Ordering, Arc, RwLock}, - time::Duration, -}; - -const SECONDS: u64 = 1; -const MINUTES: u64 = 60 * SECONDS; -const HOURS: u64 = 60 * MINUTES; -const DAYS: u64 = 24 * HOURS; - -type DistributionMap = BTreeMap, Summary>; - -#[derive(Clone)] -pub struct MemoryCollector { - inner: Arc, -} - -struct Inner { - descriptions: RwLock>, - distributions: RwLock>, - recency: Recency, - registry: Registry>, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Counter { - labels: BTreeMap, - value: u64, -} - -impl std::fmt::Display for Counter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{k}: {v}")) - .collect::>() - .join(", "); - - write!(f, "{labels} - {}", self.value) - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Gauge { - labels: BTreeMap, - value: f64, -} - -impl std::fmt::Display for Gauge { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{k}: {v}")) - .collect::>() - .join(", "); - - write!(f, "{labels} - {}", self.value) - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Histogram { - labels: BTreeMap, - value: Vec<(f64, Option)>, -} - -impl std::fmt::Display for Histogram { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{k}: {v}")) - .collect::>() - .join(", "); - - let value = self - .value - .iter() - .map(|(k, v)| { - if let Some(v) = v { - format!("{k}: {v:.6}") - } else { - format!("{k}: None,") - } - }) - .collect::>() - .join(", "); - - write!(f, "{labels} - {value}") - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct Snapshot { - counters: HashMap>, - gauges: HashMap>, - histograms: HashMap>, -} - -const PAIRS: [((&str, &str), &str); 2] = [ - ( - ( - "background-jobs.worker.started", - "background-jobs.worker.finished", - ), - "background-jobs.worker.running", - ), - ( - ( - "background-jobs.job.started", - "background-jobs.job.finished", - ), - "background-jobs.job.running", - ), -]; - -#[derive(Default)] -struct MergeCounter { - start: Option, - finish: Option, -} - -impl MergeCounter { - fn merge(self) -> Option { - match (self.start, self.finish) { - (Some(start), Some(end)) => Some(Counter { - labels: start.labels, - value: start.value.saturating_sub(end.value), - }), - (Some(only), None) => Some(only), - (None, Some(only)) => Some(Counter { - labels: only.labels, - value: 0, - }), - (None, None) => None, - } - } -} - -impl Snapshot { - pub(crate) fn present(self) { - if !self.counters.is_empty() { - println!("Counters"); - let mut merging = HashMap::new(); - for (key, counters) in self.counters { - if let Some(((start, _), name)) = PAIRS - .iter() - .find(|((start, finish), _)| *start == key || *finish == key) - { - let entry = merging.entry(name).or_insert_with(HashMap::new); - - for counter in counters { - let mut merge_counter = entry - .entry(counter.labels.clone()) - .or_insert_with(MergeCounter::default); - if key == *start { - merge_counter.start = Some(counter); - } else { - merge_counter.finish = Some(counter); - } - } - - continue; - } - - println!("\t{key}"); - for counter in counters { - println!("\t\t{counter}"); - } - } - - for (key, counters) in merging { - println!("\t{key}"); - - for (_, counter) in counters { - if let Some(counter) = counter.merge() { - println!("\t\t{counter}"); - } - } - } - } - - if !self.gauges.is_empty() { - println!("Gauges"); - for (key, gauges) in self.gauges { - println!("\t{key}"); - - for gauge in gauges { - println!("\t\t{gauge}"); - } - } - } - - if !self.histograms.is_empty() { - println!("Histograms"); - for (key, histograms) in self.histograms { - println!("\t{key}"); - - for histogram in histograms { - println!("\t\t{histogram}"); - } - } - } - } -} - -fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { - let labels = key - .labels() - .into_iter() - .map(|label| (label.key().to_string(), label.value().to_string())) - .collect(); - let name = key.name().to_string(); - (name, labels) -} - -impl Inner { - fn snapshot_counters(&self) -> HashMap> { - let mut counters = HashMap::new(); - - for (key, counter) in self.registry.get_counter_handles() { - let gen = counter.get_generation(); - if !self.recency.should_store_counter(&key, gen, &self.registry) { - continue; - } - - let (name, labels) = key_to_parts(&key); - let value = counter.get_inner().load(Ordering::Acquire); - counters.entry(name).or_insert_with(Vec::new).push(Counter { - labels: labels.into_iter().collect(), - value, - }); - } - - counters - } - - fn snapshot_gauges(&self) -> HashMap> { - let mut gauges = HashMap::new(); - - for (key, gauge) in self.registry.get_gauge_handles() { - let gen = gauge.get_generation(); - if !self.recency.should_store_gauge(&key, gen, &self.registry) { - continue; - } - - let (name, labels) = key_to_parts(&key); - let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); - gauges.entry(name).or_insert_with(Vec::new).push(Gauge { - labels: labels.into_iter().collect(), - value, - }) - } - - gauges - } - - fn snapshot_histograms(&self) -> HashMap> { - for (key, histogram) in self.registry.get_histogram_handles() { - let gen = histogram.get_generation(); - let (name, labels) = key_to_parts(&key); - - if !self - .recency - .should_store_histogram(&key, gen, &self.registry) - { - let mut d = self.distributions.write().unwrap(); - let delete_by_name = if let Some(by_name) = d.get_mut(&name) { - by_name.remove(&labels); - by_name.is_empty() - } else { - false - }; - drop(d); - - if delete_by_name { - self.descriptions.write().unwrap().remove(&name); - } - - continue; - } - - let mut d = self.distributions.write().unwrap(); - let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new); - - let entry = outer_entry - .entry(labels) - .or_insert_with(Summary::with_defaults); - - histogram.get_inner().clear_with(|samples| { - for sample in samples { - entry.add(*sample); - } - }) - } - - let d = self.distributions.read().unwrap().clone(); - d.into_iter() - .map(|(key, value)| { - ( - key, - value - .into_iter() - .map(|(labels, summary)| Histogram { - labels: labels.into_iter().collect(), - value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] - .into_iter() - .map(|q| (q, summary.quantile(q))) - .collect(), - }) - .collect(), - ) - }) - .collect() - } - - fn snapshot(&self) -> Snapshot { - Snapshot { - counters: self.snapshot_counters(), - gauges: self.snapshot_gauges(), - histograms: self.snapshot_histograms(), - } - } -} - -impl MemoryCollector { - pub(crate) fn new() -> Self { - MemoryCollector { - inner: Arc::new(Inner { - descriptions: Default::default(), - distributions: Default::default(), - recency: Recency::new( - Clock::new(), - MetricKindMask::ALL, - Some(Duration::from_secs(5 * DAYS)), - ), - registry: Registry::new(GenerationalStorage::atomic()), - }), - } - } - - pub(crate) fn snapshot(&self) -> Snapshot { - self.inner.snapshot() - } - - fn add_description_if_missing( - &self, - key: &metrics::KeyName, - description: metrics::SharedString, - ) { - let mut d = self.inner.descriptions.write().unwrap(); - d.entry(key.as_str().to_owned()).or_insert(description); - } - - pub(crate) fn install(&self) -> Result<(), SetRecorderError> { - metrics::set_boxed_recorder(Box::new(self.clone())) - } -} - -impl Recorder for MemoryCollector { - fn describe_counter( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn describe_gauge( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn describe_histogram( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn register_counter(&self, key: &Key) -> metrics::Counter { - self.inner - .registry - .get_or_create_counter(key, |c| c.clone().into()) - } - - fn register_gauge(&self, key: &Key) -> metrics::Gauge { - self.inner - .registry - .get_or_create_gauge(key, |c| c.clone().into()) - } - - fn register_histogram(&self, key: &Key) -> metrics::Histogram { - self.inner - .registry - .get_or_create_histogram(key, |c| c.clone().into()) - } -} diff --git a/src/main.rs b/src/main.rs index 22764a2..5f733d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,11 +4,12 @@ use activitystreams::iri_string::types::IriString; use actix_rt::task::JoinHandle; use actix_web::{middleware::Compress, web, App, HttpServer}; -use collector::{DoubleRecorder, MemoryCollector}; +use collector::MemoryCollector; #[cfg(feature = "console")] use console_subscriber::ConsoleLayer; use http_signature_normalization_actix::middleware::VerifySignature; use metrics_exporter_prometheus::PrometheusBuilder; +use metrics_util::layers::FanoutBuilder; use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry_otlp::WithExportConfig; use rustls::ServerConfig; @@ -119,7 +120,11 @@ async fn main() -> Result<(), anyhow::Error> { .build()?; actix_rt::spawn(exporter); - DoubleRecorder::new(recorder, collector.clone()).install()?; + let recorder = FanoutBuilder::default() + .add_recorder(recorder) + .add_recorder(collector.clone()) + .build(); + metrics::set_boxed_recorder(Box::new(recorder))?; } else { collector.install()?; }