diff --git a/Cargo.toml b/Cargo.toml index 050eafa..57d1d13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "jobs-actix", "jobs-core", "jobs-sled", + "examples/actix-example", ] [features] diff --git a/README.md b/README.md index f3a00c9..111e2e2 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,13 @@ might not be the best experience. #### Add Background Jobs to your project ```toml [dependencies] -background-jobs = "0.4" +actix = "0.8" +background-jobs = "0.5" failure = "0.1" futures = "0.1" -tokio = "0.1" +serde = "1.0" +serde_drive = "1.0" +sled = "0.24" ``` #### To get started with Background Jobs, first you should define a job. @@ -20,6 +23,9 @@ Jobs are a combination of the data required to perform an operation, and the log operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. ```rust +use background_jobs::Job; +use serde_derive::{Deserialize, Serialize}; + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct MyJob { some_usize: usize, @@ -36,7 +42,7 @@ impl MyJob { } impl Job for MyJob { - fn run(self) -> Box + Send> { + fn run(self, _: ()) -> Box + Send> { info!("args: {:?}", self); Box::new(Ok(()).into_future()) @@ -57,6 +63,14 @@ pub struct MyState { pub app_name: String, } +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + impl Job for MyJob { fn run(self, state: MyState) -> Box + Send> { info!("{}: args, {:?}", state.app_name, self); @@ -71,6 +85,10 @@ Processors are types that define default attributes for jobs, as well as contain used internally to perform the job. Processors must implement `Proccessor` and `Clone`. ```rust +use background_jobs::{Backoff, MaxRetries, Processor}; + +const DEFAULT_QUEUE: &'static str = "default"; + #[derive(Clone, Debug)] pub struct MyProcessor; @@ -104,177 +122,58 @@ impl Processor for MyProcessor { ``` #### Running jobs -By default, this crate ships with the `background-jobs-server` feature enabled. This uses the -`background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for +By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the +`background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for spawning new jobs. -`background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped storage -mechanism, so the jobs information it keeps track of is all stored locally on-disk. In the future, -the storage mechanism may be made generic so implementors can bring their own storage. - -`background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and -workers. If you plan to run two or more of these pieces from the same process, look at the -documentation for the methods `new_with_context` and `init_with_context`. It is important that -ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the -same application. +`background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This +can be implemented manually by implementing the `Storage` trait from `background-jobs-core`, +or the `background-jobs-sled-storage` crate can be used to provide a +[Sled](https://github.com/spacejam/sled)-backed jobs store. With that out of the way, back to the examples: -##### Starting the job server +##### Main ```rust -use background_jobs::ServerConfig; +use actix::System; +use background_jobs::{ServerConfig, SledStorage, WorkerConfig}; use failure::Error; -use server_jobs_example::queue_set; fn main() -> Result<(), Error> { - // Run our job server - tokio::run(ServerConfig::init( - 1, - "127.0.0.1", - 5555, - queue_set(), - "example-db", - )); + // First set up the Actix System to ensure we have a runtime to spawn jobs on. + let sys = System::new("my-actix-system"); + // Set up our Storage + let db = Db::start_default("my-sled-db")?; + let storage = SledStorage::new(db)?; + + // Start the application server. This guards access to to the jobs store + let queue_handle = ServerConfig::new(storage).start(); + + // Configure and start our workers + let mut worker_config = WorkerConfig::new(move || MyState::new("My App")); + worker_config.register(MyProcessor); + worker_config.set_processor_count(DEFAULT_QUEUE, 16); + worker_config.start(queue_handle.clone()); + + // Queue our jobs + queue_handle.queue::(MyJob::new(1, 2))?; + queue_handle.queue::(MyJob::new(3, 4))?; + queue_handle.queue::(MyJob::new(5, 6))?; + + // Block on Actix + sys.run()?; Ok(()) } ``` -##### Starting the job worker -```rust -use background_jobs::WorkerConfig; -use failure::Error; -use server_jobs_example::{queue_map, MyProcessor}; - -fn main() -> Result<(), Error> { - // Create the worker config - let mut worker = WorkerConfig::new( - MyState { - app_name: "My Example Application".to_owned(), - }, - "localhost".to_owned(), - 5555, - queue_map() - ); - - // Register our processor - worker.register_processor(MyProcessor); - - // Spin up the workers - tokio::run(worker.run()); - - Ok(()) -} -``` -##### Queuing jobs -```rust -use background_jobs::SpawnerConfig; -use futures::{future::lazy, Future}; -use server_jobs_example::{MyJob, MyProcessor}; - -fn main() { - // Create 50 new jobs, each with two consecutive values of the fibonacci sequence - let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { - acc.push(MyJob::new(x, y)); - - (y, x + y, acc) - }); - - // Create the spawner - let spawner = SpawnerConfig::new("localhost", 5555); - - // Queue each job - tokio::run(lazy(move || { - for job in jobs { - tokio::spawn(spawner.queue::(job).map_err(|_| ())); - } - - Ok(()) - })); -} -``` -##### Queuing jobs from a synchronous application -```rust -use background_jobs::SpawnerConfig; -use failure::Error; -use server_jobs_example::{MyJob, MyProcessor}; - -fn main() -> Result<(), Error> { - // Create 50 new jobs, each with two consecutive values of the fibonacci sequence - let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { - acc.push(MyJob::new(x, y)); - - (y, x + y, acc) - }); - - // Create the spawner - let spawner = SpawnerConfig::new("localhost", 5555); - - // Queue each job - for job in jobs { - spawner.queue_sync::(job)? - } -} -``` ##### Complete Example -For the complete example project, see [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example) - -#### Using on Windows -`background-jobs-server` depends by default on [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which -only works on unix (and unix-like) systems. This might mean it works on the Windows Subsystem for Linux, -but it's untested and hard to say. You can override this behavior by specifying the following in your -Cargo.toml -```toml -[Dependencies.background-jobs] -version = "0.4" -default-features = false -features = ["no_unix"] -``` - -[`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement for -tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled by default is -that it's slower than tokio-zmq, and in all likelihood, the production environment for projects -depending on this one will be linux. - -#### Actix -Another implementation of a jobs processor is also provided by this library under a feature flag. -```toml -[dependencies.background-jobs] -version = "0.4" -default-features = false -features = ["actix"] -``` - -This provides an in-process implementation of a jobs server and worker setup. Here's some example usage. -```rust -use background_jobs::{Processor, ServerConfig, WorkerConfig}; - -let sys = actix::System::new("my-actix-thing"); - -let queue_handle = ServerConfig::new(1, db_path.into()).start::(); - -let state = MyState { - queue_handle: queue_handle.clone(), -}; - -let mut worker_config = WorkerConfig::new(state); -WorkerConfig::register(&mut worker_config, FetchProcessor); -WorkerConfig::register(&mut worker_config, InstanceProcessor); -WorkerConfig::register(&mut worker_config, OpenProcessor); -WorkerConfig::set_processor_count( - &mut worker_config, - >::QUEUE, - 16, -); -WorkerConfig::start(worker_config, queue_handle.clone()); - -let _ = sys.run(); -``` +For the complete example project, see [the examples folder](https://git.asonix.dog/Aardwolf/background-jobs/src/branch/master/examples/actix-example) #### Bringing your own server/worker implementation If you want to create your own jobs processor based on this idea, you can depend on the -`background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some -other useful types for implementing a jobs processor. +`background-jobs-core` crate, which provides the Processor and Job traits, as well as some +other useful types for implementing a jobs processor and job store. ### Contributing Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the GPLv3. diff --git a/examples/actix-example/.gitignore b/examples/actix-example/.gitignore new file mode 100644 index 0000000..23f168a --- /dev/null +++ b/examples/actix-example/.gitignore @@ -0,0 +1 @@ +/my-sled-db diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml new file mode 100644 index 0000000..bcde15d --- /dev/null +++ b/examples/actix-example/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "actix-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix = "0.8" +background-jobs = { version = "0.5", path = "../.." } +failure = "0.1" +futures = "0.1" +serde = "1.0" +serde_derive = "1.0" +sled = "0.24" diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs new file mode 100644 index 0000000..8ae165a --- /dev/null +++ b/examples/actix-example/src/main.rs @@ -0,0 +1,96 @@ +use actix::System; +use background_jobs::{ServerConfig, SledStorage, WorkerConfig, Processor, Job, Backoff, MaxRetries}; +use failure::Error; +use futures::{Future, future::ok}; +use serde_derive::{Deserialize, Serialize}; +use sled::Db; + +const DEFAULT_QUEUE: &'static str = "default"; + +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +#[derive(Clone, Debug)] +pub struct MyProcessor; + +fn main() -> Result<(), Error> { + let sys = System::new("my-actix-system"); + + let db = Db::start_default("my-sled-db")?; + let storage = SledStorage::new(db)?; + + let queue_handle = ServerConfig::new(storage).start(); + + let mut worker_config = WorkerConfig::new(move || MyState::new("My App")); + worker_config.register(MyProcessor); + worker_config.set_processor_count(DEFAULT_QUEUE, 16); + worker_config.start(queue_handle.clone()); + + queue_handle.queue::(MyJob::new(1, 2))?; + queue_handle.queue::(MyJob::new(3, 4))?; + queue_handle.queue::(MyJob::new(5, 6))?; + + sys.run()?; + Ok(()) +} + +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +impl Job for MyJob { + fn run(self, state: MyState) -> Box + Send> { + println!("{}: args, {:?}", state.app_name, self); + + Box::new(ok(())) + } +} + +impl Processor for MyProcessor { + // The kind of job this processor should execute + type Job = MyJob; + + // The name of the processor. It is super important that each processor has a unique name, + // because otherwise one processor will overwrite another processor when they're being + // registered. + const NAME: &'static str = "MyProcessor"; + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + const QUEUE: &'static str = DEFAULT_QUEUE; + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + // The logic to determine how often to retry this job if it fails + // + // Jobs can optionally override this value + const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); +} diff --git a/src/lib.rs b/src/lib.rs index 5c80fd9..bc54781 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,16 +28,24 @@ //! #### Add Background Jobs to your project //! ```toml //! [dependencies] -//! background-jobs = "0.4" +//! actix = "0.8" +//! background-jobs = "0.5" //! failure = "0.1" //! futures = "0.1" -//! tokio = "0.1" +//! serde = "1.0" +//! serde_drive = "1.0" +//! sled = "0.24" //! ``` +//! //! #### To get started with Background Jobs, first you should define a job. //! Jobs are a combination of the data required to perform an operation, and the logic of that //! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. //! //! ```rust,ignore +//! use background_jobs::Job; +//! use serde_derive::{Deserialize, Serialize}; +//! use failure::Error; +//! //! #[derive(Clone, Debug, Deserialize, Serialize)] //! pub struct MyJob { //! some_usize: usize, @@ -55,7 +63,7 @@ //! //! impl Job for MyJob { //! fn run(self, _: ()) -> Box + Send> { -//! info!("args: {:?}", self); +//! println!("args: {:?}", self); //! //! Box::new(Ok(()).into_future()) //! } @@ -70,11 +78,20 @@ //! Let's re-define the job to care about some application state. //! //! ```rust,ignore +//! # use failure::Error; //! #[derive(Clone, Debug)] //! pub struct MyState { //! pub app_name: String, //! } //! +//! impl MyState { +//! pub fn new(app_name: &str) -> Self { +//! MyState { +//! app_name: app_name.to_owned(), +//! } +//! } +//! } +//! //! impl Job for MyJob { //! fn run(self, state: MyState) -> Box + Send> { //! info!("{}: args, {:?}", state.app_name, self); @@ -89,6 +106,10 @@ //! used internally to perform the job. Processors must implement `Proccessor` and `Clone`. //! //! ```rust,ignore +//! use background_jobs::{Backoff, MaxRetries, Processor}; +//! +//! const DEFAULT_QUEUE: &'static str = "default"; +//! //! #[derive(Clone, Debug)] //! pub struct MyProcessor; //! @@ -99,7 +120,7 @@ //! // The name of the processor. It is super important that each processor has a unique name, //! // because otherwise one processor will overwrite another processor when they're being //! // registered. -//! const NAME: &'static str = "IncrementProcessor"; +//! const NAME: &'static str = "MyProcessor"; //! //! // The queue that this processor belongs to //! // @@ -107,7 +128,7 @@ //! // determine which worker will call the processor //! // //! // Jobs can optionally override the queue they're spawned on -//! const QUEUE: &'static str = "default"; +//! const QUEUE: &'static str = DEFAULT_QUEUE; //! //! // The number of times background-jobs should try to retry a job before giving up //! // @@ -122,156 +143,59 @@ //! ``` //! //! #### Running jobs -//! By default, this crate ships with the `background-jobs-server` feature enabled. This uses the -//! `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for +//! By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the +//! `background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for //! spawning new jobs. //! -//! `background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped -//! storage mechanism, so the jobs information it keeps track of is all stored locally on-disk. In -//! the future, the storage mechanism may be made generic so implementors can bring their own -//! storage. -//! -//! `background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and -//! workers. If you plan to run two or more of these pieces from the same process, look at the -//! documentation for the methods `new_with_context` and `init_with_context`. It is important that -//! ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the -//! same application. +//! `background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This +//! can be implemented manually by implementing the `Storage` trait from `background-jobs-core`, +//! or the `background-jobs-sled-storage` crate can be used to provide a +//! [Sled](https://github.com/spacejam/sled)-backed jobs store. //! //! With that out of the way, back to the examples: //! -//! ##### Starting the job server +//! ##### Main //! ```rust,ignore -//! use background_jobs::ServerConfig; +//! use actix::System; +//! use background_jobs::{ServerConfig, SledStorage, WorkerConfig}; //! use failure::Error; -//! use server_jobs_example::queue_set; //! //! fn main() -> Result<(), Error> { -//! // Run our job server -//! tokio::run(ServerConfig::init( -//! "127.0.0.1", -//! 5555, -//! 1, -//! queue_set(), -//! "example-db", -//! )); +//! // First set up the Actix System to ensure we have a runtime to spawn jobs on. +//! let sys = System::new("my-actix-system"); //! +//! // Set up our Storage +//! let db = Db::start_default("my-sled-db")?; +//! let storage = SledStorage::new(db)?; +//! +//! // Start the application server. This guards access to to the jobs store +//! let queue_handle = ServerConfig::new(storage).start(); +//! +//! // Configure and start our workers +//! let mut worker_config = WorkerConfig::new(move || MyState::new("My App")); +//! worker_config.register(MyProcessor); +//! worker_config.set_processor_count(DEFAULT_QUEUE, 16); +//! worker_config.start(queue_handle.clone()); +//! +//! // Queue our jobs +//! queue_handle.queue::(MyJob::new(1, 2))?; +//! queue_handle.queue::(MyJob::new(3, 4))?; +//! queue_handle.queue::(MyJob::new(5, 6))?; +//! +//! // Block on Actix +//! sys.run()?; //! Ok(()) //! } //! ``` -//! ##### Starting the job worker -//! ```rust,ignore -//! use background_jobs::WorkerConfig; -//! use failure::Error; -//! use server_jobs_example::{queue_map, MyProcessor}; -//! -//! fn main() -> Result<(), Error> { -//! // Create the worker config -//! let mut worker = WorkerConfig::new( -//! MyState { -//! app_name: "My Example Application".to_owned(), -//! }, -//! "localhost".to_owned(), -//! 5555, -//! queue_map() -//! ); -//! -//! // Register our processor -//! worker.register_processor(MyProcessor); -//! -//! // Spin up the workers -//! tokio::run(worker.run()); -//! -//! Ok(()) -//! } -//! ``` -//! ##### Queuing jobs -//! ```rust,ignore -//! use background_jobs::SpawnerConfig; -//! use futures::{future::lazy, Future}; -//! use server_jobs_example::{MyJob, MyProcessor}; -//! -//! fn main() { -//! // Create 50 new jobs, each with two consecutive values of the fibonacci sequence -//! let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { -//! acc.push(MyJob::new(x, y)); -//! -//! (y, x + y, acc) -//! }); -//! -//! // Create the spawner -//! let spawner = SpawnerConfig::new("localhost", 5555); -//! -//! // Queue each job -//! tokio::run(lazy(move || { -//! for job in jobs { -//! tokio::spawn(spawner.queue::(job).map_err(|_| ())); -//! } -//! -//! Ok(()) -//! })); -//! } -//! ``` //! //! ##### Complete Example //! For the complete example project, see -//! [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example) -//! -//! #### Using on Windows -//! `background-jobs-server` depends by default on -//! [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which only works on unix (and unix-like) -//! systems. This might mean it works on the Windows Subsystem for Linux, but it's untested and -//! hard to say. You can override this behavior by specifying the following in your Cargo.toml -//! ```toml -//! [Dependencies.background-jobs] -//! version = "0.4" -//! default-features = false -//! features = ["no_unix"] -//! ``` -//! -//! [`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement -//! for tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled -//! by default is that it's slower than tokio-zmq, and in all likelihood, the production -//! environment for projects depending on this one will be linux. -//! -//! #### Actix -//! Another implementation of a jobs processor is also provided by this library under a feature flag. -//! ```toml -//! [dependencies.background-jobs] -//! version = "0.4" -//! default-features = false -//! features = ["actix"] -//! ``` -//! -//! This provides an in-process implementation of a jobs server and worker setup. Here's some example usage. -//! ```rust,ignore -//! use background_jobs::{Processor, ServerConfig, WorkerConfig}; -//! -//! let sys = actix::System::new("my-actix-thing"); -//! -//! let queue_handle = ServerConfig::new(1, db_path.into()).start::(); -//! -//! let state = MyState { -//! queue_handle: queue_handle.clone(), -//! }; -//! -//! let mut worker_config = WorkerConfig::new(state); -//! WorkerConfig::register(&mut worker_config, FetchProcessor); -//! WorkerConfig::register(&mut worker_config, InstanceProcessor); -//! WorkerConfig::register(&mut worker_config, OpenProcessor); -//! WorkerConfig::set_processor_count( -//! &mut worker_config, -//! >::QUEUE, -//! 16, -//! ); -//! WorkerConfig::start(worker_config, queue_handle.clone()); -//! -//! let _ = sys.run(); -//! ``` +//! [the examples folder](https://git.asonix.dog/Aardwolf/background-jobs/src/branch/master/examples/actix-example) //! //! #### Bringing your own server/worker implementation //! If you want to create your own jobs processor based on this idea, you can depend on the -//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some -//! other useful types for implementing a jobs processor. +//! `background-jobs-core` crate, which provides the Processor and Job traits, as well as some +//! other useful types for implementing a jobs processor and job store. pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats};