diff --git a/Cargo.lock b/Cargo.lock index cf4a6bd..008efcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,7 +298,7 @@ checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" [[package]] name = "ap-relay" -version = "0.3.77" +version = "0.3.78" dependencies = [ "activitystreams", "activitystreams-ext", @@ -320,6 +320,7 @@ dependencies = [ "http-signature-normalization-actix", "lru", "metrics", + "metrics-exporter-prometheus", "metrics-util", "mime", "minify-html", @@ -1717,6 +1718,24 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" +dependencies = [ + "hyper", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "thiserror", + "tokio", +] + [[package]] name = "metrics-macros" version = "0.6.0" diff --git a/Cargo.toml b/Cargo.toml index c58370d..a8705cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.77" +version = "0.3.78" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -43,6 +43,9 @@ dotenv = "0.15.0" futures-util = "0.3.17" lru = "0.8.0" metrics = "0.20.1" +metrics-exporter-prometheus = { version = "0.11.0", default-features = false, features = [ + "http-listener", +] } metrics-util = "0.14.0" mime = "0.3.16" minify-html = "0.10.0" diff --git a/README.md b/README.md index af7b4b3..dfa6356 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ $ sudo docker run --rm -it \ -e ADDR=0.0.0.0 \ -e SLED_PATH=/mnt/sled/db-0.34 \ -p 8080:8080 \ - asonix/relay:0.3.73 + asonix/relay:0.3.78 ``` This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080 #### Cargo @@ -103,6 +103,8 @@ TLS_CERT=/path/to/cert FOOTER_BLURB="Contact @asonix for inquiries" LOCAL_DOMAINS=masto.asonix.dog LOCAL_BLURB="

Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!

" +PROMETHEUS_ADDR=0.0.0.0 +PROMETHEUS_PORT=9000 ``` #### Descriptions @@ -146,6 +148,10 @@ Optional - Add custom notes in the footer of the page Optional - domains of mastodon servers run by the same admin as the relay ##### `LOCAL_BLURB` Optional - description for the relay +##### `PROMETHEUS_ADDR` +Optional - Address to bind to for serving the prometheus scrape endpoint +##### `PROMETHEUS_PORT` +Optional - Port to bind to for serving the prometheus scrape endpoint ### Subscribing Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. diff --git a/src/collector.rs b/src/collector.rs index 0d3536d..b2758b0 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -1,414 +1,5 @@ -use metrics::{Key, Recorder, SetRecorderError}; -use metrics_util::{ - registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, - MetricKindMask, Summary, -}; -use quanta::Clock; -use std::{ - collections::{BTreeMap, HashMap}, - sync::{atomic::Ordering, Arc, RwLock}, - time::Duration, -}; +mod double; +mod stats; -const SECONDS: u64 = 1; -const MINUTES: u64 = 60 * SECONDS; -const HOURS: u64 = 60 * MINUTES; -const DAYS: u64 = 24 * HOURS; - -type DistributionMap = BTreeMap, Summary>; - -#[derive(Clone)] -pub struct MemoryCollector { - inner: Arc, -} - -struct Inner { - descriptions: RwLock>, - distributions: RwLock>, - recency: Recency, - registry: Registry>, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Counter { - labels: BTreeMap, - value: u64, -} - -impl std::fmt::Display for Counter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}: {}", k, v)) - .collect::>() - .join(", "); - - write!(f, "{} - {}", labels, self.value) - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Gauge { - labels: BTreeMap, - value: f64, -} - -impl std::fmt::Display for Gauge { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}: {}", k, v)) - .collect::>() - .join(", "); - - write!(f, "{} - {}", labels, self.value) - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct Histogram { - labels: BTreeMap, - value: Vec<(f64, Option)>, -} - -impl std::fmt::Display for Histogram { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}: {}", k, v)) - .collect::>() - .join(", "); - - let value = self - .value - .iter() - .map(|(k, v)| { - if let Some(v) = v { - format!("{}: {:.6}", k, v) - } else { - format!("{}: None,", k) - } - }) - .collect::>() - .join(", "); - - write!(f, "{} - {}", labels, value) - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct Snapshot { - counters: HashMap>, - gauges: HashMap>, - histograms: HashMap>, -} - -const PAIRS: [((&str, &str), &str); 2] = [ - ( - ( - "background-jobs.worker.started", - "background-jobs.worker.finished", - ), - "background-jobs.worker.running", - ), - ( - ( - "background-jobs.job.started", - "background-jobs.job.finished", - ), - "background-jobs.job.running", - ), -]; - -#[derive(Default)] -struct MergeCounter { - start: Option, - finish: Option, -} - -impl MergeCounter { - fn merge(self) -> Option { - match (self.start, self.finish) { - (Some(start), Some(end)) => Some(Counter { - labels: start.labels, - value: start.value.saturating_sub(end.value), - }), - (Some(only), None) => Some(only), - (None, Some(only)) => Some(Counter { - labels: only.labels, - value: 0, - }), - (None, None) => None, - } - } -} - -impl Snapshot { - pub(crate) fn present(self) { - if !self.counters.is_empty() { - println!("Counters"); - let mut merging = HashMap::new(); - for (key, counters) in self.counters { - if let Some(((start, _), name)) = PAIRS - .iter() - .find(|((start, finish), _)| *start == key || *finish == key) - { - let entry = merging.entry(name).or_insert_with(HashMap::new); - - for counter in counters { - let mut merge_counter = entry - .entry(counter.labels.clone()) - .or_insert_with(MergeCounter::default); - if key == *start { - merge_counter.start = Some(counter); - } else { - merge_counter.finish = Some(counter); - } - } - - continue; - } - - println!("\t{}", key); - for counter in counters { - println!("\t\t{}", counter); - } - } - - for (key, counters) in merging { - println!("\t{}", key); - - for (_, counter) in counters { - if let Some(counter) = counter.merge() { - println!("\t\t{}", counter); - } - } - } - } - - if !self.gauges.is_empty() { - println!("Gauges"); - for (key, gauges) in self.gauges { - println!("\t{}", key); - - for gauge in gauges { - println!("\t\t{}", gauge); - } - } - } - - if !self.histograms.is_empty() { - println!("Histograms"); - for (key, histograms) in self.histograms { - println!("\t{}", key); - - for histogram in histograms { - println!("\t\t{}", histogram); - } - } - } - } -} - -fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { - let labels = key - .labels() - .into_iter() - .map(|label| (label.key().to_string(), label.value().to_string())) - .collect(); - let name = key.name().to_string(); - (name, labels) -} - -impl Inner { - fn snapshot_counters(&self) -> HashMap> { - let mut counters = HashMap::new(); - - for (key, counter) in self.registry.get_counter_handles() { - let gen = counter.get_generation(); - if !self.recency.should_store_counter(&key, gen, &self.registry) { - continue; - } - - let (name, labels) = key_to_parts(&key); - let value = counter.get_inner().load(Ordering::Acquire); - counters.entry(name).or_insert_with(Vec::new).push(Counter { - labels: labels.into_iter().collect(), - value, - }); - } - - counters - } - - fn snapshot_gauges(&self) -> HashMap> { - let mut gauges = HashMap::new(); - - for (key, gauge) in self.registry.get_gauge_handles() { - let gen = gauge.get_generation(); - if !self.recency.should_store_gauge(&key, gen, &self.registry) { - continue; - } - - let (name, labels) = key_to_parts(&key); - let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); - gauges.entry(name).or_insert_with(Vec::new).push(Gauge { - labels: labels.into_iter().collect(), - value, - }) - } - - gauges - } - - fn snapshot_histograms(&self) -> HashMap> { - for (key, histogram) in self.registry.get_histogram_handles() { - let gen = histogram.get_generation(); - let (name, labels) = key_to_parts(&key); - - if !self - .recency - .should_store_histogram(&key, gen, &self.registry) - { - let mut d = self.distributions.write().unwrap(); - let delete_by_name = if let Some(by_name) = d.get_mut(&name) { - by_name.remove(&labels); - by_name.is_empty() - } else { - false - }; - drop(d); - - if delete_by_name { - self.descriptions.write().unwrap().remove(&name); - } - - continue; - } - - let mut d = self.distributions.write().unwrap(); - let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new); - - let entry = outer_entry - .entry(labels) - .or_insert_with(Summary::with_defaults); - - histogram.get_inner().clear_with(|samples| { - for sample in samples { - entry.add(*sample); - } - }) - } - - let d = self.distributions.read().unwrap().clone(); - d.into_iter() - .map(|(key, value)| { - ( - key, - value - .into_iter() - .map(|(labels, summary)| Histogram { - labels: labels.into_iter().collect(), - value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] - .into_iter() - .map(|q| (q, summary.quantile(q))) - .collect(), - }) - .collect(), - ) - }) - .collect() - } - - fn snapshot(&self) -> Snapshot { - Snapshot { - counters: self.snapshot_counters(), - gauges: self.snapshot_gauges(), - histograms: self.snapshot_histograms(), - } - } -} - -impl MemoryCollector { - pub(crate) fn new() -> Self { - MemoryCollector { - inner: Arc::new(Inner { - descriptions: Default::default(), - distributions: Default::default(), - recency: Recency::new( - Clock::new(), - MetricKindMask::ALL, - Some(Duration::from_secs(5 * DAYS)), - ), - registry: Registry::new(GenerationalStorage::atomic()), - }), - } - } - - pub(crate) fn install(&self) -> Result<(), SetRecorderError> { - metrics::set_boxed_recorder(Box::new(self.clone())) - } - - pub(crate) fn snapshot(&self) -> Snapshot { - self.inner.snapshot() - } - - fn add_description_if_missing( - &self, - key: &metrics::KeyName, - description: metrics::SharedString, - ) { - let mut d = self.inner.descriptions.write().unwrap(); - d.entry(key.as_str().to_owned()).or_insert(description); - } -} - -impl Recorder for MemoryCollector { - fn describe_counter( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn describe_gauge( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn describe_histogram( - &self, - key: metrics::KeyName, - _: Option, - description: metrics::SharedString, - ) { - self.add_description_if_missing(&key, description) - } - - fn register_counter(&self, key: &Key) -> metrics::Counter { - self.inner - .registry - .get_or_create_counter(key, |c| c.clone().into()) - } - - fn register_gauge(&self, key: &Key) -> metrics::Gauge { - self.inner - .registry - .get_or_create_gauge(key, |c| c.clone().into()) - } - - fn register_histogram(&self, key: &Key) -> metrics::Histogram { - self.inner - .registry - .get_or_create_histogram(key, |c| c.clone().into()) - } -} +pub(crate) use double::DoubleRecorder; +pub(crate) use stats::{MemoryCollector, Snapshot}; diff --git a/src/collector/double.rs b/src/collector/double.rs new file mode 100644 index 0000000..b78dbff --- /dev/null +++ b/src/collector/double.rs @@ -0,0 +1,133 @@ +use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError}; +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) struct DoubleRecorder { + first: R, + second: S, +} + +struct DoubleCounter { + first: metrics::Counter, + second: metrics::Counter, +} + +struct DoubleGauge { + first: metrics::Gauge, + second: metrics::Gauge, +} + +struct DoubleHistogram { + first: metrics::Histogram, + second: metrics::Histogram, +} + +impl DoubleRecorder { + pub(crate) fn new(first: R, second: S) -> Self { + DoubleRecorder { first, second } + } + + pub(crate) fn install(self) -> Result<(), SetRecorderError> + where + R: Recorder + 'static, + S: Recorder + 'static, + { + metrics::set_boxed_recorder(Box::new(self)) + } +} + +impl Recorder for DoubleRecorder +where + R: Recorder, + S: Recorder, +{ + fn describe_counter( + &self, + key: metrics::KeyName, + unit: Option, + description: metrics::SharedString, + ) { + self.first + .describe_counter(key.clone(), unit, description.clone()); + self.second.describe_counter(key, unit, description); + } + + fn describe_gauge( + &self, + key: metrics::KeyName, + unit: Option, + description: metrics::SharedString, + ) { + self.first + .describe_gauge(key.clone(), unit, description.clone()); + self.second.describe_gauge(key, unit, description); + } + + fn describe_histogram( + &self, + key: metrics::KeyName, + unit: Option, + description: metrics::SharedString, + ) { + self.first + .describe_histogram(key.clone(), unit, description.clone()); + self.second.describe_histogram(key, unit, description); + } + + fn register_counter(&self, key: &Key) -> metrics::Counter { + let first = self.first.register_counter(key); + let second = self.second.register_counter(key); + + metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second })) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + let first = self.first.register_gauge(key); + let second = self.second.register_gauge(key); + + metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second })) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + let first = self.first.register_histogram(key); + let second = self.second.register_histogram(key); + + metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second })) + } +} + +impl CounterFn for DoubleCounter { + fn increment(&self, value: u64) { + self.first.increment(value); + self.second.increment(value); + } + + fn absolute(&self, value: u64) { + self.first.absolute(value); + self.second.absolute(value); + } +} + +impl GaugeFn for DoubleGauge { + fn increment(&self, value: f64) { + self.first.increment(value); + self.second.increment(value); + } + + fn decrement(&self, value: f64) { + self.first.decrement(value); + self.second.decrement(value); + } + + fn set(&self, value: f64) { + self.first.set(value); + self.second.set(value); + } +} + +impl HistogramFn for DoubleHistogram { + fn record(&self, value: f64) { + self.first.record(value); + self.second.record(value); + } +} diff --git a/src/collector/stats.rs b/src/collector/stats.rs new file mode 100644 index 0000000..c8d1812 --- /dev/null +++ b/src/collector/stats.rs @@ -0,0 +1,414 @@ +use metrics::{Key, Recorder, SetRecorderError}; +use metrics_util::{ + registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, + MetricKindMask, Summary, +}; +use quanta::Clock; +use std::{ + collections::{BTreeMap, HashMap}, + sync::{atomic::Ordering, Arc, RwLock}, + time::Duration, +}; + +const SECONDS: u64 = 1; +const MINUTES: u64 = 60 * SECONDS; +const HOURS: u64 = 60 * MINUTES; +const DAYS: u64 = 24 * HOURS; + +type DistributionMap = BTreeMap, Summary>; + +#[derive(Clone)] +pub struct MemoryCollector { + inner: Arc, +} + +struct Inner { + descriptions: RwLock>, + distributions: RwLock>, + recency: Recency, + registry: Registry>, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Counter { + labels: BTreeMap, + value: u64, +} + +impl std::fmt::Display for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Gauge { + labels: BTreeMap, + value: f64, +} + +impl std::fmt::Display for Gauge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct Histogram { + labels: BTreeMap, + value: Vec<(f64, Option)>, +} + +impl std::fmt::Display for Histogram { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + let value = self + .value + .iter() + .map(|(k, v)| { + if let Some(v) = v { + format!("{}: {:.6}", k, v) + } else { + format!("{}: None,", k) + } + }) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, value) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub(crate) struct Snapshot { + counters: HashMap>, + gauges: HashMap>, + histograms: HashMap>, +} + +const PAIRS: [((&str, &str), &str); 2] = [ + ( + ( + "background-jobs.worker.started", + "background-jobs.worker.finished", + ), + "background-jobs.worker.running", + ), + ( + ( + "background-jobs.job.started", + "background-jobs.job.finished", + ), + "background-jobs.job.running", + ), +]; + +#[derive(Default)] +struct MergeCounter { + start: Option, + finish: Option, +} + +impl MergeCounter { + fn merge(self) -> Option { + match (self.start, self.finish) { + (Some(start), Some(end)) => Some(Counter { + labels: start.labels, + value: start.value.saturating_sub(end.value), + }), + (Some(only), None) => Some(only), + (None, Some(only)) => Some(Counter { + labels: only.labels, + value: 0, + }), + (None, None) => None, + } + } +} + +impl Snapshot { + pub(crate) fn present(self) { + if !self.counters.is_empty() { + println!("Counters"); + let mut merging = HashMap::new(); + for (key, counters) in self.counters { + if let Some(((start, _), name)) = PAIRS + .iter() + .find(|((start, finish), _)| *start == key || *finish == key) + { + let entry = merging.entry(name).or_insert_with(HashMap::new); + + for counter in counters { + let mut merge_counter = entry + .entry(counter.labels.clone()) + .or_insert_with(MergeCounter::default); + if key == *start { + merge_counter.start = Some(counter); + } else { + merge_counter.finish = Some(counter); + } + } + + continue; + } + + println!("\t{}", key); + for counter in counters { + println!("\t\t{}", counter); + } + } + + for (key, counters) in merging { + println!("\t{}", key); + + for (_, counter) in counters { + if let Some(counter) = counter.merge() { + println!("\t\t{}", counter); + } + } + } + } + + if !self.gauges.is_empty() { + println!("Gauges"); + for (key, gauges) in self.gauges { + println!("\t{}", key); + + for gauge in gauges { + println!("\t\t{}", gauge); + } + } + } + + if !self.histograms.is_empty() { + println!("Histograms"); + for (key, histograms) in self.histograms { + println!("\t{}", key); + + for histogram in histograms { + println!("\t\t{}", histogram); + } + } + } + } +} + +fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { + let labels = key + .labels() + .into_iter() + .map(|label| (label.key().to_string(), label.value().to_string())) + .collect(); + let name = key.name().to_string(); + (name, labels) +} + +impl Inner { + fn snapshot_counters(&self) -> HashMap> { + let mut counters = HashMap::new(); + + for (key, counter) in self.registry.get_counter_handles() { + let gen = counter.get_generation(); + if !self.recency.should_store_counter(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = counter.get_inner().load(Ordering::Acquire); + counters.entry(name).or_insert_with(Vec::new).push(Counter { + labels: labels.into_iter().collect(), + value, + }); + } + + counters + } + + fn snapshot_gauges(&self) -> HashMap> { + let mut gauges = HashMap::new(); + + for (key, gauge) in self.registry.get_gauge_handles() { + let gen = gauge.get_generation(); + if !self.recency.should_store_gauge(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); + gauges.entry(name).or_insert_with(Vec::new).push(Gauge { + labels: labels.into_iter().collect(), + value, + }) + } + + gauges + } + + fn snapshot_histograms(&self) -> HashMap> { + for (key, histogram) in self.registry.get_histogram_handles() { + let gen = histogram.get_generation(); + let (name, labels) = key_to_parts(&key); + + if !self + .recency + .should_store_histogram(&key, gen, &self.registry) + { + let mut d = self.distributions.write().unwrap(); + let delete_by_name = if let Some(by_name) = d.get_mut(&name) { + by_name.remove(&labels); + by_name.is_empty() + } else { + false + }; + drop(d); + + if delete_by_name { + self.descriptions.write().unwrap().remove(&name); + } + + continue; + } + + let mut d = self.distributions.write().unwrap(); + let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new); + + let entry = outer_entry + .entry(labels) + .or_insert_with(Summary::with_defaults); + + histogram.get_inner().clear_with(|samples| { + for sample in samples { + entry.add(*sample); + } + }) + } + + let d = self.distributions.read().unwrap().clone(); + d.into_iter() + .map(|(key, value)| { + ( + key, + value + .into_iter() + .map(|(labels, summary)| Histogram { + labels: labels.into_iter().collect(), + value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] + .into_iter() + .map(|q| (q, summary.quantile(q))) + .collect(), + }) + .collect(), + ) + }) + .collect() + } + + fn snapshot(&self) -> Snapshot { + Snapshot { + counters: self.snapshot_counters(), + gauges: self.snapshot_gauges(), + histograms: self.snapshot_histograms(), + } + } +} + +impl MemoryCollector { + pub(crate) fn new() -> Self { + MemoryCollector { + inner: Arc::new(Inner { + descriptions: Default::default(), + distributions: Default::default(), + recency: Recency::new( + Clock::new(), + MetricKindMask::ALL, + Some(Duration::from_secs(5 * DAYS)), + ), + registry: Registry::new(GenerationalStorage::atomic()), + }), + } + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.inner.snapshot() + } + + fn add_description_if_missing( + &self, + key: &metrics::KeyName, + description: metrics::SharedString, + ) { + let mut d = self.inner.descriptions.write().unwrap(); + d.entry(key.as_str().to_owned()).or_insert(description); + } + + pub(crate) fn install(&self) -> Result<(), SetRecorderError> { + metrics::set_boxed_recorder(Box::new(self.clone())) + } +} + +impl Recorder for MemoryCollector { + fn describe_counter( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_gauge( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_histogram( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn register_counter(&self, key: &Key) -> metrics::Counter { + self.inner + .registry + .get_or_create_counter(key, |c| c.clone().into()) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + self.inner + .registry + .get_or_create_gauge(key, |c| c.clone().into()) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + self.inner + .registry + .get_or_create_histogram(key, |c| c.clone().into()) + } +} diff --git a/src/config.rs b/src/config.rs index bd70162..52657cf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,11 @@ use config::Environment; use http_signature_normalization_actix::prelude::VerifyDigest; use rustls::{Certificate, PrivateKey}; use sha2::{Digest, Sha256}; -use std::{io::BufReader, net::IpAddr, path::PathBuf}; +use std::{ + io::BufReader, + net::{IpAddr, SocketAddr}, + path::PathBuf, +}; use uuid::Uuid; #[derive(Clone, Debug, serde::Deserialize)] @@ -38,6 +42,8 @@ pub(crate) struct ParsedConfig { footer_blurb: Option, local_domains: Option, local_blurb: Option, + prometheus_addr: Option, + prometheus_port: Option, } #[derive(Clone)] @@ -60,6 +66,7 @@ pub struct Config { footer_blurb: Option, local_domains: Vec, local_blurb: Option, + prometheus_config: Option, } #[derive(Clone)] @@ -68,6 +75,12 @@ struct TlsConfig { cert: PathBuf, } +#[derive(Clone, Debug)] +struct PrometheusConfig { + addr: IpAddr, + port: u16, +} + #[derive(Debug)] pub enum UrlKind { Activity, @@ -120,6 +133,7 @@ impl std::fmt::Debug for Config { .field("footer_blurb", &self.footer_blurb) .field("local_domains", &self.local_domains) .field("local_blurb", &self.local_blurb) + .field("prometheus_config", &self.prometheus_config) .finish() } } @@ -146,6 +160,8 @@ impl Config { .set_default("footer_blurb", None as Option<&str>)? .set_default("local_domains", None as Option<&str>)? .set_default("local_blurb", None as Option<&str>)? + .set_default("prometheus_addr", None as Option<&str>)? + .set_default("prometheus_port", None as Option)? .add_source(Environment::default()) .build()?; @@ -174,6 +190,19 @@ impl Config { .map(|d| d.to_string()) .collect(); + let prometheus_config = match (config.prometheus_addr, config.prometheus_port) { + (Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }), + (Some(_), None) => { + tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config"); + None + } + (None, Some(_)) => { + tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config"); + None + } + (None, None) => None, + }; + Ok(Config { hostname: config.hostname, addr: config.addr, @@ -193,9 +222,16 @@ impl Config { footer_blurb: config.footer_blurb, local_domains, local_blurb: config.local_blurb, + prometheus_config, }) } + pub(crate) fn prometheus_bind_address(&self) -> Option { + let config = self.prometheus_config.as_ref()?; + + Some((config.addr, config.port).into()) + } + pub(crate) fn open_keys(&self) -> Result, PrivateKey)>, Error> { let tls = if let Some(tls) = &self.tls { tls diff --git a/src/main.rs b/src/main.rs index dccd8bb..f9976cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,11 @@ use activitystreams::iri_string::types::IriString; use actix_rt::task::JoinHandle; use actix_web::{middleware::Compress, web, App, HttpServer}; -use collector::MemoryCollector; +use collector::{DoubleRecorder, MemoryCollector}; #[cfg(feature = "console")] use console_subscriber::ConsoleLayer; use http_signature_normalization_actix::middleware::VerifySignature; +use metrics_exporter_prometheus::PrometheusBuilder; use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry_otlp::WithExportConfig; use rustls::ServerConfig; @@ -103,8 +104,19 @@ async fn main() -> Result<(), anyhow::Error> { let config = Config::build()?; init_subscriber(Config::software_name(), config.opentelemetry_url())?; + let collector = MemoryCollector::new(); - collector.install()?; + + if let Some(bind_addr) = config.prometheus_bind_address() { + let (recorder, exporter) = PrometheusBuilder::new() + .with_http_listener(bind_addr) + .build()?; + + actix_rt::spawn(exporter); + DoubleRecorder::new(recorder, collector.clone()).install()?; + } else { + collector.install()?; + } let args = Args::new();