Fix deadlocks in Storage methods

This commit is contained in:
asonix 2018-11-06 20:01:43 -06:00
parent 13f31c6e31
commit a9b294d39e
10 changed files with 368 additions and 167 deletions

View file

@ -9,6 +9,7 @@ members = [
"jobs-core",
"jobs-executor",
"jobs-tokio",
"examples/process-jobs",
]
[features]

View file

@ -0,0 +1 @@
RUST_LOG=jobs_tokio,process_jobs=trace

1
examples/process-jobs/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
example-db

View file

@ -0,0 +1,20 @@
[package]
name = "process-jobs"
version = "0.1.0"
authors = ["asonix <asonix@asonix.dog>"]
edition = "2018"
[dependencies]
dotenv = "0.13"
env_logger = "0.5"
failure = "0.1"
futures = "0.1"
log = "0.4"
serde = "1.0"
serde_derive = "1.0"
tokio = "0.1"
[dependencies.jobs]
version = "0.1"
path = "../.."
features = ["jobs-tokio"]

View file

@ -0,0 +1,79 @@
#[macro_use]
extern crate serde_derive;
use failure::Error;
use futures::{
future::{lazy, IntoFuture},
Future,
};
use jobs::{JobRunner, MaxRetries, Processor};
#[derive(Clone, Debug, Deserialize, Serialize)]
struct MyJobArguments {
some_usize: usize,
other_usize: usize,
}
struct MyProcessor;
impl Processor for MyProcessor {
type Arguments = MyJobArguments;
fn name() -> &'static str {
"MyProcessor"
}
fn max_retries() -> MaxRetries {
MaxRetries::Count(1)
}
fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send> {
println!("args: {:?}", args);
Box::new(Ok(()).into_future())
}
}
fn main() {
dotenv::dotenv().ok();
env_logger::init();
tokio::run(lazy(|| {
let mut runner = JobRunner::new(1234, 8, "example-db");
runner.register_processor(MyProcessor);
let handle = runner.spawn();
let jobs = vec![
MyJobArguments {
some_usize: 0,
other_usize: 1,
},
MyJobArguments {
some_usize: 1,
other_usize: 2,
},
MyJobArguments {
some_usize: 3,
other_usize: 5,
},
MyJobArguments {
some_usize: 8,
other_usize: 13,
},
];
let _: Vec<_> = jobs
.into_iter()
.map(|job| {
tokio::spawn(
handle
.queue(MyProcessor::new_job(job, None).unwrap())
.then(|_| Ok(())),
);
})
.collect();
Ok(())
}));
}

View file

@ -8,7 +8,7 @@ edition = "2018"
failure = "0.1"
futures = "0.1.21"
log = "0.4"
kv = "0.6"
kv = { version = "0.6", path = "../../rust-kv", features = ["json-value"] }
lmdb = "0.8"
serde = "1.0"
serde_derive = "1.0"

View file

@ -26,7 +26,7 @@ pub enum JobError {
/// The Processor trait
///
/// Processors are
/// Processors define the logic for executing jobs
pub trait Processor {
type Arguments: Serialize + DeserializeOwned;
@ -113,10 +113,19 @@ pub trait Processor {
}
}
/// Set the status of a job when storing it
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub enum JobStatus {
/// Job should be queued
Pending,
Active,
/// Job is running
Running,
/// Job has failed
Failed,
/// Job has finished
Finished,
}
@ -167,7 +176,7 @@ impl MaxRetries {
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct JobInfo {
/// ID of the job, None means an ID has not been set
id: Option<usize>,
@ -202,6 +211,14 @@ impl JobInfo {
self.id = Some(id);
}
}
fn fail(&mut self) {
self.status = JobStatus::Failed;
}
fn pass(&mut self) {
self.status = JobStatus::Finished;
}
}
pub type ProcessFn =
@ -249,7 +266,7 @@ impl Default for Processors {
}
}
fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
let args = job.args.clone();
let processor = job.processor.clone();
@ -257,10 +274,12 @@ fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future<Item = JobInfo,
process_fn(args).then(move |res| match res {
Ok(_) => {
info!("Job completed, {}", processor);
job.pass();
Ok(job)
}
Err(e) => {
error!("Job errored, {}, {}", processor, e);
job.fail();
Ok(job)
}
})

View file

@ -3,23 +3,28 @@ use std::{
sync::{Arc, RwLock, RwLockWriteGuard},
};
use crate::JobInfo;
use kv::{bincode::Bincode, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
use crate::{JobInfo, JobStatus};
use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
use lmdb::Error as LmdbError;
/// Set the status of a job when storing it
pub enum JobStatus {
/// Job should be queued
Pending,
struct Buckets<'a> {
queued: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
running: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
failed: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
finished: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
}
/// Job is running
Running,
impl<'a> Buckets<'a> {
fn new(store: &'a RwLockWriteGuard<Store>) -> Result<Self, Error> {
let b = Buckets {
queued: store.bucket(Some(Storage::job_queue()))?,
running: store.bucket(Some(Storage::job_running()))?,
failed: store.bucket(Some(Storage::job_failed()))?,
finished: store.bucket(Some(Storage::job_finished()))?,
};
/// Job has failed
Failed,
/// Job has finished
Finished,
Ok(b)
}
}
#[derive(Clone)]
@ -50,10 +55,12 @@ impl Storage {
pub fn get_new_id(&self) -> Result<usize, Error> {
let store = self.store.write()?;
let bucket = store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::id_store()))?;
let bucket = store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::id_store()))?;
let new_id = self.with_lock(&store, &bucket, "id-lock", |txn| {
let id = match txn.get(&bucket, "current-id") {
let mut txn = store.write_txn()?;
let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", |txn| {
let id = match txn.get(&bucket, b"current-id") {
Ok(id) => id.inner()?.to_serde(),
Err(e) => match e {
Error::NotFound => 1,
@ -63,63 +70,92 @@ impl Storage {
let new_id = id + 1;
let new_id_value = Bincode::to_value_buf(new_id)?;
txn.set(&bucket, "current-id", new_id_value)?;
let new_id_value = Json::to_value_buf(new_id)?;
txn.set(&bucket, b"current-id", new_id_value)?;
Ok(new_id)
})?;
txn.commit()?;
Ok(new_id)
}
pub fn dequeue_job(&self) -> Result<Option<JobInfo>, Error> {
pub fn dequeue_job(&self, limit: usize) -> Result<Vec<JobInfo>, Error> {
let store = self.store.write()?;
trace!("Got store");
let job_bucket =
store.bucket::<&[u8], ValueBuf<Bincode<JobInfo>>>(Some(Storage::job_queue()))?;
let txn = store.read_txn()?;
let maybe_job = {
let cursor = txn.read_cursor(&job_bucket)?;
match cursor.get(None, CursorOp::Current) {
Ok((k, v)) => {
let v = v.inner()?.to_serde();
Some((k, v))
}
Err(e) => match e {
Error::NotFound => None,
e => return Err(e),
},
}
};
store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
let lock_bucket =
store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::job_lock()))?;
store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::job_lock()))?;
let result = if let Some((maybe_key, job)) = maybe_job {
if let Some(key) = maybe_key {
let job_key = std::str::from_utf8(&key).unwrap();
let buckets = Buckets::new(&store)?;
self.with_lock(&store, &lock_bucket, job_key, move |_| {
self.run_job(job_key.parse().unwrap())
})
.map(|_| Some(job))
} else {
warn!("Didn't get key from cursor");
Ok(None)
}
} else {
info!("No jobs queued");
Ok(None)
};
trace!("got buckets");
let mut txn = store.write_txn()?;
let read_txn = store.read_txn()?;
let result =
self.with_lock::<_, Vec<JobInfo>>(&lock_bucket, &mut txn, b"job-queue", |inner_txn| {
let mut cursor = read_txn.read_cursor(&buckets.queued)?;
trace!("Got cursor");
match cursor.get(None, CursorOp::First) {
Ok(_) => (),
Err(e) => match e {
Error::NotFound => {
trace!("No items in queue");
return Ok(vec![]);
}
e => {
return Err(e);
}
},
}
trace!("Set cursor to first");
let initial_value =
Ok((inner_txn, Vec::new())) as Result<(&mut Txn, Vec<JobInfo>), Error>;
trace!("Got lock");
let (_inner_txn, vec) =
cursor
.iter()
.fold(initial_value, |acc, (key, _)| match acc {
Ok((inner_txn, mut jobs)) => {
if jobs.len() < limit {
self.run_job(
&buckets,
inner_txn,
std::str::from_utf8(key).unwrap().parse().unwrap(),
)?;
let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
jobs.push(job);
}
Ok((inner_txn, jobs))
}
Err(e) => Err(e),
})?;
Ok(vec)
})?;
trace!("Committing");
txn.commit()?;
result
trace!("Committed");
Ok(result)
}
pub fn store_job(&self, mut job: JobInfo, status: JobStatus) -> Result<(), Error> {
pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> {
let job_id = match job.id() {
Some(id) => id,
None => {
@ -129,94 +165,132 @@ impl Storage {
}
};
let job_value = Bincode::to_value_buf(job)?;
trace!("Generaged job id, {}", job_id);
{
let store = self.store.write()?;
let bucket =
store.bucket::<&str, ValueBuf<Bincode<JobInfo>>>(Some(Storage::job_store()))?;
let mut txn = store.write_txn()?;
txn.set(&bucket, &job_id.to_string(), job_value)?;
txn.commit()?;
if let JobStatus::Failed = job.status.clone() {
if job.decrement().should_requeue() {
job.status = JobStatus::Pending;
}
}
let status = job.status.clone();
let job_value = Json::to_value_buf(job)?;
trace!("Storing job");
let store = self.store.write()?;
trace!("Got store");
let bucket = store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
trace!("Got bucket");
let buckets = Buckets::new(&store)?;
let mut txn = store.write_txn()?;
trace!("Opened write txn");
txn.set(&bucket, job_id.to_string().as_ref(), job_value)?;
trace!("Set value");
match status {
JobStatus::Pending => self.queue_job(job_id)?,
JobStatus::Running => self.run_job(job_id)?,
JobStatus::Failed => self.fail_job(job_id)?,
JobStatus::Finished => self.finish_job(job_id)?,
JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id)?,
JobStatus::Running => self.run_job(&buckets, &mut txn, job_id)?,
JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id)?,
JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id)?,
}
Ok(())
}
trace!("Committing");
fn queue_job(&self, id: usize) -> Result<(), Error> {
self.add_job_to(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
fn fail_job(&self, id: usize) -> Result<(), Error> {
self.add_job_to(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
fn run_job(&self, id: usize) -> Result<(), Error> {
self.add_job_to(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
fn finish_job(&self, id: usize) -> Result<(), Error> {
self.add_job_to(id, Storage::job_finished())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
Ok(())
}
fn add_job_to(&self, id: usize, bucket_name: &str) -> Result<(), Error> {
let store = self.store.write()?;
let bucket = store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(bucket_name))?;
let mut txn = store.write_txn()?;
txn.set(
&bucket,
&id.to_string(),
Bincode::to_value_buf(self.runner_id)?,
)?;
txn.commit()?;
trace!("Committed");
Ok(())
}
fn delete_job_from(&self, id: usize, bucket_name: &str) -> Result<(), Error> {
let store = self.store.write()?;
let bucket = store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(bucket_name))?;
fn queue_job<'env>(
&self,
buckets: &'env Buckets<'env>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
self.add_job_to(&buckets.queued, txn, id)?;
self.delete_job_from(&buckets.finished, txn, id)?;
self.delete_job_from(&buckets.failed, txn, id)?;
self.delete_job_from(&buckets.running, txn, id)?;
let mut txn = store.write_txn()?;
Ok(())
}
match txn.del(&bucket, &id.to_string()) {
fn fail_job<'env>(
&self,
buckets: &'env Buckets<'env>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
self.add_job_to(&buckets.failed, txn, id)?;
self.delete_job_from(&buckets.finished, txn, id)?;
self.delete_job_from(&buckets.running, txn, id)?;
self.delete_job_from(&buckets.queued, txn, id)?;
Ok(())
}
fn run_job<'env>(
&self,
buckets: &'env Buckets<'env>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
self.add_job_to(&buckets.running, txn, id)?;
self.delete_job_from(&buckets.finished, txn, id)?;
self.delete_job_from(&buckets.failed, txn, id)?;
self.delete_job_from(&buckets.queued, txn, id)?;
Ok(())
}
fn finish_job<'env>(
&self,
buckets: &'env Buckets<'env>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
self.add_job_to(&buckets.finished, txn, id)?;
self.delete_job_from(&buckets.running, txn, id)?;
self.delete_job_from(&buckets.failed, txn, id)?;
self.delete_job_from(&buckets.queued, txn, id)?;
Ok(())
}
fn add_job_to<'env>(
&self,
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
txn.set(
bucket,
id.to_string().as_ref(),
Json::to_value_buf(self.runner_id)?,
)?;
trace!("Set value");
Ok(())
}
fn delete_job_from<'env>(
&self,
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
txn: &mut Txn<'env>,
id: usize,
) -> Result<(), Error> {
match txn.del(bucket, id.to_string().as_ref()) {
Ok(_) => (),
Err(e) => match e {
Error::NotFound => (),
e => return Err(e),
},
}
txn.commit()?;
trace!("Deleted value");
Ok(())
}
@ -225,31 +299,30 @@ impl Storage {
//
// But in the event of multiple processes running on the same machine, it is good to have some
// way to make sure they don't step on eachother's toes
fn with_lock<F, T>(
fn with_lock<'env, F, T>(
&self,
store: &RwLockWriteGuard<Store>,
bucket: &Bucket<&str, ValueBuf<Bincode<usize>>>,
lock_key: &str,
lock_bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
txn: &mut Txn<'env>,
lock_key: &[u8],
callback: F,
) -> Result<T, Error>
where
F: Fn(&mut Txn) -> Result<T, Error>,
F: Fn(&mut Txn<'env>) -> Result<T, Error>,
{
let mut txn = store.write_txn()?;
let mut other_runner_id = 0;
loop {
let lock_value = Bincode::to_value_buf(self.runner_id)?;
let lock_value = Json::to_value_buf(self.runner_id)?;
let mut inner_txn = txn.txn()?;
let res = inner_txn.set_no_overwrite(bucket, lock_key, lock_value);
let res = inner_txn.set_no_overwrite(lock_bucket, lock_key, lock_value);
inner_txn.commit()?;
match res {
Ok(_) => break,
Err(e) => {
let inner_txn = txn.txn()?;
let res = inner_txn.get(bucket, lock_key);
let res = inner_txn.get(lock_bucket, lock_key);
inner_txn.commit()?;
match res {
@ -278,10 +351,9 @@ impl Storage {
}
}
let item = callback(&mut txn)?;
let item = callback(txn)?;
txn.del(bucket, lock_key)?;
txn.commit()?;
txn.del(lock_bucket, lock_key)?;
Ok(item)
}

View file

@ -11,10 +11,7 @@ use futures::{
sync::mpsc::{channel, Receiver, SendError, Sender},
Future, Sink, Stream,
};
use jobs_core::{
storage::{JobStatus, Storage},
JobInfo, Processor, Processors,
};
use jobs_core::{storage::Storage, JobInfo, Processor, Processors};
use tokio::timer::Interval;
use tokio_threadpool::blocking;
@ -66,7 +63,7 @@ fn return_job(
blocking(move || {
storage
.store_job(job, JobStatus::Finished)
.store_job(job)
.map_err(|e| error!("Error finishing job, {}", e))
})
.map_err(|e| error!("Error blocking, {}", e))
@ -87,26 +84,26 @@ fn try_process_job(
blocking(move || {
storage
.dequeue_job()
.dequeue_job(processor_count)
.map_err(|e| error!("Error dequeuing job, {}", e))
})
.map_err(|e| error!("Error blocking, {}", e))
})
.and_then(|res| res)
.then(move |res| match res {
Ok(maybe_job) => {
if let Some(job) = maybe_job {
// TODO: return JobInfo to DB with job status
tokio::spawn(processors.process_job(job).and_then(move |job| {
Ok(jobs) => Ok(jobs.into_iter().fold(
(processors, processor_count),
move |(proc, count), job| {
let tx = tx.clone();
tokio::spawn(proc.process_job(job).and_then(move |job| {
tx.send(ProcessorMessage::Job(job))
.map(|_| ())
.map_err(|e| error!("Error returning job, {}", e))
}));
Ok((processors, processor_count - 1))
} else {
Ok((processors, processor_count))
}
}
(proc, count - 1)
},
)),
Err(_) => Ok((processors, processor_count)),
});
@ -123,7 +120,7 @@ fn process_jobs(
tx: Sender<ProcessorMessage>,
rx: Receiver<ProcessorMessage>,
) -> impl Future<Item = (), Error = ()> {
Interval::new(Instant::now(), Duration::from_millis(500))
Interval::new(tokio::clock::now(), Duration::from_millis(500))
.map(ProcessorMessage::Time)
.map_err(|e| error!("Error in timer, {}", e))
.select(rx)
@ -142,10 +139,14 @@ fn process_jobs(
processors,
tx.clone(),
))),
ProcessorMessage::Stop => Either::B(Either::B(Err(()).into_future())),
ProcessorMessage::Stop => {
info!("Got stop message");
Either::B(Either::B(Err(()).into_future()))
}
},
)
.map(|_| ())
.map(|_| info!("Terminating processor"))
.map_err(|_| info!("Terminating processor"))
}
pub struct JobRunner {
@ -178,6 +179,14 @@ impl JobRunner {
self.processors.register_processor(processor);
}
pub fn spawn(self) -> ProcessorHandle {
let spawner = self.sender.clone();
tokio::spawn(self.runner());
ProcessorHandle { spawner }
}
fn runner(self) -> impl Future<Item = (), Error = ()> {
let JobRunner {
processors,
@ -207,7 +216,7 @@ impl JobRunner {
let storage = storage.clone();
blocking(|| {
storage
.store_job(job, JobStatus::Pending)
.store_job(job)
.map_err(|e| error!("Error storing job, {}", e))
.map(|_| storage)
})
@ -218,18 +227,10 @@ impl JobRunner {
})
.and_then(|_| {
tx2.send(ProcessorMessage::Stop)
.map(|_| ())
.map(|_| info!("Sent stop message"))
.map_err(|e| error!("Error shutting down processor, {}", e))
})
}
pub fn spawn(self) -> ProcessorHandle {
let spawner = self.sender.clone();
tokio::spawn(self.runner());
ProcessorHandle { spawner }
}
}
#[cfg(test)]

View file

@ -0,0 +1,7 @@
pub use jobs_core::{
storage::Storage, Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors,
ShouldStop,
};
#[cfg(feature = "jobs-tokio")]
pub use jobs_tokio::{JobRunner, ProcessorHandle};