mirror of
https://git.asonix.dog/asonix/relay.git
synced 2025-01-21 00:38:08 +00:00
Replace Double with Fanout
This commit is contained in:
parent
afd4105d0f
commit
2cb5ad9917
4 changed files with 420 additions and 553 deletions
417
src/collector.rs
417
src/collector.rs
|
@ -1,5 +1,414 @@
|
|||
mod double;
|
||||
mod stats;
|
||||
use metrics::{Key, Recorder, SetRecorderError};
|
||||
use metrics_util::{
|
||||
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
||||
MetricKindMask, Summary,
|
||||
};
|
||||
use quanta::Clock;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::{atomic::Ordering, Arc, RwLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
pub(crate) use double::DoubleRecorder;
|
||||
pub(crate) use stats::{MemoryCollector, Snapshot};
|
||||
const SECONDS: u64 = 1;
|
||||
const MINUTES: u64 = 60 * SECONDS;
|
||||
const HOURS: u64 = 60 * MINUTES;
|
||||
const DAYS: u64 = 24 * HOURS;
|
||||
|
||||
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MemoryCollector {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
|
||||
distributions: RwLock<HashMap<String, DistributionMap>>,
|
||||
recency: Recency<Key>,
|
||||
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Counter {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: u64,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Counter {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {}", self.value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Gauge {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: f64,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Gauge {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {}", self.value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Histogram {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: Vec<(f64, Option<f64>)>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Histogram {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let value = self
|
||||
.value
|
||||
.iter()
|
||||
.map(|(k, v)| {
|
||||
if let Some(v) = v {
|
||||
format!("{k}: {v:.6}")
|
||||
} else {
|
||||
format!("{k}: None,")
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {value}")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Snapshot {
|
||||
counters: HashMap<String, Vec<Counter>>,
|
||||
gauges: HashMap<String, Vec<Gauge>>,
|
||||
histograms: HashMap<String, Vec<Histogram>>,
|
||||
}
|
||||
|
||||
const PAIRS: [((&str, &str), &str); 2] = [
|
||||
(
|
||||
(
|
||||
"background-jobs.worker.started",
|
||||
"background-jobs.worker.finished",
|
||||
),
|
||||
"background-jobs.worker.running",
|
||||
),
|
||||
(
|
||||
(
|
||||
"background-jobs.job.started",
|
||||
"background-jobs.job.finished",
|
||||
),
|
||||
"background-jobs.job.running",
|
||||
),
|
||||
];
|
||||
|
||||
#[derive(Default)]
|
||||
struct MergeCounter {
|
||||
start: Option<Counter>,
|
||||
finish: Option<Counter>,
|
||||
}
|
||||
|
||||
impl MergeCounter {
|
||||
fn merge(self) -> Option<Counter> {
|
||||
match (self.start, self.finish) {
|
||||
(Some(start), Some(end)) => Some(Counter {
|
||||
labels: start.labels,
|
||||
value: start.value.saturating_sub(end.value),
|
||||
}),
|
||||
(Some(only), None) => Some(only),
|
||||
(None, Some(only)) => Some(Counter {
|
||||
labels: only.labels,
|
||||
value: 0,
|
||||
}),
|
||||
(None, None) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
pub(crate) fn present(self) {
|
||||
if !self.counters.is_empty() {
|
||||
println!("Counters");
|
||||
let mut merging = HashMap::new();
|
||||
for (key, counters) in self.counters {
|
||||
if let Some(((start, _), name)) = PAIRS
|
||||
.iter()
|
||||
.find(|((start, finish), _)| *start == key || *finish == key)
|
||||
{
|
||||
let entry = merging.entry(name).or_insert_with(HashMap::new);
|
||||
|
||||
for counter in counters {
|
||||
let mut merge_counter = entry
|
||||
.entry(counter.labels.clone())
|
||||
.or_insert_with(MergeCounter::default);
|
||||
if key == *start {
|
||||
merge_counter.start = Some(counter);
|
||||
} else {
|
||||
merge_counter.finish = Some(counter);
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
println!("\t{key}");
|
||||
for counter in counters {
|
||||
println!("\t\t{counter}");
|
||||
}
|
||||
}
|
||||
|
||||
for (key, counters) in merging {
|
||||
println!("\t{key}");
|
||||
|
||||
for (_, counter) in counters {
|
||||
if let Some(counter) = counter.merge() {
|
||||
println!("\t\t{counter}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.gauges.is_empty() {
|
||||
println!("Gauges");
|
||||
for (key, gauges) in self.gauges {
|
||||
println!("\t{key}");
|
||||
|
||||
for gauge in gauges {
|
||||
println!("\t\t{gauge}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.histograms.is_empty() {
|
||||
println!("Histograms");
|
||||
for (key, histograms) in self.histograms {
|
||||
println!("\t{key}");
|
||||
|
||||
for histogram in histograms {
|
||||
println!("\t\t{histogram}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
||||
let labels = key
|
||||
.labels()
|
||||
.into_iter()
|
||||
.map(|label| (label.key().to_string(), label.value().to_string()))
|
||||
.collect();
|
||||
let name = key.name().to_string();
|
||||
(name, labels)
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
|
||||
let mut counters = HashMap::new();
|
||||
|
||||
for (key, counter) in self.registry.get_counter_handles() {
|
||||
let gen = counter.get_generation();
|
||||
if !self.recency.should_store_counter(&key, gen, &self.registry) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
let value = counter.get_inner().load(Ordering::Acquire);
|
||||
counters.entry(name).or_insert_with(Vec::new).push(Counter {
|
||||
labels: labels.into_iter().collect(),
|
||||
value,
|
||||
});
|
||||
}
|
||||
|
||||
counters
|
||||
}
|
||||
|
||||
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
|
||||
let mut gauges = HashMap::new();
|
||||
|
||||
for (key, gauge) in self.registry.get_gauge_handles() {
|
||||
let gen = gauge.get_generation();
|
||||
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
||||
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
|
||||
labels: labels.into_iter().collect(),
|
||||
value,
|
||||
})
|
||||
}
|
||||
|
||||
gauges
|
||||
}
|
||||
|
||||
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
|
||||
for (key, histogram) in self.registry.get_histogram_handles() {
|
||||
let gen = histogram.get_generation();
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
|
||||
if !self
|
||||
.recency
|
||||
.should_store_histogram(&key, gen, &self.registry)
|
||||
{
|
||||
let mut d = self.distributions.write().unwrap();
|
||||
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
|
||||
by_name.remove(&labels);
|
||||
by_name.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
drop(d);
|
||||
|
||||
if delete_by_name {
|
||||
self.descriptions.write().unwrap().remove(&name);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut d = self.distributions.write().unwrap();
|
||||
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
|
||||
|
||||
let entry = outer_entry
|
||||
.entry(labels)
|
||||
.or_insert_with(Summary::with_defaults);
|
||||
|
||||
histogram.get_inner().clear_with(|samples| {
|
||||
for sample in samples {
|
||||
entry.add(*sample);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
let d = self.distributions.read().unwrap().clone();
|
||||
d.into_iter()
|
||||
.map(|(key, value)| {
|
||||
(
|
||||
key,
|
||||
value
|
||||
.into_iter()
|
||||
.map(|(labels, summary)| Histogram {
|
||||
labels: labels.into_iter().collect(),
|
||||
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
||||
.into_iter()
|
||||
.map(|q| (q, summary.quantile(q)))
|
||||
.collect(),
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> Snapshot {
|
||||
Snapshot {
|
||||
counters: self.snapshot_counters(),
|
||||
gauges: self.snapshot_gauges(),
|
||||
histograms: self.snapshot_histograms(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryCollector {
|
||||
pub(crate) fn new() -> Self {
|
||||
MemoryCollector {
|
||||
inner: Arc::new(Inner {
|
||||
descriptions: Default::default(),
|
||||
distributions: Default::default(),
|
||||
recency: Recency::new(
|
||||
Clock::new(),
|
||||
MetricKindMask::ALL,
|
||||
Some(Duration::from_secs(5 * DAYS)),
|
||||
),
|
||||
registry: Registry::new(GenerationalStorage::atomic()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn snapshot(&self) -> Snapshot {
|
||||
self.inner.snapshot()
|
||||
}
|
||||
|
||||
fn add_description_if_missing(
|
||||
&self,
|
||||
key: &metrics::KeyName,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
let mut d = self.inner.descriptions.write().unwrap();
|
||||
d.entry(key.as_str().to_owned()).or_insert(description);
|
||||
}
|
||||
|
||||
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
||||
metrics::set_boxed_recorder(Box::new(self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Recorder for MemoryCollector {
|
||||
fn describe_counter(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn describe_gauge(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn describe_histogram(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_counter(key, |c| c.clone().into())
|
||||
}
|
||||
|
||||
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_gauge(key, |c| c.clone().into())
|
||||
}
|
||||
|
||||
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_histogram(key, |c| c.clone().into())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,133 +0,0 @@
|
|||
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct DoubleRecorder<R, S> {
|
||||
first: R,
|
||||
second: S,
|
||||
}
|
||||
|
||||
struct DoubleCounter {
|
||||
first: metrics::Counter,
|
||||
second: metrics::Counter,
|
||||
}
|
||||
|
||||
struct DoubleGauge {
|
||||
first: metrics::Gauge,
|
||||
second: metrics::Gauge,
|
||||
}
|
||||
|
||||
struct DoubleHistogram {
|
||||
first: metrics::Histogram,
|
||||
second: metrics::Histogram,
|
||||
}
|
||||
|
||||
impl<R, S> DoubleRecorder<R, S> {
|
||||
pub(crate) fn new(first: R, second: S) -> Self {
|
||||
DoubleRecorder { first, second }
|
||||
}
|
||||
|
||||
pub(crate) fn install(self) -> Result<(), SetRecorderError>
|
||||
where
|
||||
R: Recorder + 'static,
|
||||
S: Recorder + 'static,
|
||||
{
|
||||
metrics::set_boxed_recorder(Box::new(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl<R, S> Recorder for DoubleRecorder<R, S>
|
||||
where
|
||||
R: Recorder,
|
||||
S: Recorder,
|
||||
{
|
||||
fn describe_counter(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
unit: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.first
|
||||
.describe_counter(key.clone(), unit, description.clone());
|
||||
self.second.describe_counter(key, unit, description);
|
||||
}
|
||||
|
||||
fn describe_gauge(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
unit: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.first
|
||||
.describe_gauge(key.clone(), unit, description.clone());
|
||||
self.second.describe_gauge(key, unit, description);
|
||||
}
|
||||
|
||||
fn describe_histogram(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
unit: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.first
|
||||
.describe_histogram(key.clone(), unit, description.clone());
|
||||
self.second.describe_histogram(key, unit, description);
|
||||
}
|
||||
|
||||
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||
let first = self.first.register_counter(key);
|
||||
let second = self.second.register_counter(key);
|
||||
|
||||
metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second }))
|
||||
}
|
||||
|
||||
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||
let first = self.first.register_gauge(key);
|
||||
let second = self.second.register_gauge(key);
|
||||
|
||||
metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second }))
|
||||
}
|
||||
|
||||
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||
let first = self.first.register_histogram(key);
|
||||
let second = self.second.register_histogram(key);
|
||||
|
||||
metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second }))
|
||||
}
|
||||
}
|
||||
|
||||
impl CounterFn for DoubleCounter {
|
||||
fn increment(&self, value: u64) {
|
||||
self.first.increment(value);
|
||||
self.second.increment(value);
|
||||
}
|
||||
|
||||
fn absolute(&self, value: u64) {
|
||||
self.first.absolute(value);
|
||||
self.second.absolute(value);
|
||||
}
|
||||
}
|
||||
|
||||
impl GaugeFn for DoubleGauge {
|
||||
fn increment(&self, value: f64) {
|
||||
self.first.increment(value);
|
||||
self.second.increment(value);
|
||||
}
|
||||
|
||||
fn decrement(&self, value: f64) {
|
||||
self.first.decrement(value);
|
||||
self.second.decrement(value);
|
||||
}
|
||||
|
||||
fn set(&self, value: f64) {
|
||||
self.first.set(value);
|
||||
self.second.set(value);
|
||||
}
|
||||
}
|
||||
|
||||
impl HistogramFn for DoubleHistogram {
|
||||
fn record(&self, value: f64) {
|
||||
self.first.record(value);
|
||||
self.second.record(value);
|
||||
}
|
||||
}
|
|
@ -1,414 +0,0 @@
|
|||
use metrics::{Key, Recorder, SetRecorderError};
|
||||
use metrics_util::{
|
||||
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
||||
MetricKindMask, Summary,
|
||||
};
|
||||
use quanta::Clock;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::{atomic::Ordering, Arc, RwLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
const SECONDS: u64 = 1;
|
||||
const MINUTES: u64 = 60 * SECONDS;
|
||||
const HOURS: u64 = 60 * MINUTES;
|
||||
const DAYS: u64 = 24 * HOURS;
|
||||
|
||||
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MemoryCollector {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
|
||||
distributions: RwLock<HashMap<String, DistributionMap>>,
|
||||
recency: Recency<Key>,
|
||||
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Counter {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: u64,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Counter {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {}", self.value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Gauge {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: f64,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Gauge {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {}", self.value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct Histogram {
|
||||
labels: BTreeMap<String, String>,
|
||||
value: Vec<(f64, Option<f64>)>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Histogram {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}: {v}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
let value = self
|
||||
.value
|
||||
.iter()
|
||||
.map(|(k, v)| {
|
||||
if let Some(v) = v {
|
||||
format!("{k}: {v:.6}")
|
||||
} else {
|
||||
format!("{k}: None,")
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
write!(f, "{labels} - {value}")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Snapshot {
|
||||
counters: HashMap<String, Vec<Counter>>,
|
||||
gauges: HashMap<String, Vec<Gauge>>,
|
||||
histograms: HashMap<String, Vec<Histogram>>,
|
||||
}
|
||||
|
||||
const PAIRS: [((&str, &str), &str); 2] = [
|
||||
(
|
||||
(
|
||||
"background-jobs.worker.started",
|
||||
"background-jobs.worker.finished",
|
||||
),
|
||||
"background-jobs.worker.running",
|
||||
),
|
||||
(
|
||||
(
|
||||
"background-jobs.job.started",
|
||||
"background-jobs.job.finished",
|
||||
),
|
||||
"background-jobs.job.running",
|
||||
),
|
||||
];
|
||||
|
||||
#[derive(Default)]
|
||||
struct MergeCounter {
|
||||
start: Option<Counter>,
|
||||
finish: Option<Counter>,
|
||||
}
|
||||
|
||||
impl MergeCounter {
|
||||
fn merge(self) -> Option<Counter> {
|
||||
match (self.start, self.finish) {
|
||||
(Some(start), Some(end)) => Some(Counter {
|
||||
labels: start.labels,
|
||||
value: start.value.saturating_sub(end.value),
|
||||
}),
|
||||
(Some(only), None) => Some(only),
|
||||
(None, Some(only)) => Some(Counter {
|
||||
labels: only.labels,
|
||||
value: 0,
|
||||
}),
|
||||
(None, None) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
pub(crate) fn present(self) {
|
||||
if !self.counters.is_empty() {
|
||||
println!("Counters");
|
||||
let mut merging = HashMap::new();
|
||||
for (key, counters) in self.counters {
|
||||
if let Some(((start, _), name)) = PAIRS
|
||||
.iter()
|
||||
.find(|((start, finish), _)| *start == key || *finish == key)
|
||||
{
|
||||
let entry = merging.entry(name).or_insert_with(HashMap::new);
|
||||
|
||||
for counter in counters {
|
||||
let mut merge_counter = entry
|
||||
.entry(counter.labels.clone())
|
||||
.or_insert_with(MergeCounter::default);
|
||||
if key == *start {
|
||||
merge_counter.start = Some(counter);
|
||||
} else {
|
||||
merge_counter.finish = Some(counter);
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
println!("\t{key}");
|
||||
for counter in counters {
|
||||
println!("\t\t{counter}");
|
||||
}
|
||||
}
|
||||
|
||||
for (key, counters) in merging {
|
||||
println!("\t{key}");
|
||||
|
||||
for (_, counter) in counters {
|
||||
if let Some(counter) = counter.merge() {
|
||||
println!("\t\t{counter}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.gauges.is_empty() {
|
||||
println!("Gauges");
|
||||
for (key, gauges) in self.gauges {
|
||||
println!("\t{key}");
|
||||
|
||||
for gauge in gauges {
|
||||
println!("\t\t{gauge}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.histograms.is_empty() {
|
||||
println!("Histograms");
|
||||
for (key, histograms) in self.histograms {
|
||||
println!("\t{key}");
|
||||
|
||||
for histogram in histograms {
|
||||
println!("\t\t{histogram}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
||||
let labels = key
|
||||
.labels()
|
||||
.into_iter()
|
||||
.map(|label| (label.key().to_string(), label.value().to_string()))
|
||||
.collect();
|
||||
let name = key.name().to_string();
|
||||
(name, labels)
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
|
||||
let mut counters = HashMap::new();
|
||||
|
||||
for (key, counter) in self.registry.get_counter_handles() {
|
||||
let gen = counter.get_generation();
|
||||
if !self.recency.should_store_counter(&key, gen, &self.registry) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
let value = counter.get_inner().load(Ordering::Acquire);
|
||||
counters.entry(name).or_insert_with(Vec::new).push(Counter {
|
||||
labels: labels.into_iter().collect(),
|
||||
value,
|
||||
});
|
||||
}
|
||||
|
||||
counters
|
||||
}
|
||||
|
||||
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
|
||||
let mut gauges = HashMap::new();
|
||||
|
||||
for (key, gauge) in self.registry.get_gauge_handles() {
|
||||
let gen = gauge.get_generation();
|
||||
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
||||
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
|
||||
labels: labels.into_iter().collect(),
|
||||
value,
|
||||
})
|
||||
}
|
||||
|
||||
gauges
|
||||
}
|
||||
|
||||
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
|
||||
for (key, histogram) in self.registry.get_histogram_handles() {
|
||||
let gen = histogram.get_generation();
|
||||
let (name, labels) = key_to_parts(&key);
|
||||
|
||||
if !self
|
||||
.recency
|
||||
.should_store_histogram(&key, gen, &self.registry)
|
||||
{
|
||||
let mut d = self.distributions.write().unwrap();
|
||||
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
|
||||
by_name.remove(&labels);
|
||||
by_name.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
drop(d);
|
||||
|
||||
if delete_by_name {
|
||||
self.descriptions.write().unwrap().remove(&name);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut d = self.distributions.write().unwrap();
|
||||
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
|
||||
|
||||
let entry = outer_entry
|
||||
.entry(labels)
|
||||
.or_insert_with(Summary::with_defaults);
|
||||
|
||||
histogram.get_inner().clear_with(|samples| {
|
||||
for sample in samples {
|
||||
entry.add(*sample);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
let d = self.distributions.read().unwrap().clone();
|
||||
d.into_iter()
|
||||
.map(|(key, value)| {
|
||||
(
|
||||
key,
|
||||
value
|
||||
.into_iter()
|
||||
.map(|(labels, summary)| Histogram {
|
||||
labels: labels.into_iter().collect(),
|
||||
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
||||
.into_iter()
|
||||
.map(|q| (q, summary.quantile(q)))
|
||||
.collect(),
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> Snapshot {
|
||||
Snapshot {
|
||||
counters: self.snapshot_counters(),
|
||||
gauges: self.snapshot_gauges(),
|
||||
histograms: self.snapshot_histograms(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryCollector {
|
||||
pub(crate) fn new() -> Self {
|
||||
MemoryCollector {
|
||||
inner: Arc::new(Inner {
|
||||
descriptions: Default::default(),
|
||||
distributions: Default::default(),
|
||||
recency: Recency::new(
|
||||
Clock::new(),
|
||||
MetricKindMask::ALL,
|
||||
Some(Duration::from_secs(5 * DAYS)),
|
||||
),
|
||||
registry: Registry::new(GenerationalStorage::atomic()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn snapshot(&self) -> Snapshot {
|
||||
self.inner.snapshot()
|
||||
}
|
||||
|
||||
fn add_description_if_missing(
|
||||
&self,
|
||||
key: &metrics::KeyName,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
let mut d = self.inner.descriptions.write().unwrap();
|
||||
d.entry(key.as_str().to_owned()).or_insert(description);
|
||||
}
|
||||
|
||||
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
||||
metrics::set_boxed_recorder(Box::new(self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Recorder for MemoryCollector {
|
||||
fn describe_counter(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn describe_gauge(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn describe_histogram(
|
||||
&self,
|
||||
key: metrics::KeyName,
|
||||
_: Option<metrics::Unit>,
|
||||
description: metrics::SharedString,
|
||||
) {
|
||||
self.add_description_if_missing(&key, description)
|
||||
}
|
||||
|
||||
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_counter(key, |c| c.clone().into())
|
||||
}
|
||||
|
||||
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_gauge(key, |c| c.clone().into())
|
||||
}
|
||||
|
||||
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||
self.inner
|
||||
.registry
|
||||
.get_or_create_histogram(key, |c| c.clone().into())
|
||||
}
|
||||
}
|
|
@ -4,11 +4,12 @@
|
|||
use activitystreams::iri_string::types::IriString;
|
||||
use actix_rt::task::JoinHandle;
|
||||
use actix_web::{middleware::Compress, web, App, HttpServer};
|
||||
use collector::{DoubleRecorder, MemoryCollector};
|
||||
use collector::MemoryCollector;
|
||||
#[cfg(feature = "console")]
|
||||
use console_subscriber::ConsoleLayer;
|
||||
use http_signature_normalization_actix::middleware::VerifySignature;
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
use metrics_util::layers::FanoutBuilder;
|
||||
use opentelemetry::{sdk::Resource, KeyValue};
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use rustls::ServerConfig;
|
||||
|
@ -119,7 +120,11 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
.build()?;
|
||||
|
||||
actix_rt::spawn(exporter);
|
||||
DoubleRecorder::new(recorder, collector.clone()).install()?;
|
||||
let recorder = FanoutBuilder::default()
|
||||
.add_recorder(recorder)
|
||||
.add_recorder(collector.clone())
|
||||
.build();
|
||||
metrics::set_boxed_recorder(Box::new(recorder))?;
|
||||
} else {
|
||||
collector.install()?;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue