forked from mirrors/relay
Add optional prometheus scrape endpoint
This commit is contained in:
parent
07b961c28f
commit
fafba69258
8 changed files with 633 additions and 419 deletions
21
Cargo.lock
generated
21
Cargo.lock
generated
|
@ -298,7 +298,7 @@ checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ap-relay"
|
name = "ap-relay"
|
||||||
version = "0.3.77"
|
version = "0.3.78"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitystreams",
|
"activitystreams",
|
||||||
"activitystreams-ext",
|
"activitystreams-ext",
|
||||||
|
@ -320,6 +320,7 @@ dependencies = [
|
||||||
"http-signature-normalization-actix",
|
"http-signature-normalization-actix",
|
||||||
"lru",
|
"lru",
|
||||||
"metrics",
|
"metrics",
|
||||||
|
"metrics-exporter-prometheus",
|
||||||
"metrics-util",
|
"metrics-util",
|
||||||
"mime",
|
"mime",
|
||||||
"minify-html",
|
"minify-html",
|
||||||
|
@ -1717,6 +1718,24 @@ dependencies = [
|
||||||
"portable-atomic",
|
"portable-atomic",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "metrics-exporter-prometheus"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70"
|
||||||
|
dependencies = [
|
||||||
|
"hyper",
|
||||||
|
"indexmap",
|
||||||
|
"ipnet",
|
||||||
|
"metrics",
|
||||||
|
"metrics-util",
|
||||||
|
"parking_lot 0.12.1",
|
||||||
|
"portable-atomic",
|
||||||
|
"quanta",
|
||||||
|
"thiserror",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "metrics-macros"
|
name = "metrics-macros"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "ap-relay"
|
name = "ap-relay"
|
||||||
description = "A simple activitypub relay"
|
description = "A simple activitypub relay"
|
||||||
version = "0.3.77"
|
version = "0.3.78"
|
||||||
authors = ["asonix <asonix@asonix.dog>"]
|
authors = ["asonix <asonix@asonix.dog>"]
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@ -43,6 +43,9 @@ dotenv = "0.15.0"
|
||||||
futures-util = "0.3.17"
|
futures-util = "0.3.17"
|
||||||
lru = "0.8.0"
|
lru = "0.8.0"
|
||||||
metrics = "0.20.1"
|
metrics = "0.20.1"
|
||||||
|
metrics-exporter-prometheus = { version = "0.11.0", default-features = false, features = [
|
||||||
|
"http-listener",
|
||||||
|
] }
|
||||||
metrics-util = "0.14.0"
|
metrics-util = "0.14.0"
|
||||||
mime = "0.3.16"
|
mime = "0.3.16"
|
||||||
minify-html = "0.10.0"
|
minify-html = "0.10.0"
|
||||||
|
|
|
@ -10,7 +10,7 @@ $ sudo docker run --rm -it \
|
||||||
-e ADDR=0.0.0.0 \
|
-e ADDR=0.0.0.0 \
|
||||||
-e SLED_PATH=/mnt/sled/db-0.34 \
|
-e SLED_PATH=/mnt/sled/db-0.34 \
|
||||||
-p 8080:8080 \
|
-p 8080:8080 \
|
||||||
asonix/relay:0.3.73
|
asonix/relay:0.3.78
|
||||||
```
|
```
|
||||||
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
|
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
|
||||||
#### Cargo
|
#### Cargo
|
||||||
|
@ -103,6 +103,8 @@ TLS_CERT=/path/to/cert
|
||||||
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
|
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
|
||||||
LOCAL_DOMAINS=masto.asonix.dog
|
LOCAL_DOMAINS=masto.asonix.dog
|
||||||
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
|
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
|
||||||
|
PROMETHEUS_ADDR=0.0.0.0
|
||||||
|
PROMETHEUS_PORT=9000
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Descriptions
|
#### Descriptions
|
||||||
|
@ -146,6 +148,10 @@ Optional - Add custom notes in the footer of the page
|
||||||
Optional - domains of mastodon servers run by the same admin as the relay
|
Optional - domains of mastodon servers run by the same admin as the relay
|
||||||
##### `LOCAL_BLURB`
|
##### `LOCAL_BLURB`
|
||||||
Optional - description for the relay
|
Optional - description for the relay
|
||||||
|
##### `PROMETHEUS_ADDR`
|
||||||
|
Optional - Address to bind to for serving the prometheus scrape endpoint
|
||||||
|
##### `PROMETHEUS_PORT`
|
||||||
|
Optional - Port to bind to for serving the prometheus scrape endpoint
|
||||||
|
|
||||||
### Subscribing
|
### Subscribing
|
||||||
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
|
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
|
||||||
|
|
417
src/collector.rs
417
src/collector.rs
|
@ -1,414 +1,5 @@
|
||||||
use metrics::{Key, Recorder, SetRecorderError};
|
mod double;
|
||||||
use metrics_util::{
|
mod stats;
|
||||||
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
|
||||||
MetricKindMask, Summary,
|
|
||||||
};
|
|
||||||
use quanta::Clock;
|
|
||||||
use std::{
|
|
||||||
collections::{BTreeMap, HashMap},
|
|
||||||
sync::{atomic::Ordering, Arc, RwLock},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
const SECONDS: u64 = 1;
|
pub(crate) use double::DoubleRecorder;
|
||||||
const MINUTES: u64 = 60 * SECONDS;
|
pub(crate) use stats::{MemoryCollector, Snapshot};
|
||||||
const HOURS: u64 = 60 * MINUTES;
|
|
||||||
const DAYS: u64 = 24 * HOURS;
|
|
||||||
|
|
||||||
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct MemoryCollector {
|
|
||||||
inner: Arc<Inner>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Inner {
|
|
||||||
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
|
|
||||||
distributions: RwLock<HashMap<String, DistributionMap>>,
|
|
||||||
recency: Recency<Key>,
|
|
||||||
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
|
||||||
struct Counter {
|
|
||||||
labels: BTreeMap<String, String>,
|
|
||||||
value: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Counter {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
let labels = self
|
|
||||||
.labels
|
|
||||||
.iter()
|
|
||||||
.map(|(k, v)| format!("{}: {}", k, v))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(", ");
|
|
||||||
|
|
||||||
write!(f, "{} - {}", labels, self.value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
|
||||||
struct Gauge {
|
|
||||||
labels: BTreeMap<String, String>,
|
|
||||||
value: f64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Gauge {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
let labels = self
|
|
||||||
.labels
|
|
||||||
.iter()
|
|
||||||
.map(|(k, v)| format!("{}: {}", k, v))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(", ");
|
|
||||||
|
|
||||||
write!(f, "{} - {}", labels, self.value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
|
||||||
struct Histogram {
|
|
||||||
labels: BTreeMap<String, String>,
|
|
||||||
value: Vec<(f64, Option<f64>)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Histogram {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
let labels = self
|
|
||||||
.labels
|
|
||||||
.iter()
|
|
||||||
.map(|(k, v)| format!("{}: {}", k, v))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(", ");
|
|
||||||
|
|
||||||
let value = self
|
|
||||||
.value
|
|
||||||
.iter()
|
|
||||||
.map(|(k, v)| {
|
|
||||||
if let Some(v) = v {
|
|
||||||
format!("{}: {:.6}", k, v)
|
|
||||||
} else {
|
|
||||||
format!("{}: None,", k)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(", ");
|
|
||||||
|
|
||||||
write!(f, "{} - {}", labels, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
|
||||||
pub(crate) struct Snapshot {
|
|
||||||
counters: HashMap<String, Vec<Counter>>,
|
|
||||||
gauges: HashMap<String, Vec<Gauge>>,
|
|
||||||
histograms: HashMap<String, Vec<Histogram>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
const PAIRS: [((&str, &str), &str); 2] = [
|
|
||||||
(
|
|
||||||
(
|
|
||||||
"background-jobs.worker.started",
|
|
||||||
"background-jobs.worker.finished",
|
|
||||||
),
|
|
||||||
"background-jobs.worker.running",
|
|
||||||
),
|
|
||||||
(
|
|
||||||
(
|
|
||||||
"background-jobs.job.started",
|
|
||||||
"background-jobs.job.finished",
|
|
||||||
),
|
|
||||||
"background-jobs.job.running",
|
|
||||||
),
|
|
||||||
];
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct MergeCounter {
|
|
||||||
start: Option<Counter>,
|
|
||||||
finish: Option<Counter>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MergeCounter {
|
|
||||||
fn merge(self) -> Option<Counter> {
|
|
||||||
match (self.start, self.finish) {
|
|
||||||
(Some(start), Some(end)) => Some(Counter {
|
|
||||||
labels: start.labels,
|
|
||||||
value: start.value.saturating_sub(end.value),
|
|
||||||
}),
|
|
||||||
(Some(only), None) => Some(only),
|
|
||||||
(None, Some(only)) => Some(Counter {
|
|
||||||
labels: only.labels,
|
|
||||||
value: 0,
|
|
||||||
}),
|
|
||||||
(None, None) => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Snapshot {
|
|
||||||
pub(crate) fn present(self) {
|
|
||||||
if !self.counters.is_empty() {
|
|
||||||
println!("Counters");
|
|
||||||
let mut merging = HashMap::new();
|
|
||||||
for (key, counters) in self.counters {
|
|
||||||
if let Some(((start, _), name)) = PAIRS
|
|
||||||
.iter()
|
|
||||||
.find(|((start, finish), _)| *start == key || *finish == key)
|
|
||||||
{
|
|
||||||
let entry = merging.entry(name).or_insert_with(HashMap::new);
|
|
||||||
|
|
||||||
for counter in counters {
|
|
||||||
let mut merge_counter = entry
|
|
||||||
.entry(counter.labels.clone())
|
|
||||||
.or_insert_with(MergeCounter::default);
|
|
||||||
if key == *start {
|
|
||||||
merge_counter.start = Some(counter);
|
|
||||||
} else {
|
|
||||||
merge_counter.finish = Some(counter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("\t{}", key);
|
|
||||||
for counter in counters {
|
|
||||||
println!("\t\t{}", counter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (key, counters) in merging {
|
|
||||||
println!("\t{}", key);
|
|
||||||
|
|
||||||
for (_, counter) in counters {
|
|
||||||
if let Some(counter) = counter.merge() {
|
|
||||||
println!("\t\t{}", counter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.gauges.is_empty() {
|
|
||||||
println!("Gauges");
|
|
||||||
for (key, gauges) in self.gauges {
|
|
||||||
println!("\t{}", key);
|
|
||||||
|
|
||||||
for gauge in gauges {
|
|
||||||
println!("\t\t{}", gauge);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.histograms.is_empty() {
|
|
||||||
println!("Histograms");
|
|
||||||
for (key, histograms) in self.histograms {
|
|
||||||
println!("\t{}", key);
|
|
||||||
|
|
||||||
for histogram in histograms {
|
|
||||||
println!("\t\t{}", histogram);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
|
||||||
let labels = key
|
|
||||||
.labels()
|
|
||||||
.into_iter()
|
|
||||||
.map(|label| (label.key().to_string(), label.value().to_string()))
|
|
||||||
.collect();
|
|
||||||
let name = key.name().to_string();
|
|
||||||
(name, labels)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Inner {
|
|
||||||
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
|
|
||||||
let mut counters = HashMap::new();
|
|
||||||
|
|
||||||
for (key, counter) in self.registry.get_counter_handles() {
|
|
||||||
let gen = counter.get_generation();
|
|
||||||
if !self.recency.should_store_counter(&key, gen, &self.registry) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (name, labels) = key_to_parts(&key);
|
|
||||||
let value = counter.get_inner().load(Ordering::Acquire);
|
|
||||||
counters.entry(name).or_insert_with(Vec::new).push(Counter {
|
|
||||||
labels: labels.into_iter().collect(),
|
|
||||||
value,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
counters
|
|
||||||
}
|
|
||||||
|
|
||||||
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
|
|
||||||
let mut gauges = HashMap::new();
|
|
||||||
|
|
||||||
for (key, gauge) in self.registry.get_gauge_handles() {
|
|
||||||
let gen = gauge.get_generation();
|
|
||||||
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (name, labels) = key_to_parts(&key);
|
|
||||||
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
|
||||||
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
|
|
||||||
labels: labels.into_iter().collect(),
|
|
||||||
value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
gauges
|
|
||||||
}
|
|
||||||
|
|
||||||
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
|
|
||||||
for (key, histogram) in self.registry.get_histogram_handles() {
|
|
||||||
let gen = histogram.get_generation();
|
|
||||||
let (name, labels) = key_to_parts(&key);
|
|
||||||
|
|
||||||
if !self
|
|
||||||
.recency
|
|
||||||
.should_store_histogram(&key, gen, &self.registry)
|
|
||||||
{
|
|
||||||
let mut d = self.distributions.write().unwrap();
|
|
||||||
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
|
|
||||||
by_name.remove(&labels);
|
|
||||||
by_name.is_empty()
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
drop(d);
|
|
||||||
|
|
||||||
if delete_by_name {
|
|
||||||
self.descriptions.write().unwrap().remove(&name);
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut d = self.distributions.write().unwrap();
|
|
||||||
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
|
|
||||||
|
|
||||||
let entry = outer_entry
|
|
||||||
.entry(labels)
|
|
||||||
.or_insert_with(Summary::with_defaults);
|
|
||||||
|
|
||||||
histogram.get_inner().clear_with(|samples| {
|
|
||||||
for sample in samples {
|
|
||||||
entry.add(*sample);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
let d = self.distributions.read().unwrap().clone();
|
|
||||||
d.into_iter()
|
|
||||||
.map(|(key, value)| {
|
|
||||||
(
|
|
||||||
key,
|
|
||||||
value
|
|
||||||
.into_iter()
|
|
||||||
.map(|(labels, summary)| Histogram {
|
|
||||||
labels: labels.into_iter().collect(),
|
|
||||||
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
|
||||||
.into_iter()
|
|
||||||
.map(|q| (q, summary.quantile(q)))
|
|
||||||
.collect(),
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn snapshot(&self) -> Snapshot {
|
|
||||||
Snapshot {
|
|
||||||
counters: self.snapshot_counters(),
|
|
||||||
gauges: self.snapshot_gauges(),
|
|
||||||
histograms: self.snapshot_histograms(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MemoryCollector {
|
|
||||||
pub(crate) fn new() -> Self {
|
|
||||||
MemoryCollector {
|
|
||||||
inner: Arc::new(Inner {
|
|
||||||
descriptions: Default::default(),
|
|
||||||
distributions: Default::default(),
|
|
||||||
recency: Recency::new(
|
|
||||||
Clock::new(),
|
|
||||||
MetricKindMask::ALL,
|
|
||||||
Some(Duration::from_secs(5 * DAYS)),
|
|
||||||
),
|
|
||||||
registry: Registry::new(GenerationalStorage::atomic()),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
|
||||||
metrics::set_boxed_recorder(Box::new(self.clone()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn snapshot(&self) -> Snapshot {
|
|
||||||
self.inner.snapshot()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_description_if_missing(
|
|
||||||
&self,
|
|
||||||
key: &metrics::KeyName,
|
|
||||||
description: metrics::SharedString,
|
|
||||||
) {
|
|
||||||
let mut d = self.inner.descriptions.write().unwrap();
|
|
||||||
d.entry(key.as_str().to_owned()).or_insert(description);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Recorder for MemoryCollector {
|
|
||||||
fn describe_counter(
|
|
||||||
&self,
|
|
||||||
key: metrics::KeyName,
|
|
||||||
_: Option<metrics::Unit>,
|
|
||||||
description: metrics::SharedString,
|
|
||||||
) {
|
|
||||||
self.add_description_if_missing(&key, description)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn describe_gauge(
|
|
||||||
&self,
|
|
||||||
key: metrics::KeyName,
|
|
||||||
_: Option<metrics::Unit>,
|
|
||||||
description: metrics::SharedString,
|
|
||||||
) {
|
|
||||||
self.add_description_if_missing(&key, description)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn describe_histogram(
|
|
||||||
&self,
|
|
||||||
key: metrics::KeyName,
|
|
||||||
_: Option<metrics::Unit>,
|
|
||||||
description: metrics::SharedString,
|
|
||||||
) {
|
|
||||||
self.add_description_if_missing(&key, description)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
|
||||||
self.inner
|
|
||||||
.registry
|
|
||||||
.get_or_create_counter(key, |c| c.clone().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
|
||||||
self.inner
|
|
||||||
.registry
|
|
||||||
.get_or_create_gauge(key, |c| c.clone().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
|
||||||
self.inner
|
|
||||||
.registry
|
|
||||||
.get_or_create_histogram(key, |c| c.clone().into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
133
src/collector/double.rs
Normal file
133
src/collector/double.rs
Normal file
|
@ -0,0 +1,133 @@
|
||||||
|
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct DoubleRecorder<R, S> {
|
||||||
|
first: R,
|
||||||
|
second: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DoubleCounter {
|
||||||
|
first: metrics::Counter,
|
||||||
|
second: metrics::Counter,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DoubleGauge {
|
||||||
|
first: metrics::Gauge,
|
||||||
|
second: metrics::Gauge,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DoubleHistogram {
|
||||||
|
first: metrics::Histogram,
|
||||||
|
second: metrics::Histogram,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R, S> DoubleRecorder<R, S> {
|
||||||
|
pub(crate) fn new(first: R, second: S) -> Self {
|
||||||
|
DoubleRecorder { first, second }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn install(self) -> Result<(), SetRecorderError>
|
||||||
|
where
|
||||||
|
R: Recorder + 'static,
|
||||||
|
S: Recorder + 'static,
|
||||||
|
{
|
||||||
|
metrics::set_boxed_recorder(Box::new(self))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R, S> Recorder for DoubleRecorder<R, S>
|
||||||
|
where
|
||||||
|
R: Recorder,
|
||||||
|
S: Recorder,
|
||||||
|
{
|
||||||
|
fn describe_counter(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
unit: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.first
|
||||||
|
.describe_counter(key.clone(), unit, description.clone());
|
||||||
|
self.second.describe_counter(key, unit, description);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_gauge(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
unit: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.first
|
||||||
|
.describe_gauge(key.clone(), unit, description.clone());
|
||||||
|
self.second.describe_gauge(key, unit, description);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_histogram(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
unit: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.first
|
||||||
|
.describe_histogram(key.clone(), unit, description.clone());
|
||||||
|
self.second.describe_histogram(key, unit, description);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||||
|
let first = self.first.register_counter(key);
|
||||||
|
let second = self.second.register_counter(key);
|
||||||
|
|
||||||
|
metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second }))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||||
|
let first = self.first.register_gauge(key);
|
||||||
|
let second = self.second.register_gauge(key);
|
||||||
|
|
||||||
|
metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second }))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||||
|
let first = self.first.register_histogram(key);
|
||||||
|
let second = self.second.register_histogram(key);
|
||||||
|
|
||||||
|
metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CounterFn for DoubleCounter {
|
||||||
|
fn increment(&self, value: u64) {
|
||||||
|
self.first.increment(value);
|
||||||
|
self.second.increment(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn absolute(&self, value: u64) {
|
||||||
|
self.first.absolute(value);
|
||||||
|
self.second.absolute(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GaugeFn for DoubleGauge {
|
||||||
|
fn increment(&self, value: f64) {
|
||||||
|
self.first.increment(value);
|
||||||
|
self.second.increment(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decrement(&self, value: f64) {
|
||||||
|
self.first.decrement(value);
|
||||||
|
self.second.decrement(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set(&self, value: f64) {
|
||||||
|
self.first.set(value);
|
||||||
|
self.second.set(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HistogramFn for DoubleHistogram {
|
||||||
|
fn record(&self, value: f64) {
|
||||||
|
self.first.record(value);
|
||||||
|
self.second.record(value);
|
||||||
|
}
|
||||||
|
}
|
414
src/collector/stats.rs
Normal file
414
src/collector/stats.rs
Normal file
|
@ -0,0 +1,414 @@
|
||||||
|
use metrics::{Key, Recorder, SetRecorderError};
|
||||||
|
use metrics_util::{
|
||||||
|
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
||||||
|
MetricKindMask, Summary,
|
||||||
|
};
|
||||||
|
use quanta::Clock;
|
||||||
|
use std::{
|
||||||
|
collections::{BTreeMap, HashMap},
|
||||||
|
sync::{atomic::Ordering, Arc, RwLock},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
const SECONDS: u64 = 1;
|
||||||
|
const MINUTES: u64 = 60 * SECONDS;
|
||||||
|
const HOURS: u64 = 60 * MINUTES;
|
||||||
|
const DAYS: u64 = 24 * HOURS;
|
||||||
|
|
||||||
|
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MemoryCollector {
|
||||||
|
inner: Arc<Inner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
|
||||||
|
distributions: RwLock<HashMap<String, DistributionMap>>,
|
||||||
|
recency: Recency<Key>,
|
||||||
|
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
struct Counter {
|
||||||
|
labels: BTreeMap<String, String>,
|
||||||
|
value: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Counter {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, self.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
struct Gauge {
|
||||||
|
labels: BTreeMap<String, String>,
|
||||||
|
value: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Gauge {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, self.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
struct Histogram {
|
||||||
|
labels: BTreeMap<String, String>,
|
||||||
|
value: Vec<(f64, Option<f64>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for Histogram {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let labels = self
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| format!("{}: {}", k, v))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
let value = self
|
||||||
|
.value
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| {
|
||||||
|
if let Some(v) = v {
|
||||||
|
format!("{}: {:.6}", k, v)
|
||||||
|
} else {
|
||||||
|
format!("{}: None,", k)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
write!(f, "{} - {}", labels, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub(crate) struct Snapshot {
|
||||||
|
counters: HashMap<String, Vec<Counter>>,
|
||||||
|
gauges: HashMap<String, Vec<Gauge>>,
|
||||||
|
histograms: HashMap<String, Vec<Histogram>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
const PAIRS: [((&str, &str), &str); 2] = [
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"background-jobs.worker.started",
|
||||||
|
"background-jobs.worker.finished",
|
||||||
|
),
|
||||||
|
"background-jobs.worker.running",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"background-jobs.job.started",
|
||||||
|
"background-jobs.job.finished",
|
||||||
|
),
|
||||||
|
"background-jobs.job.running",
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct MergeCounter {
|
||||||
|
start: Option<Counter>,
|
||||||
|
finish: Option<Counter>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MergeCounter {
|
||||||
|
fn merge(self) -> Option<Counter> {
|
||||||
|
match (self.start, self.finish) {
|
||||||
|
(Some(start), Some(end)) => Some(Counter {
|
||||||
|
labels: start.labels,
|
||||||
|
value: start.value.saturating_sub(end.value),
|
||||||
|
}),
|
||||||
|
(Some(only), None) => Some(only),
|
||||||
|
(None, Some(only)) => Some(Counter {
|
||||||
|
labels: only.labels,
|
||||||
|
value: 0,
|
||||||
|
}),
|
||||||
|
(None, None) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Snapshot {
|
||||||
|
pub(crate) fn present(self) {
|
||||||
|
if !self.counters.is_empty() {
|
||||||
|
println!("Counters");
|
||||||
|
let mut merging = HashMap::new();
|
||||||
|
for (key, counters) in self.counters {
|
||||||
|
if let Some(((start, _), name)) = PAIRS
|
||||||
|
.iter()
|
||||||
|
.find(|((start, finish), _)| *start == key || *finish == key)
|
||||||
|
{
|
||||||
|
let entry = merging.entry(name).or_insert_with(HashMap::new);
|
||||||
|
|
||||||
|
for counter in counters {
|
||||||
|
let mut merge_counter = entry
|
||||||
|
.entry(counter.labels.clone())
|
||||||
|
.or_insert_with(MergeCounter::default);
|
||||||
|
if key == *start {
|
||||||
|
merge_counter.start = Some(counter);
|
||||||
|
} else {
|
||||||
|
merge_counter.finish = Some(counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("\t{}", key);
|
||||||
|
for counter in counters {
|
||||||
|
println!("\t\t{}", counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (key, counters) in merging {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for (_, counter) in counters {
|
||||||
|
if let Some(counter) = counter.merge() {
|
||||||
|
println!("\t\t{}", counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.gauges.is_empty() {
|
||||||
|
println!("Gauges");
|
||||||
|
for (key, gauges) in self.gauges {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for gauge in gauges {
|
||||||
|
println!("\t\t{}", gauge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.histograms.is_empty() {
|
||||||
|
println!("Histograms");
|
||||||
|
for (key, histograms) in self.histograms {
|
||||||
|
println!("\t{}", key);
|
||||||
|
|
||||||
|
for histogram in histograms {
|
||||||
|
println!("\t\t{}", histogram);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
|
||||||
|
let labels = key
|
||||||
|
.labels()
|
||||||
|
.into_iter()
|
||||||
|
.map(|label| (label.key().to_string(), label.value().to_string()))
|
||||||
|
.collect();
|
||||||
|
let name = key.name().to_string();
|
||||||
|
(name, labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inner {
|
||||||
|
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
|
||||||
|
let mut counters = HashMap::new();
|
||||||
|
|
||||||
|
for (key, counter) in self.registry.get_counter_handles() {
|
||||||
|
let gen = counter.get_generation();
|
||||||
|
if !self.recency.should_store_counter(&key, gen, &self.registry) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
let value = counter.get_inner().load(Ordering::Acquire);
|
||||||
|
counters.entry(name).or_insert_with(Vec::new).push(Counter {
|
||||||
|
labels: labels.into_iter().collect(),
|
||||||
|
value,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
counters
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
|
||||||
|
let mut gauges = HashMap::new();
|
||||||
|
|
||||||
|
for (key, gauge) in self.registry.get_gauge_handles() {
|
||||||
|
let gen = gauge.get_generation();
|
||||||
|
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
|
||||||
|
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
|
||||||
|
labels: labels.into_iter().collect(),
|
||||||
|
value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
gauges
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
|
||||||
|
for (key, histogram) in self.registry.get_histogram_handles() {
|
||||||
|
let gen = histogram.get_generation();
|
||||||
|
let (name, labels) = key_to_parts(&key);
|
||||||
|
|
||||||
|
if !self
|
||||||
|
.recency
|
||||||
|
.should_store_histogram(&key, gen, &self.registry)
|
||||||
|
{
|
||||||
|
let mut d = self.distributions.write().unwrap();
|
||||||
|
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
|
||||||
|
by_name.remove(&labels);
|
||||||
|
by_name.is_empty()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
drop(d);
|
||||||
|
|
||||||
|
if delete_by_name {
|
||||||
|
self.descriptions.write().unwrap().remove(&name);
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut d = self.distributions.write().unwrap();
|
||||||
|
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
|
||||||
|
|
||||||
|
let entry = outer_entry
|
||||||
|
.entry(labels)
|
||||||
|
.or_insert_with(Summary::with_defaults);
|
||||||
|
|
||||||
|
histogram.get_inner().clear_with(|samples| {
|
||||||
|
for sample in samples {
|
||||||
|
entry.add(*sample);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
let d = self.distributions.read().unwrap().clone();
|
||||||
|
d.into_iter()
|
||||||
|
.map(|(key, value)| {
|
||||||
|
(
|
||||||
|
key,
|
||||||
|
value
|
||||||
|
.into_iter()
|
||||||
|
.map(|(labels, summary)| Histogram {
|
||||||
|
labels: labels.into_iter().collect(),
|
||||||
|
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
|
||||||
|
.into_iter()
|
||||||
|
.map(|q| (q, summary.quantile(q)))
|
||||||
|
.collect(),
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self) -> Snapshot {
|
||||||
|
Snapshot {
|
||||||
|
counters: self.snapshot_counters(),
|
||||||
|
gauges: self.snapshot_gauges(),
|
||||||
|
histograms: self.snapshot_histograms(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MemoryCollector {
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
MemoryCollector {
|
||||||
|
inner: Arc::new(Inner {
|
||||||
|
descriptions: Default::default(),
|
||||||
|
distributions: Default::default(),
|
||||||
|
recency: Recency::new(
|
||||||
|
Clock::new(),
|
||||||
|
MetricKindMask::ALL,
|
||||||
|
Some(Duration::from_secs(5 * DAYS)),
|
||||||
|
),
|
||||||
|
registry: Registry::new(GenerationalStorage::atomic()),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn snapshot(&self) -> Snapshot {
|
||||||
|
self.inner.snapshot()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_description_if_missing(
|
||||||
|
&self,
|
||||||
|
key: &metrics::KeyName,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
let mut d = self.inner.descriptions.write().unwrap();
|
||||||
|
d.entry(key.as_str().to_owned()).or_insert(description);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
||||||
|
metrics::set_boxed_recorder(Box::new(self.clone()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Recorder for MemoryCollector {
|
||||||
|
fn describe_counter(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_gauge(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe_histogram(
|
||||||
|
&self,
|
||||||
|
key: metrics::KeyName,
|
||||||
|
_: Option<metrics::Unit>,
|
||||||
|
description: metrics::SharedString,
|
||||||
|
) {
|
||||||
|
self.add_description_if_missing(&key, description)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_counter(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_gauge(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||||
|
self.inner
|
||||||
|
.registry
|
||||||
|
.get_or_create_histogram(key, |c| c.clone().into())
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,7 +14,11 @@ use config::Environment;
|
||||||
use http_signature_normalization_actix::prelude::VerifyDigest;
|
use http_signature_normalization_actix::prelude::VerifyDigest;
|
||||||
use rustls::{Certificate, PrivateKey};
|
use rustls::{Certificate, PrivateKey};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
use std::{io::BufReader, net::IpAddr, path::PathBuf};
|
use std::{
|
||||||
|
io::BufReader,
|
||||||
|
net::{IpAddr, SocketAddr},
|
||||||
|
path::PathBuf,
|
||||||
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize)]
|
#[derive(Clone, Debug, serde::Deserialize)]
|
||||||
|
@ -38,6 +42,8 @@ pub(crate) struct ParsedConfig {
|
||||||
footer_blurb: Option<String>,
|
footer_blurb: Option<String>,
|
||||||
local_domains: Option<String>,
|
local_domains: Option<String>,
|
||||||
local_blurb: Option<String>,
|
local_blurb: Option<String>,
|
||||||
|
prometheus_addr: Option<IpAddr>,
|
||||||
|
prometheus_port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -60,6 +66,7 @@ pub struct Config {
|
||||||
footer_blurb: Option<String>,
|
footer_blurb: Option<String>,
|
||||||
local_domains: Vec<String>,
|
local_domains: Vec<String>,
|
||||||
local_blurb: Option<String>,
|
local_blurb: Option<String>,
|
||||||
|
prometheus_config: Option<PrometheusConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -68,6 +75,12 @@ struct TlsConfig {
|
||||||
cert: PathBuf,
|
cert: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct PrometheusConfig {
|
||||||
|
addr: IpAddr,
|
||||||
|
port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum UrlKind {
|
pub enum UrlKind {
|
||||||
Activity,
|
Activity,
|
||||||
|
@ -120,6 +133,7 @@ impl std::fmt::Debug for Config {
|
||||||
.field("footer_blurb", &self.footer_blurb)
|
.field("footer_blurb", &self.footer_blurb)
|
||||||
.field("local_domains", &self.local_domains)
|
.field("local_domains", &self.local_domains)
|
||||||
.field("local_blurb", &self.local_blurb)
|
.field("local_blurb", &self.local_blurb)
|
||||||
|
.field("prometheus_config", &self.prometheus_config)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,6 +160,8 @@ impl Config {
|
||||||
.set_default("footer_blurb", None as Option<&str>)?
|
.set_default("footer_blurb", None as Option<&str>)?
|
||||||
.set_default("local_domains", None as Option<&str>)?
|
.set_default("local_domains", None as Option<&str>)?
|
||||||
.set_default("local_blurb", None as Option<&str>)?
|
.set_default("local_blurb", None as Option<&str>)?
|
||||||
|
.set_default("prometheus_addr", None as Option<&str>)?
|
||||||
|
.set_default("prometheus_port", None as Option<u16>)?
|
||||||
.add_source(Environment::default())
|
.add_source(Environment::default())
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
|
@ -174,6 +190,19 @@ impl Config {
|
||||||
.map(|d| d.to_string())
|
.map(|d| d.to_string())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let prometheus_config = match (config.prometheus_addr, config.prometheus_port) {
|
||||||
|
(Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }),
|
||||||
|
(Some(_), None) => {
|
||||||
|
tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
(None, Some(_)) => {
|
||||||
|
tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
(None, None) => None,
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Config {
|
Ok(Config {
|
||||||
hostname: config.hostname,
|
hostname: config.hostname,
|
||||||
addr: config.addr,
|
addr: config.addr,
|
||||||
|
@ -193,9 +222,16 @@ impl Config {
|
||||||
footer_blurb: config.footer_blurb,
|
footer_blurb: config.footer_blurb,
|
||||||
local_domains,
|
local_domains,
|
||||||
local_blurb: config.local_blurb,
|
local_blurb: config.local_blurb,
|
||||||
|
prometheus_config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
|
||||||
|
let config = self.prometheus_config.as_ref()?;
|
||||||
|
|
||||||
|
Some((config.addr, config.port).into())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
|
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
|
||||||
let tls = if let Some(tls) = &self.tls {
|
let tls = if let Some(tls) = &self.tls {
|
||||||
tls
|
tls
|
||||||
|
|
14
src/main.rs
14
src/main.rs
|
@ -4,10 +4,11 @@
|
||||||
use activitystreams::iri_string::types::IriString;
|
use activitystreams::iri_string::types::IriString;
|
||||||
use actix_rt::task::JoinHandle;
|
use actix_rt::task::JoinHandle;
|
||||||
use actix_web::{middleware::Compress, web, App, HttpServer};
|
use actix_web::{middleware::Compress, web, App, HttpServer};
|
||||||
use collector::MemoryCollector;
|
use collector::{DoubleRecorder, MemoryCollector};
|
||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
use console_subscriber::ConsoleLayer;
|
use console_subscriber::ConsoleLayer;
|
||||||
use http_signature_normalization_actix::middleware::VerifySignature;
|
use http_signature_normalization_actix::middleware::VerifySignature;
|
||||||
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use opentelemetry::{sdk::Resource, KeyValue};
|
use opentelemetry::{sdk::Resource, KeyValue};
|
||||||
use opentelemetry_otlp::WithExportConfig;
|
use opentelemetry_otlp::WithExportConfig;
|
||||||
use rustls::ServerConfig;
|
use rustls::ServerConfig;
|
||||||
|
@ -103,8 +104,19 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
let config = Config::build()?;
|
let config = Config::build()?;
|
||||||
|
|
||||||
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
|
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
|
||||||
|
|
||||||
let collector = MemoryCollector::new();
|
let collector = MemoryCollector::new();
|
||||||
|
|
||||||
|
if let Some(bind_addr) = config.prometheus_bind_address() {
|
||||||
|
let (recorder, exporter) = PrometheusBuilder::new()
|
||||||
|
.with_http_listener(bind_addr)
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
actix_rt::spawn(exporter);
|
||||||
|
DoubleRecorder::new(recorder, collector.clone()).install()?;
|
||||||
|
} else {
|
||||||
collector.install()?;
|
collector.install()?;
|
||||||
|
}
|
||||||
|
|
||||||
let args = Args::new();
|
let args = Args::new();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue