diff --git a/src/queue.rs b/src/queue.rs index f7bd1a5..5c87aa5 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -216,44 +216,6 @@ pub(crate) async fn process_images( .await } -async fn process_jobs( - repo: &ArcRepo, - store: &S, - config: &Configuration, - queue: &'static str, - callback: F, -) where - S: Store, - for<'a> F: Fn( - &'a ArcRepo, - &'a S, - &'a Configuration, - serde_json::Value, - ) -> LocalBoxFuture<'a, Result<(), Error>> - + Copy, -{ - let worker_id = uuid::Uuid::new_v4(); - - loop { - tracing::trace!("process_jobs: looping"); - - let res = job_loop(repo, store, config, worker_id, queue, callback).await; - - if let Err(e) = res { - tracing::warn!("Error processing jobs: {}", format!("{e}")); - tracing::warn!("{}", format!("{e:?}")); - - if e.is_disconnected() { - tokio::time::sleep(Duration::from_secs(10)).await; - } - - continue; - } - - break; - } -} - struct MetricsGuard { worker_id: uuid::Uuid, queue: &'static str, @@ -285,6 +247,55 @@ impl Drop for MetricsGuard { } } +async fn process_jobs( + repo: &ArcRepo, + store: &S, + config: &Configuration, + queue: &'static str, + callback: F, +) where + S: Store, + for<'a> F: Fn( + &'a ArcRepo, + &'a S, + &'a Configuration, + serde_json::Value, + ) -> LocalBoxFuture<'a, Result<(), Error>> + + Copy, +{ + let worker_id = uuid::Uuid::new_v4(); + + let yield_limit = 8; + let mut count = 0; + + loop { + tracing::trace!("process_jobs: looping"); + + count += 1; + count %= yield_limit; + + // yield every 8 iterations to be kind to other tasks + if count == 0 { + tokio::task::yield_now().await; + } + + let res = job_loop(repo, store, config, worker_id, queue, callback).await; + + if let Err(e) = res { + tracing::warn!("Error processing jobs: {}", format!("{e}")); + tracing::warn!("{}", format!("{e:?}")); + + if e.is_disconnected() { + tokio::time::sleep(Duration::from_secs(10)).await; + } + + continue; + } + + break; + } +} + async fn job_loop( repo: &ArcRepo, store: &S, @@ -303,9 +314,20 @@ where ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { + let yield_limit = 8; + let mut count = 0; + loop { tracing::trace!("job_loop: looping"); + count += 1; + count %= yield_limit; + + // yield every 8 iterations to be kind to other tasks + if count == 0 { + tokio::task::yield_now().await; + } + let fut = async { let (job_id, job) = repo.pop(queue, worker_id).await?; @@ -365,9 +387,20 @@ async fn process_image_jobs( { let worker_id = uuid::Uuid::new_v4(); + let yield_limit = 8; + let mut count = 0; + loop { tracing::trace!("process_image_jobs: looping"); + count += 1; + count %= yield_limit; + + // yield every 8 iterations to be kind to other tasks + if count == 0 { + tokio::task::yield_now().await; + } + let res = image_job_loop( tmp_dir, repo, @@ -421,9 +454,20 @@ where ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { + let yield_limit = 8; + let mut count = 0; + loop { tracing::trace!("image_job_loop: looping"); + count += 1; + count %= yield_limit; + + // yield every 8 iterations to be kind to other tasks + if count == 0 { + tokio::task::yield_now().await; + } + let fut = async { let (job_id, job) = repo.pop(queue, worker_id).await?;