Properly fetch jobs for memory storage

This commit is contained in:
asonix 2020-03-21 14:19:16 -05:00
parent 007d53b3c5
commit 3144b71abb

View file

@ -135,6 +135,7 @@ pub trait Storage: Clone + Send {
/// A default, in-memory implementation of a storage mechanism /// A default, in-memory implementation of a storage mechanism
pub mod memory_storage { pub mod memory_storage {
use super::{JobInfo, Stats}; use super::{JobInfo, Stats};
use chrono::Utc;
use futures::lock::Mutex; use futures::lock::Mutex;
use std::{collections::HashMap, convert::Infallible, sync::Arc}; use std::{collections::HashMap, convert::Infallible, sync::Arc};
@ -195,16 +196,21 @@ pub mod memory_storage {
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error> { async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
let now = Utc::now();
let j = inner let j = inner
.queues .queues
.iter() .iter()
.filter_map(|(k, v)| { .filter_map(|(k, v)| {
if v == queue { if v == queue {
inner.jobs.get(k).map(|j| j.clone()) let job = inner.jobs.get(k)?;
} else {
None if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
return Some(job.clone());
}
} }
None
}) })
.next(); .next();