Revamp background-jobs with pluggable backends

This commit is contained in:
asonix 2019-05-25 15:22:26 -05:00
parent edd63abf0f
commit f2a694879c
16 changed files with 806 additions and 1167 deletions

View file

@ -1,28 +1,34 @@
[package]
name = "background-jobs"
description = "Background Jobs implemented with tokio and futures"
description = "Background Jobs implemented with sled, actix, and futures"
version = "0.5.0"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
readme = "README.md"
keywords = ["jobs", "processor"]
keywords = ["jobs", "processor", "actix", "sled"]
edition = "2018"
[workspace]
members = [
"jobs-actix",
"jobs-core",
"jobs-sled",
]
[features]
default = ["background-jobs-actix"]
default = ["background-jobs-actix", "background-jobs-sled-storage"]
[dependencies.background-jobs-core]
version = "0.4"
version = "0.5"
path = "jobs-core"
[dependencies.background-jobs-actix]
version = "0.5"
path = "jobs-actix"
optional = true
[dependencies.background-jobs-sled-storage]
version = "0.1"
path = "jobs-sled"
optional = true

View file

@ -10,7 +10,7 @@ edition = "2018"
[dependencies]
actix = "0.8"
background-jobs-core = { version = "0.4", path = "../jobs-core" }
background-jobs-core = { version = "0.5", path = "../jobs-core" }
chrono = "0.4"
failure = "0.1"
futures = "0.1"

View file

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};
use actix::{Actor, Addr, SyncArbiter};
use background_jobs_core::{Processor, ProcessorMap, Stats, Storage};
@ -12,29 +12,29 @@ pub use self::{server::Server, worker::LocalWorker};
use self::{
pinger::Pinger,
server::{CheckDb, EitherJob, GetStats, RequestJob},
server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob},
worker::ProcessJob,
};
pub struct ServerConfig {
server_id: usize,
db_path: PathBuf,
pub struct ServerConfig<S> {
storage: S,
}
impl ServerConfig {
pub fn new(server_id: usize, db_path: PathBuf) -> Self {
ServerConfig { server_id, db_path }
impl<S> ServerConfig<S>
where
S: Storage + Sync + 'static,
{
pub fn new(storage: S) -> Self {
ServerConfig { storage }
}
pub fn start<S>(self) -> QueueHandle<S>
pub fn start<State>(self) -> QueueHandle<S, State>
where
S: Clone + 'static,
State: Clone + 'static,
{
let ServerConfig { server_id, db_path } = self;
let ServerConfig { storage } = self;
let server = SyncArbiter::start(1, move || {
Server::new(server_id, Storage::init(db_path.clone()).unwrap())
});
let server = SyncArbiter::start(4, move || Server::new(storage.clone()));
Pinger::new(server.clone()).start();
@ -42,19 +42,19 @@ impl ServerConfig {
}
}
pub struct WorkerConfig<S>
pub struct WorkerConfig<State>
where
S: Clone + 'static,
State: Clone + 'static,
{
processors: ProcessorMap<S>,
queues: BTreeMap<String, usize>,
processors: ProcessorMap<State>,
queues: BTreeMap<String, u64>,
}
impl<S> WorkerConfig<S>
impl<State> WorkerConfig<State>
where
S: Clone + 'static,
State: Clone + 'static,
{
pub fn new(state_fn: impl Fn() -> S + Send + Sync + 'static) -> Self {
pub fn new(state_fn: impl Fn() -> State + Send + Sync + 'static) -> Self {
WorkerConfig {
processors: ProcessorMap::new(Box::new(state_fn)),
queues: BTreeMap::new(),
@ -63,17 +63,20 @@ where
pub fn register<P>(&mut self, processor: P)
where
P: Processor<S> + Send + 'static,
P: Processor<State> + Send + 'static,
{
self.queues.insert(P::QUEUE.to_owned(), 4);
self.processors.register_processor(processor);
}
pub fn set_processor_count(&mut self, queue: &str, count: usize) {
pub fn set_processor_count(&mut self, queue: &str, count: u64) {
self.queues.insert(queue.to_owned(), count);
}
pub fn start(self, queue_handle: QueueHandle<S>) {
pub fn start<S>(self, queue_handle: QueueHandle<S, State>)
where
S: Storage + 'static,
{
let processors = Arc::new(self.processors);
self.queues.into_iter().fold(0, |acc, (key, count)| {
@ -93,22 +96,24 @@ where
}
#[derive(Clone)]
pub struct QueueHandle<S>
pub struct QueueHandle<S, State>
where
S: Clone + 'static,
S: Storage + 'static,
State: Clone + 'static,
{
inner: Addr<Server<LocalWorker<S>>>,
inner: Addr<Server<S, LocalWorker<S, State>>>,
}
impl<S> QueueHandle<S>
impl<S, State> QueueHandle<S, State>
where
S: Clone + 'static,
S: Storage + 'static,
State: Clone + 'static,
{
pub fn queue<P>(&self, job: P::Job) -> Result<(), Error>
where
P: Processor<S>,
P: Processor<State>,
{
self.inner.do_send(EitherJob::New(P::new_job(job)?));
self.inner.do_send(NewJob(P::new_job(job)?));
Ok(())
}

View file

@ -1,29 +1,33 @@
use std::time::Duration;
use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext};
use background_jobs_core::Storage;
use crate::{CheckDb, ProcessJob, Server};
pub struct Pinger<W>
pub struct Pinger<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
{
server: Addr<Server<W>>,
server: Addr<Server<S, W>>,
}
impl<W> Pinger<W>
impl<S, W> Pinger<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
{
pub fn new(server: Addr<Server<W>>) -> Self {
pub fn new(server: Addr<Server<S, W>>) -> Self {
Pinger { server }
}
}
impl<W> Actor for Pinger<W>
impl<S, W> Actor for Pinger<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
Server<W>: Actor<Context = SyncContext<Server<W>>> + Handler<CheckDb>,
Server<S, W>: Actor<Context = SyncContext<Server<S, W>>> + Handler<CheckDb>,
{
type Context = Context<Self>;

View file

@ -1,20 +1,24 @@
use std::collections::{HashMap, VecDeque};
use actix::{Actor, Addr, Context, Handler, Message, SyncContext};
use background_jobs_core::{JobInfo, NewJobInfo, Stats, Storage};
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use failure::Error;
use log::{debug, trace};
use log::trace;
use serde_derive::Deserialize;
use crate::ProcessJob;
#[derive(Clone, Debug, Deserialize)]
pub enum EitherJob {
New(NewJobInfo),
Existing(JobInfo),
pub struct NewJob(pub(crate) NewJobInfo);
#[derive(Clone, Debug, Deserialize)]
pub struct ReturningJob(pub(crate) ReturnJobInfo);
impl Message for NewJob {
type Result = Result<(), Error>;
}
impl Message for EitherJob {
impl Message for ReturningJob {
type Result = Result<(), Error>;
}
@ -22,7 +26,7 @@ pub struct RequestJob<W>
where
W: Actor + Handler<ProcessJob>,
{
worker_id: usize,
worker_id: u64,
queue: String,
addr: Addr<W>,
}
@ -31,7 +35,7 @@ impl<W> RequestJob<W>
where
W: Actor + Handler<ProcessJob>,
{
pub fn new(worker_id: usize, queue: &str, addr: Addr<W>) -> Self {
pub fn new(worker_id: u64, queue: &str, addr: Addr<W>) -> Self {
RequestJob {
worker_id,
queue: queue.to_owned(),
@ -59,124 +63,60 @@ impl Message for GetStats {
type Result = Result<Stats, Error>;
}
struct Cache<W>
pub struct Server<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
{
workers: VecDeque<RequestJob<W>>,
jobs: VecDeque<JobInfo>,
storage: S,
cache: HashMap<String, VecDeque<RequestJob<W>>>,
}
impl<W> Cache<W>
impl<S, W> Server<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
{
fn new() -> Self {
Cache {
workers: VecDeque::new(),
jobs: VecDeque::new(),
}
}
}
pub struct Server<W>
where
W: Actor + Handler<ProcessJob>,
{
server_id: usize,
storage: Storage,
cache: HashMap<String, Cache<W>>,
cache_size: usize,
}
impl<W> Server<W>
where
W: Actor + Handler<ProcessJob>,
{
pub fn new(server_id: usize, storage: Storage) -> Self {
pub fn new(storage: S) -> Self {
Server {
server_id,
storage,
cache: HashMap::new(),
cache_size: 25,
}
}
pub fn set_cache_size(&mut self, cache_size: usize) {
self.cache_size = cache_size;
}
fn populate(&mut self, queue: &str) -> Result<bool, Error> {
trace!("Populating queue {}", queue);
let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
if entry.jobs.is_empty() {
let new_jobs = self
.storage
.stage_jobs(self.cache_size, queue, self.server_id)?;
let empty = new_jobs.is_empty();
debug!("Retrieved {} jobs from storage", new_jobs.len());
trace!("{:?}", new_jobs.iter().map(|j| j.id()).collect::<Vec<_>>());
new_jobs
.into_iter()
.for_each(|job| entry.jobs.push_back(job));
Ok(!empty)
} else {
Ok(true)
}
}
}
impl<W> Actor for Server<W>
impl<S, W> Actor for Server<S, W>
where
S: Storage + 'static,
W: Actor + Handler<ProcessJob>,
{
type Context = SyncContext<Self>;
fn started(&mut self, _: &mut Self::Context) {
self.storage.requeue_staged_jobs(self.server_id).unwrap();
self.storage.check_stalled_jobs(self.server_id).unwrap();
}
}
impl<W> Handler<EitherJob> for Server<W>
impl<S, W> Handler<NewJob> for Server<S, W>
where
S: Storage + 'static,
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
{
type Result = Result<(), Error>;
fn handle(&mut self, msg: EitherJob, _: &mut Self::Context) -> Self::Result {
let mut job = match msg {
EitherJob::New(new_job) => {
let job = self.storage.assign_id(new_job, self.server_id)?;
debug!("Created job {}, {:?}", job.id(), job);
job
}
EitherJob::Existing(job) => job,
};
fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result {
let queue = msg.0.queue().to_owned();
let ready = msg.0.is_ready();
self.storage.new_job(msg.0)?;
let retry_now = job.is_pending() || (job.needs_retry() && job.retry_ready());
if job.is_pending() && !retry_now {
trace!("Storing job {} for later processing", job.id());
}
self.storage.store_job(job.clone(), self.server_id)?;
if retry_now {
if ready {
let entry = self
.cache
.entry(job.queue().to_owned())
.or_insert(Cache::new());
.entry(queue.clone())
.or_insert(VecDeque::new());
if let Some(worker) = entry.workers.pop_front() {
debug!("Retrying job {} on worker {}", job.id(), worker.worker_id);
worker.addr.do_send(ProcessJob::new(job.clone()));
job.set_running();
self.storage.store_job(job, worker.worker_id)?;
} else if entry.jobs.len() < self.cache_size {
entry.jobs.push_back(job);
if let Some(request) = entry.pop_front() {
if let Some(job) = self.storage.request_job(&queue, request.worker_id)? {
request.addr.do_send(ProcessJob::new(job));
} else {
entry.push_back(request);
}
}
}
@ -184,83 +124,76 @@ where
}
}
impl<W> Handler<RequestJob<W>> for Server<W>
impl<S, W> Handler<ReturningJob> for Server<S, W>
where
S: Storage + 'static,
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
{
type Result = Result<(), Error>;
fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result {
self.storage.return_job(msg.0).map_err(|e| e.into())
}
}
impl<S, W> Handler<RequestJob<W>> for Server<S, W>
where
S: Storage + 'static,
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
{
type Result = Result<(), Error>;
fn handle(&mut self, msg: RequestJob<W>, _: &mut Self::Context) -> Self::Result {
trace!("Worker {} requested job", msg.worker_id);
self.populate(&msg.queue)?;
let job = self.storage.request_job(&msg.queue, msg.worker_id)?;
let job = self
.cache
.get_mut(&msg.queue)
.and_then(|cache| cache.jobs.pop_front());
if let Some(mut job) = job {
if let Some(job) = job {
msg.addr.do_send(ProcessJob::new(job.clone()));
job.set_running();
self.storage.store_job(job, msg.worker_id)?;
} else {
trace!("storing worker {} for queue {}", msg.worker_id, msg.queue);
let entry = self.cache.entry(msg.queue.clone()).or_insert(Cache::new());
entry.workers.push_back(msg);
let entry = self
.cache
.entry(msg.queue.to_owned())
.or_insert(VecDeque::new());
entry.push_back(msg);
}
Ok(())
}
}
impl<W> Handler<CheckDb> for Server<W>
impl<S, W> Handler<CheckDb> for Server<S, W>
where
S: Storage + 'static,
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
{
type Result = Result<(), Error>;
fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
trace!("Checkdb");
let queues: Vec<String> = self.cache.keys().cloned().collect();
let mut todo = Vec::new();
for queue in queues {
if self.populate(&queue)? {
debug!("Cached jobs for {}", queue);
for (queue, workers) in self.cache.iter_mut() {
if let Some(request) = workers.pop_front() {
if let Some(job) = self.storage.request_job(queue, request.worker_id)? {
request.addr.do_send(ProcessJob::new(job));
} else {
workers.push_back(request);
}
}
let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
let min_len = entry.jobs.len().min(entry.workers.len());
entry
.jobs
.drain(..min_len)
.zip(entry.workers.drain(..min_len))
.for_each(|pair| {
todo.push(pair);
});
}
for (mut job, worker) in todo {
debug!("Sending job {} to worker {}", job.id(), worker.worker_id);
worker.addr.do_send(ProcessJob::new(job.clone()));
job.set_running();
self.storage.store_job(job, worker.worker_id)?;
}
Ok(())
}
}
impl<W> Handler<GetStats> for Server<W>
impl<S, W> Handler<GetStats> for Server<S, W>
where
S: Storage + 'static,
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
{
type Result = Result<Stats, Error>;
fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result {
Ok(self.storage.get_stats()?)
self.storage.get_stats().map_err(|e| e.into())
}
}

View file

@ -4,10 +4,10 @@ use actix::{
fut::{wrap_future, ActorFuture},
Actor, Addr, AsyncContext, Context, Handler, Message,
};
use background_jobs_core::{JobInfo, ProcessorMap};
use background_jobs_core::{JobInfo, ProcessorMap, Storage};
use log::info;
use crate::{EitherJob, RequestJob, Server};
use crate::{RequestJob, ReturningJob, Server};
pub struct ProcessJob {
job: JobInfo,
@ -23,25 +23,27 @@ impl Message for ProcessJob {
type Result = ();
}
pub struct LocalWorker<State>
pub struct LocalWorker<S, State>
where
S: Storage + 'static,
State: Clone + 'static,
{
id: usize,
id: u64,
queue: String,
processors: Arc<ProcessorMap<State>>,
server: Addr<Server<LocalWorker<State>>>,
server: Addr<Server<S, LocalWorker<S, State>>>,
}
impl<State> LocalWorker<State>
impl<S, State> LocalWorker<S, State>
where
S: Storage + 'static,
State: Clone + 'static,
{
pub fn new(
id: usize,
id: u64,
queue: String,
processors: Arc<ProcessorMap<State>>,
server: Addr<Server<Self>>,
server: Addr<Server<S, Self>>,
) -> Self {
LocalWorker {
id,
@ -52,8 +54,9 @@ where
}
}
impl<State> Actor for LocalWorker<State>
impl<S, State> Actor for LocalWorker<S, State>
where
S: Storage + 'static,
State: Clone + 'static,
{
type Context = Context<Self>;
@ -64,8 +67,9 @@ where
}
}
impl<State> Handler<ProcessJob> for LocalWorker<State>
impl<S, State> Handler<ProcessJob> for LocalWorker<S, State>
where
S: Storage + 'static,
State: Clone + 'static,
{
type Result = ();
@ -74,7 +78,7 @@ where
info!("Worker {} processing job {}", self.id, msg.job.id());
let fut =
wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| {
actor.server.do_send(EitherJob::Existing(job));
actor.server.do_send(ReturningJob(job));
actor
.server
.do_send(RequestJob::new(actor.id, &actor.queue, ctx.address()));

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor on tokio"
version = "0.4.1"
version = "0.5.0"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -13,8 +13,6 @@ chrono = { version = "0.4", features = ["serde"] }
failure = "0.1"
futures = "0.1.21"
log = "0.4"
kv = { version = "0.7", features = ["json-value"] }
lmdb = "0.8"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"

View file

@ -22,7 +22,36 @@ use log::trace;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct ReturnJobInfo {
pub(crate) id: u64,
pub(crate) result: JobResult,
}
impl ReturnJobInfo {
pub(crate) fn fail(id: u64) -> Self {
ReturnJobInfo {
id,
result: JobResult::Failure,
}
}
pub(crate) fn pass(id: u64) -> Self {
ReturnJobInfo {
id,
result: JobResult::Success,
}
}
pub(crate) fn missing_processor(id: u64) -> Self {
ReturnJobInfo {
id,
result: JobResult::MissingProcessor,
}
}
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct NewJobInfo {
@ -67,7 +96,15 @@ impl NewJobInfo {
}
}
pub(crate) fn with_id(self, id: usize) -> JobInfo {
pub fn queue(&self) -> &str {
&self.queue
}
pub fn is_ready(&self) -> bool {
self.next_queue.is_none()
}
pub(crate) fn with_id(self, id: u64) -> JobInfo {
JobInfo {
id,
processor: self.processor,
@ -92,7 +129,7 @@ impl NewJobInfo {
/// new_job method.
pub struct JobInfo {
/// ID of the job
id: usize,
id: u64,
/// Name of the processor that should handle this job
processor: String,
@ -127,7 +164,7 @@ impl JobInfo {
&self.queue
}
pub(crate) fn updated(&mut self) {
fn updated(&mut self) {
self.updated_at = Utc::now();
}
@ -139,20 +176,17 @@ impl JobInfo {
self.args.clone()
}
pub(crate) fn status(&self) -> JobStatus {
self.status.clone()
}
pub fn id(&self) -> usize {
pub fn id(&self) -> u64 {
self.id
}
pub(crate) fn increment(&mut self) -> ShouldStop {
self.updated();
self.retry_count += 1;
self.max_retries.compare(self.retry_count)
}
pub(crate) fn next_queue(&mut self) {
fn next_queue(&mut self) {
let now = Utc::now();
let next_queue = match self.backoff_strategy {
@ -173,19 +207,15 @@ impl JobInfo {
);
}
pub(crate) fn is_stale(&self) -> bool {
self.updated_at < Utc::now() - OldDuration::days(1)
}
pub(crate) fn is_ready(&self, now: DateTime<Utc>) -> bool {
pub fn is_ready(&self, now: DateTime<Utc>) -> bool {
match self.next_queue {
Some(ref time) => now > *time,
None => true,
}
}
pub fn needs_retry(&mut self) -> bool {
let should_retry = self.is_failed() && self.increment().should_requeue();
pub(crate) fn needs_retry(&mut self) -> bool {
let should_retry = self.increment().should_requeue();
if should_retry {
self.pending();
@ -195,47 +225,21 @@ impl JobInfo {
should_retry
}
pub fn retry_ready(&self) -> bool {
self.is_ready(Utc::now())
}
pub fn is_pending(&self) -> bool {
self.status == JobStatus::Pending
}
pub fn is_failed(&self) -> bool {
self.status == JobStatus::Failed
}
pub fn is_finished(&self) -> bool {
self.status == JobStatus::Finished
}
pub(crate) fn is_in_queue(&self, queue: &str) -> bool {
self.queue == queue
}
pub(crate) fn stage(&mut self) {
self.status = JobStatus::Staged;
}
/// This method sets the Job's status to running
///
/// Touching this outside of the background_jobs crates is dangerous, since these libraries
/// rely on the state of the job being correct.
pub fn set_running(&mut self) {
pub(crate) fn run(&mut self) {
self.updated();
self.status = JobStatus::Running;
}
pub(crate) fn pending(&mut self) {
self.updated();
self.status = JobStatus::Pending;
}
pub(crate) fn fail(&mut self) {
self.status = JobStatus::Failed;
}
pub(crate) fn pass(&mut self) {
self.status = JobStatus::Finished;
}
}

View file

@ -24,14 +24,16 @@ mod job;
mod job_info;
mod processor;
mod processor_map;
mod stats;
mod storage;
pub use crate::{
job::Job,
job_info::{JobInfo, NewJobInfo},
job_info::{JobInfo, NewJobInfo, ReturnJobInfo},
processor::Processor,
processor_map::ProcessorMap,
storage::{JobStat, Stat, Stats, Storage},
stats::{JobStat, Stats},
storage::Storage,
};
#[derive(Debug, Fail)]
@ -50,23 +52,65 @@ pub enum JobError {
MissingProcessor,
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub enum JobResult {
Success,
Failure,
MissingProcessor,
}
impl JobResult {
pub fn success() -> Self {
JobResult::Success
}
pub fn failure() -> Self {
JobResult::Failure
}
pub fn missing_processor() -> Self {
JobResult::MissingProcessor
}
pub fn is_failure(&self) -> bool {
*self == JobResult::Failure
}
pub fn is_success(&self) -> bool {
*self == JobResult::Success
}
pub fn is_missing_processor(&self) -> bool {
*self == JobResult::MissingProcessor
}
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
/// Set the status of a job when storing it
pub enum JobStatus {
/// Job should be queued
Pending,
/// Job has been dequeued, but is not yet running
Staged,
/// Job is running
Running,
}
/// Job has failed
Failed,
impl JobStatus {
pub fn pending() -> Self {
JobStatus::Pending
}
/// Job has finished
Finished,
pub fn running() -> Self {
JobStatus::Running
}
pub fn is_pending(&self) -> bool {
*self == JobStatus::Pending
}
pub fn is_running(&self) -> bool {
*self == JobStatus::Running
}
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]

View file

@ -23,7 +23,7 @@ use futures::future::{Either, Future, IntoFuture};
use log::{error, info};
use serde_json::Value;
use crate::{JobError, JobInfo, Processor};
use crate::{JobError, JobInfo, Processor, ReturnJobInfo};
/// A generic function that processes a job
///
@ -35,7 +35,6 @@ use crate::{JobError, JobInfo, Processor};
pub type ProcessFn<S> =
Box<dyn Fn(Value, S) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;
pub type StateFn<S> = Box<dyn Fn() -> S + Send + Sync>;
/// A type for storing the relationships between processor names and the processor itself
@ -87,7 +86,7 @@ where
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = ReturnJobInfo, Error = ()> {
let opt = self
.inner
.get(job.processor())
@ -97,26 +96,28 @@ where
Either::A(fut)
} else {
error!("Processor {} not present", job.processor());
Either::B(Ok(job).into_future())
Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future())
}
}
}
fn process<S>(process_fn: &ProcessFn<S>, state: S, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
fn process<S>(
process_fn: &ProcessFn<S>,
state: S,
job: JobInfo,
) -> impl Future<Item = ReturnJobInfo, Error = ()> {
let args = job.args();
let id = job.id();
let processor = job.processor().to_owned();
process_fn(args, state).then(move |res| match res {
Ok(_) => {
info!("Job {} completed, {}", job.id(), processor);
job.pass();
Ok(job)
info!("Job {} completed, {}", id, processor);
Ok(ReturnJobInfo::pass(id))
}
Err(e) => {
error!("Job {} errored, {}, {}", job.id(), processor, e);
job.fail();
Ok(job)
error!("Job {} errored, {}, {}", id, processor, e);
Ok(ReturnJobInfo::fail(id))
}
})
}

163
jobs-core/src/stats.rs Normal file
View file

@ -0,0 +1,163 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2018 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use chrono::{offset::Utc, DateTime, Datelike, Timelike};
use serde_derive::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Stats {
pub pending: usize,
pub running: usize,
pub dead: JobStat,
pub complete: JobStat,
}
impl Stats {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn new_job(mut self) -> Self {
self.pending += 1;
self
}
pub(crate) fn run_job(mut self) -> Self {
if self.pending > 0 {
self.pending -= 1;
}
self.running += 1;
self
}
pub(crate) fn retry_job(mut self) -> Self {
self.pending += 1;
if self.running > 0 {
self.running -= 1;
}
self
}
pub(crate) fn fail_job(mut self) -> Self {
if self.running > 0 {
self.running -= 1;
}
self.dead.increment();
self
}
pub(crate) fn complete_job(mut self) -> Self {
if self.running > 0 {
self.running -= 1;
}
self.complete.increment();
self
}
}
impl Default for Stats {
fn default() -> Self {
Stats {
pending: 0,
running: 0,
dead: JobStat::default(),
complete: JobStat::default(),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JobStat {
this_hour: usize,
today: usize,
this_month: usize,
all_time: usize,
updated_at: DateTime<Utc>,
}
impl JobStat {
pub fn new() -> Self {
Self::default()
}
fn increment(&mut self) {
self.tick();
self.this_hour += 1;
self.today += 1;
self.this_month += 1;
self.all_time += 1;
}
fn tick(&mut self) {
let now = Utc::now();
if now.month() != self.updated_at.month() {
self.next_month();
} else if now.day() != self.updated_at.day() {
self.next_day();
} else if now.hour() != self.updated_at.hour() {
self.next_hour();
}
self.updated_at = now;
}
fn next_hour(&mut self) {
self.this_hour = 0;
}
fn next_day(&mut self) {
self.next_hour();
self.today = 0;
}
fn next_month(&mut self) {
self.next_day();
self.this_month = 0;
}
pub fn this_hour(&self) -> usize {
self.this_hour
}
pub fn today(&self) -> usize {
self.today
}
pub fn this_month(&self) -> usize {
self.this_month
}
pub fn all_time(&self) -> usize {
self.all_time
}
}
impl Default for JobStat {
fn default() -> Self {
JobStat {
this_hour: 0,
today: 0,
this_month: 0,
all_time: 0,
updated_at: Utc::now(),
}
}
}

File diff suppressed because it is too large Load diff

18
jobs-sled/Cargo.toml Normal file
View file

@ -0,0 +1,18 @@
[package]
name = "background-jobs-sled-storage"
description = "Sled storage backend for background-jobs"
version = "0.1.0"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
readme = "README.md"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
background-jobs-core = { version = "0.5", path = "../jobs-core" }
failure = "0.1"
sled = "0.24"
serde = "1.0"
serde_json = "1.0"

14
jobs-sled/README.md Normal file
View file

@ -0,0 +1,14 @@
# Jobs Sled
_a Sled storage backend for background-jobs_
This is the default storage backend for the Background Jobs library based on [Sled](https://github.com/spacejam/sled). It also servers as a reference implementation for storage backends.
### License
Copyright © 2018 Riley Trautman
Background Jobs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Background Jobs is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. This file is part of Background Jobs.
You should have received a copy of the GNU General Public License along with Background Jobs. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/).

259
jobs-sled/src/lib.rs Normal file
View file

@ -0,0 +1,259 @@
use background_jobs_core::{JobInfo, Storage, Stats};
use failure::Fail;
use std::{marker::PhantomData, sync::Arc};
#[derive(Clone)]
pub struct SledStorage {
jobinfo: Tree<JobInfo>,
running: Tree<u64>,
running_inverse: Tree<u64>,
queue: Tree<String>,
stats: Tree<Stats>,
db: sled::Db,
}
impl Storage for SledStorage {
type Error = Error;
fn generate_id(&mut self) -> Result<u64> {
self.db.generate_id().map_err(Error::from)
}
fn save_job(&mut self, job: JobInfo) -> Result<()> {
self.jobinfo.set(&job_key(job.id()), job).map(|_| ())
}
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>> {
self.jobinfo.get(&job_key(id))
}
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
let job = self
.queue
.iter()
.filter_map(|res| res.ok())
.filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None })
.filter_map(|id| self.jobinfo.get(id).ok())
.filter_map(|opt| opt)
.next();
Ok(job)
}
fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> {
if let Some(runner_id) = self.running_inverse.del(&job_key(id))? {
self.running.del(&runner_key(runner_id))?;
}
self.queue.set(&job_key(id), queue.to_owned()).map(|_| ())
}
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> {
self.queue.del(&job_key(id))?;
self.running.set(&runner_key(runner_id), id)?;
self.running_inverse.set(&job_key(id), runner_id)?;
Ok(())
}
fn delete_job(&mut self, id: u64) -> Result<()> {
self.jobinfo.del(&job_key(id))?;
self.queue.del(&job_key(id))?;
if let Some(runner_id) = self.running_inverse.del(&job_key(id))? {
self.running.del(&runner_key(runner_id))?;
}
Ok(())
}
fn get_stats(&self) -> Result<Stats> {
Ok(self.stats.get("stats")?.unwrap_or(Stats::default()))
}
fn update_stats<F>(&mut self, f: F) -> Result<()>
where
F: Fn(Stats) -> Stats,
{
self.stats.fetch_and_update("stats", |opt| {
let stats = match opt {
Some(stats) => stats,
None => Stats::default(),
};
Some((f)(stats))
})?;
Ok(())
}
}
fn job_key(id: u64) -> String {
format!("job-{}", id)
}
fn runner_key(runner_id: u64) -> String {
format!("runner-{}", runner_id)
}
impl SledStorage {
pub fn new(db: sled::Db) -> Result<Self> {
Ok(SledStorage {
jobinfo: open_tree(&db, "background-jobs-jobinfo")?,
running: open_tree(&db, "background-jobs-running")?,
running_inverse: open_tree(&db, "background-jobs-running-inverse")?,
queue: open_tree(&db, "background-jobs-queue")?,
stats: open_tree(&db, "background-jobs-stats")?,
db,
})
}
}
fn open_tree<T>(db: &sled::Db, name: &str) -> sled::Result<Tree<T>>
where
T: serde::de::DeserializeOwned + serde::ser::Serialize,
{
db.open_tree(name).map(Tree::new)
}
#[derive(Clone)]
struct Tree<T>(Arc<sled::Tree>, PhantomData<T>);
impl<T> Tree<T>
where
T: serde::de::DeserializeOwned + serde::ser::Serialize,
{
fn new(t: Arc<sled::Tree>) -> Self {
Tree(t, PhantomData)
}
fn iter(&self) -> Iter<T> {
Iter::new(self.0.iter())
}
fn get<K>(&self, key: K) -> Result<Option<T>>
where
K: AsRef<[u8]>
{
match self.0.get(key)? {
Some(vec) => {
serde_json::from_slice(&vec)
.map_err(|_| Error::Deserialize)
.map(Some)
},
None => Ok(None),
}
}
fn set(&self, key: &str, value: T) -> Result<Option<T>> {
let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?;
Ok(self.0.set(key, vec)?.map(move |_| value))
}
fn del(&self, key: &str) -> Result<Option<T>> {
match self.0.del(key)? {
Some(vec) => {
serde_json::from_slice(&vec)
.map_err(|_| Error::Deserialize)
.map(Some)
},
None => Ok(None),
}
}
fn fetch_and_update<F>(&self, key: &str, f: F) -> Result<Option<T>>
where
F: Fn(Option<T>) -> Option<T>,
{
let final_opt = self.0.fetch_and_update(key, |opt| {
let new_opt = match opt {
Some(vec) => {
let t = serde_json::from_slice(&vec)
.map(Some)
.unwrap_or(None);
(f)(t)
},
None => (f)(None),
};
match new_opt {
Some(t) => serde_json::to_vec(&t)
.map(Some)
.unwrap_or(None),
None => None,
}
})?;
match final_opt {
Some(vec) => {
serde_json::from_slice(&vec)
.map_err(|_| Error::Deserialize)
.map(Some)
},
None => Ok(None),
}
}
}
struct Iter<'a, T>(sled::Iter<'a>, PhantomData<T>);
impl<'a, T> Iter<'a, T> {
fn new(i: sled::Iter<'a>) -> Self {
Iter(i, PhantomData)
}
}
#[derive(Clone, Debug, Fail)]
pub enum Error {
#[fail(display = "Error in database: {}", _0)]
Sled(#[cause] sled::Error),
#[fail(display = "Failed to deserialize data")]
Deserialize,
#[fail(display = "Failed to serialize data")]
Serialize,
}
type Result<T> = std::result::Result<T, Error>;
impl<'a, T> Iterator for Iter<'a, T>
where
T: serde::de::DeserializeOwned
{
type Item = Result<(Vec<u8>, T)>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|res| {
res.map_err(Error::from).and_then(|(k, v)| {
serde_json::from_slice(&v)
.map(|item| (k, item))
.map_err(|_| Error::Deserialize)
})
})
}
}
impl<'a, T> DoubleEndedIterator for Iter<'a, T>
where
T: serde::de::DeserializeOwned
{
fn next_back(&mut self) -> Option<Self::Item> {
self.0.next_back().map(|res| {
res.map_err(Error::from).and_then(|(k, v)| {
serde_json::from_slice(&v)
.map(|item| (k, item))
.map_err(|_| Error::Deserialize)
})
})
}
}
impl From<sled::Error> for Error {
fn from(e: sled::Error) -> Self {
Error::Sled(e)
}
}

View file

@ -273,7 +273,10 @@
//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
//! other useful types for implementing a jobs processor.
pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats};
pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats};
#[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};
#[cfg(feature = "background-jobs-sled-storage")]
pub use background_jobs_sled_storage::{SledStorage, Error as SledStorageError};