Remove concept of ticking, instead wait for jobs

This commit is contained in:
Aode (lion) 2022-07-02 13:42:17 -05:00
parent bf65fe802a
commit 1ac3c0bc86
14 changed files with 287 additions and 287 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs"
description = "Background Jobs implemented with actix and futures"
version = "0.12.0"
version = "0.13.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -11,25 +11,28 @@ edition = "2021"
[workspace]
members = [
"jobs-actix",
"jobs-core",
"jobs-sled",
"examples/basic-example",
"examples/long-example",
"examples/managed-example",
"examples/panic-example",
"jobs-actix",
"jobs-core",
"jobs-sled",
"examples/basic-example",
"examples/long-example",
"examples/managed-example",
"examples/panic-example",
]
[features]
default = ["background-jobs-actix"]
completion-logging = ["background-jobs-core/completion-logging", "error-logging"]
completion-logging = [
"background-jobs-core/completion-logging",
"error-logging",
]
error-logging = ["background-jobs-core/error-logging"]
[dependencies.background-jobs-core]
version = "0.12.0"
version = "0.13.0"
path = "jobs-core"
[dependencies.background-jobs-actix]
version = "0.12.0"
version = "0.13.0"
path = "jobs-actix"
optional = true

View file

@ -9,7 +9,9 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] }
background-jobs = { version = "0.13.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
tracing = "0.1"
tracing-subscriber = { version = "0.2", features = ["fmt"] }

View file

@ -9,7 +9,9 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] }
background-jobs = { version = "0.13.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
tracing = "0.1"
tracing-subscriber = { version = "0.2", features = ["fmt"] }

View file

@ -9,7 +9,9 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] }
background-jobs = { version = "0.13.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
tracing = "0.1"
tracing-subscriber = { version = "0.2", features = ["fmt"] }

View file

@ -53,12 +53,14 @@ async fn main() -> Result<(), Error> {
.await?;
// Block on Actix
tracing::info!("Press CTRL^C to continue");
actix_rt::signal::ctrl_c().await?;
// kill the current arbiter
manager.queue(StopJob).await?;
// Block on Actix
tracing::info!("Press CTRL^C to continue");
actix_rt::signal::ctrl_c().await?;
// See that the workers have respawned
@ -70,6 +72,7 @@ async fn main() -> Result<(), Error> {
.await?;
// Block on Actix
tracing::info!("Press CTRL^C to quit");
actix_rt::signal::ctrl_c().await?;
drop(manager);

View file

@ -9,7 +9,9 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.0"
anyhow = "1.0"
background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] }
background-jobs = { version = "0.13.0", path = "../..", features = [
"error-logging",
] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
time = "0.3"
tracing = "0.1"

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-actix"
description = "in-process jobs processor based on Actix"
version = "0.12.0"
version = "0.13.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -14,12 +14,18 @@ actix-rt = "2.5.1"
anyhow = "1.0"
async-mutex = "1.0.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.12.0", path = "../jobs-core", features = ["with-actix"] }
background-jobs-core = { version = "0.13.0", path = "../jobs-core", features = [
"with-actix",
] }
tracing = "0.1"
tracing-futures = "0.2"
num_cpus = "1.10.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["macros", "rt", "sync"] }
uuid = { version ="0.8.1", features = ["v4", "serde"] }
tokio = { version = "1", default-features = false, features = [
"macros",
"rt",
"sync",
] }
uuid = { version = "1", features = ["v4", "serde"] }

View file

@ -171,10 +171,6 @@ impl Manager {
};
loop {
worker_config
.queue_handle
.inner
.ticker(arbiter.handle(), drop_notifier.clone());
worker_config.start_managed(&arbiter.handle(), &drop_notifier);
notifier.notified().await;
@ -254,20 +250,6 @@ impl Drop for DropNotifier {
}
}
/// Create a new Server
///
/// In previous versions of this library, the server itself was run on it's own dedicated threads
/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
/// primitives, the Server has become an object that gets shared between client threads.
fn create_server_in_arbiter<S>(arbiter: ArbiterHandle, storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
let handle = create_server_managed(storage);
handle.inner.ticker(arbiter, ());
handle
}
/// Create a new managed Server
///
/// In previous versions of this library, the server itself was run on it's own dedicated threads
@ -361,7 +343,7 @@ where
storage: S,
state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
) -> Self {
let queue_handle = create_server_in_arbiter(arbiter.clone(), storage);
let queue_handle = create_server_managed(storage);
let q2 = queue_handle.clone();
WorkerConfig {

View file

@ -2,63 +2,10 @@ use crate::{
storage::{ActixStorage, StorageWrapper},
worker::Worker,
};
use actix_rt::{
time::{interval_at, Instant},
ArbiterHandle,
};
use anyhow::Error;
use async_mutex::Mutex;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Duration,
};
use tracing::{error, trace, warn};
type WorkerQueue = VecDeque<Box<dyn Worker + Send + Sync>>;
#[derive(Clone)]
pub(crate) struct ServerCache {
cache: Arc<Mutex<HashMap<String, WorkerQueue>>>,
}
pub(super) struct Ticker<Extras: Send + 'static> {
server: Server,
extras: Option<Extras>,
arbiter: ArbiterHandle,
}
impl<Extras: Send + 'static> Drop for Ticker<Extras> {
fn drop(&mut self) {
let online = self.arbiter.spawn(async move {});
let extras = self.extras.take().unwrap();
if online {
let server = self.server.clone();
let arbiter = self.arbiter.clone();
let spawned = self.arbiter.spawn(async move {
let _ticker = server.ticker(arbiter, extras);
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
if spawned {
return;
}
}
warn!("Not restarting ticker, arbiter is dead");
}
}
use std::sync::Arc;
use tracing::{error, trace};
/// The server Actor
///
@ -67,22 +14,9 @@ impl<Extras: Send + 'static> Drop for Ticker<Extras> {
#[derive(Clone)]
pub(crate) struct Server {
storage: Arc<dyn ActixStorage + Send + Sync>,
cache: ServerCache,
}
impl Server {
pub(super) fn ticker<Extras: Send + 'static>(
&self,
arbiter: ArbiterHandle,
extras: Extras,
) -> Ticker<Extras> {
Ticker {
server: self.clone(),
extras: Some(extras),
arbiter,
}
}
/// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(storage: S) -> Self
where
@ -90,26 +24,10 @@ impl Server {
{
Server {
storage: Arc::new(StorageWrapper(storage)),
cache: ServerCache::new(),
}
}
async fn check_db(&self) -> Result<(), Error> {
trace!("Checking db for ready jobs");
for queue in self.cache.keys().await {
'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await {
if !self.try_turning(queue.clone(), worker).await? {
break 'worker_loop;
}
}
trace!("Finished job lookups for queue {}", queue);
}
Ok(())
}
pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> {
let queue = job.queue().to_owned();
let ready = job.is_ready();
self.storage.new_job(job).await?;
@ -118,10 +36,6 @@ impl Server {
return Ok(());
}
if let Some(worker) = self.cache.pop(queue.clone()).await {
self.try_turning(queue, worker).await?;
}
Ok(())
}
@ -130,30 +44,17 @@ impl Server {
worker: Box<dyn Worker + Send + Sync + 'static>,
) -> Result<(), Error> {
trace!("Worker {} requested job", worker.id());
let job = self
.storage
.request_job(worker.queue(), worker.id())
.await?;
self.try_turning(worker.queue().to_owned(), worker).await?;
Ok(())
}
async fn try_turning(
&self,
queue: String,
worker: Box<dyn Worker + Send + Sync + 'static>,
) -> Result<bool, Error> {
trace!("Trying to find job for worker {}", worker.id());
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
if let Err(job) = worker.process(job).await {
error!("Worker {} has hung up", worker.id());
self.storage.return_job(job.unexecuted()).await?
}
} else {
trace!("No job exists, returning worker {}", worker.id());
self.cache.push(queue.clone(), worker).await;
return Ok(false);
if let Err(job) = worker.process(job).await {
error!("Worker {} has hung up", worker.id());
self.storage.return_job(job.unexecuted()).await?;
}
Ok(true)
Ok(())
}
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {
@ -164,37 +65,3 @@ impl Server {
Ok(self.storage.get_stats().await?)
}
}
impl ServerCache {
fn new() -> Self {
ServerCache {
cache: Arc::new(Mutex::new(HashMap::new())),
}
}
async fn keys(&self) -> Vec<String> {
let cache = self.cache.lock().await;
cache.keys().cloned().collect()
}
async fn push(&self, queue: String, worker: Box<dyn Worker + Send + Sync>) {
let mut cache = self.cache.lock().await;
let entry = cache.entry(queue).or_insert_with(VecDeque::new);
entry.push_back(worker);
}
async fn pop(&self, queue: String) -> Option<Box<dyn Worker + Send + Sync>> {
let mut cache = self.cache.lock().await;
let mut vec_deque = cache.remove(&queue)?;
let item = vec_deque.pop_front()?;
if !vec_deque.is_empty() {
cache.insert(queue, vec_deque);
}
Some(item)
}
}

View file

@ -6,7 +6,7 @@ use uuid::Uuid;
pub(crate) trait ActixStorage {
async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Error>;
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<Option<JobInfo>, Error>;
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>;
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>;
@ -28,7 +28,7 @@ where
Ok(self.0.new_job(job).await?)
}
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<Option<JobInfo>, Error> {
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error> {
Ok(self.0.request_job(queue, runner_id).await?)
}

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor"
version = "0.12.0"
version = "0.13.0"
license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs"
@ -18,12 +18,12 @@ error-logging = []
[dependencies]
actix-rt = { version = "2.3.0", optional = true }
anyhow = "1.0"
async-mutex = "1.0.1"
async-trait = "0.1.24"
event-listener = "2"
time = { version = "0.3", features = ["serde-human-readable"] }
tracing = "0.1"
tracing-futures = "0.2.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "0.8.1", features = ["serde", "v4"] }
uuid = { version = "1", features = ["serde", "v4"] }

View file

@ -1,6 +1,6 @@
use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
use std::{error::Error, time::SystemTime};
use tracing::info;
use tracing::warn;
use uuid::Uuid;
/// Define a storage backend for jobs
@ -29,8 +29,8 @@ pub trait Storage: Clone + Send {
/// This should fetch a job ready to be processed from the queue
///
/// If a job is not ready, is currently running, or is not in the requested queue, this method
/// should not return it. If no jobs meet these criteria, this method should return Ok(None)
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error>;
/// should not return it. If no jobs meet these criteria, this method wait until a job becomes available
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error>;
/// This method tells the storage mechanism to mark the given job as being in the provided
/// queue
@ -68,31 +68,25 @@ pub trait Storage: Clone + Send {
}
/// Fetch a job that is ready to be executed, marking it as running
async fn request_job(
&self,
queue: &str,
runner_id: Uuid,
) -> Result<Option<JobInfo>, Self::Error> {
match self.fetch_job_from_queue(queue).await? {
Some(mut job) => {
let now = SystemTime::now();
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
job.run();
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
self.update_stats(Stats::run_job).await?;
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
loop {
let mut job = self.fetch_job_from_queue(queue).await?;
Ok(Some(job))
} else {
info!(
"Not fetching job {}, it is not ready for processing",
job.id()
);
self.queue_job(job.queue(), job.id()).await?;
Ok(None)
}
let now = SystemTime::now();
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
job.run();
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
self.update_stats(Stats::run_job).await?;
return Ok(job);
} else {
warn!(
"Not fetching job {}, it is not ready for processing",
job.id()
);
self.queue_job(job.queue(), job.id()).await?;
}
None => Ok(None),
}
}
@ -136,54 +130,66 @@ pub trait Storage: Clone + Send {
/// A default, in-memory implementation of a storage mechanism
pub mod memory_storage {
use super::{JobInfo, Stats};
use async_mutex::Mutex;
use std::{collections::HashMap, convert::Infallible, sync::Arc, time::SystemTime};
use event_listener::Event;
use std::{
collections::HashMap,
convert::Infallible,
sync::Arc,
sync::Mutex,
time::{Duration, SystemTime},
};
use uuid::Uuid;
#[derive(Clone)]
/// An In-Memory store for jobs
pub struct Storage {
inner: Arc<Mutex<Inner>>,
/// Allows memory storage to set timeouts for when to retry checking a queue for a job
#[async_trait::async_trait]
pub trait Timer {
/// Race a future against the clock, returning an empty tuple if the clock wins
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where
F: std::future::Future;
}
#[derive(Clone)]
/// An In-Memory store for jobs
pub struct Storage<T> {
timer: T,
inner: Arc<Mutex<Inner>>,
}
struct Inner {
queues: HashMap<String, Event>,
jobs: HashMap<Uuid, JobInfo>,
queues: HashMap<Uuid, String>,
job_queues: HashMap<Uuid, String>,
worker_ids: HashMap<Uuid, Uuid>,
worker_ids_inverse: HashMap<Uuid, Uuid>,
stats: Stats,
}
impl Storage {
impl<T: Timer> Storage<T> {
/// Create a new, empty job store
pub fn new() -> Self {
pub fn new(timer: T) -> Self {
Storage {
inner: Arc::new(Mutex::new(Inner {
jobs: HashMap::new(),
queues: HashMap::new(),
jobs: HashMap::new(),
job_queues: HashMap::new(),
worker_ids: HashMap::new(),
worker_ids_inverse: HashMap::new(),
stats: Stats::default(),
})),
timer,
}
}
}
impl Default for Storage {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl super::Storage for Storage {
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
type Error = Infallible;
async fn generate_id(&self) -> Result<Uuid, Self::Error> {
let uuid = loop {
let uuid = Uuid::new_v4();
if !self.inner.lock().await.jobs.contains_key(&uuid) {
if !self.inner.lock().unwrap().jobs.contains_key(&uuid) {
break uuid;
}
};
@ -192,51 +198,94 @@ pub mod memory_storage {
}
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
self.inner.lock().await.jobs.insert(job.id(), job);
self.inner.lock().unwrap().jobs.insert(job.id(), job);
Ok(())
}
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
let j = self.inner.lock().await.jobs.get(&id).cloned();
let j = self.inner.lock().unwrap().jobs.get(&id).cloned();
Ok(j)
}
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
let mut inner = self.inner.lock().await;
let now = SystemTime::now();
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error> {
loop {
let listener = {
let mut inner = self.inner.lock().unwrap();
let now = SystemTime::now();
let j = inner
.queues
.iter()
.filter_map(|(k, v)| {
if v == queue {
let job = inner.jobs.get(k)?;
let j = inner.job_queues.iter().find_map(|(k, v)| {
if v == queue {
let job = inner.jobs.get(k)?;
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
return Some(job.clone());
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
return Some(job.clone());
}
}
}
None
})
.next();
None
});
if let Some(ref j) = j {
inner.queues.remove(&j.id());
let duration = if let Some(j) = j {
if inner.job_queues.remove(&j.id()).is_some() {
return Ok(j);
} else {
continue;
}
} else {
inner.job_queues.iter().fold(
Duration::from_secs(5),
|duration, (id, v_queue)| {
if v_queue == queue {
if let Some(job) = inner.jobs.get(id) {
if let Some(ready_at) = job.next_queue() {
let job_eta = ready_at
.duration_since(now)
.unwrap_or(Duration::from_secs(0));
if job_eta < duration {
return job_eta;
}
}
}
}
duration
},
)
};
self.timer.timeout(
duration,
inner
.queues
.entry(queue.to_string())
.or_insert(Event::new())
.listen(),
)
};
let _ = listener.await;
}
Ok(j)
}
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> {
self.inner.lock().await.queues.insert(id, queue.to_owned());
let mut inner = self.inner.lock().unwrap();
inner.job_queues.insert(id, queue.to_owned());
inner
.queues
.entry(queue.to_string())
.or_insert(Event::new())
.notify(1);
Ok(())
}
async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock().unwrap();
inner.worker_ids.insert(id, worker_id);
inner.worker_ids_inverse.insert(worker_id, id);
@ -244,9 +293,9 @@ pub mod memory_storage {
}
async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock().unwrap();
inner.jobs.remove(&id);
inner.queues.remove(&id);
inner.job_queues.remove(&id);
if let Some(worker_id) = inner.worker_ids.remove(&id) {
inner.worker_ids_inverse.remove(&worker_id);
}
@ -254,14 +303,14 @@ pub mod memory_storage {
}
async fn get_stats(&self) -> Result<Stats, Self::Error> {
Ok(self.inner.lock().await.stats.clone())
Ok(self.inner.lock().unwrap().stats.clone())
}
async fn update_stats<F>(&self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats + Send,
{
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock().unwrap();
inner.stats = (f)(inner.stats.clone());
Ok(())

View file

@ -13,10 +13,10 @@ edition = "2021"
[dependencies]
actix-rt = "2.0.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.12.0", path = "../jobs-core" }
background-jobs-core = { version = "0.13.0", path = "../jobs-core" }
bincode = "1.2"
sled = "0.34"
serde_cbor = "0.11"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt"] }
uuid = { version = "0.8.1", features = ["v4", "serde"] }
tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
uuid = { version = "1", features = ["v4", "serde"] }

View file

@ -13,10 +13,18 @@
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
//! ```
use actix_rt::task::{spawn_blocking, JoinError};
use actix_rt::{
task::{spawn_blocking, JoinError},
time::timeout,
};
use background_jobs_core::{JobInfo, Stats};
use sled::{Db, Tree};
use std::time::SystemTime;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use tokio::sync::Notify;
use uuid::Uuid;
/// The error produced by sled storage calls
@ -47,7 +55,8 @@ pub struct Storage {
running_inverse: Tree,
queue: Tree,
stats: Tree,
db: Db,
notifiers: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
_db: Db,
}
#[async_trait::async_trait]
@ -103,18 +112,59 @@ impl background_jobs_core::Storage for Storage {
.await??)
}
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>> {
let this = self.clone();
let queue = queue.to_owned();
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo> {
loop {
let this = self.clone();
let queue2 = queue.to_owned();
Ok(spawn_blocking(move || {
let mut job;
let job = spawn_blocking(move || {
let queue = queue2;
let mut job;
let now = SystemTime::now();
let now = SystemTime::now();
while {
let job_opt = this
.queue
while {
let job_opt = this
.queue
.iter()
.filter_map(|res| res.ok())
.filter_map(|(id, in_queue)| {
if queue.as_bytes() == in_queue.as_ref() {
Some(id)
} else {
None
}
})
.filter_map(|id| this.jobinfo.get(id).ok())
.flatten()
.filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
.find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now));
job = if let Some(job) = job_opt {
job
} else {
return Ok(None);
};
this.queue.remove(job.id().as_bytes())?.is_none()
} {}
Ok(Some(job)) as Result<Option<JobInfo>>
})
.await??;
if let Some(job) = job {
return Ok(job);
}
let this = self.clone();
let queue2 = queue.to_owned();
let duration = spawn_blocking(move || {
let queue = queue2;
let now = SystemTime::now();
this.queue
.iter()
.filter_map(|res| res.ok())
.filter_map(|(id, in_queue)| {
@ -127,27 +177,36 @@ impl background_jobs_core::Storage for Storage {
.filter_map(|id| this.jobinfo.get(id).ok())
.flatten()
.filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
.find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now));
.filter(|job: &JobInfo| !job.is_ready(now) && job.is_pending(now))
.fold(Duration::from_secs(5), |duration, job| {
if let Some(next_queue) = job.next_queue() {
let job_duration = next_queue
.duration_since(now)
.unwrap_or(Duration::from_secs(0));
job = if let Some(job) = job_opt {
job
} else {
return Ok(None);
};
if job_duration < duration {
return job_duration;
}
}
this.queue.remove(job.id().as_bytes())?.is_none()
} {}
duration
})
})
.await?;
Ok(Some(job)) as Result<Option<JobInfo>>
})
.await??)
let notifier = self.notifier(queue.to_owned());
let _ = timeout(duration, notifier.notified()).await;
}
}
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> {
let this = self.clone();
let queue = queue.to_owned();
let queue2 = queue.to_owned();
spawn_blocking(move || {
let queue = queue2;
Ok(spawn_blocking(move || {
if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? {
this.running.remove(runner_id)?;
}
@ -156,7 +215,11 @@ impl background_jobs_core::Storage for Storage {
Ok(()) as Result<_>
})
.await??)
.await??;
self.notify(queue.to_owned());
Ok(())
}
async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> {
@ -243,9 +306,28 @@ impl Storage {
running_inverse: db.open_tree("background-jobs-running-inverse")?,
queue: db.open_tree("background-jobs-queue")?,
stats: db.open_tree("background-jobs-stats")?,
db,
notifiers: Arc::new(Mutex::new(HashMap::new())),
_db: db,
})
}
fn notifier(&self, queue: String) -> Arc<Notify> {
self.notifiers
.lock()
.unwrap()
.entry(queue)
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
fn notify(&self, queue: String) {
self.notifiers
.lock()
.unwrap()
.entry(queue)
.or_insert_with(|| Arc::new(Notify::new()))
.notify_one();
}
}
impl From<JoinError> for Error {