From 4809c123c224c16e9e57dce2af20ed408b96262f Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 7 Jan 2024 18:52:09 -0600 Subject: [PATCH] Reimplement job storage --- jobs-actix/Cargo.toml | 6 +- jobs-actix/src/actix_job.rs | 202 +++++++++ jobs-actix/src/every.rs | 4 +- jobs-actix/src/lib.rs | 17 +- jobs-actix/src/server.rs | 27 +- jobs-actix/src/storage.rs | 24 +- jobs-actix/src/worker.rs | 10 +- jobs-core/Cargo.toml | 6 +- jobs-core/src/job_info.rs | 137 ++---- jobs-core/src/lib.rs | 6 +- jobs-core/src/processor_map.rs | 20 +- jobs-core/src/storage.rs | 414 ++++++++---------- jobs-core/src/{actix_job.rs => unsend_job.rs} | 72 ++- jobs-metrics/src/recorder.rs | 26 +- 14 files changed, 526 insertions(+), 445 deletions(-) create mode 100644 jobs-actix/src/actix_job.rs rename jobs-core/src/{actix_job.rs => unsend_job.rs} (64%) diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 890b548..4060fe8 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -13,9 +13,7 @@ edition = "2021" actix-rt = "2.5.1" anyhow = "1.0" async-trait = "0.1.24" -background-jobs-core = { version = "0.16.0", path = "../jobs-core", features = [ - "with-actix", -] } +background-jobs-core = { version = "0.16.0", path = "../jobs-core" } metrics = "0.22.0" tracing = "0.1" tracing-futures = "0.2" @@ -27,4 +25,4 @@ tokio = { version = "1", default-features = false, features = [ "rt", "sync", ] } -uuid = { version = "1", features = ["v4", "serde"] } +uuid = { version = "1", features = ["v7", "serde"] } diff --git a/jobs-actix/src/actix_job.rs b/jobs-actix/src/actix_job.rs new file mode 100644 index 0000000..38f8f57 --- /dev/null +++ b/jobs-actix/src/actix_job.rs @@ -0,0 +1,202 @@ +use std::future::Future; + +use anyhow::Error; +use background_jobs_core::{Backoff, JoinError, MaxRetries, UnsendJob, UnsendSpawner}; +use serde::{de::DeserializeOwned, ser::Serialize}; +use tracing::Span; + +pub struct ActixSpawner; + +#[doc(hidden)] +pub struct ActixHandle(actix_rt::task::JoinHandle); + +impl UnsendSpawner for ActixSpawner { + type Handle = ActixHandle where T: Send; + + fn spawn(future: Fut) -> Self::Handle + where + Fut: Future + 'static, + Fut::Output: Send + 'static, + { + ActixHandle(actix_rt::spawn(future)) + } +} + +impl Unpin for ActixHandle {} + +impl Future for ActixHandle { + type Output = Result; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let res = std::task::ready!(std::pin::Pin::new(&mut self.0).poll(cx)); + + std::task::Poll::Ready(res.map_err(|_| JoinError)) + } +} + +impl Drop for ActixHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +/// The UnsendJob trait defines parameters pertaining to an instance of a background job +/// +/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires +/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send +/// future +pub trait ActixJob: Serialize + DeserializeOwned + std::panic::UnwindSafe + 'static { + /// The application state provided to this job at runtime. + type State: Clone + 'static; + + /// The future returned by this job + /// + /// Importantly, this Future does not require Send + type Future: Future>; + + /// The name of the job + /// + /// This name must be unique!!! + const NAME: &'static str; + + /// The name of the default queue for this job + /// + /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, + /// the job will never be processed. + const QUEUE: &'static str = "default"; + + /// Define the default number of retries for this job + /// + /// Defaults to Count(5) + /// Jobs can override + const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); + + /// Define the default backoff strategy for this job + /// + /// Defaults to Exponential(2) + /// Jobs can override + const BACKOFF: Backoff = Backoff::Exponential(2); + + /// Define the maximum number of milliseconds a job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + /// + /// Defaults to 15 seconds + /// Jobs can override + const TIMEOUT: i64 = 15_000; + + /// Users of this library must define what it means to run a job. + /// + /// This should contain all the logic needed to complete a job. If that means queuing more + /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy + /// processes, that logic should all be called from inside this method. + /// + /// The state passed into this job is initialized at the start of the application. The state + /// argument could be useful for containing a hook into something like r2d2, or the address of + /// an actor in an actix-based system. + fn run(self, state: Self::State) -> Self::Future; + + /// Generate a Span that the job will be processed within + fn span(&self) -> Option { + None + } + + /// If this job should not use it's default queue, this can be overridden in + /// user-code. + fn queue(&self) -> &str { + Self::QUEUE + } + + /// If this job should not use it's default maximum retry count, this can be + /// overridden in user-code. + fn max_retries(&self) -> MaxRetries { + Self::MAX_RETRIES + } + + /// If this job should not use it's default backoff strategy, this can be + /// overridden in user-code. + fn backoff_strategy(&self) -> Backoff { + Self::BACKOFF + } + + /// Define the maximum number of milliseconds this job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + fn timeout(&self) -> i64 { + Self::TIMEOUT + } +} + +/// Provide helper methods for queuing ActixJobs +pub trait ActixJobExt: ActixJob { + /// Turn an ActixJob into a type that implements Job + fn into_job(self) -> ActixJobWrapper + where + Self: Sized, + { + ActixJobWrapper(self) + } +} + +impl ActixJobExt for T where T: ActixJob {} + +impl From for ActixJobWrapper +where + T: ActixJob, +{ + fn from(value: T) -> Self { + ActixJobWrapper(value) + } +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +// A wrapper for ActixJob implementing UnsendJob with an ActixSpawner +pub struct ActixJobWrapper(T); + +impl UnsendJob for ActixJobWrapper +where + T: ActixJob, +{ + type State = ::State; + + type Future = ::Future; + + type Spawner = ActixSpawner; + + const NAME: &'static str = ::NAME; + const QUEUE: &'static str = ::QUEUE; + const MAX_RETRIES: MaxRetries = ::MAX_RETRIES; + const BACKOFF: Backoff = ::BACKOFF; + const TIMEOUT: i64 = ::TIMEOUT; + + fn run(self, state: Self::State) -> Self::Future { + ::run(self.0, state) + } + + fn span(&self) -> Option { + self.0.span() + } + + fn queue(&self) -> &str { + self.0.queue() + } + + fn max_retries(&self) -> MaxRetries { + self.0.max_retries() + } + + fn backoff_strategy(&self) -> Backoff { + self.0.backoff_strategy() + } + + fn timeout(&self) -> i64 { + self.0.timeout() + } +} diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 5410fef..5687444 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,4 +1,4 @@ -use crate::{Job, QueueHandle}; +use crate::{ActixJob, QueueHandle}; use actix_rt::time::{interval_at, Instant}; use std::time::Duration; @@ -10,7 +10,7 @@ use std::time::Duration; /// ``` pub(crate) async fn every(spawner: QueueHandle, duration: Duration, job: J) where - J: Job + Clone + Send, + J: ActixJob + Clone + Send, { let mut interval = interval_at(Instant::now(), duration); diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 60614c0..0cd4888 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -128,6 +128,7 @@ use std::{ }; use tokio::sync::Notify; +mod actix_job; mod every; mod server; mod storage; @@ -135,7 +136,7 @@ mod worker; use self::{every::every, server::Server}; -pub use background_jobs_core::ActixJob; +pub use actix_job::{ActixJob, ActixJobExt}; /// A timer implementation for the Memory Storage backend #[derive(Debug, Clone)] @@ -471,10 +472,10 @@ impl QueueHandle { /// job's queue is free to do so. pub async fn queue(&self, job: J) -> Result<(), Error> where - J: Job, + J: ActixJob, { - let job = new_job(job)?; - self.inner.new_job(job).await?; + let job = new_job(job.into_job())?; + self.inner.push(job).await?; Ok(()) } @@ -484,10 +485,10 @@ impl QueueHandle { /// and when a worker for the job's queue is free to do so. pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> where - J: Job, + J: ActixJob, { - let job = new_scheduled_job(job, after)?; - self.inner.new_job(job).await?; + let job = new_scheduled_job(job.into_job(), after)?; + self.inner.push(job).await?; Ok(()) } @@ -497,7 +498,7 @@ impl QueueHandle { /// processed whenever workers are free to do so. pub fn every(&self, duration: Duration, job: J) where - J: Job + Clone + Send + 'static, + J: ActixJob + Clone + Send + 'static, { actix_rt::spawn(every(self.clone(), duration, job)); } diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 8f03d72..ff21c17 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -1,8 +1,8 @@ +use std::{ops::Deref, sync::Arc}; + +use background_jobs_core::Storage; + use crate::storage::{ActixStorage, StorageWrapper}; -use anyhow::Error; -use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage}; -use std::sync::Arc; -use uuid::Uuid; /// The server Actor /// @@ -23,21 +23,12 @@ impl Server { storage: Arc::new(StorageWrapper(storage)), } } +} - pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { - self.storage.new_job(job).await.map(|_| ()) - } +impl Deref for Server { + type Target = dyn ActixStorage; - pub(crate) async fn request_job( - &self, - worker_id: Uuid, - worker_queue: &str, - ) -> Result { - tracing::trace!("Worker {} requested job", worker_id); - self.storage.request_job(worker_queue, worker_id).await - } - - pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { - self.storage.return_job(job).await + fn deref(&self) -> &Self::Target { + self.storage.as_ref() } } diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs index bafd411..8077e3d 100644 --- a/jobs-actix/src/storage.rs +++ b/jobs-actix/src/storage.rs @@ -4,11 +4,13 @@ use uuid::Uuid; #[async_trait::async_trait] pub(crate) trait ActixStorage { - async fn new_job(&self, job: NewJobInfo) -> Result; + async fn push(&self, job: NewJobInfo) -> Result; - async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result; + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result; - async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>; + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error>; + + async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error>; } pub(crate) struct StorageWrapper(pub(crate) S) @@ -22,15 +24,19 @@ where S: Storage + Send + Sync, S::Error: Send + Sync + 'static, { - async fn new_job(&self, job: NewJobInfo) -> Result { - Ok(self.0.new_job(job).await?) + async fn push(&self, job: NewJobInfo) -> Result { + Ok(self.0.push(job).await?) } - async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result { - Ok(self.0.request_job(queue, runner_id).await?) + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { + Ok(self.0.pop(queue, runner_id).await?) } - async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> { - Ok(self.0.return_job(ret).await?) + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error> { + Ok(self.0.heartbeat(job_id, runner_id).await?) + } + + async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error> { + Ok(self.0.complete(ret).await?) } } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 10e4005..9fa86f6 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -93,7 +93,7 @@ pub(crate) async fn local_worker( extras: Some(extras), }; - let id = Uuid::new_v4(); + let id = Uuid::now_v7(); let log_on_drop = RunOnDrop(|| { make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing")); @@ -103,7 +103,7 @@ pub(crate) async fn local_worker( let request_span = make_span(id, &queue, "request"); let job = match request_span - .in_scope(|| server.request_job(id, &queue)) + .in_scope(|| server.pop(&queue, id)) .instrument(request_span.clone()) .await { @@ -123,7 +123,7 @@ pub(crate) async fn local_worker( drop(request_span); let process_span = make_span(id, &queue, "process"); - let job_id = job.id(); + let job_id = job.id; let return_job = process_span .in_scope(|| time_job(Box::pin(processors.process(job)), job_id)) .instrument(process_span) @@ -131,7 +131,7 @@ pub(crate) async fn local_worker( let return_span = make_span(id, &queue, "return"); if let Err(e) = return_span - .in_scope(|| server.return_job(return_job)) + .in_scope(|| server.complete(return_job)) .instrument(return_span.clone()) .await { @@ -156,7 +156,7 @@ fn make_span(id: Uuid, queue: &str, operation: &str) -> Span { "Worker", worker.id = tracing::field::display(id), worker.queue = tracing::field::display(queue), - worker.operation.id = tracing::field::display(&Uuid::new_v4()), + worker.operation.id = tracing::field::display(&Uuid::now_v7()), worker.operation.name = tracing::field::display(operation), exception.message = tracing::field::Empty, exception.details = tracing::field::Empty, diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index f1abf5a..5ed15b7 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -11,15 +11,13 @@ edition = "2021" [features] default = ["error-logging"] -with-actix = ["actix-rt"] completion-logging = [] error-logging = [] [dependencies] -actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" async-trait = "0.1.24" -event-listener = "2" +event-listener = "4" metrics = "0.22.0" time = { version = "0.3", features = ["serde-human-readable"] } tracing = "0.1" @@ -27,4 +25,4 @@ tracing-futures = "0.2.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -uuid = { version = "1", features = ["serde", "v4"] } +uuid = { version = "1.6", features = ["serde", "v7"] } diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 33e8a81..7c60914 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,8 +1,8 @@ -use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; +use crate::{Backoff, JobResult, MaxRetries, ShouldStop}; use serde_json::Value; use std::time::SystemTime; use time::{Duration, OffsetDateTime}; -use uuid::Uuid; +use uuid::{NoContext, Timestamp, Uuid}; #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] /// Information about the sate of an attempted job @@ -95,16 +95,15 @@ impl NewJobInfo { self.next_queue.is_none() } - pub(crate) fn with_id(self, id: Uuid) -> JobInfo { + pub(crate) fn build(self) -> JobInfo { JobInfo { - id, + id: Uuid::now_v7(), name: self.name, queue: self.queue, - status: JobStatus::Pending, args: self.args, retry_count: 0, max_retries: self.max_retries, - next_queue: self.next_queue, + next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()), backoff_strategy: self.backoff_strategy, updated_at: OffsetDateTime::now_utc(), timeout: self.timeout, @@ -112,6 +111,14 @@ impl NewJobInfo { } } +fn uuid_from_timestamp(timestamp: OffsetDateTime) -> Uuid { + let unix_seconds = timestamp.unix_timestamp().abs_diff(0); + let unix_nanos = (timestamp.unix_timestamp_nanos() % i128::from(timestamp.unix_timestamp())) + .abs_diff(0) as u32; + + Uuid::new_v7(Timestamp::from_unix(NoContext, unix_seconds, unix_nanos)) +} + #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] /// Metadata pertaining to a job that exists within the background_jobs system /// @@ -119,69 +126,39 @@ impl NewJobInfo { /// is impossible to create outside of the new_job method. pub struct JobInfo { /// ID of the job - id: Uuid, + pub id: Uuid, /// Name of the job - name: String, + pub name: String, /// Name of the queue that this job is a part of - queue: String, + pub queue: String, /// Arguments for a given job - args: Value, - - /// Status of the job - status: JobStatus, + pub args: Value, /// Retries left for this job, None means no limit - retry_count: u32, + pub retry_count: u32, /// the initial MaxRetries value, for comparing to the current retry count - max_retries: MaxRetries, + pub max_retries: MaxRetries, /// How often retries should be scheduled - backoff_strategy: Backoff, + pub backoff_strategy: Backoff, /// The time this job should be dequeued - next_queue: Option, + pub next_queue: OffsetDateTime, /// The time this job was last updated - updated_at: OffsetDateTime, + pub updated_at: OffsetDateTime, /// Milliseconds from execution until the job is considered dead /// /// This is important for storage implementations to reap unfinished jobs - timeout: i64, + pub timeout: i64, } impl JobInfo { - /// The name of the queue this job will run in - pub fn queue(&self) -> &str { - &self.queue - } - - fn updated(&mut self) { - self.updated_at = OffsetDateTime::now_utc(); - } - - pub(crate) fn name(&self) -> &str { - &self.name - } - - pub(crate) fn args(&self) -> Value { - self.args.clone() - } - - /// The ID of this job - pub fn id(&self) -> Uuid { - self.id - } - - /// How long (in milliseconds) before this job is considered failed and can be requeued - pub fn timeout(&self) -> i64 { - self.timeout - } - /// Convert a JobInfo into a ReturnJobInfo without executing it pub fn unexecuted(self) -> ReturnJobInfo { ReturnJobInfo { @@ -190,21 +167,23 @@ impl JobInfo { } } - /// If the job is queued to run in the future, when is that - pub fn next_queue(&self) -> Option { - self.next_queue.map(|time| time.into()) + /// Produce a UUID from the next_queue timestamp + pub fn next_queue_id(&self) -> Uuid { + uuid_from_timestamp(self.next_queue) } - pub(crate) fn increment(&mut self) -> ShouldStop { - self.updated(); + // Increment the retry-count and determine if the job should be requeued + fn increment(&mut self) -> ShouldStop { + self.updated_at = OffsetDateTime::now_utc(); self.retry_count += 1; self.max_retries.compare(self.retry_count) } + /// Update the timestamp on the JobInfo to reflect the next queue time fn set_next_queue(&mut self) { let now = OffsetDateTime::now_utc(); - let next_queue = match self.backoff_strategy { + self.next_queue = match self.backoff_strategy { Backoff::Linear(secs) => now + Duration::seconds(secs as i64), Backoff::Exponential(base) => { let secs = base.pow(self.retry_count); @@ -212,63 +191,19 @@ impl JobInfo { } }; - self.next_queue = Some(next_queue); - - tracing::trace!( - "Now {}, Next queue {}, ready {}", - now, - next_queue, - self.is_ready(now.into()), - ); + tracing::trace!("Now {}, Next queue {}", now, self.next_queue); } - /// Whether this job is ready to be run - pub fn is_ready(&self, now: SystemTime) -> bool { - match self.next_queue { - Some(ref time) => now > *time, - None => true, - } - } - - pub(crate) fn needs_retry(&mut self) -> bool { + /// Increment the retry-count and set next_queue based on the job's configuration + /// + /// returns `true` if the job should be retried + pub fn prepare_retry(&mut self) -> bool { let should_retry = self.increment().should_requeue(); if should_retry { - self.pending(); self.set_next_queue(); } should_retry } - - /// Whether this job is pending execution - pub fn is_pending(&self, now: SystemTime) -> bool { - self.status == JobStatus::Pending - || (self.status == JobStatus::Running - && (self.updated_at + Duration::milliseconds(self.timeout)) < now) - } - - /// Get the status of the job - pub fn status(&self) -> JobStatus { - self.status.clone() - } - - /// The the date of the most recent update - pub fn updated_at(&self) -> SystemTime { - self.updated_at.into() - } - - pub(crate) fn is_in_queue(&self, queue: &str) -> bool { - self.queue == queue - } - - pub(crate) fn run(&mut self) { - self.updated(); - self.status = JobStatus::Running; - } - - pub(crate) fn pending(&mut self) { - self.updated(); - self.status = JobStatus::Pending; - } } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 117f2ca..9b1e89e 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -8,13 +8,12 @@ use anyhow::Error; -#[cfg(feature = "with-actix")] -mod actix_job; mod catch_unwind; mod job; mod job_info; mod processor_map; mod storage; +mod unsend_job; pub use crate::{ job::{new_job, new_scheduled_job, process, Job}, @@ -23,8 +22,7 @@ pub use crate::{ storage::{memory_storage, Storage}, }; -#[cfg(feature = "with-actix")] -pub use actix_job::ActixJob; +pub use unsend_job::{JoinError, UnsendJob, UnsendSpawner}; #[derive(Debug, thiserror::Error)] /// The error type returned by the `process` method diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 92e55d4..03fba37 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -86,7 +86,7 @@ where let fut = async move { let opt = self .inner - .get(job.name()) + .get(&job.name) .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone())); let res = if let Some(fut) = opt { @@ -102,7 +102,7 @@ where &tracing::field::display("Not registered"), ); tracing::error!("Not registered"); - ReturnJobInfo::unregistered(job.id()) + ReturnJobInfo::unregistered(job.id) }; res @@ -124,7 +124,7 @@ where let span = job_span(&job); let fut = async move { - let res = if let Some(name) = self.inner.get(job.name()) { + let res = if let Some(name) = self.inner.get(&job.name) { process(Arc::clone(name), self.state.clone(), job).await } else { let span = Span::current(); @@ -137,7 +137,7 @@ where &tracing::field::display("Not registered"), ); tracing::error!("Not registered"); - ReturnJobInfo::unregistered(job.id()) + ReturnJobInfo::unregistered(job.id) }; res @@ -150,9 +150,9 @@ where fn job_span(job: &JobInfo) -> Span { tracing::info_span!( "Job", - execution_id = tracing::field::display(&Uuid::new_v4()), - job.id = tracing::field::display(&job.id()), - job.name = tracing::field::display(&job.name()), + execution_id = tracing::field::display(&Uuid::now_v7()), + job.id = tracing::field::display(&job.id), + job.name = tracing::field::display(&job.name), job.execution_time = tracing::field::Empty, exception.message = tracing::field::Empty, exception.details = tracing::field::Empty, @@ -163,8 +163,8 @@ async fn process(process_fn: ProcessFn, state: S, job: JobInfo) -> ReturnJ where S: Clone, { - let args = job.args(); - let id = job.id(); + let args = job.args.clone(); + let id = job.id; let start = Instant::now(); @@ -177,7 +177,7 @@ where let span = Span::current(); span.record("job.execution_time", &tracing::field::display(&seconds)); - metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue().to_string(), "name" => job.name().to_string()).record(seconds); + metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue.clone(), "name" => job.name.clone()).record(seconds); match res { Ok(Ok(_)) => { diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index aa29c5e..6575440 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,5 +1,5 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo}; -use std::{error::Error, time::SystemTime}; +use std::error::Error; use uuid::Uuid; /// Define a storage backend for jobs @@ -13,141 +13,36 @@ pub trait Storage: Clone + Send { /// The error type used by the storage mechansim. type Error: Error + Send + Sync; - /// This method generates unique IDs for jobs - async fn generate_id(&self) -> Result; + /// push a job into the queue + async fn push(&self, job: NewJobInfo) -> Result; - /// This method should store the supplied job - /// - /// The supplied job _may already be present_. The implementation should overwrite the stored - /// job with the new job so that future calls to `fetch_job` return the new one. - async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error>; + /// pop a job from the provided queue + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result; - /// This method should return the job with the given ID regardless of what state the job is in. - async fn fetch_job(&self, id: Uuid) -> Result, Self::Error>; - - /// This should fetch a job ready to be processed from the queue - /// - /// If a job is not ready, is currently running, or is not in the requested queue, this method - /// should not return it. If no jobs meet these criteria, this method wait until a job becomes available - async fn fetch_job_from_queue(&self, queue: &str) -> Result; - - /// This method tells the storage mechanism to mark the given job as being in the provided - /// queue - async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error>; - - /// This method tells the storage mechanism to mark a given job as running - async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>; - - /// This method tells the storage mechanism to remove the job - /// - /// This happens when a job has been completed or has failed too many times - async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error>; - - /// Generate a new job based on the provided NewJobInfo - async fn new_job(&self, job: NewJobInfo) -> Result { - let id = self.generate_id().await?; - - let job = job.with_id(id); - metrics::counter!("background-jobs.job.created", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - - let queue = job.queue().to_owned(); - self.save_job(job).await?; - self.queue_job(&queue, id).await?; - - Ok(id) - } - - /// Fetch a job that is ready to be executed, marking it as running - async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result { - loop { - let mut job = self.fetch_job_from_queue(queue).await?; - - let now = SystemTime::now(); - if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { - job.run(); - self.run_job(job.id(), runner_id).await?; - self.save_job(job.clone()).await?; - - metrics::counter!("background-jobs.job.started", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - - return Ok(job); - } else { - tracing::warn!( - "Not fetching job {}, it is not ready for processing", - job.id() - ); - self.queue_job(job.queue(), job.id()).await?; - } - } - } + /// mark a job as being actively worked on + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>; /// "Return" a job to the database, marking it for retry if needed - async fn return_job( - &self, - ReturnJobInfo { id, result }: ReturnJobInfo, - ) -> Result<(), Self::Error> { - if result.is_failure() { - if let Some(mut job) = self.fetch_job(id).await? { - if job.needs_retry() { - metrics::counter!("background-jobs.job.failed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - - self.queue_job(job.queue(), id).await?; - self.save_job(job).await - } else { - metrics::counter!("background-jobs.job.dead", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - - #[cfg(feature = "error-logging")] - tracing::warn!("Job {} failed permanently", id); - - self.delete_job(id).await - } - } else { - tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing").increment(1); - Ok(()) - } - } else if result.is_unregistered() || result.is_unexecuted() { - if let Some(mut job) = self.fetch_job(id).await? { - metrics::counter!("background-jobs.job.returned", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - - job.pending(); - self.queue_job(job.queue(), id).await?; - self.save_job(job).await - } else { - tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing").increment(1); - Ok(()) - } - } else { - if let Some(job) = self.fetch_job(id).await? { - metrics::counter!("background-jobs.job.completed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1); - } else { - tracing::warn!("Returned non-existant job"); - metrics::counter!("background-jobs.job.missing").increment(1); - } - - self.delete_job(id).await - } - } + async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), Self::Error>; } /// A default, in-memory implementation of a storage mechanism pub mod memory_storage { - use super::JobInfo; + use crate::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo}; + use event_listener::{Event, EventListener}; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, convert::Infallible, future::Future, + ops::Bound, + pin::Pin, sync::Arc, sync::Mutex, - time::{Duration, SystemTime}, + time::Duration, }; - use uuid::Uuid; + use time::OffsetDateTime; + use uuid::{NoContext, Timestamp, Uuid}; /// Allows memory storage to set timeouts for when to retry checking a queue for a job #[async_trait::async_trait] @@ -165,12 +60,14 @@ pub mod memory_storage { inner: Arc>, } + type OrderedKey = (String, Uuid); + type JobState = Option<(Uuid, OffsetDateTime)>; + type JobMeta = (Uuid, JobState); + struct Inner { queues: HashMap, jobs: HashMap, - job_queues: HashMap, - worker_ids: HashMap, - worker_ids_inverse: HashMap, + queue_jobs: BTreeMap, } impl Storage { @@ -180,167 +77,202 @@ pub mod memory_storage { inner: Arc::new(Mutex::new(Inner { queues: HashMap::new(), jobs: HashMap::new(), - job_queues: HashMap::new(), - worker_ids: HashMap::new(), - worker_ids_inverse: HashMap::new(), + queue_jobs: BTreeMap::new(), })), timer, } } - fn contains_job(&self, uuid: &Uuid) -> bool { - self.inner.lock().unwrap().jobs.contains_key(uuid) - } + fn listener(&self, queue: String) -> (Pin>, Duration) { + let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); + let upper_bound = Uuid::now_v7(); - fn insert_job(&self, job: JobInfo) { - self.inner.lock().unwrap().jobs.insert(job.id(), job); - } - - fn get_job(&self, id: &Uuid) -> Option { - self.inner.lock().unwrap().jobs.get(id).cloned() - } - - #[tracing::instrument(skip(self))] - fn try_deque(&self, queue: &str, now: SystemTime) -> Option { let mut inner = self.inner.lock().unwrap(); - let j = inner.job_queues.iter().find_map(|(k, v)| { - if v == queue { - let job = inner.jobs.get(k)?; + let listener = inner.queues.entry(queue.clone()).or_default().listen(); - if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { - return Some(job.clone()); + let next_job = inner + .queue_jobs + .range(( + Bound::Excluded((queue.clone(), lower_bound)), + Bound::Included((queue, upper_bound)), + )) + .find_map(|(_, (id, meta))| { + if meta.is_none() { + inner.jobs.get(id) + } else { + None } - } + }); + let duration = if let Some(job) = next_job { + let duration = OffsetDateTime::now_utc() - job.next_queue; + duration.try_into().ok() + } else { None - }); + }; - if let Some(job) = j { - inner.job_queues.remove(&job.id()); - return Some(job); + (listener, duration.unwrap_or(Duration::from_secs(10))) + } + + fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option { + let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); + let upper_bound = Uuid::now_v7(); + let now = time::OffsetDateTime::now_utc(); + + let mut inner = self.inner.lock().unwrap(); + + let mut pop_job = None; + + for (_, (job_id, job_meta)) in inner.queue_jobs.range_mut(( + Bound::Excluded((queue.to_string(), lower_bound)), + Bound::Included((queue.to_string(), upper_bound)), + )) { + if job_meta.is_none() + || job_meta.is_some_and(|(_, h)| h + time::Duration::seconds(30) < now) + { + *job_meta = Some((runner_id, now)); + pop_job = Some(*job_id); + break; + } + } + + if let Some(job_id) = pop_job { + return inner.jobs.get(&job_id).cloned(); } None } - #[tracing::instrument(skip(self))] - fn listener(&self, queue: &str, now: SystemTime) -> (Duration, EventListener) { + fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) { + let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); + let upper_bound = Uuid::now_v7(); + let mut inner = self.inner.lock().unwrap(); - let duration = - inner - .job_queues - .iter() - .fold(Duration::from_secs(5), |duration, (id, v_queue)| { - if v_queue == queue { - if let Some(job) = inner.jobs.get(id) { - if let Some(ready_at) = job.next_queue() { - let job_eta = ready_at - .duration_since(now) - .unwrap_or(Duration::from_secs(0)); + let queue = if let Some(job) = inner.jobs.get(&job_id) { + job.queue.clone() + } else { + return; + }; - if job_eta < duration { - return job_eta; - } - } - } - } - - duration - }); - - let listener = inner.queues.entry(queue.to_string()).or_default().listen(); - - (duration, listener) - } - - fn queue_and_notify(&self, queue: &str, id: Uuid) { - let mut inner = self.inner.lock().unwrap(); - - inner.job_queues.insert(id, queue.to_owned()); - - inner.queues.entry(queue.to_string()).or_default().notify(1); - } - - fn mark_running(&self, job_id: Uuid, worker_id: Uuid) { - let mut inner = self.inner.lock().unwrap(); - - inner.worker_ids.insert(job_id, worker_id); - inner.worker_ids_inverse.insert(worker_id, job_id); - } - - fn purge_job(&self, job_id: Uuid) { - let mut inner = self.inner.lock().unwrap(); - - inner.jobs.remove(&job_id); - inner.job_queues.remove(&job_id); - - if let Some(worker_id) = inner.worker_ids.remove(&job_id) { - inner.worker_ids_inverse.remove(&worker_id); + for (_, (found_job_id, found_job_meta)) in inner.queue_jobs.range_mut(( + Bound::Excluded((queue.clone(), lower_bound)), + Bound::Included((queue, upper_bound)), + )) { + if *found_job_id == job_id { + *found_job_meta = Some((runner_id, OffsetDateTime::now_utc())); + return; + } } } + + fn remove_job(&self, job_id: Uuid) -> Option { + let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); + let upper_bound = Uuid::now_v7(); + + let mut inner = self.inner.lock().unwrap(); + + let job = inner.jobs.remove(&job_id)?; + + let mut key = None; + + for (found_key, (found_job_id, _)) in inner.queue_jobs.range_mut(( + Bound::Excluded((job.queue.clone(), lower_bound)), + Bound::Included((job.queue.clone(), upper_bound)), + )) { + if *found_job_id == job_id { + key = Some(found_key.clone()); + break; + } + } + + if let Some(key) = key { + inner.queue_jobs.remove(&key); + } + + Some(job) + } + + fn insert(&self, job: JobInfo) -> Uuid { + let id = job.id; + let queue = job.queue.clone(); + let queue_time_id = job.next_queue_id(); + + let mut inner = self.inner.lock().unwrap(); + + inner.jobs.insert(id, job); + + inner + .queue_jobs + .insert((queue.clone(), queue_time_id), (id, None)); + + inner.queues.entry(queue).or_default().notify(1); + + id + } } #[async_trait::async_trait] impl super::Storage for Storage { type Error = Infallible; - async fn generate_id(&self) -> Result { - let uuid = loop { - let uuid = Uuid::new_v4(); - if !self.contains_job(&uuid) { - break uuid; - } - }; - - Ok(uuid) + /// push a job into the queue + async fn push(&self, job: NewJobInfo) -> Result { + Ok(self.insert(job.build())) } - async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> { - self.insert_job(job); - - Ok(()) - } - - async fn fetch_job(&self, id: Uuid) -> Result, Self::Error> { - Ok(self.get_job(&id)) - } - - #[tracing::instrument(skip(self))] - async fn fetch_job_from_queue(&self, queue: &str) -> Result { + /// pop a job from the provided queue + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { loop { - let now = SystemTime::now(); + let (listener, duration) = self.listener(queue.to_string()); - if let Some(job) = self.try_deque(queue, now) { + if let Some(job) = self.try_pop(queue, runner_id) { return Ok(job); } - tracing::debug!("No job ready in queue"); - let (duration, listener) = self.listener(queue, now); - tracing::debug!("waiting at most {} seconds", duration.as_secs()); - - if duration > Duration::from_secs(0) { - let _ = self.timer.timeout(duration, listener).await; + match self.timer.timeout(duration, listener).await { + Ok(()) => { + // listener wakeup + } + Err(()) => { + // timeout + } } - tracing::debug!("Finished waiting, trying dequeue"); } } - async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> { - self.queue_and_notify(queue, id); - + /// mark a job as being actively worked on + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> { + self.set_heartbeat(job_id, runner_id); Ok(()) } - async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> { - self.mark_running(id, worker_id); + /// "Return" a job to the database, marking it for retry if needed + async fn complete( + &self, + ReturnJobInfo { id, result }: ReturnJobInfo, + ) -> Result<(), Self::Error> { + let mut job = if let Some(job) = self.remove_job(id) { + job + } else { + return Ok(()); + }; - Ok(()) - } - - async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> { - self.purge_job(id); + match result { + JobResult::Success => { + // nothing + } + JobResult::Unregistered | JobResult::Unexecuted => { + // do stuff... + } + JobResult::Failure => { + // requeue + if job.prepare_retry() { + self.insert(job); + } + } + } Ok(()) } diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/unsend_job.rs similarity index 64% rename from jobs-core/src/actix_job.rs rename to jobs-core/src/unsend_job.rs index efd3cc6..77d5aa4 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/unsend_job.rs @@ -1,5 +1,4 @@ use crate::{Backoff, Job, MaxRetries}; -use actix_rt::task::JoinHandle; use anyhow::Error; use serde::{de::DeserializeOwned, ser::Serialize}; use std::{ @@ -11,11 +10,39 @@ use std::{ use tracing::Span; use tracing_futures::Instrument; -/// The ActixJob trait defines parameters pertaining to an instance of background job +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// The type produced when a task is dropped before completion as a result of being deliberately +/// canceled, or it panicking +pub struct JoinError; + +impl std::fmt::Display for JoinError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Task has been canceled") + } +} + +impl std::error::Error for JoinError {} + +/// The mechanism used to spawn Unsend futures, making them Send +pub trait UnsendSpawner { + /// The Handle to the job, implements a Send future with the Job's output + type Handle: Future> + Send + Unpin + where + T: Send; + + /// Spawn the unsend future producing a Send handle + fn spawn(future: Fut) -> Self::Handle + where + Fut: Future + 'static, + Fut::Output: Send + 'static; +} + +/// The UnsendJob trait defines parameters pertaining to an instance of a background job /// -/// This trait is specific to Actix, and will automatically implement the Job trait with the -/// proper translation from ?Send futures to Send futures -pub trait ActixJob: Serialize + DeserializeOwned + 'static { +/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires +/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send +/// future +pub trait UnsendJob: Serialize + DeserializeOwned + 'static { /// The application state provided to this job at runtime. type State: Clone + 'static; @@ -24,6 +51,9 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { /// Importantly, this Future does not require Send type Future: Future>; + /// The spawner type that will be used to spawn the unsend future + type Spawner: UnsendSpawner; + /// The name of the job /// /// This name must be unique!!! @@ -118,42 +148,40 @@ where impl Job for T where - T: ActixJob + std::panic::UnwindSafe, + T: UnsendJob + std::panic::UnwindSafe, { type State = T::State; - type Future = UnwrapFuture>>; + type Future = UnwrapFuture<::Handle>>; - const NAME: &'static str = ::NAME; - const QUEUE: &'static str = ::QUEUE; - const MAX_RETRIES: MaxRetries = ::MAX_RETRIES; - const BACKOFF: Backoff = ::BACKOFF; - const TIMEOUT: i64 = ::TIMEOUT; + const NAME: &'static str = ::NAME; + const QUEUE: &'static str = ::QUEUE; + const MAX_RETRIES: MaxRetries = ::MAX_RETRIES; + const BACKOFF: Backoff = ::BACKOFF; + const TIMEOUT: i64 = ::TIMEOUT; fn run(self, state: Self::State) -> Self::Future { - let fut = ActixJob::run(self, state); - let instrumented = fut.instrument(Span::current()); - let handle = actix_rt::spawn(instrumented); - - UnwrapFuture(handle) + UnwrapFuture(T::Spawner::spawn( + UnsendJob::run(self, state).instrument(Span::current()), + )) } fn span(&self) -> Option { - ActixJob::span(self) + UnsendJob::span(self) } fn queue(&self) -> &str { - ActixJob::queue(self) + UnsendJob::queue(self) } fn max_retries(&self) -> MaxRetries { - ActixJob::max_retries(self) + UnsendJob::max_retries(self) } fn backoff_strategy(&self) -> Backoff { - ActixJob::backoff_strategy(self) + UnsendJob::backoff_strategy(self) } fn timeout(&self) -> i64 { - ActixJob::timeout(self) + UnsendJob::timeout(self) } } diff --git a/jobs-metrics/src/recorder.rs b/jobs-metrics/src/recorder.rs index 7035a74..9d107f9 100644 --- a/jobs-metrics/src/recorder.rs +++ b/jobs-metrics/src/recorder.rs @@ -326,30 +326,22 @@ impl HistogramFn for Histogram { fn record(&self, _: f64) {} } -const SECONDS: u64 = 1; -const MINUTES: u64 = 60 * SECONDS; -const HOURS: u64 = 60 * MINUTES; -const DAYS: u64 = 24 * HOURS; -const MONTHS: u64 = 30 * DAYS; +const SECOND: u64 = 1; +const MINUTE: u64 = 60 * SECOND; +const HOUR: u64 = 60 * MINUTE; +const DAY: u64 = 24 * HOUR; +const MONTH: u64 = 30 * DAY; impl Default for JobStatStorage { fn default() -> Self { JobStatStorage { hour: Buckets::new( - Duration::from_secs(1 * HOURS), - Duration::from_secs(3 * MINUTES), + Duration::from_secs(HOUR), + Duration::from_secs(3 * MINUTE), 20, ), - day: Buckets::new( - Duration::from_secs(1 * DAYS), - Duration::from_secs(1 * HOURS), - 24, - ), - month: Buckets::new( - Duration::from_secs(1 * MONTHS), - Duration::from_secs(1 * DAYS), - 30, - ), + day: Buckets::new(Duration::from_secs(DAY), Duration::from_secs(HOUR), 24), + month: Buckets::new(Duration::from_secs(MONTH), Duration::from_secs(DAY), 30), total: 0, } }