diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index c2284ae..57a7968 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -65,13 +65,23 @@ pub mod memory_storage { inner: Arc>, } - type OrderedKey = (String, Uuid); - type JobState = Option<(Uuid, OffsetDateTime)>; - type JobMeta = (Uuid, time::Duration, JobState); + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct QueueTimeId(Uuid); + + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct JobId(Uuid); + + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct RunnerId(Uuid); + + type OrderedKey = (String, QueueTimeId); + type JobState = Option<(RunnerId, OffsetDateTime)>; + type JobMeta = (JobId, time::Duration, JobState); + type QueueMeta = (JobInfo, QueueTimeId); struct Inner { queues: HashMap, - jobs: HashMap, + jobs: HashMap, queue_jobs: BTreeMap, } @@ -89,11 +99,16 @@ pub mod memory_storage { } fn get(&self, job_id: Uuid) -> Option { - self.inner.lock().unwrap().jobs.get(&job_id).cloned() + self.inner + .lock() + .unwrap() + .jobs + .get(&JobId(job_id)) + .map(|(job_info, _)| job_info.clone()) } fn listener(&self, pop_queue: String) -> (Pin>, Duration) { - let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); + let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0))); let now = OffsetDateTime::now_utc(); let mut inner = self.inner.lock().unwrap(); @@ -108,8 +123,8 @@ pub mod memory_storage { )) .filter(|(_, (_, _, meta))| meta.is_none()) .filter_map(|(_, (id, _, _))| inner.jobs.get(id)) - .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str()) - .map(|JobInfo { next_queue, .. }| { + .take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str()) + .map(|(JobInfo { next_queue, .. }, _)| { if *next_queue > now { *next_queue - now } else { @@ -122,8 +137,10 @@ pub mod memory_storage { } fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option { - let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); - let upper_bound = Uuid::now_v7(); + let runner_id = RunnerId(runner_id); + + let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0))); + let upper_bound = QueueTimeId(Uuid::now_v7()); let now = time::OffsetDateTime::now_utc(); let mut inner = self.inner.lock().unwrap(); @@ -135,7 +152,9 @@ pub mod memory_storage { Bound::Included((queue.to_string(), upper_bound)), )) { if job_meta.is_none() - || job_meta.is_some_and(|(_, h)| h + (5 * *heartbeat_interval) < now) + || job_meta.is_some_and(|(_, heartbeat_timestamp)| { + heartbeat_timestamp + (5 * *heartbeat_interval) < now + }) { *job_meta = Some((runner_id, now)); pop_job = Some(*job_id); @@ -144,73 +163,60 @@ pub mod memory_storage { } if let Some(job_id) = pop_job { - return inner.jobs.get(&job_id).cloned(); + return inner + .jobs + .get(&job_id) + .map(|(job_info, _)| job_info.clone()); } None } fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) { - let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); - let upper_bound = Uuid::now_v7(); + let job_id = JobId(job_id); + let runner_id = RunnerId(runner_id); let mut inner = self.inner.lock().unwrap(); - let queue = if let Some(job) = inner.jobs.get(&job_id) { - job.queue.clone() + let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) { + (job.queue.clone(), *queue_time_id) } else { return; }; - for (_, (found_job_id, _, found_job_meta)) in inner.queue_jobs.range_mut(( - Bound::Excluded((queue.clone(), lower_bound)), - Bound::Included((queue, upper_bound)), - )) { - if *found_job_id == job_id { - *found_job_meta = Some((runner_id, OffsetDateTime::now_utc())); - return; - } + if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) { + *found_job_meta = Some((runner_id, OffsetDateTime::now_utc())); + } else { + metrics::counter!("background-jobs.core.heartbeat.missing-queue-job").increment(1); + tracing::warn!("Missing job meta for {queue_key:?}"); } } fn remove_job(&self, job_id: Uuid) -> Option { - let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); - let upper_bound = Uuid::now_v7(); + let job_id = JobId(job_id); let mut inner = self.inner.lock().unwrap(); - let job = inner.jobs.remove(&job_id)?; + let (job, queue_time_id) = inner.jobs.remove(&job_id)?; + let queue_key = (job.queue.clone(), queue_time_id); - let mut key = None; - - for (found_key, (found_job_id, _, _)) in inner.queue_jobs.range_mut(( - Bound::Excluded((job.queue.clone(), lower_bound)), - Bound::Included((job.queue.clone(), upper_bound)), - )) { - if *found_job_id == job_id { - key = Some(found_key.clone()); - break; - } - } - - if let Some(key) = key { - if inner.queue_jobs.remove(&key).is_none() { - tracing::warn!("failed to remove {key:?}"); - } + if inner.queue_jobs.remove(&queue_key).is_none() { + metrics::counter!("background-jobs.core.remove.missing-queue-job").increment(1); + tracing::warn!("failed to remove job meta for {queue_key:?}"); } Some(job) } fn insert(&self, job: JobInfo) -> Uuid { - let id = job.id; + let id = JobId(job.id); let queue = job.queue.clone(); - let queue_time_id = job.next_queue_id(); + let queue_time_id = QueueTimeId(job.next_queue_id()); let heartbeat_interval = job.heartbeat_interval; let mut inner = self.inner.lock().unwrap(); - inner.jobs.insert(id, job); + inner.jobs.insert(id, (job, queue_time_id)); inner.queue_jobs.insert( (queue.clone(), queue_time_id), @@ -223,7 +229,7 @@ pub mod memory_storage { inner.queues.entry(queue).or_default().notify(1); - id + id.0 } }