Update readme, main repo docs

This commit is contained in:
asonix 2019-05-25 16:15:09 -05:00
parent 986a08b7e9
commit d3987768a5
6 changed files with 229 additions and 292 deletions

View file

@ -14,6 +14,7 @@ members = [
"jobs-actix",
"jobs-core",
"jobs-sled",
"examples/actix-example",
]
[features]

211
README.md
View file

@ -9,10 +9,13 @@ might not be the best experience.
#### Add Background Jobs to your project
```toml
[dependencies]
background-jobs = "0.4"
actix = "0.8"
background-jobs = "0.5"
failure = "0.1"
futures = "0.1"
tokio = "0.1"
serde = "1.0"
serde_drive = "1.0"
sled = "0.24"
```
#### To get started with Background Jobs, first you should define a job.
@ -20,6 +23,9 @@ Jobs are a combination of the data required to perform an operation, and the log
operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
```rust
use background_jobs::Job;
use serde_derive::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MyJob {
some_usize: usize,
@ -36,7 +42,7 @@ impl MyJob {
}
impl Job for MyJob {
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
fn run(self, _: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("args: {:?}", self);
Box::new(Ok(()).into_future())
@ -57,6 +63,14 @@ pub struct MyState {
pub app_name: String,
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl Job<MyState> for MyJob {
fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("{}: args, {:?}", state.app_name, self);
@ -71,6 +85,10 @@ Processors are types that define default attributes for jobs, as well as contain
used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
```rust
use background_jobs::{Backoff, MaxRetries, Processor};
const DEFAULT_QUEUE: &'static str = "default";
#[derive(Clone, Debug)]
pub struct MyProcessor;
@ -104,177 +122,58 @@ impl Processor<MyState> for MyProcessor {
```
#### Running jobs
By default, this crate ships with the `background-jobs-server` feature enabled. This uses the
`background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for
By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the
`background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for
spawning new jobs.
`background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped storage
mechanism, so the jobs information it keeps track of is all stored locally on-disk. In the future,
the storage mechanism may be made generic so implementors can bring their own storage.
`background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and
workers. If you plan to run two or more of these pieces from the same process, look at the
documentation for the methods `new_with_context` and `init_with_context`. It is important that
ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the
same application.
`background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This
can be implemented manually by implementing the `Storage` trait from `background-jobs-core`,
or the `background-jobs-sled-storage` crate can be used to provide a
[Sled](https://github.com/spacejam/sled)-backed jobs store.
With that out of the way, back to the examples:
##### Starting the job server
##### Main
```rust
use background_jobs::ServerConfig;
use actix::System;
use background_jobs::{ServerConfig, SledStorage, WorkerConfig};
use failure::Error;
use server_jobs_example::queue_set;
fn main() -> Result<(), Error> {
// Run our job server
tokio::run(ServerConfig::init(
1,
"127.0.0.1",
5555,
queue_set(),
"example-db",
));
// First set up the Actix System to ensure we have a runtime to spawn jobs on.
let sys = System::new("my-actix-system");
// Set up our Storage
let db = Db::start_default("my-sled-db")?;
let storage = SledStorage::new(db)?;
// Start the application server. This guards access to to the jobs store
let queue_handle = ServerConfig::new(storage).start();
// Configure and start our workers
let mut worker_config = WorkerConfig::new(move || MyState::new("My App"));
worker_config.register(MyProcessor);
worker_config.set_processor_count(DEFAULT_QUEUE, 16);
worker_config.start(queue_handle.clone());
// Queue our jobs
queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
// Block on Actix
sys.run()?;
Ok(())
}
```
##### Starting the job worker
```rust
use background_jobs::WorkerConfig;
use failure::Error;
use server_jobs_example::{queue_map, MyProcessor};
fn main() -> Result<(), Error> {
// Create the worker config
let mut worker = WorkerConfig::new(
MyState {
app_name: "My Example Application".to_owned(),
},
"localhost".to_owned(),
5555,
queue_map()
);
// Register our processor
worker.register_processor(MyProcessor);
// Spin up the workers
tokio::run(worker.run());
Ok(())
}
```
##### Queuing jobs
```rust
use background_jobs::SpawnerConfig;
use futures::{future::lazy, Future};
use server_jobs_example::{MyJob, MyProcessor};
fn main() {
// Create 50 new jobs, each with two consecutive values of the fibonacci sequence
let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
acc.push(MyJob::new(x, y));
(y, x + y, acc)
});
// Create the spawner
let spawner = SpawnerConfig::new("localhost", 5555);
// Queue each job
tokio::run(lazy(move || {
for job in jobs {
tokio::spawn(spawner.queue::<MyProcessor>(job).map_err(|_| ()));
}
Ok(())
}));
}
```
##### Queuing jobs from a synchronous application
```rust
use background_jobs::SpawnerConfig;
use failure::Error;
use server_jobs_example::{MyJob, MyProcessor};
fn main() -> Result<(), Error> {
// Create 50 new jobs, each with two consecutive values of the fibonacci sequence
let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
acc.push(MyJob::new(x, y));
(y, x + y, acc)
});
// Create the spawner
let spawner = SpawnerConfig::new("localhost", 5555);
// Queue each job
for job in jobs {
spawner.queue_sync::<MyProcessor, _>(job)?
}
}
```
##### Complete Example
For the complete example project, see [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example)
#### Using on Windows
`background-jobs-server` depends by default on [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which
only works on unix (and unix-like) systems. This might mean it works on the Windows Subsystem for Linux,
but it's untested and hard to say. You can override this behavior by specifying the following in your
Cargo.toml
```toml
[Dependencies.background-jobs]
version = "0.4"
default-features = false
features = ["no_unix"]
```
[`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement for
tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled by default is
that it's slower than tokio-zmq, and in all likelihood, the production environment for projects
depending on this one will be linux.
#### Actix
Another implementation of a jobs processor is also provided by this library under a feature flag.
```toml
[dependencies.background-jobs]
version = "0.4"
default-features = false
features = ["actix"]
```
This provides an in-process implementation of a jobs server and worker setup. Here's some example usage.
```rust
use background_jobs::{Processor, ServerConfig, WorkerConfig};
let sys = actix::System::new("my-actix-thing");
let queue_handle = ServerConfig::new(1, db_path.into()).start::<MyState>();
let state = MyState {
queue_handle: queue_handle.clone(),
};
let mut worker_config = WorkerConfig::new(state);
WorkerConfig::register(&mut worker_config, FetchProcessor);
WorkerConfig::register(&mut worker_config, InstanceProcessor);
WorkerConfig::register(&mut worker_config, OpenProcessor);
WorkerConfig::set_processor_count(
&mut worker_config,
<InstanceProcessor as Processor<MyState>>::QUEUE,
16,
);
WorkerConfig::start(worker_config, queue_handle.clone());
let _ = sys.run();
```
For the complete example project, see [the examples folder](https://git.asonix.dog/Aardwolf/background-jobs/src/branch/master/examples/actix-example)
#### Bringing your own server/worker implementation
If you want to create your own jobs processor based on this idea, you can depend on the
`background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
other useful types for implementing a jobs processor.
`background-jobs-core` crate, which provides the Processor and Job traits, as well as some
other useful types for implementing a jobs processor and job store.
### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the GPLv3.

1
examples/actix-example/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/my-sled-db

View file

@ -0,0 +1,16 @@
[package]
name = "actix-example"
version = "0.1.0"
authors = ["asonix <asonix@asonix.dog>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix = "0.8"
background-jobs = { version = "0.5", path = "../.." }
failure = "0.1"
futures = "0.1"
serde = "1.0"
serde_derive = "1.0"
sled = "0.24"

View file

@ -0,0 +1,96 @@
use actix::System;
use background_jobs::{ServerConfig, SledStorage, WorkerConfig, Processor, Job, Backoff, MaxRetries};
use failure::Error;
use futures::{Future, future::ok};
use serde_derive::{Deserialize, Serialize};
use sled::Db;
const DEFAULT_QUEUE: &'static str = "default";
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MyJob {
some_usize: usize,
other_usize: usize,
}
#[derive(Clone, Debug)]
pub struct MyProcessor;
fn main() -> Result<(), Error> {
let sys = System::new("my-actix-system");
let db = Db::start_default("my-sled-db")?;
let storage = SledStorage::new(db)?;
let queue_handle = ServerConfig::new(storage).start();
let mut worker_config = WorkerConfig::new(move || MyState::new("My App"));
worker_config.register(MyProcessor);
worker_config.set_processor_count(DEFAULT_QUEUE, 16);
worker_config.start(queue_handle.clone());
queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
sys.run()?;
Ok(())
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl MyJob {
pub fn new(some_usize: usize, other_usize: usize) -> Self {
MyJob {
some_usize,
other_usize,
}
}
}
impl Job<MyState> for MyJob {
fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
println!("{}: args, {:?}", state.app_name, self);
Box::new(ok(()))
}
}
impl Processor<MyState> for MyProcessor {
// The kind of job this processor should execute
type Job = MyJob;
// The name of the processor. It is super important that each processor has a unique name,
// because otherwise one processor will overwrite another processor when they're being
// registered.
const NAME: &'static str = "MyProcessor";
// The queue that this processor belongs to
//
// Workers have the option to subscribe to specific queues, so this is important to
// determine which worker will call the processor
//
// Jobs can optionally override the queue they're spawned on
const QUEUE: &'static str = DEFAULT_QUEUE;
// The number of times background-jobs should try to retry a job before giving up
//
// Jobs can optionally override this value
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
// The logic to determine how often to retry this job if it fails
//
// Jobs can optionally override this value
const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

View file

@ -28,16 +28,24 @@
//! #### Add Background Jobs to your project
//! ```toml
//! [dependencies]
//! background-jobs = "0.4"
//! actix = "0.8"
//! background-jobs = "0.5"
//! failure = "0.1"
//! futures = "0.1"
//! tokio = "0.1"
//! serde = "1.0"
//! serde_drive = "1.0"
//! sled = "0.24"
//! ```
//!
//! #### To get started with Background Jobs, first you should define a job.
//! Jobs are a combination of the data required to perform an operation, and the logic of that
//! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
//!
//! ```rust,ignore
//! use background_jobs::Job;
//! use serde_derive::{Deserialize, Serialize};
//! use failure::Error;
//!
//! #[derive(Clone, Debug, Deserialize, Serialize)]
//! pub struct MyJob {
//! some_usize: usize,
@ -55,7 +63,7 @@
//!
//! impl Job for MyJob {
//! fn run(self, _: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
//! info!("args: {:?}", self);
//! println!("args: {:?}", self);
//!
//! Box::new(Ok(()).into_future())
//! }
@ -70,11 +78,20 @@
//! Let's re-define the job to care about some application state.
//!
//! ```rust,ignore
//! # use failure::Error;
//! #[derive(Clone, Debug)]
//! pub struct MyState {
//! pub app_name: String,
//! }
//!
//! impl MyState {
//! pub fn new(app_name: &str) -> Self {
//! MyState {
//! app_name: app_name.to_owned(),
//! }
//! }
//! }
//!
//! impl Job<MyState> for MyJob {
//! fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
//! info!("{}: args, {:?}", state.app_name, self);
@ -89,6 +106,10 @@
//! used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
//!
//! ```rust,ignore
//! use background_jobs::{Backoff, MaxRetries, Processor};
//!
//! const DEFAULT_QUEUE: &'static str = "default";
//!
//! #[derive(Clone, Debug)]
//! pub struct MyProcessor;
//!
@ -99,7 +120,7 @@
//! // The name of the processor. It is super important that each processor has a unique name,
//! // because otherwise one processor will overwrite another processor when they're being
//! // registered.
//! const NAME: &'static str = "IncrementProcessor";
//! const NAME: &'static str = "MyProcessor";
//!
//! // The queue that this processor belongs to
//! //
@ -107,7 +128,7 @@
//! // determine which worker will call the processor
//! //
//! // Jobs can optionally override the queue they're spawned on
//! const QUEUE: &'static str = "default";
//! const QUEUE: &'static str = DEFAULT_QUEUE;
//!
//! // The number of times background-jobs should try to retry a job before giving up
//! //
@ -122,156 +143,59 @@
//! ```
//!
//! #### Running jobs
//! By default, this crate ships with the `background-jobs-server` feature enabled. This uses the
//! `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for
//! By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the
//! `background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for
//! spawning new jobs.
//!
//! `background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped
//! storage mechanism, so the jobs information it keeps track of is all stored locally on-disk. In
//! the future, the storage mechanism may be made generic so implementors can bring their own
//! storage.
//!
//! `background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and
//! workers. If you plan to run two or more of these pieces from the same process, look at the
//! documentation for the methods `new_with_context` and `init_with_context`. It is important that
//! ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the
//! same application.
//! `background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This
//! can be implemented manually by implementing the `Storage` trait from `background-jobs-core`,
//! or the `background-jobs-sled-storage` crate can be used to provide a
//! [Sled](https://github.com/spacejam/sled)-backed jobs store.
//!
//! With that out of the way, back to the examples:
//!
//! ##### Starting the job server
//! ##### Main
//! ```rust,ignore
//! use background_jobs::ServerConfig;
//! use actix::System;
//! use background_jobs::{ServerConfig, SledStorage, WorkerConfig};
//! use failure::Error;
//! use server_jobs_example::queue_set;
//!
//! fn main() -> Result<(), Error> {
//! // Run our job server
//! tokio::run(ServerConfig::init(
//! "127.0.0.1",
//! 5555,
//! 1,
//! queue_set(),
//! "example-db",
//! ));
//! // First set up the Actix System to ensure we have a runtime to spawn jobs on.
//! let sys = System::new("my-actix-system");
//!
//! // Set up our Storage
//! let db = Db::start_default("my-sled-db")?;
//! let storage = SledStorage::new(db)?;
//!
//! // Start the application server. This guards access to to the jobs store
//! let queue_handle = ServerConfig::new(storage).start();
//!
//! // Configure and start our workers
//! let mut worker_config = WorkerConfig::new(move || MyState::new("My App"));
//! worker_config.register(MyProcessor);
//! worker_config.set_processor_count(DEFAULT_QUEUE, 16);
//! worker_config.start(queue_handle.clone());
//!
//! // Queue our jobs
//! queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
//! queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
//! queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
//!
//! // Block on Actix
//! sys.run()?;
//! Ok(())
//! }
//! ```
//! ##### Starting the job worker
//! ```rust,ignore
//! use background_jobs::WorkerConfig;
//! use failure::Error;
//! use server_jobs_example::{queue_map, MyProcessor};
//!
//! fn main() -> Result<(), Error> {
//! // Create the worker config
//! let mut worker = WorkerConfig::new(
//! MyState {
//! app_name: "My Example Application".to_owned(),
//! },
//! "localhost".to_owned(),
//! 5555,
//! queue_map()
//! );
//!
//! // Register our processor
//! worker.register_processor(MyProcessor);
//!
//! // Spin up the workers
//! tokio::run(worker.run());
//!
//! Ok(())
//! }
//! ```
//! ##### Queuing jobs
//! ```rust,ignore
//! use background_jobs::SpawnerConfig;
//! use futures::{future::lazy, Future};
//! use server_jobs_example::{MyJob, MyProcessor};
//!
//! fn main() {
//! // Create 50 new jobs, each with two consecutive values of the fibonacci sequence
//! let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
//! acc.push(MyJob::new(x, y));
//!
//! (y, x + y, acc)
//! });
//!
//! // Create the spawner
//! let spawner = SpawnerConfig::new("localhost", 5555);
//!
//! // Queue each job
//! tokio::run(lazy(move || {
//! for job in jobs {
//! tokio::spawn(spawner.queue::<MyProcessor, _>(job).map_err(|_| ()));
//! }
//!
//! Ok(())
//! }));
//! }
//! ```
//!
//! ##### Complete Example
//! For the complete example project, see
//! [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example)
//!
//! #### Using on Windows
//! `background-jobs-server` depends by default on
//! [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which only works on unix (and unix-like)
//! systems. This might mean it works on the Windows Subsystem for Linux, but it's untested and
//! hard to say. You can override this behavior by specifying the following in your Cargo.toml
//! ```toml
//! [Dependencies.background-jobs]
//! version = "0.4"
//! default-features = false
//! features = ["no_unix"]
//! ```
//!
//! [`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement
//! for tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled
//! by default is that it's slower than tokio-zmq, and in all likelihood, the production
//! environment for projects depending on this one will be linux.
//!
//! #### Actix
//! Another implementation of a jobs processor is also provided by this library under a feature flag.
//! ```toml
//! [dependencies.background-jobs]
//! version = "0.4"
//! default-features = false
//! features = ["actix"]
//! ```
//!
//! This provides an in-process implementation of a jobs server and worker setup. Here's some example usage.
//! ```rust,ignore
//! use background_jobs::{Processor, ServerConfig, WorkerConfig};
//!
//! let sys = actix::System::new("my-actix-thing");
//!
//! let queue_handle = ServerConfig::new(1, db_path.into()).start::<MyState>();
//!
//! let state = MyState {
//! queue_handle: queue_handle.clone(),
//! };
//!
//! let mut worker_config = WorkerConfig::new(state);
//! WorkerConfig::register(&mut worker_config, FetchProcessor);
//! WorkerConfig::register(&mut worker_config, InstanceProcessor);
//! WorkerConfig::register(&mut worker_config, OpenProcessor);
//! WorkerConfig::set_processor_count(
//! &mut worker_config,
//! <InstanceProcessor as Processor<MyState>>::QUEUE,
//! 16,
//! );
//! WorkerConfig::start(worker_config, queue_handle.clone());
//!
//! let _ = sys.run();
//! ```
//! [the examples folder](https://git.asonix.dog/Aardwolf/background-jobs/src/branch/master/examples/actix-example)
//!
//! #### Bringing your own server/worker implementation
//! If you want to create your own jobs processor based on this idea, you can depend on the
//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
//! other useful types for implementing a jobs processor.
//! `background-jobs-core` crate, which provides the Processor and Job traits, as well as some
//! other useful types for implementing a jobs processor and job store.
pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats};