From 1ac3c0bc86ffe710c3910d087d18bed2ceaa3bda Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 2 Jul 2022 13:42:17 -0500 Subject: [PATCH] Remove concept of ticking, instead wait for jobs --- Cargo.toml | 25 ++-- examples/basic-example/Cargo.toml | 4 +- examples/long-example/Cargo.toml | 4 +- examples/managed-example/Cargo.toml | 4 +- examples/managed-example/src/main.rs | 3 + examples/panic-example/Cargo.toml | 4 +- jobs-actix/Cargo.toml | 14 +- jobs-actix/src/lib.rs | 20 +-- jobs-actix/src/server.rs | 153 ++------------------- jobs-actix/src/storage.rs | 4 +- jobs-core/Cargo.toml | 6 +- jobs-core/src/storage.rs | 191 +++++++++++++++++---------- jobs-sled/Cargo.toml | 6 +- jobs-sled/src/lib.rs | 136 +++++++++++++++---- 14 files changed, 287 insertions(+), 287 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2e55335..86e51d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with actix and futures" -version = "0.12.0" +version = "0.13.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -11,25 +11,28 @@ edition = "2021" [workspace] members = [ - "jobs-actix", - "jobs-core", - "jobs-sled", - "examples/basic-example", - "examples/long-example", - "examples/managed-example", - "examples/panic-example", + "jobs-actix", + "jobs-core", + "jobs-sled", + "examples/basic-example", + "examples/long-example", + "examples/managed-example", + "examples/panic-example", ] [features] default = ["background-jobs-actix"] -completion-logging = ["background-jobs-core/completion-logging", "error-logging"] +completion-logging = [ + "background-jobs-core/completion-logging", + "error-logging", +] error-logging = ["background-jobs-core/error-logging"] [dependencies.background-jobs-core] -version = "0.12.0" +version = "0.13.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.12.0" +version = "0.13.0" path = "jobs-actix" optional = true diff --git a/examples/basic-example/Cargo.toml b/examples/basic-example/Cargo.toml index 9836bab..4023852 100644 --- a/examples/basic-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -9,7 +9,9 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.13.0", path = "../..", features = [ + "error-logging", +] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml index 151faca..2f0c421 100644 --- a/examples/long-example/Cargo.toml +++ b/examples/long-example/Cargo.toml @@ -9,7 +9,9 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.13.0", path = "../..", features = [ + "error-logging", +] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml index 9b9330d..91a681a 100644 --- a/examples/managed-example/Cargo.toml +++ b/examples/managed-example/Cargo.toml @@ -9,7 +9,9 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.13.0", path = "../..", features = [ + "error-logging", +] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index 0694222..3a0585f 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -53,12 +53,14 @@ async fn main() -> Result<(), Error> { .await?; // Block on Actix + tracing::info!("Press CTRL^C to continue"); actix_rt::signal::ctrl_c().await?; // kill the current arbiter manager.queue(StopJob).await?; // Block on Actix + tracing::info!("Press CTRL^C to continue"); actix_rt::signal::ctrl_c().await?; // See that the workers have respawned @@ -70,6 +72,7 @@ async fn main() -> Result<(), Error> { .await?; // Block on Actix + tracing::info!("Press CTRL^C to quit"); actix_rt::signal::ctrl_c().await?; drop(manager); diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml index 8bda3e1..ea669d3 100644 --- a/examples/panic-example/Cargo.toml +++ b/examples/panic-example/Cargo.toml @@ -9,7 +9,9 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.13.0", path = "../..", features = [ + "error-logging", +] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } time = "0.3" tracing = "0.1" diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index cea93fc..2079892 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.12.0" +version = "0.13.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -14,12 +14,18 @@ actix-rt = "2.5.1" anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.12.0", path = "../jobs-core", features = ["with-actix"] } +background-jobs-core = { version = "0.13.0", path = "../jobs-core", features = [ + "with-actix", +] } tracing = "0.1" tracing-futures = "0.2" num_cpus = "1.10.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["macros", "rt", "sync"] } -uuid = { version ="0.8.1", features = ["v4", "serde"] } +tokio = { version = "1", default-features = false, features = [ + "macros", + "rt", + "sync", +] } +uuid = { version = "1", features = ["v4", "serde"] } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 9fbdf87..97d3118 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -171,10 +171,6 @@ impl Manager { }; loop { - worker_config - .queue_handle - .inner - .ticker(arbiter.handle(), drop_notifier.clone()); worker_config.start_managed(&arbiter.handle(), &drop_notifier); notifier.notified().await; @@ -254,20 +250,6 @@ impl Drop for DropNotifier { } } -/// Create a new Server -/// -/// In previous versions of this library, the server itself was run on it's own dedicated threads -/// and guarded access to jobs via messages. Since we now have futures-aware synchronization -/// primitives, the Server has become an object that gets shared between client threads. -fn create_server_in_arbiter(arbiter: ArbiterHandle, storage: S) -> QueueHandle -where - S: Storage + Sync + 'static, -{ - let handle = create_server_managed(storage); - handle.inner.ticker(arbiter, ()); - handle -} - /// Create a new managed Server /// /// In previous versions of this library, the server itself was run on it's own dedicated threads @@ -361,7 +343,7 @@ where storage: S, state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static, ) -> Self { - let queue_handle = create_server_in_arbiter(arbiter.clone(), storage); + let queue_handle = create_server_managed(storage); let q2 = queue_handle.clone(); WorkerConfig { diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 8f53876..56fd376 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -2,63 +2,10 @@ use crate::{ storage::{ActixStorage, StorageWrapper}, worker::Worker, }; -use actix_rt::{ - time::{interval_at, Instant}, - ArbiterHandle, -}; use anyhow::Error; -use async_mutex::Mutex; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - time::Duration, -}; -use tracing::{error, trace, warn}; - -type WorkerQueue = VecDeque>; - -#[derive(Clone)] -pub(crate) struct ServerCache { - cache: Arc>>, -} - -pub(super) struct Ticker { - server: Server, - extras: Option, - arbiter: ArbiterHandle, -} - -impl Drop for Ticker { - fn drop(&mut self) { - let online = self.arbiter.spawn(async move {}); - - let extras = self.extras.take().unwrap(); - - if online { - let server = self.server.clone(); - - let arbiter = self.arbiter.clone(); - let spawned = self.arbiter.spawn(async move { - let _ticker = server.ticker(arbiter, extras); - let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); - - loop { - interval.tick().await; - if let Err(e) = server.check_db().await { - error!("Error while checking database for new jobs, {}", e); - } - } - }); - - if spawned { - return; - } - } - - warn!("Not restarting ticker, arbiter is dead"); - } -} +use std::sync::Arc; +use tracing::{error, trace}; /// The server Actor /// @@ -67,22 +14,9 @@ impl Drop for Ticker { #[derive(Clone)] pub(crate) struct Server { storage: Arc, - cache: ServerCache, } impl Server { - pub(super) fn ticker( - &self, - arbiter: ArbiterHandle, - extras: Extras, - ) -> Ticker { - Ticker { - server: self.clone(), - extras: Some(extras), - arbiter, - } - } - /// Create a new Server from a compatible storage implementation pub(crate) fn new(storage: S) -> Self where @@ -90,26 +24,10 @@ impl Server { { Server { storage: Arc::new(StorageWrapper(storage)), - cache: ServerCache::new(), } } - async fn check_db(&self) -> Result<(), Error> { - trace!("Checking db for ready jobs"); - for queue in self.cache.keys().await { - 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { - if !self.try_turning(queue.clone(), worker).await? { - break 'worker_loop; - } - } - trace!("Finished job lookups for queue {}", queue); - } - - Ok(()) - } - pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { - let queue = job.queue().to_owned(); let ready = job.is_ready(); self.storage.new_job(job).await?; @@ -118,10 +36,6 @@ impl Server { return Ok(()); } - if let Some(worker) = self.cache.pop(queue.clone()).await { - self.try_turning(queue, worker).await?; - } - Ok(()) } @@ -130,30 +44,17 @@ impl Server { worker: Box, ) -> Result<(), Error> { trace!("Worker {} requested job", worker.id()); + let job = self + .storage + .request_job(worker.queue(), worker.id()) + .await?; - self.try_turning(worker.queue().to_owned(), worker).await?; - - Ok(()) - } - - async fn try_turning( - &self, - queue: String, - worker: Box, - ) -> Result { - trace!("Trying to find job for worker {}", worker.id()); - if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { - if let Err(job) = worker.process(job).await { - error!("Worker {} has hung up", worker.id()); - self.storage.return_job(job.unexecuted()).await? - } - } else { - trace!("No job exists, returning worker {}", worker.id()); - self.cache.push(queue.clone(), worker).await; - return Ok(false); + if let Err(job) = worker.process(job).await { + error!("Worker {} has hung up", worker.id()); + self.storage.return_job(job.unexecuted()).await?; } - Ok(true) + Ok(()) } pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { @@ -164,37 +65,3 @@ impl Server { Ok(self.storage.get_stats().await?) } } - -impl ServerCache { - fn new() -> Self { - ServerCache { - cache: Arc::new(Mutex::new(HashMap::new())), - } - } - - async fn keys(&self) -> Vec { - let cache = self.cache.lock().await; - - cache.keys().cloned().collect() - } - - async fn push(&self, queue: String, worker: Box) { - let mut cache = self.cache.lock().await; - - let entry = cache.entry(queue).or_insert_with(VecDeque::new); - entry.push_back(worker); - } - - async fn pop(&self, queue: String) -> Option> { - let mut cache = self.cache.lock().await; - - let mut vec_deque = cache.remove(&queue)?; - let item = vec_deque.pop_front()?; - - if !vec_deque.is_empty() { - cache.insert(queue, vec_deque); - } - - Some(item) - } -} diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs index 8ee03f3..de04792 100644 --- a/jobs-actix/src/storage.rs +++ b/jobs-actix/src/storage.rs @@ -6,7 +6,7 @@ use uuid::Uuid; pub(crate) trait ActixStorage { async fn new_job(&self, job: NewJobInfo) -> Result; - async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result, Error>; + async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result; async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>; @@ -28,7 +28,7 @@ where Ok(self.0.new_job(job).await?) } - async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result, Error> { + async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result { Ok(self.0.request_job(queue, runner_id).await?) } diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index c482fe9..bd21979 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.12.0" +version = "0.13.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -18,12 +18,12 @@ error-logging = [] [dependencies] actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" -async-mutex = "1.0.1" async-trait = "0.1.24" +event-listener = "2" time = { version = "0.3", features = ["serde-human-readable"] } tracing = "0.1" tracing-futures = "0.2.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -uuid = { version = "0.8.1", features = ["serde", "v4"] } +uuid = { version = "1", features = ["serde", "v4"] } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 56e2770..8f58c9d 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,6 +1,6 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; use std::{error::Error, time::SystemTime}; -use tracing::info; +use tracing::warn; use uuid::Uuid; /// Define a storage backend for jobs @@ -29,8 +29,8 @@ pub trait Storage: Clone + Send { /// This should fetch a job ready to be processed from the queue /// /// If a job is not ready, is currently running, or is not in the requested queue, this method - /// should not return it. If no jobs meet these criteria, this method should return Ok(None) - async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error>; + /// should not return it. If no jobs meet these criteria, this method wait until a job becomes available + async fn fetch_job_from_queue(&self, queue: &str) -> Result; /// This method tells the storage mechanism to mark the given job as being in the provided /// queue @@ -68,31 +68,25 @@ pub trait Storage: Clone + Send { } /// Fetch a job that is ready to be executed, marking it as running - async fn request_job( - &self, - queue: &str, - runner_id: Uuid, - ) -> Result, Self::Error> { - match self.fetch_job_from_queue(queue).await? { - Some(mut job) => { - let now = SystemTime::now(); - if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { - job.run(); - self.run_job(job.id(), runner_id).await?; - self.save_job(job.clone()).await?; - self.update_stats(Stats::run_job).await?; + async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result { + loop { + let mut job = self.fetch_job_from_queue(queue).await?; - Ok(Some(job)) - } else { - info!( - "Not fetching job {}, it is not ready for processing", - job.id() - ); - self.queue_job(job.queue(), job.id()).await?; - Ok(None) - } + let now = SystemTime::now(); + if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { + job.run(); + self.run_job(job.id(), runner_id).await?; + self.save_job(job.clone()).await?; + self.update_stats(Stats::run_job).await?; + + return Ok(job); + } else { + warn!( + "Not fetching job {}, it is not ready for processing", + job.id() + ); + self.queue_job(job.queue(), job.id()).await?; } - None => Ok(None), } } @@ -136,54 +130,66 @@ pub trait Storage: Clone + Send { /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { use super::{JobInfo, Stats}; - use async_mutex::Mutex; - use std::{collections::HashMap, convert::Infallible, sync::Arc, time::SystemTime}; + use event_listener::Event; + use std::{ + collections::HashMap, + convert::Infallible, + sync::Arc, + sync::Mutex, + time::{Duration, SystemTime}, + }; use uuid::Uuid; - #[derive(Clone)] - /// An In-Memory store for jobs - pub struct Storage { - inner: Arc>, + /// Allows memory storage to set timeouts for when to retry checking a queue for a job + #[async_trait::async_trait] + pub trait Timer { + /// Race a future against the clock, returning an empty tuple if the clock wins + async fn timeout(&self, duration: Duration, future: F) -> Result + where + F: std::future::Future; } #[derive(Clone)] + /// An In-Memory store for jobs + pub struct Storage { + timer: T, + inner: Arc>, + } + struct Inner { + queues: HashMap, jobs: HashMap, - queues: HashMap, + job_queues: HashMap, worker_ids: HashMap, worker_ids_inverse: HashMap, stats: Stats, } - impl Storage { + impl Storage { /// Create a new, empty job store - pub fn new() -> Self { + pub fn new(timer: T) -> Self { Storage { inner: Arc::new(Mutex::new(Inner { - jobs: HashMap::new(), queues: HashMap::new(), + jobs: HashMap::new(), + job_queues: HashMap::new(), worker_ids: HashMap::new(), worker_ids_inverse: HashMap::new(), stats: Stats::default(), })), + timer, } } } - impl Default for Storage { - fn default() -> Self { - Self::new() - } - } - #[async_trait::async_trait] - impl super::Storage for Storage { + impl super::Storage for Storage { type Error = Infallible; async fn generate_id(&self) -> Result { let uuid = loop { let uuid = Uuid::new_v4(); - if !self.inner.lock().await.jobs.contains_key(&uuid) { + if !self.inner.lock().unwrap().jobs.contains_key(&uuid) { break uuid; } }; @@ -192,51 +198,94 @@ pub mod memory_storage { } async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> { - self.inner.lock().await.jobs.insert(job.id(), job); + self.inner.lock().unwrap().jobs.insert(job.id(), job); Ok(()) } async fn fetch_job(&self, id: Uuid) -> Result, Self::Error> { - let j = self.inner.lock().await.jobs.get(&id).cloned(); + let j = self.inner.lock().unwrap().jobs.get(&id).cloned(); Ok(j) } - async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error> { - let mut inner = self.inner.lock().await; - let now = SystemTime::now(); + async fn fetch_job_from_queue(&self, queue: &str) -> Result { + loop { + let listener = { + let mut inner = self.inner.lock().unwrap(); + let now = SystemTime::now(); - let j = inner - .queues - .iter() - .filter_map(|(k, v)| { - if v == queue { - let job = inner.jobs.get(k)?; + let j = inner.job_queues.iter().find_map(|(k, v)| { + if v == queue { + let job = inner.jobs.get(k)?; - if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { - return Some(job.clone()); + if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { + return Some(job.clone()); + } } - } - None - }) - .next(); + None + }); - if let Some(ref j) = j { - inner.queues.remove(&j.id()); + let duration = if let Some(j) = j { + if inner.job_queues.remove(&j.id()).is_some() { + return Ok(j); + } else { + continue; + } + } else { + inner.job_queues.iter().fold( + Duration::from_secs(5), + |duration, (id, v_queue)| { + if v_queue == queue { + if let Some(job) = inner.jobs.get(id) { + if let Some(ready_at) = job.next_queue() { + let job_eta = ready_at + .duration_since(now) + .unwrap_or(Duration::from_secs(0)); + + if job_eta < duration { + return job_eta; + } + } + } + } + + duration + }, + ) + }; + + self.timer.timeout( + duration, + inner + .queues + .entry(queue.to_string()) + .or_insert(Event::new()) + .listen(), + ) + }; + + let _ = listener.await; } - - Ok(j) } async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> { - self.inner.lock().await.queues.insert(id, queue.to_owned()); + let mut inner = self.inner.lock().unwrap(); + + inner.job_queues.insert(id, queue.to_owned()); + + inner + .queues + .entry(queue.to_string()) + .or_insert(Event::new()) + .notify(1); + Ok(()) } async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock().unwrap(); inner.worker_ids.insert(id, worker_id); inner.worker_ids_inverse.insert(worker_id, id); @@ -244,9 +293,9 @@ pub mod memory_storage { } async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock().unwrap(); inner.jobs.remove(&id); - inner.queues.remove(&id); + inner.job_queues.remove(&id); if let Some(worker_id) = inner.worker_ids.remove(&id) { inner.worker_ids_inverse.remove(&worker_id); } @@ -254,14 +303,14 @@ pub mod memory_storage { } async fn get_stats(&self) -> Result { - Ok(self.inner.lock().await.stats.clone()) + Ok(self.inner.lock().unwrap().stats.clone()) } async fn update_stats(&self, f: F) -> Result<(), Self::Error> where F: Fn(Stats) -> Stats + Send, { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.lock().unwrap(); inner.stats = (f)(inner.stats.clone()); Ok(()) diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index b18807a..05215dc 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -13,10 +13,10 @@ edition = "2021" [dependencies] actix-rt = "2.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.12.0", path = "../jobs-core" } +background-jobs-core = { version = "0.13.0", path = "../jobs-core" } bincode = "1.2" sled = "0.34" serde_cbor = "0.11" thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["rt"] } -uuid = { version = "0.8.1", features = ["v4", "serde"] } +tokio = { version = "1", default-features = false, features = ["rt", "sync"] } +uuid = { version = "1", features = ["v4", "serde"] } diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index b7f932a..284f86b 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -13,10 +13,18 @@ //! let queue_handle = ServerConfig::new(storage).thread_count(8).start(); //! ``` -use actix_rt::task::{spawn_blocking, JoinError}; +use actix_rt::{ + task::{spawn_blocking, JoinError}, + time::timeout, +}; use background_jobs_core::{JobInfo, Stats}; use sled::{Db, Tree}; -use std::time::SystemTime; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::{Duration, SystemTime}, +}; +use tokio::sync::Notify; use uuid::Uuid; /// The error produced by sled storage calls @@ -47,7 +55,8 @@ pub struct Storage { running_inverse: Tree, queue: Tree, stats: Tree, - db: Db, + notifiers: Arc>>>, + _db: Db, } #[async_trait::async_trait] @@ -103,18 +112,59 @@ impl background_jobs_core::Storage for Storage { .await??) } - async fn fetch_job_from_queue(&self, queue: &str) -> Result> { - let this = self.clone(); - let queue = queue.to_owned(); + async fn fetch_job_from_queue(&self, queue: &str) -> Result { + loop { + let this = self.clone(); + let queue2 = queue.to_owned(); - Ok(spawn_blocking(move || { - let mut job; + let job = spawn_blocking(move || { + let queue = queue2; + let mut job; - let now = SystemTime::now(); + let now = SystemTime::now(); - while { - let job_opt = this - .queue + while { + let job_opt = this + .queue + .iter() + .filter_map(|res| res.ok()) + .filter_map(|(id, in_queue)| { + if queue.as_bytes() == in_queue.as_ref() { + Some(id) + } else { + None + } + }) + .filter_map(|id| this.jobinfo.get(id).ok()) + .flatten() + .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok()) + .find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now)); + + job = if let Some(job) = job_opt { + job + } else { + return Ok(None); + }; + + this.queue.remove(job.id().as_bytes())?.is_none() + } {} + + Ok(Some(job)) as Result> + }) + .await??; + + if let Some(job) = job { + return Ok(job); + } + + let this = self.clone(); + let queue2 = queue.to_owned(); + + let duration = spawn_blocking(move || { + let queue = queue2; + let now = SystemTime::now(); + + this.queue .iter() .filter_map(|res| res.ok()) .filter_map(|(id, in_queue)| { @@ -127,27 +177,36 @@ impl background_jobs_core::Storage for Storage { .filter_map(|id| this.jobinfo.get(id).ok()) .flatten() .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok()) - .find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now)); + .filter(|job: &JobInfo| !job.is_ready(now) && job.is_pending(now)) + .fold(Duration::from_secs(5), |duration, job| { + if let Some(next_queue) = job.next_queue() { + let job_duration = next_queue + .duration_since(now) + .unwrap_or(Duration::from_secs(0)); - job = if let Some(job) = job_opt { - job - } else { - return Ok(None); - }; + if job_duration < duration { + return job_duration; + } + } - this.queue.remove(job.id().as_bytes())?.is_none() - } {} + duration + }) + }) + .await?; - Ok(Some(job)) as Result> - }) - .await??) + let notifier = self.notifier(queue.to_owned()); + + let _ = timeout(duration, notifier.notified()).await; + } } async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> { let this = self.clone(); - let queue = queue.to_owned(); + let queue2 = queue.to_owned(); + + spawn_blocking(move || { + let queue = queue2; - Ok(spawn_blocking(move || { if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? { this.running.remove(runner_id)?; } @@ -156,7 +215,11 @@ impl background_jobs_core::Storage for Storage { Ok(()) as Result<_> }) - .await??) + .await??; + + self.notify(queue.to_owned()); + + Ok(()) } async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> { @@ -243,9 +306,28 @@ impl Storage { running_inverse: db.open_tree("background-jobs-running-inverse")?, queue: db.open_tree("background-jobs-queue")?, stats: db.open_tree("background-jobs-stats")?, - db, + notifiers: Arc::new(Mutex::new(HashMap::new())), + _db: db, }) } + + fn notifier(&self, queue: String) -> Arc { + self.notifiers + .lock() + .unwrap() + .entry(queue) + .or_insert_with(|| Arc::new(Notify::new())) + .clone() + } + + fn notify(&self, queue: String) { + self.notifiers + .lock() + .unwrap() + .entry(queue) + .or_insert_with(|| Arc::new(Notify::new())) + .notify_one(); + } } impl From for Error {