From cf6ede67ea1cb3c2d2e794751e5b5e4f4585407e Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 17 Nov 2018 19:39:04 -0600 Subject: [PATCH] Add documentation, license info, Make API cleaner --- README.md | 180 +++++++++++++++- TODO | 2 + .../server-jobs-example/src/bin/server.rs | 19 ++ .../server-jobs-example/src/bin/spawner.rs | 31 ++- .../server-jobs-example/src/bin/worker.rs | 19 ++ examples/server-jobs-example/src/lib.rs | 54 +++-- jobs-core/src/job.rs | 56 +++++ jobs-core/src/job_info.rs | 31 ++- jobs-core/src/lib.rs | 38 +++- jobs-core/src/processor.rs | 203 +++++++++++------- jobs-core/src/processor_map.rs | 112 ++++++++++ jobs-core/src/processors.rs | 70 ------ jobs-core/src/storage.rs | 26 +++ jobs-server/src/lib.rs | 19 ++ jobs-server/src/server/mod.rs | 72 +++++++ jobs-server/src/server/portmap.rs | 19 ++ jobs-server/src/server/pull.rs | 19 ++ jobs-server/src/server/push.rs | 19 ++ jobs-server/src/server/stalled.rs | 19 ++ jobs-server/src/spawner.rs | 60 +++++- jobs-server/src/worker/config.rs | 31 ++- jobs-server/src/worker/mod.rs | 77 ++++++- jobs-server/src/worker/portmap.rs | 19 ++ src/lib.rs | 188 +++++++++++++++- 24 files changed, 1192 insertions(+), 191 deletions(-) create mode 100644 jobs-core/src/job.rs create mode 100644 jobs-core/src/processor_map.rs delete mode 100644 jobs-core/src/processors.rs diff --git a/README.md b/README.md index a895783..ee8a1f6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,179 @@ -# background-jobs +# Background Jobs -a job processor for Rust based on Futures \ No newline at end of file +This crate provides tooling required to run some processes asynchronously from a usually +synchronous application. The standard example of this is Web Services, where certain things +need to be processed, but processing them while a user is waiting for their browser to respond +might not be the best experience. + +### Usage +#### Add Background Jobs to your project +```toml +[dependencies] +background-jobs = "0.1" +failure = "0.1" +futures = "0.1" +tokio = "0.1" +``` + +#### To get started with Background Jobs, first you should define a job. +Jobs are a combination of the data required to perform an operation, and the logic of that +operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. + +```rust +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +impl Job for MyJob { + fn run(self) -> Box + Send> { + info!("args: {:?}", self); + + Box::new(Ok(()).into_future()) + } +} +``` + +#### Next, define a Processor. +Processors are types that define default attributes for jobs, as well as containing some logic +used internally to perform the job. Processors must implement `Proccessor` and `Clone`. + +```rust +#[derive(Clone, Debug)] +pub struct MyProcessor; + +impl Processor for MyProcessor { + // The kind of job this processor should execute + type Job = MyJob; + + // The name of the processor. It is super important that each processor has a unique name, + // because otherwise one processor will overwrite another processor when they're being + // registered. + fn name() -> &'static str { + "MyProcessor" + } + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + fn queue() -> &'static str { + DEFAULT_QUEUE + } + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + fn max_retries() -> MaxRetries { + MaxRetries::Count(1) + } + + // The logic to determine how often to retry this job if it fails + // + // Jobs can optionally override this value + fn backoff_strategy() -> Backoff { + Backoff::Exponential(2) + } +} +``` + +#### Running jobs +By default, this crate ships with the `background-jobs-server` feature enabled. This uses the +`background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for +spawning new jobs. + +##### Starting the job server +```rust +use background_jobs::ServerConfig; +use failure::Error; +use server_jobs_example::queue_set; + +fn main() -> Result<(), Error> { + // Run our job server + tokio::run(ServerConfig::init( + "127.0.0.1", + 5555, + 1, + queue_set(), + "example-db", + )); + + Ok(()) +} +``` +##### Starting the job worker +```rust +use background_jobs::WorkerConfig; +use failure::Error; +use server_jobs_example::{queue_map, MyProcessor}; + +fn main() -> Result<(), Error> { + // Create the worker config + let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); + + // Register our processor + worker.register_processor(MyProcessor); + + // Spin up the workers + tokio::run(worker.run()); + + Ok(()) +} +``` +##### Queuing jobs +```rust +use background_jobs::SpawnerConfig; +use futures::{future::lazy, Future}; +use server_jobs_example::{MyJob, MyProcessor}; + +fn main() { + // Create 50 new jobs, each with two consecutive values of the fibonacci sequence + let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { + acc.push(MyJob::new(x, y)); + + (y, x + y, acc) + }); + + // Create the spawner + let spawner = SpawnerConfig::new("localhost", 5555); + + // Queue each job + tokio::run(lazy(move || { + for job in jobs { + tokio::spawn(spawner.queue::(job).map_err(|_| ())); + } + + Ok(()) + })); +} +``` + +#### Not using a ZeroMQ based client/server model +If you want to create your own jobs processor based on this idea, you can depend on the +`background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some +other useful types for implementing a jobs processor. + +### Contributing +Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the GPLv3. + +### License + +Copyright © 2018 Riley Trautman + +Background Jobs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. + +Background Jobs is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. This file is part of Background Jobs. + +You should have received a copy of the GNU General Public License along with Background Jobs. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/). diff --git a/TODO b/TODO index 0ca4c53..dd7b2f3 100644 --- a/TODO +++ b/TODO @@ -1,2 +1,4 @@ 1. Gracefull Shutdown +2. +Periodically check staged jobs diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs index 544e40f..62b76b3 100644 --- a/examples/server-jobs-example/src/bin/server.rs +++ b/examples/server-jobs-example/src/bin/server.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use background_jobs::ServerConfig; use failure::Error; use server_jobs_example::queue_set; diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs index 5846ed6..24dbb90 100644 --- a/examples/server-jobs-example/src/bin/spawner.rs +++ b/examples/server-jobs-example/src/bin/spawner.rs @@ -1,13 +1,32 @@ -use background_jobs::{Processor, SpawnerConfig}; +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + +use background_jobs::SpawnerConfig; use futures::{future::lazy, Future}; -use server_jobs_example::{MyJobArguments, MyProcessor}; +use server_jobs_example::{MyJob, MyProcessor}; fn main() { dotenv::dotenv().ok(); env_logger::init(); let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { - acc.push(MyJobArguments::new(x, y)); + acc.push(MyJob::new(x, y)); (y, x + y, acc) }); @@ -16,11 +35,7 @@ fn main() { tokio::run(lazy(move || { for job in jobs { - tokio::spawn( - spawner - .queue(MyProcessor::new_job(job, None, None).unwrap()) - .map_err(|_| ()), - ); + tokio::spawn(spawner.queue::(job).map_err(|_| ())); } Ok(()) diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs index ab6cfe4..16c05be 100644 --- a/examples/server-jobs-example/src/bin/worker.rs +++ b/examples/server-jobs-example/src/bin/worker.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use background_jobs::WorkerConfig; use failure::Error; use server_jobs_example::{queue_map, MyProcessor}; diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs index 828e397..6a42bc2 100644 --- a/examples/server-jobs-example/src/lib.rs +++ b/examples/server-jobs-example/src/lib.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + #[macro_use] extern crate log; #[macro_use] @@ -5,51 +24,58 @@ extern crate serde_derive; use std::collections::{BTreeMap, BTreeSet}; -use background_jobs::{Backoff, MaxRetries, Processor}; +use background_jobs::{Backoff, Job, MaxRetries, Processor}; use failure::Error; use futures::{future::IntoFuture, Future}; +const DEFAULT_QUEUE: &'static str = "default"; + pub fn queue_map() -> BTreeMap { let mut map = BTreeMap::new(); - map.insert("default".to_owned(), 18); + map.insert(DEFAULT_QUEUE.to_owned(), 18); map } pub fn queue_set() -> BTreeSet { - let mut set = BTreeSet::new(); - set.insert("default".to_owned()); - - set + queue_map().keys().cloned().collect() } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct MyJobArguments { +pub struct MyJob { some_usize: usize, other_usize: usize, } -impl MyJobArguments { +impl MyJob { pub fn new(some_usize: usize, other_usize: usize) -> Self { - MyJobArguments { + MyJob { some_usize, other_usize, } } } +impl Job for MyJob { + fn run(self) -> Box + Send> { + info!("args: {:?}", self); + + Box::new(Ok(()).into_future()) + } +} + #[derive(Clone, Debug)] pub struct MyProcessor; impl Processor for MyProcessor { - type Arguments = MyJobArguments; + type Job = MyJob; fn name() -> &'static str { "MyProcessor" } fn queue() -> &'static str { - "default" + DEFAULT_QUEUE } fn max_retries() -> MaxRetries { @@ -59,10 +85,4 @@ impl Processor for MyProcessor { fn backoff_strategy() -> Backoff { Backoff::Exponential(2) } - - fn process(&self, args: Self::Arguments) -> Box + Send> { - info!("args: {:?}", args); - - Box::new(Ok(()).into_future()) - } } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs new file mode 100644 index 0000000..ec8dda9 --- /dev/null +++ b/jobs-core/src/job.rs @@ -0,0 +1,56 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + +use failure::Error; +use futures::Future; +use serde::{de::DeserializeOwned, ser::Serialize}; + +use crate::{Backoff, MaxRetries}; + +/// The Job trait defines parameters pertaining to an instance of background job +pub trait Job: Serialize + DeserializeOwned { + /// Users of this library must define what it means to run a job. + /// + /// This should contain all the logic needed to complete a job. If that means queuing more + /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy + /// processes, that logic should all be called from inside this method. + fn run(self) -> Box + Send>; + + /// If this job should not use the default queue for its processor, this can be overridden in + /// user-code. + /// + /// Jobs will only be processed by processors that are registered, and if a queue is supplied + /// here that is not associated with a valid processor for this job, it will never be + /// processed. + fn queue(&self) -> Option<&str> { + None + } + + /// If this job should not use the default maximum retry count for its processor, this can be + /// overridden in user-code. + fn max_retries(&self) -> Option { + None + } + + /// If this job should not use the default backoff strategy for its processor, this can be + /// overridden in user-code. + fn backoff_strategy(&self) -> Option { + None + } +} diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 4b53a85..b071eb1 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,9 +1,34 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use chrono::{offset::Utc, DateTime, Duration as OldDuration}; use serde_json::Value; use crate::{Backoff, JobStatus, MaxRetries, ShouldStop}; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +/// Metadata pertaining to a job that exists within the background_jobs system +/// +/// Although exposed publically, this type should only really be handled by the library itself, and +/// is impossible to create outside of a +/// [Processor](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor)'s new_job +/// method. pub struct JobInfo { /// ID of the job, None means an ID has not been set id: Option, @@ -126,7 +151,11 @@ impl JobInfo { self.status = JobStatus::Staged; } - pub fn run(&mut self) { + /// This method sets the Job's status to running + /// + /// Touching this outside of the background_jobs crates is dangerous, since these libraries + /// rely on the state of the job being correct. + pub fn set_running(&mut self) { self.status = JobStatus::Running; } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 775229d..cb152d8 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + #[macro_use] extern crate failure; #[macro_use] @@ -7,27 +26,35 @@ extern crate serde_derive; use failure::Error; +mod job; mod job_info; mod processor; -mod processors; +mod processor_map; mod storage; pub use crate::{ - job_info::JobInfo, processor::Processor, processors::Processors, storage::Storage, + job::Job, job_info::JobInfo, processor::Processor, processor_map::ProcessorMap, + storage::Storage, }; #[derive(Debug, Fail)] +/// The error type returned by a `Processor`'s `process` method pub enum JobError { + /// Some error occurred while processing the job #[fail(display = "Error performing job: {}", _0)] Processing(#[cause] Error), + + /// Creating a `Job` type from the provided `serde_json::Value` failed #[fail(display = "Could not make JSON value from arguments")] Json, + + /// No processor was present to handle a given job #[fail(display = "No processor available for job")] MissingProcessor, } -/// Set the status of a job when storing it #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +/// Set the status of a job when storing it pub enum JobStatus { /// Job should be queued Pending, @@ -79,12 +106,17 @@ impl MaxRetries { } #[derive(Clone, Debug, Eq, PartialEq)] +/// A type that represents whether a job should be requeued pub enum ShouldStop { + /// The job has hit the maximum allowed number of retries, and should be failed permanently LimitReached, + + /// The job is allowed to be put back into the job queue Requeue, } impl ShouldStop { + /// A boolean representation of this state pub fn should_requeue(&self) -> bool { *self == ShouldStop::Requeue } diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index b7344bb..f0ba396 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -1,28 +1,107 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use failure::Error; use futures::{ future::{Either, IntoFuture}, Future, }; -use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value; -use crate::{Backoff, JobError, JobInfo, MaxRetries}; +use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; -/// The Processor trait +/// ## The Processor trait /// -/// Processors define the logic for executing jobs +/// Processors define the logic spawning jobs such as +/// - The job's name +/// - The job's default queue +/// - The job's default maximum number of retries +/// - The job's [backoff +/// strategy](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Backoff) +/// +/// Processors also provide the default mechanism for running a job, and the only mechanism for +/// creating a [JobInfo](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.JobInfo), +/// which is the type required for queuing jobs to be executed. +/// +/// ### Example +/// +/// ```rust +/// use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; +/// use failure::Error; +/// use futures::future::{Future, IntoFuture}; +/// use log::info; +/// use serde_derive::{Deserialize, Serialize}; +/// +/// #[derive(Deserialize, Serialize)] +/// struct MyJob { +/// count: i32, +/// } +/// +/// impl Job for MyJob { +/// fn run(self) -> Box + Send> { +/// info!("Processing {}", self.count); +/// +/// Box::new(Ok(()).into_future()) +/// } +/// } +/// +/// #[derive(Clone)] +/// struct MyProcessor; +/// +/// impl Processor for MyProcessor { +/// type Job = MyJob; +/// +/// fn name() -> &'static str { +/// "IncrementProcessor" +/// } +/// +/// fn queue() -> &'static str { +/// "default" +/// } +/// +/// fn max_retries() -> MaxRetries { +/// MaxRetries::Count(1) +/// } +/// +/// fn backoff_strategy() -> Backoff { +/// Backoff::Exponential(2) +/// } +/// } +/// +/// fn main() -> Result<(), Error> { +/// let job = MyProcessor::new_job(MyJob { count: 1234 })?; +/// +/// Ok(()) +/// } +/// ``` pub trait Processor: Clone { - type Arguments: Serialize + DeserializeOwned; + type Job: Job; /// The name of the processor /// /// This name must be unique!!! It is used to look up which processor should handle a job fn name() -> &'static str; - /// The name of the queue + /// The name of the default queue for jobs created with this processor /// - /// The queue determines which workers should process which jobs. By default, all workers - /// process all jobs, but that can be configured when starting the workers + /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, + /// the job will never be processed. fn queue() -> &'static str; /// Define the default number of retries for a given processor @@ -35,83 +114,59 @@ pub trait Processor: Clone { /// Jobs can override fn backoff_strategy() -> Backoff; - /// Defines how jobs for this processor are processed + /// A provided method to create a new JobInfo from provided arguments /// - /// Please do not perform blocking operations in the process method except if put behind - /// tokio's `blocking` abstraction - fn process(&self, args: Self::Arguments) -> Box + Send>; + /// This is required for spawning jobs, since it enforces the relationship between the job and + /// the Processor that should handle it. + fn new_job(job: Self::Job) -> Result { + let queue = job.queue().unwrap_or(Self::queue()).to_owned(); + let max_retries = job.max_retries().unwrap_or(Self::max_retries()); + let backoff_strategy = job.backoff_strategy().unwrap_or(Self::backoff_strategy()); - /// A provided method to create a new Job from provided arguments - /// - /// ### Example - /// - /// ```rust - /// #[macro_use] - /// extern crate log; - /// - /// use jobs::{Processor, MaxRetries}; - /// use failure::Error; - /// use futures::future::{Future, IntoFuture}; - /// - /// struct MyProcessor; - /// - /// impl Processor for MyProcessor { - /// type Arguments = i32; - /// - /// fn name() -> &'static str { - /// "IncrementProcessor" - /// } - /// - /// fn queue() -> &'static str { - /// "default" - /// } - /// - /// fn max_retries() -> MaxRetries { - /// MaxRetries::Count(1) - /// } - /// - /// fn backoff_strategy() -> Backoff { - /// Backoff::Exponential(2) - /// } - /// - /// fn process( - /// &self, - /// args: Self::Arguments, - /// ) -> Box + Send> { - /// info!("Processing {}", args); - /// - /// Box::new(Ok(()).into_future()) - /// } - /// } - /// - /// fn main() -> Result<(), Error> { - /// let job = MyProcessor::new_job(1234, None)?; - /// - /// Ok(()) - /// } - /// ``` - fn new_job( - args: Self::Arguments, - max_retries: Option, - backoff_strategy: Option, - ) -> Result { let job = JobInfo::new( Self::name().to_owned(), - Self::queue().to_owned(), - serde_json::to_value(args)?, - max_retries.unwrap_or(Self::max_retries()), - backoff_strategy.unwrap_or(Self::backoff_strategy()), + queue, + serde_json::to_value(job)?, + max_retries, + backoff_strategy, ); Ok(job) } - /// A provided method to coerce arguments into the expected type - fn do_processing(&self, args: Value) -> Box + Send> { - let res = serde_json::from_value::(args); + /// A provided method to coerce arguments into the expected type and run the job + /// + /// Advanced users may want to override this method in order to provide their own custom + /// before/after logic for certain job processors + /// + /// ```rust,ignore + /// fn process(&self, args: Value) -> Box + Send> { + /// let res = serde_json::from_value::(args); + /// + /// let fut = match res { + /// Ok(job) => { + /// // Perform some custom pre-job logic + /// Either::A(job.run().map_err(JobError::Processing)) + /// }, + /// Err(_) => Either::B(Err(JobError::Json).into_future()), + /// }; + /// + /// Box::new(fut.and_then(|_| { + /// // Perform some custom post-job logic + /// })) + /// } + /// ``` + /// + /// Patterns like this could be useful if you want to use the same job type for multiple + /// scenarios. Defining the `process` method for multiple `Processor`s with different + /// before/after logic for the same + /// [`Job`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Job) type is + /// supported. + fn process(&self, args: Value) -> Box + Send> { + let res = serde_json::from_value::(args); let fut = match res { - Ok(item) => Either::A(self.process(item).map_err(JobError::Processing)), + Ok(job) => Either::A(job.run().map_err(JobError::Processing)), Err(_) => Either::B(Err(JobError::Json).into_future()), }; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs new file mode 100644 index 0000000..5e10ada --- /dev/null +++ b/jobs-core/src/processor_map.rs @@ -0,0 +1,112 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + +use std::collections::HashMap; + +use futures::future::{Either, Future, IntoFuture}; +use serde_json::Value; + +use crate::{JobError, JobInfo, Processor}; + +/// A generic function that processes a job +/// +/// Instead of storing +/// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) type +/// directly, the +/// [`ProcessorMap`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.ProcessorMap) +/// struct stores these `ProcessFn` types that don't expose differences in Job types. +pub type ProcessFn = + Box Box + Send> + Send + Sync>; + +/// A type for storing the relationships between processor names and the processor itself +/// +/// [`Processor`s](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) must be +/// registered with the `ProcessorMap` in the initialization phase of an application before +/// workers are spawned in order to handle queued jobs. +pub struct ProcessorMap { + inner: HashMap, +} + +impl ProcessorMap { + /// Intialize a `ProcessorMap` + pub fn new() -> Self { + Default::default() + } + + /// Register a + /// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) with + /// this `ProcessorMap`. + /// + /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so + /// make sure to register all your processors up-front. + pub fn register_processor

(&mut self, processor: P) + where + P: Processor + Send + Sync + 'static, + { + self.inner.insert( + P::name().to_owned(), + Box::new(move |value| processor.process(value)), + ); + } + + /// Process a given job + /// + /// This should not be called from outside implementations of a backgoround-jobs runtime. It is + /// intended for internal use. + pub fn process_job(&self, job: JobInfo) -> impl Future { + let opt = self + .inner + .get(job.processor()) + .map(|processor| process(processor, job.clone())); + + if let Some(fut) = opt { + Either::A(fut) + } else { + error!("Processor {} not present", job.processor()); + Either::B(Ok(job).into_future()) + } + } +} + +impl Default for ProcessorMap { + fn default() -> Self { + ProcessorMap { + inner: Default::default(), + } + } +} + +fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future { + let args = job.args(); + + let processor = job.processor().to_owned(); + + process_fn(args).then(move |res| match res { + Ok(_) => { + info!("Job completed, {}", processor); + job.pass(); + Ok(job) + } + Err(e) => { + error!("Job errored, {}, {}", processor, e); + job.fail(); + Ok(job) + } + }) +} diff --git a/jobs-core/src/processors.rs b/jobs-core/src/processors.rs deleted file mode 100644 index c055976..0000000 --- a/jobs-core/src/processors.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::collections::HashMap; - -use futures::future::{Either, Future, IntoFuture}; -use serde_json::Value; - -use crate::{JobError, JobInfo, Processor}; - -pub type ProcessFn = - Box Box + Send> + Send + Sync>; - -pub struct Processors { - inner: HashMap, -} - -impl Processors { - pub fn new() -> Self { - Default::default() - } - - pub fn register_processor

(&mut self, processor: P) - where - P: Processor + Send + Sync + 'static, - { - self.inner.insert( - P::name().to_owned(), - Box::new(move |value| processor.do_processing(value)), - ); - } - - pub fn process_job(&self, job: JobInfo) -> impl Future { - let opt = self - .inner - .get(job.processor()) - .map(|processor| process(processor, job.clone())); - - if let Some(fut) = opt { - Either::A(fut) - } else { - error!("Processor {} not present", job.processor()); - Either::B(Ok(job).into_future()) - } - } -} - -impl Default for Processors { - fn default() -> Self { - Processors { - inner: Default::default(), - } - } -} - -fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future { - let args = job.args(); - - let processor = job.processor().to_owned(); - - process_fn(args).then(move |res| match res { - Ok(_) => { - info!("Job completed, {}", processor); - job.pass(); - Ok(job) - } - Err(e) => { - error!("Job errored, {}, {}", processor, e); - job.fail(); - Ok(job) - } - }) -} diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 1557def..f4d2aae 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, @@ -34,6 +53,13 @@ impl<'a> Buckets<'a> { } #[derive(Clone)] +/// All the logic to interact with the persisted data is defined on this type. +/// +/// Perhapse in the future this will be made generic, but for now it is hard-coded to use LMDB to +/// store job information. +/// +/// None of the methods in this module are intended to be used outside of a background-jobs +/// runtime. pub struct Storage { runner_id: usize, store: Arc>, diff --git a/jobs-server/src/lib.rs b/jobs-server/src/lib.rs index f7664f9..970a615 100644 --- a/jobs-server/src/lib.rs +++ b/jobs-server/src/lib.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use failure::Error; mod server; diff --git a/jobs-server/src/server/mod.rs b/jobs-server/src/server/mod.rs index e0f3e0e..f17f81e 100644 --- a/jobs-server/src/server/mod.rs +++ b/jobs-server/src/server/mod.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{ collections::BTreeSet, path::{Path, PathBuf}, @@ -100,9 +119,55 @@ impl Config { #[fail(display = "Queue is missing from map, {}", _0)] struct MissingQueue(String); +/// The entry point for creating a background-jobs server +/// +/// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but +/// it does not provide functionality to execute jobs. For that, you must create a +/// [`Worker`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.WorkerConfig) +/// that will connect to the running server. +/// +/// This type doesn't have any associated data, but is used as a proxy for starting the +/// background-jobs runtime. +/// +/// ```rust +/// use std::collections::BTreeSet; +/// use background_jobs_server::ServerConfig; +/// use failure::Error; +/// +/// fn main() -> Result<(), Error> { +/// let mut queue_set = BTreeSet::new(); +/// queue_set.insert("default".to_owned()); +/// +/// let start_server = ServerConfig::init( +/// "127.0.0.1", +/// 5555, +/// 1, +/// queue_set, +/// "example-db", +/// ); +/// +/// # let _ = start_server; +/// // Comment out the start so we don't run the full server in doctests +/// // tokio::run(start_server) +/// +/// Ok(()) +/// } +/// ``` pub struct ServerConfig; impl ServerConfig { + /// Create a new background-jobs Server that binds to the provided `ip` with ports starting at + /// `base_port`. + /// + /// The smallest background-jobs server will bind to 3 ports. Each port serves a different + /// purpose: + /// - `base_port` is the port that jobs are sent to the server on + /// - `base_port` + 1 is the port that the server uses to advertise which queues are available + /// - `base_port` + n is bound for an individual queue of jobs that the server pushes to + /// workers. + /// + /// This method returns a future that, when run, spawns all of the server's required futures + /// onto tokio. Therefore, this can only be used from tokio. pub fn init>( ip: &str, base_port: usize, @@ -115,6 +180,13 @@ impl ServerConfig { Self::init_with_context(ip, base_port, runner_id, queues, db_path, context) } + /// The same as `ServerConfig::init()`, but with a provided ZeroMQ Context. + /// + /// This can be useful if you have other uses of ZeroMQ in your application, and want to share + /// a context with your dependencies. + /// + /// If you're running the Server, Worker, and Spawner in the same application, you should share + /// a ZeroMQ context between them. pub fn init_with_context>( ip: &str, base_port: usize, diff --git a/jobs-server/src/server/portmap.rs b/jobs-server/src/server/portmap.rs index bd4845a..5f5fe28 100644 --- a/jobs-server/src/server/portmap.rs +++ b/jobs-server/src/server/portmap.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{collections::BTreeMap, sync::Arc, time::Duration}; use failure::Error; diff --git a/jobs-server/src/server/pull.rs b/jobs-server/src/server/pull.rs index 7c4eb07..9dd9ae4 100644 --- a/jobs-server/src/server/pull.rs +++ b/jobs-server/src/server/pull.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{sync::Arc, time::Duration}; use background_jobs_core::{JobInfo, Storage}; diff --git a/jobs-server/src/server/push.rs b/jobs-server/src/server/push.rs index 7c4e8f9..fc4f4f9 100644 --- a/jobs-server/src/server/push.rs +++ b/jobs-server/src/server/push.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{sync::Arc, time::Duration}; use background_jobs_core::{JobInfo, Storage}; diff --git a/jobs-server/src/server/stalled.rs b/jobs-server/src/server/stalled.rs index 11af649..8cb0791 100644 --- a/jobs-server/src/server/stalled.rs +++ b/jobs-server/src/server/stalled.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{sync::Arc, time::Duration}; use background_jobs_core::Storage; diff --git a/jobs-server/src/spawner.rs b/jobs-server/src/spawner.rs index 8b01e1c..a4fed54 100644 --- a/jobs-server/src/spawner.rs +++ b/jobs-server/src/spawner.rs @@ -1,6 +1,25 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::sync::Arc; -use background_jobs_core::JobInfo; +use background_jobs_core::Processor; use failure::Error; use futures::{future::IntoFuture, Future}; #[cfg(feature = "futures-zmq")] @@ -10,24 +29,58 @@ use log::{debug, trace}; use tokio_zmq::{prelude::*, Push}; use zmq::{Context, Message}; +/// SpawnerConfig is the only part of this library required to actually exist in your application. +/// +/// This type is used to queue new jobs into the `background-jobs` server. +/// +/// ```rust,ignore +/// let spawner = SpawnerConfig::new("localhost", 5555); +/// +/// tokio::spawn( +/// spawner +/// .queue::(job) +/// .map_err(|_| ()), +/// ); +/// ``` pub struct SpawnerConfig { server: String, ctx: Arc, } impl SpawnerConfig { + /// Create a `SpawnerConfig` + /// + /// - `server_host` is the hostname or IP address of the host that the server is running on + /// - `base_port` is the same `base_port` from the server config. The spawner will only ever + /// need to communicate over `base_port` pub fn new(server_host: &str, base_port: usize) -> Self { let ctx = Arc::new(Context::new()); + Self::new_with_context(server_host, base_port, ctx) + } + + /// The same as `SpawnerConfig::new()`, but with a provided ZeroMQ Context. + /// + /// This can be useful if you have other uses of ZeroMQ in your application, and want to share + /// a context with your dependencies. + /// + /// If you're running the Server, Worker, and Spawner in the same application, you should share + /// a ZeroMQ context between them. + pub fn new_with_context(server_host: &str, base_port: usize, ctx: Arc) -> Self { SpawnerConfig { server: format!("tcp://{}:{}", server_host, base_port), ctx, } } - pub fn queue(&self, job: JobInfo) -> impl Future { - let msg = serde_json::to_string(&job) + /// Queue a job to be executed in the background + pub fn queue

(&self, job: P::Job) -> impl Future + where + P: Processor, + { + let msg = P::new_job(job) .map_err(Error::from) + .and_then(|job| serde_json::to_string(&job).map_err(Error::from)) .and_then(|s| { Message::from_slice(s.as_ref()) .map(|m| m.into()) @@ -40,7 +93,6 @@ impl SpawnerConfig { Push::builder(self.ctx.clone()) .connect(&self.server) .build() - .into_future() .from_err() .join(msg) .and_then(move |(push, msg)| { diff --git a/jobs-server/src/worker/config.rs b/jobs-server/src/worker/config.rs index 5c3b362..c41e6a9 100644 --- a/jobs-server/src/worker/config.rs +++ b/jobs-server/src/worker/config.rs @@ -1,6 +1,25 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{sync::Arc, time::Duration}; -use background_jobs_core::{JobInfo, Processors}; +use background_jobs_core::{JobInfo, ProcessorMap}; use failure::{Error, Fail}; use futures::{ sync::mpsc::{channel, Sender}, @@ -21,7 +40,7 @@ pub(crate) struct Worker { push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc, context: Arc, } @@ -30,7 +49,7 @@ impl Worker { push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc, context: Arc, ) -> impl Future { let cfg = ResetWorker { @@ -103,7 +122,7 @@ struct ResetWorker { push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc, context: Arc, } @@ -168,7 +187,7 @@ fn report_running( mut job: JobInfo, push: Sender, ) -> impl Future { - job.run(); + job.set_running(); push.send(job.clone()) .map(move |_| job) @@ -177,7 +196,7 @@ fn report_running( fn process_job( job: JobInfo, - processors: &Processors, + processors: &ProcessorMap, ) -> impl Future { processors .process_job(job.clone()) diff --git a/jobs-server/src/worker/mod.rs b/jobs-server/src/worker/mod.rs index 4deb4d4..b096742 100644 --- a/jobs-server/src/worker/mod.rs +++ b/jobs-server/src/worker/mod.rs @@ -1,6 +1,25 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{collections::BTreeMap, sync::Arc}; -use background_jobs_core::{Processor, Processors}; +use background_jobs_core::{Processor, ProcessorMap}; use failure::Fail; use futures::Future; use log::{error, info}; @@ -11,8 +30,35 @@ mod portmap; use self::{config::Worker, portmap::PortMap}; +/// The entry point for creating a background-jobs worker. +/// +/// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects +/// to a server (crated with +/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.ServerConfig)) +/// and receives work from there. +/// +/// ```rust +/// use std::collections::BTreeMap; +/// use background_jobs_server::WorkerConfig; +/// use failure::Error; +/// +/// fn main() -> Result<(), Error> { +/// let mut queue_map = BTreeMap::new(); +/// queue_map.insert("default".to_owned(), 10); +/// +/// let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map); +/// +/// // Register a processor +/// // worker.register_processor(MyProcessor); +/// +/// // Run the workers +/// // tokio::run(worker.run()); +/// +/// Ok(()) +/// } +/// ``` pub struct WorkerConfig { - processors: Processors, + processors: ProcessorMap, queues: BTreeMap, server_host: String, base_port: usize, @@ -20,12 +66,29 @@ pub struct WorkerConfig { } impl WorkerConfig { + /// Create a new worker + /// + /// This method takes three arguments + /// - `server_host` is the hostname, or IP address, of the background-jobs server. + /// - `base_port` is the same value from the `ServerConfig` initialization. It dictates the + /// port the worker uses to return jobs to the server. The worker is guaranteed to connect + /// to at least 2 other ports on the server when functioning properly, `base_port` + 1, and + /// `base_port` + n. + /// - queues is a mapping between the name of a queue, and the number of workers that should + /// be started to process jobs in that queue. pub fn new(server_host: String, base_port: usize, queues: BTreeMap) -> Self { let context = Arc::new(Context::new()); Self::new_with_context(server_host, base_port, queues, context) } + /// The same as `WorkerConfig::new()`, but with a provided ZeroMQ Context. + /// + /// This can be useful if you have other uses of ZeroMQ in your application, and want to share + /// a context with your dependencies. + /// + /// If you're running the Server, Worker, and Spawner in the same application, you should share + /// a ZeroMQ context between them. pub fn new_with_context( server_host: String, base_port: usize, @@ -33,7 +96,7 @@ impl WorkerConfig { context: Arc, ) -> Self { WorkerConfig { - processors: Processors::new(), + processors: ProcessorMap::new(), server_host, base_port, queues, @@ -41,6 +104,10 @@ impl WorkerConfig { } } + /// Register a processor with this worker + /// + /// For more information, see + /// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor). pub fn register_processor

(&mut self, processor: P) where P: Processor + Send + Sync + 'static, @@ -48,6 +115,10 @@ impl WorkerConfig { self.processors.register_processor(processor); } + /// Start the workers + /// + /// This method returns a future that, when run, spawns all of the worker's required futures + /// onto tokio. Therefore, this can only be used from tokio. pub fn run(self) -> Box + Send> { let WorkerConfig { processors, diff --git a/jobs-server/src/worker/portmap.rs b/jobs-server/src/worker/portmap.rs index 7019791..be95726 100644 --- a/jobs-server/src/worker/portmap.rs +++ b/jobs-server/src/worker/portmap.rs @@ -1,3 +1,22 @@ +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + use std::{collections::BTreeMap, sync::Arc}; use failure::{Error, Fail}; diff --git a/src/lib.rs b/src/lib.rs index 8c3ceb6..1584275 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,188 @@ -pub use background_jobs_core::{ - Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors, ShouldStop, Storage, -}; +/* + * This file is part of Background Jobs. + * + * Copyright © 2018 Riley Trautman + * + * Background Jobs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Background Jobs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Background Jobs. If not, see . + */ + +//! # Background Jobs +//! +//! This crate provides tooling required to run some processes asynchronously from a usually +//! synchronous application. The standard example of this is Web Services, where certain things +//! need to be processed, but processing them while a user is waiting for their browser to respond +//! might not be the best experience. +//! +//! ### Usage +//! #### Add Background Jobs to your project +//! ```toml +//! [dependencies] +//! background-jobs = "0.1" +//! failure = "0.1" +//! futures = "0.1" +//! tokio = "0.1" +//! ``` +//! #### To get started with Background Jobs, first you should define a job. +//! Jobs are a combination of the data required to perform an operation, and the logic of that +//! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. +//! +//! ```rust,ignore +//! #[derive(Clone, Debug, Deserialize, Serialize)] +//! pub struct MyJob { +//! some_usize: usize, +//! other_usize: usize, +//! } +//! +//! impl MyJob { +//! pub fn new(some_usize: usize, other_usize: usize) -> Self { +//! MyJob { +//! some_usize, +//! other_usize, +//! } +//! } +//! } +//! +//! impl Job for MyJob { +//! fn run(self) -> Box + Send> { +//! info!("args: {:?}", self); +//! +//! Box::new(Ok(()).into_future()) +//! } +//! } +//! ``` +//! +//! #### Next, define a Processor. +//! Processors are types that define default attributes for jobs, as well as containing some logic +//! used internally to perform the job. Processors must implement `Proccessor` and `Clone`. +//! +//! ```rust,ignore +//! #[derive(Clone, Debug)] +//! pub struct MyProcessor; +//! +//! impl Processor for MyProcessor { +//! // The kind of job this processor should execute +//! type Job = MyJob; +//! +//! // The name of the processor. It is super important that each processor has a unique name, +//! // because otherwise one processor will overwrite another processor when they're being +//! // registered. +//! fn name() -> &'static str { +//! "MyProcessor" +//! } +//! +//! // The queue that this processor belongs to +//! // +//! // Workers have the option to subscribe to specific queues, so this is important to +//! // determine which worker will call the processor +//! // +//! // Jobs can optionally override the queue they're spawned on +//! fn queue() -> &'static str { +//! DEFAULT_QUEUE +//! } +//! +//! // The number of times background-jobs should try to retry a job before giving up +//! // +//! // Jobs can optionally override this value +//! fn max_retries() -> MaxRetries { +//! MaxRetries::Count(1) +//! } +//! +//! // The logic to determine how often to retry this job if it fails +//! // +//! // Jobs can optionally override this value +//! fn backoff_strategy() -> Backoff { +//! Backoff::Exponential(2) +//! } +//! } +//! ``` +//! +//! #### Running jobs +//! By default, this crate ships with the `background-jobs-server` feature enabled. This uses the +//! `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for +//! spawning new jobs. +//! +//! ##### Starting the job server +//! ```rust,ignore +//! use background_jobs::ServerConfig; +//! use failure::Error; +//! use server_jobs_example::queue_set; +//! +//! fn main() -> Result<(), Error> { +//! // Run our job server +//! tokio::run(ServerConfig::init( +//! "127.0.0.1", +//! 5555, +//! 1, +//! queue_set(), +//! "example-db", +//! )); +//! +//! Ok(()) +//! } +//! ``` +//! ##### Starting the job worker +//! ```rust,ignore +//! use background_jobs::WorkerConfig; +//! use failure::Error; +//! use server_jobs_example::{queue_map, MyProcessor}; +//! +//! fn main() -> Result<(), Error> { +//! // Create the worker config +//! let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); +//! +//! // Register our processor +//! worker.register_processor(MyProcessor); +//! +//! // Spin up the workers +//! tokio::run(worker.run()); +//! +//! Ok(()) +//! } +//! ``` +//! ##### Queuing jobs +//! ```rust,ignore +//! use background_jobs::SpawnerConfig; +//! use futures::{future::lazy, Future}; +//! use server_jobs_example::{MyJob, MyProcessor}; +//! +//! fn main() { +//! // Create 50 new jobs, each with two consecutive values of the fibonacci sequence +//! let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { +//! acc.push(MyJob::new(x, y)); +//! +//! (y, x + y, acc) +//! }); +//! +//! // Create the spawner +//! let spawner = SpawnerConfig::new("localhost", 5555); +//! +//! // Queue each job +//! tokio::run(lazy(move || { +//! for job in jobs { +//! tokio::spawn(spawner.queue::(job).map_err(|_| ())); +//! } +//! +//! Ok(()) +//! })); +//! } +//! ``` +//! +//! If you want to create your own jobs processor based on this idea, you can depend on the +//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some +//! other useful types for implementing a jobs processor. + +pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; #[cfg(feature = "background-jobs-server")] pub use background_jobs_server::{ServerConfig, SpawnerConfig, WorkerConfig};