Make background-jobs simpler to use

This commit is contained in:
asonix 2019-05-27 19:01:21 -05:00
parent 645041fbac
commit 1f10095269
13 changed files with 261 additions and 77 deletions

View file

@ -10,7 +10,7 @@ might not be the best experience.
```toml
[dependencies]
actix = "0.8"
background-jobs = "0.5.1"
background-jobs = "0.6.0"
failure = "0.1"
futures = "0.1"
serde = "1.0"
@ -24,6 +24,7 @@ operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeO
```rust
use background_jobs::Job;
use failure::Error;
use serde_derive::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
@ -42,6 +43,9 @@ impl MyJob {
}
impl Job for MyJob {
type Processor = MyProcessor; // We will define this later
type State = ();
fn run(self, _: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("args: {:?}", self);
@ -71,7 +75,10 @@ impl MyState {
}
}
impl Job<MyState> for MyJob {
impl Job for MyJob {
type Processor = MyProcessor; // We will define this later
type State = MyState;
fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("{}: args, {:?}", state.app_name, self);
@ -92,7 +99,7 @@ const DEFAULT_QUEUE: &'static str = "default";
#[derive(Clone, Debug)]
pub struct MyProcessor;
impl Processor<MyState> for MyProcessor {
impl Processor for MyProcessor {
// The kind of job this processor should execute
type Job = MyJob;
@ -128,7 +135,8 @@ spawning new jobs.
`background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This
can be implemented manually by implementing the `Storage` trait from `background-jobs-core`,
or the `background-jobs-sled-storage` crate can be used to provide a
the in-memory store provided in the `background-jobs-core` crate can be used, or the
`background-jobs-sled-storage` crate can be used to provide a
[Sled](https://github.com/spacejam/sled)-backed jobs store.
With that out of the way, back to the examples:
@ -136,7 +144,7 @@ With that out of the way, back to the examples:
##### Main
```rust
use actix::System;
use background_jobs::{ServerConfig, SledStorage, WorkerConfig};
use background_jobs::{ServerConfig, WorkerConfig};
use failure::Error;
fn main() -> Result<(), Error> {
@ -144,22 +152,31 @@ fn main() -> Result<(), Error> {
let sys = System::new("my-actix-system");
// Set up our Storage
// For this example, we use the default in-memory storage mechanism
use background_jobs::memory_storage::Storage;
let storage = Storage::new();
/*
// Optionally, a storage backend using the Sled database is provided
use sled::Db;
use background_jobs::sled_storage::Storage;
let db = Db::start_default("my-sled-db")?;
let storage = SledStorage::new(db)?;
let storage = Storage::new(db)?;
*/
// Start the application server. This guards access to to the jobs store
let queue_handle = ServerConfig::new(storage).start();
let queue_handle = ServerConfig::new(storage).thread_count(8).start();
// Configure and start our workers
let mut worker_config = WorkerConfig::new(move || MyState::new("My App"));
worker_config.register(MyProcessor);
worker_config.set_processor_count(DEFAULT_QUEUE, 16);
worker_config.start(queue_handle.clone());
WorkerConfig::new(move || MyState::new("My App"))
.register(MyProcessor(queue_handle.clone()))
.set_processor_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone());
// Queue our jobs
queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
// Block on Actix
sys.run()?;

4
TODO
View file

@ -1,4 +0,0 @@
1.
Gracefull Shutdown
2.
Periodically check staged jobs

View file

@ -1,11 +1,10 @@
use actix::System;
use background_jobs::{
Backoff, Job, MaxRetries, Processor, ServerConfig, SledStorage, WorkerConfig, QueueHandle,
Backoff, Job, MaxRetries, Processor, QueueHandle, ServerConfig, WorkerConfig,
};
use failure::Error;
use futures::{future::ok, Future};
use serde_derive::{Deserialize, Serialize};
use sled::Db;
const DEFAULT_QUEUE: &'static str = "default";
@ -24,24 +23,37 @@ pub struct MyJob {
pub struct MyProcessor(pub QueueHandle);
fn main() -> Result<(), Error> {
// First set up the Actix System to ensure we have a runtime to spawn jobs on.
let sys = System::new("my-actix-system");
// Set up our Storage
// For this example, we use the default in-memory storage mechanism
use background_jobs::memory_storage::Storage;
let storage = Storage::new();
/*
// Optionally, a storage backend using the Sled database is provided
use sled::Db;
use background_jobs::sled_storage::Storage;
let db = Db::start_default("my-sled-db")?;
let storage = SledStorage::new(db)?;
let storage = Storage::new(db)?;
*/
let queue_handle = ServerConfig::new(storage).thread_count(2).start();
let processor = MyProcessor(queue_handle.clone());
// Start the application server. This guards access to to the jobs store
let queue_handle = ServerConfig::new(storage).thread_count(8).start();
// Configure and start our workers
WorkerConfig::new(move || MyState::new("My App"))
.register(processor.clone())
.register(MyProcessor(queue_handle.clone()))
.set_processor_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone());
processor.queue(MyJob::new(1, 2))?;
processor.queue(MyJob::new(3, 4))?;
processor.queue(MyJob::new(5, 6))?;
// Queue our jobs
queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
// Block on Actix
sys.run()?;
Ok(())
}
@ -63,7 +75,10 @@ impl MyJob {
}
}
impl Job<MyState> for MyJob {
impl Job for MyJob {
type Processor = MyProcessor;
type State = MyState;
fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
println!("{}: args, {:?}", state.app_name, self);
@ -71,13 +86,7 @@ impl Job<MyState> for MyJob {
}
}
impl MyProcessor {
fn queue(&self, job: <Self as Processor<MyState>>::Job) -> Result<(), Error> {
self.0.queue::<Self, _>(job)
}
}
impl Processor<MyState> for MyProcessor {
impl Processor for MyProcessor {
// The kind of job this processor should execute
type Job = MyJob;

View file

@ -16,6 +16,7 @@ failure = "0.1"
futures = "0.1"
log = "0.4"
num_cpus = "1.10.0"
rand = "0.6.5"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"

View file

@ -1,7 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use actix::{Actor, Addr, SyncArbiter};
use background_jobs_core::{Processor, ProcessorMap, Stats, Storage};
use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage};
use failure::Error;
use futures::Future;
@ -66,7 +66,7 @@ where
Server::new(StorageWrapper(storage.clone()))
});
Pinger::new(server.clone()).start();
Pinger::new(server.clone(), threads).start();
QueueHandle { inner: server }
}
@ -91,11 +91,11 @@ where
}
}
pub fn register<P>(mut self, processor: P) -> Self
where
P: Processor<State> + Send + 'static,
{
self.queues.insert(P::QUEUE.to_owned(), 4);
pub fn register(
mut self,
processor: impl Processor<Job = impl Job<State = State> + Send + 'static> + Send + 'static,
) -> Self {
self.queues.insert(processor.queue().to_owned(), 4);
self.processors.register_processor(processor);
self
}
@ -130,12 +130,11 @@ pub struct QueueHandle {
}
impl QueueHandle {
pub fn queue<P, State>(&self, job: P::Job) -> Result<(), Error>
pub fn queue<J>(&self, job: J) -> Result<(), Error>
where
P: Processor<State>,
State: Clone,
J: Job,
{
self.inner.do_send(NewJob(P::new_job(job)?));
self.inner.do_send(NewJob(J::Processor::new_job(job)?));
Ok(())
}

View file

@ -5,11 +5,12 @@ use crate::{CheckDb, Server};
pub struct Pinger {
server: Addr<Server>,
threads: usize,
}
impl Pinger {
pub fn new(server: Addr<Server>) -> Self {
Pinger { server }
pub fn new(server: Addr<Server>, threads: usize) -> Self {
Pinger { server, threads }
}
}
@ -18,7 +19,9 @@ impl Actor for Pinger {
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(1), |actor, _| {
actor.server.do_send(CheckDb);
for _ in 0..actor.threads {
actor.server.do_send(CheckDb);
}
});
}
}

View file

@ -51,7 +51,7 @@ impl Message for RequestJob {
}
impl Message for CheckDb {
type Result = Result<(), Error>;
type Result = ();
}
impl Message for GetStats {
@ -70,7 +70,7 @@ impl Handler<NewJob> for Server {
let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new());
if let Some(worker) = entry.pop_front() {
if let Some(job) = self.storage.request_job(&queue, worker.id())? {
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()) {
worker.process_job(job);
} else {
entry.push_back(worker);
@ -117,7 +117,7 @@ impl Handler<RequestJob> for Server {
}
impl Handler<CheckDb> for Server {
type Result = Result<(), Error>;
type Result = ();
fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
trace!("Checkdb");
@ -125,7 +125,7 @@ impl Handler<CheckDb> for Server {
for (queue, workers) in self.cache.iter_mut() {
while !workers.is_empty() {
if let Some(worker) = workers.pop_front() {
if let Some(job) = self.storage.request_job(queue, worker.id())? {
if let Ok(Some(job)) = self.storage.request_job(queue, worker.id()) {
worker.process_job(job);
} else {
workers.push_back(worker);
@ -134,8 +134,6 @@ impl Handler<CheckDb> for Server {
}
}
}
Ok(())
}
}

View file

@ -21,13 +21,17 @@ use failure::Error;
use futures::Future;
use serde::{de::DeserializeOwned, ser::Serialize};
use crate::{Backoff, MaxRetries};
use crate::{Backoff, MaxRetries, Processor};
/// The Job trait defines parameters pertaining to an instance of background job
pub trait Job<S = ()>: Serialize + DeserializeOwned
where
S: Clone + 'static,
{
pub trait Job: Serialize + DeserializeOwned + 'static {
/// The processor this job is associated with. The job's processor can be used to create a
/// JobInfo from a job, which is used to serialize the job into a storage mechanism.
type Processor: Processor<Job = Self>;
/// The application state provided to this job at runtime.
type State: Clone + 'static;
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
@ -37,7 +41,7 @@ where
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: S) -> Box<dyn Future<Item = (), Error = Error> + Send>;
fn run(self, state: Self::State) -> Box<dyn Future<Item = (), Error = Error> + Send>;
/// If this job should not use the default queue for its processor, this can be overridden in
/// user-code.

View file

@ -33,7 +33,7 @@ pub use crate::{
processor::Processor,
processor_map::ProcessorMap,
stats::{JobStat, Stats},
storage::Storage,
storage::{memory_storage, Storage},
};
#[derive(Debug, Fail)]

View file

@ -81,11 +81,8 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// Ok(())
/// }
/// ```
pub trait Processor<S = ()>: Clone
where
S: Clone + 'static,
{
type Job: Job<S> + 'static;
pub trait Processor: Clone {
type Job: Job + 'static;
/// The name of the processor
///
@ -175,7 +172,7 @@ where
fn process(
&self,
args: Value,
state: S,
state: <Self::Job as Job>::State,
) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
let res = serde_json::from_value::<Self::Job>(args);
@ -186,6 +183,16 @@ where
Box::new(fut)
}
/// Hack to access Associated Constant from impl type
fn name(&self) -> &'static str {
Self::NAME
}
/// Hack to access Associated Constant from impl type
fn queue(&self) -> &'static str {
Self::QUEUE
}
}
#[derive(Clone, Debug, Fail)]

View file

@ -23,7 +23,7 @@ use futures::future::{Either, Future, IntoFuture};
use log::{error, info};
use serde_json::Value;
use crate::{JobError, JobInfo, Processor, ReturnJobInfo};
use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo};
/// A generic function that processes a job
///
@ -72,12 +72,12 @@ where
///
/// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so
/// make sure to register all your processors up-front.
pub fn register_processor<P>(&mut self, processor: P)
where
P: Processor<S> + Send + 'static,
{
pub fn register_processor(
&mut self,
processor: impl Processor<Job = impl Job<State = S> + 'static> + Send + 'static,
) {
self.inner.insert(
P::NAME.to_owned(),
processor.name().to_owned(),
Box::new(move |value, state| processor.process(value, state)),
);
}

View file

@ -23,7 +23,14 @@ use log::error;
use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
/// Define a storage backend for jobs
///
/// This crate provides a default implementation in the `memory_storage` module, which is backed by
/// HashMaps and uses counting to assign IDs. If jobs must be persistent across application
/// restarts, look into the `[sled-backed](https://github.com/spacejam/sled)` implementation from
/// the `background-jobs-sled-storage` crate.
pub trait Storage: Clone + Send {
/// The error type used by the storage mechansim.
type Error: Fail;
/// This method generates unique IDs for jobs
@ -132,3 +139,142 @@ pub trait Storage: Clone + Send {
}
}
}
pub mod memory_storage {
use super::{JobInfo, Stats};
use failure::Fail;
use std::{
collections::HashMap,
fmt,
sync::{Arc, Mutex},
};
#[derive(Clone)]
pub struct Storage {
inner: Arc<Mutex<Inner>>,
}
#[derive(Clone)]
struct Inner {
count: u64,
jobs: HashMap<u64, JobInfo>,
queues: HashMap<u64, String>,
worker_ids: HashMap<u64, u64>,
worker_ids_inverse: HashMap<u64, u64>,
stats: Stats,
}
impl Storage {
pub fn new() -> Self {
Storage {
inner: Arc::new(Mutex::new(Inner {
count: 0,
jobs: HashMap::new(),
queues: HashMap::new(),
worker_ids: HashMap::new(),
worker_ids_inverse: HashMap::new(),
stats: Stats::default(),
})),
}
}
}
impl super::Storage for Storage {
type Error = Never;
fn generate_id(&mut self) -> Result<u64, Self::Error> {
let mut inner = self.inner.lock().unwrap();
let id = inner.count;
inner.count = inner.count.wrapping_add(1);
Ok(id)
}
fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error> {
self.inner.lock().unwrap().jobs.insert(job.id(), job);
Ok(())
}
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>, Self::Error> {
let j = self.inner.lock().unwrap().jobs.get(&id).map(|j| j.clone());
Ok(j)
}
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
let mut inner = self.inner.lock().unwrap();
let j = inner
.queues
.iter()
.filter_map(|(k, v)| {
if v == queue {
inner.jobs.get(k).map(|j| j.clone())
} else {
None
}
})
.next();
if let Some(ref j) = j {
inner.queues.remove(&j.id());
}
Ok(j)
}
fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error> {
self.inner
.lock()
.unwrap()
.queues
.insert(id, queue.to_owned());
Ok(())
}
fn run_job(&mut self, id: u64, worker_id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
inner.worker_ids.insert(id, worker_id);
inner.worker_ids_inverse.insert(worker_id, id);
Ok(())
}
fn delete_job(&mut self, id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
inner.jobs.remove(&id);
inner.queues.remove(&id);
if let Some(worker_id) = inner.worker_ids.remove(&id) {
inner.worker_ids_inverse.remove(&worker_id);
}
Ok(())
}
fn get_stats(&self) -> Result<Stats, Self::Error> {
Ok(self.inner.lock().unwrap().stats.clone())
}
fn update_stats<F>(&mut self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats,
{
let mut inner = self.inner.lock().unwrap();
inner.stats = (f)(inner.stats.clone());
Ok(())
}
}
#[derive(Clone, Debug, Fail)]
pub enum Never {}
impl fmt::Display for Never {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
match *self {}
}
}
#[derive(Clone, Debug, Fail)]
#[fail(display = "Created too many storages, can't generate any more IDs")]
pub struct TooManyStoragesError;
}

View file

@ -197,10 +197,14 @@
//! `background-jobs-core` crate, which provides the Processor and Job traits, as well as some
//! other useful types for implementing a jobs processor and job store.
pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats};
pub use background_jobs_core::{
memory_storage, Backoff, Job, JobStat, MaxRetries, Processor, Stats,
};
#[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};
#[cfg(feature = "background-jobs-sled-storage")]
pub use background_jobs_sled_storage::{Error as SledStorageError, SledStorage};
pub mod sled_storage {
pub use background_jobs_sled_storage::{Error, SledStorage as Storage};
}