From 3045f003b71c057c4e7c38afef3342f5c37b821a Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 8 Jan 2024 16:27:46 -0600 Subject: [PATCH] jobs-core: remove JobStatus, constify some methods, return whether job is finished in complete --- jobs-core/src/job_info.rs | 5 ++ jobs-core/src/lib.rs | 100 +++++++-------------------------- jobs-core/src/processor_map.rs | 10 ++-- jobs-core/src/storage.rs | 33 +++++++---- 4 files changed, 53 insertions(+), 95 deletions(-) diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index b4e417c..acb3a1b 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -92,6 +92,11 @@ impl NewJobInfo { &self.queue } + /// The name of this job + pub fn name(&self) -> &str { + &self.name + } + /// Whether this job is ready to be run immediately pub fn is_ready(&self) -> bool { self.next_queue.is_none() diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 9b1e89e..f4ef644 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -40,7 +40,7 @@ pub enum JobError { Unregistered, } -#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// Indicate the state of a job after an attempted run pub enum JobResult { /// The job succeeded @@ -58,100 +58,42 @@ pub enum JobResult { impl JobResult { /// Indicate a successful job - pub fn success() -> Self { + pub const fn success() -> Self { JobResult::Success } /// Indicate a failed job - pub fn failure() -> Self { + pub const fn failure() -> Self { JobResult::Failure } /// Indicate that the job was not registered for this worker - pub fn unregistered() -> Self { + pub const fn unregistered() -> Self { JobResult::Unregistered } /// Check if the job failed - pub fn is_failure(&self) -> bool { - *self == JobResult::Failure + pub const fn is_failure(self) -> bool { + matches!(self, JobResult::Failure) } /// Check if the job succeeded - pub fn is_success(&self) -> bool { - *self == JobResult::Success + pub const fn is_success(self) -> bool { + matches!(self, JobResult::Success) } /// Check if the job is missing it's processor - pub fn is_unregistered(&self) -> bool { - *self == JobResult::Unregistered + pub const fn is_unregistered(self) -> bool { + matches!(self, JobResult::Unregistered) } /// Check if the job was returned without an execution attempt - pub fn is_unexecuted(&self) -> bool { - *self == JobResult::Unexecuted + pub const fn is_unexecuted(self) -> bool { + matches!(self, JobResult::Unexecuted) } } -#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] -/// Set the status of a job when storing it -pub enum JobStatus { - /// Job should be queued - Pending, - - /// Job is running - Running, -} - -impl JobStatus { - /// The job should be queued - pub fn pending() -> Self { - JobStatus::Pending - } - - /// The job is running - pub fn running() -> Self { - JobStatus::Running - } - - /// Check if the job is ready to be queued - pub fn is_pending(&self) -> bool { - *self == JobStatus::Pending - } - - /// Check if the job is running - pub fn is_running(&self) -> bool { - *self == JobStatus::Running - } -} - -impl std::fmt::Display for JobStatus { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - JobStatus::Pending => write!(f, "Pending"), - JobStatus::Running => write!(f, "Running"), - } - } -} - -#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd, thiserror::Error)] -#[error("Invalid job status")] -/// The error generated when parsing a job's status if it's not 'Pending' or 'Running' -pub struct JobStatusError; - -impl std::str::FromStr for JobStatus { - type Err = JobStatusError; - - fn from_str(s: &str) -> Result { - match s { - "Pending" => Ok(JobStatus::Pending), - "Running" => Ok(JobStatus::Running), - _ => Err(JobStatusError), - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// Different styles for retrying jobs pub enum Backoff { /// Seconds between execution @@ -168,7 +110,7 @@ pub enum Backoff { Exponential(usize), } -#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] /// How many times a job should be retried before giving up pub enum MaxRetries { /// Keep retrying forever @@ -179,11 +121,11 @@ pub enum MaxRetries { } impl MaxRetries { - fn compare(&self, retry_count: u32) -> ShouldStop { - match *self { + fn compare(self, retry_count: u32) -> ShouldStop { + match self { MaxRetries::Infinite => ShouldStop::Requeue, - MaxRetries::Count(ref count) => { - if (retry_count as usize) <= *count { + MaxRetries::Count(count) => { + if (retry_count as usize) <= count { ShouldStop::Requeue } else { ShouldStop::LimitReached @@ -193,7 +135,7 @@ impl MaxRetries { } } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] /// A type that represents whether a job should be requeued pub enum ShouldStop { /// The job has hit the maximum allowed number of retries, and should be failed permanently @@ -205,8 +147,8 @@ pub enum ShouldStop { impl ShouldStop { /// A boolean representation of this state - pub fn should_requeue(&self) -> bool { - *self == ShouldStop::Requeue + pub const fn should_requeue(&self) -> bool { + matches!(self, ShouldStop::Requeue) } } diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 03fba37..30e0673 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -165,6 +165,8 @@ where { let args = job.args.clone(); let id = job.id; + let name = job.name.clone(); + let queue = job.queue.clone(); let start = Instant::now(); @@ -177,12 +179,12 @@ where let span = Span::current(); span.record("job.execution_time", &tracing::field::display(&seconds)); - metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue.clone(), "name" => job.name.clone()).record(seconds); + metrics::histogram!("background-jobs.job.execution_time", "queue" => queue.clone(), "name" => name.clone()).record(seconds); match res { Ok(Ok(_)) => { #[cfg(feature = "completion-logging")] - tracing::info!("Job completed"); + tracing::info!("Job {queue}: {name}-{id} completed"); ReturnJobInfo::pass(id) } @@ -192,7 +194,7 @@ where span.record("exception.message", &tracing::field::display(&display)); span.record("exception.details", &tracing::field::display(&debug)); #[cfg(feature = "error-logging")] - tracing::warn!("Job errored"); + tracing::warn!("Job {queue}: {name}-{id} errored"); ReturnJobInfo::fail(id) } Err(_) => { @@ -205,7 +207,7 @@ where &tracing::field::display("Job panicked"), ); #[cfg(feature = "error-logging")] - tracing::warn!("Job panicked"); + tracing::warn!("Job {queue}: {name}-{id} panicked"); ReturnJobInfo::fail(id) } } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 7561e27..c105f92 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -13,6 +13,9 @@ pub trait Storage: Clone + Send { /// The error type used by the storage mechansim. type Error: Error + Send + Sync; + /// Get the JobInfo for a given job ID + async fn info(&self, job_id: Uuid) -> Result, Self::Error>; + /// push a job into the queue async fn push(&self, job: NewJobInfo) -> Result; @@ -23,7 +26,9 @@ pub trait Storage: Clone + Send { async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>; /// "Return" a job to the database, marking it for retry if needed - async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), Self::Error>; + /// + /// returns `true` if the job has not been requeued + async fn complete(&self, return_job_info: ReturnJobInfo) -> Result; } /// A default, in-memory implementation of a storage mechanism @@ -83,6 +88,10 @@ pub mod memory_storage { } } + fn get(&self, job_id: Uuid) -> Option { + self.inner.lock().unwrap().jobs.get(&job_id).cloned() + } + fn listener(&self, pop_queue: String) -> (Pin>, Duration) { let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); let now = OffsetDateTime::now_utc(); @@ -216,6 +225,10 @@ pub mod memory_storage { impl super::Storage for Storage { type Error = Infallible; + async fn info(&self, job_id: Uuid) -> Result, Self::Error> { + Ok(self.get(job_id)) + } + /// push a job into the queue async fn push(&self, job: NewJobInfo) -> Result { Ok(self.insert(job.build())) @@ -251,29 +264,25 @@ pub mod memory_storage { async fn complete( &self, ReturnJobInfo { id, result }: ReturnJobInfo, - ) -> Result<(), Self::Error> { + ) -> Result { let mut job = if let Some(job) = self.remove_job(id) { job } else { - return Ok(()); + return Ok(true); }; match result { - JobResult::Success => { - // nothing - } - JobResult::Unregistered | JobResult::Unexecuted => { - // do stuff... - } + JobResult::Success => Ok(true), + JobResult::Unregistered | JobResult::Unexecuted => Ok(true), JobResult::Failure => { - // requeue if job.prepare_retry() { self.insert(job); + return Ok(false); + } else { + Ok(true) } } } - - Ok(()) } } }