jobs-core: change 'timeout' to 'heartbeat_interval'

This commit is contained in:
asonix 2024-01-10 15:06:36 -06:00
parent 0f8b279e3f
commit 63ee0d7cb7
4 changed files with 33 additions and 33 deletions

View file

@ -74,15 +74,14 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// Jobs can override /// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2); const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
/// ///
/// Defaults to 15 seconds /// Defaults to 5 seconds
/// Jobs can override /// Jobs can override
const TIMEOUT: u64 = 15_000; const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
@ -118,13 +117,12 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
Self::BACKOFF Self::BACKOFF
} }
/// Define the maximum number of milliseconds this job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
Self::TIMEOUT Self::HEARTBEAT_INTERVAL
} }
} }
@ -138,7 +136,7 @@ where
job.queue().to_owned(), job.queue().to_owned(),
job.max_retries(), job.max_retries(),
job.backoff_strategy(), job.backoff_strategy(),
job.timeout(), job.heartbeat_interval(),
serde_json::to_value(job).map_err(|_| ToJson)?, serde_json::to_value(job).map_err(|_| ToJson)?,
); );

View file

@ -60,7 +60,7 @@ pub struct NewJobInfo {
/// Milliseconds from execution until the job is considered dead /// Milliseconds from execution until the job is considered dead
/// ///
/// This is important for storage implementations to reap unfinished jobs /// This is important for storage implementations to reap unfinished jobs
timeout: u64, heartbeat_interval: u64,
} }
impl NewJobInfo { impl NewJobInfo {
@ -73,7 +73,7 @@ impl NewJobInfo {
queue: String, queue: String,
max_retries: MaxRetries, max_retries: MaxRetries,
backoff_strategy: Backoff, backoff_strategy: Backoff,
timeout: u64, heartbeat_interval: u64,
args: Value, args: Value,
) -> Self { ) -> Self {
NewJobInfo { NewJobInfo {
@ -83,7 +83,7 @@ impl NewJobInfo {
max_retries, max_retries,
next_queue: None, next_queue: None,
backoff_strategy, backoff_strategy,
timeout, heartbeat_interval,
} }
} }
@ -113,7 +113,7 @@ impl NewJobInfo {
max_retries: self.max_retries, max_retries: self.max_retries,
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()), next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
backoff_strategy: self.backoff_strategy, backoff_strategy: self.backoff_strategy,
timeout: self.timeout, heartbeat_interval: self.heartbeat_interval,
} }
} }
} }
@ -159,7 +159,7 @@ pub struct JobInfo {
/// Milliseconds from execution until the job is considered dead /// Milliseconds from execution until the job is considered dead
/// ///
/// This is important for storage implementations to reap unfinished jobs /// This is important for storage implementations to reap unfinished jobs
pub timeout: u64, pub heartbeat_interval: u64,
} }
impl JobInfo { impl JobInfo {

View file

@ -272,16 +272,20 @@ pub mod memory_storage {
}; };
match result { match result {
// successful jobs are removed
JobResult::Success => Ok(true), JobResult::Success => Ok(true),
JobResult::Unregistered | JobResult::Unexecuted => Ok(true), // Unregistered or Unexecuted jobs are restored as-is
JobResult::Failure => { JobResult::Unregistered | JobResult::Unexecuted => {
if job.prepare_retry() {
self.insert(job); self.insert(job);
return Ok(false); Ok(false)
} else {
Ok(true)
} }
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job);
Ok(false)
} }
// dead jobs are removed
JobResult::Failure => Ok(true),
} }
} }
} }

View file

@ -76,15 +76,14 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
/// Jobs can override /// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2); const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
/// ///
/// Defaults to 15 seconds /// Defaults to 5 seconds
/// Jobs can override /// Jobs can override
const TIMEOUT: u64 = 15_000; const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
@ -120,13 +119,12 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
Self::BACKOFF Self::BACKOFF
} }
/// Define the maximum number of milliseconds this job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
Self::TIMEOUT Self::HEARTBEAT_INTERVAL
} }
} }
@ -156,7 +154,7 @@ where
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE; const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES; const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF; const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
const TIMEOUT: u64 = <Self as UnsendJob>::TIMEOUT; const HEARTBEAT_INTERVAL: u64 = <Self as UnsendJob>::HEARTBEAT_INTERVAL;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
UnwrapFuture(T::Spawner::spawn( UnwrapFuture(T::Spawner::spawn(
@ -180,7 +178,7 @@ where
UnsendJob::backoff_strategy(self) UnsendJob::backoff_strategy(self)
} }
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
UnsendJob::timeout(self) UnsendJob::heartbeat_interval(self)
} }
} }