From 4f2530d4859ce9b8db14c7aa969b97fabd88b7b0 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 18 Nov 2018 15:05:03 -0600 Subject: [PATCH] Add application state --- Cargo.toml | 6 +- README.md | 33 ++++- examples/server-jobs-example/Cargo.toml | 2 +- .../server-jobs-example/src/bin/worker.rs | 2 +- examples/server-jobs-example/src/lib.rs | 2 +- jobs-core/Cargo.toml | 2 +- jobs-core/src/job.rs | 11 +- jobs-core/src/job_info.rs | 2 +- jobs-core/src/processor.rs | 39 ++++-- jobs-core/src/processor_map.rs | 44 +++--- jobs-server/Cargo.toml | 5 +- jobs-server/src/lib.rs | 125 ++++++++++++++++++ jobs-server/src/server/mod.rs | 2 +- jobs-server/src/worker/config.rs | 37 ++++-- jobs-server/src/worker/mod.rs | 42 ++++-- src/lib.rs | 37 +++++- 16 files changed, 319 insertions(+), 72 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b63468..bc0aa88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with tokio and futures" -version = "0.2.0" +version = "0.3.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -20,10 +20,10 @@ members = [ default = ["background-jobs-server", "background-jobs-server/tokio-zmq"] [dependencies.background-jobs-core] -version = "0.2" +version = "0.3" path = "jobs-core" [dependencies.background-jobs-server] -version = "0.2" +version = "0.3" path = "jobs-server" optional = true diff --git a/README.md b/README.md index 12b7afd..fb394ef 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,28 @@ impl Job for MyJob { } ``` +The run method for a job takes an additional argument, which is the state the job expects to +use. The state for all jobs defined in an application must be the same. By default, the state +is an empty tuple, but it's likely you'll want to pass in some Actix address, or something +else. + +Let's re-define the job to care about some application state. + +```rust,ignore +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +impl Job for MyJob { + fn run(self, state: MyState) -> Box + Send> { + info!("{}: args, {:?}", state.app_name, self); + + Box::new(Ok(()).into_future()) + } +} +``` + #### Next, define a Processor. Processors are types that define default attributes for jobs, as well as containing some logic used internally to perform the job. Processors must implement `Proccessor` and `Clone`. @@ -52,7 +74,7 @@ used internally to perform the job. Processors must implement `Proccessor` and ` #[derive(Clone, Debug)] pub struct MyProcessor; -impl Processor for MyProcessor { +impl Processor for MyProcessor { // The kind of job this processor should execute type Job = MyJob; @@ -125,7 +147,14 @@ use server_jobs_example::{queue_map, MyProcessor}; fn main() -> Result<(), Error> { // Create the worker config - let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); + let mut worker = WorkerConfig::new( + MyState { + app_name: "My Example Application".to_owned(), + }, + "localhost".to_owned(), + 5555, + queue_map() + ); // Register our processor worker.register_processor(MyProcessor); diff --git a/examples/server-jobs-example/Cargo.toml b/examples/server-jobs-example/Cargo.toml index 36cba9a..82f725a 100644 --- a/examples/server-jobs-example/Cargo.toml +++ b/examples/server-jobs-example/Cargo.toml @@ -15,7 +15,7 @@ serde_derive = "1.0" tokio = "0.1" [dependencies.background-jobs] -version = "0.2" +version = "0.3" path = "../.." default-features = false features = ["background-jobs-server"] diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs index 16c05be..435c06c 100644 --- a/examples/server-jobs-example/src/bin/worker.rs +++ b/examples/server-jobs-example/src/bin/worker.rs @@ -25,7 +25,7 @@ fn main() -> Result<(), Error> { dotenv::dotenv().ok(); env_logger::init(); - let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); + let mut worker = WorkerConfig::new((), "localhost".to_owned(), 5555, queue_map()); worker.register_processor(MyProcessor); diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs index 9ef218e..c03aa0a 100644 --- a/examples/server-jobs-example/src/lib.rs +++ b/examples/server-jobs-example/src/lib.rs @@ -57,7 +57,7 @@ impl MyJob { } impl Job for MyJob { - fn run(self) -> Box + Send> { + fn run(self, _: ()) -> Box + Send> { info!("args: {:?}", self); Box::new(Ok(()).into_future()) diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index c780d3c..1842b0b 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor on tokio" -version = "0.2.0" +version = "0.3.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index ec8dda9..60a32c6 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -24,13 +24,20 @@ use serde::{de::DeserializeOwned, ser::Serialize}; use crate::{Backoff, MaxRetries}; /// The Job trait defines parameters pertaining to an instance of background job -pub trait Job: Serialize + DeserializeOwned { +pub trait Job: Serialize + DeserializeOwned +where + S: Clone + Send + Sync + 'static, +{ /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy /// processes, that logic should all be called from inside this method. - fn run(self) -> Box + Send>; + /// + /// The state passed into this job is initialized at the start of the application. The state + /// argument could be useful for containing a hook into something like r2d2, or the address of + /// an actor in an actix-based system. + fn run(self, state: S) -> Box + Send>; /// If this job should not use the default queue for its processor, this can be overridden in /// user-code. diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 72d017d..82519eb 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -27,7 +27,7 @@ use crate::{Backoff, JobStatus, MaxRetries, ShouldStop}; /// /// Although exposed publically, this type should only really be handled by the library itself, and /// is impossible to create outside of a -/// [Processor](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html)'s +/// [Processor](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html)'s /// new_job method. pub struct JobInfo { /// ID of the job, None means an ID has not been set diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 2d8c9bb..6c52a36 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -33,11 +33,11 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// - The job's default queue /// - The job's default maximum number of retries /// - The job's [backoff -/// strategy](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Backoff.html) +/// strategy](https://docs.rs/background-jobs/0.3.0/background_jobs/enum.Backoff.html) /// /// Processors also provide the default mechanism for running a job, and the only mechanism for /// creating a -/// [JobInfo](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.JobInfo.html), +/// [JobInfo](https://docs.rs/background-jobs-core/0.3.0/background_jobs_core/struct.JobInfo.html), /// which is the type required for queuing jobs to be executed. /// /// ### Example @@ -54,8 +54,8 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// count: i32, /// } /// -/// impl Job for MyJob { -/// fn run(self) -> Box + Send> { +/// impl Job<()> for MyJob { +/// fn run(self, _state: ()) -> Box + Send> { /// info!("Processing {}", self.count); /// /// Box::new(Ok(()).into_future()) @@ -65,7 +65,7 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// #[derive(Clone)] /// struct MyProcessor; /// -/// impl Processor for MyProcessor { +/// impl Processor<()> for MyProcessor { /// type Job = MyJob; /// /// const NAME: &'static str = "IncrementProcessor"; @@ -80,8 +80,11 @@ use crate::{Backoff, Job, JobError, JobInfo, MaxRetries}; /// Ok(()) /// } /// ``` -pub trait Processor: Clone { - type Job: Job; +pub trait Processor: Clone +where + S: Clone + Send + Sync + 'static, +{ + type Job: Job; /// The name of the processor /// @@ -129,14 +132,22 @@ pub trait Processor: Clone { /// Advanced users may want to override this method in order to provide their own custom /// before/after logic for certain job processors /// + /// The state passed into this method is initialized at the start of the application. The state + /// argument could be useful for containing a hook into something like r2d2, or the address of + /// an actor in an actix-based system. + /// /// ```rust,ignore - /// fn process(&self, args: Value) -> Box + Send> { + /// fn process( + /// &self, + /// args: Value, + /// state: S + /// ) -> Box + Send> { /// let res = serde_json::from_value::(args); /// /// let fut = match res { /// Ok(job) => { /// // Perform some custom pre-job logic - /// Either::A(job.run().map_err(JobError::Processing)) + /// Either::A(job.run(state).map_err(JobError::Processing)) /// }, /// Err(_) => Either::B(Err(JobError::Json).into_future()), /// }; @@ -150,13 +161,17 @@ pub trait Processor: Clone { /// Patterns like this could be useful if you want to use the same job type for multiple /// scenarios. Defining the `process` method for multiple `Processor`s with different /// before/after logic for the same - /// [`Job`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Job.html) type is + /// [`Job`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Job.html) type is /// supported. - fn process(&self, args: Value) -> Box + Send> { + fn process( + &self, + args: Value, + state: S, + ) -> Box + Send> { let res = serde_json::from_value::(args); let fut = match res { - Ok(job) => Either::A(job.run().map_err(JobError::Processing)), + Ok(job) => Either::A(job.run(state).map_err(JobError::Processing)), Err(_) => Either::B(Err(JobError::Json).into_future()), }; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 1a63c39..3beecf4 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -27,41 +27,57 @@ use crate::{JobError, JobInfo, Processor}; /// A generic function that processes a job /// /// Instead of storing -/// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) type +/// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) type /// directly, the -/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.2.0/background_jobs_core/struct.ProcessorMap.html) +/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.3.0/background_jobs_core/struct.ProcessorMap.html) /// struct stores these `ProcessFn` types that don't expose differences in Job types. pub type ProcessFn = Box Box + Send> + Send + Sync>; /// A type for storing the relationships between processor names and the processor itself /// -/// [`Processor`s](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) must +/// [`Processor`s](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) must /// be registered with the `ProcessorMap` in the initialization phase of an application before /// workers are spawned in order to handle queued jobs. -pub struct ProcessorMap { +pub struct ProcessorMap +where + S: Clone, +{ inner: HashMap, + state: S, } -impl ProcessorMap { +impl ProcessorMap +where + S: Clone + Send + Sync + 'static, +{ /// Intialize a `ProcessorMap` - pub fn new() -> Self { - Default::default() + /// + /// The state passed into this method will be passed to all jobs executed through this + /// ProcessorMap. The state argument could be useful for containing a hook into something like + /// r2d2, or the address of an actor in an actix-based system. + pub fn new(state: S) -> Self { + ProcessorMap { + inner: HashMap::new(), + state, + } } /// Register a - /// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/trait.Processor.html) with + /// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html) with /// this `ProcessorMap`. /// /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so /// make sure to register all your processors up-front. pub fn register_processor

(&mut self, processor: P) where - P: Processor + Send + Sync + 'static, + P: Processor + Send + Sync + 'static, { + let state = self.state.clone(); + self.inner.insert( P::NAME.to_owned(), - Box::new(move |value| processor.process(value)), + Box::new(move |value| processor.process(value, state.clone())), ); } @@ -84,14 +100,6 @@ impl ProcessorMap { } } -impl Default for ProcessorMap { - fn default() -> Self { - ProcessorMap { - inner: Default::default(), - } - } -} - fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future { let args = job.args(); diff --git a/jobs-server/Cargo.toml b/jobs-server/Cargo.toml index dd6fce7..51a81d0 100644 --- a/jobs-server/Cargo.toml +++ b/jobs-server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-server" description = "Jobs processor server based on ZeroMQ" -version = "0.2.0" +version = "0.3.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -14,6 +14,7 @@ futures = "0.1" log = "0.4" serde = "1.0" serde_json = "1.0" +serde_derive = "1.0" tokio = "0.1" tokio-threadpool = "0.1" zmq = "0.8" @@ -22,7 +23,7 @@ zmq = "0.8" default = ["tokio-zmq"] [dependencies.background-jobs-core] -version = "0.2" +version = "0.3" path = "../jobs-core" [dependencies.tokio-zmq] diff --git a/jobs-server/src/lib.rs b/jobs-server/src/lib.rs index 970a615..7dd51a7 100644 --- a/jobs-server/src/lib.rs +++ b/jobs-server/src/lib.rs @@ -17,7 +17,14 @@ * along with Background Jobs. If not, see . */ +use std::marker::PhantomData; + +use background_jobs_core::{Backoff, Job, MaxRetries}; use failure::Error; +use futures::{future::poll_fn, Future}; +use serde::{de::DeserializeOwned, ser::Serialize}; +use serde_derive::{Deserialize, Serialize}; +use tokio_threadpool::blocking; mod server; mod spawner; @@ -34,3 +41,121 @@ where Err(e) => Err(e.into()), } } + +/// The SyncJob trait defines parameters pertaining to a synchronous instance of background job +/// +/// This trait should be implemented sparingly, but is provided so that synchronous tasks may be +/// executed. If you have the ability to implement the +/// [`Job`](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Job.html) trait directly, +/// you should. +/// +/// ### Example +/// +/// ```rust +/// use background_jobs_server::SyncJob; +/// use failure::Error; +/// use log::info; +/// use serde_derive::{Deserialize, Serialize}; +/// +/// #[derive(Clone, Deserialize, Serialize)] +/// struct MyJob { +/// count: i32, +/// } +/// +/// impl SyncJob for MyJob { +/// fn run(self, _state: ()) -> Result<(), Error> { +/// info!("Processing {}", self.count); +/// +/// // Perform some synchronous operation, like a DB action with r2d2 and diesel +/// +/// Ok(()) +/// } +/// } +/// +/// fn main() { +/// let sync_job = MyJob { count: 0 }; +/// let job = sync_job.to_job(); +/// } +/// ``` +pub trait SyncJob: Clone { + /// Users of this library must define what it means to run a job. + /// + /// This should contain all the logic needed to complete a job. If that means queuing more + /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy + /// processes, that logic should all be called from inside this method. + /// + /// The state passed into this job is initialized at the start of the application. The state + /// argument could be useful for containing a hook into something like r2d2, or the address of + /// an actor in an actix-based system. + fn run(self, state: S) -> Result<(), Error>; + + /// If this job should not use the default queue for its processor, this can be overridden in + /// user-code. + /// + /// Jobs will only be processed by processors that are registered, and if a queue is supplied + /// here that is not associated with a valid processor for this job, it will never be + /// processed. + fn queue(&self) -> Option<&str> { + None + } + + /// If this job should not use the default maximum retry count for its processor, this can be + /// overridden in user-code. + fn max_retries(&self) -> Option { + None + } + + /// If this job should not use the default backoff strategy for its processor, this can be + /// overridden in user-code. + fn backoff_strategy(&self) -> Option { + None + } + + /// Wrap this type in a SyncJobWrapper so it implements Job + fn to_job(self) -> SyncJobWrapper { + SyncJobWrapper { + inner: self, + phantom: PhantomData, + } + } +} + +/// A wrapper around synchronous jobs +#[derive(Clone, Deserialize, Serialize)] +pub struct SyncJobWrapper +where + J: SyncJob, +{ + inner: J, + phantom: PhantomData, +} + +impl Job for SyncJobWrapper +where + J: SyncJob + Serialize + DeserializeOwned + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + fn queue(&self) -> Option<&str> { + self.inner.queue() + } + + fn max_retries(&self) -> Option { + self.inner.max_retries() + } + + fn backoff_strategy(&self) -> Option { + self.inner.backoff_strategy() + } + + fn run(self, state: S) -> Box + Send> { + let fut = poll_fn(move || { + let job = self.inner.clone(); + let state = state.clone(); + + blocking(move || job.run(state.clone())) + }) + .then(coerce); + + Box::new(fut) + } +} diff --git a/jobs-server/src/server/mod.rs b/jobs-server/src/server/mod.rs index d35a2ac..d3d7c8f 100644 --- a/jobs-server/src/server/mod.rs +++ b/jobs-server/src/server/mod.rs @@ -123,7 +123,7 @@ struct MissingQueue(String); /// /// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but /// it does not provide functionality to execute jobs. For that, you must create a -/// [`Worker`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.WorkerConfig.html) +/// [`Worker`](https://docs.rs/background-jobs-server/0.3.0/background_jobs_server/struct.WorkerConfig.html) /// that will connect to the running server. /// /// This type doesn't have any associated data, but is used as a proxy for starting the diff --git a/jobs-server/src/worker/config.rs b/jobs-server/src/worker/config.rs index c41e6a9..f4cf138 100644 --- a/jobs-server/src/worker/config.rs +++ b/jobs-server/src/worker/config.rs @@ -33,23 +33,29 @@ use tokio::timer::Delay; use tokio_zmq::{prelude::*, Multipart, Pull, Push}; use zmq::{Context, Message}; -pub(crate) struct Worker { +pub(crate) struct Worker +where + S: Clone, +{ pull: Pull, push: Push, push2: Push, push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc>, context: Arc, } -impl Worker { +impl Worker +where + S: Clone + Send + Sync + 'static, +{ pub(crate) fn init( push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc>, context: Arc, ) -> impl Future { let cfg = ResetWorker { @@ -107,7 +113,7 @@ impl Worker { Box::new(fut) } - fn reset(&self) -> ResetWorker { + fn reset(&self) -> ResetWorker { ResetWorker { push_address: self.push_address.clone(), pull_address: self.pull_address.clone(), @@ -118,15 +124,21 @@ impl Worker { } } -struct ResetWorker { +struct ResetWorker +where + S: Clone, +{ push_address: String, pull_address: String, queue: String, - processors: Arc, + processors: Arc>, context: Arc, } -impl ResetWorker { +impl ResetWorker +where + S: Clone + Send + Sync + 'static, +{ fn rebuild(self) -> impl Future { let queue = self.queue.clone(); @@ -194,10 +206,13 @@ fn report_running( .map_err(|_| NotifyError.into()) } -fn process_job( +fn process_job( job: JobInfo, - processors: &ProcessorMap, -) -> impl Future { + processors: &ProcessorMap, +) -> impl Future +where + S: Clone + Send + Sync + 'static, +{ processors .process_job(job.clone()) .map_err(|_| ProcessError) diff --git a/jobs-server/src/worker/mod.rs b/jobs-server/src/worker/mod.rs index a9199c9..06a2d39 100644 --- a/jobs-server/src/worker/mod.rs +++ b/jobs-server/src/worker/mod.rs @@ -34,7 +34,7 @@ use self::{config::Worker, portmap::PortMap}; /// /// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects /// to a server (crated with -/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.2.0/background_jobs_server/struct.ServerConfig.html)) +/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.3.0/background_jobs_server/struct.ServerConfig.html)) /// and receives work from there. /// /// ```rust @@ -46,7 +46,7 @@ use self::{config::Worker, portmap::PortMap}; /// let mut queue_map = BTreeMap::new(); /// queue_map.insert("default".to_owned(), 10); /// -/// let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map); +/// let mut worker = WorkerConfig::new((), "localhost".to_owned(), 5555, queue_map); /// /// // Register a processor /// // worker.register_processor(MyProcessor); @@ -57,18 +57,27 @@ use self::{config::Worker, portmap::PortMap}; /// Ok(()) /// } /// ``` -pub struct WorkerConfig { - processors: ProcessorMap, +pub struct WorkerConfig +where + S: Clone, +{ + processors: ProcessorMap, queues: BTreeMap, server_host: String, base_port: usize, context: Arc, } -impl WorkerConfig { +impl WorkerConfig +where + S: Clone + Send + Sync + 'static, +{ /// Create a new worker /// - /// This method takes three arguments + /// This method takes four arguments + /// - The state passed into this method will be passed to all jobs executed on this Worker. + /// The state argument could be useful for containing a hook into something like r2d2, or + /// the address of an actor in an actix-based system. /// - `server_host` is the hostname, or IP address, of the background-jobs server. /// - `base_port` is the same value from the `ServerConfig` initialization. It dictates the /// port the worker uses to return jobs to the server. The worker is guaranteed to connect @@ -76,10 +85,15 @@ impl WorkerConfig { /// `base_port` + n. /// - queues is a mapping between the name of a queue, and the number of workers that should /// be started to process jobs in that queue. - pub fn new(server_host: String, base_port: usize, queues: BTreeMap) -> Self { + pub fn new( + state: S, + server_host: String, + base_port: usize, + queues: BTreeMap, + ) -> Self { let context = Arc::new(Context::new()); - Self::new_with_context(server_host, base_port, queues, context) + Self::new_with_context(state, server_host, base_port, queues, context) } /// The same as `WorkerConfig::new()`, but with a provided ZeroMQ Context. @@ -90,13 +104,14 @@ impl WorkerConfig { /// If you're running the Server, Worker, and Spawner in the same application, you should share /// a ZeroMQ context between them. pub fn new_with_context( + state: S, server_host: String, base_port: usize, queues: BTreeMap, context: Arc, ) -> Self { WorkerConfig { - processors: ProcessorMap::new(), + processors: ProcessorMap::new(state), server_host, base_port, queues, @@ -107,10 +122,10 @@ impl WorkerConfig { /// Register a processor with this worker /// /// For more information, see - /// [`Processor`](https://docs.rs/background-jobs/0.2.0/background_jobs/enum.Processor.html). + /// [`Processor`](https://docs.rs/background-jobs/0.3.0/background_jobs/enum.Processor.html). pub fn register_processor

(&mut self, processor: P) where - P: Processor + Send + Sync + 'static, + P: Processor + Send + Sync + 'static, { self.processors.register_processor(processor); } @@ -119,7 +134,10 @@ impl WorkerConfig { /// /// This method returns a future that, when run, spawns all of the worker's required futures /// onto tokio. Therefore, this can only be used from tokio. - pub fn run(self) -> Box + Send> { + pub fn run(self) -> Box + Send> + where + S: Send + Sync + 'static, + { let WorkerConfig { processors, server_host, diff --git a/src/lib.rs b/src/lib.rs index 72edab6..753c0e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! } //! //! impl Job for MyJob { -//! fn run(self) -> Box + Send> { +//! fn run(self, _: ()) -> Box + Send> { //! info!("args: {:?}", self); //! //! Box::new(Ok(()).into_future()) @@ -62,6 +62,28 @@ //! } //! ``` //! +//! The run method for a job takes an additional argument, which is the state the job expects to +//! use. The state for all jobs defined in an application must be the same. By default, the state +//! is an empty tuple, but it's likely you'll want to pass in some Actix address, or something +//! else. +//! +//! Let's re-define the job to care about some application state. +//! +//! ```rust,ignore +//! #[derive(Clone, Debug)] +//! pub struct MyState { +//! pub app_name: String, +//! } +//! +//! impl Job for MyJob { +//! fn run(self, state: MyState) -> Box + Send> { +//! info!("{}: args, {:?}", state.app_name, self); +//! +//! Box::new(Ok(()).into_future()) +//! } +//! } +//! ``` +//! //! #### Next, define a Processor. //! Processors are types that define default attributes for jobs, as well as containing some logic //! used internally to perform the job. Processors must implement `Proccessor` and `Clone`. @@ -70,7 +92,7 @@ //! #[derive(Clone, Debug)] //! pub struct MyProcessor; //! -//! impl Processor for MyProcessor { +//! impl Processor for MyProcessor { //! // The kind of job this processor should execute //! type Job = MyJob; //! @@ -144,7 +166,14 @@ //! //! fn main() -> Result<(), Error> { //! // Create the worker config -//! let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); +//! let mut worker = WorkerConfig::new( +//! MyState { +//! app_name: "My Example Application".to_owned(), +//! }, +//! "localhost".to_owned(), +//! 5555, +//! queue_map() +//! ); //! //! // Register our processor //! worker.register_processor(MyProcessor); @@ -212,4 +241,4 @@ pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; #[cfg(feature = "background-jobs-server")] -pub use background_jobs_server::{ServerConfig, SpawnerConfig, WorkerConfig}; +pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};