diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 54cd945..c2284ae 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -67,7 +67,7 @@ pub mod memory_storage { type OrderedKey = (String, Uuid); type JobState = Option<(Uuid, OffsetDateTime)>; - type JobMeta = (Uuid, JobState); + type JobMeta = (Uuid, time::Duration, JobState); struct Inner { queues: HashMap, @@ -106,8 +106,8 @@ pub mod memory_storage { Bound::Excluded((pop_queue.clone(), lower_bound)), Bound::Unbounded, )) - .filter(|(_, (_, meta))| meta.is_none()) - .filter_map(|(_, (id, _))| inner.jobs.get(id)) + .filter(|(_, (_, _, meta))| meta.is_none()) + .filter_map(|(_, (id, _, _))| inner.jobs.get(id)) .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str()) .map(|JobInfo { next_queue, .. }| { if *next_queue > now { @@ -130,12 +130,12 @@ pub mod memory_storage { let mut pop_job = None; - for (_, (job_id, job_meta)) in inner.queue_jobs.range_mut(( + for (_, (job_id, heartbeat_interval, job_meta)) in inner.queue_jobs.range_mut(( Bound::Excluded((queue.to_string(), lower_bound)), Bound::Included((queue.to_string(), upper_bound)), )) { if job_meta.is_none() - || job_meta.is_some_and(|(_, h)| h + time::Duration::seconds(30) < now) + || job_meta.is_some_and(|(_, h)| h + (5 * *heartbeat_interval) < now) { *job_meta = Some((runner_id, now)); pop_job = Some(*job_id); @@ -162,7 +162,7 @@ pub mod memory_storage { return; }; - for (_, (found_job_id, found_job_meta)) in inner.queue_jobs.range_mut(( + for (_, (found_job_id, _, found_job_meta)) in inner.queue_jobs.range_mut(( Bound::Excluded((queue.clone(), lower_bound)), Bound::Included((queue, upper_bound)), )) { @@ -183,7 +183,7 @@ pub mod memory_storage { let mut key = None; - for (found_key, (found_job_id, _)) in inner.queue_jobs.range_mut(( + for (found_key, (found_job_id, _, _)) in inner.queue_jobs.range_mut(( Bound::Excluded((job.queue.clone(), lower_bound)), Bound::Included((job.queue.clone(), upper_bound)), )) { @@ -206,14 +206,20 @@ pub mod memory_storage { let id = job.id; let queue = job.queue.clone(); let queue_time_id = job.next_queue_id(); + let heartbeat_interval = job.heartbeat_interval; let mut inner = self.inner.lock().unwrap(); inner.jobs.insert(id, job); - inner - .queue_jobs - .insert((queue.clone(), queue_time_id), (id, None)); + inner.queue_jobs.insert( + (queue.clone(), queue_time_id), + ( + id, + time::Duration::milliseconds(heartbeat_interval as _), + None, + ), + ); inner.queues.entry(queue).or_default().notify(1); @@ -225,16 +231,19 @@ pub mod memory_storage { impl super::Storage for Storage { type Error = Infallible; + #[tracing::instrument(skip(self))] async fn info(&self, job_id: Uuid) -> Result, Self::Error> { Ok(self.get(job_id)) } /// push a job into the queue + #[tracing::instrument(skip_all)] async fn push(&self, job: NewJobInfo) -> Result { Ok(self.insert(job.build())) } /// pop a job from the provided queue + #[tracing::instrument(skip(self))] async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { loop { let (listener, duration) = self.listener(queue.to_string()); @@ -255,12 +264,14 @@ pub mod memory_storage { } /// mark a job as being actively worked on + #[tracing::instrument(skip(self))] async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> { self.set_heartbeat(job_id, runner_id); Ok(()) } /// "Return" a job to the database, marking it for retry if needed + #[tracing::instrument(skip(self))] async fn complete( &self, ReturnJobInfo { id, result }: ReturnJobInfo, diff --git a/jobs-postgres/src/lib.rs b/jobs-postgres/src/lib.rs index fe32e99..1cc13c0 100644 --- a/jobs-postgres/src/lib.rs +++ b/jobs-postgres/src/lib.rs @@ -217,6 +217,7 @@ impl From for JobInfo { impl background_jobs_core::Storage for Storage { type Error = PostgresError; + #[tracing::instrument] async fn info( &self, job_id: Uuid, @@ -245,10 +246,12 @@ impl background_jobs_core::Storage for Storage { } } + #[tracing::instrument(skip_all)] async fn push(&self, job: NewJobInfo) -> Result { self.insert(job.build()).await } + #[tracing::instrument(skip(self))] async fn pop(&self, in_queue: &str, in_runner_id: Uuid) -> Result { loop { tracing::trace!("pop: looping"); @@ -370,6 +373,7 @@ impl background_jobs_core::Storage for Storage { } } + #[tracing::instrument(skip(self))] async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> { let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; @@ -390,6 +394,7 @@ impl background_jobs_core::Storage for Storage { Ok(()) } + #[tracing::instrument(skip(self))] async fn complete(&self, return_job_info: ReturnJobInfo) -> Result { let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 5821255..2916ea4 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -36,6 +36,10 @@ pub enum Error { #[error("Error in cbor")] Cbor(#[from] serde_cbor::Error), + /// Error spawning task + #[error("Failed to spawn blocking task")] + Spawn(#[from] std::io::Error), + /// Conflict while updating record #[error("Conflict while updating record")] Conflict, @@ -52,6 +56,7 @@ pub enum Error { #[derive(serde::Serialize, serde::Deserialize)] struct JobMeta { id: Uuid, + heartbeat_interval: time::Duration, state: Option, } @@ -80,9 +85,13 @@ pub type Result = std::result::Result; #[derive(Clone)] /// The Sled-backed storage implementation pub struct Storage { + inner: Arc, +} + +struct Inner { jobs: Tree, queue_jobs: Tree, - queues: Arc>>>, + queues: Mutex>>, _db: Db, } @@ -90,25 +99,49 @@ pub struct Storage { impl background_jobs_core::Storage for Storage { type Error = Error; + #[tracing::instrument(skip(self))] async fn info(&self, job_id: Uuid) -> Result> { - self.get(job_id) + let this = self.clone(); + + tokio::task::Builder::new() + .name("jobs-info") + .spawn_blocking(move || this.get(job_id))? + .await? } + #[tracing::instrument(skip_all)] async fn push(&self, job: NewJobInfo) -> Result { - self.insert(job.build()) + let this = self.clone(); + + tokio::task::Builder::new() + .name("jobs-push") + .spawn_blocking(move || this.insert(job.build()))? + .await? } + #[tracing::instrument(skip(self))] async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { loop { let notifier = self.notifier(queue.to_string()); - if let Some(job) = self.try_pop(queue.to_string(), runner_id)? { + let this = self.clone(); + let queue2 = queue.to_string(); + if let Some(job) = tokio::task::Builder::new() + .name("jobs-try-pop") + .spawn_blocking(move || this.try_pop(queue2, runner_id))? + .await?? + { return Ok(job); } - let duration = self - .next_duration(queue.to_string()) - .unwrap_or(Duration::from_secs(5)); + let this = self.clone(); + let queue2 = queue.to_string(); + let duration = tokio::task::Builder::new() + .name("jobs-next-duration") + .spawn_blocking(move || { + this.next_duration(queue2).unwrap_or(Duration::from_secs(5)) + })? + .await?; match tokio::time::timeout(duration, notifier.notified()).await { Ok(()) => { @@ -121,12 +154,24 @@ impl background_jobs_core::Storage for Storage { } } + #[tracing::instrument(skip(self))] async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> { - self.set_heartbeat(job_id, runner_id) + let this = self.clone(); + + tokio::task::Builder::new() + .name("jobs-heartbeat") + .spawn_blocking(move || this.set_heartbeat(job_id, runner_id))? + .await? } + #[tracing::instrument(skip(self))] async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result { - let mut job = if let Some(job) = self.remove_job(id)? { + let this = self.clone(); + let mut job = if let Some(job) = tokio::task::Builder::new() + .name("jobs-remove") + .spawn_blocking(move || this.remove_job(id))? + .await?? + { job } else { return Ok(true); @@ -137,12 +182,20 @@ impl background_jobs_core::Storage for Storage { JobResult::Success => Ok(true), // Unregistered or Unexecuted jobs are restored as-is JobResult::Unexecuted | JobResult::Unregistered => { - self.insert(job)?; + let this = self.clone(); + tokio::task::Builder::new() + .name("jobs-requeue") + .spawn_blocking(move || this.insert(job))? + .await??; Ok(false) } // retryable failed jobs are restored JobResult::Failure if job.prepare_retry() => { - self.insert(job)?; + let this = self.clone(); + tokio::task::Builder::new() + .name("jobs-requeue") + .spawn_blocking(move || this.insert(job))? + .await??; Ok(false) } // dead jobs are removed @@ -155,15 +208,17 @@ impl Storage { /// Create a new Storage struct pub fn new(db: Db) -> Result { Ok(Storage { - jobs: db.open_tree("background-jobs-jobs")?, - queue_jobs: db.open_tree("background-jobs-queue-jobs")?, - queues: Arc::new(Mutex::new(HashMap::new())), - _db: db, + inner: Arc::new(Inner { + jobs: db.open_tree("background-jobs-jobs")?, + queue_jobs: db.open_tree("background-jobs-queue-jobs")?, + queues: Mutex::new(HashMap::new()), + _db: db, + }), }) } fn get(&self, job_id: Uuid) -> Result> { - if let Some(ivec) = self.jobs.get(job_id.as_bytes())? { + if let Some(ivec) = self.inner.jobs.get(job_id.as_bytes())? { let job_info = serde_cbor::from_slice(&ivec)?; Ok(Some(job_info)) @@ -173,7 +228,8 @@ impl Storage { } fn notifier(&self, queue: String) -> Arc { - self.queues + self.inner + .queues .lock() .unwrap() .entry(queue) @@ -182,7 +238,8 @@ impl Storage { } fn notify(&self, queue: String) { - self.queues + self.inner + .queues .lock() .unwrap() .entry(queue) @@ -202,32 +259,40 @@ impl Storage { let now = time::OffsetDateTime::now_utc(); for res in self + .inner .queue_jobs .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound))) { let (key, ivec) = res?; - if let Ok(JobMeta { id, state }) = serde_cbor::from_slice(&ivec) { + if let Ok(JobMeta { + id, + heartbeat_interval, + state, + }) = serde_cbor::from_slice(&ivec) + { if state.is_none() || state.is_some_and(|JobState { heartbeat, .. }| { - heartbeat + time::Duration::seconds(30) < now + heartbeat + (5 * heartbeat_interval) < now }) { let new_bytes = serde_cbor::to_vec(&JobMeta { id, + heartbeat_interval, state: Some(JobState { runner_id, heartbeat: now, }), })?; - match self - .queue_jobs - .compare_and_swap(key, Some(ivec), Some(new_bytes))? - { + match self.inner.queue_jobs.compare_and_swap( + key, + Some(ivec), + Some(new_bytes), + )? { Ok(()) => { // success - if let Some(job) = self.jobs.get(id.as_bytes())? { + if let Some(job) = self.inner.jobs.get(id.as_bytes())? { return Ok(Some(serde_cbor::from_slice(&job)?)); } } @@ -245,42 +310,37 @@ impl Storage { } fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> { - let queue = if let Some(job) = self.jobs.get(job_id.as_bytes())? { + let queue = if let Some(job) = self.inner.jobs.get(job_id.as_bytes())? { let job: JobInfo = serde_cbor::from_slice(&job)?; job.queue } else { return Ok(()); }; - let lower_bound = encode_key(&JobKey { - queue: queue.clone(), - next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)), - }); - let upper_bound = encode_key(&JobKey { - queue, - next_queue_id: Uuid::now_v7(), - }); - - for res in self - .queue_jobs - .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound))) - { + for res in self.inner.queue_jobs.scan_prefix(queue.as_bytes()) { let (key, ivec) = res?; - if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) { + if let Ok(JobMeta { + id, + heartbeat_interval, + .. + }) = serde_cbor::from_slice(&ivec) + { if id == job_id { let new_bytes = serde_cbor::to_vec(&JobMeta { id, + heartbeat_interval, state: Some(JobState { runner_id, heartbeat: time::OffsetDateTime::now_utc(), }), })?; - match self - .queue_jobs - .compare_and_swap(key, Some(ivec), Some(new_bytes))? - { + match self.inner.queue_jobs.compare_and_swap( + key, + Some(ivec), + Some(new_bytes), + )? { Ok(()) => { // success return Ok(()); @@ -298,7 +358,7 @@ impl Storage { } fn remove_job(&self, job_id: Uuid) -> Result> { - let job: JobInfo = if let Some(job) = self.jobs.remove(job_id.as_bytes())? { + let job: JobInfo = if let Some(job) = self.inner.jobs.remove(job_id.as_bytes())? { serde_cbor::from_slice(&job)? } else { return Ok(None); @@ -314,6 +374,7 @@ impl Storage { }); for res in self + .inner .queue_jobs .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound))) { @@ -321,7 +382,7 @@ impl Storage { if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) { if id == job_id { - self.queue_jobs.remove(key)?; + self.inner.queue_jobs.remove(key)?; return Ok(Some(job)); } } @@ -338,13 +399,14 @@ impl Storage { let now = time::OffsetDateTime::now_utc(); - self.queue_jobs + self.inner + .queue_jobs .range((Bound::Excluded(lower_bound), Bound::Unbounded)) .values() .filter_map(|res| res.ok()) .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok()) .filter(|JobMeta { state, .. }| state.is_none()) - .filter_map(|JobMeta { id, .. }| self.jobs.get(id.as_bytes()).ok()?) + .filter_map(|JobMeta { id, .. }| self.inner.jobs.get(id.as_bytes()).ok()?) .filter_map(|ivec| serde_cbor::from_slice::(&ivec).ok()) .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str()) .map(|JobInfo { next_queue, .. }| { @@ -361,19 +423,24 @@ impl Storage { let id = job.id; let queue = job.queue.clone(); let next_queue_id = job.next_queue_id(); + let heartbeat_interval = job.heartbeat_interval; let job_bytes = serde_cbor::to_vec(&job)?; - self.jobs.insert(id.as_bytes(), job_bytes)?; + self.inner.jobs.insert(id.as_bytes(), job_bytes)?; let key_bytes = encode_key(&JobKey { queue: queue.clone(), next_queue_id, }); - let job_meta_bytes = serde_cbor::to_vec(&JobMeta { id, state: None })?; + let job_meta_bytes = serde_cbor::to_vec(&JobMeta { + id, + heartbeat_interval: time::Duration::milliseconds(heartbeat_interval as _), + state: None, + })?; - self.queue_jobs.insert(key_bytes, job_meta_bytes)?; + self.inner.queue_jobs.insert(key_bytes, job_meta_bytes)?; self.notify(queue);