Hook things together

This commit is contained in:
asonix 2018-11-05 20:53:03 -06:00
parent 3c8fe03bca
commit 13f31c6e31
5 changed files with 337 additions and 144 deletions

View file

@ -8,7 +8,8 @@ edition = "2018"
failure = "0.1"
futures = "0.1.21"
log = "0.4"
rkv = "0.5"
kv = "0.6"
lmdb = "0.8"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"

View file

@ -5,13 +5,15 @@ extern crate log;
#[macro_use]
extern crate serde_derive;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use failure::Error;
use futures::future::{Either, Future, IntoFuture};
use serde::{de::DeserializeOwned, ser::Serialize};
use serde_json::Value;
pub mod storage;
#[derive(Debug, Fail)]
pub enum JobError {
#[fail(display = "Error performing job: {}", _0)]
@ -92,6 +94,7 @@ pub trait Processor {
status: JobStatus::Pending,
args: serde_json::to_value(args)?,
retry_count: max_retries.unwrap_or(Self::max_retries()),
requeued_at: None,
};
Ok(job)
@ -136,13 +139,13 @@ pub enum MaxRetries {
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum ShouldStop {
pub enum ShouldStop {
LimitReached,
Requeue,
}
impl ShouldStop {
fn should_requeue(&self) -> bool {
pub fn should_requeue(&self) -> bool {
*self == ShouldStop::Requeue
}
}
@ -164,7 +167,7 @@ impl MaxRetries {
}
}
#[derive(Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JobInfo {
/// ID of the job, None means an ID has not been set
id: Option<usize>,
@ -180,10 +183,13 @@ pub struct JobInfo {
/// Retries left for this job, None means no limit
retry_count: MaxRetries,
/// The time this job was re-queued
requeued_at: Option<usize>,
}
impl JobInfo {
fn decrement(&mut self) -> ShouldStop {
pub fn decrement(&mut self) -> ShouldStop {
self.retry_count.decrement()
}
@ -198,35 +204,11 @@ impl JobInfo {
}
}
#[derive(Deserialize, Serialize)]
pub struct Jobs {
inner: VecDeque<JobInfo>,
}
impl Jobs {
fn queue(&mut self, job: JobInfo) {
self.inner.push_back(job);
}
fn pop(&mut self) -> Option<JobInfo> {
self.inner.pop_front()
}
}
impl Default for Jobs {
fn default() -> Self {
Jobs {
inner: Default::default(),
}
}
}
pub type ProcessFn =
Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;
pub struct Processors {
inner: HashMap<String, ProcessFn>,
jobs: Jobs,
}
impl Processors {
@ -244,67 +226,42 @@ impl Processors {
);
}
pub fn queue(&mut self, job: JobInfo) {
self.jobs.queue(job);
}
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
let opt = self
.inner
.get(&job.processor)
.map(|processor| process(processor, job.clone()));
pub fn turn(mut self) -> impl Future<Item = Self, Error = ()> {
match self.jobs.pop() {
Some(job) => Either::A(self.process_job(job)),
None => Either::B(Ok(self).into_future()),
if let Some(fut) = opt {
Either::A(fut)
} else {
error!("Processor {} not present", job.processor);
Either::B(Ok(job).into_future())
}
}
fn process_job(mut self, job: JobInfo) -> impl Future<Item = Self, Error = ()> {
let processor = self.inner.remove(&job.processor);
processor
.ok_or_else(|| {
error!("No processor");
()
})
.into_future()
.and_then(move |processor| process(self, processor, job))
}
}
impl Default for Processors {
fn default() -> Self {
Processors {
inner: Default::default(),
jobs: Default::default(),
}
}
}
fn process(
mut processors: Processors,
process_fn: ProcessFn,
mut job: JobInfo,
) -> impl Future<Item = Processors, Error = ()> {
fn process(process_fn: &ProcessFn, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
let args = job.args.clone();
let processor = job.processor.clone();
let fut = process_fn(args).then(move |res| match res {
Ok(_) => Ok(info!("Job completed, {}", processor)),
process_fn(args).then(move |res| match res {
Ok(_) => {
info!("Job completed, {}", processor);
Ok(job)
}
Err(e) => {
error!("Job errored, {}, {}", processor, e);
Err(e)
Ok(job)
}
});
processors.inner.insert(job.processor.clone(), process_fn);
fut.then(|res| {
if let Err(e) = res {
if job.decrement().should_requeue() {
processors.jobs.queue(job);
} else {
error!("Job failed permanently, {}, {}", &job.processor, e);
}
}
Ok(processors)
})
}

View file

@ -1,10 +1,10 @@
use std::{
path::PathBuf,
sync::{Arc, RwLock},
sync::{Arc, RwLock, RwLockWriteGuard},
};
use jobs_core::JobInfo;
use kv::{bincode::Bincode, Config, Error, Manager, Serde, Store, ValueBuf};
use crate::JobInfo;
use kv::{bincode::Bincode, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
use lmdb::Error as LmdbError;
/// Set the status of a job when storing it
@ -17,8 +17,12 @@ pub enum JobStatus {
/// Job has failed
Failed,
/// Job has finished
Finished,
}
#[derive(Clone)]
pub struct Storage {
runner_id: usize,
store: Arc<RwLock<Store>>,
@ -48,60 +52,73 @@ impl Storage {
let bucket = store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::id_store()))?;
let mut txn = store.write_txn()?;
let new_id = self.with_lock(&store, &bucket, "id-lock", |txn| {
let id = match txn.get(&bucket, "current-id") {
Ok(id) => id.inner()?.to_serde(),
Err(e) => match e {
Error::NotFound => 1,
_ => return Err(e),
},
};
let mut other_runner_id = 0;
let new_id = id + 1;
loop {
let lock_value = Bincode::to_value_buf(self.runner_id)?;
match txn.set_no_overwrite(&bucket, "id-lock", lock_value) {
Ok(_) => break,
Err(e) => {
match txn.get(&bucket, "id-lock") {
Ok(other_id) => {
let other_id = other_id.inner()?.to_serde();
let new_id_value = Bincode::to_value_buf(new_id)?;
txn.set(&bucket, "current-id", new_id_value)?;
if other_runner_id != other_id {
other_runner_id = other_id;
info!("Id lock held by runner {}", other_id);
}
}
Err(e) => match e {
Error::NotFound => continue,
e => return Err(e),
},
}
match e {
Error::LMDB(lmdb) => match lmdb {
LmdbError::KeyExist => continue,
e => return Err(Error::LMDB(e)),
},
e => return Err(e),
}
}
}
}
let id = match txn.get(&bucket, "current-id") {
Ok(id) => id.inner()?.to_serde(),
Err(e) => match e {
Error::NotFound => 1,
_ => return Err(e),
},
};
let new_id = id + 1;
let new_id_value = Bincode::to_value_buf(new_id)?;
txn.set(&bucket, "current-id", new_id_value)?;
txn.del(&bucket, "id-lock")?;
txn.commit()?;
Ok(new_id)
})?;
Ok(new_id)
}
pub fn dequeue_job(&self) -> Result<Option<JobInfo>, Error> {
let store = self.store.write()?;
let job_bucket =
store.bucket::<&[u8], ValueBuf<Bincode<JobInfo>>>(Some(Storage::job_queue()))?;
let txn = store.read_txn()?;
let maybe_job = {
let cursor = txn.read_cursor(&job_bucket)?;
match cursor.get(None, CursorOp::Current) {
Ok((k, v)) => {
let v = v.inner()?.to_serde();
Some((k, v))
}
Err(e) => match e {
Error::NotFound => None,
e => return Err(e),
},
}
};
let lock_bucket =
store.bucket::<&str, ValueBuf<Bincode<usize>>>(Some(Storage::job_lock()))?;
let result = if let Some((maybe_key, job)) = maybe_job {
if let Some(key) = maybe_key {
let job_key = std::str::from_utf8(&key).unwrap();
self.with_lock(&store, &lock_bucket, job_key, move |_| {
self.run_job(job_key.parse().unwrap())
})
.map(|_| Some(job))
} else {
warn!("Didn't get key from cursor");
Ok(None)
}
} else {
info!("No jobs queued");
Ok(None)
};
txn.commit()?;
result
}
pub fn store_job(&self, mut job: JobInfo, status: JobStatus) -> Result<(), Error> {
let job_id = match job.id() {
Some(id) => id,
@ -128,6 +145,7 @@ impl Storage {
JobStatus::Pending => self.queue_job(job_id)?,
JobStatus::Running => self.run_job(job_id)?,
JobStatus::Failed => self.fail_job(job_id)?,
JobStatus::Finished => self.finish_job(job_id)?,
}
Ok(())
@ -137,6 +155,7 @@ impl Storage {
self.add_job_to(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
@ -145,6 +164,7 @@ impl Storage {
self.add_job_to(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
@ -153,6 +173,16 @@ impl Storage {
self.add_job_to(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
self.delete_job_from(id, Storage::job_finished())?;
Ok(())
}
fn finish_job(&self, id: usize) -> Result<(), Error> {
self.add_job_to(id, Storage::job_finished())?;
self.delete_job_from(id, Storage::job_running())?;
self.delete_job_from(id, Storage::job_queue())?;
self.delete_job_from(id, Storage::job_failed())?;
Ok(())
}
@ -191,13 +221,80 @@ impl Storage {
Ok(())
}
fn buckets() -> [&'static str; 5] {
// In all likelihood, this function is not necessary
//
// But in the event of multiple processes running on the same machine, it is good to have some
// way to make sure they don't step on eachother's toes
fn with_lock<F, T>(
&self,
store: &RwLockWriteGuard<Store>,
bucket: &Bucket<&str, ValueBuf<Bincode<usize>>>,
lock_key: &str,
callback: F,
) -> Result<T, Error>
where
F: Fn(&mut Txn) -> Result<T, Error>,
{
let mut txn = store.write_txn()?;
let mut other_runner_id = 0;
loop {
let lock_value = Bincode::to_value_buf(self.runner_id)?;
let mut inner_txn = txn.txn()?;
let res = inner_txn.set_no_overwrite(bucket, lock_key, lock_value);
inner_txn.commit()?;
match res {
Ok(_) => break,
Err(e) => {
let inner_txn = txn.txn()?;
let res = inner_txn.get(bucket, lock_key);
inner_txn.commit()?;
match res {
Ok(other_id) => {
let other_id = other_id.inner()?.to_serde();
if other_runner_id != other_id {
other_runner_id = other_id;
info!("Lock held by runner {}", other_id);
}
}
Err(e) => match e {
Error::NotFound => continue,
e => return Err(e),
},
}
match e {
Error::LMDB(lmdb) => match lmdb {
LmdbError::KeyExist => continue,
e => return Err(Error::LMDB(e)),
},
e => return Err(e),
}
}
}
}
let item = callback(&mut txn)?;
txn.del(bucket, lock_key)?;
txn.commit()?;
Ok(item)
}
fn buckets() -> [&'static str; 7] {
[
Storage::id_store(),
Storage::job_store(),
Storage::job_queue(),
Storage::job_failed(),
Storage::job_running(),
Storage::job_lock(),
Storage::job_finished(),
]
}
@ -220,4 +317,12 @@ impl Storage {
fn job_running() -> &'static str {
"job-running"
}
fn job_finished() -> &'static str {
"job-finished"
}
fn job_lock() -> &'static str {
"job-lock"
}
}

View file

@ -6,7 +6,6 @@ edition = "2018"
[dependencies]
futures = "0.1"
lmdb = "0.8"
log = "0.4"
tokio = "0.1"
tokio-threadpool = "0.1"

View file

@ -1,20 +1,23 @@
#[macro_use]
extern crate log;
mod storage;
use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
time::{Duration, Instant},
};
use futures::{
future::poll_fn,
future::{poll_fn, Either, IntoFuture},
sync::mpsc::{channel, Receiver, SendError, Sender},
Future, Sink, Stream,
};
use jobs_core::{JobInfo, Processor, Processors};
use jobs_core::{
storage::{JobStatus, Storage},
JobInfo, Processor, Processors,
};
use tokio::timer::Interval;
use tokio_threadpool::blocking;
use crate::storage::Storage;
pub struct ProcessorHandle {
spawner: Sender<JobInfo>,
}
@ -28,7 +31,7 @@ impl ProcessorHandle {
}
}
fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
fn setup_kv(runner_id: usize, db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
tokio::fs::create_dir_all(db_path.clone())
.map_err(|e| error!("Failed to create db directory: {}", e))
.and_then(move |_| {
@ -36,7 +39,8 @@ fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
let path = db_path.clone();
blocking(move || {
Storage::init(0, path).map_err(|e| error!("Error initializing db, {}", e))
Storage::init(runner_id, path)
.map_err(|e| error!("Error initializing db, {}", e))
})
.map_err(|e| error!("Error in blocking, {}", e))
})
@ -44,15 +48,117 @@ fn setup_kv(db_path: PathBuf) -> impl Future<Item = Storage, Error = ()> {
.and_then(|res| res)
}
enum ProcessorMessage {
Job(JobInfo),
Time(Instant),
Stop,
}
fn return_job(
storage: Storage,
processor_count: usize,
processors: Processors,
job: JobInfo,
) -> impl Future<Item = (Processors, usize), Error = ()> {
poll_fn(move || {
let storage = storage.clone();
let job = job.clone();
blocking(move || {
storage
.store_job(job, JobStatus::Finished)
.map_err(|e| error!("Error finishing job, {}", e))
})
.map_err(|e| error!("Error blocking, {}", e))
})
.and_then(|res| res)
.map(move |_| (processors, processor_count + 1))
}
fn try_process_job(
storage: Storage,
processor_count: usize,
processors: Processors,
tx: Sender<ProcessorMessage>,
) -> impl Future<Item = (Processors, usize), Error = ()> {
if processor_count > 0 {
let fut = poll_fn(move || {
let storage = storage.clone();
blocking(move || {
storage
.dequeue_job()
.map_err(|e| error!("Error dequeuing job, {}", e))
})
.map_err(|e| error!("Error blocking, {}", e))
})
.and_then(|res| res)
.then(move |res| match res {
Ok(maybe_job) => {
if let Some(job) = maybe_job {
// TODO: return JobInfo to DB with job status
tokio::spawn(processors.process_job(job).and_then(move |job| {
tx.send(ProcessorMessage::Job(job))
.map(|_| ())
.map_err(|e| error!("Error returning job, {}", e))
}));
Ok((processors, processor_count - 1))
} else {
Ok((processors, processor_count))
}
}
Err(_) => Ok((processors, processor_count)),
});
Either::A(fut)
} else {
Either::B(Ok((processors, processor_count)).into_future())
}
}
fn process_jobs(
storage: Storage,
num_processors: usize,
processors: Processors,
tx: Sender<ProcessorMessage>,
rx: Receiver<ProcessorMessage>,
) -> impl Future<Item = (), Error = ()> {
Interval::new(Instant::now(), Duration::from_millis(500))
.map(ProcessorMessage::Time)
.map_err(|e| error!("Error in timer, {}", e))
.select(rx)
.fold(
(processors, num_processors),
move |(processors, processor_count), msg| match msg {
ProcessorMessage::Job(job) => Either::A(return_job(
storage.clone(),
processor_count,
processors,
job,
)),
ProcessorMessage::Time(_) => Either::B(Either::A(try_process_job(
storage.clone(),
processor_count,
processors,
tx.clone(),
))),
ProcessorMessage::Stop => Either::B(Either::B(Err(()).into_future())),
},
)
.map(|_| ())
}
pub struct JobRunner {
processors: Processors,
receiver: Receiver<JobInfo>,
sender: Sender<JobInfo>,
db_path: PathBuf,
num_processors: usize,
runner_id: usize,
}
impl JobRunner {
pub fn new<P: AsRef<Path>>(db_path: P) -> Self {
pub fn new<P: AsRef<Path>>(runner_id: usize, num_processors: usize, db_path: P) -> Self {
let (tx, rx) = channel::<JobInfo>(100);
JobRunner {
@ -60,6 +166,8 @@ impl JobRunner {
receiver: rx,
sender: tx,
db_path: db_path.as_ref().to_owned(),
num_processors,
runner_id,
}
}
@ -74,22 +182,45 @@ impl JobRunner {
let JobRunner {
processors,
receiver,
sender,
sender: _,
db_path,
num_processors,
runner_id,
} = self;
let _ = sender;
let _ = db_path;
let (tx, rx) = channel::<ProcessorMessage>(100);
let tx2 = tx.clone();
// tokio::spawn(setup_kv(db_path));
setup_kv(runner_id, db_path)
.and_then(move |storage| {
tokio::spawn(process_jobs(
storage.clone(),
num_processors,
processors,
tx,
rx,
));
receiver
.fold(processors, |mut processors, job| {
processors.queue(job);
Box::new(processors.turn())
receiver.fold(storage, |storage, job| {
poll_fn(move || {
let job = job.clone();
let storage = storage.clone();
blocking(|| {
storage
.store_job(job, JobStatus::Pending)
.map_err(|e| error!("Error storing job, {}", e))
.map(|_| storage)
})
.map_err(|e| error!("Error blocking, {}", e))
})
.and_then(|res| res)
})
})
.and_then(|_| {
tx2.send(ProcessorMessage::Stop)
.map(|_| ())
.map_err(|e| error!("Error shutting down processor, {}", e))
})
.map(|_| ())
}
pub fn spawn(self) -> ProcessorHandle {