diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml
index e28f319..9d4807b 100644
--- a/jobs-core/Cargo.toml
+++ b/jobs-core/Cargo.toml
@@ -8,7 +8,8 @@ edition = "2018"
 failure = "0.1"
 futures = "0.1.21"
 log = "0.4"
-rkv = "0.5"
+kv = "0.6"
+lmdb = "0.8"
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs
index 49796a6..875441a 100644
--- a/jobs-core/src/lib.rs
+++ b/jobs-core/src/lib.rs
@@ -5,13 +5,15 @@ extern crate log;
 #[macro_use]
 extern crate serde_derive;

-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;

 use failure::Error;
 use futures::future::{Either, Future, IntoFuture};
 use serde::{de::DeserializeOwned, ser::Serialize};
 use serde_json::Value;

+pub mod storage;
+
 #[derive(Debug, Fail)]
 pub enum JobError {
     #[fail(display = "Error performing job: {}", _0)]
@@ -92,6 +94,7 @@ pub trait Processor {
             status: JobStatus::Pending,
             args: serde_json::to_value(args)?,
             retry_count: max_retries.unwrap_or(Self::max_retries()),
+            requeued_at: None,
         };

         Ok(job)
@@ -136,13 +139,13 @@ pub enum MaxRetries {
 }

 #[derive(Clone, Debug, Eq, PartialEq)]
-enum ShouldStop {
+pub enum ShouldStop {
     LimitReached,
     Requeue,
 }

 impl ShouldStop {
-    fn should_requeue(&self) -> bool {
+    pub fn should_requeue(&self) -> bool {
         *self == ShouldStop::Requeue
     }
 }
@@ -164,7 +167,7 @@ impl MaxRetries {
     }
 }

-#[derive(Deserialize, Serialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct JobInfo {
     /// ID of the job, None means an ID has not been set
     id: Option<usize>,
@@ -180,10 +183,13 @@ pub struct JobInfo {

     /// Retries left for this job, None means no limit
     retry_count: MaxRetries,
+
+    /// The time this job was re-queued
+    requeued_at: Option,
 }

 impl JobInfo {
-    fn decrement(&mut self) -> ShouldStop {
+    pub fn decrement(&mut self) -> ShouldStop {
         self.retry_count.decrement()
     }

@@ -198,35 +204,11 @@ impl JobInfo {
     }
 }

-#[derive(Deserialize, Serialize)]
-pub struct Jobs {
-    inner: VecDeque<JobInfo>,
-}
-
-impl Jobs {
-    fn queue(&mut self, job: JobInfo) {
-        self.inner.push_back(job);
-    }
-
-    fn pop(&mut self) -> Option<JobInfo> {
-        self.inner.pop_front()
-    }
-}
-
-impl Default for Jobs {
-    fn default() -> Self {
-        Jobs {
-            inner: Default::default(),
-        }
-    }
-}
-
 pub type ProcessFn =
     Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;

 pub struct Processors {
     inner: HashMap<String, ProcessFn>,
-    jobs: Jobs,
 }

 impl Processors {
@@ -244,67 +226,42 @@ impl Processors {
         );
     }

-    pub fn queue(&mut self, job: JobInfo) {
-        self.jobs.queue(job);
-    }
+    pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
+        let opt = self
+            .inner
+            .get(&job.processor)
+            .map(|processor| process(processor, job.clone()));

-    pub fn turn(mut self) -> impl Future<Item = Processors, Error = ()> {
-        match self.jobs.pop() {
-            Some(job) => Either::A(self.process_job(job)),
-            None => Either::B(Ok(self).into_future()),
+        if let Some(fut) = opt {
+            Either::A(fut)
+        } else {
+            error!("Processor {} not present", job.processor);
+            Either::B(Ok(job).into_future())
         }
     }
-
-    fn process_job(mut self, job: JobInfo) -> impl Future<Item = Processors, Error = ()> {
-        let processor = self.inner.remove(&job.processor);
-
-        processor
-            .ok_or_else(|| {
-                error!("No processor");
-                ()
-            })
-            .into_future()
-            .and_then(move |processor| process(self, processor, job))
-    }
 }

 impl Default for Processors {
     fn default() -> Self {
         Processors {
             inner: Default::default(),
-            jobs: Default::default(),
         }
     }
 }

-fn process(
-    mut processors: Processors,
-    process_fn: ProcessFn,
-    mut job: JobInfo,
-) -> impl Future<Item = Processors, Error = ()> {
+fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
     let args = job.args.clone();
     let processor = job.processor.clone();

-    let fut = process_fn(args).then(move |res| match res {
-        Ok(_) => Ok(info!("Job completed, {}", processor)),
+    process_fn(args).then(move |res| match res {
+        Ok(_) => {
+            info!("Job completed, {}", processor);
+            Ok(job)
+        }
         Err(e) => {
             error!("Job errored, {}, {}", processor, e);
-            Err(e)
+            Ok(job)
         }
-    });
-
-    processors.inner.insert(job.processor.clone(), process_fn);
-
-    fut.then(|res| {
-        if let Err(e) = res {
-            if job.decrement().should_requeue() {
-                processors.jobs.queue(job);
-            } else {
-                error!("Job failed permanently, {}, {}", &job.processor, e);
-            }
-        }
-
-        Ok(processors)
     })
 }
diff --git a/jobs-tokio/src/storage.rs b/jobs-core/src/storage.rs
similarity index 56%
rename from jobs-tokio/src/storage.rs
rename to jobs-core/src/storage.rs
index 1c5db1f..a0c933e 100644
--- a/jobs-tokio/src/storage.rs
+++ b/jobs-core/src/storage.rs
@@ -1,10 +1,10 @@
 use std::{
     path::PathBuf,
-    sync::{Arc, RwLock},
+    sync::{Arc, RwLock, RwLockWriteGuard},
 };

-use jobs_core::JobInfo;
-use kv::{bincode::Bincode, Config, Error, Manager, Serde, Store, ValueBuf};
+use crate::JobInfo;
+use kv::{bincode::Bincode, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
 use lmdb::Error as LmdbError;

 /// Set the status of a job when storing it
@@ -17,8 +17,12 @@ pub enum JobStatus {
     /// Job has failed
     Failed,
+
+    /// Job has finished
+    Finished,
 }

+#[derive(Clone)]
 pub struct Storage {
     runner_id: usize,
     store: Arc<RwLock<Store>>,
 }
@@ -48,60 +52,73 @@ impl Storage {
         let bucket =
             store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::id_store()))?;

-        let mut txn = store.write_txn()?;
+        let new_id = self.with_lock(&store, &bucket, "id-lock", |txn| {
+            let id = match txn.get(&bucket, "current-id") {
+                Ok(id) => id.inner()?.to_serde(),
+                Err(e) => match e {
+                    Error::NotFound => 1,
+                    _ => return Err(e),
+                },
+            };

-        let mut other_runner_id = 0;
+            let new_id = id + 1;

-        loop {
-            let lock_value = Bincode::to_value_buf(self.runner_id)?;
-            match txn.set_no_overwrite(&bucket, "id-lock", lock_value) {
-                Ok(_) => break,
-                Err(e) => {
-                    match txn.get(&bucket, "id-lock") {
-                        Ok(other_id) => {
-                            let other_id = other_id.inner()?.to_serde();
+            let new_id_value = Bincode::to_value_buf(new_id)?;
+            txn.set(&bucket, "current-id", new_id_value)?;

-                            if other_runner_id != other_id {
-                                other_runner_id = other_id;
-                                info!("Id lock held by runner {}", other_id);
-                            }
-                        }
-                        Err(e) => match e {
-                            Error::NotFound => continue,
-                            e => return Err(e),
-                        },
-                    }
-
-                    match e {
-                        Error::LMDB(lmdb) => match lmdb {
-                            LmdbError::KeyExist => continue,
-                            e => return Err(Error::LMDB(e)),
-                        },
-                        e => return Err(e),
-                    }
-                }
-            }
-        }
-
-        let id = match txn.get(&bucket, "current-id") {
-            Ok(id) => id.inner()?.to_serde(),
-            Err(e) => match e {
-                Error::NotFound => 1,
-                _ => return Err(e),
-            },
-        };
-
-        let new_id = id + 1;
-
-        let new_id_value = Bincode::to_value_buf(new_id)?;
-        txn.set(&bucket, "current-id", new_id_value)?;
-
-        txn.del(&bucket, "id-lock")?;
-        txn.commit()?;
+            Ok(new_id)
+        })?;

         Ok(new_id)
     }

+    pub fn dequeue_job(&self) -> Result<Option<JobInfo>, Error> {
+        let store = self.store.write()?;
+
+        let job_bucket =
+            store.bucket::<&[u8], ValueBuf<Bincode<JobInfo>>>(Some(Storage::job_queue()))?;
+
+        let txn = store.read_txn()?;
+
+        let maybe_job = {
+            let cursor = txn.read_cursor(&job_bucket)?;
+            match cursor.get(None, CursorOp::Current) {
+                Ok((k, v)) => {
+                    let v = v.inner()?.to_serde();
+                    Some((k, v))
+                }
+                Err(e) => match e {
+                    Error::NotFound => None,
+                    e => return Err(e),
+                },
+            }
+        };
+
+        let lock_bucket =
+            store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::job_lock()))?;
+
+        let result = if let Some((maybe_key, job)) = maybe_job {
+            if let Some(key) = maybe_key {
+                let job_key = std::str::from_utf8(&key).unwrap();
+
+                self.with_lock(&store, &lock_bucket, job_key, move |_| {
+                    self.run_job(job_key.parse().unwrap())
+                })
+                .map(|_| Some(job))
+            } else {
+                warn!("Didn't get key from cursor");
+                Ok(None)
+            }
+        } else {
+            info!("No jobs queued");
+            Ok(None)
+        };
+
+        txn.commit()?;
+
+        result
+    }
+
     pub fn store_job(&self, mut job: JobInfo, status: JobStatus) -> Result<(), Error> {
         let job_id = match job.id() {
             Some(id) => id,
@@ -128,6 +145,7 @@ impl Storage {
             JobStatus::Pending => self.queue_job(job_id)?,
             JobStatus::Running => self.run_job(job_id)?,
             JobStatus::Failed => self.fail_job(job_id)?,
+            JobStatus::Finished => self.finish_job(job_id)?,
         }

         Ok(())
@@ -137,6 +155,7 @@ impl Storage {
         self.add_job_to(id, Storage::job_queue())?;
         self.delete_job_from(id, Storage::job_failed())?;
         self.delete_job_from(id, Storage::job_running())?;
+        self.delete_job_from(id, Storage::job_finished())?;

         Ok(())
     }
@@ -145,6 +164,7 @@ impl Storage {
         self.add_job_to(id, Storage::job_failed())?;
         self.delete_job_from(id, Storage::job_queue())?;
         self.delete_job_from(id, Storage::job_running())?;
+        self.delete_job_from(id, Storage::job_finished())?;

         Ok(())
     }
@@ -153,6 +173,16 @@ impl Storage {
         self.add_job_to(id, Storage::job_running())?;
         self.delete_job_from(id, Storage::job_queue())?;
         self.delete_job_from(id, Storage::job_failed())?;
+        self.delete_job_from(id, Storage::job_finished())?;
+
+        Ok(())
+    }
+
+    fn finish_job(&self, id: usize) -> Result<(), Error> {
+        self.add_job_to(id, Storage::job_finished())?;
+        self.delete_job_from(id, Storage::job_running())?;
+        self.delete_job_from(id, Storage::job_queue())?;
+        self.delete_job_from(id, Storage::job_failed())?;

         Ok(())
     }
@@ -191,13 +221,80 @@ impl Storage {
         Ok(())
     }

-    fn buckets() -> [&'static str; 5] {
+    // In all likelihood, this function is not necessary
+    //
+    // But in the event of multiple processes running on the same machine, it is good to have some
+    // way to make sure they don't step on each other's toes
+    fn with_lock<F, T>(
+        &self,
+        store: &RwLockWriteGuard<Store>,
+        bucket: &Bucket<&str, ValueBuf<Bincode<usize>>>,
+        lock_key: &str,
+        callback: F,
+    ) -> Result<T, Error>
+    where
+        F: Fn(&mut Txn) -> Result<T, Error>,
+    {
+        let mut txn = store.write_txn()?;
+        let mut other_runner_id = 0;
+
+        loop {
+            let lock_value = Bincode::to_value_buf(self.runner_id)?;
+
+            let mut inner_txn = txn.txn()?;
+            let res = inner_txn.set_no_overwrite(bucket, lock_key, lock_value);
+            inner_txn.commit()?;
+
+            match res {
+                Ok(_) => break,
+                Err(e) => {
+                    let inner_txn = txn.txn()?;
+                    let res = inner_txn.get(bucket, lock_key);
+                    inner_txn.commit()?;
+
+                    match res {
+                        Ok(other_id) => {
+                            let other_id = other_id.inner()?.to_serde();
+
+                            if other_runner_id != other_id {
+                                other_runner_id = other_id;
+                                info!("Lock held by runner {}", other_id);
+                            }
+                        }
+                        Err(e) => match e {
+                            Error::NotFound => continue,
+                            e => return Err(e),
+                        },
+                    }
+
+                    match e {
+                        Error::LMDB(lmdb) => match lmdb {
+                            LmdbError::KeyExist => continue,
+                            e => return Err(Error::LMDB(e)),
+                        },
+                        e => return Err(e),
+                    }
+                }
+            }
+        }
+
+        let item = callback(&mut txn)?;
+
+        txn.del(bucket, lock_key)?;
+        txn.commit()?;
+
+        Ok(item)
+    }
+
+    fn buckets() -> [&'static str; 7] {
         [
             Storage::id_store(),
             Storage::job_store(),
             Storage::job_queue(),
             Storage::job_failed(),
             Storage::job_running(),
+            Storage::job_lock(),
+            Storage::job_finished(),
         ]
     }
@@ -220,4 +317,12 @@ impl Storage {
     fn job_running() -> &'static str {
         "job-running"
     }
+
+    fn job_finished() -> &'static str {
+        "job-finished"
+    }
+
+    fn job_lock() -> &'static str {
+        "job-lock"
+    }
 }
diff --git a/jobs-tokio/Cargo.toml b/jobs-tokio/Cargo.toml
index 2a58d39..bffd9d3 100644
--- a/jobs-tokio/Cargo.toml
+++ b/jobs-tokio/Cargo.toml
@@ -6,7 +6,6 @@ edition = "2018"

 [dependencies]
 futures = "0.1"
-lmdb = "0.8"
 log = "0.4"
 tokio = "0.1"
 tokio-threadpool = "0.1"
diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs
index ee571b0..767a4e4 100644
--- a/jobs-tokio/src/lib.rs
+++ b/jobs-tokio/src/lib.rs
@@ -1,20 +1,23 @@
 #[macro_use]
 extern crate log;

-mod storage;
-
-use std::path::{Path, PathBuf};
+use std::{
+    path::{Path, PathBuf},
+    time::{Duration, Instant},
+};

 use futures::{
-    future::poll_fn,
+    future::{poll_fn, Either, IntoFuture},
     sync::mpsc::{channel, Receiver, SendError, Sender},
     Future, Sink, Stream,
 };
-use jobs_core::{JobInfo, Processor, Processors};
+use jobs_core::{
+    storage::{JobStatus, Storage},
+    JobInfo, Processor, Processors,
+};
+use tokio::timer::Interval;
 use tokio_threadpool::blocking;

-use crate::storage::Storage;
-
 pub struct ProcessorHandle {
     spawner: Sender<JobInfo>,
 }
@@ -28,7 +31,7 @@ impl ProcessorHandle {
     }
 }

-fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
+fn setup_kv(runner_id: usize, db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
     tokio::fs::create_dir_all(db_path.clone())
         .map_err(|e| error!("Failed to create db directory: {}", e))
         .and_then(move |_| {
@@ -36,7 +39,8 @@ fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
             let path = db_path.clone();

             blocking(move || {
-                Storage::init(0, path).map_err(|e| error!("Error initializing db, {}", e))
+                Storage::init(runner_id, path)
+                    .map_err(|e| error!("Error initializing db, {}", e))
             })
             .map_err(|e| error!("Error in blocking, {}", e))
         })
@@ -44,15 +48,117 @@ fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
     .and_then(|res| res)
 }

+enum ProcessorMessage {
+    Job(JobInfo),
+    Time(Instant),
+    Stop,
+}
+
+fn return_job(
+    storage: Storage,
+    processor_count: usize,
+    processors: Processors,
+    job: JobInfo,
+) -> impl Future<Item = (Processors, usize), Error = ()> {
+    poll_fn(move || {
+        let storage = storage.clone();
+        let job = job.clone();
+
+        blocking(move || {
+            storage
+                .store_job(job, JobStatus::Finished)
+                .map_err(|e| error!("Error finishing job, {}", e))
+        })
+        .map_err(|e| error!("Error blocking, {}", e))
+    })
+    .and_then(|res| res)
+    .map(move |_| (processors, processor_count + 1))
+}
+
+fn try_process_job(
+    storage: Storage,
+    processor_count: usize,
+    processors: Processors,
+    tx: Sender<ProcessorMessage>,
+) -> impl Future<Item = (Processors, usize), Error = ()> {
+    if processor_count > 0 {
+        let fut = poll_fn(move || {
+            let storage = storage.clone();
+
+            blocking(move || {
+                storage
+                    .dequeue_job()
+                    .map_err(|e| error!("Error dequeuing job, {}", e))
+            })
+            .map_err(|e| error!("Error blocking, {}", e))
+        })
+        .and_then(|res| res)
+        .then(move |res| match res {
+            Ok(maybe_job) => {
+                if let Some(job) = maybe_job {
+                    // TODO: return JobInfo to DB with job status
+                    tokio::spawn(processors.process_job(job).and_then(move |job| {
+                        tx.send(ProcessorMessage::Job(job))
+                            .map(|_| ())
+                            .map_err(|e| error!("Error returning job, {}", e))
+                    }));
+                    Ok((processors, processor_count - 1))
+                } else {
+                    Ok((processors, processor_count))
+                }
+            }
+            Err(_) => Ok((processors, processor_count)),
+        });
+
+        Either::A(fut)
+    } else {
+        Either::B(Ok((processors, processor_count)).into_future())
+    }
+}
+
+fn process_jobs(
+    storage: Storage,
+    num_processors: usize,
+    processors: Processors,
+    tx: Sender<ProcessorMessage>,
+    rx: Receiver<ProcessorMessage>,
+) -> impl Future<Item = (), Error = ()> {
+    Interval::new(Instant::now(), Duration::from_millis(500))
+        .map(ProcessorMessage::Time)
+        .map_err(|e| error!("Error in timer, {}", e))
+        .select(rx)
+        .fold(
+            (processors, num_processors),
+            move |(processors, processor_count), msg| match msg {
+                ProcessorMessage::Job(job) => Either::A(return_job(
+                    storage.clone(),
+                    processor_count,
+                    processors,
+                    job,
+                )),
+                ProcessorMessage::Time(_) => Either::B(Either::A(try_process_job(
+                    storage.clone(),
+                    processor_count,
+                    processors,
+                    tx.clone(),
+                ))),
+                ProcessorMessage::Stop => Either::B(Either::B(Err(()).into_future())),
+            },
+        )
+        .map(|_| ())
+}
+
 pub struct JobRunner {
     processors: Processors,
     receiver: Receiver<JobInfo>,
     sender: Sender<JobInfo>,
     db_path: PathBuf,
+    num_processors: usize,
+    runner_id: usize,
 }

 impl JobRunner {
-    pub fn new<P: AsRef<Path>>(db_path: P) -> Self {
+    pub fn new<P: AsRef<Path>>(runner_id: usize, num_processors: usize, db_path: P) -> Self {
         let (tx, rx) = channel::<JobInfo>(100);

         JobRunner {
@@ -60,6 +166,8 @@ impl JobRunner {
             receiver: rx,
             sender: tx,
             db_path: db_path.as_ref().to_owned(),
+            num_processors,
+            runner_id,
         }
     }

@@ -74,22 +182,45 @@ impl JobRunner {
         let JobRunner {
             processors,
             receiver,
-            sender,
+            sender: _,
             db_path,
+            num_processors,
+            runner_id,
         } = self;

-        let _ = sender;
-        let _ = db_path;
+        let (tx, rx) = channel::<ProcessorMessage>(100);
+        let tx2 = tx.clone();

-        // tokio::spawn(setup_kv(db_path));
+        setup_kv(runner_id, db_path)
+            .and_then(move |storage| {
+                tokio::spawn(process_jobs(
+                    storage.clone(),
+                    num_processors,
+                    processors,
+                    tx,
+                    rx,
+                ));

-        receiver
-            .fold(processors, |mut processors, job| {
-                processors.queue(job);
-
-                Box::new(processors.turn())
+                receiver.fold(storage, |storage, job| {
+                    poll_fn(move || {
+                        let job = job.clone();
+                        let storage = storage.clone();
+                        blocking(|| {
+                            storage
+                                .store_job(job, JobStatus::Pending)
+                                .map_err(|e| error!("Error storing job, {}", e))
+                                .map(|_| storage)
+                        })
+                        .map_err(|e| error!("Error blocking, {}", e))
+                    })
+                    .and_then(|res| res)
+                })
+            })
+            .and_then(|_| {
+                tx2.send(ProcessorMessage::Stop)
+                    .map(|_| ())
+                    .map_err(|e| error!("Error shutting down processor, {}", e))
             })
-            .map(|_| ())
     }

     pub fn spawn(self) -> ProcessorHandle {
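
A note on the `poll_fn`/`blocking` wrapper that recurs throughout the jobs-tokio changes: `tokio_threadpool::blocking` returns `NotReady` until the threadpool grants a blocking slot, so each synchronous storage call is wrapped in a re-pollable `poll_fn`, and the closure's own `Result` is flattened afterwards with `.and_then(|res| res)`. A minimal standalone sketch of that pattern, not taken from the patch (`expensive_sync_work` and `async_work` are made-up stand-ins for the `Storage` calls):

```rust
use futures::{future::poll_fn, Future};
use tokio_threadpool::blocking;

// Stand-in for a synchronous storage call (e.g. an LMDB transaction).
fn expensive_sync_work(input: u64) -> Result<u64, ()> {
    Ok(input * 2)
}

fn async_work(input: u64) -> impl Future<Item = u64, Error = ()> {
    poll_fn(move || {
        // blocking() yields NotReady until a blocking slot is free, so the
        // closure must be callable again on the next poll; poll_fn provides that.
        blocking(move || expensive_sync_work(input))
            .map_err(|e| eprintln!("blocking error: {}", e))
    })
    // The blocking closure's own Result still has to be flattened out.
    .and_then(|res| res)
}

fn main() {
    // blocking() only works on a threadpool runtime, which tokio::run provides.
    tokio::run(
        async_work(21)
            .map(|n| println!("result: {}", n))
            .map_err(|_| ()),
    );
}
```

In the patch the closures additionally clone `storage` and `job` before entering `blocking`, since unlike the `u64` here those values are not `Copy` and the outer closure may be polled more than once.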
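The new `process_jobs` loop merges a half-second `Interval` tick stream with the `ProcessorMessage` channel and folds the runner state over the combined stream. A standalone sketch of that select-and-fold shape, assuming only tokio 0.1 and futures 0.1 as used in this repo (the `Msg` enum, the counter, and the delay-based shutdown below are illustrative stand-ins, not the patch's types):

```rust
use std::time::{Duration, Instant};

use futures::{
    future::{Either, IntoFuture},
    sync::mpsc,
    Future, Sink, Stream,
};
use tokio::timer::{Delay, Interval};

enum Msg {
    Tick(Instant),
    Stop,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Msg>(16);

    // Timer ticks become Msg values so they can share a stream with the channel.
    let ticks = Interval::new(Instant::now(), Duration::from_millis(500))
        .map(Msg::Tick)
        .map_err(|e| eprintln!("timer error: {}", e));

    let driver = ticks
        .select(rx) // both streams now share Item = Msg, Error = ()
        .fold(0u64, |count, msg| match msg {
            // A real runner would try to dequeue a job on each tick.
            Msg::Tick(_) => Either::A(Ok(count + 1).into_future()),
            // Erroring out of the fold is how the loop is ended early.
            Msg::Stop => Either::B(Err(()).into_future()),
        })
        .map(|_| ())
        .or_else(|_| Ok::<(), ()>(())); // treat the Stop "error" as a clean shutdown

    // Ask the loop to stop after a couple of seconds so the example terminates.
    let stopper = Delay::new(Instant::now() + Duration::from_secs(2))
        .map_err(|_| ())
        .and_then(move |_| tx.send(Msg::Stop).map(|_| ()).map_err(|_| ()));

    tokio::run(driver.join(stopper).map(|_| ()));
}
```

Ending the fold by returning an error, as the `ProcessorMessage::Stop` arm does in the patch, is the usual way to break out of a futures 0.1 `Stream::fold`, since that combinator has no other early-exit mechanism.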