mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00
Avoid unwraps by removing the need to parse
This commit is contained in:
parent
5c8b50643d
commit
3f6e27a9f7
1 changed files with 15 additions and 23 deletions
|
@ -134,11 +134,7 @@ impl Storage {
|
||||||
let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
|
let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
|
||||||
|
|
||||||
if job.is_ready(now) {
|
if job.is_ready(now) {
|
||||||
self.run_job(
|
self.run_job(&buckets, inner_txn, key)?;
|
||||||
&buckets,
|
|
||||||
inner_txn,
|
|
||||||
std::str::from_utf8(key).unwrap().parse().unwrap(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
jobs.push(job);
|
jobs.push(job);
|
||||||
}
|
}
|
||||||
|
@ -163,11 +159,11 @@ impl Storage {
|
||||||
|
|
||||||
pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> {
|
pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> {
|
||||||
let job_id = match job.id() {
|
let job_id = match job.id() {
|
||||||
Some(id) => id,
|
Some(id) => id.to_string(),
|
||||||
None => {
|
None => {
|
||||||
let id = self.get_new_id()?;
|
let id = self.get_new_id()?;
|
||||||
job.set_id(id);
|
job.set_id(id);
|
||||||
id
|
id.to_string()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -198,10 +194,10 @@ impl Storage {
|
||||||
trace!("Set value");
|
trace!("Set value");
|
||||||
|
|
||||||
match status {
|
match status {
|
||||||
JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id)?,
|
JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref())?,
|
||||||
JobStatus::Running => self.run_job(&buckets, &mut txn, job_id)?,
|
JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref())?,
|
||||||
JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id)?,
|
JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref())?,
|
||||||
JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id)?,
|
JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id.as_ref())?,
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Committing");
|
trace!("Committing");
|
||||||
|
@ -216,7 +212,7 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
buckets: &'env Buckets<'env>,
|
buckets: &'env Buckets<'env>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.add_job_to(&buckets.queued, txn, id)?;
|
self.add_job_to(&buckets.queued, txn, id)?;
|
||||||
self.delete_job_from(&buckets.finished, txn, id)?;
|
self.delete_job_from(&buckets.finished, txn, id)?;
|
||||||
|
@ -230,7 +226,7 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
buckets: &'env Buckets<'env>,
|
buckets: &'env Buckets<'env>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.add_job_to(&buckets.failed, txn, id)?;
|
self.add_job_to(&buckets.failed, txn, id)?;
|
||||||
self.delete_job_from(&buckets.finished, txn, id)?;
|
self.delete_job_from(&buckets.finished, txn, id)?;
|
||||||
|
@ -244,7 +240,7 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
buckets: &'env Buckets<'env>,
|
buckets: &'env Buckets<'env>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.add_job_to(&buckets.running, txn, id)?;
|
self.add_job_to(&buckets.running, txn, id)?;
|
||||||
self.delete_job_from(&buckets.finished, txn, id)?;
|
self.delete_job_from(&buckets.finished, txn, id)?;
|
||||||
|
@ -258,7 +254,7 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
buckets: &'env Buckets<'env>,
|
buckets: &'env Buckets<'env>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
self.add_job_to(&buckets.finished, txn, id)?;
|
self.add_job_to(&buckets.finished, txn, id)?;
|
||||||
self.delete_job_from(&buckets.running, txn, id)?;
|
self.delete_job_from(&buckets.running, txn, id)?;
|
||||||
|
@ -272,13 +268,9 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
|
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
txn.set(
|
txn.set(bucket, id, Json::to_value_buf(self.runner_id)?)?;
|
||||||
bucket,
|
|
||||||
id.to_string().as_ref(),
|
|
||||||
Json::to_value_buf(self.runner_id)?,
|
|
||||||
)?;
|
|
||||||
trace!("Set value");
|
trace!("Set value");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -288,9 +280,9 @@ impl Storage {
|
||||||
&self,
|
&self,
|
||||||
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
|
bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
|
||||||
txn: &mut Txn<'env>,
|
txn: &mut Txn<'env>,
|
||||||
id: usize,
|
id: &[u8],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match txn.del(bucket, id.to_string().as_ref()) {
|
match txn.del(bucket, id) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
Error::NotFound => (),
|
Error::NotFound => (),
|
||||||
|
|
Loading…
Reference in a new issue