diff --git a/Cargo.toml b/Cargo.toml index bf1f45f..3a4d5bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,28 +1,34 @@ [package] name = "background-jobs" -description = "Background Jobs implemented with tokio and futures" +description = "Background Jobs implemented with sled, actix, and futures" version = "0.5.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" readme = "README.md" -keywords = ["jobs", "processor"] +keywords = ["jobs", "processor", "actix", "sled"] edition = "2018" [workspace] members = [ "jobs-actix", "jobs-core", + "jobs-sled", ] [features] -default = ["background-jobs-actix"] +default = ["background-jobs-actix", "background-jobs-sled-storage"] [dependencies.background-jobs-core] -version = "0.4" +version = "0.5" path = "jobs-core" [dependencies.background-jobs-actix] version = "0.5" path = "jobs-actix" optional = true + +[dependencies.background-jobs-sled-storage] +version = "0.1" +path = "jobs-sled" +optional = true diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 8fd6574..97d84b1 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" [dependencies] actix = "0.8" -background-jobs-core = { version = "0.4", path = "../jobs-core" } +background-jobs-core = { version = "0.5", path = "../jobs-core" } chrono = "0.4" failure = "0.1" futures = "0.1" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 8792811..657da9e 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use actix::{Actor, Addr, SyncArbiter}; use background_jobs_core::{Processor, ProcessorMap, Stats, Storage}; @@ -12,29 +12,29 @@ pub use self::{server::Server, worker::LocalWorker}; use self::{ pinger::Pinger, - server::{CheckDb, EitherJob, GetStats, RequestJob}, + server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob}, worker::ProcessJob, }; -pub struct ServerConfig { - server_id: usize, - db_path: PathBuf, +pub struct ServerConfig { + storage: S, } -impl ServerConfig { - pub fn new(server_id: usize, db_path: PathBuf) -> Self { - ServerConfig { server_id, db_path } +impl ServerConfig +where + S: Storage + Sync + 'static, +{ + pub fn new(storage: S) -> Self { + ServerConfig { storage } } - pub fn start(self) -> QueueHandle + pub fn start(self) -> QueueHandle where - S: Clone + 'static, + State: Clone + 'static, { - let ServerConfig { server_id, db_path } = self; + let ServerConfig { storage } = self; - let server = SyncArbiter::start(1, move || { - Server::new(server_id, Storage::init(db_path.clone()).unwrap()) - }); + let server = SyncArbiter::start(4, move || Server::new(storage.clone())); Pinger::new(server.clone()).start(); @@ -42,19 +42,19 @@ impl ServerConfig { } } -pub struct WorkerConfig +pub struct WorkerConfig where - S: Clone + 'static, + State: Clone + 'static, { - processors: ProcessorMap, - queues: BTreeMap, + processors: ProcessorMap, + queues: BTreeMap, } -impl WorkerConfig +impl WorkerConfig where - S: Clone + 'static, + State: Clone + 'static, { - pub fn new(state_fn: impl Fn() -> S + Send + Sync + 'static) -> Self { + pub fn new(state_fn: impl Fn() -> State + Send + Sync + 'static) -> Self { WorkerConfig { processors: ProcessorMap::new(Box::new(state_fn)), queues: BTreeMap::new(), @@ -63,17 +63,20 @@ where pub fn register
<P>

(&mut self, processor: P) where - P: Processor + Send + 'static, + P: Processor + Send + 'static, { self.queues.insert(P::QUEUE.to_owned(), 4); self.processors.register_processor(processor); } - pub fn set_processor_count(&mut self, queue: &str, count: usize) { + pub fn set_processor_count(&mut self, queue: &str, count: u64) { self.queues.insert(queue.to_owned(), count); } - pub fn start(self, queue_handle: QueueHandle) { + pub fn start(self, queue_handle: QueueHandle) + where + S: Storage + 'static, + { let processors = Arc::new(self.processors); self.queues.into_iter().fold(0, |acc, (key, count)| { @@ -93,22 +96,24 @@ where } #[derive(Clone)] -pub struct QueueHandle +pub struct QueueHandle where - S: Clone + 'static, + S: Storage + 'static, + State: Clone + 'static, { - inner: Addr>>, + inner: Addr>>, } -impl QueueHandle +impl QueueHandle where - S: Clone + 'static, + S: Storage + 'static, + State: Clone + 'static, { pub fn queue
<P>

(&self, job: P::Job) -> Result<(), Error> where - P: Processor, + P: Processor, { - self.inner.do_send(EitherJob::New(P::new_job(job)?)); + self.inner.do_send(NewJob(P::new_job(job)?)); Ok(()) } diff --git a/jobs-actix/src/pinger.rs b/jobs-actix/src/pinger.rs index dde21de..78a188c 100644 --- a/jobs-actix/src/pinger.rs +++ b/jobs-actix/src/pinger.rs @@ -1,29 +1,33 @@ use std::time::Duration; use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext}; +use background_jobs_core::Storage; use crate::{CheckDb, ProcessJob, Server}; -pub struct Pinger +pub struct Pinger where + S: Storage + 'static, W: Actor + Handler, { - server: Addr>, + server: Addr>, } -impl Pinger +impl Pinger where + S: Storage + 'static, W: Actor + Handler, { - pub fn new(server: Addr>) -> Self { + pub fn new(server: Addr>) -> Self { Pinger { server } } } -impl Actor for Pinger +impl Actor for Pinger where + S: Storage + 'static, W: Actor + Handler, - Server: Actor>> + Handler, + Server: Actor>> + Handler, { type Context = Context; diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index d3ee3cc..2d49460 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -1,20 +1,24 @@ use std::collections::{HashMap, VecDeque}; use actix::{Actor, Addr, Context, Handler, Message, SyncContext}; -use background_jobs_core::{JobInfo, NewJobInfo, Stats, Storage}; +use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use failure::Error; -use log::{debug, trace}; +use log::trace; use serde_derive::Deserialize; use crate::ProcessJob; #[derive(Clone, Debug, Deserialize)] -pub enum EitherJob { - New(NewJobInfo), - Existing(JobInfo), +pub struct NewJob(pub(crate) NewJobInfo); + +#[derive(Clone, Debug, Deserialize)] +pub struct ReturningJob(pub(crate) ReturnJobInfo); + +impl Message for NewJob { + type Result = Result<(), Error>; } -impl Message for EitherJob { +impl Message for ReturningJob { type Result = Result<(), Error>; } @@ -22,7 +26,7 @@ pub struct RequestJob where W: Actor + Handler, { - worker_id: usize, + worker_id: u64, queue: String, addr: Addr, } @@ -31,7 +35,7 @@ impl RequestJob where W: Actor + Handler, { - pub fn new(worker_id: usize, queue: &str, addr: Addr) -> Self { + pub fn new(worker_id: u64, queue: &str, addr: Addr) -> Self { RequestJob { worker_id, queue: queue.to_owned(), @@ -59,124 +63,60 @@ impl Message for GetStats { type Result = Result; } -struct Cache +pub struct Server where + S: Storage + 'static, W: Actor + Handler, { - workers: VecDeque>, - jobs: VecDeque, + storage: S, + cache: HashMap>>, } -impl Cache +impl Server where + S: Storage + 'static, W: Actor + Handler, { - fn new() -> Self { - Cache { - workers: VecDeque::new(), - jobs: VecDeque::new(), - } - } -} - -pub struct Server -where - W: Actor + Handler, -{ - server_id: usize, - storage: Storage, - cache: HashMap>, - cache_size: usize, -} - -impl Server -where - W: Actor + Handler, -{ - pub fn new(server_id: usize, storage: Storage) -> Self { + pub fn new(storage: S) -> Self { Server { - server_id, storage, cache: HashMap::new(), - cache_size: 25, - } - } - - pub fn set_cache_size(&mut self, cache_size: usize) { - self.cache_size = cache_size; - } - - fn populate(&mut self, queue: &str) -> Result { - trace!("Populating queue {}", queue); - let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new()); - - if entry.jobs.is_empty() { - let new_jobs = self - .storage - .stage_jobs(self.cache_size, queue, self.server_id)?; - let empty = new_jobs.is_empty(); - - debug!("Retrieved {} jobs from 
storage", new_jobs.len()); - trace!("{:?}", new_jobs.iter().map(|j| j.id()).collect::>()); - - new_jobs - .into_iter() - .for_each(|job| entry.jobs.push_back(job)); - Ok(!empty) - } else { - Ok(true) } } } -impl Actor for Server +impl Actor for Server where + S: Storage + 'static, W: Actor + Handler, { type Context = SyncContext; - - fn started(&mut self, _: &mut Self::Context) { - self.storage.requeue_staged_jobs(self.server_id).unwrap(); - self.storage.check_stalled_jobs(self.server_id).unwrap(); - } } -impl Handler for Server +impl Handler for Server where + S: Storage + 'static, W: Actor> + Handler, { type Result = Result<(), Error>; - fn handle(&mut self, msg: EitherJob, _: &mut Self::Context) -> Self::Result { - let mut job = match msg { - EitherJob::New(new_job) => { - let job = self.storage.assign_id(new_job, self.server_id)?; - debug!("Created job {}, {:?}", job.id(), job); - job - } - EitherJob::Existing(job) => job, - }; + fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result { + let queue = msg.0.queue().to_owned(); + let ready = msg.0.is_ready(); + self.storage.new_job(msg.0)?; - let retry_now = job.is_pending() || (job.needs_retry() && job.retry_ready()); - - if job.is_pending() && !retry_now { - trace!("Storing job {} for later processing", job.id()); - } - self.storage.store_job(job.clone(), self.server_id)?; - - if retry_now { + if ready { let entry = self .cache - .entry(job.queue().to_owned()) - .or_insert(Cache::new()); + .entry(queue.clone()) + .or_insert(VecDeque::new()); - if let Some(worker) = entry.workers.pop_front() { - debug!("Retrying job {} on worker {}", job.id(), worker.worker_id); - worker.addr.do_send(ProcessJob::new(job.clone())); - job.set_running(); - self.storage.store_job(job, worker.worker_id)?; - } else if entry.jobs.len() < self.cache_size { - entry.jobs.push_back(job); + if let Some(request) = entry.pop_front() { + if let Some(job) = self.storage.request_job(&queue, request.worker_id)? 
{ + request.addr.do_send(ProcessJob::new(job)); + } else { + entry.push_back(request); + } } } @@ -184,83 +124,76 @@ where } } -impl Handler> for Server +impl Handler for Server where + S: Storage + 'static, + W: Actor> + Handler, +{ + type Result = Result<(), Error>; + + fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result { + self.storage.return_job(msg.0).map_err(|e| e.into()) + } +} + +impl Handler> for Server +where + S: Storage + 'static, W: Actor> + Handler, { type Result = Result<(), Error>; fn handle(&mut self, msg: RequestJob, _: &mut Self::Context) -> Self::Result { trace!("Worker {} requested job", msg.worker_id); - self.populate(&msg.queue)?; + let job = self.storage.request_job(&msg.queue, msg.worker_id)?; - let job = self - .cache - .get_mut(&msg.queue) - .and_then(|cache| cache.jobs.pop_front()); - - if let Some(mut job) = job { + if let Some(job) = job { msg.addr.do_send(ProcessJob::new(job.clone())); - job.set_running(); - self.storage.store_job(job, msg.worker_id)?; } else { trace!("storing worker {} for queue {}", msg.worker_id, msg.queue); - let entry = self.cache.entry(msg.queue.clone()).or_insert(Cache::new()); - entry.workers.push_back(msg); + let entry = self + .cache + .entry(msg.queue.to_owned()) + .or_insert(VecDeque::new()); + entry.push_back(msg); } Ok(()) } } -impl Handler for Server +impl Handler for Server where + S: Storage + 'static, W: Actor> + Handler, { type Result = Result<(), Error>; fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result { trace!("Checkdb"); - let queues: Vec = self.cache.keys().cloned().collect(); - let mut todo = Vec::new(); - - for queue in queues { - if self.populate(&queue)? { - debug!("Cached jobs for {}", queue); + for (queue, workers) in self.cache.iter_mut() { + if let Some(request) = workers.pop_front() { + if let Some(job) = self.storage.request_job(queue, request.worker_id)? { + request.addr.do_send(ProcessJob::new(job)); + } else { + workers.push_back(request); + } } - - let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new()); - - let min_len = entry.jobs.len().min(entry.workers.len()); - - entry - .jobs - .drain(..min_len) - .zip(entry.workers.drain(..min_len)) - .for_each(|pair| { - todo.push(pair); - }); - } - - for (mut job, worker) in todo { - debug!("Sending job {} to worker {}", job.id(), worker.worker_id); - worker.addr.do_send(ProcessJob::new(job.clone())); - job.set_running(); - self.storage.store_job(job, worker.worker_id)?; } Ok(()) } } -impl Handler for Server +impl Handler for Server where + S: Storage + 'static, W: Actor> + Handler, { type Result = Result; fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result { - Ok(self.storage.get_stats()?) 
+ self.storage.get_stats().map_err(|e| e.into()) } } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 6a22a80..7c73247 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -4,10 +4,10 @@ use actix::{ fut::{wrap_future, ActorFuture}, Actor, Addr, AsyncContext, Context, Handler, Message, }; -use background_jobs_core::{JobInfo, ProcessorMap}; +use background_jobs_core::{JobInfo, ProcessorMap, Storage}; use log::info; -use crate::{EitherJob, RequestJob, Server}; +use crate::{RequestJob, ReturningJob, Server}; pub struct ProcessJob { job: JobInfo, @@ -23,25 +23,27 @@ impl Message for ProcessJob { type Result = (); } -pub struct LocalWorker +pub struct LocalWorker where + S: Storage + 'static, State: Clone + 'static, { - id: usize, + id: u64, queue: String, processors: Arc>, - server: Addr>>, + server: Addr>>, } -impl LocalWorker +impl LocalWorker where + S: Storage + 'static, State: Clone + 'static, { pub fn new( - id: usize, + id: u64, queue: String, processors: Arc>, - server: Addr>, + server: Addr>, ) -> Self { LocalWorker { id, @@ -52,8 +54,9 @@ where } } -impl Actor for LocalWorker +impl Actor for LocalWorker where + S: Storage + 'static, State: Clone + 'static, { type Context = Context; @@ -64,8 +67,9 @@ where } } -impl Handler for LocalWorker +impl Handler for LocalWorker where + S: Storage + 'static, State: Clone + 'static, { type Result = (); @@ -74,7 +78,7 @@ where info!("Worker {} processing job {}", self.id, msg.job.id()); let fut = wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| { - actor.server.do_send(EitherJob::Existing(job)); + actor.server.do_send(ReturningJob(job)); actor .server .do_send(RequestJob::new(actor.id, &actor.queue, ctx.address())); diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 2aeda01..d60d0d4 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor on tokio" -version = "0.4.1" +version = "0.5.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -13,8 +13,6 @@ chrono = { version = "0.4", features = ["serde"] } failure = "0.1" futures = "0.1.21" log = "0.4" -kv = { version = "0.7", features = ["json-value"] } -lmdb = "0.8" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 097fcfc..dc84fbe 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -22,7 +22,36 @@ use log::trace; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; -use crate::{Backoff, JobStatus, MaxRetries, ShouldStop}; +use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ReturnJobInfo { + pub(crate) id: u64, + pub(crate) result: JobResult, +} + +impl ReturnJobInfo { + pub(crate) fn fail(id: u64) -> Self { + ReturnJobInfo { + id, + result: JobResult::Failure, + } + } + + pub(crate) fn pass(id: u64) -> Self { + ReturnJobInfo { + id, + result: JobResult::Success, + } + } + + pub(crate) fn missing_processor(id: u64) -> Self { + ReturnJobInfo { + id, + result: JobResult::MissingProcessor, + } + } +} #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct NewJobInfo { @@ -67,7 +96,15 @@ impl NewJobInfo { } } - pub(crate) fn with_id(self, id: usize) -> JobInfo { + pub fn queue(&self) -> &str { + &self.queue + } + + pub 
fn is_ready(&self) -> bool { + self.next_queue.is_none() + } + + pub(crate) fn with_id(self, id: u64) -> JobInfo { JobInfo { id, processor: self.processor, @@ -92,7 +129,7 @@ impl NewJobInfo { /// new_job method. pub struct JobInfo { /// ID of the job - id: usize, + id: u64, /// Name of the processor that should handle this job processor: String, @@ -127,7 +164,7 @@ impl JobInfo { &self.queue } - pub(crate) fn updated(&mut self) { + fn updated(&mut self) { self.updated_at = Utc::now(); } @@ -139,20 +176,17 @@ impl JobInfo { self.args.clone() } - pub(crate) fn status(&self) -> JobStatus { - self.status.clone() - } - - pub fn id(&self) -> usize { + pub fn id(&self) -> u64 { self.id } pub(crate) fn increment(&mut self) -> ShouldStop { + self.updated(); self.retry_count += 1; self.max_retries.compare(self.retry_count) } - pub(crate) fn next_queue(&mut self) { + fn next_queue(&mut self) { let now = Utc::now(); let next_queue = match self.backoff_strategy { @@ -173,19 +207,15 @@ impl JobInfo { ); } - pub(crate) fn is_stale(&self) -> bool { - self.updated_at < Utc::now() - OldDuration::days(1) - } - - pub(crate) fn is_ready(&self, now: DateTime) -> bool { + pub fn is_ready(&self, now: DateTime) -> bool { match self.next_queue { Some(ref time) => now > *time, None => true, } } - pub fn needs_retry(&mut self) -> bool { - let should_retry = self.is_failed() && self.increment().should_requeue(); + pub(crate) fn needs_retry(&mut self) -> bool { + let should_retry = self.increment().should_requeue(); if should_retry { self.pending(); @@ -195,47 +225,21 @@ impl JobInfo { should_retry } - pub fn retry_ready(&self) -> bool { - self.is_ready(Utc::now()) - } - pub fn is_pending(&self) -> bool { self.status == JobStatus::Pending } - pub fn is_failed(&self) -> bool { - self.status == JobStatus::Failed - } - - pub fn is_finished(&self) -> bool { - self.status == JobStatus::Finished - } - pub(crate) fn is_in_queue(&self, queue: &str) -> bool { self.queue == queue } - pub(crate) fn stage(&mut self) { - self.status = JobStatus::Staged; - } - - /// This method sets the Job's status to running - /// - /// Touching this outside of the background_jobs crates is dangerous, since these libraries - /// rely on the state of the job being correct. 
- pub fn set_running(&mut self) { + pub(crate) fn run(&mut self) { + self.updated(); self.status = JobStatus::Running; } pub(crate) fn pending(&mut self) { + self.updated(); self.status = JobStatus::Pending; } - - pub(crate) fn fail(&mut self) { - self.status = JobStatus::Failed; - } - - pub(crate) fn pass(&mut self) { - self.status = JobStatus::Finished; - } } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index b59c1a3..ca282f7 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -24,14 +24,16 @@ mod job; mod job_info; mod processor; mod processor_map; +mod stats; mod storage; pub use crate::{ job::Job, - job_info::{JobInfo, NewJobInfo}, + job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, processor::Processor, processor_map::ProcessorMap, - storage::{JobStat, Stat, Stats, Storage}, + stats::{JobStat, Stats}, + storage::Storage, }; #[derive(Debug, Fail)] @@ -50,23 +52,65 @@ pub enum JobError { MissingProcessor, } +#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +pub enum JobResult { + Success, + Failure, + MissingProcessor, +} + +impl JobResult { + pub fn success() -> Self { + JobResult::Success + } + + pub fn failure() -> Self { + JobResult::Failure + } + + pub fn missing_processor() -> Self { + JobResult::MissingProcessor + } + + pub fn is_failure(&self) -> bool { + *self == JobResult::Failure + } + + pub fn is_success(&self) -> bool { + *self == JobResult::Success + } + + pub fn is_missing_processor(&self) -> bool { + *self == JobResult::MissingProcessor + } +} + #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] /// Set the status of a job when storing it pub enum JobStatus { /// Job should be queued Pending, - /// Job has been dequeued, but is not yet running - Staged, - /// Job is running Running, +} - /// Job has failed - Failed, +impl JobStatus { + pub fn pending() -> Self { + JobStatus::Pending + } - /// Job has finished - Finished, + pub fn running() -> Self { + JobStatus::Running + } + + pub fn is_pending(&self) -> bool { + *self == JobStatus::Pending + } + + pub fn is_running(&self) -> bool { + *self == JobStatus::Running + } } #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index f50a477..85c5701 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -23,7 +23,7 @@ use futures::future::{Either, Future, IntoFuture}; use log::{error, info}; use serde_json::Value; -use crate::{JobError, JobInfo, Processor}; +use crate::{JobError, JobInfo, Processor, ReturnJobInfo}; /// A generic function that processes a job /// @@ -35,7 +35,6 @@ use crate::{JobError, JobInfo, Processor}; pub type ProcessFn = Box Box + Send> + Send>; - pub type StateFn = Box S + Send + Sync>; /// A type for storing the relationships between processor names and the processor itself @@ -87,7 +86,7 @@ where /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. 
- pub fn process_job(&self, job: JobInfo) -> impl Future { + pub fn process_job(&self, job: JobInfo) -> impl Future { let opt = self .inner .get(job.processor()) @@ -97,26 +96,28 @@ where Either::A(fut) } else { error!("Processor {} not present", job.processor()); - Either::B(Ok(job).into_future()) + Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future()) } } } -fn process(process_fn: &ProcessFn, state: S, mut job: JobInfo) -> impl Future { +fn process( + process_fn: &ProcessFn, + state: S, + job: JobInfo, +) -> impl Future { let args = job.args(); - + let id = job.id(); let processor = job.processor().to_owned(); process_fn(args, state).then(move |res| match res { Ok(_) => { - info!("Job {} completed, {}", job.id(), processor); - job.pass(); - Ok(job) + info!("Job {} completed, {}", id, processor); + Ok(ReturnJobInfo::pass(id)) } Err(e) => { - error!("Job {} errored, {}, {}", job.id(), processor, e); - job.fail(); - Ok(job) + error!("Job {} errored, {}, {}", id, processor, e); + Ok(ReturnJobInfo::fail(id)) } }) } diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs new file mode 100644 index 0000000..132a522 --- /dev/null +++ b/jobs-core/src/stats.rs @@ -0,0 +1,163 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . 
+ * <http://www.gnu.org/licenses/>
+ */ + +use chrono::{offset::Utc, DateTime, Datelike, Timelike}; +use serde_derive::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Stats { + pub pending: usize, + pub running: usize, + pub dead: JobStat, + pub complete: JobStat, +} + +impl Stats { + pub fn new() -> Self { + Self::default() + } + + pub(crate) fn new_job(mut self) -> Self { + self.pending += 1; + self + } + + pub(crate) fn run_job(mut self) -> Self { + if self.pending > 0 { + self.pending -= 1; + } + self.running += 1; + self + } + + pub(crate) fn retry_job(mut self) -> Self { + self.pending += 1; + if self.running > 0 { + self.running -= 1; + } + self + } + + pub(crate) fn fail_job(mut self) -> Self { + if self.running > 0 { + self.running -= 1; + } + self.dead.increment(); + self + } + + pub(crate) fn complete_job(mut self) -> Self { + if self.running > 0 { + self.running -= 1; + } + self.complete.increment(); + self + } +} + +impl Default for Stats { + fn default() -> Self { + Stats { + pending: 0, + running: 0, + dead: JobStat::default(), + complete: JobStat::default(), + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct JobStat { + this_hour: usize, + today: usize, + this_month: usize, + all_time: usize, + updated_at: DateTime, +} + +impl JobStat { + pub fn new() -> Self { + Self::default() + } + + fn increment(&mut self) { + self.tick(); + + self.this_hour += 1; + self.today += 1; + self.this_month += 1; + self.all_time += 1; + } + + fn tick(&mut self) { + let now = Utc::now(); + + if now.month() != self.updated_at.month() { + self.next_month(); + } else if now.day() != self.updated_at.day() { + self.next_day(); + } else if now.hour() != self.updated_at.hour() { + self.next_hour(); + } + + self.updated_at = now; + } + + fn next_hour(&mut self) { + self.this_hour = 0; + } + + fn next_day(&mut self) { + self.next_hour(); + self.today = 0; + } + + fn next_month(&mut self) { + self.next_day(); + self.this_month = 0; + } + + pub fn this_hour(&self) -> usize { + self.this_hour + } + + pub fn today(&self) -> usize { + self.today + } + + pub fn this_month(&self) -> usize { + self.this_month + } + + pub fn all_time(&self) -> usize { + self.all_time + } +} + +impl Default for JobStat { + fn default() -> Self { + JobStat { + this_hour: 0, + today: 0, + this_month: 0, + all_time: 0, + updated_at: Utc::now(), + } + } +} diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index a3b49cb..ecec79e 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -17,939 +17,122 @@ * along with Background Jobs. If not, see . 
 * <http://www.gnu.org/licenses/>
*/ -use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, - path::PathBuf, - str::Utf8Error, - sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, -}; - -use chrono::{offset::Utc, DateTime, Datelike, Timelike}; +use chrono::offset::Utc; use failure::Fail; -use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf}; -use lmdb::Error as LmdbError; -use log::{info, trace}; -use serde_derive::{Deserialize, Serialize}; +use log::error; -use crate::{JobInfo, JobStatus, NewJobInfo}; +use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; -struct Buckets<'a> { - queued: Bucket<'a, &'a [u8], ValueBuf>>, - running: Bucket<'a, &'a [u8], ValueBuf>>, - staged: Bucket<'a, &'a [u8], ValueBuf>>, - failed: Bucket<'a, &'a [u8], ValueBuf>>, - finished: Bucket<'a, &'a [u8], ValueBuf>>, - stats: Bucket<'a, &'a [u8], ValueBuf>>, -} +pub trait Storage: Clone + Send { + type Error: Fail; -impl<'a> Buckets<'a> { - fn new(store: &'a RwLockWriteGuard) -> Result { - let b = Buckets { - queued: store.bucket(Some(Storage::job_queue()))?, - running: store.bucket(Some(Storage::job_running()))?, - staged: store.bucket(Some(Storage::job_staged()))?, - failed: store.bucket(Some(Storage::job_failed()))?, - finished: store.bucket(Some(Storage::job_finished()))?, - stats: store.bucket(Some(Storage::stats_store()))?, - }; + /// This method generates unique IDs for jobs + fn generate_id(&mut self) -> Result; - Ok(b) - } + /// This method should store the supplied job + /// + /// The supplied job _may already be present_. The implementation should overwrite the stored + /// job with the new job so that future calls to `fetch_job` return the new one. + fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error>; - fn new_readonly(store: &'a RwLockReadGuard) -> Result { - let b = Buckets { - queued: store.bucket(Some(Storage::job_queue()))?, - running: store.bucket(Some(Storage::job_running()))?, - staged: store.bucket(Some(Storage::job_staged()))?, - failed: store.bucket(Some(Storage::job_failed()))?, - finished: store.bucket(Some(Storage::job_finished()))?, - stats: store.bucket(Some(Storage::stats_store()))?, - }; + /// This method should return the job with the given ID regardless of what state the job is in. + fn fetch_job(&mut self, id: u64) -> Result, Self::Error>; - Ok(b) - } -} + /// This should fetch a job ready to be processed from the queue + /// + /// If a job is not ready, is currently running, or is not in the requested queue, this method + /// should not return it. If no jobs meet these criteria, this method should return Ok(None) + fn fetch_job_from_queue(&mut self, queue: &str) -> Result, Self::Error>; -#[derive(Clone)] -/// All the logic to interact with the persisted data is defined on this type. -/// -/// Perhapse in the future this will be made generic, but for now it is hard-coded to use LMDB to -/// store job information. -/// -/// None of the methods in this module are intended to be used outside of a background-jobs -/// runtime. 
-pub struct Storage { - store: Arc>, -} + /// This method tells the storage mechanism to mark the given job as being in the provided + /// queue + fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error>; -impl Storage { - pub fn new(store: Arc>) -> Self { - Storage { store } - } + /// This method tells the storage mechanism to mark a given job as running + fn run_job(&mut self, id: u64, runner_id: u64) -> Result<(), Self::Error>; - pub fn init(path: PathBuf) -> Result { - let mut manager = Manager::new(); - let mut cfg = Config::default(path); + /// This method tells the storage mechanism to remove the job + /// + /// This happens when a job has been completed or has failed too many times + fn delete_job(&mut self, id: u64) -> Result<(), Self::Error>; - cfg.set_max_readers(18); + /// This method returns the current statistics, or Stats::default() if none exists. + fn get_stats(&self) -> Result; - // Create our buckets - for bucket in Storage::buckets().iter() { - cfg.bucket(bucket, None); - } - - let handle = manager.open(cfg)?; - - Ok(Storage::new(handle)) - } - - pub fn get_new_id(&self, runner_id: usize) -> Result { - let store = self.store.write()?; - - let bucket = store.bucket::<&[u8], ValueBuf>>(Some(Storage::id_store()))?; - - let mut txn = store.write_txn()?; - - let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", runner_id, |txn| { - let id = match txn.get(&bucket, b"current-id") { - Ok(id) => id.inner()?.to_serde(), - Err(e) => match e { - Error::NotFound => 0, - _ => return Err(e), - }, - }; - - let new_id = id + 1; - - let new_id_value = Json::to_value_buf(new_id)?; - txn.set(&bucket, b"current-id", new_id_value)?; - - Ok(new_id) - })?; - - txn.commit()?; - - Ok(new_id) - } - - pub fn requeue_staged_jobs(&self, runner_id: usize) -> Result<(), Error> { - let store = self.store.write()?; - let job_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; - - let lock_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; - - let buckets = Buckets::new(&store)?; - - let mut write_txn = store.write_txn()?; - let read_txn = store.read_txn()?; - - self.with_lock::<_, (), _>( - &lock_bucket, - &mut write_txn, - b"job-queue", - runner_id, - |inner_txn| { - let mut cursor = read_txn.read_cursor(&buckets.staged)?; - match cursor.get(None, CursorOp::First) { - Ok(_) => (), - Err(e) => match e { - Error::NotFound => { - return Ok(()); - } - e => { - return Err(e); - } - }, - } - - let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>; - - let _ = cursor.iter().fold(initial_value, |acc, (key, _)| { - acc.and_then(|inner_txn| { - let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); - - let job_value = Json::to_value_buf(job)?; - inner_txn.set(&job_bucket, key, job_value)?; - self.queue_job(&buckets, inner_txn, key, runner_id)?; - - Ok(inner_txn) - }) - })?; - - Ok(()) - }, - )?; - - read_txn.commit()?; - write_txn.commit()?; - - Ok(()) - } - - pub fn check_stalled_jobs(&self, runner_id: usize) -> Result<(), Error> { - let store = self.store.write()?; - let job_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; - - let lock_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; - - let buckets = Buckets::new(&store)?; - - let mut write_txn = store.write_txn()?; - let read_txn = store.read_txn()?; - - self.with_lock::<_, (), _>( - &lock_bucket, - &mut write_txn, - b"job-queue", - runner_id, - |inner_txn| { - let mut cursor = read_txn.read_cursor(&buckets.running)?; - match 
cursor.get(None, CursorOp::First) { - Ok(_) => (), - Err(e) => match e { - Error::NotFound => { - return Ok(()); - } - e => { - return Err(e); - } - }, - } - - let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>; - - let _ = cursor.iter().fold(initial_value, |acc, (key, _)| { - acc.and_then(|inner_txn| { - let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); - - if job.is_stale() { - if job.increment().should_requeue() { - let job_value = Json::to_value_buf(job)?; - inner_txn.set(&job_bucket, key, job_value)?; - self.queue_job(&buckets, inner_txn, key, runner_id)?; - } else { - self.fail_job(&buckets, &job_bucket, inner_txn, key, runner_id)?; - } - } - - Ok(inner_txn) - }) - })?; - - Ok(()) - }, - )?; - - read_txn.commit()?; - write_txn.commit()?; - - Ok(()) - } - - pub fn stage_jobs( - &self, - limit: usize, - queue: &str, - runner_id: usize, - ) -> Result, Error> { - let store = self.store.write()?; - - trace!("Got store"); - - let job_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; - - let lock_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; - - let buckets = Buckets::new(&store)?; - - trace!("got buckets"); - - let mut txn = store.write_txn()?; - let read_txn = store.read_txn()?; - - let result = self.with_lock::<_, Vec, _>( - &lock_bucket, - &mut txn, - b"job-queue", - runner_id, - |inner_txn| { - trace!("Got lock"); - - let mut jobs = HashMap::new(); - let mut cursor = read_txn.read_cursor(&buckets.queued)?; - let now = Utc::now(); - - trace!("Got cursor"); - match cursor.get(None, CursorOp::First) { - Ok((maybe_key, _)) => { - if let Some(key) = maybe_key { - match inner_txn.get(&job_bucket, &key) { - Ok(job) => { - let mut job = job.inner()?.to_serde(); - if job.is_ready(now) && job.is_in_queue(queue) { - job.stage(); - self.stage_job(&buckets, inner_txn, key, runner_id)?; - jobs.insert(job.id(), job); - } - } - Err(e) => match e { - Error::NotFound => (), - err => return Err(err), - }, - } - } - } - Err(e) => match e { - Error::NotFound => { - trace!("No items in queue"); - return Ok(vec![]); - } - e => { - return Err(e); - } - }, - } - trace!("Set cursor to first"); - - let initial_value = - Ok((inner_txn, jobs)) as Result<(&mut Txn, HashMap), Error>; - - let (_inner_txn, mut hm) = cursor.iter().fold(initial_value, |acc, (key, _)| { - acc.and_then(|(inner_txn, mut jobs)| { - if jobs.len() < limit { - let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); - - if job.is_ready(now) && job.is_in_queue(queue) { - job.stage(); - self.stage_job(&buckets, inner_txn, key, runner_id)?; - - jobs.insert(job.id(), job); - } - } - - Ok((inner_txn, jobs)) - }) - })?; - - Ok(hm.drain().map(|(_, v)| v).collect()) - }, - )?; - - trace!("Committing"); - - read_txn.commit()?; - - txn.commit()?; - - trace!("Committed"); - - Ok(result) - } - - pub fn assign_id(&self, job: NewJobInfo, runner_id: usize) -> Result { - let id = self.get_new_id(runner_id)?; - let job = job.with_id(id); - trace!("Generaged job id, {}", job.id()); - Ok(job) - } - - pub fn store_job(&self, mut job: JobInfo, runner_id: usize) -> Result<(), Error> { - let job_id = job.id().to_string(); - job.updated(); - - job.needs_retry(); - - let status = job.status(); - let job_value = Json::to_value_buf(job)?; - - trace!("Storing job"); - - let store = self.store.write()?; - trace!("Got store"); - let job_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; - trace!("Got bucket"); - - let buckets = Buckets::new(&store)?; - - let 
mut txn = store.write_txn()?; - trace!("Opened write txn"); - txn.set(&job_bucket, job_id.to_string().as_ref(), job_value)?; - trace!("Set value"); - - match status { - JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?, - JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?, - JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?, - JobStatus::Failed => { - self.fail_job(&buckets, &job_bucket, &mut txn, job_id.as_ref(), runner_id)? - } - JobStatus::Finished => { - self.finish_job(&buckets, &job_bucket, &mut txn, job_id.as_ref(), runner_id)? - } - } - - trace!("Committing"); - - txn.commit()?; - trace!("Committed"); - - Ok(()) - } - - pub fn get_port_mapping( - &self, - base_port: usize, - queues: BTreeSet, - runner_id: usize, - ) -> Result, PortMapError> { - let store = self.store.write().map_err(|e| Error::from(e))?; - - let queue_port_bucket = - store.bucket::<&[u8], ValueBuf>>(Some(Storage::queue_port()))?; - - let read_txn = store.read_txn()?; - let mut write_txn = store.write_txn()?; - - let lock_name = "lock-queue"; - - let queue_map = self.with_lock::<_, _, PortMapError>( - &queue_port_bucket, - &mut write_txn, - lock_name.as_ref(), - runner_id, - |write_txn| { - let mut cursor = read_txn.read_cursor(&queue_port_bucket)?; - - let (unused_queues, queue_map) = cursor.iter().fold( - Ok((queues.clone(), BTreeMap::new())), - |acc: Result<_, PortMapError>, (queue, port)| { - acc.and_then(move |(mut queues, mut map)| { - let port: usize = port.inner()?.to_serde(); - let queue = std::str::from_utf8(queue)?.to_owned(); - - if queue != lock_name { - queues.remove(&queue); - map.insert(queue, port); - } - - Ok((queues, map)) - }) - }, - )?; - - // The starting port for new queues should be one greater than the maximum port - // number in the btree set, or 2 greater than the base port. - // - // This is because there need to be two admin ports before the queue ports begin. 
- let start_port = queue_map - .iter() - .map(|(_, v)| *v) - .filter(|v| *v != 0) - .max() - .unwrap_or(base_port + 1) - + 1; - - let (_, queue_map, _) = unused_queues.into_iter().fold( - Ok((write_txn, queue_map, start_port)), - |acc: Result<_, PortMapError>, queue_name| { - acc.and_then(|(write_txn, mut queue_map, port_num)| { - let port = Json::to_value_buf(port_num)?; - - write_txn.set(&queue_port_bucket, queue_name.as_ref(), port)?; - queue_map.insert(queue_name, port_num); - - Ok((write_txn, queue_map, port_num + 1)) - }) - }, - )?; - - Ok(queue_map) - }, - )?; - - read_txn.commit()?; - write_txn.commit()?; - - Ok(queue_map) - } - - fn stage_job<'env>( - &self, - buckets: &'env Buckets<'env>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - self.add_job_to(&buckets.staged, txn, id, runner_id)?; - self.delete_job_from(&buckets.finished, txn, id)?; - self.delete_job_from(&buckets.failed, txn, id)?; - self.delete_job_from(&buckets.running, txn, id)?; - self.delete_job_from(&buckets.queued, txn, id)?; - - Ok(()) - } - - fn queue_job<'env>( - &self, - buckets: &'env Buckets<'env>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - self.add_job_to(&buckets.queued, txn, id, runner_id)?; - self.delete_job_from(&buckets.finished, txn, id)?; - self.delete_job_from(&buckets.failed, txn, id)?; - self.delete_job_from(&buckets.running, txn, id)?; - self.delete_job_from(&buckets.staged, txn, id)?; - - Ok(()) - } - - fn run_job<'env>( - &self, - buckets: &'env Buckets<'env>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - self.add_job_to(&buckets.running, txn, id, runner_id)?; - self.delete_job_from(&buckets.staged, txn, id)?; - self.delete_job_from(&buckets.finished, txn, id)?; - self.delete_job_from(&buckets.failed, txn, id)?; - self.delete_job_from(&buckets.queued, txn, id)?; - - Ok(()) - } - - fn fail_job<'env>( - &self, - buckets: &'env Buckets<'env>, - job_store: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - self.add_job_to(&buckets.failed, txn, id, runner_id)?; - self.delete_job_from(&buckets.finished, txn, id)?; - self.delete_job_from(&buckets.running, txn, id)?; - self.delete_job_from(&buckets.staged, txn, id)?; - self.delete_job_from(&buckets.queued, txn, id)?; - txn.del(job_store, id)?; - - Stat::get_dead(&buckets.stats, txn)? - .fail_job() - .save(&buckets.stats, txn)?; - - Ok(()) - } - - fn finish_job<'env>( - &self, - buckets: &'env Buckets<'env>, - job_store: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - self.add_job_to(&buckets.finished, txn, id, runner_id)?; - self.delete_job_from(&buckets.running, txn, id)?; - self.delete_job_from(&buckets.staged, txn, id)?; - self.delete_job_from(&buckets.failed, txn, id)?; - self.delete_job_from(&buckets.queued, txn, id)?; - txn.del(job_store, id)?; - - Stat::get_finished(&buckets.stats, txn)? 
- .finish_job() - .save(&buckets.stats, txn)?; - - Ok(()) - } - - fn add_job_to<'env>( - &self, - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - id: &[u8], - runner_id: usize, - ) -> Result<(), Error> { - txn.set(bucket, id, Json::to_value_buf(runner_id)?)?; - trace!("Set value"); - - Ok(()) - } - - fn delete_job_from<'env>( - &self, - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - id: &[u8], - ) -> Result<(), Error> { - match txn.del(bucket, id) { - Ok(_) => (), - Err(e) => match e { - Error::NotFound => (), - e => return Err(e), - }, - } - trace!("Deleted value"); - - Ok(()) - } - - pub fn get_stats(&self) -> Result { - let store = self.store.read()?; - let buckets = Buckets::new_readonly(&store)?; - - let mut txn = store.read_txn()?; - - let stats = { - let dead = Stat::get_dead(&buckets.stats, &mut txn)?.inner_stat(); - let complete = Stat::get_finished(&buckets.stats, &mut txn)?.inner_stat(); - - let mut queued_cursor = txn.read_cursor(&buckets.queued)?; - let mut staged_cursor = txn.read_cursor(&buckets.staged)?; - - let pending = queued_cursor.iter().count() + staged_cursor.iter().count(); - - let mut running_cursor = txn.read_cursor(&buckets.running)?; - let running = running_cursor.iter().count(); - - Stats { - dead, - complete, - pending, - running, - } - }; - - txn.commit()?; - Ok(stats) - } - - // In all likelihood, this function is not necessary - // - // But in the event of multiple processes running on the same machine, it is good to have some - // way to make sure they don't step on eachother's toes - fn with_lock<'env, F, T, E>( - &self, - lock_bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - lock_key: &[u8], - runner_id: usize, - callback: F, - ) -> Result + /// This method fetches the existing statistics or Stats::default(), and stores the result of + /// calling `update_stats` on it. + fn update_stats(&mut self, f: F) -> Result<(), Self::Error> where - F: Fn(&mut Txn<'env>) -> Result, - E: From, - { - let mut other_runner_id = 10; + F: Fn(Stats) -> Stats; - loop { - let lock_value = Json::to_value_buf(runner_id)?; + fn new_job(&mut self, job: NewJobInfo) -> Result { + let id = self.generate_id()?; - let mut inner_txn = txn.txn()?; - let res = inner_txn.set_no_overwrite(lock_bucket, lock_key, lock_value); - inner_txn.commit()?; + let job = job.with_id(id); - match res { - Ok(_) => break, - Err(e) => { - let inner_txn = txn.txn()?; - let res = inner_txn.get(lock_bucket, lock_key); - inner_txn.commit()?; + let queue = job.queue().to_owned(); + self.save_job(job)?; + self.queue_job(&queue, id)?; + self.update_stats(Stats::new_job)?; - match res { - Ok(other_id) => { - let other_id = other_id.inner()?.to_serde(); + Ok(id) + } - if other_runner_id != other_id { - other_runner_id = other_id; - info!("Lock held by runner {}", other_id); - } - } - Err(e) => match e { - Error::NotFound => continue, - e => return Err(e.into()), - }, - } + fn request_job( + &mut self, + queue: &str, + runner_id: u64, + ) -> Result, Self::Error> { + match self.fetch_job_from_queue(queue)? 
{ + Some(mut job) => { + if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) { + job.run(); + self.run_job(job.id(), runner_id)?; + self.save_job(job.clone())?; + self.update_stats(Stats::run_job)?; - match e { - Error::LMDB(lmdb) => match lmdb { - LmdbError::KeyExist => continue, - e => return Err(Error::LMDB(e).into()), - }, - e => return Err(e.into()), - } + Ok(Some(job)) + } else { + error!( + "Not fetching job {}, it is not ready for processing", + job.id() + ); + Ok(None) } } - } - - let item = callback(txn)?; - - txn.del(lock_bucket, lock_key)?; - - Ok(item) - } - - fn buckets() -> [&'static str; 10] { - [ - Storage::id_store(), - Storage::job_store(), - Storage::job_queue(), - Storage::job_failed(), - Storage::job_running(), - Storage::job_staged(), - Storage::job_lock(), - Storage::job_finished(), - Storage::queue_port(), - Storage::stats_store(), - ] - } - - fn id_store() -> &'static str { - "id-store" - } - - fn job_store() -> &'static str { - "job-store" - } - - fn job_queue() -> &'static str { - "job-queue" - } - - fn job_failed() -> &'static str { - "job-failed" - } - - fn job_running() -> &'static str { - "job-running" - } - - fn job_staged() -> &'static str { - "job-staged" - } - - fn job_finished() -> &'static str { - "job-finished" - } - - fn job_lock() -> &'static str { - "job-lock" - } - - fn queue_port() -> &'static str { - "queue-port" - } - - fn stats_store() -> &'static str { - "stats-store" - } -} - -#[derive(Debug, Fail)] -pub enum PortMapError { - #[fail(display = "Error in KV, {}", _0)] - Kv(#[cause] Error), - - #[fail(display = "Error parsing to Utf8, {}", _0)] - Utf8(#[cause] Utf8Error), -} - -impl From for PortMapError { - fn from(e: Error) -> Self { - PortMapError::Kv(e) - } -} - -impl From for PortMapError { - fn from(e: Utf8Error) -> Self { - PortMapError::Utf8(e) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Stats { - pub pending: usize, - pub running: usize, - pub dead: JobStat, - pub complete: JobStat, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum Stat { - DeadJobs(JobStat), - CompletedJobs(JobStat), -} - -impl Stat { - fn get_finished<'env>( - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - ) -> Result { - Self::get(bucket, txn, Self::completed_jobs()).map(|opt| match opt { - Some(stat) => stat, - None => Stat::CompletedJobs(JobStat::new()), - }) - } - - fn get_dead<'env>( - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - ) -> Result { - Self::get(bucket, txn, Self::dead_jobs()).map(|opt| match opt { - Some(stat) => stat, - None => Stat::DeadJobs(JobStat::new()), - }) - } - - fn get<'env>( - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - key: &str, - ) -> Result, Error> { - match txn.get(bucket, key.as_ref()) { - Ok(stat) => Ok(Some(stat.inner()?.to_serde())), - Err(e) => match e { - Error::NotFound => Ok(None), - err => return Err(err), - }, + None => Ok(None), } } - fn name(&self) -> &str { - match *self { - Stat::DeadJobs(_) => Stat::dead_jobs(), - Stat::CompletedJobs(_) => Stat::completed_jobs(), - } - } - - fn finish_job(self) -> Self { - match self { - Stat::CompletedJobs(mut job_stat) => { - job_stat.increment(); - Stat::CompletedJobs(job_stat) + fn return_job( + &mut self, + ReturnJobInfo { id, result }: ReturnJobInfo, + ) -> Result<(), Self::Error> { + if result.is_failure() { + if let Some(mut job) = self.fetch_job(id)? 
{ + if job.needs_retry() { + self.queue_job(job.queue(), id)?; + self.save_job(job)?; + self.update_stats(Stats::retry_job) + } else { + self.delete_job(id)?; + self.update_stats(Stats::fail_job) + } + } else { + Ok(()) + } + } else if result.is_missing_processor() { + if let Some(mut job) = self.fetch_job(id)? { + job.pending(); + self.queue_job(job.queue(), id)?; + self.save_job(job)?; + self.update_stats(Stats::retry_job) + } else { + Ok(()) + } + } else { + self.delete_job(id)?; + self.update_stats(Stats::complete_job) + } } - - fn inner_stat(self) -> JobStat { - let mut job_stat = match self { - Stat::DeadJobs(job_stat) => job_stat, - Stat::CompletedJobs(job_stat) => job_stat, - }; - - job_stat.tick(); - job_stat - } - - fn dead_jobs() -> &'static str { - "DeadJobs" - } - - fn completed_jobs() -> &'static str { - "CompletedJobs" - } - - fn save<'env>( - self, - bucket: &'env Bucket<&[u8], ValueBuf>>, - txn: &mut Txn<'env>, - ) -> Result<(), Error> { - let name = self.name().to_owned(); - txn.set(bucket, name.as_ref(), Json::to_value_buf(self)?)?; - Ok(()) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct JobStat { - this_hour: usize, - today: usize, - this_month: usize, - all_time: usize, - updated_at: DateTime, -} - -impl JobStat { - fn new() -> Self { - JobStat { - this_hour: 0, - today: 0, - this_month: 0, - all_time: 0, - updated_at: Utc::now(), - } - } - - fn increment(&mut self) { - self.tick(); - - self.this_hour += 1; - self.today += 1; - self.this_month += 1; - self.all_time += 1; - } - - fn tick(&mut self) { - let now = Utc::now(); - - if now.month() != self.updated_at.month() { - self.next_month(); - } else if now.day() != self.updated_at.day() { - self.next_day(); - } else if now.hour() != self.updated_at.hour() { - self.next_hour(); - } - - self.updated_at = now; - } - - fn next_hour(&mut self) { - self.this_hour = 0; - } - - fn next_day(&mut self) { - self.next_hour(); - self.today = 0; - } - - fn next_month(&mut self) { - self.next_day(); - self.this_month = 0; - } - - pub fn this_hour(&self) -> usize { - self.this_hour - } - - pub fn today(&self) -> usize { - self.today - } - - pub fn this_month(&self) -> usize { - self.this_month - } - - pub fn all_time(&self) -> usize { - self.all_time - } -} diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml new file mode 100644 index 0000000..c805bc1 --- /dev/null +++ b/jobs-sled/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "background-jobs-sled-storage" +description = "Sled storage backend for background-jobs" +version = "0.1.0" +license = "GPL-3.0" +authors = ["asonix "] +repository = "https://git.asonix.dog/asonix/background-jobs" +readme = "README.md" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +background-jobs-core = { version = "0.5", path = "../jobs-core" } +failure = "0.1" +sled = "0.24" +serde = "1.0" +serde_json = "1.0" diff --git a/jobs-sled/README.md b/jobs-sled/README.md new file mode 100644 index 0000000..2eb5010 --- /dev/null +++ b/jobs-sled/README.md @@ -0,0 +1,14 @@ +# Jobs Sled +_a Sled storage backend for background-jobs_ + +This is the default storage backend for the Background Jobs library based on [Sled](https://github.com/spacejam/sled). It also serves as a reference implementation for storage backends.
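+
+A minimal construction sketch (the `sled::Db::start_default` call is the sled 0.24 API this crate pins, and the database path is illustrative; treat the exact setup as an assumption of this example):
+
+```rust
+use background_jobs_sled_storage::{Error, SledStorage};
+
+fn main() -> Result<(), Error> {
+    // Open (or create) a sled database at the given path.
+    let db = sled::Db::start_default("my-sled-db")?;
+
+    // Open the background-jobs trees inside it.
+    let storage = SledStorage::new(db)?;
+
+    // `storage` implements background_jobs_core::Storage and can be
+    // handed to a background-jobs server.
+    let _ = storage;
+    Ok(())
+}
+```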
+ +### License + +Copyright © 2018 Riley Trautman + +Background Jobs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. + +Background Jobs is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. This file is part of Background Jobs. + +You should have received a copy of the GNU General Public License along with Background Jobs. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/). diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs new file mode 100644 index 0000000..e121781 --- /dev/null +++ b/jobs-sled/src/lib.rs @@ -0,0 +1,259 @@ +use background_jobs_core::{JobInfo, Storage, Stats}; +use failure::Fail; +use std::{marker::PhantomData, sync::Arc}; + +#[derive(Clone)] +pub struct SledStorage { + jobinfo: Tree, + running: Tree, + running_inverse: Tree, + queue: Tree, + stats: Tree, + db: sled::Db, +} + +impl Storage for SledStorage { + type Error = Error; + + fn generate_id(&mut self) -> Result { + self.db.generate_id().map_err(Error::from) + } + + fn save_job(&mut self, job: JobInfo) -> Result<()> { + self.jobinfo.set(&job_key(job.id()), job).map(|_| ()) + } + + fn fetch_job(&mut self, id: u64) -> Result> { + self.jobinfo.get(&job_key(id)) + } + + fn fetch_job_from_queue(&mut self, queue: &str) -> Result> { + let job = self + .queue + .iter() + .filter_map(|res| res.ok()) + .filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None }) + .filter_map(|id| self.jobinfo.get(id).ok()) + .filter_map(|opt| opt) + .next(); + + Ok(job) + } + + fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> { + if let Some(runner_id) = self.running_inverse.del(&job_key(id))? { + self.running.del(&runner_key(runner_id))?; + } + + self.queue.set(&job_key(id), queue.to_owned()).map(|_| ()) + } + + fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> { + self.queue.del(&job_key(id))?; + self.running.set(&runner_key(runner_id), id)?; + self.running_inverse.set(&job_key(id), runner_id)?; + + Ok(()) + } + + fn delete_job(&mut self, id: u64) -> Result<()> { + self.jobinfo.del(&job_key(id))?; + self.queue.del(&job_key(id))?; + + if let Some(runner_id) = self.running_inverse.del(&job_key(id))? 
{ + self.running.del(&runner_key(runner_id))?; + } + + Ok(()) + } + + fn get_stats(&self) -> Result { + Ok(self.stats.get("stats")?.unwrap_or(Stats::default())) + } + + fn update_stats(&mut self, f: F) -> Result<()> + where + F: Fn(Stats) -> Stats, + { + self.stats.fetch_and_update("stats", |opt| { + let stats = match opt { + Some(stats) => stats, + None => Stats::default(), + }; + + Some((f)(stats)) + })?; + + Ok(()) + } +} + +fn job_key(id: u64) -> String { + format!("job-{}", id) +} + +fn runner_key(runner_id: u64) -> String { + format!("runner-{}", runner_id) +} + +impl SledStorage { + pub fn new(db: sled::Db) -> Result { + Ok(SledStorage { + jobinfo: open_tree(&db, "background-jobs-jobinfo")?, + running: open_tree(&db, "background-jobs-running")?, + running_inverse: open_tree(&db, "background-jobs-running-inverse")?, + queue: open_tree(&db, "background-jobs-queue")?, + stats: open_tree(&db, "background-jobs-stats")?, + db, + }) + } +} + +fn open_tree(db: &sled::Db, name: &str) -> sled::Result> +where + T: serde::de::DeserializeOwned + serde::ser::Serialize, +{ + db.open_tree(name).map(Tree::new) +} + + +#[derive(Clone)] +struct Tree(Arc, PhantomData); + +impl Tree +where + T: serde::de::DeserializeOwned + serde::ser::Serialize, +{ + fn new(t: Arc) -> Self { + Tree(t, PhantomData) + } + + fn iter(&self) -> Iter { + Iter::new(self.0.iter()) + } + + fn get(&self, key: K) -> Result> + where + K: AsRef<[u8]> + { + match self.0.get(key)? { + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } + + fn set(&self, key: &str, value: T) -> Result> { + let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?; + + Ok(self.0.set(key, vec)?.map(move |_| value)) + } + + fn del(&self, key: &str) -> Result> { + match self.0.del(key)? 
{ + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } + + fn fetch_and_update(&self, key: &str, f: F) -> Result> + where + F: Fn(Option) -> Option, + { + let final_opt = self.0.fetch_and_update(key, |opt| { + let new_opt = match opt { + Some(vec) => { + let t = serde_json::from_slice(&vec) + .map(Some) + .unwrap_or(None); + + (f)(t) + }, + None => (f)(None), + }; + + match new_opt { + Some(t) => serde_json::to_vec(&t) + .map(Some) + .unwrap_or(None), + None => None, + } + })?; + + match final_opt { + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } +} + +struct Iter<'a, T>(sled::Iter<'a>, PhantomData); + +impl<'a, T> Iter<'a, T> { + fn new(i: sled::Iter<'a>) -> Self { + Iter(i, PhantomData) + } +} + +#[derive(Clone, Debug, Fail)] +pub enum Error { + #[fail(display = "Error in database: {}", _0)] + Sled(#[cause] sled::Error), + + #[fail(display = "Failed to deserialize data")] + Deserialize, + + #[fail(display = "Failed to serialize data")] + Serialize, +} + +type Result = std::result::Result; + +impl<'a, T> Iterator for Iter<'a, T> +where + T: serde::de::DeserializeOwned +{ + type Item = Result<(Vec, T)>; + + fn next(&mut self) -> Option { + self.0.next().map(|res| { + res.map_err(Error::from).and_then(|(k, v)| { + serde_json::from_slice(&v) + .map(|item| (k, item)) + .map_err(|_| Error::Deserialize) + }) + }) + } +} + +impl<'a, T> DoubleEndedIterator for Iter<'a, T> +where + T: serde::de::DeserializeOwned +{ + fn next_back(&mut self) -> Option { + self.0.next_back().map(|res| { + res.map_err(Error::from).and_then(|(k, v)| { + serde_json::from_slice(&v) + .map(|item| (k, item)) + .map_err(|_| Error::Deserialize) + }) + }) + } +} + +impl From for Error { + fn from(e: sled::Error) -> Self { + Error::Sled(e) + } +} diff --git a/src/lib.rs b/src/lib.rs index 0a80e65..5ddde10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -273,7 +273,10 @@ //! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some //! other useful types for implementing a jobs processor. -pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats}; +pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats}; #[cfg(feature = "background-jobs-actix")] pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig}; + +#[cfg(feature = "background-jobs-sled-storage")] +pub use background_jobs_sled_storage::{SledStorage, Error as SledStorageError};
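Taken together, these changes replace the hard-coded LMDB `Storage` struct with a `Storage` trait that the caller constructs and injects into `ServerConfig`, so the server, workers, and `QueueHandle` are now generic over both the backend `S` and the user `State`. A sketch of the post-change wiring: `MyProcessor` and `MyJob` are hypothetical stand-ins for a user-defined `Processor<()>` implementation and its job type, and the actix `System` setup and paths are assumptions of this example, not part of the diff.

```rust
use background_jobs::{ServerConfig, SledStorage, WorkerConfig};

fn main() {
    let sys = actix::System::new("jobs-example");

    // Storage is now built by the caller and handed to the server,
    // instead of the server opening a database from a server_id + PathBuf.
    let db = sled::Db::start_default("example-db").expect("open sled Db");
    let storage = SledStorage::new(db).expect("open storage trees");

    // The SyncArbiter now runs 4 server threads over clones of the storage.
    let queue_handle = ServerConfig::new(storage).start();

    // Workers share a State value produced by the state_fn closure.
    let mut workers = WorkerConfig::new(|| ());
    workers.register(MyProcessor); // hypothetical Processor<()>
    workers.set_processor_count(MyProcessor::QUEUE, 8); // counts are now u64
    workers.start(queue_handle.clone());

    // Queueing sends a NewJob message; the server assigns the id via
    // Storage::generate_id and persists it through Storage::new_job.
    queue_handle
        .queue::<MyProcessor>(MyJob::new())
        .expect("queue job");

    let _ = sys.run();
}
```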
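The new `Storage` trait in `jobs-core/src/storage.rs` is the extension point the sled crate plugs into: nine required methods cover id generation, persistence, queue membership, running state, and stats, while `new_job`, `request_job`, and `return_job` are provided on top of them. A minimal in-memory sketch of that contract, useful mostly for tests — `MemoryStorage` and its `NoError` type are hypothetical, and the `Mutex`-based sharing is just one way to satisfy the `Clone + Send` bound:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use background_jobs_core::{JobInfo, Stats, Storage};
use failure::Fail;

#[derive(Debug, Fail)]
#[fail(display = "in-memory storage cannot fail")]
pub struct NoError;

#[derive(Clone, Default)]
pub struct MemoryStorage {
    inner: Arc<Mutex<Inner>>,
}

#[derive(Default)]
struct Inner {
    next_id: u64,
    jobs: HashMap<u64, JobInfo>,  // all known jobs, by id
    queues: HashMap<u64, String>, // job id -> queue it waits in
    running: HashMap<u64, u64>,   // job id -> runner id
    stats: Stats,
}

impl Storage for MemoryStorage {
    type Error = NoError;

    fn generate_id(&mut self) -> Result<u64, Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        inner.next_id += 1;
        Ok(inner.next_id)
    }

    fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error> {
        // Overwrites any stored job with the same id, as the trait requires.
        self.inner.lock().unwrap().jobs.insert(job.id(), job);
        Ok(())
    }

    fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>, Self::Error> {
        Ok(self.inner.lock().unwrap().jobs.get(&id).cloned())
    }

    fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
        // The provided request_job re-checks readiness, so a naive scan suffices.
        let inner = self.inner.lock().unwrap();
        let id = inner
            .queues
            .iter()
            .find(|(_, q)| q.as_str() == queue)
            .map(|(id, _)| *id);
        Ok(id.and_then(|id| inner.jobs.get(&id).cloned()))
    }

    fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        inner.running.remove(&id);
        inner.queues.insert(id, queue.to_owned());
        Ok(())
    }

    fn run_job(&mut self, id: u64, runner_id: u64) -> Result<(), Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        inner.queues.remove(&id);
        inner.running.insert(id, runner_id);
        Ok(())
    }

    fn delete_job(&mut self, id: u64) -> Result<(), Self::Error> {
        let mut inner = self.inner.lock().unwrap();
        inner.jobs.remove(&id);
        inner.queues.remove(&id);
        inner.running.remove(&id);
        Ok(())
    }

    fn get_stats(&self) -> Result<Stats, Self::Error> {
        Ok(self.inner.lock().unwrap().stats.clone())
    }

    fn update_stats<F>(&mut self, f: F) -> Result<(), Self::Error>
    where
        F: Fn(Stats) -> Stats,
    {
        let mut inner = self.inner.lock().unwrap();
        inner.stats = f(inner.stats.clone());
        Ok(())
    }
}
```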