diff --git a/README.md b/README.md index 70ac26f..e07b821 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ might not be the best experience. ```toml [dependencies] actix = "0.8" -background-jobs = "0.5.1" +background-jobs = "0.6.0" failure = "0.1" futures = "0.1" serde = "1.0" @@ -24,6 +24,7 @@ operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeO ```rust use background_jobs::Job; +use failure::Error; use serde_derive::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -42,6 +43,9 @@ impl MyJob { } impl Job for MyJob { + type Processor = MyProcessor; // We will define this later + type State = (); + fn run(self, _: ()) -> Box + Send> { info!("args: {:?}", self); @@ -71,7 +75,10 @@ impl MyState { } } -impl Job for MyJob { +impl Job for MyJob { + type Processor = MyProcessor; // We will define this later + type State = MyState; + fn run(self, state: MyState) -> Box + Send> { info!("{}: args, {:?}", state.app_name, self); @@ -92,7 +99,7 @@ const DEFAULT_QUEUE: &'static str = "default"; #[derive(Clone, Debug)] pub struct MyProcessor; -impl Processor for MyProcessor { +impl Processor for MyProcessor { // The kind of job this processor should execute type Job = MyJob; @@ -128,7 +135,8 @@ spawning new jobs. `background-jobs-actix` on it's own doesn't have a mechanism for storing worker state. This can be implemented manually by implementing the `Storage` trait from `background-jobs-core`, -or the `background-jobs-sled-storage` crate can be used to provide a +the in-memory store provided in the `background-jobs-core` crate can be used, or the +`background-jobs-sled-storage` crate can be used to provide a [Sled](https://github.com/spacejam/sled)-backed jobs store. With that out of the way, back to the examples: @@ -136,7 +144,7 @@ With that out of the way, back to the examples: ##### Main ```rust use actix::System; -use background_jobs::{ServerConfig, SledStorage, WorkerConfig}; +use background_jobs::{ServerConfig, WorkerConfig}; use failure::Error; fn main() -> Result<(), Error> { @@ -144,22 +152,31 @@ fn main() -> Result<(), Error> { let sys = System::new("my-actix-system"); // Set up our Storage + // For this example, we use the default in-memory storage mechanism + use background_jobs::memory_storage::Storage; + let storage = Storage::new(); + + /* + // Optionally, a storage backend using the Sled database is provided + use sled::Db; + use background_jobs::sled_storage::Storage; let db = Db::start_default("my-sled-db")?; - let storage = SledStorage::new(db)?; + let storage = Storage::new(db)?; + */ // Start the application server. This guards access to to the jobs store - let queue_handle = ServerConfig::new(storage).start(); + let queue_handle = ServerConfig::new(storage).thread_count(8).start(); // Configure and start our workers - let mut worker_config = WorkerConfig::new(move || MyState::new("My App")); - worker_config.register(MyProcessor); - worker_config.set_processor_count(DEFAULT_QUEUE, 16); - worker_config.start(queue_handle.clone()); + WorkerConfig::new(move || MyState::new("My App")) + .register(MyProcessor(queue_handle.clone())) + .set_processor_count(DEFAULT_QUEUE, 16) + .start(queue_handle.clone()); // Queue our jobs - queue_handle.queue::(MyJob::new(1, 2))?; - queue_handle.queue::(MyJob::new(3, 4))?; - queue_handle.queue::(MyJob::new(5, 6))?; + queue_handle.queue(MyJob::new(1, 2))?; + queue_handle.queue(MyJob::new(3, 4))?; + queue_handle.queue(MyJob::new(5, 6))?; // Block on Actix sys.run()?; diff --git a/TODO b/TODO deleted file mode 100644 index dd7b2f3..0000000 --- a/TODO +++ /dev/null @@ -1,4 +0,0 @@ -1. -Gracefull Shutdown -2. -Periodically check staged jobs diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index b9acb29..3befe8c 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,11 +1,10 @@ use actix::System; use background_jobs::{ - Backoff, Job, MaxRetries, Processor, ServerConfig, SledStorage, WorkerConfig, QueueHandle, + Backoff, Job, MaxRetries, Processor, QueueHandle, ServerConfig, WorkerConfig, }; use failure::Error; use futures::{future::ok, Future}; use serde_derive::{Deserialize, Serialize}; -use sled::Db; const DEFAULT_QUEUE: &'static str = "default"; @@ -24,24 +23,37 @@ pub struct MyJob { pub struct MyProcessor(pub QueueHandle); fn main() -> Result<(), Error> { + // First set up the Actix System to ensure we have a runtime to spawn jobs on. let sys = System::new("my-actix-system"); + // Set up our Storage + // For this example, we use the default in-memory storage mechanism + use background_jobs::memory_storage::Storage; + let storage = Storage::new(); + + /* + // Optionally, a storage backend using the Sled database is provided + use sled::Db; + use background_jobs::sled_storage::Storage; let db = Db::start_default("my-sled-db")?; - let storage = SledStorage::new(db)?; + let storage = Storage::new(db)?; + */ - let queue_handle = ServerConfig::new(storage).thread_count(2).start(); - - let processor = MyProcessor(queue_handle.clone()); + // Start the application server. This guards access to to the jobs store + let queue_handle = ServerConfig::new(storage).thread_count(8).start(); + // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) - .register(processor.clone()) + .register(MyProcessor(queue_handle.clone())) .set_processor_count(DEFAULT_QUEUE, 16) .start(queue_handle.clone()); - processor.queue(MyJob::new(1, 2))?; - processor.queue(MyJob::new(3, 4))?; - processor.queue(MyJob::new(5, 6))?; + // Queue our jobs + queue_handle.queue(MyJob::new(1, 2))?; + queue_handle.queue(MyJob::new(3, 4))?; + queue_handle.queue(MyJob::new(5, 6))?; + // Block on Actix sys.run()?; Ok(()) } @@ -63,7 +75,10 @@ impl MyJob { } } -impl Job for MyJob { +impl Job for MyJob { + type Processor = MyProcessor; + type State = MyState; + fn run(self, state: MyState) -> Box + Send> { println!("{}: args, {:?}", state.app_name, self); @@ -71,13 +86,7 @@ impl Job for MyJob { } } -impl MyProcessor { - fn queue(&self, job: >::Job) -> Result<(), Error> { - self.0.queue::(job) - } -} - -impl Processor for MyProcessor { +impl Processor for MyProcessor { // The kind of job this processor should execute type Job = MyJob; diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index e2953f0..8370b8f 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -16,6 +16,7 @@ failure = "0.1" futures = "0.1" log = "0.4" num_cpus = "1.10.0" +rand = "0.6.5" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index c75fd54..8a24ed2 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, sync::Arc}; use actix::{Actor, Addr, SyncArbiter}; -use background_jobs_core::{Processor, ProcessorMap, Stats, Storage}; +use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; use failure::Error; use futures::Future; @@ -66,7 +66,7 @@ where Server::new(StorageWrapper(storage.clone())) }); - Pinger::new(server.clone()).start(); + Pinger::new(server.clone(), threads).start(); QueueHandle { inner: server } } @@ -91,11 +91,11 @@ where } } - pub fn register

(mut self, processor: P) -> Self - where - P: Processor + Send + 'static, - { - self.queues.insert(P::QUEUE.to_owned(), 4); + pub fn register( + mut self, + processor: impl Processor + Send + 'static> + Send + 'static, + ) -> Self { + self.queues.insert(processor.queue().to_owned(), 4); self.processors.register_processor(processor); self } @@ -130,12 +130,11 @@ pub struct QueueHandle { } impl QueueHandle { - pub fn queue(&self, job: P::Job) -> Result<(), Error> + pub fn queue(&self, job: J) -> Result<(), Error> where - P: Processor, - State: Clone, + J: Job, { - self.inner.do_send(NewJob(P::new_job(job)?)); + self.inner.do_send(NewJob(J::Processor::new_job(job)?)); Ok(()) } diff --git a/jobs-actix/src/pinger.rs b/jobs-actix/src/pinger.rs index 8ad8911..9cc7587 100644 --- a/jobs-actix/src/pinger.rs +++ b/jobs-actix/src/pinger.rs @@ -5,11 +5,12 @@ use crate::{CheckDb, Server}; pub struct Pinger { server: Addr, + threads: usize, } impl Pinger { - pub fn new(server: Addr) -> Self { - Pinger { server } + pub fn new(server: Addr, threads: usize) -> Self { + Pinger { server, threads } } } @@ -18,7 +19,9 @@ impl Actor for Pinger { fn started(&mut self, ctx: &mut Self::Context) { ctx.run_interval(Duration::from_secs(1), |actor, _| { - actor.server.do_send(CheckDb); + for _ in 0..actor.threads { + actor.server.do_send(CheckDb); + } }); } } diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 7765cfe..8265b50 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -51,7 +51,7 @@ impl Message for RequestJob { } impl Message for CheckDb { - type Result = Result<(), Error>; + type Result = (); } impl Message for GetStats { @@ -70,7 +70,7 @@ impl Handler for Server { let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new()); if let Some(worker) = entry.pop_front() { - if let Some(job) = self.storage.request_job(&queue, worker.id())? { + if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()) { worker.process_job(job); } else { entry.push_back(worker); @@ -117,7 +117,7 @@ impl Handler for Server { } impl Handler for Server { - type Result = Result<(), Error>; + type Result = (); fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result { trace!("Checkdb"); @@ -125,7 +125,7 @@ impl Handler for Server { for (queue, workers) in self.cache.iter_mut() { while !workers.is_empty() { if let Some(worker) = workers.pop_front() { - if let Some(job) = self.storage.request_job(queue, worker.id())? { + if let Ok(Some(job)) = self.storage.request_job(queue, worker.id()) { worker.process_job(job); } else { workers.push_back(worker); @@ -134,8 +134,6 @@ impl Handler for Server { } } } - - Ok(()) } } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 968390f..4c8cdf7 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -21,13 +21,17 @@ use failure::Error; use futures::Future; use serde::{de::DeserializeOwned, ser::Serialize}; -use crate::{Backoff, MaxRetries}; +use crate::{Backoff, MaxRetries, Processor}; /// The Job trait defines parameters pertaining to an instance of background job -pub trait Job: Serialize + DeserializeOwned -where - S: Clone + 'static, -{ +pub trait Job: Serialize + DeserializeOwned + 'static { + /// The processor this job is associated with. The job's processor can be used to create a + /// JobInfo from a job, which is used to serialize the job into a storage mechanism. + type Processor: Processor; + + /// The application state provided to this job at runtime. + type State: Clone + 'static; + /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more @@ -37,7 +41,7 @@ where /// The state passed into this job is initialized at the start of the application. The state /// argument could be useful for containing a hook into something like r2d2, or the address of /// an actor in an actix-based system. - fn run(self, state: S) -> Box + Send>; + fn run(self, state: Self::State) -> Box + Send>; /// If this job should not use the default queue for its processor, this can be overridden in /// user-code. diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 5548d5c..8fc5ac4 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -33,7 +33,7 @@ pub use crate::{ processor::Processor, processor_map::ProcessorMap, stats::{JobStat, Stats}, - storage::Storage, + storage::{memory_storage, Storage}, }; #[derive(Debug, Fail)] diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 02c8bc5..0bd3a8a 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -81,11 +81,8 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// Ok(()) /// } /// ``` -pub trait Processor: Clone -where - S: Clone + 'static, -{ - type Job: Job + 'static; +pub trait Processor: Clone { + type Job: Job + 'static; /// The name of the processor /// @@ -175,7 +172,7 @@ where fn process( &self, args: Value, - state: S, + state: ::State, ) -> Box + Send> { let res = serde_json::from_value::(args); @@ -186,6 +183,16 @@ where Box::new(fut) } + + /// Hack to access Associated Constant from impl type + fn name(&self) -> &'static str { + Self::NAME + } + + /// Hack to access Associated Constant from impl type + fn queue(&self) -> &'static str { + Self::QUEUE + } } #[derive(Clone, Debug, Fail)] diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 78ea714..14c97ca 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -23,7 +23,7 @@ use futures::future::{Either, Future, IntoFuture}; use log::{error, info}; use serde_json::Value; -use crate::{JobError, JobInfo, Processor, ReturnJobInfo}; +use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo}; /// A generic function that processes a job /// @@ -72,12 +72,12 @@ where /// /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so /// make sure to register all your processors up-front. - pub fn register_processor

(&mut self, processor: P) - where - P: Processor + Send + 'static, - { + pub fn register_processor( + &mut self, + processor: impl Processor + 'static> + Send + 'static, + ) { self.inner.insert( - P::NAME.to_owned(), + processor.name().to_owned(), Box::new(move |value, state| processor.process(value, state)), ); } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 8b45a6f..48e4e32 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -23,7 +23,14 @@ use log::error; use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; +/// Define a storage backend for jobs +/// +/// This crate provides a default implementation in the `memory_storage` module, which is backed by +/// HashMaps and uses counting to assign IDs. If jobs must be persistent across application +/// restarts, look into the `[sled-backed](https://github.com/spacejam/sled)` implementation from +/// the `background-jobs-sled-storage` crate. pub trait Storage: Clone + Send { + /// The error type used by the storage mechansim. type Error: Fail; /// This method generates unique IDs for jobs @@ -132,3 +139,142 @@ pub trait Storage: Clone + Send { } } } + +pub mod memory_storage { + use super::{JobInfo, Stats}; + use failure::Fail; + use std::{ + collections::HashMap, + fmt, + sync::{Arc, Mutex}, + }; + + #[derive(Clone)] + pub struct Storage { + inner: Arc>, + } + + #[derive(Clone)] + struct Inner { + count: u64, + jobs: HashMap, + queues: HashMap, + worker_ids: HashMap, + worker_ids_inverse: HashMap, + stats: Stats, + } + + impl Storage { + pub fn new() -> Self { + Storage { + inner: Arc::new(Mutex::new(Inner { + count: 0, + jobs: HashMap::new(), + queues: HashMap::new(), + worker_ids: HashMap::new(), + worker_ids_inverse: HashMap::new(), + stats: Stats::default(), + })), + } + } + } + + impl super::Storage for Storage { + type Error = Never; + + fn generate_id(&mut self) -> Result { + let mut inner = self.inner.lock().unwrap(); + let id = inner.count; + inner.count = inner.count.wrapping_add(1); + Ok(id) + } + + fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error> { + self.inner.lock().unwrap().jobs.insert(job.id(), job); + + Ok(()) + } + + fn fetch_job(&mut self, id: u64) -> Result, Self::Error> { + let j = self.inner.lock().unwrap().jobs.get(&id).map(|j| j.clone()); + + Ok(j) + } + + fn fetch_job_from_queue(&mut self, queue: &str) -> Result, Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + let j = inner + .queues + .iter() + .filter_map(|(k, v)| { + if v == queue { + inner.jobs.get(k).map(|j| j.clone()) + } else { + None + } + }) + .next(); + + if let Some(ref j) = j { + inner.queues.remove(&j.id()); + } + + Ok(j) + } + + fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error> { + self.inner + .lock() + .unwrap() + .queues + .insert(id, queue.to_owned()); + Ok(()) + } + + fn run_job(&mut self, id: u64, worker_id: u64) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + inner.worker_ids.insert(id, worker_id); + inner.worker_ids_inverse.insert(worker_id, id); + Ok(()) + } + + fn delete_job(&mut self, id: u64) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().unwrap(); + inner.jobs.remove(&id); + inner.queues.remove(&id); + if let Some(worker_id) = inner.worker_ids.remove(&id) { + inner.worker_ids_inverse.remove(&worker_id); + } + Ok(()) + } + + fn get_stats(&self) -> Result { + Ok(self.inner.lock().unwrap().stats.clone()) + } + + fn update_stats(&mut self, f: F) -> Result<(), Self::Error> + where + F: Fn(Stats) -> Stats, + { + let mut inner = self.inner.lock().unwrap(); + + inner.stats = (f)(inner.stats.clone()); + Ok(()) + } + } + + #[derive(Clone, Debug, Fail)] + pub enum Never {} + + impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } + } + + #[derive(Clone, Debug, Fail)] + #[fail(display = "Created too many storages, can't generate any more IDs")] + pub struct TooManyStoragesError; +} diff --git a/src/lib.rs b/src/lib.rs index a8db3c0..b503938 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -197,10 +197,14 @@ //! `background-jobs-core` crate, which provides the Processor and Job traits, as well as some //! other useful types for implementing a jobs processor and job store. -pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stats}; +pub use background_jobs_core::{ + memory_storage, Backoff, Job, JobStat, MaxRetries, Processor, Stats, +}; #[cfg(feature = "background-jobs-actix")] pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig}; #[cfg(feature = "background-jobs-sled-storage")] -pub use background_jobs_sled_storage::{Error as SledStorageError, SledStorage}; +pub mod sled_storage { + pub use background_jobs_sled_storage::{Error, SledStorage as Storage}; +}