From a9b294d39e7101979741b45a54ac29ca1b69ca6d Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 6 Nov 2018 20:01:43 -0600 Subject: [PATCH] Fix deadlocks in Storage methods --- Cargo.toml | 1 + examples/process-jobs/.env | 1 + examples/process-jobs/.gitignore | 1 + examples/process-jobs/Cargo.toml | 20 ++ examples/process-jobs/src/main.rs | 79 +++++++ jobs-core/Cargo.toml | 2 +- jobs-core/src/lib.rs | 27 ++- jobs-core/src/storage.rs | 340 ++++++++++++++++++------------ jobs-tokio/src/lib.rs | 57 ++--- src/lib.rs | 7 + 10 files changed, 368 insertions(+), 167 deletions(-) create mode 100644 examples/process-jobs/.env create mode 100644 examples/process-jobs/.gitignore create mode 100644 examples/process-jobs/Cargo.toml create mode 100644 examples/process-jobs/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 9dff5e1..086e53c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "jobs-core", "jobs-executor", "jobs-tokio", + "examples/process-jobs", ] [features] diff --git a/examples/process-jobs/.env b/examples/process-jobs/.env new file mode 100644 index 0000000..0d3ea5c --- /dev/null +++ b/examples/process-jobs/.env @@ -0,0 +1 @@ +RUST_LOG=jobs_tokio,process_jobs=trace diff --git a/examples/process-jobs/.gitignore b/examples/process-jobs/.gitignore new file mode 100644 index 0000000..ac25cf6 --- /dev/null +++ b/examples/process-jobs/.gitignore @@ -0,0 +1 @@ +example-db diff --git a/examples/process-jobs/Cargo.toml b/examples/process-jobs/Cargo.toml new file mode 100644 index 0000000..1348753 --- /dev/null +++ b/examples/process-jobs/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "process-jobs" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +[dependencies] +dotenv = "0.13" +env_logger = "0.5" +failure = "0.1" +futures = "0.1" +log = "0.4" +serde = "1.0" +serde_derive = "1.0" +tokio = "0.1" + +[dependencies.jobs] +version = "0.1" +path = "../.." +features = ["jobs-tokio"] diff --git a/examples/process-jobs/src/main.rs b/examples/process-jobs/src/main.rs new file mode 100644 index 0000000..95e03a3 --- /dev/null +++ b/examples/process-jobs/src/main.rs @@ -0,0 +1,79 @@ +#[macro_use] +extern crate serde_derive; + +use failure::Error; +use futures::{ + future::{lazy, IntoFuture}, + Future, +}; +use jobs::{JobRunner, MaxRetries, Processor}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct MyJobArguments { + some_usize: usize, + other_usize: usize, +} + +struct MyProcessor; + +impl Processor for MyProcessor { + type Arguments = MyJobArguments; + + fn name() -> &'static str { + "MyProcessor" + } + + fn max_retries() -> MaxRetries { + MaxRetries::Count(1) + } + + fn process(&self, args: Self::Arguments) -> Box + Send> { + println!("args: {:?}", args); + + Box::new(Ok(()).into_future()) + } +} + +fn main() { + dotenv::dotenv().ok(); + env_logger::init(); + + tokio::run(lazy(|| { + let mut runner = JobRunner::new(1234, 8, "example-db"); + runner.register_processor(MyProcessor); + + let handle = runner.spawn(); + + let jobs = vec![ + MyJobArguments { + some_usize: 0, + other_usize: 1, + }, + MyJobArguments { + some_usize: 1, + other_usize: 2, + }, + MyJobArguments { + some_usize: 3, + other_usize: 5, + }, + MyJobArguments { + some_usize: 8, + other_usize: 13, + }, + ]; + + let _: Vec<_> = jobs + .into_iter() + .map(|job| { + tokio::spawn( + handle + .queue(MyProcessor::new_job(job, None).unwrap()) + .then(|_| Ok(())), + ); + }) + .collect(); + + Ok(()) + })); +} diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 9d4807b..ee3454a 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" failure = "0.1" futures = "0.1.21" log = "0.4" -kv = "0.6" +kv = { version = "0.6", path = "../../rust-kv", features = ["json-value"] } lmdb = "0.8" serde = "1.0" serde_derive = "1.0" diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 875441a..c16af36 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -26,7 +26,7 @@ pub enum JobError { /// The Processor trait /// -/// Processors are +/// Processors define the logic for executing jobs pub trait Processor { type Arguments: Serialize + DeserializeOwned; @@ -113,10 +113,19 @@ pub trait Processor { } } +/// Set the status of a job when storing it #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] pub enum JobStatus { + /// Job should be queued Pending, - Active, + + /// Job is running + Running, + + /// Job has failed + Failed, + + /// Job has finished Finished, } @@ -167,7 +176,7 @@ impl MaxRetries { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct JobInfo { /// ID of the job, None means an ID has not been set id: Option, @@ -202,6 +211,14 @@ impl JobInfo { self.id = Some(id); } } + + fn fail(&mut self) { + self.status = JobStatus::Failed; + } + + fn pass(&mut self) { + self.status = JobStatus::Finished; + } } pub type ProcessFn = @@ -249,7 +266,7 @@ impl Default for Processors { } } -fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future { +fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future { let args = job.args.clone(); let processor = job.processor.clone(); @@ -257,10 +274,12 @@ fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future { info!("Job completed, {}", processor); + job.pass(); Ok(job) } Err(e) => { error!("Job errored, {}, {}", processor, e); + job.fail(); Ok(job) } }) diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index a0c933e..2c80479 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -3,23 +3,28 @@ use std::{ sync::{Arc, RwLock, RwLockWriteGuard}, }; -use crate::JobInfo; -use kv::{bincode::Bincode, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf}; +use crate::{JobInfo, JobStatus}; +use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf}; use lmdb::Error as LmdbError; -/// Set the status of a job when storing it -pub enum JobStatus { - /// Job should be queued - Pending, +struct Buckets<'a> { + queued: Bucket<'a, &'a [u8], ValueBuf>>, + running: Bucket<'a, &'a [u8], ValueBuf>>, + failed: Bucket<'a, &'a [u8], ValueBuf>>, + finished: Bucket<'a, &'a [u8], ValueBuf>>, +} - /// Job is running - Running, +impl<'a> Buckets<'a> { + fn new(store: &'a RwLockWriteGuard) -> Result { + let b = Buckets { + queued: store.bucket(Some(Storage::job_queue()))?, + running: store.bucket(Some(Storage::job_running()))?, + failed: store.bucket(Some(Storage::job_failed()))?, + finished: store.bucket(Some(Storage::job_finished()))?, + }; - /// Job has failed - Failed, - - /// Job has finished - Finished, + Ok(b) + } } #[derive(Clone)] @@ -50,10 +55,12 @@ impl Storage { pub fn get_new_id(&self) -> Result { let store = self.store.write()?; - let bucket = store.bucket::<&str, ValueBuf>>(Some(Storage::id_store()))?; + let bucket = store.bucket::<&[u8], ValueBuf>>(Some(Storage::id_store()))?; - let new_id = self.with_lock(&store, &bucket, "id-lock", |txn| { - let id = match txn.get(&bucket, "current-id") { + let mut txn = store.write_txn()?; + + let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", |txn| { + let id = match txn.get(&bucket, b"current-id") { Ok(id) => id.inner()?.to_serde(), Err(e) => match e { Error::NotFound => 1, @@ -63,63 +70,92 @@ impl Storage { let new_id = id + 1; - let new_id_value = Bincode::to_value_buf(new_id)?; - txn.set(&bucket, "current-id", new_id_value)?; + let new_id_value = Json::to_value_buf(new_id)?; + txn.set(&bucket, b"current-id", new_id_value)?; Ok(new_id) })?; + txn.commit()?; + Ok(new_id) } - pub fn dequeue_job(&self) -> Result, Error> { + pub fn dequeue_job(&self, limit: usize) -> Result, Error> { let store = self.store.write()?; + trace!("Got store"); + let job_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_queue()))?; - - let txn = store.read_txn()?; - - let maybe_job = { - let cursor = txn.read_cursor(&job_bucket)?; - match cursor.get(None, CursorOp::Current) { - Ok((k, v)) => { - let v = v.inner()?.to_serde(); - Some((k, v)) - } - Err(e) => match e { - Error::NotFound => None, - e => return Err(e), - }, - } - }; + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; let lock_bucket = - store.bucket::<&str, ValueBuf>>(Some(Storage::job_lock()))?; + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; - let result = if let Some((maybe_key, job)) = maybe_job { - if let Some(key) = maybe_key { - let job_key = std::str::from_utf8(&key).unwrap(); + let buckets = Buckets::new(&store)?; - self.with_lock(&store, &lock_bucket, job_key, move |_| { - self.run_job(job_key.parse().unwrap()) - }) - .map(|_| Some(job)) - } else { - warn!("Didn't get key from cursor"); - Ok(None) - } - } else { - info!("No jobs queued"); - Ok(None) - }; + trace!("got buckets"); + + let mut txn = store.write_txn()?; + let read_txn = store.read_txn()?; + + let result = + self.with_lock::<_, Vec>(&lock_bucket, &mut txn, b"job-queue", |inner_txn| { + let mut cursor = read_txn.read_cursor(&buckets.queued)?; + trace!("Got cursor"); + match cursor.get(None, CursorOp::First) { + Ok(_) => (), + Err(e) => match e { + Error::NotFound => { + trace!("No items in queue"); + return Ok(vec![]); + } + e => { + return Err(e); + } + }, + } + trace!("Set cursor to first"); + + let initial_value = + Ok((inner_txn, Vec::new())) as Result<(&mut Txn, Vec), Error>; + + trace!("Got lock"); + let (_inner_txn, vec) = + cursor + .iter() + .fold(initial_value, |acc, (key, _)| match acc { + Ok((inner_txn, mut jobs)) => { + if jobs.len() < limit { + self.run_job( + &buckets, + inner_txn, + std::str::from_utf8(key).unwrap().parse().unwrap(), + )?; + + let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + + jobs.push(job); + } + + Ok((inner_txn, jobs)) + } + Err(e) => Err(e), + })?; + + Ok(vec) + })?; + + trace!("Committing"); txn.commit()?; - result + trace!("Committed"); + + Ok(result) } - pub fn store_job(&self, mut job: JobInfo, status: JobStatus) -> Result<(), Error> { + pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> { let job_id = match job.id() { Some(id) => id, None => { @@ -129,94 +165,132 @@ impl Storage { } }; - let job_value = Bincode::to_value_buf(job)?; + trace!("Generaged job id, {}", job_id); - { - let store = self.store.write()?; - let bucket = - store.bucket::<&str, ValueBuf>>(Some(Storage::job_store()))?; - - let mut txn = store.write_txn()?; - txn.set(&bucket, &job_id.to_string(), job_value)?; - txn.commit()?; + if let JobStatus::Failed = job.status.clone() { + if job.decrement().should_requeue() { + job.status = JobStatus::Pending; + } } + let status = job.status.clone(); + let job_value = Json::to_value_buf(job)?; + + trace!("Storing job"); + + let store = self.store.write()?; + trace!("Got store"); + let bucket = store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; + trace!("Got bucket"); + + let buckets = Buckets::new(&store)?; + + let mut txn = store.write_txn()?; + trace!("Opened write txn"); + txn.set(&bucket, job_id.to_string().as_ref(), job_value)?; + trace!("Set value"); + match status { - JobStatus::Pending => self.queue_job(job_id)?, - JobStatus::Running => self.run_job(job_id)?, - JobStatus::Failed => self.fail_job(job_id)?, - JobStatus::Finished => self.finish_job(job_id)?, + JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id)?, + JobStatus::Running => self.run_job(&buckets, &mut txn, job_id)?, + JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id)?, + JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id)?, } - Ok(()) - } + trace!("Committing"); - fn queue_job(&self, id: usize) -> Result<(), Error> { - self.add_job_to(id, Storage::job_queue())?; - self.delete_job_from(id, Storage::job_failed())?; - self.delete_job_from(id, Storage::job_running())?; - self.delete_job_from(id, Storage::job_finished())?; - - Ok(()) - } - - fn fail_job(&self, id: usize) -> Result<(), Error> { - self.add_job_to(id, Storage::job_failed())?; - self.delete_job_from(id, Storage::job_queue())?; - self.delete_job_from(id, Storage::job_running())?; - self.delete_job_from(id, Storage::job_finished())?; - - Ok(()) - } - - fn run_job(&self, id: usize) -> Result<(), Error> { - self.add_job_to(id, Storage::job_running())?; - self.delete_job_from(id, Storage::job_queue())?; - self.delete_job_from(id, Storage::job_failed())?; - self.delete_job_from(id, Storage::job_finished())?; - - Ok(()) - } - - fn finish_job(&self, id: usize) -> Result<(), Error> { - self.add_job_to(id, Storage::job_finished())?; - self.delete_job_from(id, Storage::job_running())?; - self.delete_job_from(id, Storage::job_queue())?; - self.delete_job_from(id, Storage::job_failed())?; - - Ok(()) - } - - fn add_job_to(&self, id: usize, bucket_name: &str) -> Result<(), Error> { - let store = self.store.write()?; - let bucket = store.bucket::<&str, ValueBuf>>(Some(bucket_name))?; - - let mut txn = store.write_txn()?; - txn.set( - &bucket, - &id.to_string(), - Bincode::to_value_buf(self.runner_id)?, - )?; txn.commit()?; + trace!("Committed"); Ok(()) } - fn delete_job_from(&self, id: usize, bucket_name: &str) -> Result<(), Error> { - let store = self.store.write()?; - let bucket = store.bucket::<&str, ValueBuf>>(Some(bucket_name))?; + fn queue_job<'env>( + &self, + buckets: &'env Buckets<'env>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + self.add_job_to(&buckets.queued, txn, id)?; + self.delete_job_from(&buckets.finished, txn, id)?; + self.delete_job_from(&buckets.failed, txn, id)?; + self.delete_job_from(&buckets.running, txn, id)?; - let mut txn = store.write_txn()?; + Ok(()) + } - match txn.del(&bucket, &id.to_string()) { + fn fail_job<'env>( + &self, + buckets: &'env Buckets<'env>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + self.add_job_to(&buckets.failed, txn, id)?; + self.delete_job_from(&buckets.finished, txn, id)?; + self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.queued, txn, id)?; + + Ok(()) + } + + fn run_job<'env>( + &self, + buckets: &'env Buckets<'env>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + self.add_job_to(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.finished, txn, id)?; + self.delete_job_from(&buckets.failed, txn, id)?; + self.delete_job_from(&buckets.queued, txn, id)?; + + Ok(()) + } + + fn finish_job<'env>( + &self, + buckets: &'env Buckets<'env>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + self.add_job_to(&buckets.finished, txn, id)?; + self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.failed, txn, id)?; + self.delete_job_from(&buckets.queued, txn, id)?; + + Ok(()) + } + + fn add_job_to<'env>( + &self, + bucket: &'env Bucket<&[u8], ValueBuf>>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + txn.set( + bucket, + id.to_string().as_ref(), + Json::to_value_buf(self.runner_id)?, + )?; + trace!("Set value"); + + Ok(()) + } + + fn delete_job_from<'env>( + &self, + bucket: &'env Bucket<&[u8], ValueBuf>>, + txn: &mut Txn<'env>, + id: usize, + ) -> Result<(), Error> { + match txn.del(bucket, id.to_string().as_ref()) { Ok(_) => (), Err(e) => match e { Error::NotFound => (), e => return Err(e), }, } - - txn.commit()?; + trace!("Deleted value"); Ok(()) } @@ -225,31 +299,30 @@ impl Storage { // // But in the event of multiple processes running on the same machine, it is good to have some // way to make sure they don't step on eachother's toes - fn with_lock( + fn with_lock<'env, F, T>( &self, - store: &RwLockWriteGuard, - bucket: &Bucket<&str, ValueBuf>>, - lock_key: &str, + lock_bucket: &'env Bucket<&[u8], ValueBuf>>, + txn: &mut Txn<'env>, + lock_key: &[u8], callback: F, ) -> Result where - F: Fn(&mut Txn) -> Result, + F: Fn(&mut Txn<'env>) -> Result, { - let mut txn = store.write_txn()?; let mut other_runner_id = 0; loop { - let lock_value = Bincode::to_value_buf(self.runner_id)?; + let lock_value = Json::to_value_buf(self.runner_id)?; let mut inner_txn = txn.txn()?; - let res = inner_txn.set_no_overwrite(bucket, lock_key, lock_value); + let res = inner_txn.set_no_overwrite(lock_bucket, lock_key, lock_value); inner_txn.commit()?; match res { Ok(_) => break, Err(e) => { let inner_txn = txn.txn()?; - let res = inner_txn.get(bucket, lock_key); + let res = inner_txn.get(lock_bucket, lock_key); inner_txn.commit()?; match res { @@ -278,10 +351,9 @@ impl Storage { } } - let item = callback(&mut txn)?; + let item = callback(txn)?; - txn.del(bucket, lock_key)?; - txn.commit()?; + txn.del(lock_bucket, lock_key)?; Ok(item) } diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index 767a4e4..708fc51 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -11,10 +11,7 @@ use futures::{ sync::mpsc::{channel, Receiver, SendError, Sender}, Future, Sink, Stream, }; -use jobs_core::{ - storage::{JobStatus, Storage}, - JobInfo, Processor, Processors, -}; +use jobs_core::{storage::Storage, JobInfo, Processor, Processors}; use tokio::timer::Interval; use tokio_threadpool::blocking; @@ -66,7 +63,7 @@ fn return_job( blocking(move || { storage - .store_job(job, JobStatus::Finished) + .store_job(job) .map_err(|e| error!("Error finishing job, {}", e)) }) .map_err(|e| error!("Error blocking, {}", e)) @@ -87,26 +84,26 @@ fn try_process_job( blocking(move || { storage - .dequeue_job() + .dequeue_job(processor_count) .map_err(|e| error!("Error dequeuing job, {}", e)) }) .map_err(|e| error!("Error blocking, {}", e)) }) .and_then(|res| res) .then(move |res| match res { - Ok(maybe_job) => { - if let Some(job) = maybe_job { - // TODO: return JobInfo to DB with job status - tokio::spawn(processors.process_job(job).and_then(move |job| { + Ok(jobs) => Ok(jobs.into_iter().fold( + (processors, processor_count), + move |(proc, count), job| { + let tx = tx.clone(); + tokio::spawn(proc.process_job(job).and_then(move |job| { tx.send(ProcessorMessage::Job(job)) .map(|_| ()) .map_err(|e| error!("Error returning job, {}", e)) })); - Ok((processors, processor_count - 1)) - } else { - Ok((processors, processor_count)) - } - } + + (proc, count - 1) + }, + )), Err(_) => Ok((processors, processor_count)), }); @@ -123,7 +120,7 @@ fn process_jobs( tx: Sender, rx: Receiver, ) -> impl Future { - Interval::new(Instant::now(), Duration::from_millis(500)) + Interval::new(tokio::clock::now(), Duration::from_millis(500)) .map(ProcessorMessage::Time) .map_err(|e| error!("Error in timer, {}", e)) .select(rx) @@ -142,10 +139,14 @@ fn process_jobs( processors, tx.clone(), ))), - ProcessorMessage::Stop => Either::B(Either::B(Err(()).into_future())), + ProcessorMessage::Stop => { + info!("Got stop message"); + Either::B(Either::B(Err(()).into_future())) + } }, ) - .map(|_| ()) + .map(|_| info!("Terminating processor")) + .map_err(|_| info!("Terminating processor")) } pub struct JobRunner { @@ -178,6 +179,14 @@ impl JobRunner { self.processors.register_processor(processor); } + pub fn spawn(self) -> ProcessorHandle { + let spawner = self.sender.clone(); + + tokio::spawn(self.runner()); + + ProcessorHandle { spawner } + } + fn runner(self) -> impl Future { let JobRunner { processors, @@ -207,7 +216,7 @@ impl JobRunner { let storage = storage.clone(); blocking(|| { storage - .store_job(job, JobStatus::Pending) + .store_job(job) .map_err(|e| error!("Error storing job, {}", e)) .map(|_| storage) }) @@ -218,18 +227,10 @@ impl JobRunner { }) .and_then(|_| { tx2.send(ProcessorMessage::Stop) - .map(|_| ()) + .map(|_| info!("Sent stop message")) .map_err(|e| error!("Error shutting down processor, {}", e)) }) } - - pub fn spawn(self) -> ProcessorHandle { - let spawner = self.sender.clone(); - - tokio::spawn(self.runner()); - - ProcessorHandle { spawner } - } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index e69de29..37a447d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -0,0 +1,7 @@ +pub use jobs_core::{ + storage::Storage, Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors, + ShouldStop, +}; + +#[cfg(feature = "jobs-tokio")] +pub use jobs_tokio::{JobRunner, ProcessorHandle};