jobs-sled: Update to new storage api

This commit is contained in:
asonix 2024-01-08 16:28:55 -06:00
parent cb4124b282
commit f73712c098

View file

@ -90,6 +90,10 @@ pub struct Storage {
impl background_jobs_core::Storage for Storage { impl background_jobs_core::Storage for Storage {
type Error = Error; type Error = Error;
async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
self.get(job_id)
}
async fn push(&self, job: NewJobInfo) -> Result<Uuid> { async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
self.insert(job.build()) self.insert(job.build())
} }
@ -121,28 +125,29 @@ impl background_jobs_core::Storage for Storage {
self.set_heartbeat(job_id, runner_id) self.set_heartbeat(job_id, runner_id)
} }
async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<()> { async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<bool> {
let mut job = if let Some(job) = self.remove_job(id)? { let mut job = if let Some(job) = self.remove_job(id)? {
job job
} else { } else {
return Ok(()); return Ok(true);
}; };
match result { match result {
JobResult::Success => { JobResult::Success => {
// ok // ok
Ok(()) Ok(true)
} }
JobResult::Unexecuted | JobResult::Unregistered => { JobResult::Unexecuted | JobResult::Unregistered => {
// TODO: handle // TODO: handle
Ok(()) Ok(true)
} }
JobResult::Failure => { JobResult::Failure => {
if job.prepare_retry() { if job.prepare_retry() {
self.insert(job)?; self.insert(job)?;
Ok(false)
} else {
Ok(true)
} }
Ok(())
} }
} }
} }
@ -159,6 +164,16 @@ impl Storage {
}) })
} }
fn get(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
if let Some(ivec) = self.jobs.get(job_id.as_bytes())? {
let job_info = serde_cbor::from_slice(&ivec)?;
Ok(Some(job_info))
} else {
Ok(None)
}
}
fn notifier(&self, queue: String) -> Arc<Notify> { fn notifier(&self, queue: String) -> Arc<Notify> {
self.queues self.queues
.lock() .lock()