Add prometheus metrics

This commit is contained in:
asonix 2023-07-22 16:47:59 -05:00
parent d4719a76dc
commit ce0df080f4
13 changed files with 518 additions and 6 deletions

111
Cargo.lock generated
View file

@ -978,6 +978,15 @@ dependencies = [
"ahash 0.7.6", "ahash 0.7.6",
] ]
[[package]]
name = "hashbrown"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038"
dependencies = [
"ahash 0.8.3",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.14.0" version = "0.14.0"
@ -1285,6 +1294,15 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "mach2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.1.0" version = "0.1.0"
@ -1324,6 +1342,60 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "metrics"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5"
dependencies = [
"ahash 0.8.3",
"metrics-macros",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5"
dependencies = [
"base64 0.21.2",
"hyper",
"indexmap 1.9.3",
"ipnet",
"metrics",
"metrics-util",
"quanta",
"thiserror",
"tokio",
]
[[package]]
name = "metrics-macros"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.27",
]
[[package]]
name = "metrics-util"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.13.1",
"metrics",
"num_cpus",
"quanta",
"sketches-ddsketch",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.17" version = "0.3.17"
@ -1661,6 +1733,8 @@ dependencies = [
"futures-util", "futures-util",
"hex", "hex",
"md-5", "md-5",
"metrics",
"metrics-exporter-prometheus",
"mime", "mime",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
@ -1728,6 +1802,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "portable-atomic"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc55135a600d700580e406b4de0d59cb9ad25e344a3a091a97ded2622ec4ec6"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.17" version = "0.2.17"
@ -1775,6 +1855,22 @@ dependencies = [
"prost", "prost",
] ]
[[package]]
name = "quanta"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab"
dependencies = [
"crossbeam-utils",
"libc",
"mach2",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]] [[package]]
name = "quick-xml" name = "quick-xml"
version = "0.27.1" version = "0.27.1"
@ -1834,6 +1930,15 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "raw-cpuid"
version = "10.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.2.16"
@ -2245,6 +2350,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "sketches-ddsketch"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.8" version = "0.4.8"

View file

@ -30,6 +30,8 @@ flume = "0.10.14"
futures-util = "0.3.17" futures-util = "0.3.17"
hex = "0.4.3" hex = "0.4.3"
md-5 = "0.10.5" md-5 = "0.10.5"
metrics = "0.21.1"
metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = ["http-listener"] }
mime = "0.3.1" mime = "0.3.1"
num_cpus = "1.13" num_cpus = "1.13"
once_cell = "1.4.0" once_cell = "1.4.0"

View file

@ -75,6 +75,8 @@ where
{ {
fn drop(&mut self) { fn drop(&mut self) {
if self.identifier.is_some() || self.upload_id.is_some() { if self.identifier.is_some() || self.upload_id.is_some() {
metrics::increment_counter!("pict-rs.background.upload.failure");
let cleanup_parent_span = let cleanup_parent_span =
tracing::info_span!(parent: None, "Dropped backgrounded cleanup"); tracing::info_span!(parent: None, "Dropped backgrounded cleanup");
cleanup_parent_span.follows_from(Span::current()); cleanup_parent_span.follows_from(Span::current());
@ -108,6 +110,8 @@ where
) )
}); });
} }
} else {
metrics::increment_counter!("pict-rs.background.upload.success");
} }
} }
} }

View file

@ -56,6 +56,8 @@ impl ProcessMap {
completed = &tracing::field::Empty, completed = &tracing::field::Empty,
); );
metrics::increment_counter!("pict-rs.process-map.inserted");
(CancelState::Sender { sender }, span) (CancelState::Sender { sender }, span)
} }
Entry::Occupied(receiver) => { Entry::Occupied(receiver) => {
@ -138,7 +140,9 @@ where
CancelState::Sender { sender } => { CancelState::Sender { sender } => {
let res = std::task::ready!(fut.poll(cx)); let res = std::task::ready!(fut.poll(cx));
process_map.remove(key); if process_map.remove(key).is_some() {
metrics::increment_counter!("pict-rs.process-map.removed");
}
if let Ok(tup) = &res { if let Ok(tup) = &res {
let _ = sender.try_send(tup.clone()); let _ = sender.try_send(tup.clone());
@ -158,6 +162,10 @@ impl Drop for CancelToken {
if self.state.is_sender() { if self.state.is_sender() {
let completed = self.process_map.remove(&self.key).is_none(); let completed = self.process_map.remove(&self.key).is_none();
self.span.record("completed", completed); self.span.record("completed", completed);
if !completed {
metrics::increment_counter!("pict-rs.process-map.removed");
}
} }
} }
} }

View file

@ -48,6 +48,7 @@ impl Args {
worker_id, worker_id,
client_pool_size, client_pool_size,
client_timeout, client_timeout,
metrics_prometheus_address,
media_preprocess_steps, media_preprocess_steps,
media_max_file_size, media_max_file_size,
media_image_max_width, media_image_max_width,
@ -104,6 +105,10 @@ impl Args {
timeout: client_timeout, timeout: client_timeout,
}; };
let metrics = Metrics {
prometheus_address: metrics_prometheus_address,
};
let image_quality = ImageQuality { let image_quality = ImageQuality {
avif: media_image_quality_avif, avif: media_image_quality_avif,
jpeg: media_image_quality_jpeg, jpeg: media_image_quality_jpeg,
@ -180,6 +185,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store, store,
repo, repo,
@ -197,6 +203,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store, store,
repo, repo,
@ -212,6 +219,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store: None, store: None,
repo: None, repo: None,
@ -229,6 +237,7 @@ impl Args {
let server = Server::default(); let server = Server::default();
let client = Client::default(); let client = Client::default();
let media = Media::default(); let media = Media::default();
let metrics = Metrics::default();
match store { match store {
MigrateStoreFrom::Filesystem(MigrateFilesystem { from, to }) => match to { MigrateStoreFrom::Filesystem(MigrateFilesystem { from, to }) => match to {
@ -238,6 +247,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store: None, store: None,
repo, repo,
@ -257,6 +267,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store: None, store: None,
repo, repo,
@ -280,6 +291,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store: None, store: None,
repo, repo,
@ -302,6 +314,7 @@ impl Args {
client, client,
old_db, old_db,
tracing, tracing,
metrics,
media, media,
store: None, store: None,
repo, repo,
@ -347,6 +360,7 @@ pub(super) struct ConfigFormat {
client: Client, client: Client,
old_db: OldDb, old_db: OldDb,
tracing: Tracing, tracing: Tracing,
metrics: Metrics,
media: Media, media: Media,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
repo: Option<Repo>, repo: Option<Repo>,
@ -415,6 +429,13 @@ struct OpenTelemetry {
targets: Option<Serde<Targets>>, targets: Option<Serde<Targets>>,
} }
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct Metrics {
#[serde(skip_serializing_if = "Option::is_none")]
prometheus_address: Option<SocketAddr>,
}
#[derive(Debug, Default, serde::Serialize)] #[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
struct OldDb { struct OldDb {
@ -723,6 +744,10 @@ struct Run {
#[arg(long)] #[arg(long)]
client_timeout: Option<u64>, client_timeout: Option<u64>,
/// Whether to enable the prometheus scrape endpoint
#[arg(long)]
metrics_prometheus_address: Option<SocketAddr>,
/// How many files are allowed to be uploaded per-request /// How many files are allowed to be uploaded per-request
/// ///
/// This number defaults to 1 /// This number defaults to 1

View file

@ -15,6 +15,8 @@ pub(crate) struct ConfigFile {
pub(crate) tracing: Tracing, pub(crate) tracing: Tracing,
pub(crate) metrics: Metrics,
pub(crate) old_db: OldDb, pub(crate) old_db: OldDb,
pub(crate) media: Media, pub(crate) media: Media,
@ -119,6 +121,13 @@ pub(crate) struct Tracing {
pub(crate) opentelemetry: OpenTelemetry, pub(crate) opentelemetry: OpenTelemetry,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Metrics {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) prometheus_address: Option<SocketAddr>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub(crate) struct Logging { pub(crate) struct Logging {

View file

@ -8,10 +8,40 @@ use crate::{
store::Store, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use std::path::PathBuf; use std::{path::PathBuf, time::Instant};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tracing::Instrument; use tracing::Instrument;
struct MetricsGuard {
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard() -> Self {
metrics::increment_counter!("pict-rs.generate.start");
Self {
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::histogram!("pict-rs.generate.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string());
if self.armed {
metrics::increment_counter!("pict-rs.generate.failure");
} else {
metrics::increment_counter!("pict-rs.generate.success");
}
}
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(repo, store, hash))] #[tracing::instrument(skip(repo, store, hash))]
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>( pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
@ -61,6 +91,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
media: &crate::config::Media, media: &crate::config::Media,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let guard = MetricsGuard::guard();
let permit = crate::PROCESS_SEMAPHORE.acquire().await; let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let identifier = if let Some(identifier) = repo let identifier = if let Some(identifier) = repo
@ -149,5 +180,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
) )
.await?; .await?;
guard.disarm();
Ok((details, bytes)) as Result<(Details, Bytes), Error> Ok((details, bytes)) as Result<(Details, Bytes), Error>
} }

View file

@ -225,6 +225,8 @@ where
{ {
fn drop(&mut self) { fn drop(&mut self) {
if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() {
metrics::increment_counter!("pict-rs.ingest.failure");
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
cleanup_parent_span.follows_from(Span::current()); cleanup_parent_span.follows_from(Span::current());
@ -279,6 +281,8 @@ where
) )
}); });
} }
} else {
metrics::increment_counter!("pict-rs.ingest.success");
} }
} }
} }

View file

@ -37,6 +37,8 @@ use futures_util::{
stream::{empty, once}, stream::{empty, once},
Stream, StreamExt, TryStreamExt, Stream, StreamExt, TryStreamExt,
}; };
use metrics_exporter_prometheus::PrometheusBuilder;
use middleware::Metrics;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware; use reqwest_tracing::TracingMiddleware;
@ -161,6 +163,8 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
let store = store.clone(); let store = store.clone();
let config = config.clone(); let config = config.clone();
metrics::increment_counter!("pict-rs.files", "upload" => "inline");
let span = tracing::info_span!("file-upload", ?filename); let span = tracing::info_span!("file-upload", ?filename);
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
@ -218,6 +222,8 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
let store = store.clone(); let store = store.clone();
let config = config.clone(); let config = config.clone();
metrics::increment_counter!("pict-rs.files", "import" => "inline");
let span = tracing::info_span!("file-import", ?filename); let span = tracing::info_span!("file-import", ?filename);
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
@ -353,6 +359,8 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
let repo = (**repo).clone(); let repo = (**repo).clone();
let store = (**store).clone(); let store = (**store).clone();
metrics::increment_counter!("pict-rs.files", "upload" => "background");
let span = tracing::info_span!("file-proxy", ?filename); let span = tracing::info_span!("file-proxy", ?filename);
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
@ -440,6 +448,7 @@ async fn claim_upload<R: FullRepo, S: Store + 'static>(
Ok(wait_res) => { Ok(wait_res) => {
let upload_result = wait_res?; let upload_result = wait_res?;
repo.claim(upload_id).await?; repo.claim(upload_id).await?;
metrics::increment_counter!("pict-rs.background.upload.claim");
match upload_result { match upload_result {
UploadResult::Success { alias, token } => { UploadResult::Success { alias, token } => {
@ -511,6 +520,8 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>, config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
metrics::increment_counter!("pict-rs.files", "download" => "inline");
let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?; let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?;
let alias = session.alias().expect("alias should exist").to_owned(); let alias = session.alias().expect("alias should exist").to_owned();
@ -536,6 +547,8 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
metrics::increment_counter!("pict-rs.files", "download" => "background");
let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?; let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?;
let upload_id = backgrounded.upload_id().expect("Upload ID exists"); let upload_id = backgrounded.upload_id().expect("Upload ID exists");
@ -1362,6 +1375,7 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig)
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.wrap(Metrics)
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
@ -1396,6 +1410,7 @@ async fn launch_object_store<
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.wrap(Metrics)
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
@ -1471,8 +1486,6 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<P, T> {
/// parameters have defaults, it can be useful to dump a valid configuration with default values to /// parameters have defaults, it can be useful to dump a valid configuration with default values to
/// see what is available for tweaking. /// see what is available for tweaking.
/// ///
/// This function must be called before `run` or `install_tracing`
///
/// When running pict-rs as a library, configuration is limited to environment variables and /// When running pict-rs as a library, configuration is limited to environment variables and
/// configuration files. Commandline options are not available. /// configuration files. Commandline options are not available.
/// ///
@ -1535,6 +1548,16 @@ impl PictRsConfiguration {
Ok(self) Ok(self)
} }
pub fn install_metrics(self) -> color_eyre::Result<Self> {
if let Some(addr) = self.config.metrics.prometheus_address {
PrometheusBuilder::new()
.with_http_listener(addr)
.install()?;
}
Ok(self)
}
/// Run the pict-rs application /// Run the pict-rs application
/// ///
/// This must be called after `init_config`, or else the default configuration builder will run and /// This must be called after `init_config`, or else the default configuration builder will run and

View file

@ -2,6 +2,7 @@
async fn main() -> color_eyre::Result<()> { async fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()? pict_rs::PictRsConfiguration::build_default()?
.install_tracing()? .install_tracing()?
.install_metrics()?
.run() .run()
.await .await
} }

View file

@ -1,3 +1,5 @@
mod metrics;
use actix_rt::time::Timeout; use actix_rt::time::Timeout;
use actix_web::{ use actix_web::{
dev::{Service, ServiceRequest, Transform}, dev::{Service, ServiceRequest, Transform},
@ -10,6 +12,8 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
pub(crate) use self::metrics::Metrics;
pub(crate) struct Deadline; pub(crate) struct Deadline;
pub(crate) struct DeadlineMiddleware<S> { pub(crate) struct DeadlineMiddleware<S> {
inner: S, inner: S,

237
src/middleware/metrics.rs Normal file
View file

@ -0,0 +1,237 @@
use actix_web::{
body::MessageBody,
dev::{Service, ServiceRequest, ServiceResponse, Transform},
http::StatusCode,
HttpResponse, ResponseError,
};
use std::{
cell::RefCell,
future::{ready, Future, Ready},
pin::Pin,
task::{Context, Poll},
time::Instant,
};
struct MetricsGuard {
start: Instant,
matched_path: Option<String>,
armed: bool,
}
struct MetricsGuardWithStatus {
start: Instant,
matched_path: Option<String>,
status: StatusCode,
}
impl MetricsGuard {
fn new(matched_path: Option<String>) -> Self {
metrics::increment_counter!("pict-rs.request.start", "path" => format!("{matched_path:?}"));
Self {
start: Instant::now(),
matched_path,
armed: true,
}
}
fn with_status(mut self, status: StatusCode) -> MetricsGuardWithStatus {
self.armed = false;
MetricsGuardWithStatus {
start: self.start,
matched_path: self.matched_path.clone(),
status,
}
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
if self.armed {
metrics::increment_counter!("pict-rs.request.complete", "path" => format!("{:?}", self.matched_path));
metrics::histogram!("pict-rs.request.timings", self.start.elapsed().as_secs_f64(), "path" => format!("{:?}", self.matched_path))
}
}
}
impl Drop for MetricsGuardWithStatus {
fn drop(&mut self) {
metrics::increment_counter!("pict-rs.request.complete", "path" => format!("{:?}", self.matched_path), "status" => self.status.to_string());
metrics::histogram!("pict-rs.request.timings", self.start.elapsed().as_secs_f64(), "path" => format!("{:?}", self.matched_path), "status" => self.status.to_string());
}
}
pub(crate) struct Metrics;
pub(crate) struct MetricsMiddleware<S> {
inner: S,
}
pub(crate) struct MetricsError {
guard: RefCell<Option<MetricsGuard>>,
inner: actix_web::Error,
}
pin_project_lite::pin_project! {
pub(crate) struct MetricsFuture<F> {
guard: Option<MetricsGuard>,
#[pin]
inner: F,
}
}
pin_project_lite::pin_project! {
pub(crate) struct MetricsBody<B> {
guard: Option<MetricsGuardWithStatus>,
#[pin]
inner: B,
}
}
impl<S, B> Transform<S, ServiceRequest> for Metrics
where
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<MetricsBody<B>>;
type Error = actix_web::Error;
type InitError = ();
type Transform = MetricsMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(MetricsMiddleware { inner: service }))
}
}
impl<S, B> Service<ServiceRequest> for MetricsMiddleware<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<MetricsBody<B>>;
type Error = actix_web::Error;
type Future = MetricsFuture<S::Future>;
fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
let res = std::task::ready!(self.inner.poll_ready(ctx));
Poll::Ready(res.map_err(|e| {
MetricsError {
guard: RefCell::new(None),
inner: e.into(),
}
.into()
}))
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let matched_path = req.match_pattern();
MetricsFuture {
guard: Some(MetricsGuard::new(matched_path)),
inner: self.inner.call(req),
}
}
}
impl<F, B, E> Future for MetricsFuture<F>
where
F: Future<Output = Result<ServiceResponse<B>, E>>,
E: Into<actix_web::Error>,
{
type Output = Result<ServiceResponse<MetricsBody<B>>, actix_web::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match std::task::ready!(this.inner.poll(cx)) {
Ok(response) => {
let guard = this.guard.take();
Poll::Ready(Ok(response.map_body(|head, inner| MetricsBody {
guard: guard.map(|guard| guard.with_status(head.status)),
inner,
})))
}
Err(e) => {
let guard = this.guard.take();
Poll::Ready(Err(MetricsError {
guard: RefCell::new(guard),
inner: e.into(),
}
.into()))
}
}
}
}
impl<B> MessageBody for MetricsBody<B>
where
B: MessageBody,
{
type Error = B::Error;
fn size(&self) -> actix_web::body::BodySize {
self.inner.size()
}
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
let this = self.project();
let opt = std::task::ready!(this.inner.poll_next(cx));
if opt.is_none() {
this.guard.take();
}
Poll::Ready(opt)
}
}
impl std::fmt::Debug for MetricsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsError")
.field("guard", &"Guard")
.field("inner", &self.inner)
.finish()
}
}
impl std::fmt::Display for MetricsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}
impl std::error::Error for MetricsError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.inner.source()
}
}
impl ResponseError for MetricsError {
fn status_code(&self) -> StatusCode {
self.inner.as_response_error().status_code()
}
fn error_response(&self) -> HttpResponse<actix_web::body::BoxBody> {
let guard = self.guard.borrow_mut().take();
self.inner.error_response().map_body(|head, inner| {
MetricsBody {
guard: guard.map(|guard| guard.with_status(head.status)),
inner,
}
.boxed()
})
}
}

View file

@ -5,6 +5,7 @@ use std::{
pin::Pin, pin::Pin,
process::{ExitStatus, Stdio}, process::{ExitStatus, Stdio},
task::{Context, Poll}, task::{Context, Poll},
time::Instant,
}; };
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncWriteExt, ReadBuf},
@ -13,12 +14,52 @@ use tokio::{
}; };
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
struct MetricsGuard {
start: Instant,
armed: bool,
command: String,
}
impl MetricsGuard {
fn guard(command: String) -> Self {
metrics::increment_counter!("pict-rs.process.spawn", "command" => command.clone());
Self {
start: Instant::now(),
armed: true,
command,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::histogram!(
"pict-rs.process.duration",
self.start.elapsed().as_secs_f64(),
"command" => self.command.clone(),
"completed" => (!self.armed).to_string(),
);
if self.armed {
metrics::increment_counter!("pict-rs.process.failure", "command" => self.command.clone());
} else {
metrics::increment_counter!("pict-rs.process.success", "command" => self.command.clone());
}
}
}
#[derive(Debug)] #[derive(Debug)]
struct StatusError(ExitStatus); struct StatusError(ExitStatus);
pub(crate) struct Process { pub(crate) struct Process {
command: String, command: String,
child: Child, child: Child,
guard: MetricsGuard,
} }
impl std::fmt::Debug for Process { impl std::fmt::Debug for Process {
@ -80,6 +121,8 @@ impl Process {
fn spawn(command: &str, cmd: &mut Command) -> std::io::Result<Self> { fn spawn(command: &str, cmd: &mut Command) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| {
let guard = MetricsGuard::guard(command.into());
let cmd = cmd let cmd = cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -88,6 +131,7 @@ impl Process {
cmd.spawn().map(|child| Process { cmd.spawn().map(|child| Process {
child, child,
command: String::from(command), command: String::from(command),
guard,
}) })
}) })
} }
@ -97,7 +141,11 @@ impl Process {
let res = self.child.wait().await; let res = self.child.wait().await;
match res { match res {
Ok(status) if status.success() => Ok(()), Ok(status) if status.success() => {
self.guard.disarm();
Ok(())
}
Ok(status) => Err(ProcessError::Status(self.command, status)), Ok(status) => Err(ProcessError::Status(self.command, status)),
Err(e) => Err(ProcessError::Other(e)), Err(e) => Err(ProcessError::Other(e)),
} }
@ -133,6 +181,7 @@ impl Process {
let mut child = self.child; let mut child = self.child;
let command = self.command; let command = self.command;
let guard = self.guard;
let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| {
actix_rt::spawn( actix_rt::spawn(
async move { async move {
@ -143,7 +192,9 @@ impl Process {
match child.wait().await { match child.wait().await {
Ok(status) => { Ok(status) => {
if !status.success() { if status.success() {
guard.disarm();
} else {
let _ = tx.send(std::io::Error::new( let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
StatusError(status), StatusError(status),