Add lock on queue traversal

This commit is contained in:
asonix 2019-05-25 16:39:16 -05:00
parent d3987768a5
commit e43abbfaaa

View file

@ -17,6 +17,7 @@ pub struct SledStorage {
running_inverse: Tree<u64>, running_inverse: Tree<u64>,
queue: Tree<String>, queue: Tree<String>,
stats: Tree<Stats>, stats: Tree<Stats>,
lock: Tree<u64>,
db: sled::Db, db: sled::Db,
} }
@ -36,16 +37,24 @@ impl Storage for SledStorage {
} }
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> { fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
let job = self let queue_tree = self.queue.clone();
.queue let job_tree = self.jobinfo.clone();
self.lock_queue(queue, move || {
let job = queue_tree
.iter() .iter()
.filter_map(|res| res.ok()) .filter_map(|res| res.ok())
.filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None }) .filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None })
.filter_map(|id| self.jobinfo.get(id).ok()) .filter_map(|id| job_tree.get(id).ok())
.filter_map(|opt| opt) .filter_map(|opt| opt)
.next(); .next();
if let Some(ref job) = job {
queue_tree.del(&job_key(job.id()))?;
}
Ok(job) Ok(job)
})
} }
fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> { fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> {
@ -104,9 +113,33 @@ impl SledStorage {
running_inverse: open_tree(&db, "background-jobs-running-inverse")?, running_inverse: open_tree(&db, "background-jobs-running-inverse")?,
queue: open_tree(&db, "background-jobs-queue")?, queue: open_tree(&db, "background-jobs-queue")?,
stats: open_tree(&db, "background-jobs-stats")?, stats: open_tree(&db, "background-jobs-stats")?,
lock: open_tree(&db, "background-jobs-lock")?,
db, db,
}) })
} }
fn lock_queue<T, F>(&self, queue: &str, f: F) -> Result<T>
where
F: Fn() -> Result<T>,
{
let id = self.db.generate_id()?;
let mut prev;
while {
prev = self.lock.fetch_and_update(queue, move |opt| match opt {
Some(_) => opt,
None => Some(id),
})?;
prev.is_some()
} {}
let res = (f)();
self.lock.fetch_and_update(queue, |_| None)?;
res
}
} }
fn job_key(id: u64) -> String { fn job_key(id: u64) -> String {