From 74ac3a9b6191c59917893768c38328f8fa1a1cb2 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 20 Mar 2020 21:31:03 -0500 Subject: [PATCH] Update to new futures, new actix --- Cargo.toml | 8 +- examples/actix-example/Cargo.toml | 11 +- examples/actix-example/src/main.rs | 23 ++-- jobs-actix/Cargo.toml | 15 ++- jobs-actix/src/every.rs | 59 +++------ jobs-actix/src/lib.rs | 133 +++++-------------- jobs-actix/src/pinger.rs | 27 ---- jobs-actix/src/server.rs | 188 +++++++++++--------------- jobs-actix/src/storage.rs | 36 ++--- jobs-actix/src/worker.rs | 140 +++++++------------- jobs-core/Cargo.toml | 11 +- jobs-core/src/job.rs | 12 +- jobs-core/src/job_info.rs | 18 ++- jobs-core/src/lib.rs | 30 +++-- jobs-core/src/processor.rs | 61 ++++----- jobs-core/src/processor_map.rs | 46 +++---- jobs-core/src/stats.rs | 5 +- jobs-core/src/storage.rs | 140 +++++++++----------- jobs-sled/Cargo.toml | 5 +- jobs-sled/src/lib.rs | 206 ++++++++++++++++++++--------- src/lib.rs | 2 +- 21 files changed, 529 insertions(+), 647 deletions(-) delete mode 100644 jobs-actix/src/pinger.rs diff --git a/Cargo.toml b/Cargo.toml index 46182fc..d819b96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with sled, actix, and futures" -version = "0.7.0" +version = "0.8.0-alpha.0" license-file = "LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -21,15 +21,15 @@ members = [ default = ["background-jobs-actix", "background-jobs-sled-storage"] [dependencies.background-jobs-core] -version = "0.6" +version = "0.7.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.6" +version = "0.7.0-alpha.0" path = "jobs-actix" optional = true [dependencies.background-jobs-sled-storage] -version = "0.3.0" +version = "0.4.0-alpha.0" path = "jobs-sled" optional = true diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 9de73e3..b0dfe65 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -7,10 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix = "0.8" +actix = "0.10.0-alpha.2" +actix-rt = "1.0.0" +anyhow = "1.0" +async-trait = "0.1.24" background-jobs = { version = "0.7.0", path = "../.." } -failure = "0.1" -futures = "0.1" +env_logger = "0.7" sled-extensions = "0.2.0" -serde = "1.0" -serde_derive = "1.0" +serde = { version = "1.0", features = ["derive"] } diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 5fbfda1..5b12be5 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,7 +1,5 @@ -use actix::System; -use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig}; -use failure::Error; -use serde_derive::{Deserialize, Serialize}; +use anyhow::Error; +use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig}; const DEFAULT_QUEUE: &'static str = "default"; @@ -10,7 +8,7 @@ pub struct MyState { pub app_name: String, } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct MyJob { some_usize: usize, other_usize: usize, @@ -19,10 +17,9 @@ pub struct MyJob { #[derive(Clone, Debug)] pub struct MyProcessor; -fn main() -> Result<(), Error> { - // First set up the Actix System to ensure we have a runtime to spawn jobs on. - let sys = System::new("my-actix-system"); - +#[actix_rt::main] +async fn main() -> Result<(), Error> { + env_logger::init(); // Set up our Storage // For this example, we use the default in-memory storage mechanism use background_jobs::memory_storage::Storage; @@ -37,7 +34,7 @@ fn main() -> Result<(), Error> { */ // Start the application server. This guards access to to the jobs store - let queue_handle = ServerConfig::new(storage).thread_count(8).start(); + let queue_handle = create_server(storage); // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) @@ -51,7 +48,7 @@ fn main() -> Result<(), Error> { queue_handle.queue(MyJob::new(5, 6))?; // Block on Actix - sys.run()?; + actix_rt::signal::ctrl_c().await?; Ok(()) } @@ -72,12 +69,12 @@ impl MyJob { } } +#[async_trait::async_trait] impl Job for MyJob { type Processor = MyProcessor; type State = MyState; - type Future = Result<(), Error>; - fn run(self, state: MyState) -> Self::Future { + async fn run(self, state: MyState) -> Result<(), Error> { println!("{}: args, {:?}", state.app_name, self); Ok(()) diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 2e2ec55..3038e73 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.6.1" +version = "0.7.0-alpha.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -10,14 +10,17 @@ readme = "../README.md" edition = "2018" [dependencies] -actix = "0.8" -background-jobs-core = { version = "0.6", path = "../jobs-core" } +actix = "0.10.0-alpha.2" +actix-rt = "1.0.0" +anyhow = "1.0" +async-trait = "0.1.24" +background-jobs-core = { version = "0.7", path = "../jobs-core" } chrono = "0.4" -failure = "0.1" futures = "0.1" log = "0.4" num_cpus = "1.10.0" rand = "0.7.0" -serde = "1.0" -serde_derive = "1.0" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +thiserror = "1.0" +tokio = { version = "0.2.13", features = ["sync"] } diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 0756db6..187f352 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,55 +1,30 @@ -use std::time::Duration; - -use super::{Job, QueueHandle}; -use actix::{Actor, AsyncContext, Context}; +use crate::{Job, QueueHandle}; +use actix::{ + clock::{interval_at, Duration, Instant}, + Arbiter, +}; use log::error; /// A type used to schedule recurring jobs. /// /// ```rust,ignore /// let server = ServerConfig::new(storage).start(); -/// Every::new(server, Duration::from_secs(60 * 30), MyJob::new()).start(); +/// every(server, Duration::from_secs(60 * 30), MyJob::new()); /// ``` -pub struct Every +pub fn every(spawner: QueueHandle, duration: Duration, job: J) where - J: Job + Clone + 'static, + J: Job + Clone, { - spawner: QueueHandle, - duration: Duration, - job: J, -} + Arbiter::spawn(async move { + let mut interval = interval_at(Instant::now(), duration); -impl Every -where - J: Job + Clone + 'static, -{ - /// Create a new Every actor - pub fn new(spawner: QueueHandle, duration: Duration, job: J) -> Self { - Every { - spawner, - duration, - job, - } - } -} + loop { + interval.tick().await; -impl Actor for Every -where - J: Job + Clone + 'static, -{ - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - match self.spawner.queue(self.job.clone()) { - Ok(_) => (), - Err(_) => error!("Failed to queue job"), - }; - - ctx.run_interval(self.duration.clone(), move |actor, _| { - match actor.spawner.queue(actor.job.clone()) { - Ok(_) => (), + match spawner.queue(job.clone()) { Err(_) => error!("Failed to queue job"), - } - }); - } + _ => (), + }; + } + }); } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 76d29a6..7f70b06 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -15,7 +15,6 @@ //! use actix::System; //! use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig}; //! use failure::Error; -//! use futures::{future::ok, Future}; //! use serde_derive::{Deserialize, Serialize}; //! //! const DEFAULT_QUEUE: &'static str = "default"; @@ -34,10 +33,8 @@ //! #[derive(Clone, Debug)] //! pub struct MyProcessor; //! -//! fn main() -> Result<(), Error> { -//! // First set up the Actix System to ensure we have a runtime to spawn jobs on. -//! let sys = System::new("my-actix-system"); -//! +//! #[actix_rt::main] +//! async fn main() -> Result<(), Error> { //! // Set up our Storage //! // For this example, we use the default in-memory storage mechanism //! use background_jobs::memory_storage::Storage; @@ -57,8 +54,8 @@ //! queue_handle.queue(MyJob::new(3, 4))?; //! queue_handle.queue(MyJob::new(5, 6))?; //! -//! // Block on Actix -//! sys.run()?; +//! actix_rt::signal::ctrl_c().await?; +//! //! Ok(()) //! } //! @@ -79,12 +76,12 @@ //! } //! } //! +//! #[async_trait::async_trait] //! impl Job for MyJob { //! type Processor = MyProcessor; //! type State = MyState; -//! type Future = Result<(), Error>; //! -//! fn run(self, state: MyState) -> Self::Future { +//! async fn run(self, state: MyState) -> Result<(), Error> { //! println!("{}: args, {:?}", state.app_name, self); //! //! Ok(()) @@ -120,84 +117,32 @@ //! } //! ``` +use actix::Arbiter; +use anyhow::Error; +use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; +use log::error; use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use actix::{Actor, Addr, Arbiter, SyncArbiter}; -use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; -use failure::{Error, Fail}; -use futures::{future::IntoFuture, Future}; - mod every; -mod pinger; mod server; mod storage; mod worker; -pub use self::{every::Every, server::Server, worker::LocalWorker}; +use self::{every::every, server::Server, worker::local_worker}; -use self::{ - pinger::Pinger, - server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob}, - storage::{ActixStorage, StorageWrapper}, - worker::Worker, -}; - -/// The configuration for a jobs server +/// Create a new Server /// -/// The server guards access to the storage backend, and keeps job information properly -/// up-to-date when workers request jobs to process -pub struct ServerConfig { - storage: S, - threads: usize, -} - -impl ServerConfig +/// In previous versions of this library, the server itself was run on it's own dedicated threads +/// and guarded access to jobs via messages. Since we now have futures-aware synchronization +/// primitives, the Server has become an object that gets shared between client threads. +/// +/// This method should only be called once. +pub fn create_server(storage: S) -> QueueHandle where S: Storage + Sync + 'static, - S::Error: Fail, { - /// Create a new ServerConfig - pub fn new(storage: S) -> Self { - ServerConfig { - storage, - threads: num_cpus::get(), - } - } - - /// Set the number of threads to use for the server. - /// - /// This is not related to the number of workers or the number of worker threads. This is - /// purely how many threads will be used to manage access to the job store. - /// - /// By default, this is the number of processor cores available to the application. On systems - /// with logical cores (such as Intel hyperthreads), this will be the total number of logical - /// cores. - /// - /// In certain cases, it may be beneficial to limit the server process count to 1. - /// - /// When using actix-web, any configuration performed inside `HttpServer::new` closure will - /// happen on each thread started by the web server. In order to reduce the number of running - /// threads, one job server can be started per web server thread. - /// - /// Another case to use a single server is if your job store has not locking guarantee, and you - /// want to enforce that no job can be requested more than once. The default storage - /// implementation does provide this guarantee, but other implementations may not. - pub fn thread_count(mut self, threads: usize) -> Self { - self.threads = threads; - self - } - - /// Spin up the server processes - pub fn start(self) -> QueueHandle { - let ServerConfig { storage, threads } = self; - - let server = SyncArbiter::start(threads, move || { - Server::new(StorageWrapper(storage.clone())) - }); - - Pinger::new(server.clone(), threads).start(); - - QueueHandle { inner: server } + QueueHandle { + inner: Server::new(storage), } } @@ -240,7 +185,6 @@ where where P: Processor + Send + Sync + 'static, J: Job, - ::Future: Send, { self.queues.insert(P::QUEUE.to_owned(), 4); self.processors.register_processor(processor); @@ -264,13 +208,12 @@ where self.queues.into_iter().fold(0, |acc, (key, count)| { (0..count).for_each(|i| { - LocalWorker::new( + local_worker( acc + i + 1000, key.clone(), processors.cached(), queue_handle.inner.clone(), - ) - .start(); + ); }); acc + count @@ -285,13 +228,13 @@ where let processors = processors.clone(); let queue_handle = queue_handle.clone(); let key = key.clone(); - LocalWorker::start_in_arbiter(arbiter, move |_| { - LocalWorker::new( + arbiter.exec_fn(move || { + local_worker( acc + i + 1000, key.clone(), processors.cached(), queue_handle.inner.clone(), - ) + ); }); }); @@ -306,7 +249,7 @@ where /// application to spawn jobs. #[derive(Clone)] pub struct QueueHandle { - inner: Addr, + inner: Server, } impl QueueHandle { @@ -318,7 +261,13 @@ impl QueueHandle { where J: Job, { - self.inner.do_send(NewJob(J::Processor::new_job(job)?)); + let job = J::Processor::new_job(job)?; + let server = self.inner.clone(); + actix::spawn(async move { + if let Err(e) = server.new_job(job).await { + error!("Error creating job, {}", e); + } + }); Ok(()) } @@ -330,21 +279,11 @@ impl QueueHandle { where J: Job + Clone + 'static, { - Every::new(self.clone(), duration, job).start(); + every(self.clone(), duration, job); } /// Return an overview of the processor's statistics - pub fn get_stats(&self) -> Box + Send> { - Box::new(self.inner.send(GetStats).then(coerce)) - } -} - -fn coerce(res: Result, F>) -> Result -where - E: From, -{ - match res { - Ok(inner) => inner, - Err(e) => Err(e.into()), + pub async fn get_stats(&self) -> Result { + self.inner.get_stats().await } } diff --git a/jobs-actix/src/pinger.rs b/jobs-actix/src/pinger.rs deleted file mode 100644 index 9cc7587..0000000 --- a/jobs-actix/src/pinger.rs +++ /dev/null @@ -1,27 +0,0 @@ -use actix::{Actor, Addr, AsyncContext, Context}; -use std::time::Duration; - -use crate::{CheckDb, Server}; - -pub struct Pinger { - server: Addr, - threads: usize, -} - -impl Pinger { - pub fn new(server: Addr, threads: usize) -> Self { - Pinger { server, threads } - } -} - -impl Actor for Pinger { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - ctx.run_interval(Duration::from_secs(1), |actor, _| { - for _ in 0..actor.threads { - actor.server.do_send(CheckDb); - } - }); - } -} diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index b318653..f95aa00 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -1,150 +1,122 @@ -use std::collections::{HashMap, VecDeque}; +use crate::{ + storage::{ActixStorage, StorageWrapper}, + worker::Worker, +}; +use anyhow::Error; +use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; +use log::{error, trace}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use tokio::sync::Mutex; -use actix::{Actor, Handler, Message, SyncContext}; -use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats}; -use failure::Error; -use log::trace; -use serde_derive::Deserialize; - -use crate::{ActixStorage, Worker}; +#[derive(Clone)] +pub(crate) struct ServerCache { + cache: Arc>>>>, +} /// The server Actor /// /// This server guards access to Thee storage, and keeps a list of workers that are waiting for /// jobs to process -pub struct Server { - storage: Box, - cache: HashMap>>, +#[derive(Clone)] +pub(crate) struct Server { + storage: Arc, + cache: ServerCache, } impl Server { - pub(crate) fn new(storage: impl ActixStorage + Send + 'static) -> Self { + /// Create a new Server from a compatible storage implementation + pub(crate) fn new(storage: S) -> Self + where + S: Storage + Sync + 'static, + { Server { - storage: Box::new(storage), - cache: HashMap::new(), + storage: Arc::new(StorageWrapper(storage)), + cache: ServerCache::new(), } } -} -impl Actor for Server { - type Context = SyncContext; -} + pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { + let queue = job.queue().to_owned(); + let ready = job.is_ready(); + self.storage.new_job(job).await?; -#[derive(Clone, Debug, Deserialize)] -pub struct NewJob(pub(crate) NewJobInfo); + if !ready { + return Ok(()); + } -#[derive(Clone, Debug, Deserialize)] -pub struct ReturningJob(pub(crate) ReturnJobInfo); - -pub struct RequestJob(pub(crate) Box); - -pub struct CheckDb; - -pub struct GetStats; - -impl Message for NewJob { - type Result = Result<(), Error>; -} - -impl Message for ReturningJob { - type Result = Result<(), Error>; -} - -impl Message for RequestJob { - type Result = Result<(), Error>; -} - -impl Message for CheckDb { - type Result = (); -} - -impl Message for GetStats { - type Result = Result; -} - -impl Handler for Server { - type Result = Result<(), Error>; - - fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result { - let queue = msg.0.queue().to_owned(); - let ready = msg.0.is_ready(); - self.storage.new_job(msg.0)?; - - if ready { - let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new()); - - if let Some(worker) = entry.pop_front() { - if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()) { - worker.process_job(job); - } else { - entry.push_back(worker); + if let Some(worker) = self.cache.pop(queue.clone()).await { + if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { + if let Err(job) = worker.process_job(job).await { + error!("Worker has hung up"); + self.storage.return_job(job.unexecuted()).await?; } + } else { + self.cache.push(queue, worker).await; } } Ok(()) } -} -impl Handler for Server { - type Result = Result<(), Error>; - - fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result { - self.storage.return_job(msg.0).map_err(|e| e.into()) - } -} - -impl Handler for Server { - type Result = Result<(), Error>; - - fn handle(&mut self, RequestJob(worker): RequestJob, _: &mut Self::Context) -> Self::Result { + pub(crate) async fn request_job( + &self, + worker: Box, + ) -> Result<(), Error> { trace!("Worker {} requested job", worker.id()); - let job = self.storage.request_job(worker.queue(), worker.id())?; - if let Some(job) = job { - worker.process_job(job.clone()); + if let Ok(Some(job)) = self.storage.request_job(worker.queue(), worker.id()).await { + if let Err(job) = worker.process_job(job).await { + error!("Worker has hung up"); + self.storage.return_job(job.unexecuted()).await?; + } } else { trace!( "storing worker {} for queue {}", worker.id(), worker.queue() ); - let entry = self - .cache - .entry(worker.queue().to_owned()) - .or_insert(VecDeque::new()); - entry.push_back(worker); + self.cache.push(worker.queue().to_owned(), worker).await; } Ok(()) } + + pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { + Ok(self.storage.return_job(job).await?) + } + + pub(crate) async fn get_stats(&self) -> Result { + Ok(self.storage.get_stats().await?) + } } -impl Handler for Server { - type Result = (); - - fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result { - trace!("Checkdb"); - - for (queue, workers) in self.cache.iter_mut() { - while !workers.is_empty() { - if let Some(worker) = workers.pop_front() { - if let Ok(Some(job)) = self.storage.request_job(queue, worker.id()) { - worker.process_job(job); - } else { - workers.push_back(worker); - break; - } - } - } +impl ServerCache { + fn new() -> Self { + ServerCache { + cache: Arc::new(Mutex::new(HashMap::new())), } } -} -impl Handler for Server { - type Result = Result; + async fn push(&self, queue: String, worker: Box) { + let mut cache = self.cache.lock().await; - fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result { - self.storage.get_stats().map_err(|e| e.into()) + let entry = cache.entry(queue).or_insert(VecDeque::new()); + entry.push_back(worker); + } + + async fn pop(&self, queue: String) -> Option> { + let mut cache = self.cache.lock().await; + + let mut vec_deque = cache.remove(&queue)?; + let item = vec_deque.pop_front()?; + + if !vec_deque.is_empty() { + cache.insert(queue, vec_deque); + } + + Some(item) } } diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs index 28cbcb3..0a1b0fc 100644 --- a/jobs-actix/src/storage.rs +++ b/jobs-actix/src/storage.rs @@ -1,39 +1,41 @@ +use anyhow::Error; use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage}; -use failure::{Error, Fail}; +#[async_trait::async_trait] pub(crate) trait ActixStorage { - fn new_job(&mut self, job: NewJobInfo) -> Result; + async fn new_job(&self, job: NewJobInfo) -> Result; - fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Error>; + async fn request_job(&self, queue: &str, runner_id: u64) -> Result, Error>; - fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error>; + async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>; - fn get_stats(&self) -> Result; + async fn get_stats(&self) -> Result; } pub(crate) struct StorageWrapper(pub(crate) S) where - S: Storage, - S::Error: Fail; + S: Storage + Send + Sync, + S::Error: Send + Sync + 'static; +#[async_trait::async_trait] impl ActixStorage for StorageWrapper where - S: Storage, - S::Error: Fail, + S: Storage + Send + Sync, + S::Error: Send + Sync + 'static, { - fn new_job(&mut self, job: NewJobInfo) -> Result { - self.0.new_job(job).map_err(Error::from) + async fn new_job(&self, job: NewJobInfo) -> Result { + Ok(self.0.new_job(job).await?) } - fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Error> { - self.0.request_job(queue, runner_id).map_err(Error::from) + async fn request_job(&self, queue: &str, runner_id: u64) -> Result, Error> { + Ok(self.0.request_job(queue, runner_id).await?) } - fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error> { - self.0.return_job(ret).map_err(Error::from) + async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> { + Ok(self.0.return_job(ret).await?) } - fn get_stats(&self) -> Result { - self.0.get_stats().map_err(Error::from) + async fn get_stats(&self) -> Result { + Ok(self.0.get_stats().await?) } } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index e6cfa89..f41053c 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,38 +1,34 @@ -use actix::{ - dev::ToEnvelope, - fut::{wrap_future, ActorFuture}, - Actor, Addr, AsyncContext, Context, Handler, Message, -}; -use background_jobs_core::{JobInfo, CachedProcessorMap}; -use log::info; - -use crate::{RequestJob, ReturningJob}; +use crate::Server; +use background_jobs_core::{CachedProcessorMap, JobInfo}; +use log::{debug, error, warn}; +use tokio::sync::mpsc::{channel, Sender}; +#[async_trait::async_trait] pub trait Worker { - fn process_job(&self, job: JobInfo); + async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo>; fn id(&self) -> u64; fn queue(&self) -> &str; } -pub struct LocalWorkerHandle -where - W: Actor + Handler, - W::Context: ToEnvelope, -{ - addr: Addr, +#[derive(Clone)] +pub(crate) struct LocalWorkerHandle { + tx: Sender, id: u64, queue: String, } -impl Worker for LocalWorkerHandle -where - W: Actor + Handler, - W::Context: ToEnvelope, -{ - fn process_job(&self, job: JobInfo) { - self.addr.do_send(ProcessJob(job)); +#[async_trait::async_trait] +impl Worker for LocalWorkerHandle { + async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo> { + match self.tx.clone().send(job).await { + Err(e) => { + error!("Unable to send job"); + Err(e.0) + } + _ => Ok(()), + } } fn id(&self) -> u64 { @@ -44,79 +40,39 @@ where } } -/// A worker that runs on the same system as the jobs server -pub struct LocalWorker -where - S: Actor + Handler + Handler, - S::Context: ToEnvelope + ToEnvelope, - State: Clone + 'static, -{ +pub(crate) fn local_worker( id: u64, queue: String, processors: CachedProcessorMap, - server: Addr, -} - -impl LocalWorker -where - S: Actor + Handler + Handler, - S::Context: ToEnvelope + ToEnvelope, + server: Server, +) where State: Clone + 'static, { - /// Create a new local worker - pub fn new(id: u64, queue: String, processors: CachedProcessorMap, server: Addr) -> Self { - LocalWorker { - id, - queue, - processors, - server, + let (tx, mut rx) = channel(16); + + let handle = LocalWorkerHandle { + tx: tx.clone(), + id, + queue: queue.clone(), + }; + + actix::spawn(async move { + debug!("Beginning worker loop for {}", id); + if let Err(e) = server.request_job(Box::new(handle.clone())).await { + error!("Couldn't request first job, bailing, {}", e); + return; } - } -} - -impl Actor for LocalWorker -where - S: Actor + Handler + Handler, - S::Context: ToEnvelope + ToEnvelope, - State: Clone + 'static, -{ - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - self.server.do_send(RequestJob(Box::new(LocalWorkerHandle { - id: self.id, - queue: self.queue.clone(), - addr: ctx.address(), - }))); - } -} - -pub struct ProcessJob(JobInfo); - -impl Message for ProcessJob { - type Result = (); -} - -impl Handler for LocalWorker -where - S: Actor + Handler + Handler, - S::Context: ToEnvelope + ToEnvelope, - State: Clone + 'static, -{ - type Result = (); - - fn handle(&mut self, ProcessJob(job): ProcessJob, ctx: &mut Self::Context) -> Self::Result { - info!("Worker {} processing job {}", self.id, job.id()); - let fut = - wrap_future::<_, Self>(self.processors.process_job(job)).map(|job, actor, ctx| { - actor.server.do_send(ReturningJob(job)); - actor.server.do_send(RequestJob(Box::new(LocalWorkerHandle { - id: actor.id, - queue: actor.queue.clone(), - addr: ctx.address(), - }))); - }); - - ctx.spawn(fut); - } + while let Some(job) = rx.recv().await { + let return_job = processors.process_job(job).await; + + if let Err(e) = server.return_job(return_job).await { + error!("Error returning job, {}", e); + } + if let Err(e) = server.request_job(Box::new(handle.clone())).await { + error!("Error requesting job, {}", e); + break; + } + } + warn!("Worker {} closing", id); + }); } diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index cec7ed0..28c4454 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.6.1" +version = "0.7.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -10,10 +10,11 @@ readme = "../README.md" edition = "2018" [dependencies] +anyhow = "1.0" +async-trait = "0.1.24" chrono = { version = "0.4", features = ["serde"] } -failure = "0.1" -futures = "0.1.21" +futures = "0.3.4" log = "0.4" -serde = "1.0" -serde_derive = "1.0" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +thiserror = "1.0" diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 1fd69e7..77055d5 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,10 +1,9 @@ -use failure::Error; -use futures::future::IntoFuture; +use crate::{Backoff, MaxRetries, Processor}; +use anyhow::Error; use serde::{de::DeserializeOwned, ser::Serialize}; -use crate::{Backoff, MaxRetries, Processor}; - /// The Job trait defines parameters pertaining to an instance of background job +#[async_trait::async_trait] pub trait Job: Serialize + DeserializeOwned + 'static { /// The processor this job is associated with. The job's processor can be used to create a /// JobInfo from a job, which is used to serialize the job into a storage mechanism. @@ -13,9 +12,6 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// The application state provided to this job at runtime. type State: Clone + 'static; - /// The result of running this operation - type Future: IntoFuture; - /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more @@ -25,7 +21,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// The state passed into this job is initialized at the start of the application. The state /// argument could be useful for containing a hook into something like r2d2, or the address of /// an actor in an actix-based system. - fn run(self, state: Self::State) -> Self::Future; + async fn run(self, state: Self::State) -> Result<(), Error>; /// If this job should not use the default queue for its processor, this can be overridden in /// user-code. diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 9e0612d..1f97752 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,11 +1,9 @@ +use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; use chrono::{offset::Utc, DateTime, Duration as OldDuration}; use log::trace; -use serde_derive::{Deserialize, Serialize}; use serde_json::Value; -use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; - -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] /// Information about the sate of an attempted job pub struct ReturnJobInfo { pub(crate) id: u64, @@ -35,7 +33,7 @@ impl ReturnJobInfo { } } -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] /// Information about a newly created job pub struct NewJobInfo { /// Name of the processor that should handle this job @@ -105,7 +103,7 @@ impl NewJobInfo { } } -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] /// Metadata pertaining to a job that exists within the background_jobs system /// /// Although exposed publically, this type should only really be handled by the library itself, and @@ -167,6 +165,14 @@ impl JobInfo { self.id } + /// Convert a JobInfo into a ReturnJobInfo without executing it + pub fn unexecuted(self) -> ReturnJobInfo { + ReturnJobInfo { + id: self.id, + result: JobResult::Unexecuted, + } + } + pub(crate) fn increment(&mut self) -> ShouldStop { self.updated(); self.retry_count += 1; diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 8915916..85807f9 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -6,8 +6,7 @@ //! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs //! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate. -use failure::{Error, Fail}; -use serde_derive::{Deserialize, Serialize}; +use anyhow::Error; mod job; mod job_info; @@ -25,23 +24,23 @@ pub use crate::{ storage::{memory_storage, Storage}, }; -#[derive(Debug, Fail)] +#[derive(Debug, thiserror::Error)] /// The error type returned by a `Processor`'s `process` method pub enum JobError { /// Some error occurred while processing the job - #[fail(display = "Error performing job: {}", _0)] - Processing(#[cause] Error), + #[error("Error performing job: {0}")] + Processing(#[from] Error), /// Creating a `Job` type from the provided `serde_json::Value` failed - #[fail(display = "Could not make JSON value from arguments")] + #[error("Could not make JSON value from arguments")] Json, /// No processor was present to handle a given job - #[fail(display = "No processor available for job")] + #[error("No processor available for job")] MissingProcessor, } -#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// Indicate the state of a job after an attempted run pub enum JobResult { /// The job succeeded @@ -52,6 +51,9 @@ pub enum JobResult { /// There was no processor to run the job MissingProcessor, + + /// The worker requesting this job closed + Unexecuted, } impl JobResult { @@ -86,7 +88,7 @@ impl JobResult { } } -#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// Set the status of a job when storing it pub enum JobStatus { /// Job should be queued @@ -118,7 +120,7 @@ impl JobStatus { } } -#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// Different styles for retrying jobs pub enum Backoff { /// Seconds between execution @@ -135,7 +137,7 @@ pub enum Backoff { Exponential(usize), } -#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// How many times a job should be retried before giving up pub enum MaxRetries { /// Keep retrying forever @@ -176,3 +178,9 @@ impl ShouldStop { *self == ShouldStop::Requeue } } + +impl From for JobError { + fn from(_: serde_json::error::Error) -> Self { + JobError::Json + } +} diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 4bdc52b..ab8d130 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -1,12 +1,8 @@ -use chrono::{offset::Utc, DateTime}; -use failure::{Error, Fail}; -use futures::{ - future::{Either, IntoFuture}, - Future, -}; -use serde_json::Value; - use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; +use anyhow::Error; +use chrono::{offset::Utc, DateTime}; +use serde_json::Value; +use std::{future::Future, pin::Pin}; /// ## The Processor trait /// @@ -25,11 +21,12 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// ### Example /// /// ```rust +/// use anyhow::Error; /// use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; -/// use failure::Error; -/// use futures::future::Future; +/// use futures::future::{ok, Ready}; /// use log::info; /// use serde_derive::{Deserialize, Serialize}; +/// use std::future::Future; /// /// #[derive(Deserialize, Serialize)] /// struct MyJob { @@ -39,12 +36,12 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// impl Job for MyJob { /// type Processor = MyProcessor; /// type State = (); -/// type Future = Result<(), Error>; +/// type Future = Ready>; /// /// fn run(self, _state: Self::State) -> Self::Future { /// info!("Processing {}", self.count); /// -/// Ok(()) +/// ok(()) /// } /// } /// @@ -133,20 +130,18 @@ pub trait Processor: Clone { /// &self, /// args: Value, /// state: S - /// ) -> Box + Send> { + /// ) -> Pin> + Send>> { /// let res = serde_json::from_value::(args); /// - /// let fut = match res { - /// Ok(job) => { - /// // Perform some custom pre-job logic - /// Either::A(job.run(state).map_err(JobError::Processing)) - /// }, - /// Err(_) => Either::B(Err(JobError::Json).into_future()), - /// }; + /// Box::pin(async move { + /// let job = res.map_err(|_| JobError::Json)?; + /// // Perform some custom pre-job locic + /// + /// job.run(state).await.map_err(JobError::Processing)?; /// - /// Box::new(fut.and_then(|_| { /// // Perform some custom post-job logic - /// })) + /// Ok(()) + /// }) /// } /// ``` /// @@ -157,21 +152,19 @@ pub trait Processor: Clone { &self, args: Value, state: ::State, - ) -> Box + Send> - where - <::Future as IntoFuture>::Future: Send, - { - let res = serde_json::from_value::(args); + ) -> Pin> + Send>> { + // Call run on the job here because State isn't Send, but the future produced by job IS + // Send + let res = serde_json::from_value::(args).map(move |job| job.run(state)); - let fut = match res { - Ok(job) => Either::A(job.run(state).into_future().map_err(JobError::Processing)), - Err(_) => Either::B(Err(JobError::Json).into_future()), - }; + Box::pin(async move { + res?.await?; - Box::new(fut) + Ok(()) + }) } } -#[derive(Clone, Debug, Fail)] -#[fail(display = "Failed to to turn job into value")] +#[derive(Clone, Debug, thiserror::Error)] +#[error("Failed to to turn job into value")] pub struct ToJson; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index cb679fe..17af5e5 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,17 +1,15 @@ -use std::{collections::HashMap, sync::Arc}; - -use futures::future::{Either, Future, IntoFuture}; +use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo}; use log::{error, info}; use serde_json::Value; - -use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; /// A generic function that processes a job /// /// Instead of storing [`Processor`] type directly, the [`ProcessorMap`] /// struct stores these `ProcessFn` types that don't expose differences in Job types. -pub type ProcessFn = - Arc Box + Send> + Send + Sync>; +pub type ProcessFn = Arc< + dyn Fn(Value, S) -> Pin> + Send>> + Send + Sync, +>; pub type StateFn = Arc S + Send + Sync>; @@ -59,7 +57,6 @@ where where P: Processor + Sync + Send + 'static, J: Job, - ::Future: Send, { self.inner.insert( P::NAME.to_owned(), @@ -79,17 +76,17 @@ where /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. - pub fn process_job(&self, job: JobInfo) -> impl Future { + pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo { let opt = self .inner .get(job.processor()) .map(|processor| process(processor, (self.state_fn)(), job.clone())); if let Some(fut) = opt { - Either::A(fut) + fut.await } else { error!("Processor {} not present", job.processor()); - Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future()) + ReturnJobInfo::missing_processor(job.id()) } } } @@ -102,38 +99,29 @@ where /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. - pub fn process_job(&self, job: JobInfo) -> impl Future { - let opt = self - .inner - .get(job.processor()) - .map(|processor| process(processor, self.state.clone(), job.clone())); - - if let Some(fut) = opt { - Either::A(fut) + pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo { + if let Some(processor) = self.inner.get(job.processor()) { + process(processor, self.state.clone(), job).await } else { error!("Processor {} not present", job.processor()); - Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future()) + ReturnJobInfo::missing_processor(job.id()) } } } -fn process( - process_fn: &ProcessFn, - state: S, - job: JobInfo, -) -> impl Future { +async fn process(process_fn: &ProcessFn, state: S, job: JobInfo) -> ReturnJobInfo { let args = job.args(); let id = job.id(); let processor = job.processor().to_owned(); - process_fn(args, state).then(move |res| match res { + match process_fn(args, state).await { Ok(_) => { info!("Job {} completed, {}", id, processor); - Ok(ReturnJobInfo::pass(id)) + ReturnJobInfo::pass(id) } Err(e) => { error!("Job {} errored, {}, {}", id, processor, e); - Ok(ReturnJobInfo::fail(id)) + ReturnJobInfo::fail(id) } - }) + } } diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs index 7d4df6e..6d5c10f 100644 --- a/jobs-core/src/stats.rs +++ b/jobs-core/src/stats.rs @@ -1,7 +1,6 @@ use chrono::{offset::Utc, DateTime, Datelike, Timelike}; -use serde_derive::{Deserialize, Serialize}; -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] /// Statistics about the jobs processor pub struct Stats { /// How many jobs are pending execution @@ -72,7 +71,7 @@ impl Default for Stats { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] /// A time-based overview of job completion and failures pub struct JobStat { this_hour: usize, diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 70cfbed..5e79a17 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,6 +1,6 @@ use chrono::offset::Utc; -use failure::Fail; use log::error; +use std::error::Error; use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; @@ -10,72 +10,77 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; /// HashMaps and uses counting to assign IDs. If jobs must be persistent across application /// restarts, look into the [`sled-backed`](https://github.com/spacejam/sled) implementation from /// the `background-jobs-sled-storage` crate. +#[async_trait::async_trait] pub trait Storage: Clone + Send { /// The error type used by the storage mechansim. - type Error: Fail; + type Error: Error + Send + Sync; /// This method generates unique IDs for jobs - fn generate_id(&mut self) -> Result; + async fn generate_id(&self) -> Result; /// This method should store the supplied job /// /// The supplied job _may already be present_. The implementation should overwrite the stored /// job with the new job so that future calls to `fetch_job` return the new one. - fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error>; + async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error>; /// This method should return the job with the given ID regardless of what state the job is in. - fn fetch_job(&mut self, id: u64) -> Result, Self::Error>; + async fn fetch_job(&self, id: u64) -> Result, Self::Error>; /// This should fetch a job ready to be processed from the queue /// /// If a job is not ready, is currently running, or is not in the requested queue, this method /// should not return it. If no jobs meet these criteria, this method should return Ok(None) - fn fetch_job_from_queue(&mut self, queue: &str) -> Result, Self::Error>; + async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error>; /// This method tells the storage mechanism to mark the given job as being in the provided /// queue - fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error>; + async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error>; /// This method tells the storage mechanism to mark a given job as running - fn run_job(&mut self, id: u64, runner_id: u64) -> Result<(), Self::Error>; + async fn run_job(&self, id: u64, runner_id: u64) -> Result<(), Self::Error>; /// This method tells the storage mechanism to remove the job /// /// This happens when a job has been completed or has failed too many times - fn delete_job(&mut self, id: u64) -> Result<(), Self::Error>; + async fn delete_job(&self, id: u64) -> Result<(), Self::Error>; /// This method returns the current statistics, or Stats::default() if none exists. - fn get_stats(&self) -> Result; + async fn get_stats(&self) -> Result; /// This method fetches the existing statistics or Stats::default(), and stores the result of /// calling `update_stats` on it. - fn update_stats(&mut self, f: F) -> Result<(), Self::Error> + async fn update_stats(&self, f: F) -> Result<(), Self::Error> where - F: Fn(Stats) -> Stats; + F: Fn(Stats) -> Stats + Send + 'static; /// Generate a new job based on the provided NewJobInfo - fn new_job(&mut self, job: NewJobInfo) -> Result { - let id = self.generate_id()?; + async fn new_job(&self, job: NewJobInfo) -> Result { + let id = self.generate_id().await?; let job = job.with_id(id); let queue = job.queue().to_owned(); - self.save_job(job)?; - self.queue_job(&queue, id)?; - self.update_stats(Stats::new_job)?; + self.save_job(job).await?; + self.queue_job(&queue, id).await?; + self.update_stats(Stats::new_job).await?; Ok(id) } /// Fetch a job that is ready to be executed, marking it as running - fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Self::Error> { - match self.fetch_job_from_queue(queue)? { + async fn request_job( + &self, + queue: &str, + runner_id: u64, + ) -> Result, Self::Error> { + match self.fetch_job_from_queue(queue).await? { Some(mut job) => { if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) { job.run(); - self.run_job(job.id(), runner_id)?; - self.save_job(job.clone())?; - self.update_stats(Stats::run_job)?; + self.run_job(job.id(), runner_id).await?; + self.save_job(job.clone()).await?; + self.update_stats(Stats::run_job).await?; Ok(Some(job)) } else { @@ -91,35 +96,35 @@ pub trait Storage: Clone + Send { } /// "Return" a job to the database, marking it for retry if needed - fn return_job( - &mut self, + async fn return_job( + &self, ReturnJobInfo { id, result }: ReturnJobInfo, ) -> Result<(), Self::Error> { if result.is_failure() { - if let Some(mut job) = self.fetch_job(id)? { + if let Some(mut job) = self.fetch_job(id).await? { if job.needs_retry() { - self.queue_job(job.queue(), id)?; - self.save_job(job)?; - self.update_stats(Stats::retry_job) + self.queue_job(job.queue(), id).await?; + self.save_job(job).await?; + self.update_stats(Stats::retry_job).await } else { - self.delete_job(id)?; - self.update_stats(Stats::fail_job) + self.delete_job(id).await?; + self.update_stats(Stats::fail_job).await } } else { Ok(()) } } else if result.is_missing_processor() { - if let Some(mut job) = self.fetch_job(id)? { + if let Some(mut job) = self.fetch_job(id).await? { job.pending(); - self.queue_job(job.queue(), id)?; - self.save_job(job)?; - self.update_stats(Stats::retry_job) + self.queue_job(job.queue(), id).await?; + self.save_job(job).await?; + self.update_stats(Stats::retry_job).await } else { Ok(()) } } else { - self.delete_job(id)?; - self.update_stats(Stats::complete_job) + self.delete_job(id).await?; + self.update_stats(Stats::complete_job).await } } } @@ -127,12 +132,8 @@ pub trait Storage: Clone + Send { /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { use super::{JobInfo, Stats}; - use failure::Fail; - use std::{ - collections::HashMap, - fmt, - sync::{Arc, Mutex}, - }; + use futures::lock::Mutex; + use std::{collections::HashMap, convert::Infallible, sync::Arc}; #[derive(Clone)] /// An In-Memory store for jobs @@ -166,30 +167,31 @@ pub mod memory_storage { } } + #[async_trait::async_trait] impl super::Storage for Storage { - type Error = Never; + type Error = Infallible; - fn generate_id(&mut self) -> Result { - let mut inner = self.inner.lock().unwrap(); + async fn generate_id(&self) -> Result { + let mut inner = self.inner.lock().await; let id = inner.count; inner.count = inner.count.wrapping_add(1); Ok(id) } - fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error> { - self.inner.lock().unwrap().jobs.insert(job.id(), job); + async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> { + self.inner.lock().await.jobs.insert(job.id(), job); Ok(()) } - fn fetch_job(&mut self, id: u64) -> Result, Self::Error> { - let j = self.inner.lock().unwrap().jobs.get(&id).map(|j| j.clone()); + async fn fetch_job(&self, id: u64) -> Result, Self::Error> { + let j = self.inner.lock().await.jobs.get(&id).map(|j| j.clone()); Ok(j) } - fn fetch_job_from_queue(&mut self, queue: &str) -> Result, Self::Error> { - let mut inner = self.inner.lock().unwrap(); + async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error> { + let mut inner = self.inner.lock().await; let j = inner .queues @@ -210,25 +212,21 @@ pub mod memory_storage { Ok(j) } - fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error> { - self.inner - .lock() - .unwrap() - .queues - .insert(id, queue.to_owned()); + async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error> { + self.inner.lock().await.queues.insert(id, queue.to_owned()); Ok(()) } - fn run_job(&mut self, id: u64, worker_id: u64) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().unwrap(); + async fn run_job(&self, id: u64, worker_id: u64) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().await; inner.worker_ids.insert(id, worker_id); inner.worker_ids_inverse.insert(worker_id, id); Ok(()) } - fn delete_job(&mut self, id: u64) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().unwrap(); + async fn delete_job(&self, id: u64) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().await; inner.jobs.remove(&id); inner.queues.remove(&id); if let Some(worker_id) = inner.worker_ids.remove(&id) { @@ -237,28 +235,18 @@ pub mod memory_storage { Ok(()) } - fn get_stats(&self) -> Result { - Ok(self.inner.lock().unwrap().stats.clone()) + async fn get_stats(&self) -> Result { + Ok(self.inner.lock().await.stats.clone()) } - fn update_stats(&mut self, f: F) -> Result<(), Self::Error> + async fn update_stats(&self, f: F) -> Result<(), Self::Error> where - F: Fn(Stats) -> Stats, + F: Fn(Stats) -> Stats + Send, { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().await; inner.stats = (f)(inner.stats.clone()); Ok(()) } } - - #[derive(Clone, Debug, Fail)] - /// An error that is impossible to create - pub enum Never {} - - impl fmt::Display for Never { - fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { - match *self {} - } - } } diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index f88bbb7..94f6945 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -11,6 +11,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -background-jobs-core = { version = "0.6", path = "../jobs-core" } +actix-threadpool = "0.3.1" +async-trait = "0.1.24" +background-jobs-core = { version = "0.7", path = "../jobs-core" } chrono = "0.4" sled-extensions = { version = "0.2", features = ["bincode", "cbor"] } +thiserror = "1.0" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 1a0d039..cca92f2 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -13,11 +13,25 @@ //! let queue_handle = ServerConfig::new(storage).thread_count(8).start(); //! ``` +use actix_threadpool::{run, BlockingError}; use background_jobs_core::{JobInfo, Stats, Storage}; use chrono::offset::Utc; use sled_extensions::{bincode::Tree, cbor, Db, DbExt}; -pub use sled_extensions::{Error, Result}; +/// The error produced by sled storage calls +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error in the database + #[error("Error in sled extensions, {0}")] + Sled(sled_extensions::Error), + + /// Error executing db operation + #[error("Blocking operation was canceled")] + Canceled, +} + +/// A simple alias for Result +pub type Result = std::result::Result; #[derive(Clone)] /// The Sled-backed storage implementation @@ -31,95 +45,144 @@ pub struct SledStorage { db: Db, } +#[async_trait::async_trait] impl Storage for SledStorage { type Error = Error; - fn generate_id(&mut self) -> Result { - Ok(self.db.generate_id()?) + async fn generate_id(&self) -> Result { + let this = self.clone(); + + Ok(run(move || Ok(this.db.generate_id()?) as sled_extensions::Result).await?) } - fn save_job(&mut self, job: JobInfo) -> Result<()> { - self.jobinfo - .insert(job_key(job.id()).as_bytes(), job) - .map(|_| ()) + async fn save_job(&self, job: JobInfo) -> Result<()> { + let this = self.clone(); + + Ok(run(move || { + this.jobinfo + .insert(job_key(job.id()).as_bytes(), job) + .map(|_| ()) + }) + .await?) } - fn fetch_job(&mut self, id: u64) -> Result> { - self.jobinfo.get(job_key(id)) + async fn fetch_job(&self, id: u64) -> Result> { + let this = self.clone(); + + Ok(run(move || this.jobinfo.get(job_key(id))).await?) } - fn fetch_job_from_queue(&mut self, queue: &str) -> Result> { - let queue_tree = self.queue.clone(); - let job_tree = self.jobinfo.clone(); + async fn fetch_job_from_queue(&self, queue: &str) -> Result> { + let this = self.clone(); + let queue = queue.to_owned(); - self.lock_queue(queue, move || { - let now = Utc::now(); + Ok(run(move || { + let queue_tree = this.queue.clone(); + let job_tree = this.jobinfo.clone(); + let queue2 = queue.clone(); - let job = queue_tree - .iter() - .filter_map(|res| res.ok()) - .filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None }) - .filter_map(|id| job_tree.get(id).ok()) - .filter_map(|opt| opt) - .filter(|job| job.is_ready(now)) - .next(); + this.lock_queue(&queue2, move || { + let now = Utc::now(); - if let Some(ref job) = job { - queue_tree.remove(&job_key(job.id()))?; + let job = queue_tree + .iter() + .filter_map(|res| res.ok()) + .filter_map( + |(id, in_queue)| { + if queue == in_queue { + Some(id) + } else { + None + } + }, + ) + .filter_map(|id| job_tree.get(id).ok()) + .filter_map(|opt| opt) + .filter(|job| job.is_ready(now)) + .next(); + + if let Some(ref job) = job { + queue_tree.remove(&job_key(job.id()))?; + } + + Ok(job) as sled_extensions::Result> + }) + }) + .await?) + } + + async fn queue_job(&self, queue: &str, id: u64) -> Result<()> { + let this = self.clone(); + let queue = queue.to_owned(); + + Ok(run(move || { + if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { + this.running.remove(&runner_key(runner_id))?; } - Ok(job) + this.queue.insert(job_key(id).as_bytes(), queue).map(|_| ()) }) + .await?) } - fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> { - if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? { - self.running.remove(&runner_key(runner_id))?; - } + async fn run_job(&self, id: u64, runner_id: u64) -> Result<()> { + let this = self.clone(); - self.queue - .insert(job_key(id).as_bytes(), queue.to_owned()) - .map(|_| ()) + Ok(run(move || { + this.queue.remove(job_key(id))?; + this.running.insert(runner_key(runner_id).as_bytes(), id)?; + this.running_inverse + .insert(job_key(id).as_bytes(), runner_id)?; + + Ok(()) as Result<()> + }) + .await?) } - fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> { - self.queue.remove(job_key(id))?; - self.running.insert(runner_key(runner_id).as_bytes(), id)?; - self.running_inverse - .insert(job_key(id).as_bytes(), runner_id)?; + async fn delete_job(&self, id: u64) -> Result<()> { + let this = self.clone(); - Ok(()) + Ok(run(move || { + this.jobinfo.remove(&job_key(id))?; + this.queue.remove(&job_key(id))?; + + if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { + this.running.remove(&runner_key(runner_id))?; + } + + Ok(()) as Result<()> + }) + .await?) } - fn delete_job(&mut self, id: u64) -> Result<()> { - self.jobinfo.remove(&job_key(id))?; - self.queue.remove(&job_key(id))?; + async fn get_stats(&self) -> Result { + let this = self.clone(); - if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? { - self.running.remove(&runner_key(runner_id))?; - } - - Ok(()) + Ok( + run(move || Ok(this.stats.get("stats")?.unwrap_or(Stats::default())) as Result) + .await?, + ) } - fn get_stats(&self) -> Result { - Ok(self.stats.get("stats")?.unwrap_or(Stats::default())) - } - - fn update_stats(&mut self, f: F) -> Result<()> + async fn update_stats(&self, f: F) -> Result<()> where - F: Fn(Stats) -> Stats, + F: Fn(Stats) -> Stats + Send + 'static, { - self.stats.fetch_and_update("stats", |opt| { - let stats = match opt { - Some(stats) => stats, - None => Stats::default(), - }; + let this = self.clone(); - Some((f)(stats)) - })?; + Ok(run(move || { + this.stats.fetch_and_update("stats", move |opt| { + let stats = match opt { + Some(stats) => stats, + None => Stats::default(), + }; - Ok(()) + Some((f)(stats)) + })?; + + Ok(()) as Result<()> + }) + .await?) } } @@ -137,9 +200,9 @@ impl SledStorage { }) } - fn lock_queue(&self, queue: &str, f: F) -> Result + fn lock_queue(&self, queue: &str, f: F) -> sled_extensions::Result where - F: Fn() -> Result, + F: Fn() -> sled_extensions::Result, { let id = self.db.generate_id()?; @@ -168,3 +231,22 @@ fn job_key(id: u64) -> String { fn runner_key(runner_id: u64) -> String { format!("runner-{}", runner_id) } + +impl From> for Error +where + Error: From, + T: std::fmt::Debug, +{ + fn from(e: BlockingError) -> Self { + match e { + BlockingError::Error(e) => e.into(), + BlockingError::Canceled => Error::Canceled, + } + } +} + +impl From for Error { + fn from(e: sled_extensions::Error) -> Self { + Error::Sled(e) + } +} diff --git a/src/lib.rs b/src/lib.rs index c94460f..9ffc255 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -211,7 +211,7 @@ pub use background_jobs_core::{ }; #[cfg(feature = "background-jobs-actix")] -pub use background_jobs_actix::{Every, QueueHandle, ServerConfig, WorkerConfig}; +pub use background_jobs_actix::{create_server, QueueHandle, WorkerConfig}; #[cfg(feature = "background-jobs-sled-storage")] pub mod sled_storage {