Count spawned tasks in cleanup, debug log queue length

This commit is contained in:
asonix 2024-01-30 14:57:48 -06:00
parent 34cadb86b8
commit e7357cbdaf
4 changed files with 47 additions and 0 deletions

View file

@ -162,14 +162,21 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(),
let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048)); let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048));
let mut variant_stream = variant_stream.into_streamer(); let mut variant_stream = variant_stream.into_streamer();
let mut count = 0;
while let Some(res) = variant_stream.next().await { while let Some(res) = variant_stream.next().await {
metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1); metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1);
tracing::trace!("outdated_variants: looping"); tracing::trace!("outdated_variants: looping");
let (hash, variant) = res?; let (hash, variant) = res?;
super::cleanup_variants(repo, hash, Some(variant)).await?; super::cleanup_variants(repo, hash, Some(variant)).await?;
count += 1;
} }
tracing::debug!("Queued {count} variant cleanup jobs");
let queue_length = repo.queue_length().await?;
tracing::debug!("Total queue length: {queue_length}");
Ok(()) Ok(())
} }
@ -182,6 +189,8 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048)); let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048));
let mut alias_stream = alias_stream.into_streamer(); let mut alias_stream = alias_stream.into_streamer();
let mut count = 0;
while let Some(res) = alias_stream.next().await { while let Some(res) = alias_stream.next().await {
metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1); metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1);
tracing::trace!("outdated_proxies: looping"); tracing::trace!("outdated_proxies: looping");
@ -189,6 +198,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
let alias = res?; let alias = res?;
if let Some(token) = repo.delete_token(&alias).await? { if let Some(token) = repo.delete_token(&alias).await? {
super::cleanup_alias(repo, alias, token).await?; super::cleanup_alias(repo, alias, token).await?;
count += 1;
} else { } else {
tracing::warn!("Skipping alias cleanup - no delete token"); tracing::warn!("Skipping alias cleanup - no delete token");
repo.remove_relation(alias.clone()).await?; repo.remove_relation(alias.clone()).await?;
@ -196,6 +206,10 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
} }
} }
tracing::debug!("Queued {count} alias cleanup jobs");
let queue_length = repo.queue_length().await?;
tracing::debug!("Total queue length: {queue_length}");
Ok(()) Ok(())
} }

View file

@ -353,6 +353,8 @@ impl JobId {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo { pub(crate) trait QueueRepo: BaseRepo {
async fn queue_length(&self) -> Result<u64, RepoError>;
async fn push( async fn push(
&self, &self,
queue: &'static str, queue: &'static str,
@ -386,6 +388,10 @@ impl<T> QueueRepo for Arc<T>
where where
T: QueueRepo, T: QueueRepo,
{ {
async fn queue_length(&self) -> Result<u64, RepoError> {
T::queue_length(self).await
}
async fn push( async fn push(
&self, &self,
queue: &'static str, queue: &'static str,

View file

@ -1254,6 +1254,23 @@ impl DetailsRepo for PostgresRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl QueueRepo for PostgresRepo { impl QueueRepo for PostgresRepo {
async fn queue_length(&self) -> Result<u64, RepoError> {
use schema::job_queue::dsl::*;
let mut conn = self.get_connection().await?;
let count = job_queue
.count()
.get_result::<i64>(&mut conn)
.with_metrics("pict-rs.postgres.job_queue.count")
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(count.try_into().expect("non-negative count"))
}
#[tracing::instrument(level = "debug", skip(self, job_json))] #[tracing::instrument(level = "debug", skip(self, job_json))]
async fn push( async fn push(
&self, &self,

View file

@ -624,6 +624,16 @@ fn try_into_arc_str(ivec: IVec) -> Result<Arc<str>, SledError> {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo { impl QueueRepo for SledRepo {
async fn queue_length(&self) -> Result<u64, RepoError> {
let queue = self.queue.clone();
let size = crate::sync::spawn_blocking("sled-io", move || queue.len())
.await
.map_err(|_| RepoError::Canceled)?;
Ok(size as u64)
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn push( async fn push(
&self, &self,