diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 7bd1b88..e99cad0 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -162,14 +162,21 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048)); let mut variant_stream = variant_stream.into_streamer(); + let mut count = 0; + while let Some(res) = variant_stream.next().await { metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1); tracing::trace!("outdated_variants: looping"); let (hash, variant) = res?; super::cleanup_variants(repo, hash, Some(variant)).await?; + count += 1; } + tracing::debug!("Queued {count} variant cleanup jobs"); + let queue_length = repo.queue_length().await?; + tracing::debug!("Total queue length: {queue_length}"); + Ok(()) } @@ -182,6 +189,8 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048)); let mut alias_stream = alias_stream.into_streamer(); + let mut count = 0; + while let Some(res) = alias_stream.next().await { metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1); tracing::trace!("outdated_proxies: looping"); @@ -189,6 +198,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), let alias = res?; if let Some(token) = repo.delete_token(&alias).await? { super::cleanup_alias(repo, alias, token).await?; + count += 1; } else { tracing::warn!("Skipping alias cleanup - no delete token"); repo.remove_relation(alias.clone()).await?; @@ -196,6 +206,10 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), } } + tracing::debug!("Queued {count} alias cleanup jobs"); + let queue_length = repo.queue_length().await?; + tracing::debug!("Total queue length: {queue_length}"); + Ok(()) } diff --git a/src/repo.rs b/src/repo.rs index 6051a5b..aaf556b 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -353,6 +353,8 @@ impl JobId { #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { + async fn queue_length(&self) -> Result; + async fn push( &self, queue: &'static str, @@ -386,6 +388,10 @@ impl QueueRepo for Arc where T: QueueRepo, { + async fn queue_length(&self) -> Result { + T::queue_length(self).await + } + async fn push( &self, queue: &'static str, diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 670d0c5..fa10010 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1254,6 +1254,23 @@ impl DetailsRepo for PostgresRepo { #[async_trait::async_trait(?Send)] impl QueueRepo for PostgresRepo { + async fn queue_length(&self) -> Result { + use schema::job_queue::dsl::*; + + let mut conn = self.get_connection().await?; + + let count = job_queue + .count() + .get_result::(&mut conn) + .with_metrics("pict-rs.postgres.job_queue.count") + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; + + Ok(count.try_into().expect("non-negative count")) + } + #[tracing::instrument(level = "debug", skip(self, job_json))] async fn push( &self, diff --git a/src/repo/sled.rs b/src/repo/sled.rs index cb3e99c..96a8ba0 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -624,6 +624,16 @@ fn try_into_arc_str(ivec: IVec) -> Result, SledError> { #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { + async fn queue_length(&self) -> Result { + let queue = self.queue.clone(); + + let size = crate::sync::spawn_blocking("sled-io", move || queue.len()) + .await + .map_err(|_| RepoError::Canceled)?; + + Ok(size as u64) + } + #[tracing::instrument(skip(self))] async fn push( &self,