Add a few more metrics, increase repo disconnect sleep

This commit is contained in:
asonix 2024-01-05 19:35:52 -06:00
parent 688c797082
commit 4145637a33
3 changed files with 37 additions and 2 deletions

View file

@ -1,7 +1,7 @@
use std::{ use std::{
future::{ready, Ready}, future::{ready, Ready},
rc::Rc, rc::Rc,
time::Duration, time::{Duration, Instant},
}; };
use actix_web::{ use actix_web::{
@ -15,6 +15,36 @@ use crate::{future::NowOrNever, stream::LocalBoxStream};
const LIMIT: usize = 256; const LIMIT: usize = 256;
struct MetricsGuard {
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard() -> Self {
metrics::counter!("pict-rs.payload.drain.start").increment(1);
MetricsGuard {
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::counter!("pict-rs.payload.drain.end", "completed" => (!self.armed).to_string())
.increment(1);
metrics::histogram!("pict-rs.payload.drain.duration", "completed" => (!self.armed).to_string())
.record(self.start.elapsed().as_secs_f64());
}
}
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) { async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new(); let mut set = JoinSet::new();
@ -22,11 +52,13 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
tracing::trace!("drain: looping"); tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail // draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
let guard = MetricsGuard::guard();
set.spawn_local(tokio::time::timeout(Duration::from_secs(120), async move { set.spawn_local(tokio::time::timeout(Duration::from_secs(120), async move {
let mut streamer = payload.into_streamer(); let mut streamer = payload.into_streamer();
while streamer.next().await.is_some() { while streamer.next().await.is_some() {
tracing::trace!("drain drop bytes: looping"); tracing::trace!("drain drop bytes: looping");
} }
guard.disarm();
})); }));
let mut count = 0; let mut count = 0;
@ -105,6 +137,7 @@ impl Drop for PayloadStream {
if let Some(payload) = self.inner.take() { if let Some(payload) = self.inner.take() {
tracing::warn!("Dropped unclosed payload, draining"); tracing::warn!("Dropped unclosed payload, draining");
if self.sender.try_send(payload).is_err() { if self.sender.try_send(payload).is_err() {
metrics::counter!("pict-rs.payload.drain.fail-send").increment(1);
tracing::error!("Failed to send unclosed payload for draining"); tracing::error!("Failed to send unclosed payload for draining");
} }
} }

View file

@ -358,7 +358,7 @@ async fn process_image_jobs<S, F>(
tracing::warn!("{}", format!("{e:?}")); tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() { if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(10)).await;
} }
continue; continue;

View file

@ -161,6 +161,7 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(),
let mut variant_stream = repo.older_variants(since).await?.into_streamer(); let mut variant_stream = repo.older_variants(since).await?.into_streamer();
while let Some(res) = variant_stream.next().await { while let Some(res) = variant_stream.next().await {
metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1);
tracing::trace!("outdated_variants: looping"); tracing::trace!("outdated_variants: looping");
let (hash, variant) = res?; let (hash, variant) = res?;
@ -178,6 +179,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
let mut alias_stream = repo.older_aliases(since).await?.into_streamer(); let mut alias_stream = repo.older_aliases(since).await?.into_streamer();
while let Some(res) = alias_stream.next().await { while let Some(res) = alias_stream.next().await {
metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1);
tracing::trace!("outdated_proxies: looping"); tracing::trace!("outdated_proxies: looping");
let alias = res?; let alias = res?;