From 3144b71abb5991643353d8e9f046a173ce9d6d4e Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 21 Mar 2020 14:19:16 -0500 Subject: [PATCH] Properly fetch jobs for memory storage --- jobs-core/src/storage.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 383e036..a78c4bc 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -135,6 +135,7 @@ pub trait Storage: Clone + Send { /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { use super::{JobInfo, Stats}; + use chrono::Utc; use futures::lock::Mutex; use std::{collections::HashMap, convert::Infallible, sync::Arc}; @@ -195,16 +196,21 @@ pub mod memory_storage { async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error> { let mut inner = self.inner.lock().await; + let now = Utc::now(); let j = inner .queues .iter() .filter_map(|(k, v)| { if v == queue { - inner.jobs.get(k).map(|j| j.clone()) - } else { - None + let job = inner.jobs.get(k)?; + + if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { + return Some(job.clone()); + } } + + None }) .next();