Give background jobs more opportunity to yield to runtime

This commit is contained in:
asonix 2024-01-25 16:59:46 -06:00
parent 819b83bab7
commit 7282ee9312

View file

@ -216,44 +216,6 @@ pub(crate) async fn process_images<S: Store + 'static>(
.await
}
async fn process_jobs<S, F>(
repo: &ArcRepo,
store: &S,
config: &Configuration,
queue: &'static str,
callback: F,
) where
S: Store,
for<'a> F: Fn(
&'a ArcRepo,
&'a S,
&'a Configuration,
serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
let worker_id = uuid::Uuid::new_v4();
loop {
tracing::trace!("process_jobs: looping");
let res = job_loop(repo, store, config, worker_id, queue, callback).await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
break;
}
}
struct MetricsGuard {
worker_id: uuid::Uuid,
queue: &'static str,
@ -285,6 +247,55 @@ impl Drop for MetricsGuard {
}
}
async fn process_jobs<S, F>(
repo: &ArcRepo,
store: &S,
config: &Configuration,
queue: &'static str,
callback: F,
) where
S: Store,
for<'a> F: Fn(
&'a ArcRepo,
&'a S,
&'a Configuration,
serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
let worker_id = uuid::Uuid::new_v4();
let yield_limit = 8;
let mut count = 0;
loop {
tracing::trace!("process_jobs: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await;
}
let res = job_loop(repo, store, config, worker_id, queue, callback).await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
break;
}
}
async fn job_loop<S, F>(
repo: &ArcRepo,
store: &S,
@ -303,9 +314,20 @@ where
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
let yield_limit = 8;
let mut count = 0;
loop {
tracing::trace!("job_loop: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await;
}
let fut = async {
let (job_id, job) = repo.pop(queue, worker_id).await?;
@ -365,9 +387,20 @@ async fn process_image_jobs<S, F>(
{
let worker_id = uuid::Uuid::new_v4();
let yield_limit = 8;
let mut count = 0;
loop {
tracing::trace!("process_image_jobs: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await;
}
let res = image_job_loop(
tmp_dir,
repo,
@ -421,9 +454,20 @@ where
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
let yield_limit = 8;
let mut count = 0;
loop {
tracing::trace!("image_job_loop: looping");
count += 1;
count %= yield_limit;
// yield every 8 iterations to be kind to other tasks
if count == 0 {
tokio::task::yield_now().await;
}
let fut = async {
let (job_id, job) = repo.pop(queue, worker_id).await?;