diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml
index bc544a4..9e2c8ea 100644
--- a/jobs-actix/Cargo.toml
+++ b/jobs-actix/Cargo.toml
@@ -23,3 +23,4 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 thiserror = "1.0"
 tokio = { version = "0.2.13", features = ["sync"] }
+uuid = { version = "0.8.1", features = ["v4", "serde"] }
diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
index 06023a6..9e78ffe 100644
--- a/jobs-actix/src/lib.rs
+++ b/jobs-actix/src/lib.rs
@@ -213,42 +213,30 @@ where
     /// Start the workers in the current arbiter
     pub fn start(self, queue_handle: QueueHandle) {
-        let processors = self.processors.clone();
-
-        self.queues.into_iter().fold(0, |acc, (key, count)| {
-            (0..count).for_each(|i| {
+        for (key, count) in self.queues.into_iter() {
+            for _ in 0..count {
                 local_worker(
-                    acc + i + 1000,
                     key.clone(),
-                    processors.cached(),
+                    self.processors.cached(),
                     queue_handle.inner.clone(),
                 );
-            });
-
-            acc + count
-        });
+            }
+        }
     }
 
     /// Start the workers in the provided arbiter
     pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) {
-        let processors = self.processors.clone();
-        self.queues.into_iter().fold(0, |acc, (key, count)| {
-            (0..count).for_each(|i| {
-                let processors = processors.clone();
-                let queue_handle = queue_handle.clone();
+        for (key, count) in self.queues.into_iter() {
+            for _ in 0..count {
                 let key = key.clone();
-                arbiter.exec_fn(move || {
-                    local_worker(
-                        acc + i + 1000,
-                        key.clone(),
-                        processors.cached(),
-                        queue_handle.inner.clone(),
-                    );
-                });
-            });
+                let processors = self.processors.clone();
+                let server = queue_handle.inner.clone();
 
-            acc + count
-        });
+                arbiter.exec_fn(move || {
+                    local_worker(key, processors.cached(), server);
+                });
+            }
+        }
     }
 }
diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs
index 0a1b0fc..8ee03f3 100644
--- a/jobs-actix/src/storage.rs
+++ b/jobs-actix/src/storage.rs
@@ -1,11 +1,12 @@
 use anyhow::Error;
 use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage};
+use uuid::Uuid;
 
 #[async_trait::async_trait]
 pub(crate) trait ActixStorage {
-    async fn new_job(&self, job: NewJobInfo) -> Result<u64, Error>;
+    async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Error>;
 
-    async fn request_job(&self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error>;
+    async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<Option<JobInfo>, Error>;
 
     async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>;
 
@@ -23,11 +24,11 @@ where
     S: Storage + Send + Sync,
     S::Error: Send + Sync + 'static,
 {
-    async fn new_job(&self, job: NewJobInfo) -> Result<u64, Error> {
+    async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Error> {
         Ok(self.0.new_job(job).await?)
     }
 
-    async fn request_job(&self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error> {
+    async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<Option<JobInfo>, Error> {
         Ok(self.0.request_job(queue, runner_id).await?)
     }
diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs
index f41053c..c0e5254 100644
--- a/jobs-actix/src/worker.rs
+++ b/jobs-actix/src/worker.rs
@@ -2,12 +2,13 @@ use crate::Server;
 use background_jobs_core::{CachedProcessorMap, JobInfo};
 use log::{debug, error, warn};
 use tokio::sync::mpsc::{channel, Sender};
+use uuid::Uuid;
 
 #[async_trait::async_trait]
 pub trait Worker {
     async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo>;
 
-    fn id(&self) -> u64;
+    fn id(&self) -> Uuid;
 
     fn queue(&self) -> &str;
 }
@@ -15,7 +16,7 @@ pub trait Worker {
 #[derive(Clone)]
 pub(crate) struct LocalWorkerHandle {
     tx: Sender<JobInfo>,
-    id: u64,
+    id: Uuid,
     queue: String,
 }
 
@@ -31,7 +32,7 @@ impl Worker for LocalWorkerHandle {
         }
     }
 
-    fn id(&self) -> u64 {
+    fn id(&self) -> Uuid {
         self.id
     }
 
@@ -41,13 +42,14 @@ impl Worker for LocalWorkerHandle {
 }
 
 pub(crate) fn local_worker<State>(
-    id: u64,
     queue: String,
     processors: CachedProcessorMap<State>,
     server: Server,
 ) where
     State: Clone + 'static,
 {
+    let id = Uuid::new_v4();
+
     let (tx, mut rx) = channel(16);
 
     let handle = LocalWorkerHandle {
diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml
index 28c4454..7951cff 100644
--- a/jobs-core/Cargo.toml
+++ b/jobs-core/Cargo.toml
@@ -18,3 +18,4 @@ log = "0.4"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 thiserror = "1.0"
+uuid = { version = "0.8.1", features = ["serde", "v4"] }
diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs
index ef0b471..b55ad80 100644
--- a/jobs-core/src/job_info.rs
+++ b/jobs-core/src/job_info.rs
@@ -2,30 +2,31 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
 use chrono::{offset::Utc, DateTime, Duration};
 use log::trace;
 use serde_json::Value;
+use uuid::Uuid;
 
 #[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
 /// Information about the state of an attempted job
 pub struct ReturnJobInfo {
-    pub(crate) id: u64,
+    pub(crate) id: Uuid,
     pub(crate) result: JobResult,
 }
 
 impl ReturnJobInfo {
-    pub(crate) fn fail(id: u64) -> Self {
+    pub(crate) fn fail(id: Uuid) -> Self {
         ReturnJobInfo {
             id,
             result: JobResult::Failure,
         }
     }
 
-    pub(crate) fn pass(id: u64) -> Self {
+    pub(crate) fn pass(id: Uuid) -> Self {
         ReturnJobInfo {
             id,
             result: JobResult::Success,
         }
     }
 
-    pub(crate) fn missing_processor(id: u64) -> Self {
+    pub(crate) fn missing_processor(id: Uuid) -> Self {
         ReturnJobInfo {
             id,
             result: JobResult::MissingProcessor,
@@ -94,7 +95,7 @@ impl NewJobInfo {
         self.next_queue.is_none()
     }
 
-    pub(crate) fn with_id(self, id: u64) -> JobInfo {
+    pub(crate) fn with_id(self, id: Uuid) -> JobInfo {
         JobInfo {
             id,
             processor: self.processor,
@@ -120,7 +121,7 @@ impl NewJobInfo {
 /// new_job method.
 pub struct JobInfo {
     /// ID of the job
-    id: u64,
+    id: Uuid,
 
     /// Name of the processor that should handle this job
     processor: String,
@@ -174,7 +175,7 @@ impl JobInfo {
     }
 
     /// The ID of this job
-    pub fn id(&self) -> u64 {
+    pub fn id(&self) -> Uuid {
         self.id
     }
 
diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs
index a78c4bc..36cb480 100644
--- a/jobs-core/src/storage.rs
+++ b/jobs-core/src/storage.rs
@@ -1,8 +1,8 @@
+use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
 use chrono::offset::Utc;
 use log::info;
 use std::error::Error;
-
-use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
+use uuid::Uuid;
 
 /// Define a storage backend for jobs
 ///
@@ -16,7 +16,7 @@ pub trait Storage: Clone + Send {
     type Error: Error + Send + Sync;
 
     /// This method generates unique IDs for jobs
-    async fn generate_id(&self) -> Result<u64, Self::Error>;
+    async fn generate_id(&self) -> Result<Uuid, Self::Error>;
 
     /// This method should store the supplied job
     ///
@@ -25,7 +25,7 @@ pub trait Storage: Clone + Send {
     async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error>;
 
     /// This method should return the job with the given ID regardless of what state the job is in.
-    async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>, Self::Error>;
+    async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error>;
 
     /// This should fetch a job ready to be processed from the queue
     ///
@@ -35,15 +35,15 @@ pub trait Storage: Clone + Send {
 
     /// This method tells the storage mechanism to mark the given job as being in the provided
     /// queue
-    async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error>;
+    async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error>;
 
     /// This method tells the storage mechanism to mark a given job as running
-    async fn run_job(&self, id: u64, runner_id: u64) -> Result<(), Self::Error>;
+    async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;
 
     /// This method tells the storage mechanism to remove the job
     ///
     /// This happens when a job has been completed or has failed too many times
-    async fn delete_job(&self, id: u64) -> Result<(), Self::Error>;
+    async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error>;
 
     /// This method returns the current statistics, or Stats::default() if none exists.
     async fn get_stats(&self) -> Result<Stats, Self::Error>;
@@ -55,7 +55,7 @@ pub trait Storage: Clone + Send {
         F: Fn(Stats) -> Stats + Send + 'static;
 
     /// Generate a new job based on the provided NewJobInfo
-    async fn new_job(&self, job: NewJobInfo) -> Result<u64, Self::Error> {
+    async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
         let id = self.generate_id().await?;
 
         let job = job.with_id(id);
@@ -72,7 +72,7 @@ pub trait Storage: Clone + Send {
     async fn request_job(
         &self,
         queue: &str,
-        runner_id: u64,
+        runner_id: Uuid,
     ) -> Result<Option<JobInfo>, Self::Error> {
         match self.fetch_job_from_queue(queue).await? {
             Some(mut job) => {
@@ -138,6 +138,7 @@ pub mod memory_storage {
     use chrono::Utc;
     use futures::lock::Mutex;
     use std::{collections::HashMap, convert::Infallible, sync::Arc};
+    use uuid::Uuid;
 
     #[derive(Clone)]
     /// An In-Memory store for jobs
@@ -147,11 +148,10 @@ pub mod memory_storage {
 
     #[derive(Clone)]
     struct Inner {
-        count: u64,
-        jobs: HashMap<u64, JobInfo>,
-        queues: HashMap<u64, String>,
-        worker_ids: HashMap<u64, u64>,
-        worker_ids_inverse: HashMap<u64, u64>,
+        jobs: HashMap<Uuid, JobInfo>,
+        queues: HashMap<Uuid, String>,
+        worker_ids: HashMap<Uuid, Uuid>,
+        worker_ids_inverse: HashMap<Uuid, Uuid>,
         stats: Stats,
     }
 
@@ -160,7 +160,6 @@ pub mod memory_storage {
         pub fn new() -> Self {
            Storage {
                 inner: Arc::new(Mutex::new(Inner {
-                    count: 0,
                     jobs: HashMap::new(),
                     queues: HashMap::new(),
                     worker_ids: HashMap::new(),
@@ -175,11 +174,15 @@ pub mod memory_storage {
     impl super::Storage for Storage {
         type Error = Infallible;
 
-        async fn generate_id(&self) -> Result<u64, Self::Error> {
-            let mut inner = self.inner.lock().await;
-            let id = inner.count;
-            inner.count = inner.count.wrapping_add(1);
-            Ok(id)
+        async fn generate_id(&self) -> Result<Uuid, Self::Error> {
+            let uuid = loop {
+                let uuid = Uuid::new_v4();
+                if !self.inner.lock().await.jobs.contains_key(&uuid) {
+                    break uuid;
+                }
+            };
+
+            Ok(uuid)
         }
 
         async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
@@ -188,7 +191,7 @@ pub mod memory_storage {
             Ok(())
         }
 
-        async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>, Self::Error> {
+        async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
             let j = self.inner.lock().await.jobs.get(&id).map(|j| j.clone());
 
             Ok(j)
@@ -221,12 +224,12 @@ pub mod memory_storage {
             Ok(j)
         }
 
-        async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error> {
+        async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> {
             self.inner.lock().await.queues.insert(id, queue.to_owned());
             Ok(())
         }
 
-        async fn run_job(&self, id: u64, worker_id: u64) -> Result<(), Self::Error> {
+        async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> {
             let mut inner = self.inner.lock().await;
 
             inner.worker_ids.insert(id, worker_id);
@@ -234,7 +237,7 @@ pub mod memory_storage {
             Ok(())
         }
 
-        async fn delete_job(&self, id: u64) -> Result<(), Self::Error> {
+        async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> {
             let mut inner = self.inner.lock().await;
             inner.jobs.remove(&id);
             inner.queues.remove(&id);
diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml
index 1089985..25eaac9 100644
--- a/jobs-sled/Cargo.toml
+++ b/jobs-sled/Cargo.toml
@@ -17,3 +17,4 @@ background-jobs-core = { version = "0.7", path = "../jobs-core" }
 chrono = "0.4"
 sled-extensions = { version = "0.3.0-alpha.0", features = ["bincode", "cbor"], git = "https://git.asonix.dog/Aardwolf/sled-extensions" }
 thiserror = "1.0"
+uuid = { version = "0.8.1", features = ["v4", "serde"] }
diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs
index db20f00..5c07f34 100644
--- a/jobs-sled/src/lib.rs
+++ b/jobs-sled/src/lib.rs
@@ -17,6 +17,7 @@ use actix_threadpool::{run, BlockingError};
 use background_jobs_core::{JobInfo, Stats, Storage};
 use chrono::offset::Utc;
 use sled_extensions::{bincode::Tree, cbor, Db, DbExt};
+use uuid::Uuid;
 
 /// The error produced by sled storage calls
 #[derive(Debug, thiserror::Error)]
@@ -37,11 +38,11 @@ pub type Result<T> = std::result::Result<T, Error>;
 /// The Sled-backed storage implementation
 pub struct SledStorage {
     jobinfo: cbor::Tree<JobInfo>,
-    running: Tree<u64>,
-    running_inverse: Tree<u64>,
+    running: Tree<Uuid>,
+    running_inverse: Tree<Uuid>,
     queue: Tree<String>,
     stats: Tree<Stats>,
-    lock: Tree<u64>,
+    lock: Tree<Uuid>,
     db: Db,
 }
 
@@ -49,10 +50,21 @@ pub struct SledStorage {
 impl Storage for SledStorage {
     type Error = Error;
 
-    async fn generate_id(&self) -> Result<u64> {
+    async fn generate_id(&self) -> Result<Uuid> {
         let this = self.clone();
 
-        Ok(run(move || Ok(this.db.generate_id()?) as sled_extensions::Result<u64>).await?)
+        Ok(run(move || {
+            let uuid = loop {
+                let uuid = Uuid::new_v4();
+
+                if !this.jobinfo.contains_key(job_key(uuid))? {
+                    break uuid;
+                }
+            };
+
+            Ok(uuid) as sled_extensions::Result<Uuid>
+        })
+        .await?)
     }
 
     async fn save_job(&self, job: JobInfo) -> Result<()> {
@@ -66,7 +78,7 @@ impl Storage for SledStorage {
             .await?)
     }
 
-    async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>> {
+    async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>> {
         let this = self.clone();
 
         Ok(run(move || this.jobinfo.get(job_key(id))).await?)
@@ -111,7 +123,7 @@ impl Storage for SledStorage {
             .await?)
     }
 
-    async fn queue_job(&self, queue: &str, id: u64) -> Result<()> {
+    async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> {
         let this = self.clone();
         let queue = queue.to_owned();
 
@@ -125,7 +137,7 @@ impl Storage for SledStorage {
             .await?)
     }
 
-    async fn run_job(&self, id: u64, runner_id: u64) -> Result<()> {
+    async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> {
         let this = self.clone();
 
         Ok(run(move || {
@@ -139,7 +151,7 @@ impl Storage for SledStorage {
             .await?)
     }
 
-    async fn delete_job(&self, id: u64) -> Result<()> {
+    async fn delete_job(&self, id: Uuid) -> Result<()> {
         let this = self.clone();
 
         Ok(run(move || {
@@ -204,7 +216,7 @@ impl SledStorage {
     where
         F: Fn() -> sled_extensions::Result<T>,
     {
-        let id = self.db.generate_id()?;
+        let id = Uuid::new_v4();
 
         let mut prev;
         while {
@@ -224,11 +236,11 @@ impl SledStorage {
     }
 }
 
-fn job_key(id: u64) -> String {
+fn job_key(id: Uuid) -> String {
     format!("job-{}", id)
 }
 
-fn runner_key(runner_id: u64) -> String {
+fn runner_key(runner_id: Uuid) -> String {
     format!("runner-{}", runner_id)
}
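
Beyond the mechanical u64 -> Uuid signature changes, the behavioural change in this patch is ID generation: both the in-memory and sled backends now draw random v4 UUIDs and retry until the candidate is not already a key in the job store, instead of handing out sequential counter values. Below is a minimal synchronous sketch of that collision-checked loop; the JobStore type and String payload are illustrative stand-ins, not items from this patch.

use std::collections::HashMap;
use std::sync::Mutex;
use uuid::Uuid; // uuid = { version = "0.8", features = ["v4"] }

// Hypothetical store, used only to illustrate the collision-checked ID loop.
struct JobStore {
    jobs: Mutex<HashMap<Uuid, String>>,
}

impl JobStore {
    fn generate_id(&self) -> Uuid {
        let jobs = self.jobs.lock().unwrap();

        // Draw v4 UUIDs until one is not already present as a key.
        loop {
            let id = Uuid::new_v4();
            if !jobs.contains_key(&id) {
                break id;
            }
        }
    }
}

Since a v4 UUID carries 122 random bits, the loop effectively always exits on its first iteration; the membership check only preserves the old guarantee that generate_id never returns an ID that is already stored, without relying on a persisted counter.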