Add application state

This commit is contained in:
asonix 2018-11-18 15:05:03 -06:00
parent 58f794dc55
commit 4f2530d485
16 changed files with 319 additions and 72 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs" name = "background-jobs"
description = "Background Jobs implemented with tokio and futures" description = "Background Jobs implemented with tokio and futures"
version = "0.2.0" version = "0.3.0"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs" repository = "https://git.asonix.dog/asonix/background-jobs"
@ -20,10 +20,10 @@ members = [
default = ["background-jobs-server", "background-jobs-server/tokio-zmq"] default = ["background-jobs-server", "background-jobs-server/tokio-zmq"]
[dependencies.background-jobs-core] [dependencies.background-jobs-core]
version = "0.2" version = "0.3"
path = "jobs-core" path = "jobs-core"
[dependencies.background-jobs-server] [dependencies.background-jobs-server]
version = "0.2" version = "0.3"
path = "jobs-server" path = "jobs-server"
optional = true optional = true

View file

@ -44,6 +44,28 @@ impl Job for MyJob {
} }
``` ```
The run method for a job takes an additional argument, which is the state the job expects to
use. The state for all jobs defined in an application must be the same. By default, the state
is an empty tuple, but it's likely you'll want to pass in some Actix address, or something
else.
Let's re-define the job to care about some application state.
```rust,ignore
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
impl Job<MyState> for MyJob {
fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("{}: args, {:?}", state.app_name, self);
Box::new(Ok(()).into_future())
}
}
```
#### Next, define a Processor. #### Next, define a Processor.
Processors are types that define default attributes for jobs, as well as containing some logic Processors are types that define default attributes for jobs, as well as containing some logic
used internally to perform the job. Processors must implement `Proccessor` and `Clone`. used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
@ -52,7 +74,7 @@ used internally to perform the job. Processors must implement `Proccessor` and `
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MyProcessor; pub struct MyProcessor;
impl Processor for MyProcessor { impl Processor<MyState> for MyProcessor {
// The kind of job this processor should execute // The kind of job this processor should execute
type Job = MyJob; type Job = MyJob;
@ -125,7 +147,14 @@ use server_jobs_example::{queue_map, MyProcessor};
fn main() -> Result<(), Error> { fn main() -> Result<(), Error> {
// Create the worker config // Create the worker config
let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); let mut worker = WorkerConfig::new(
MyState {
app_name: "My Example Application".to_owned(),
},
"localhost".to_owned(),
5555,
queue_map()
);
// Register our processor // Register our processor
worker.register_processor(MyProcessor); worker.register_processor(MyProcessor);

View file

@ -15,7 +15,7 @@ serde_derive = "1.0"
tokio = "0.1" tokio = "0.1"
[dependencies.background-jobs] [dependencies.background-jobs]
version = "0.2" version = "0.3"
path = "../.." path = "../.."
default-features = false default-features = false
features = ["background-jobs-server"] features = ["background-jobs-server"]

View file

@ -25,7 +25,7 @@ fn main() -> Result<(), Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
env_logger::init(); env_logger::init();
let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); let mut worker = WorkerConfig::new((), "localhost".to_owned(), 5555, queue_map());
worker.register_processor(MyProcessor); worker.register_processor(MyProcessor);

View file

@ -57,7 +57,7 @@ impl MyJob {
} }
impl Job for MyJob { impl Job for MyJob {
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> { fn run(self, _: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
info!("args: {:?}", self); info!("args: {:?}", self);
Box::new(Ok(()).into_future()) Box::new(Ok(()).into_future())

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-core" name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor on tokio" description = "Core types for implementing an asynchronous jobs processor on tokio"
version = "0.2.0" version = "0.3.0"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs" repository = "https://git.asonix.dog/asonix/background-jobs"

View file

@ -24,13 +24,20 @@ use serde::{de::DeserializeOwned, ser::Serialize};
use crate::{Backoff, MaxRetries}; use crate::{Backoff, MaxRetries};
/// The Job trait defines parameters pertaining to an instance of background job /// The Job trait defines parameters pertaining to an instance of background job
pub trait Job: Serialize + DeserializeOwned { pub trait Job<S = ()>: Serialize + DeserializeOwned
where
S: Clone + Send + Sync + 'static,
{
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
/// This should contain all the logic needed to complete a job. If that means queuing more /// This should contain all the logic needed to complete a job. If that means queuing more
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
/// processes, that logic should all be called from inside this method. /// processes, that logic should all be called from inside this method.
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send>; ///
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: S) -> Box<dyn Future<Item = (), Error = Error> + Send>;
/// If this job should not use the default queue for its processor, this can be overridden in /// If this job should not use the default queue for its processor, this can be overridden in
/// user-code. /// user-code.

View file

@ -27,7 +27,7 @@ use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
/// ///
/// Although exposed publically, this type should only really be handled by the library itself, and /// Although exposed publically, this type should only really be handled by the library itself, and
/// is impossible to create outside of a /// is impossible to create outside of a
/// [Processor](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html)'s /// [Processor](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html)'s
/// new_job method. /// new_job method.
pub struct JobInfo { pub struct JobInfo {
/// ID of the job, None means an ID has not been set /// ID of the job, None means an ID has not been set

View file

@ -33,11 +33,11 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
/// - The job's default queue /// - The job's default queue
/// - The job's default maximum number of retries /// - The job's default maximum number of retries
/// - The job's [backoff /// - The job's [backoff
/// strategy](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Backoff.html) /// strategy](https://docs.rs/background-jobs/0.3.0/background_jobs/enum.Backoff.html)
/// ///
/// Processors also provide the default mechanism for running a job, and the only mechanism for /// Processors also provide the default mechanism for running a job, and the only mechanism for
/// creating a /// creating a
/// [JobInfo](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.JobInfo.html), /// [JobInfo](https://docs.rs/background-jobs-core/0.3.0/background_jobs_core/struct.JobInfo.html),
/// which is the type required for queuing jobs to be executed. /// which is the type required for queuing jobs to be executed.
/// ///
/// ### Example /// ### Example
@ -54,8 +54,8 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
/// count: i32, /// count: i32,
/// } /// }
/// ///
/// impl Job for MyJob { /// impl Job<()> for MyJob {
/// fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> { /// fn run(self, _state: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
/// info!("Processing {}", self.count); /// info!("Processing {}", self.count);
/// ///
/// Box::new(Ok(()).into_future()) /// Box::new(Ok(()).into_future())
@ -65,7 +65,7 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
/// #[derive(Clone)] /// #[derive(Clone)]
/// struct MyProcessor; /// struct MyProcessor;
/// ///
/// impl Processor for MyProcessor { /// impl Processor<()> for MyProcessor {
/// type Job = MyJob; /// type Job = MyJob;
/// ///
/// const NAME: &'static str = "IncrementProcessor"; /// const NAME: &'static str = "IncrementProcessor";
@ -80,8 +80,11 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub trait Processor: Clone { pub trait Processor<S = ()>: Clone
type Job: Job; where
S: Clone + Send + Sync + 'static,
{
type Job: Job<S>;
/// The name of the processor /// The name of the processor
/// ///
@ -129,14 +132,22 @@ pub trait Processor: Clone {
/// Advanced users may want to override this method in order to provide their own custom /// Advanced users may want to override this method in order to provide their own custom
/// before/after logic for certain job processors /// before/after logic for certain job processors
/// ///
/// The state passed into this method is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
///
/// ```rust,ignore /// ```rust,ignore
/// fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> { /// fn process(
/// &self,
/// args: Value,
/// state: S
/// ) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
/// let res = serde_json::from_value::<Self::Job>(args); /// let res = serde_json::from_value::<Self::Job>(args);
/// ///
/// let fut = match res { /// let fut = match res {
/// Ok(job) => { /// Ok(job) => {
/// // Perform some custom pre-job logic /// // Perform some custom pre-job logic
/// Either::A(job.run().map_err(JobError::Processing)) /// Either::A(job.run(state).map_err(JobError::Processing))
/// }, /// },
/// Err(_) => Either::B(Err(JobError::Json).into_future()), /// Err(_) => Either::B(Err(JobError::Json).into_future()),
/// }; /// };
@ -150,13 +161,17 @@ pub trait Processor: Clone {
/// Patterns like this could be useful if you want to use the same job type for multiple /// Patterns like this could be useful if you want to use the same job type for multiple
/// scenarios. Defining the `process` method for multiple `Processor`s with different /// scenarios. Defining the `process` method for multiple `Processor`s with different
/// before/after logic for the same /// before/after logic for the same
/// [`Job`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Job.html) type is /// [`Job`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Job.html) type is
/// supported. /// supported.
fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> { fn process(
&self,
args: Value,
state: S,
) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
let res = serde_json::from_value::<Self::Job>(args); let res = serde_json::from_value::<Self::Job>(args);
let fut = match res { let fut = match res {
Ok(job) => Either::A(job.run().map_err(JobError::Processing)), Ok(job) => Either::A(job.run(state).map_err(JobError::Processing)),
Err(_) => Either::B(Err(JobError::Json).into_future()), Err(_) => Either::B(Err(JobError::Json).into_future()),
}; };

View file

@ -27,41 +27,57 @@ use crate::{JobError, JobInfo, Processor};
/// A generic function that processes a job /// A generic function that processes a job
/// ///
/// Instead of storing /// Instead of storing
/// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) type /// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) type
/// directly, the /// directly, the
/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.ProcessorMap.html) /// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.3.0/background_jobs_core/struct.ProcessorMap.html)
/// struct stores these `ProcessFn` types that don't expose differences in Job types. /// struct stores these `ProcessFn` types that don't expose differences in Job types.
pub type ProcessFn = pub type ProcessFn =
Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>; Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>;
/// A type for storing the relationships between processor names and the processor itself /// A type for storing the relationships between processor names and the processor itself
/// ///
/// [`Processor`s](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) must /// [`Processor`s](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) must
/// be registered with the `ProcessorMap` in the initialization phase of an application before /// be registered with the `ProcessorMap` in the initialization phase of an application before
/// workers are spawned in order to handle queued jobs. /// workers are spawned in order to handle queued jobs.
pub struct ProcessorMap { pub struct ProcessorMap<S>
where
S: Clone,
{
inner: HashMap<String, ProcessFn>, inner: HashMap<String, ProcessFn>,
state: S,
} }
impl ProcessorMap { impl<S> ProcessorMap<S>
where
S: Clone + Send + Sync + 'static,
{
/// Intialize a `ProcessorMap` /// Intialize a `ProcessorMap`
pub fn new() -> Self { ///
Default::default() /// The state passed into this method will be passed to all jobs executed through this
/// ProcessorMap. The state argument could be useful for containing a hook into something like
/// r2d2, or the address of an actor in an actix-based system.
pub fn new(state: S) -> Self {
ProcessorMap {
inner: HashMap::new(),
state,
}
} }
/// Register a /// Register a
/// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) with /// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) with
/// this `ProcessorMap`. /// this `ProcessorMap`.
/// ///
/// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so
/// make sure to register all your processors up-front. /// make sure to register all your processors up-front.
pub fn register_processor<P>(&mut self, processor: P) pub fn register_processor<P>(&mut self, processor: P)
where where
P: Processor + Send + Sync + 'static, P: Processor<S> + Send + Sync + 'static,
{ {
let state = self.state.clone();
self.inner.insert( self.inner.insert(
P::NAME.to_owned(), P::NAME.to_owned(),
Box::new(move |value| processor.process(value)), Box::new(move |value| processor.process(value, state.clone())),
); );
} }
@ -84,14 +100,6 @@ impl ProcessorMap {
} }
} }
impl Default for ProcessorMap {
fn default() -> Self {
ProcessorMap {
inner: Default::default(),
}
}
}
fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> { fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
let args = job.args(); let args = job.args();

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-server" name = "background-jobs-server"
description = "Jobs processor server based on ZeroMQ" description = "Jobs processor server based on ZeroMQ"
version = "0.2.0" version = "0.3.0"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs" repository = "https://git.asonix.dog/asonix/background-jobs"
@ -14,6 +14,7 @@ futures = "0.1"
log = "0.4" log = "0.4"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0"
tokio = "0.1" tokio = "0.1"
tokio-threadpool = "0.1" tokio-threadpool = "0.1"
zmq = "0.8" zmq = "0.8"
@ -22,7 +23,7 @@ zmq = "0.8"
default = ["tokio-zmq"] default = ["tokio-zmq"]
[dependencies.background-jobs-core] [dependencies.background-jobs-core]
version = "0.2" version = "0.3"
path = "../jobs-core" path = "../jobs-core"
[dependencies.tokio-zmq] [dependencies.tokio-zmq]

View file

@ -17,7 +17,14 @@
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>. * along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/ */
use std::marker::PhantomData;
use background_jobs_core::{Backoff, Job, MaxRetries};
use failure::Error; use failure::Error;
use futures::{future::poll_fn, Future};
use serde::{de::DeserializeOwned, ser::Serialize};
use serde_derive::{Deserialize, Serialize};
use tokio_threadpool::blocking;
mod server; mod server;
mod spawner; mod spawner;
@ -34,3 +41,121 @@ where
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
} }
/// The SyncJob trait defines parameters pertaining to a synchronous instance of background job
///
/// This trait should be implemented sparingly, but is provided so that synchronous tasks may be
/// executed. If you have the ability to implement the
/// [`Job`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Job.html) trait directly,
/// you should.
///
/// ### Example
///
/// ```rust
/// use background_jobs_server::SyncJob;
/// use failure::Error;
/// use log::info;
/// use serde_derive::{Deserialize, Serialize};
///
/// #[derive(Clone, Deserialize, Serialize)]
/// struct MyJob {
/// count: i32,
/// }
///
/// impl SyncJob for MyJob {
/// fn run(self, _state: ()) -> Result<(), Error> {
/// info!("Processing {}", self.count);
///
/// // Perform some synchronous operation, like a DB action with r2d2 and diesel
///
/// Ok(())
/// }
/// }
///
/// fn main() {
/// let sync_job = MyJob { count: 0 };
/// let job = sync_job.to_job();
/// }
/// ```
pub trait SyncJob<S = ()>: Clone {
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
/// processes, that logic should all be called from inside this method.
///
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: S) -> Result<(), Error>;
/// If this job should not use the default queue for its processor, this can be overridden in
/// user-code.
///
/// Jobs will only be processed by processors that are registered, and if a queue is supplied
/// here that is not associated with a valid processor for this job, it will never be
/// processed.
fn queue(&self) -> Option<&str> {
None
}
/// If this job should not use the default maximum retry count for its processor, this can be
/// overridden in user-code.
fn max_retries(&self) -> Option<MaxRetries> {
None
}
/// If this job should not use the default backoff strategy for its processor, this can be
/// overridden in user-code.
fn backoff_strategy(&self) -> Option<Backoff> {
None
}
/// Wrap this type in a SyncJobWrapper so it implements Job
fn to_job(self) -> SyncJobWrapper<Self, S> {
SyncJobWrapper {
inner: self,
phantom: PhantomData,
}
}
}
/// A wrapper around synchronous jobs
#[derive(Clone, Deserialize, Serialize)]
pub struct SyncJobWrapper<J, S = ()>
where
J: SyncJob<S>,
{
inner: J,
phantom: PhantomData<S>,
}
impl<J, S> Job<S> for SyncJobWrapper<J, S>
where
J: SyncJob<S> + Serialize + DeserializeOwned + Send + 'static,
S: Clone + Send + Sync + 'static,
{
fn queue(&self) -> Option<&str> {
self.inner.queue()
}
fn max_retries(&self) -> Option<MaxRetries> {
self.inner.max_retries()
}
fn backoff_strategy(&self) -> Option<Backoff> {
self.inner.backoff_strategy()
}
fn run(self, state: S) -> Box<dyn Future<Item = (), Error = Error> + Send> {
let fut = poll_fn(move || {
let job = self.inner.clone();
let state = state.clone();
blocking(move || job.run(state.clone()))
})
.then(coerce);
Box::new(fut)
}
}

View file

@ -123,7 +123,7 @@ struct MissingQueue(String);
/// ///
/// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but /// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but
/// it does not provide functionality to execute jobs. For that, you must create a /// it does not provide functionality to execute jobs. For that, you must create a
/// [`Worker`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.WorkerConfig.html) /// [`Worker`](https://docs.rs/background-jobs-server/0.3.0/background_jobs_server/struct.WorkerConfig.html)
/// that will connect to the running server. /// that will connect to the running server.
/// ///
/// This type doesn't have any associated data, but is used as a proxy for starting the /// This type doesn't have any associated data, but is used as a proxy for starting the

View file

@ -33,23 +33,29 @@ use tokio::timer::Delay;
use tokio_zmq::{prelude::*, Multipart, Pull, Push}; use tokio_zmq::{prelude::*, Multipart, Pull, Push};
use zmq::{Context, Message}; use zmq::{Context, Message};
pub(crate) struct Worker { pub(crate) struct Worker<S>
where
S: Clone,
{
pull: Pull, pull: Pull,
push: Push, push: Push,
push2: Push, push2: Push,
push_address: String, push_address: String,
pull_address: String, pull_address: String,
queue: String, queue: String,
processors: Arc<ProcessorMap>, processors: Arc<ProcessorMap<S>>,
context: Arc<Context>, context: Arc<Context>,
} }
impl Worker { impl<S> Worker<S>
where
S: Clone + Send + Sync + 'static,
{
pub(crate) fn init( pub(crate) fn init(
push_address: String, push_address: String,
pull_address: String, pull_address: String,
queue: String, queue: String,
processors: Arc<ProcessorMap>, processors: Arc<ProcessorMap<S>>,
context: Arc<Context>, context: Arc<Context>,
) -> impl Future<Item = (), Error = ()> { ) -> impl Future<Item = (), Error = ()> {
let cfg = ResetWorker { let cfg = ResetWorker {
@ -107,7 +113,7 @@ impl Worker {
Box::new(fut) Box::new(fut)
} }
fn reset(&self) -> ResetWorker { fn reset(&self) -> ResetWorker<S> {
ResetWorker { ResetWorker {
push_address: self.push_address.clone(), push_address: self.push_address.clone(),
pull_address: self.pull_address.clone(), pull_address: self.pull_address.clone(),
@ -118,15 +124,21 @@ impl Worker {
} }
} }
struct ResetWorker { struct ResetWorker<S>
where
S: Clone,
{
push_address: String, push_address: String,
pull_address: String, pull_address: String,
queue: String, queue: String,
processors: Arc<ProcessorMap>, processors: Arc<ProcessorMap<S>>,
context: Arc<Context>, context: Arc<Context>,
} }
impl ResetWorker { impl<S> ResetWorker<S>
where
S: Clone + Send + Sync + 'static,
{
fn rebuild(self) -> impl Future<Item = (), Error = ()> { fn rebuild(self) -> impl Future<Item = (), Error = ()> {
let queue = self.queue.clone(); let queue = self.queue.clone();
@ -194,10 +206,13 @@ fn report_running(
.map_err(|_| NotifyError.into()) .map_err(|_| NotifyError.into())
} }
fn process_job( fn process_job<S>(
job: JobInfo, job: JobInfo,
processors: &ProcessorMap, processors: &ProcessorMap<S>,
) -> impl Future<Item = JobInfo, Error = Error> { ) -> impl Future<Item = JobInfo, Error = Error>
where
S: Clone + Send + Sync + 'static,
{
processors processors
.process_job(job.clone()) .process_job(job.clone())
.map_err(|_| ProcessError) .map_err(|_| ProcessError)

View file

@ -34,7 +34,7 @@ use self::{config::Worker, portmap::PortMap};
/// ///
/// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects /// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects
/// to a server (crated with /// to a server (crated with
/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.ServerConfig.html)) /// [`ServerConfig`](https://docs.rs/background-jobs-server/0.3.0/background_jobs_server/struct.ServerConfig.html))
/// and receives work from there. /// and receives work from there.
/// ///
/// ```rust /// ```rust
@ -46,7 +46,7 @@ use self::{config::Worker, portmap::PortMap};
/// let mut queue_map = BTreeMap::new(); /// let mut queue_map = BTreeMap::new();
/// queue_map.insert("default".to_owned(), 10); /// queue_map.insert("default".to_owned(), 10);
/// ///
/// let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map); /// let mut worker = WorkerConfig::new((), "localhost".to_owned(), 5555, queue_map);
/// ///
/// // Register a processor /// // Register a processor
/// // worker.register_processor(MyProcessor); /// // worker.register_processor(MyProcessor);
@ -57,18 +57,27 @@ use self::{config::Worker, portmap::PortMap};
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub struct WorkerConfig { pub struct WorkerConfig<S>
processors: ProcessorMap, where
S: Clone,
{
processors: ProcessorMap<S>,
queues: BTreeMap<String, usize>, queues: BTreeMap<String, usize>,
server_host: String, server_host: String,
base_port: usize, base_port: usize,
context: Arc<Context>, context: Arc<Context>,
} }
impl WorkerConfig { impl<S> WorkerConfig<S>
where
S: Clone + Send + Sync + 'static,
{
/// Create a new worker /// Create a new worker
/// ///
/// This method takes three arguments /// This method takes four arguments
/// - The state passed into this method will be passed to all jobs executed on this Worker.
/// The state argument could be useful for containing a hook into something like r2d2, or
/// the address of an actor in an actix-based system.
/// - `server_host` is the hostname, or IP address, of the background-jobs server. /// - `server_host` is the hostname, or IP address, of the background-jobs server.
/// - `base_port` is the same value from the `ServerConfig` initialization. It dictates the /// - `base_port` is the same value from the `ServerConfig` initialization. It dictates the
/// port the worker uses to return jobs to the server. The worker is guaranteed to connect /// port the worker uses to return jobs to the server. The worker is guaranteed to connect
@ -76,10 +85,15 @@ impl WorkerConfig {
/// `base_port` + n. /// `base_port` + n.
/// - queues is a mapping between the name of a queue, and the number of workers that should /// - queues is a mapping between the name of a queue, and the number of workers that should
/// be started to process jobs in that queue. /// be started to process jobs in that queue.
pub fn new(server_host: String, base_port: usize, queues: BTreeMap<String, usize>) -> Self { pub fn new(
state: S,
server_host: String,
base_port: usize,
queues: BTreeMap<String, usize>,
) -> Self {
let context = Arc::new(Context::new()); let context = Arc::new(Context::new());
Self::new_with_context(server_host, base_port, queues, context) Self::new_with_context(state, server_host, base_port, queues, context)
} }
/// The same as `WorkerConfig::new()`, but with a provided ZeroMQ Context. /// The same as `WorkerConfig::new()`, but with a provided ZeroMQ Context.
@ -90,13 +104,14 @@ impl WorkerConfig {
/// If you're running the Server, Worker, and Spawner in the same application, you should share /// If you're running the Server, Worker, and Spawner in the same application, you should share
/// a ZeroMQ context between them. /// a ZeroMQ context between them.
pub fn new_with_context( pub fn new_with_context(
state: S,
server_host: String, server_host: String,
base_port: usize, base_port: usize,
queues: BTreeMap<String, usize>, queues: BTreeMap<String, usize>,
context: Arc<Context>, context: Arc<Context>,
) -> Self { ) -> Self {
WorkerConfig { WorkerConfig {
processors: ProcessorMap::new(), processors: ProcessorMap::new(state),
server_host, server_host,
base_port, base_port,
queues, queues,
@ -107,10 +122,10 @@ impl WorkerConfig {
/// Register a processor with this worker /// Register a processor with this worker
/// ///
/// For more information, see /// For more information, see
/// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Processor.html). /// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/enum.Processor.html).
pub fn register_processor<P>(&mut self, processor: P) pub fn register_processor<P>(&mut self, processor: P)
where where
P: Processor + Send + Sync + 'static, P: Processor<S> + Send + Sync + 'static,
{ {
self.processors.register_processor(processor); self.processors.register_processor(processor);
} }
@ -119,7 +134,10 @@ impl WorkerConfig {
/// ///
/// This method returns a future that, when run, spawns all of the worker's required futures /// This method returns a future that, when run, spawns all of the worker's required futures
/// onto tokio. Therefore, this can only be used from tokio. /// onto tokio. Therefore, this can only be used from tokio.
pub fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> { pub fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send>
where
S: Send + Sync + 'static,
{
let WorkerConfig { let WorkerConfig {
processors, processors,
server_host, server_host,

View file

@ -54,7 +54,7 @@
//! } //! }
//! //!
//! impl Job for MyJob { //! impl Job for MyJob {
//! fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> { //! fn run(self, _: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> {
//! info!("args: {:?}", self); //! info!("args: {:?}", self);
//! //!
//! Box::new(Ok(()).into_future()) //! Box::new(Ok(()).into_future())
@ -62,6 +62,28 @@
//! } //! }
//! ``` //! ```
//! //!
//! The run method for a job takes an additional argument, which is the state the job expects to
//! use. The state for all jobs defined in an application must be the same. By default, the state
//! is an empty tuple, but it's likely you'll want to pass in some Actix address, or something
//! else.
//!
//! Let's re-define the job to care about some application state.
//!
//! ```rust,ignore
//! #[derive(Clone, Debug)]
//! pub struct MyState {
//! pub app_name: String,
//! }
//!
//! impl Job<MyState> for MyJob {
//! fn run(self, state: MyState) -> Box<dyn Future<Item = (), Error = Error> + Send> {
//! info!("{}: args, {:?}", state.app_name, self);
//!
//! Box::new(Ok(()).into_future())
//! }
//! }
//! ```
//!
//! #### Next, define a Processor. //! #### Next, define a Processor.
//! Processors are types that define default attributes for jobs, as well as containing some logic //! Processors are types that define default attributes for jobs, as well as containing some logic
//! used internally to perform the job. Processors must implement `Proccessor` and `Clone`. //! used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
@ -70,7 +92,7 @@
//! #[derive(Clone, Debug)] //! #[derive(Clone, Debug)]
//! pub struct MyProcessor; //! pub struct MyProcessor;
//! //!
//! impl Processor for MyProcessor { //! impl Processor<MyState> for MyProcessor {
//! // The kind of job this processor should execute //! // The kind of job this processor should execute
//! type Job = MyJob; //! type Job = MyJob;
//! //!
@ -144,7 +166,14 @@
//! //!
//! fn main() -> Result<(), Error> { //! fn main() -> Result<(), Error> {
//! // Create the worker config //! // Create the worker config
//! let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); //! let mut worker = WorkerConfig::new(
//! MyState {
//! app_name: "My Example Application".to_owned(),
//! },
//! "localhost".to_owned(),
//! 5555,
//! queue_map()
//! );
//! //!
//! // Register our processor //! // Register our processor
//! worker.register_processor(MyProcessor); //! worker.register_processor(MyProcessor);
@ -212,4 +241,4 @@
pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
#[cfg(feature = "background-jobs-server")] #[cfg(feature = "background-jobs-server")]
pub use background_jobs_server::{ServerConfig, SpawnerConfig, WorkerConfig}; pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};