From 58f794dc55695a485d2bea1a1c196812be50c5b4 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 18 Nov 2018 08:43:10 -0600 Subject: [PATCH] Update docs, use associated constants, add queue_sync --- Cargo.toml | 6 +- README.md | 75 +++++++++++++++---- examples/server-jobs-example/Cargo.toml | 2 +- .../src/bin/sync_spawner.rs | 41 ++++++++++ examples/server-jobs-example/src/lib.rs | 19 +---- jobs-core/Cargo.toml | 2 +- jobs-core/src/job_info.rs | 4 +- jobs-core/src/processor.rs | 42 ++++------- jobs-core/src/processor_map.rs | 12 +-- jobs-server/Cargo.toml | 4 +- jobs-server/src/server/mod.rs | 2 +- jobs-server/src/spawner.rs | 27 ++++++- jobs-server/src/worker/mod.rs | 4 +- src/lib.rs | 57 ++++++++++---- 14 files changed, 208 insertions(+), 89 deletions(-) create mode 100644 examples/server-jobs-example/src/bin/sync_spawner.rs diff --git a/Cargo.toml b/Cargo.toml index 7122f32..4b63468 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with tokio and futures" -version = "0.1.1" +version = "0.2.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -20,10 +20,10 @@ members = [ default = ["background-jobs-server", "background-jobs-server/tokio-zmq"] [dependencies.background-jobs-core] -version = "0.1" +version = "0.2" path = "jobs-core" [dependencies.background-jobs-server] -version = "0.1" +version = "0.2" path = "jobs-server" optional = true diff --git a/README.md b/README.md index ee8a1f6..12b7afd 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ might not be the best experience. #### Add Background Jobs to your project ```toml [dependencies] -background-jobs = "0.1" +background-jobs = "0.2" failure = "0.1" futures = "0.1" tokio = "0.1" @@ -59,9 +59,7 @@ impl Processor for MyProcessor { // The name of the processor. It is super important that each processor has a unique name, // because otherwise one processor will overwrite another processor when they're being // registered. - fn name() -> &'static str { - "MyProcessor" - } + const NAME: &'static str = "MyProcessor"; // The queue that this processor belongs to // @@ -69,23 +67,17 @@ impl Processor for MyProcessor { // determine which worker will call the processor // // Jobs can optionally override the queue they're spawned on - fn queue() -> &'static str { - DEFAULT_QUEUE - } + const QUEUE: &'static str = DEFAULT_QUEUE; // The number of times background-jobs should try to retry a job before giving up // // Jobs can optionally override this value - fn max_retries() -> MaxRetries { - MaxRetries::Count(1) - } + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); // The logic to determine how often to retry this job if it fails // // Jobs can optionally override this value - fn backoff_strategy() -> Backoff { - Backoff::Exponential(2) - } + const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); } ``` @@ -94,6 +86,18 @@ By default, this crate ships with the `background-jobs-server` feature enabled. `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for spawning new jobs. +`background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped storage +mechanism, so the jobs information it keeps track of is all stored locally on-disk. In the future, +the storage mechanism may be made generic so implementors can bring their own storage. + +`background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and +workers. If you plan to run two or more of these pieces from the same process, look at the +documentation for the methods `new_with_context` and `init_with_context`. It is important that +ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the +same application. + +With that out of the way, back to the examples: + ##### Starting the job server ```rust use background_jobs::ServerConfig; @@ -159,8 +163,51 @@ fn main() { })); } ``` +##### Queuing jobs from a synchronous application +```rust +use background_jobs::SpawnerConfig; +use failure::Error; +use server_jobs_example::{MyJob, MyProcessor}; -#### Not using a ZeroMQ based client/server model +fn main() -> Result<(), Error> { + // Create 50 new jobs, each with two consecutive values of the fibonacci sequence + let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { + acc.push(MyJob::new(x, y)); + + (y, x + y, acc) + }); + + // Create the spawner + let spawner = SpawnerConfig::new("localhost", 5555); + + // Queue each job + for job in jobs { + spawner.queue_sync::(job)? + } +} +``` + +##### Complete Example +For the complete example project, see [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example) + +#### Using on Windows +`background-jobs-server` depends by default on [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which +only works on unix (and unix-like) systems. This might mean it works on the Windows Subsystem for Linux, +but it's untested and hard to say. You can override this behavior by specifying the following in your +Cargo.toml +```toml +[Dependencies.background-jobs] +version = "0.1" +default-features = false +features = ["background-jobs-server", "background-jobs-server/futures-zmq"] +``` + +[`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement for +tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled by default is +that it's slower than tokio-zmq, and in all likelihood, the production environment for projects +depending on this one will be linux. + +#### Not using a ZeroMQ+LMDB based client/server model If you want to create your own jobs processor based on this idea, you can depend on the `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some other useful types for implementing a jobs processor. diff --git a/examples/server-jobs-example/Cargo.toml b/examples/server-jobs-example/Cargo.toml index 15bf15a..36cba9a 100644 --- a/examples/server-jobs-example/Cargo.toml +++ b/examples/server-jobs-example/Cargo.toml @@ -15,7 +15,7 @@ serde_derive = "1.0" tokio = "0.1" [dependencies.background-jobs] -version = "0.1" +version = "0.2" path = "../.." default-features = false features = ["background-jobs-server"] diff --git a/examples/server-jobs-example/src/bin/sync_spawner.rs b/examples/server-jobs-example/src/bin/sync_spawner.rs new file mode 100644 index 0000000..1860735 --- /dev/null +++ b/examples/server-jobs-example/src/bin/sync_spawner.rs @@ -0,0 +1,41 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + +use background_jobs::SpawnerConfig; +use failure::Error; +use server_jobs_example::{MyJob, MyProcessor}; + +fn main() -> Result<(), Error> { + dotenv::dotenv().ok(); + env_logger::init(); + + let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { + acc.push(MyJob::new(x, y)); + + (y, x + y, acc) + }); + + let spawner = SpawnerConfig::new("localhost", 5555); + + for job in jobs { + spawner.queue_sync::(job)?; + } + + Ok(()) +} diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs index 6a42bc2..9ef218e 100644 --- a/examples/server-jobs-example/src/lib.rs +++ b/examples/server-jobs-example/src/lib.rs @@ -70,19 +70,8 @@ pub struct MyProcessor; impl Processor for MyProcessor { type Job = MyJob; - fn name() -> &'static str { - "MyProcessor" - } - - fn queue() -> &'static str { - DEFAULT_QUEUE - } - - fn max_retries() -> MaxRetries { - MaxRetries::Count(1) - } - - fn backoff_strategy() -> Backoff { - Backoff::Exponential(2) - } + const NAME: &'static str = "MyProcessor"; + const QUEUE: &'static str = DEFAULT_QUEUE; + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); } diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 9732bf9..c780d3c 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor on tokio" -version = "0.1.1" +version = "0.2.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index d757fdb..72d017d 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -27,8 +27,8 @@ use crate::{Backoff, JobStatus, MaxRetries, ShouldStop}; /// /// Although exposed publically, this type should only really be handled by the library itself, and /// is impossible to create outside of a -/// [Processor](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Processor)'s new_job -/// method. +/// [Processor](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html)'s +/// new_job method. pub struct JobInfo { /// ID of the job, None means an ID has not been set id: Option, diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 8fd6a32..2d8c9bb 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -33,10 +33,11 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// - The job's default queue /// - The job's default maximum number of retries /// - The job's [backoff -/// strategy](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Backoff) +/// strategy](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Backoff.html) /// /// Processors also provide the default mechanism for running a job, and the only mechanism for -/// creating a [JobInfo](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.JobInfo), +/// creating a +/// [JobInfo](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.JobInfo.html), /// which is the type required for queuing jobs to be executed. /// /// ### Example @@ -67,21 +68,10 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// impl Processor for MyProcessor { /// type Job = MyJob; /// -/// fn name() -> &'static str { -/// "IncrementProcessor" -/// } -/// -/// fn queue() -> &'static str { -/// "default" -/// } -/// -/// fn max_retries() -> MaxRetries { -/// MaxRetries::Count(1) -/// } -/// -/// fn backoff_strategy() -> Backoff { -/// Backoff::Exponential(2) -/// } +/// const NAME: &'static str = "IncrementProcessor"; +/// const QUEUE: &'static str = "default"; +/// const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); +/// const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); /// } /// /// fn main() -> Result<(), Error> { @@ -96,35 +86,35 @@ pub trait Processor: Clone { /// The name of the processor /// /// This name must be unique!!! It is used to look up which processor should handle a job - fn name() -> &'static str; + const NAME: &'static str; /// The name of the default queue for jobs created with this processor /// /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, /// the job will never be processed. - fn queue() -> &'static str; + const QUEUE: &'static str; /// Define the default number of retries for a given processor /// /// Jobs can override - fn max_retries() -> MaxRetries; + const MAX_RETRIES: MaxRetries; /// Define the default backoff strategy for a given processor /// /// Jobs can override - fn backoff_strategy() -> Backoff; + const BACKOFF_STRATEGY: Backoff; /// A provided method to create a new JobInfo from provided arguments /// /// This is required for spawning jobs, since it enforces the relationship between the job and /// the Processor that should handle it. fn new_job(job: Self::Job) -> Result { - let queue = job.queue().unwrap_or(Self::queue()).to_owned(); - let max_retries = job.max_retries().unwrap_or(Self::max_retries()); - let backoff_strategy = job.backoff_strategy().unwrap_or(Self::backoff_strategy()); + let queue = job.queue().unwrap_or(Self::QUEUE).to_owned(); + let max_retries = job.max_retries().unwrap_or(Self::MAX_RETRIES); + let backoff_strategy = job.backoff_strategy().unwrap_or(Self::BACKOFF_STRATEGY); let job = JobInfo::new( - Self::name().to_owned(), + Self::NAME.to_owned(), queue, serde_json::to_value(job)?, max_retries, @@ -160,7 +150,7 @@ pub trait Processor: Clone { /// Patterns like this could be useful if you want to use the same job type for multiple /// scenarios. Defining the `process` method for multiple `Processor`s with different /// before/after logic for the same - /// [`Job`](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Job) type is + /// [`Job`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Job.html) type is /// supported. fn process(&self, args: Value) -> Box + Send> { let res = serde_json::from_value::(args); diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 42670fb..1a63c39 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -27,17 +27,17 @@ use crate::{JobError, JobInfo, Processor}; /// A generic function that processes a job /// /// Instead of storing -/// [`Processor`](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Processor) type +/// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) type /// directly, the -/// [`ProcessorMap`](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.ProcessorMap) +/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.ProcessorMap.html) /// struct stores these `ProcessFn` types that don't expose differences in Job types. pub type ProcessFn = Box Box + Send> + Send + Sync>; /// A type for storing the relationships between processor names and the processor itself /// -/// [`Processor`s](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Processor) must be -/// registered with the `ProcessorMap` in the initialization phase of an application before +/// [`Processor`s](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) must +/// be registered with the `ProcessorMap` in the initialization phase of an application before /// workers are spawned in order to handle queued jobs. pub struct ProcessorMap { inner: HashMap, @@ -50,7 +50,7 @@ impl ProcessorMap { } /// Register a - /// [`Processor`](https://docs.rs/background-jobs/0.1.1/background_jobs/struct.Processor) with + /// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) with /// this `ProcessorMap`. /// /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so @@ -60,7 +60,7 @@ impl ProcessorMap { P: Processor + Send + Sync + 'static, { self.inner.insert( - P::name().to_owned(), + P::NAME.to_owned(), Box::new(move |value| processor.process(value)), ); } diff --git a/jobs-server/Cargo.toml b/jobs-server/Cargo.toml index bcddced..dd6fce7 100644 --- a/jobs-server/Cargo.toml +++ b/jobs-server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-server" description = "Jobs processor server based on ZeroMQ" -version = "0.1.0" +version = "0.2.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -22,7 +22,7 @@ zmq = "0.8" default = ["tokio-zmq"] [dependencies.background-jobs-core] -version = "0.1" +version = "0.2" path = "../jobs-core" [dependencies.tokio-zmq] diff --git a/jobs-server/src/server/mod.rs b/jobs-server/src/server/mod.rs index f17f81e..d35a2ac 100644 --- a/jobs-server/src/server/mod.rs +++ b/jobs-server/src/server/mod.rs @@ -123,7 +123,7 @@ struct MissingQueue(String); /// /// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but /// it does not provide functionality to execute jobs. For that, you must create a -/// [`Worker`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.WorkerConfig) +/// [`Worker`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.WorkerConfig.html) /// that will connect to the running server. /// /// This type doesn't have any associated data, but is used as a proxy for starting the diff --git a/jobs-server/src/spawner.rs b/jobs-server/src/spawner.rs index a4fed54..0ddbefe 100644 --- a/jobs-server/src/spawner.rs +++ b/jobs-server/src/spawner.rs @@ -80,7 +80,7 @@ impl SpawnerConfig { { let msg = P::new_job(job) .map_err(Error::from) - .and_then(|job| serde_json::to_string(&job).map_err(Error::from)) + .and_then(|job_info| serde_json::to_string(&job_info).map_err(Error::from)) .and_then(|s| { Message::from_slice(s.as_ref()) .map(|m| m.into()) @@ -100,4 +100,29 @@ impl SpawnerConfig { push.send(msg).from_err().map(|_| trace!("sent")) }) } + + /// `queue_sync` is the same as Queue, except that it blocks the current thread while it is + /// sending the message to the jobs server. + /// + /// If you have a tokio-based application, you should use `queue` instead. + pub fn queue_sync

(&self, job: P::Job) -> Result<(), Error> + where + P: Processor, + { + use zmq::PUSH; + + let job_info = P::new_job(job)?; + + let msg_string = serde_json::to_string(&job_info)?; + + let msg = Message::from_slice(msg_string.as_ref())?; + + let socket = self.ctx.socket(PUSH)?; + socket.connect(&self.server)?; + trace!("Sending"); + socket.send_msg(msg, 0)?; + trace!("Sent"); + + Ok(()) + } } diff --git a/jobs-server/src/worker/mod.rs b/jobs-server/src/worker/mod.rs index b096742..a9199c9 100644 --- a/jobs-server/src/worker/mod.rs +++ b/jobs-server/src/worker/mod.rs @@ -34,7 +34,7 @@ use self::{config::Worker, portmap::PortMap}; /// /// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects /// to a server (crated with -/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.ServerConfig)) +/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.ServerConfig.html)) /// and receives work from there. /// /// ```rust @@ -107,7 +107,7 @@ impl WorkerConfig { /// Register a processor with this worker /// /// For more information, see - /// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor). + /// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Processor.html). pub fn register_processor

(&mut self, processor: P) where P: Processor + Send + Sync + 'static, diff --git a/src/lib.rs b/src/lib.rs index 1584275..72edab6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ //! #### Add Background Jobs to your project //! ```toml //! [dependencies] -//! background-jobs = "0.1" +//! background-jobs = "0.2" //! failure = "0.1" //! futures = "0.1" //! tokio = "0.1" @@ -77,9 +77,7 @@ //! // The name of the processor. It is super important that each processor has a unique name, //! // because otherwise one processor will overwrite another processor when they're being //! // registered. -//! fn name() -> &'static str { -//! "MyProcessor" -//! } +//! const NAME: &'static str = "IncrementProcessor"; //! //! // The queue that this processor belongs to //! // @@ -87,23 +85,17 @@ //! // determine which worker will call the processor //! // //! // Jobs can optionally override the queue they're spawned on -//! fn queue() -> &'static str { -//! DEFAULT_QUEUE -//! } +//! const QUEUE: &'static str = "default"; //! //! // The number of times background-jobs should try to retry a job before giving up //! // //! // Jobs can optionally override this value -//! fn max_retries() -> MaxRetries { -//! MaxRetries::Count(1) -//! } +//! const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); //! //! // The logic to determine how often to retry this job if it fails //! // //! // Jobs can optionally override this value -//! fn backoff_strategy() -> Backoff { -//! Backoff::Exponential(2) -//! } +//! const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); //! } //! ``` //! @@ -112,6 +104,19 @@ //! `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for //! spawning new jobs. //! +//! `background-jobs-server` uses LMDB to keep track of local state. LMDB is a memory-mapped +//! storage mechanism, so the jobs information it keeps track of is all stored locally on-disk. In +//! the future, the storage mechanism may be made generic so implementors can bring their own +//! storage. +//! +//! `background-jobs-server` also uses ZeroMQ to transfer data between the spawner, server, and +//! workers. If you plan to run two or more of these pieces from the same process, look at the +//! documentation for the methods `new_with_context` and `init_with_context`. It is important that +//! ZeroMQ contexts are shared when possible to avoid spinning up multiple ZeroMQ instances for the +//! same application. +//! +//! With that out of the way, back to the examples: +//! //! ##### Starting the job server //! ```rust,ignore //! use background_jobs::ServerConfig; @@ -178,9 +183,31 @@ //! } //! ``` //! +//! ##### Complete Example +//! For the complete example project, see +//! [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/master/examples/server-jobs-example) +//! +//! #### Using on Windows +//! `background-jobs-server` depends by default on +//! [`tokio-zmq`](https://crates.io/crates/tokio-zmq), which only works on unix (and unix-like) +//! systems. This might mean it works on the Windows Subsystem for Linux, but it's untested and +//! hard to say. You can override this behavior by specifying the following in your Cargo.toml +//! ```toml +//! [Dependencies.background-jobs] +//! version = "0.1" +//! default-features = false +//! features = ["background-jobs-server", "background-jobs-server/futures-zmq"] +//! ``` +//! +//! [`futures-zmq`](https://crates.io/crates/futures-zmq) Is designed to be a drop-in replacement +//! for tokio-zmq that works on non-unix and non-tokio platforms. The reason why it isn't enabled +//! by default is that it's slower than tokio-zmq, and in all likelihood, the production +//! environment for projects depending on this one will be linux. +//! +//! #### Not using a ZeroMQ+LMDB based client/server model //! If you want to create your own jobs processor based on this idea, you can depend on the -//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some -//! other useful types for implementing a jobs processor. +//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as +//! well as some other useful types for implementing a jobs processor. pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};