diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs
index d826db6..0756db6 100644
--- a/jobs-actix/src/every.rs
+++ b/jobs-actix/src/every.rs
@@ -23,6 +23,7 @@ impl<J> Every<J>
 where
     J: Job + Clone + 'static,
 {
+    /// Create a new Every actor
     pub fn new(spawner: QueueHandle, duration: Duration, job: J) -> Self {
         Every {
             spawner,
diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
index df5982c..7459237 100644
--- a/jobs-actix/src/lib.rs
+++ b/jobs-actix/src/lib.rs
@@ -1,9 +1,131 @@
+#![deny(missing_docs)]
+
+//! # An Actix-based Jobs Processor
+//!
+//! This library will spin up as many actors as requested for each processor to process jobs
+//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
+//! order to achieve parallel execution, multiple Arbiters must be in use.
+//!
+//! The thread count is used to spawn Synchronous Actors to handle the storage of job
+//! information. For storage backends that cannot be parallelized, a thread count of 1 should be
+//! used. By default, the number of cores of the running system is used.
+//!
+//! ### Example
+//! ```rust
+//! use actix::System;
+//! use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig};
+//! use failure::Error;
+//! use futures::{future::ok, Future};
+//! use serde_derive::{Deserialize, Serialize};
+//!
+//! const DEFAULT_QUEUE: &'static str = "default";
+//!
+//! #[derive(Clone, Debug)]
+//! pub struct MyState {
+//!     pub app_name: String,
+//! }
+//!
+//! #[derive(Clone, Debug, Deserialize, Serialize)]
+//! pub struct MyJob {
+//!     some_usize: usize,
+//!     other_usize: usize,
+//! }
+//!
+//! #[derive(Clone, Debug)]
+//! pub struct MyProcessor;
+//!
+//! fn main() -> Result<(), Error> {
+//!     // First set up the Actix System to ensure we have a runtime to spawn jobs on.
+//!     let sys = System::new("my-actix-system");
+//!
+//!     // Set up our Storage
+//!     // For this example, we use the default in-memory storage mechanism
+//!     use background_jobs::memory_storage::Storage;
+//!     let storage = Storage::new();
+//!
+//!     // Start the application server. This guards access to the jobs store
+//!     let queue_handle = ServerConfig::new(storage).thread_count(8).start();
+//!
+//!     // Configure and start our workers
+//!     WorkerConfig::new(move || MyState::new("My App"))
+//!         .register(MyProcessor)
+//!         .set_processor_count(DEFAULT_QUEUE, 16)
+//!         .start(queue_handle.clone());
+//!
+//!     // Queue our jobs
+//!     queue_handle.queue(MyJob::new(1, 2))?;
+//!     queue_handle.queue(MyJob::new(3, 4))?;
+//!     queue_handle.queue(MyJob::new(5, 6))?;
+//!
+//!     // Block on Actix
+//!     sys.run()?;
+//!     Ok(())
+//! }
+//!
+//! impl MyState {
+//!     pub fn new(app_name: &str) -> Self {
+//!         MyState {
+//!             app_name: app_name.to_owned(),
+//!         }
+//!     }
+//! }
+//!
+//! impl MyJob {
+//!     pub fn new(some_usize: usize, other_usize: usize) -> Self {
+//!         MyJob {
+//!             some_usize,
+//!             other_usize,
+//!         }
+//!     }
+//! }
+//!
+//! impl Job for MyJob {
+//!     type Processor = MyProcessor;
+//!     type State = MyState;
+//!     type Future = Result<(), Error>;
+//!
+//!     fn run(self, state: MyState) -> Self::Future {
+//!         println!("{}: args, {:?}", state.app_name, self);
+//!
+//!         Ok(())
+//!     }
+//! }
+//!
+//! impl Processor for MyProcessor {
+//!     // The kind of job this processor should execute
+//!     type Job = MyJob;
+//!
+//!     // The name of the processor. It is super important that each processor has a unique name,
+//!     // because otherwise one processor will overwrite another processor when they're being
+//!     // registered.
+//!     const NAME: &'static str = "MyProcessor";
+//!
+//!     // The queue that this processor belongs to
+//!     //
+//!     // Workers have the option to subscribe to specific queues, so this is important to
+//!     // determine which worker will call the processor
+//!     //
+//!     // Jobs can optionally override the queue they're spawned on
+//!     const QUEUE: &'static str = DEFAULT_QUEUE;
+//!
+//!     // The number of times background-jobs should try to retry a job before giving up
+//!     //
+//!     // Jobs can optionally override this value
+//!     const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
+//!
+//!     // The logic to determine how often to retry this job if it fails
+//!     //
+//!     // Jobs can optionally override this value
+//!     const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
+//! }
+//! ```
+
 use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
 use actix::{Actor, Addr, Arbiter, SyncArbiter};
 use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage};
 use failure::{Error, Fail};
-use futures::Future;
+use futures::{future::IntoFuture, Future};
 
 mod every;
 mod pinger;
@@ -20,6 +142,10 @@ use self::{
     worker::Worker,
 };
 
+/// The configuration for a jobs server
+///
+/// The server guards access to the storage backend, and keeps job information properly
+/// up-to-date when workers request jobs to process
 pub struct ServerConfig<S> {
     storage: S,
     threads: usize,
@@ -114,6 +240,7 @@ where
         P: Processor + Send + Sync + 'static,
         J: Job,
+        <J::Future as IntoFuture>::Future: Send,
     {
         self.queues.insert(P::QUEUE.to_owned(), 4);
         self.processors.register_processor(processor);
diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs
index 8265b50..b318653 100644
--- a/jobs-actix/src/server.rs
+++ b/jobs-actix/src/server.rs
@@ -8,6 +8,10 @@ use serde_derive::Deserialize;
 
 use crate::{ActixStorage, Worker};
 
+/// The server Actor
+///
+/// This server guards access to the storage, and keeps a list of workers that are waiting for
+/// jobs to process
 pub struct Server {
     storage: Box,
     cache: HashMap>>,
diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs
index d106890..519dfc3 100644
--- a/jobs-actix/src/worker.rs
+++ b/jobs-actix/src/worker.rs
@@ -44,6 +44,7 @@ where
     }
 }
 
+/// A worker that runs on the same system as the jobs server
 pub struct LocalWorker<S, State>
 where
     S: Actor + Handler + Handler,
@@ -62,6 +63,7 @@ where
     S::Context: ToEnvelope + ToEnvelope,
     State: Clone + 'static,
 {
+    /// Create a new local worker
     pub fn new(id: u64, queue: String, processors: ProcessorMap<State>, server: Addr<S>) -> Self {
         LocalWorker {
             id,
diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs
index 5b4c29f..1fd69e7 100644
--- a/jobs-core/src/job.rs
+++ b/jobs-core/src/job.rs
@@ -1,5 +1,5 @@
 use failure::Error;
-use futures::{future::IntoFuture, Future};
+use futures::future::IntoFuture;
 use serde::{de::DeserializeOwned, ser::Serialize};
 
 use crate::{Backoff, MaxRetries, Processor};
@@ -25,7 +25,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
     /// The state passed into this job is initialized at the start of the application. The state
     /// argument could be useful for containing a hook into something like r2d2, or the address of
     /// an actor in an actix-based system.
-    fn run(self, state: Self::State) -> Box<dyn Future<Item = (), Error = Error> + Send>;
+    fn run(self, state: Self::State) -> Self::Future;
 
     /// If this job should not use the default queue for its processor, this can be overridden in
     /// user-code.
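
(Note, not part of the patch itself.) The jobs-core change above replaces the boxed-future return type of `Job::run` with the associated `Self::Future`, and the crate-level example only exercises that with `type Future = Result<(), Error>`. Below is a rough sketch of a job that returns a real future under the new signature; `FetchJob`, `FetchProcessor`, and the body are invented for illustration, and it assumes `type Future` accepts any `IntoFuture<Item = (), Error = Error>` whose future is `Send`, which is what the `IntoFuture`/`Send` bounds elsewhere in this diff suggest.

```rust
use background_jobs::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::{future, Future};
use serde_derive::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FetchJob {
    url: String,
}

#[derive(Clone, Debug)]
pub struct FetchProcessor;

impl Job for FetchJob {
    type Processor = FetchProcessor;
    type State = ();
    // Assumption: a boxed `Send` future can replace the plain `Result<(), Error>`
    // used in the crate-level example, since both convert via `IntoFuture`.
    type Future = Box<dyn Future<Item = (), Error = Error> + Send>;

    fn run(self, _: Self::State) -> Self::Future {
        // Stand-in for real asynchronous work (an HTTP request, a database call, ...).
        let fut = future::lazy(move || -> Result<(), Error> {
            println!("fetching {}", self.url);
            Ok(())
        });

        Box::new(fut)
    }
}

impl Processor for FetchProcessor {
    type Job = FetchJob;

    const NAME: &'static str = "FetchProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(3);
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}
```

Registering such a processor still goes through `WorkerConfig::new(...).register(FetchProcessor)` as in the crate-level example; the `Send` requirement on the job's future is what the new `where` clauses in this patch enforce at that boundary.
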
diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs
index 5a1be8a..ded35cb 100644
--- a/jobs-core/src/processor.rs
+++ b/jobs-core/src/processor.rs
@@ -159,7 +159,10 @@ pub trait Processor: Clone {
         &self,
         args: Value,
         state: <Self::Job as Job>::State,
-    ) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
+    ) -> Box<dyn Future<Item = (), Error = JobError> + Send>
+    where
+        <<Self::Job as Job>::Future as IntoFuture>::Future: Send,
+    {
         let res = serde_json::from_value::<Self::Job>(args);
 
         let fut = match res {
diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs
index 24735ea..63084d2 100644
--- a/jobs-core/src/processor_map.rs
+++ b/jobs-core/src/processor_map.rs
@@ -58,6 +58,7 @@ where
         P: Processor + Sync + Send + 'static,
         J: Job,
+        <J::Future as IntoFuture>::Future: Send,
     {
         self.inner.insert(
             P::NAME.to_owned(),
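
(Also not part of the patch.) For context on the new `where` clauses: the default `Processor::process` body deserializes the job's arguments, converts the result of `run` with `into_future()`, and boxes it as a `Send` future, which is why the job's `<Future as IntoFuture>::Future` must itself be `Send`. The sketch below is a simplified stand-in for that pattern, not the crate's actual code; it uses `failure::Error` in place of the crate's internal error type and restates the `IntoFuture<Item = (), Error = Error>` and `'static` requirements as explicit assumptions.

```rust
use background_jobs_core::Job;
use failure::Error;
use futures::{future::{self, IntoFuture}, Future};
use serde_json::Value;

// Sketch of the deserialize-then-run pattern behind `Processor::process`: boxing the
// job's future as a `dyn Future + Send` trait object is what forces the `Send` bound.
fn process_sketch<J>(
    args: Value,
    state: J::State,
) -> Box<dyn Future<Item = (), Error = Error> + Send>
where
    J: Job,
    J::Future: IntoFuture<Item = (), Error = Error>,
    <J::Future as IntoFuture>::Future: Send + 'static,
{
    match serde_json::from_value::<J>(args) {
        // `Job: DeserializeOwned`, so the payload decodes straight into `J`.
        Ok(job) => Box::new(job.run(state).into_future()),
        // Turn a decode failure into an immediately-failing future.
        Err(e) => Box::new(future::err::<(), Error>(Error::from(e))),
    }
}
```

Presumably the matching bounds added in `lib.rs` and `processor_map.rs` exist so that this boxed future can be moved safely to the worker actors that execute it.
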