forked from mirrors/relay
Improve presentation of stats
This commit is contained in:
parent
f892a50f2c
commit
99c3ec0b75
2 changed files with 172 additions and 14 deletions
184
src/collector.rs
184
src/collector.rs
|
@ -7,7 +7,7 @@ use metrics_util::{
|
||||||
};
|
};
|
||||||
use quanta::Clock;
|
use quanta::Clock;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::{BTreeMap, HashMap},
|
||||||
sync::{atomic::Ordering, Arc},
|
sync::{atomic::Ordering, Arc},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -31,22 +31,74 @@ struct Inner {
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
struct Counter {
|
struct Counter {
|
||||||
labels: Vec<(String, String)>,
|
labels: BTreeMap<String, String>,
|
||||||
value: u64,
|
value: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Counter {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, self.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
struct Gauge {
|
struct Gauge {
|
||||||
labels: Vec<(String, String)>,
|
labels: BTreeMap<String, String>,
|
||||||
value: f64,
|
value: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Gauge {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, self.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
struct Histogram {
|
struct Histogram {
|
||||||
labels: Vec<(String, String)>,
|
labels: BTreeMap<String, String>,
|
||||||
value: Vec<(f64, Option<f64>)>,
|
value: Vec<(f64, Option<f64>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Histogram {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
let value = self
|
||||||
|
.value
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| {
|
||||||
|
if let Some(v) = v {
|
||||||
|
format!("{}: {:.6}", k, v)
|
||||||
|
} else {
|
||||||
|
format!("{}: None,", k)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Snapshot {
|
pub(crate) struct Snapshot {
|
||||||
counters: HashMap<String, Vec<Counter>>,
|
counters: HashMap<String, Vec<Counter>>,
|
||||||
|
@ -54,6 +106,112 @@ pub(crate) struct Snapshot {
|
||||||
histograms: HashMap<String, Vec<Histogram>>,
|
histograms: HashMap<String, Vec<Histogram>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const PAIRS: [((&str, &str), &str); 2] = [
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"background-jobs.worker.started",
|
||||||
|
"background-jobs.worker.finished",
|
||||||
|
),
|
||||||
|
"background-jobs.worker.running",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"background-jobs.job.started",
|
||||||
|
"background-jobs.job.finished",
|
||||||
|
),
|
||||||
|
"background-jobs.job.running",
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct MergeCounter {
|
||||||
|
start: Option<Counter>,
|
||||||
|
finish: Option<Counter>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MergeCounter {
|
||||||
|
fn merge(self) -> Option<Counter> {
|
||||||
|
match (self.start, self.finish) {
|
||||||
|
(Some(start), Some(end)) => Some(Counter {
|
||||||
|
labels: start.labels,
|
||||||
|
value: start.value.saturating_sub(end.value),
|
||||||
|
}),
|
||||||
|
(Some(only), None) | (None, Some(only)) => Some(Counter {
|
||||||
|
labels: only.labels,
|
||||||
|
value: 0,
|
||||||
|
}),
|
||||||
|
(None, None) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Snapshot {
|
||||||
|
pub(crate) fn present(self) {
|
||||||
|
if !self.counters.is_empty() {
|
||||||
|
println!("Counters");
|
||||||
|
let mut merging = HashMap::new();
|
||||||
|
for (key, counters) in self.counters {
|
||||||
|
if let Some(((start, _), name)) = PAIRS
|
||||||
|
.iter()
|
||||||
|
.find(|((start, finish), _)| *start == key || *finish == key)
|
||||||
|
{
|
||||||
|
let entry = merging.entry(name).or_insert_with(HashMap::new);
|
||||||
|
|
||||||
|
for counter in counters {
|
||||||
|
let mut merge_counter = entry
|
||||||
|
.entry(counter.labels.clone())
|
||||||
|
.or_insert_with(MergeCounter::default);
|
||||||
|
if key == *start {
|
||||||
|
merge_counter.start = Some(counter);
|
||||||
|
} else {
|
||||||
|
merge_counter.finish = Some(counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("\t{}", key);
|
||||||
|
for counter in counters {
|
||||||
|
println!("\t\t{}", counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (key, counters) in merging {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for (_, counter) in counters {
|
||||||
|
if let Some(counter) = counter.merge() {
|
||||||
|
println!("\t\t{}", counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.gauges.is_empty() {
|
||||||
|
println!("Gauges");
|
||||||
|
for (key, gauges) in self.gauges {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for gauge in gauges {
|
||||||
|
println!("\t\t{}", gauge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.histograms.is_empty() {
|
||||||
|
println!("Histograms");
|
||||||
|
for (key, histograms) in self.histograms {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for histogram in histograms {
|
||||||
|
println!("\t\t{}", histogram);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
||||||
let labels = key
|
let labels = key
|
||||||
.labels()
|
.labels()
|
||||||
|
@ -76,10 +234,10 @@ impl Inner {
|
||||||
|
|
||||||
let (name, labels) = key_to_parts(&key);
|
let (name, labels) = key_to_parts(&key);
|
||||||
let value = counter.get_inner().load(Ordering::Acquire);
|
let value = counter.get_inner().load(Ordering::Acquire);
|
||||||
counters
|
counters.entry(name).or_insert_with(Vec::new).push(Counter {
|
||||||
.entry(name)
|
labels: labels.into_iter().collect(),
|
||||||
.or_insert_with(Vec::new)
|
value,
|
||||||
.push(Counter { labels, value });
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
counters
|
counters
|
||||||
|
@ -96,10 +254,10 @@ impl Inner {
|
||||||
|
|
||||||
let (name, labels) = key_to_parts(&key);
|
let (name, labels) = key_to_parts(&key);
|
||||||
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
||||||
gauges
|
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
|
||||||
.entry(name)
|
labels: labels.into_iter().collect(),
|
||||||
.or_insert_with(Vec::new)
|
value,
|
||||||
.push(Gauge { labels, value })
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
gauges
|
gauges
|
||||||
|
@ -153,7 +311,7 @@ impl Inner {
|
||||||
.value()
|
.value()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(labels, summary)| Histogram {
|
.map(|(labels, summary)| Histogram {
|
||||||
labels: labels.clone(),
|
labels: labels.iter().cloned().collect(),
|
||||||
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|q| (q, summary.quantile(q)))
|
.map(|q| (q, summary.quantile(q)))
|
||||||
|
|
|
@ -144,7 +144,7 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
|
|
||||||
if args.stats() {
|
if args.stats() {
|
||||||
let stats = admin::client::stats(&client, &config).await?;
|
let stats = admin::client::stats(&client, &config).await?;
|
||||||
println!("{:#?}", stats);
|
stats.present();
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|
Loading…
Reference in a new issue