diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml
index e260374..a0c397a 100644
--- a/jobs-sled/Cargo.toml
+++ b/jobs-sled/Cargo.toml
@@ -16,8 +16,10 @@ async-trait = "0.1.24"
 background-jobs-core = { version = "0.16.0", path = "../jobs-core" }
 bincode = "1.2"
 sled = "0.34"
+serde = { version = "1", features = ["derive"] }
 serde_cbor = "0.11"
+time = { version = "0.3", features = ["serde-human-readable"] }
 thiserror = "1.0"
 tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
 tracing = "0.1"
-uuid = { version = "1", features = ["v4", "serde"] }
+uuid = { version = "1", features = ["v7", "serde"] }
diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs
index b92af41..1884686 100644
--- a/jobs-sled/src/lib.rs
+++ b/jobs-sled/src/lib.rs
@@ -13,48 +13,76 @@
 //!     let queue_handle = ServerConfig::new(storage).thread_count(8).start();
 //! ```

-use actix_rt::{
-    task::{spawn_blocking, JoinError},
-    time::timeout,
-};
-use background_jobs_core::JobInfo;
+use actix_rt::task::JoinError;
+use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
 use sled::{Db, Tree};
 use std::{
     collections::HashMap,
+    ops::Bound,
     sync::{Arc, Mutex},
-    time::{Duration, Instant, SystemTime},
+    time::Duration,
 };
 use tokio::sync::Notify;
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};

 /// The error produced by sled storage calls
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     /// Error in the database
-    #[error("Error in sled extensions, {0}")]
+    #[error("Error in sled extensions")]
     Sled(#[from] sled::Error),

-    /// Error storing or retrieving job info
-    #[error("Error transforming job info, {0}")]
+    /// Error in cbor
+    #[error("Error in cbor")]
     Cbor(#[from] serde_cbor::Error),

+    /// Conflict while updating record
+    #[error("Conflict while updating record")]
+    Conflict,
+
+    /// Missing record
+    #[error("Missing record")]
+    Missing,
+
     /// Error executing db operation
     #[error("Blocking operation was canceled")]
     Canceled,
 }

+#[derive(serde::Serialize, serde::Deserialize)]
+struct JobMeta {
+    id: Uuid,
+    state: Option<JobState>,
+}
+
+#[derive(serde::Serialize, serde::Deserialize)]
+struct JobState {
+    runner_id: Uuid,
+    heartbeat: time::OffsetDateTime,
+}
+
+struct JobKey {
+    queue: String,
+    next_queue_id: Uuid,
+}
+
+fn encode_key(key: &JobKey) -> Vec<u8> {
+    let mut v = Vec::with_capacity(key.queue.len() + 17);
+    v.extend_from_slice(key.queue.as_bytes());
+    v.push(b',');
+    v.extend_from_slice(key.next_queue_id.as_bytes());
+    v
+}
+
 /// A simple alias for Result<T, Error>
 pub type Result<T> = std::result::Result<T, Error>;

 #[derive(Clone)]
 /// The Sled-backed storage implementation
 pub struct Storage {
-    id: Tree,
-    jobinfo: Tree,
-    running: Tree,
-    running_inverse: Tree,
-    queue: Tree,
-    notifiers: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
+    jobs: Tree,
+    queue_jobs: Tree,
+    queues: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
     _db: Db,
 }

@@ -62,201 +90,61 @@ pub struct Storage {
 impl background_jobs_core::Storage for Storage {
     type Error = Error;

-    async fn generate_id(&self) -> Result<Uuid> {
-        let this = self.clone();
-
-        Ok(spawn_blocking(move || {
-            let mut uuid;
-            while {
-                uuid = Uuid::new_v4();
-
-                this.id
-                    .compare_and_swap(
-                        uuid.as_bytes(),
-                        None as Option<&[u8]>,
-                        Some(uuid.as_bytes()),
-                    )?
-                    .is_err()
-            } {}
-
-            Ok(uuid) as Result<Uuid>
-        })
-        .await??)
+    async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
+        self.insert(job.build())
     }

-    async fn save_job(&self, job: JobInfo) -> Result<()> {
-        let this = self.clone();
-
-        Ok(spawn_blocking(move || {
-            let job_vec = serde_cbor::to_vec(&job)?;
-
-            this.jobinfo.insert(job.id().as_bytes(), job_vec)?;
-
-            Ok(()) as Result<_>
-        })
-        .await??)
-    }
-
-    async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>> {
-        let this = self.clone();
-
-        Ok(spawn_blocking(move || {
-            if let Some(job_ivec) = this.jobinfo.get(id.as_bytes())? {
-                let job: JobInfo = serde_cbor::from_slice(&job_ivec)?;
-                Ok(Some(job)) as Result<_>
-            } else {
-                Ok(None)
-            }
-        })
-        .await??)
-    }
-
-    async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo> {
+    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo> {
         loop {
-            let notifier = self.notifier(queue.to_owned());
+            let notifier = self.notifier(queue.to_string());

-            let notified = notifier.notified();
-            tokio::pin!(notified);
-            notified.as_mut().enable();
-
-            let this = self.clone();
-            let queue2 = queue.to_owned();
-
-            let job = spawn_blocking(move || {
-                let queue = queue2;
-                let mut job;
-
-                let now = SystemTime::now();
-
-                while {
-                    let job_opt = this
-                        .queue
-                        .iter()
-                        .filter_map(|res| res.ok())
-                        .filter_map(|(id, in_queue)| {
-                            if queue.as_bytes() == in_queue.as_ref() {
-                                Some(id)
-                            } else {
-                                None
-                            }
-                        })
-                        .filter_map(|id| this.jobinfo.get(id).ok())
-                        .flatten()
-                        .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
-                        .find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now));
-
-                    job = if let Some(job) = job_opt {
-                        job
-                    } else {
-                        return Ok(None);
-                    };
-
-                    this.queue.remove(job.id().as_bytes())?.is_none()
-                } {}
-
-                Ok(Some(job)) as Result<Option<JobInfo>>
-            })
-            .await??;
-
-            if let Some(job) = job {
+            if let Some(job) = self.try_pop(queue.to_string(), runner_id)? {
                 return Ok(job);
             }

-            let this = self.clone();
-            let queue2 = queue.to_owned();
+            let duration = self
+                .next_duration(queue.to_string())
+                .unwrap_or(Duration::from_secs(5));

-            let duration = spawn_blocking(move || {
-                let queue = queue2;
-                let now = SystemTime::now();
-
-                this.queue
-                    .iter()
-                    .filter_map(|res| res.ok())
-                    .filter_map(|(id, in_queue)| {
-                        if queue.as_bytes() == in_queue.as_ref() {
-                            Some(id)
-                        } else {
-                            None
-                        }
-                    })
-                    .filter_map(|id| this.jobinfo.get(id).ok())
-                    .flatten()
-                    .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
-                    .filter(|job: &JobInfo| !job.is_ready(now) && job.is_pending(now))
-                    .fold(Duration::from_secs(5), |duration, job| {
-                        if let Some(next_queue) = job.next_queue() {
-                            let job_duration = next_queue
-                                .duration_since(now)
-                                .unwrap_or(Duration::from_secs(0));
-
-                            if job_duration < duration {
-                                return job_duration;
-                            }
-                        }
-
-                        duration
-                    })
-            })
-            .await?;
-
-            let before = Instant::now();
-            tracing::debug!("Waiting for notification for at most {:?}", duration);
-            let _ = timeout(duration, notified).await;
-            tracing::debug!("Notified after {:?}", before.elapsed());
+            match tokio::time::timeout(duration, notifier.notified()).await {
+                Ok(()) => {
+                    // notified
+                }
+                Err(_) => {
+                    // timeout
+                }
+            }
         }
     }

-    async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> {
-        let this = self.clone();
-        let queue2 = queue.to_owned();
-
-        spawn_blocking(move || {
-            let queue = queue2;
-
-            if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? {
-                this.running.remove(runner_id)?;
-            }
-
-            this.queue.insert(id.as_bytes(), queue.as_bytes())?;
-
-            Ok(()) as Result<_>
-        })
-        .await??;
-
-        self.notify(queue.to_owned());
-
-        Ok(())
+    async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
+        self.set_heartbeat(job_id, runner_id)
     }

-    async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> {
-        let this = self.clone();
+    async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<()> {
+        let mut job = if let Some(job) = self.remove_job(id)? {
+            job
+        } else {
+            return Ok(());
+        };

-        Ok(spawn_blocking(move || {
-            this.queue.remove(id.as_bytes())?;
-            this.running.insert(runner_id.as_bytes(), id.as_bytes())?;
-            this.running_inverse
-                .insert(id.as_bytes(), runner_id.as_bytes())?;
-
-            Ok(()) as Result<()>
-        })
-        .await??)
-    }
-
-    async fn delete_job(&self, id: Uuid) -> Result<()> {
-        let this = self.clone();
-
-        Ok(spawn_blocking(move || {
-            this.jobinfo.remove(id.as_bytes())?;
-            this.queue.remove(id.as_bytes())?;
-            this.id.remove(id.as_bytes())?;
-
-            if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? {
-                this.running.remove(runner_id)?;
+        match result {
+            JobResult::Success => {
+                // ok
+                Ok(())
             }
+            JobResult::Unexecuted | JobResult::Unregistered => {
+                // TODO: handle
+                Ok(())
+            }
+            JobResult::Failure => {
+                if job.prepare_retry() {
+                    self.insert(job)?;
+                }

-            Ok(()) as Result<()>
-        })
-        .await??)
+                Ok(())
+            }
+        }
     }
 }

@@ -264,18 +152,15 @@ impl Storage {
     /// Create a new Storage struct
     pub fn new(db: Db) -> Result<Self> {
         Ok(Storage {
-            id: db.open_tree("background-jobs-id")?,
-            jobinfo: db.open_tree("background-jobs-jobinfo")?,
-            running: db.open_tree("background-jobs-running")?,
-            running_inverse: db.open_tree("background-jobs-running-inverse")?,
-            queue: db.open_tree("background-jobs-queue")?,
-            notifiers: Arc::new(Mutex::new(HashMap::new())),
+            jobs: db.open_tree("background-jobs-jobs")?,
+            queue_jobs: db.open_tree("background-jobs-queue-jobs")?,
+            queues: Arc::new(Mutex::new(HashMap::new())),
             _db: db,
         })
     }

     fn notifier(&self, queue: String) -> Arc<Notify> {
-        self.notifiers
+        self.queues
             .lock()
             .unwrap()
             .entry(queue)
@@ -284,13 +169,203 @@ impl Storage {
     }

     fn notify(&self, queue: String) {
-        self.notifiers
+        self.queues
             .lock()
             .unwrap()
             .entry(queue)
             .or_insert_with(|| Arc::new(Notify::new()))
             .notify_one();
     }
+
+    fn try_pop(&self, queue: String, runner_id: Uuid) -> Result<Option<JobInfo>> {
+        let lower_bound = encode_key(&JobKey {
+            queue: queue.clone(),
+            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
+        });
+        let upper_bound = encode_key(&JobKey {
+            queue: queue.clone(),
+            next_queue_id: Uuid::now_v7(),
+        });
+        let now = time::OffsetDateTime::now_utc();
+
+        for res in self
+            .queue_jobs
+            .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
+        {
+            let (key, ivec) = res?;
+
+            if let Ok(JobMeta { id, state }) = serde_cbor::from_slice(&ivec) {
+                if state.is_none()
+                    || state.is_some_and(|JobState { heartbeat, .. }| {
+                        heartbeat + time::Duration::seconds(30) < now
+                    })
+                {
+                    let new_bytes = serde_cbor::to_vec(&JobMeta {
+                        id,
+                        state: Some(JobState {
+                            runner_id,
+                            heartbeat: now,
+                        }),
+                    })?;
+
+                    match self
+                        .queue_jobs
+                        .compare_and_swap(key, Some(ivec), Some(new_bytes))?
+                    {
+                        Ok(()) => {
+                            // success
+                            if let Some(job) = self.jobs.get(id.as_bytes())? {
+                                return Ok(Some(serde_cbor::from_slice(&job)?));
+                            }
+                        }
+                        Err(_) => {
+                            // conflict
+                        }
+                    }
+
+                    break;
+                }
+            }
+        }
+
+        Ok(None)
+    }
+
+    fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
+        let queue = if let Some(job) = self.jobs.get(job_id.as_bytes())? {
+            let job: JobInfo = serde_cbor::from_slice(&job)?;
+            job.queue
+        } else {
+            return Ok(());
+        };
+
+        let lower_bound = encode_key(&JobKey {
+            queue: queue.clone(),
+            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
+        });
+        let upper_bound = encode_key(&JobKey {
+            queue,
+            next_queue_id: Uuid::now_v7(),
+        });
+
+        for res in self
+            .queue_jobs
+            .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
+        {
+            let (key, ivec) = res?;
+
+            if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) {
+                if id == job_id {
+                    let new_bytes = serde_cbor::to_vec(&JobMeta {
+                        id,
+                        state: Some(JobState {
+                            runner_id,
+                            heartbeat: time::OffsetDateTime::now_utc(),
+                        }),
+                    })?;
+
+                    match self
+                        .queue_jobs
+                        .compare_and_swap(key, Some(ivec), Some(new_bytes))?
+                    {
+                        Ok(()) => {
+                            // success
+                            return Ok(());
+                        }
+                        Err(_) => {
+                            // conflict
+                            return Err(Error::Conflict);
+                        }
+                    }
+                }
+            }
+        }
+
+        Err(Error::Missing)
+    }
+
+    fn remove_job(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
+        let job: JobInfo = if let Some(job) = self.jobs.remove(job_id.as_bytes())? {
+            serde_cbor::from_slice(&job)?
+        } else {
+            return Ok(None);
+        };
+
+        let lower_bound = encode_key(&JobKey {
+            queue: job.queue.clone(),
+            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
+        });
+        let upper_bound = encode_key(&JobKey {
+            queue: job.queue.clone(),
+            next_queue_id: Uuid::now_v7(),
+        });
+
+        for res in self
+            .queue_jobs
+            .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
+        {
+            let (key, ivec) = res?;
+
+            if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) {
+                if id == job_id {
+                    self.queue_jobs.remove(key)?;
+                    return Ok(Some(job));
+                }
+            }
+        }
+
+        Err(Error::Missing)
+    }
+
+    fn next_duration(&self, pop_queue: String) -> Option<Duration> {
+        let lower_bound = encode_key(&JobKey {
+            queue: pop_queue.clone(),
+            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
+        });
+
+        let now = time::OffsetDateTime::now_utc();
+
+        self.queue_jobs
+            .range((Bound::Excluded(lower_bound), Bound::Unbounded))
+            .values()
+            .filter_map(|res| res.ok())
+            .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
+            .filter(|JobMeta { state, .. }| state.is_none())
+            .filter_map(|JobMeta { id, .. }| self.jobs.get(id.as_bytes()).ok()?)
+            .filter_map(|ivec| serde_cbor::from_slice::<JobInfo>(&ivec).ok())
+            .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
+            .map(|JobInfo { next_queue, .. }| {
+                if next_queue > now {
+                    next_queue - now
+                } else {
+                    time::Duration::seconds(0)
+                }
+            })
+            .find_map(|d| d.try_into().ok())
+    }
+
+    fn insert(&self, job: JobInfo) -> Result<Uuid> {
+        let id = job.id;
+        let queue = job.queue.clone();
+        let next_queue_id = job.next_queue_id();
+
+        let job_bytes = serde_cbor::to_vec(&job)?;
+
+        self.jobs.insert(id.as_bytes(), job_bytes)?;
+
+        let key_bytes = encode_key(&JobKey {
+            queue: queue.clone(),
+            next_queue_id,
+        });
+
+        let job_meta_bytes = serde_cbor::to_vec(&JobMeta { id, state: None })?;
+
+        self.queue_jobs.insert(key_bytes, job_meta_bytes)?;
+
+        self.notify(queue);
+
+        Ok(id)
+    }
 }

 impl From<JoinError> for Error {