mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-27 14:30:59 +00:00
Store queue_time_id to avoid O(n) heartbeats & removals
This commit is contained in:
parent
005b8f851b
commit
f5163454da
1 changed files with 53 additions and 47 deletions
|
@ -65,13 +65,23 @@ pub mod memory_storage {
|
|||
inner: Arc<Mutex<Inner>>,
|
||||
}
|
||||
|
||||
type OrderedKey = (String, Uuid);
|
||||
type JobState = Option<(Uuid, OffsetDateTime)>;
|
||||
type JobMeta = (Uuid, time::Duration, JobState);
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct QueueTimeId(Uuid);
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct JobId(Uuid);
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct RunnerId(Uuid);
|
||||
|
||||
type OrderedKey = (String, QueueTimeId);
|
||||
type JobState = Option<(RunnerId, OffsetDateTime)>;
|
||||
type JobMeta = (JobId, time::Duration, JobState);
|
||||
type QueueMeta = (JobInfo, QueueTimeId);
|
||||
|
||||
struct Inner {
|
||||
queues: HashMap<String, Event>,
|
||||
jobs: HashMap<Uuid, JobInfo>,
|
||||
jobs: HashMap<JobId, QueueMeta>,
|
||||
queue_jobs: BTreeMap<OrderedKey, JobMeta>,
|
||||
}
|
||||
|
||||
|
@ -89,11 +99,16 @@ pub mod memory_storage {
|
|||
}
|
||||
|
||||
fn get(&self, job_id: Uuid) -> Option<JobInfo> {
|
||||
self.inner.lock().unwrap().jobs.get(&job_id).cloned()
|
||||
self.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.jobs
|
||||
.get(&JobId(job_id))
|
||||
.map(|(job_info, _)| job_info.clone())
|
||||
}
|
||||
|
||||
fn listener(&self, pop_queue: String) -> (Pin<Box<EventListener>>, Duration) {
|
||||
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||
let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
|
||||
let now = OffsetDateTime::now_utc();
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
@ -108,8 +123,8 @@ pub mod memory_storage {
|
|||
))
|
||||
.filter(|(_, (_, _, meta))| meta.is_none())
|
||||
.filter_map(|(_, (id, _, _))| inner.jobs.get(id))
|
||||
.take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
|
||||
.map(|JobInfo { next_queue, .. }| {
|
||||
.take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str())
|
||||
.map(|(JobInfo { next_queue, .. }, _)| {
|
||||
if *next_queue > now {
|
||||
*next_queue - now
|
||||
} else {
|
||||
|
@ -122,8 +137,10 @@ pub mod memory_storage {
|
|||
}
|
||||
|
||||
fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
|
||||
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||
let upper_bound = Uuid::now_v7();
|
||||
let runner_id = RunnerId(runner_id);
|
||||
|
||||
let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
|
||||
let upper_bound = QueueTimeId(Uuid::now_v7());
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
@ -135,7 +152,9 @@ pub mod memory_storage {
|
|||
Bound::Included((queue.to_string(), upper_bound)),
|
||||
)) {
|
||||
if job_meta.is_none()
|
||||
|| job_meta.is_some_and(|(_, h)| h + (5 * *heartbeat_interval) < now)
|
||||
|| job_meta.is_some_and(|(_, heartbeat_timestamp)| {
|
||||
heartbeat_timestamp + (5 * *heartbeat_interval) < now
|
||||
})
|
||||
{
|
||||
*job_meta = Some((runner_id, now));
|
||||
pop_job = Some(*job_id);
|
||||
|
@ -144,73 +163,60 @@ pub mod memory_storage {
|
|||
}
|
||||
|
||||
if let Some(job_id) = pop_job {
|
||||
return inner.jobs.get(&job_id).cloned();
|
||||
return inner
|
||||
.jobs
|
||||
.get(&job_id)
|
||||
.map(|(job_info, _)| job_info.clone());
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
|
||||
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||
let upper_bound = Uuid::now_v7();
|
||||
let job_id = JobId(job_id);
|
||||
let runner_id = RunnerId(runner_id);
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
let queue = if let Some(job) = inner.jobs.get(&job_id) {
|
||||
job.queue.clone()
|
||||
let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) {
|
||||
(job.queue.clone(), *queue_time_id)
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
for (_, (found_job_id, _, found_job_meta)) in inner.queue_jobs.range_mut((
|
||||
Bound::Excluded((queue.clone(), lower_bound)),
|
||||
Bound::Included((queue, upper_bound)),
|
||||
)) {
|
||||
if *found_job_id == job_id {
|
||||
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
|
||||
return;
|
||||
}
|
||||
if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) {
|
||||
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
|
||||
} else {
|
||||
metrics::counter!("background-jobs.core.heartbeat.missing-queue-job").increment(1);
|
||||
tracing::warn!("Missing job meta for {queue_key:?}");
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
|
||||
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||
let upper_bound = Uuid::now_v7();
|
||||
let job_id = JobId(job_id);
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
let job = inner.jobs.remove(&job_id)?;
|
||||
let (job, queue_time_id) = inner.jobs.remove(&job_id)?;
|
||||
let queue_key = (job.queue.clone(), queue_time_id);
|
||||
|
||||
let mut key = None;
|
||||
|
||||
for (found_key, (found_job_id, _, _)) in inner.queue_jobs.range_mut((
|
||||
Bound::Excluded((job.queue.clone(), lower_bound)),
|
||||
Bound::Included((job.queue.clone(), upper_bound)),
|
||||
)) {
|
||||
if *found_job_id == job_id {
|
||||
key = Some(found_key.clone());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(key) = key {
|
||||
if inner.queue_jobs.remove(&key).is_none() {
|
||||
tracing::warn!("failed to remove {key:?}");
|
||||
}
|
||||
if inner.queue_jobs.remove(&queue_key).is_none() {
|
||||
metrics::counter!("background-jobs.core.remove.missing-queue-job").increment(1);
|
||||
tracing::warn!("failed to remove job meta for {queue_key:?}");
|
||||
}
|
||||
|
||||
Some(job)
|
||||
}
|
||||
|
||||
fn insert(&self, job: JobInfo) -> Uuid {
|
||||
let id = job.id;
|
||||
let id = JobId(job.id);
|
||||
let queue = job.queue.clone();
|
||||
let queue_time_id = job.next_queue_id();
|
||||
let queue_time_id = QueueTimeId(job.next_queue_id());
|
||||
let heartbeat_interval = job.heartbeat_interval;
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
inner.jobs.insert(id, job);
|
||||
inner.jobs.insert(id, (job, queue_time_id));
|
||||
|
||||
inner.queue_jobs.insert(
|
||||
(queue.clone(), queue_time_id),
|
||||
|
@ -223,7 +229,7 @@ pub mod memory_storage {
|
|||
|
||||
inner.queues.entry(queue).or_default().notify(1);
|
||||
|
||||
id
|
||||
id.0
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue