diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index b0dfe65..cf54e64 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -11,7 +11,7 @@ actix = "0.10.0-alpha.2" actix-rt = "1.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs = { version = "0.7.0", path = "../.." } +background-jobs = { version = "0.8.0-alpha.0", path = "../.." } env_logger = "0.7" sled-extensions = "0.2.0" serde = { version = "1.0", features = ["derive"] } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 77055d5..e41dbea 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -44,4 +44,13 @@ pub trait Job: Serialize + DeserializeOwned + 'static { fn backoff_strategy(&self) -> Option { None } + + /// Define the maximum number of milliseconds this job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + fn timeout(&self) -> Option { + None + } } diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 1f97752..ef0b471 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,5 +1,5 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; -use chrono::{offset::Utc, DateTime, Duration as OldDuration}; +use chrono::{offset::Utc, DateTime, Duration}; use log::trace; use serde_json::Value; @@ -53,6 +53,11 @@ pub struct NewJobInfo { /// The time this job should be dequeued next_queue: Option>, + + /// Milliseconds from execution until the job is considered dead + /// + /// This is important for storage implementations to reap unfinished jobs + timeout: i64, } impl NewJobInfo { @@ -66,6 +71,7 @@ impl NewJobInfo { args: Value, max_retries: MaxRetries, backoff_strategy: Backoff, + timeout: i64, ) -> Self { NewJobInfo { processor, @@ -74,6 +80,7 @@ impl NewJobInfo { max_retries, next_queue: None, backoff_strategy, + timeout, } } @@ -99,6 +106,7 @@ impl NewJobInfo { next_queue: self.next_queue, backoff_strategy: self.backoff_strategy, updated_at: Utc::now(), + timeout: self.timeout, } } } @@ -140,6 +148,11 @@ pub struct JobInfo { /// The time this job was last updated updated_at: DateTime, + + /// Milliseconds from execution until the job is considered dead + /// + /// This is important for storage implementations to reap unfinished jobs + timeout: i64, } impl JobInfo { @@ -183,10 +196,10 @@ impl JobInfo { let now = Utc::now(); let next_queue = match self.backoff_strategy { - Backoff::Linear(secs) => now + OldDuration::seconds(secs as i64), + Backoff::Linear(secs) => now + Duration::seconds(secs as i64), Backoff::Exponential(base) => { let secs = base.pow(self.retry_count); - now + OldDuration::seconds(secs as i64) + now + Duration::seconds(secs as i64) } }; @@ -220,8 +233,10 @@ impl JobInfo { } /// Whether this job is pending execution - pub fn is_pending(&self) -> bool { + pub fn is_pending(&self, now: DateTime) -> bool { self.status == JobStatus::Pending + || (self.status == JobStatus::Running + && (self.updated_at + Duration::milliseconds(self.timeout)) < now) } pub(crate) fn is_in_queue(&self, queue: &str) -> bool { diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index ab8d130..85b90c4 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -80,13 +80,25 @@ pub trait Processor: Clone { /// Define the default number of retries for a given processor /// + /// Defaults to Count(5) /// Jobs can override - const MAX_RETRIES: MaxRetries; + const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); /// Define the default backoff strategy for a given processor /// + /// Defaults to Exponential(2) /// Jobs can override - const BACKOFF_STRATEGY: Backoff; + const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); + + /// Define the maximum number of milliseconds a job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + /// + /// Defaults to 15 seconds + /// Jobs can override + const TIMEOUT: i64 = 15_000; /// A provided method to create a new JobInfo from provided arguments /// @@ -96,6 +108,7 @@ pub trait Processor: Clone { let queue = job.queue().unwrap_or(Self::QUEUE).to_owned(); let max_retries = job.max_retries().unwrap_or(Self::MAX_RETRIES); let backoff_strategy = job.backoff_strategy().unwrap_or(Self::BACKOFF_STRATEGY); + let timeout = job.timeout().unwrap_or(Self::TIMEOUT); let job = NewJobInfo::new( Self::NAME.to_owned(), @@ -103,6 +116,7 @@ pub trait Processor: Clone { serde_json::to_value(job).map_err(|_| ToJson)?, max_retries, backoff_strategy, + timeout, ); Ok(job) diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 5e79a17..42cc8fb 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -76,7 +76,8 @@ pub trait Storage: Clone + Send { ) -> Result, Self::Error> { match self.fetch_job_from_queue(queue).await? { Some(mut job) => { - if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) { + let now = Utc::now(); + if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { job.run(); self.run_job(job.id(), runner_id).await?; self.save_job(job.clone()).await?; diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 94f6945..bcd2d45 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-sled-storage" description = "Sled storage backend for background-jobs" -version = "0.3.0" +version = "0.4.0-alpha.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index cca92f2..db20f00 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -98,7 +98,7 @@ impl Storage for SledStorage { ) .filter_map(|id| job_tree.get(id).ok()) .filter_map(|opt| opt) - .filter(|job| job.is_ready(now)) + .filter(|job| job.is_ready(now) && job.is_pending(now)) .next(); if let Some(ref job) = job {