diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 183a66f..8344ff3 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -17,6 +17,7 @@ pub struct SledStorage { running_inverse: Tree, queue: Tree, stats: Tree, + lock: Tree, db: sled::Db, } @@ -36,16 +37,24 @@ impl Storage for SledStorage { } fn fetch_job_from_queue(&mut self, queue: &str) -> Result> { - let job = self - .queue - .iter() - .filter_map(|res| res.ok()) - .filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None }) - .filter_map(|id| self.jobinfo.get(id).ok()) - .filter_map(|opt| opt) - .next(); + let queue_tree = self.queue.clone(); + let job_tree = self.jobinfo.clone(); - Ok(job) + self.lock_queue(queue, move || { + let job = queue_tree + .iter() + .filter_map(|res| res.ok()) + .filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None }) + .filter_map(|id| job_tree.get(id).ok()) + .filter_map(|opt| opt) + .next(); + + if let Some(ref job) = job { + queue_tree.del(&job_key(job.id()))?; + } + + Ok(job) + }) } fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> { @@ -104,9 +113,33 @@ impl SledStorage { running_inverse: open_tree(&db, "background-jobs-running-inverse")?, queue: open_tree(&db, "background-jobs-queue")?, stats: open_tree(&db, "background-jobs-stats")?, + lock: open_tree(&db, "background-jobs-lock")?, db, }) } + + fn lock_queue(&self, queue: &str, f: F) -> Result + where + F: Fn() -> Result, + { + let id = self.db.generate_id()?; + + let mut prev; + while { + prev = self.lock.fetch_and_update(queue, move |opt| match opt { + Some(_) => opt, + None => Some(id), + })?; + + prev.is_some() + } {} + + let res = (f)(); + + self.lock.fetch_and_update(queue, |_| None)?; + + res + } } fn job_key(id: u64) -> String {