diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 383e036..a78c4bc 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -135,6 +135,7 @@ pub trait Storage: Clone + Send { /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { use super::{JobInfo, Stats}; + use chrono::Utc; use futures::lock::Mutex; use std::{collections::HashMap, convert::Infallible, sync::Arc}; @@ -195,16 +196,21 @@ pub mod memory_storage { async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error> { let mut inner = self.inner.lock().await; + let now = Utc::now(); let j = inner .queues .iter() .filter_map(|(k, v)| { if v == queue { - inner.jobs.get(k).map(|j| j.clone()) - } else { - None + let job = inner.jobs.get(k)?; + + if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { + return Some(job.clone()); + } } + + None }) .next();