Reimplement job storage

This commit is contained in:
asonix 2024-01-07 18:52:09 -06:00
parent 21c98d607f
commit 4809c123c2
14 changed files with 526 additions and 445 deletions

View file

@ -13,9 +13,7 @@ edition = "2021"
actix-rt = "2.5.1"
anyhow = "1.0"
async-trait = "0.1.24"
background-jobs-core = { version = "0.16.0", path = "../jobs-core", features = [
"with-actix",
] }
background-jobs-core = { version = "0.16.0", path = "../jobs-core" }
metrics = "0.22.0"
tracing = "0.1"
tracing-futures = "0.2"
@ -27,4 +25,4 @@ tokio = { version = "1", default-features = false, features = [
"rt",
"sync",
] }
uuid = { version = "1", features = ["v4", "serde"] }
uuid = { version = "1", features = ["v7", "serde"] }

202
jobs-actix/src/actix_job.rs Normal file
View file

@ -0,0 +1,202 @@
use std::future::Future;
use anyhow::Error;
use background_jobs_core::{Backoff, JoinError, MaxRetries, UnsendJob, UnsendSpawner};
use serde::{de::DeserializeOwned, ser::Serialize};
use tracing::Span;
pub struct ActixSpawner;
#[doc(hidden)]
pub struct ActixHandle<T>(actix_rt::task::JoinHandle<T>);
impl UnsendSpawner for ActixSpawner {
type Handle<T> = ActixHandle<T> where T: Send;
fn spawn<Fut>(future: Fut) -> Self::Handle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
ActixHandle(actix_rt::spawn(future))
}
}
impl<T> Unpin for ActixHandle<T> {}
impl<T> Future for ActixHandle<T> {
type Output = Result<T, JoinError>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(std::pin::Pin::new(&mut self.0).poll(cx));
std::task::Poll::Ready(res.map_err(|_| JoinError))
}
}
impl<T> Drop for ActixHandle<T> {
fn drop(&mut self) {
self.0.abort();
}
}
/// The UnsendJob trait defines parameters pertaining to an instance of a background job
///
/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires
/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send
/// future
pub trait ActixJob: Serialize + DeserializeOwned + std::panic::UnwindSafe + 'static {
/// The application state provided to this job at runtime.
type State: Clone + 'static;
/// The future returned by this job
///
/// Importantly, this Future does not require Send
type Future: Future<Output = Result<(), Error>>;
/// The name of the job
///
/// This name must be unique!!!
const NAME: &'static str;
/// The name of the default queue for this job
///
/// This can be overridden on an individual-job level, but if a non-existant queue is supplied,
/// the job will never be processed.
const QUEUE: &'static str = "default";
/// Define the default number of retries for this job
///
/// Defaults to Count(5)
/// Jobs can override
const MAX_RETRIES: MaxRetries = MaxRetries::Count(5);
/// Define the default backoff strategy for this job
///
/// Defaults to Exponential(2)
/// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being
/// considered dead.
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
///
/// Defaults to 15 seconds
/// Jobs can override
const TIMEOUT: i64 = 15_000;
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
/// processes, that logic should all be called from inside this method.
///
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: Self::State) -> Self::Future;
/// Generate a Span that the job will be processed within
fn span(&self) -> Option<Span> {
None
}
/// If this job should not use it's default queue, this can be overridden in
/// user-code.
fn queue(&self) -> &str {
Self::QUEUE
}
/// If this job should not use it's default maximum retry count, this can be
/// overridden in user-code.
fn max_retries(&self) -> MaxRetries {
Self::MAX_RETRIES
}
/// If this job should not use it's default backoff strategy, this can be
/// overridden in user-code.
fn backoff_strategy(&self) -> Backoff {
Self::BACKOFF
}
/// Define the maximum number of milliseconds this job should be allowed to run before being
/// considered dead.
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
fn timeout(&self) -> i64 {
Self::TIMEOUT
}
}
/// Provide helper methods for queuing ActixJobs
pub trait ActixJobExt: ActixJob {
/// Turn an ActixJob into a type that implements Job
fn into_job(self) -> ActixJobWrapper<Self>
where
Self: Sized,
{
ActixJobWrapper(self)
}
}
impl<T> ActixJobExt for T where T: ActixJob {}
impl<T> From<T> for ActixJobWrapper<T>
where
T: ActixJob,
{
fn from(value: T) -> Self {
ActixJobWrapper(value)
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
// A wrapper for ActixJob implementing UnsendJob with an ActixSpawner
pub struct ActixJobWrapper<T>(T);
impl<T> UnsendJob for ActixJobWrapper<T>
where
T: ActixJob,
{
type State = <T as ActixJob>::State;
type Future = <T as ActixJob>::Future;
type Spawner = ActixSpawner;
const NAME: &'static str = <T as ActixJob>::NAME;
const QUEUE: &'static str = <T as ActixJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <T as ActixJob>::MAX_RETRIES;
const BACKOFF: Backoff = <T as ActixJob>::BACKOFF;
const TIMEOUT: i64 = <T as ActixJob>::TIMEOUT;
fn run(self, state: Self::State) -> Self::Future {
<T as ActixJob>::run(self.0, state)
}
fn span(&self) -> Option<Span> {
self.0.span()
}
fn queue(&self) -> &str {
self.0.queue()
}
fn max_retries(&self) -> MaxRetries {
self.0.max_retries()
}
fn backoff_strategy(&self) -> Backoff {
self.0.backoff_strategy()
}
fn timeout(&self) -> i64 {
self.0.timeout()
}
}

View file

@ -1,4 +1,4 @@
use crate::{Job, QueueHandle};
use crate::{ActixJob, QueueHandle};
use actix_rt::time::{interval_at, Instant};
use std::time::Duration;
@ -10,7 +10,7 @@ use std::time::Duration;
/// ```
pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
where
J: Job + Clone + Send,
J: ActixJob + Clone + Send,
{
let mut interval = interval_at(Instant::now(), duration);

View file

@ -128,6 +128,7 @@ use std::{
};
use tokio::sync::Notify;
mod actix_job;
mod every;
mod server;
mod storage;
@ -135,7 +136,7 @@ mod worker;
use self::{every::every, server::Server};
pub use background_jobs_core::ActixJob;
pub use actix_job::{ActixJob, ActixJobExt};
/// A timer implementation for the Memory Storage backend
#[derive(Debug, Clone)]
@ -471,10 +472,10 @@ impl QueueHandle {
/// job's queue is free to do so.
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
where
J: Job,
J: ActixJob,
{
let job = new_job(job)?;
self.inner.new_job(job).await?;
let job = new_job(job.into_job())?;
self.inner.push(job).await?;
Ok(())
}
@ -484,10 +485,10 @@ impl QueueHandle {
/// and when a worker for the job's queue is free to do so.
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
where
J: Job,
J: ActixJob,
{
let job = new_scheduled_job(job, after)?;
self.inner.new_job(job).await?;
let job = new_scheduled_job(job.into_job(), after)?;
self.inner.push(job).await?;
Ok(())
}
@ -497,7 +498,7 @@ impl QueueHandle {
/// processed whenever workers are free to do so.
pub fn every<J>(&self, duration: Duration, job: J)
where
J: Job + Clone + Send + 'static,
J: ActixJob + Clone + Send + 'static,
{
actix_rt::spawn(every(self.clone(), duration, job));
}

View file

@ -1,8 +1,8 @@
use std::{ops::Deref, sync::Arc};
use background_jobs_core::Storage;
use crate::storage::{ActixStorage, StorageWrapper};
use anyhow::Error;
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
use std::sync::Arc;
use uuid::Uuid;
/// The server Actor
///
@ -23,21 +23,12 @@ impl Server {
storage: Arc::new(StorageWrapper(storage)),
}
}
}
pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> {
self.storage.new_job(job).await.map(|_| ())
}
impl Deref for Server {
type Target = dyn ActixStorage;
pub(crate) async fn request_job(
&self,
worker_id: Uuid,
worker_queue: &str,
) -> Result<JobInfo, Error> {
tracing::trace!("Worker {} requested job", worker_id);
self.storage.request_job(worker_queue, worker_id).await
}
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {
self.storage.return_job(job).await
fn deref(&self) -> &Self::Target {
self.storage.as_ref()
}
}

View file

@ -4,11 +4,13 @@ use uuid::Uuid;
#[async_trait::async_trait]
pub(crate) trait ActixStorage {
async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Error>;
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Error>;
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>;
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>;
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>;
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error>;
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error>;
}
pub(crate) struct StorageWrapper<S>(pub(crate) S)
@ -22,15 +24,19 @@ where
S: Storage + Send + Sync,
S::Error: Send + Sync + 'static,
{
async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Error> {
Ok(self.0.new_job(job).await?)
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Error> {
Ok(self.0.push(job).await?)
}
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error> {
Ok(self.0.request_job(queue, runner_id).await?)
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error> {
Ok(self.0.pop(queue, runner_id).await?)
}
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> {
Ok(self.0.return_job(ret).await?)
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error> {
Ok(self.0.heartbeat(job_id, runner_id).await?)
}
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error> {
Ok(self.0.complete(ret).await?)
}
}

View file

@ -93,7 +93,7 @@ pub(crate) async fn local_worker<State, Extras>(
extras: Some(extras),
};
let id = Uuid::new_v4();
let id = Uuid::now_v7();
let log_on_drop = RunOnDrop(|| {
make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing"));
@ -103,7 +103,7 @@ pub(crate) async fn local_worker<State, Extras>(
let request_span = make_span(id, &queue, "request");
let job = match request_span
.in_scope(|| server.request_job(id, &queue))
.in_scope(|| server.pop(&queue, id))
.instrument(request_span.clone())
.await
{
@ -123,7 +123,7 @@ pub(crate) async fn local_worker<State, Extras>(
drop(request_span);
let process_span = make_span(id, &queue, "process");
let job_id = job.id();
let job_id = job.id;
let return_job = process_span
.in_scope(|| time_job(Box::pin(processors.process(job)), job_id))
.instrument(process_span)
@ -131,7 +131,7 @@ pub(crate) async fn local_worker<State, Extras>(
let return_span = make_span(id, &queue, "return");
if let Err(e) = return_span
.in_scope(|| server.return_job(return_job))
.in_scope(|| server.complete(return_job))
.instrument(return_span.clone())
.await
{
@ -156,7 +156,7 @@ fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
"Worker",
worker.id = tracing::field::display(id),
worker.queue = tracing::field::display(queue),
worker.operation.id = tracing::field::display(&Uuid::new_v4()),
worker.operation.id = tracing::field::display(&Uuid::now_v7()),
worker.operation.name = tracing::field::display(operation),
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,

View file

@ -11,15 +11,13 @@ edition = "2021"
[features]
default = ["error-logging"]
with-actix = ["actix-rt"]
completion-logging = []
error-logging = []
[dependencies]
actix-rt = { version = "2.3.0", optional = true }
anyhow = "1.0"
async-trait = "0.1.24"
event-listener = "2"
event-listener = "4"
metrics = "0.22.0"
time = { version = "0.3", features = ["serde-human-readable"] }
tracing = "0.1"
@ -27,4 +25,4 @@ tracing-futures = "0.2.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "1", features = ["serde", "v4"] }
uuid = { version = "1.6", features = ["serde", "v7"] }

View file

@ -1,8 +1,8 @@
use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
use crate::{Backoff, JobResult, MaxRetries, ShouldStop};
use serde_json::Value;
use std::time::SystemTime;
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use uuid::{NoContext, Timestamp, Uuid};
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
/// Information about the sate of an attempted job
@ -95,16 +95,15 @@ impl NewJobInfo {
self.next_queue.is_none()
}
pub(crate) fn with_id(self, id: Uuid) -> JobInfo {
pub(crate) fn build(self) -> JobInfo {
JobInfo {
id,
id: Uuid::now_v7(),
name: self.name,
queue: self.queue,
status: JobStatus::Pending,
args: self.args,
retry_count: 0,
max_retries: self.max_retries,
next_queue: self.next_queue,
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
backoff_strategy: self.backoff_strategy,
updated_at: OffsetDateTime::now_utc(),
timeout: self.timeout,
@ -112,6 +111,14 @@ impl NewJobInfo {
}
}
fn uuid_from_timestamp(timestamp: OffsetDateTime) -> Uuid {
let unix_seconds = timestamp.unix_timestamp().abs_diff(0);
let unix_nanos = (timestamp.unix_timestamp_nanos() % i128::from(timestamp.unix_timestamp()))
.abs_diff(0) as u32;
Uuid::new_v7(Timestamp::from_unix(NoContext, unix_seconds, unix_nanos))
}
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
/// Metadata pertaining to a job that exists within the background_jobs system
///
@ -119,69 +126,39 @@ impl NewJobInfo {
/// is impossible to create outside of the new_job method.
pub struct JobInfo {
/// ID of the job
id: Uuid,
pub id: Uuid,
/// Name of the job
name: String,
pub name: String,
/// Name of the queue that this job is a part of
queue: String,
pub queue: String,
/// Arguments for a given job
args: Value,
/// Status of the job
status: JobStatus,
pub args: Value,
/// Retries left for this job, None means no limit
retry_count: u32,
pub retry_count: u32,
/// the initial MaxRetries value, for comparing to the current retry count
max_retries: MaxRetries,
pub max_retries: MaxRetries,
/// How often retries should be scheduled
backoff_strategy: Backoff,
pub backoff_strategy: Backoff,
/// The time this job should be dequeued
next_queue: Option<OffsetDateTime>,
pub next_queue: OffsetDateTime,
/// The time this job was last updated
updated_at: OffsetDateTime,
pub updated_at: OffsetDateTime,
/// Milliseconds from execution until the job is considered dead
///
/// This is important for storage implementations to reap unfinished jobs
timeout: i64,
pub timeout: i64,
}
impl JobInfo {
/// The name of the queue this job will run in
pub fn queue(&self) -> &str {
&self.queue
}
fn updated(&mut self) {
self.updated_at = OffsetDateTime::now_utc();
}
pub(crate) fn name(&self) -> &str {
&self.name
}
pub(crate) fn args(&self) -> Value {
self.args.clone()
}
/// The ID of this job
pub fn id(&self) -> Uuid {
self.id
}
/// How long (in milliseconds) before this job is considered failed and can be requeued
pub fn timeout(&self) -> i64 {
self.timeout
}
/// Convert a JobInfo into a ReturnJobInfo without executing it
pub fn unexecuted(self) -> ReturnJobInfo {
ReturnJobInfo {
@ -190,21 +167,23 @@ impl JobInfo {
}
}
/// If the job is queued to run in the future, when is that
pub fn next_queue(&self) -> Option<SystemTime> {
self.next_queue.map(|time| time.into())
/// Produce a UUID from the next_queue timestamp
pub fn next_queue_id(&self) -> Uuid {
uuid_from_timestamp(self.next_queue)
}
pub(crate) fn increment(&mut self) -> ShouldStop {
self.updated();
// Increment the retry-count and determine if the job should be requeued
fn increment(&mut self) -> ShouldStop {
self.updated_at = OffsetDateTime::now_utc();
self.retry_count += 1;
self.max_retries.compare(self.retry_count)
}
/// Update the timestamp on the JobInfo to reflect the next queue time
fn set_next_queue(&mut self) {
let now = OffsetDateTime::now_utc();
let next_queue = match self.backoff_strategy {
self.next_queue = match self.backoff_strategy {
Backoff::Linear(secs) => now + Duration::seconds(secs as i64),
Backoff::Exponential(base) => {
let secs = base.pow(self.retry_count);
@ -212,63 +191,19 @@ impl JobInfo {
}
};
self.next_queue = Some(next_queue);
tracing::trace!(
"Now {}, Next queue {}, ready {}",
now,
next_queue,
self.is_ready(now.into()),
);
tracing::trace!("Now {}, Next queue {}", now, self.next_queue);
}
/// Whether this job is ready to be run
pub fn is_ready(&self, now: SystemTime) -> bool {
match self.next_queue {
Some(ref time) => now > *time,
None => true,
}
}
pub(crate) fn needs_retry(&mut self) -> bool {
/// Increment the retry-count and set next_queue based on the job's configuration
///
/// returns `true` if the job should be retried
pub fn prepare_retry(&mut self) -> bool {
let should_retry = self.increment().should_requeue();
if should_retry {
self.pending();
self.set_next_queue();
}
should_retry
}
/// Whether this job is pending execution
pub fn is_pending(&self, now: SystemTime) -> bool {
self.status == JobStatus::Pending
|| (self.status == JobStatus::Running
&& (self.updated_at + Duration::milliseconds(self.timeout)) < now)
}
/// Get the status of the job
pub fn status(&self) -> JobStatus {
self.status.clone()
}
/// The the date of the most recent update
pub fn updated_at(&self) -> SystemTime {
self.updated_at.into()
}
pub(crate) fn is_in_queue(&self, queue: &str) -> bool {
self.queue == queue
}
pub(crate) fn run(&mut self) {
self.updated();
self.status = JobStatus::Running;
}
pub(crate) fn pending(&mut self) {
self.updated();
self.status = JobStatus::Pending;
}
}

View file

@ -8,13 +8,12 @@
use anyhow::Error;
#[cfg(feature = "with-actix")]
mod actix_job;
mod catch_unwind;
mod job;
mod job_info;
mod processor_map;
mod storage;
mod unsend_job;
pub use crate::{
job::{new_job, new_scheduled_job, process, Job},
@ -23,8 +22,7 @@ pub use crate::{
storage::{memory_storage, Storage},
};
#[cfg(feature = "with-actix")]
pub use actix_job::ActixJob;
pub use unsend_job::{JoinError, UnsendJob, UnsendSpawner};
#[derive(Debug, thiserror::Error)]
/// The error type returned by the `process` method

View file

@ -86,7 +86,7 @@ where
let fut = async move {
let opt = self
.inner
.get(job.name())
.get(&job.name)
.map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone()));
let res = if let Some(fut) = opt {
@ -102,7 +102,7 @@ where
&tracing::field::display("Not registered"),
);
tracing::error!("Not registered");
ReturnJobInfo::unregistered(job.id())
ReturnJobInfo::unregistered(job.id)
};
res
@ -124,7 +124,7 @@ where
let span = job_span(&job);
let fut = async move {
let res = if let Some(name) = self.inner.get(job.name()) {
let res = if let Some(name) = self.inner.get(&job.name) {
process(Arc::clone(name), self.state.clone(), job).await
} else {
let span = Span::current();
@ -137,7 +137,7 @@ where
&tracing::field::display("Not registered"),
);
tracing::error!("Not registered");
ReturnJobInfo::unregistered(job.id())
ReturnJobInfo::unregistered(job.id)
};
res
@ -150,9 +150,9 @@ where
fn job_span(job: &JobInfo) -> Span {
tracing::info_span!(
"Job",
execution_id = tracing::field::display(&Uuid::new_v4()),
job.id = tracing::field::display(&job.id()),
job.name = tracing::field::display(&job.name()),
execution_id = tracing::field::display(&Uuid::now_v7()),
job.id = tracing::field::display(&job.id),
job.name = tracing::field::display(&job.name),
job.execution_time = tracing::field::Empty,
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
@ -163,8 +163,8 @@ async fn process<S>(process_fn: ProcessFn<S>, state: S, job: JobInfo) -> ReturnJ
where
S: Clone,
{
let args = job.args();
let id = job.id();
let args = job.args.clone();
let id = job.id;
let start = Instant::now();
@ -177,7 +177,7 @@ where
let span = Span::current();
span.record("job.execution_time", &tracing::field::display(&seconds));
metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue().to_string(), "name" => job.name().to_string()).record(seconds);
metrics::histogram!("background-jobs.job.execution_time", "queue" => job.queue.clone(), "name" => job.name.clone()).record(seconds);
match res {
Ok(Ok(_)) => {

View file

@ -1,5 +1,5 @@
use crate::{JobInfo, NewJobInfo, ReturnJobInfo};
use std::{error::Error, time::SystemTime};
use std::error::Error;
use uuid::Uuid;
/// Define a storage backend for jobs
@ -13,141 +13,36 @@ pub trait Storage: Clone + Send {
/// The error type used by the storage mechansim.
type Error: Error + Send + Sync;
/// This method generates unique IDs for jobs
async fn generate_id(&self) -> Result<Uuid, Self::Error>;
/// push a job into the queue
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error>;
/// This method should store the supplied job
///
/// The supplied job _may already be present_. The implementation should overwrite the stored
/// job with the new job so that future calls to `fetch_job` return the new one.
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error>;
/// pop a job from the provided queue
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error>;
/// This method should return the job with the given ID regardless of what state the job is in.
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error>;
/// This should fetch a job ready to be processed from the queue
///
/// If a job is not ready, is currently running, or is not in the requested queue, this method
/// should not return it. If no jobs meet these criteria, this method wait until a job becomes available
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error>;
/// This method tells the storage mechanism to mark the given job as being in the provided
/// queue
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error>;
/// This method tells the storage mechanism to mark a given job as running
async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;
/// This method tells the storage mechanism to remove the job
///
/// This happens when a job has been completed or has failed too many times
async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error>;
/// Generate a new job based on the provided NewJobInfo
async fn new_job(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
let id = self.generate_id().await?;
let job = job.with_id(id);
metrics::counter!("background-jobs.job.created", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
let queue = job.queue().to_owned();
self.save_job(job).await?;
self.queue_job(&queue, id).await?;
Ok(id)
}
/// Fetch a job that is ready to be executed, marking it as running
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
loop {
let mut job = self.fetch_job_from_queue(queue).await?;
let now = SystemTime::now();
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
job.run();
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
metrics::counter!("background-jobs.job.started", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
return Ok(job);
} else {
tracing::warn!(
"Not fetching job {}, it is not ready for processing",
job.id()
);
self.queue_job(job.queue(), job.id()).await?;
}
}
}
/// mark a job as being actively worked on
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;
/// "Return" a job to the database, marking it for retry if needed
async fn return_job(
&self,
ReturnJobInfo { id, result }: ReturnJobInfo,
) -> Result<(), Self::Error> {
if result.is_failure() {
if let Some(mut job) = self.fetch_job(id).await? {
if job.needs_retry() {
metrics::counter!("background-jobs.job.failed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
self.queue_job(job.queue(), id).await?;
self.save_job(job).await
} else {
metrics::counter!("background-jobs.job.dead", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
#[cfg(feature = "error-logging")]
tracing::warn!("Job {} failed permanently", id);
self.delete_job(id).await
}
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing").increment(1);
Ok(())
}
} else if result.is_unregistered() || result.is_unexecuted() {
if let Some(mut job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.returned", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
job.pending();
self.queue_job(job.queue(), id).await?;
self.save_job(job).await
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing").increment(1);
Ok(())
}
} else {
if let Some(job) = self.fetch_job(id).await? {
metrics::counter!("background-jobs.job.completed", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
metrics::counter!("background-jobs.job.finished", "queue" => job.queue().to_string(), "name" => job.name().to_string()).increment(1);
} else {
tracing::warn!("Returned non-existant job");
metrics::counter!("background-jobs.job.missing").increment(1);
}
self.delete_job(id).await
}
}
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), Self::Error>;
}
/// A default, in-memory implementation of a storage mechanism
pub mod memory_storage {
use super::JobInfo;
use crate::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
use event_listener::{Event, EventListener};
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
convert::Infallible,
future::Future,
ops::Bound,
pin::Pin,
sync::Arc,
sync::Mutex,
time::{Duration, SystemTime},
time::Duration,
};
use uuid::Uuid;
use time::OffsetDateTime;
use uuid::{NoContext, Timestamp, Uuid};
/// Allows memory storage to set timeouts for when to retry checking a queue for a job
#[async_trait::async_trait]
@ -165,12 +60,14 @@ pub mod memory_storage {
inner: Arc<Mutex<Inner>>,
}
type OrderedKey = (String, Uuid);
type JobState = Option<(Uuid, OffsetDateTime)>;
type JobMeta = (Uuid, JobState);
struct Inner {
queues: HashMap<String, Event>,
jobs: HashMap<Uuid, JobInfo>,
job_queues: HashMap<Uuid, String>,
worker_ids: HashMap<Uuid, Uuid>,
worker_ids_inverse: HashMap<Uuid, Uuid>,
queue_jobs: BTreeMap<OrderedKey, JobMeta>,
}
impl<T: Timer> Storage<T> {
@ -180,167 +77,202 @@ pub mod memory_storage {
inner: Arc::new(Mutex::new(Inner {
queues: HashMap::new(),
jobs: HashMap::new(),
job_queues: HashMap::new(),
worker_ids: HashMap::new(),
worker_ids_inverse: HashMap::new(),
queue_jobs: BTreeMap::new(),
})),
timer,
}
}
fn contains_job(&self, uuid: &Uuid) -> bool {
self.inner.lock().unwrap().jobs.contains_key(uuid)
}
fn listener(&self, queue: String) -> (Pin<Box<EventListener>>, Duration) {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
fn insert_job(&self, job: JobInfo) {
self.inner.lock().unwrap().jobs.insert(job.id(), job);
}
fn get_job(&self, id: &Uuid) -> Option<JobInfo> {
self.inner.lock().unwrap().jobs.get(id).cloned()
}
#[tracing::instrument(skip(self))]
fn try_deque(&self, queue: &str, now: SystemTime) -> Option<JobInfo> {
let mut inner = self.inner.lock().unwrap();
let j = inner.job_queues.iter().find_map(|(k, v)| {
if v == queue {
let job = inner.jobs.get(k)?;
let listener = inner.queues.entry(queue.clone()).or_default().listen();
if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) {
return Some(job.clone());
let next_job = inner
.queue_jobs
.range((
Bound::Excluded((queue.clone(), lower_bound)),
Bound::Included((queue, upper_bound)),
))
.find_map(|(_, (id, meta))| {
if meta.is_none() {
inner.jobs.get(id)
} else {
None
}
}
});
let duration = if let Some(job) = next_job {
let duration = OffsetDateTime::now_utc() - job.next_queue;
duration.try_into().ok()
} else {
None
});
};
if let Some(job) = j {
inner.job_queues.remove(&job.id());
return Some(job);
(listener, duration.unwrap_or(Duration::from_secs(10)))
}
fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
let now = time::OffsetDateTime::now_utc();
let mut inner = self.inner.lock().unwrap();
let mut pop_job = None;
for (_, (job_id, job_meta)) in inner.queue_jobs.range_mut((
Bound::Excluded((queue.to_string(), lower_bound)),
Bound::Included((queue.to_string(), upper_bound)),
)) {
if job_meta.is_none()
|| job_meta.is_some_and(|(_, h)| h + time::Duration::seconds(30) < now)
{
*job_meta = Some((runner_id, now));
pop_job = Some(*job_id);
break;
}
}
if let Some(job_id) = pop_job {
return inner.jobs.get(&job_id).cloned();
}
None
}
#[tracing::instrument(skip(self))]
fn listener(&self, queue: &str, now: SystemTime) -> (Duration, EventListener) {
fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
let mut inner = self.inner.lock().unwrap();
let duration =
inner
.job_queues
.iter()
.fold(Duration::from_secs(5), |duration, (id, v_queue)| {
if v_queue == queue {
if let Some(job) = inner.jobs.get(id) {
if let Some(ready_at) = job.next_queue() {
let job_eta = ready_at
.duration_since(now)
.unwrap_or(Duration::from_secs(0));
let queue = if let Some(job) = inner.jobs.get(&job_id) {
job.queue.clone()
} else {
return;
};
if job_eta < duration {
return job_eta;
}
}
}
}
duration
});
let listener = inner.queues.entry(queue.to_string()).or_default().listen();
(duration, listener)
}
fn queue_and_notify(&self, queue: &str, id: Uuid) {
let mut inner = self.inner.lock().unwrap();
inner.job_queues.insert(id, queue.to_owned());
inner.queues.entry(queue.to_string()).or_default().notify(1);
}
fn mark_running(&self, job_id: Uuid, worker_id: Uuid) {
let mut inner = self.inner.lock().unwrap();
inner.worker_ids.insert(job_id, worker_id);
inner.worker_ids_inverse.insert(worker_id, job_id);
}
fn purge_job(&self, job_id: Uuid) {
let mut inner = self.inner.lock().unwrap();
inner.jobs.remove(&job_id);
inner.job_queues.remove(&job_id);
if let Some(worker_id) = inner.worker_ids.remove(&job_id) {
inner.worker_ids_inverse.remove(&worker_id);
for (_, (found_job_id, found_job_meta)) in inner.queue_jobs.range_mut((
Bound::Excluded((queue.clone(), lower_bound)),
Bound::Included((queue, upper_bound)),
)) {
if *found_job_id == job_id {
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
return;
}
}
}
fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
let mut inner = self.inner.lock().unwrap();
let job = inner.jobs.remove(&job_id)?;
let mut key = None;
for (found_key, (found_job_id, _)) in inner.queue_jobs.range_mut((
Bound::Excluded((job.queue.clone(), lower_bound)),
Bound::Included((job.queue.clone(), upper_bound)),
)) {
if *found_job_id == job_id {
key = Some(found_key.clone());
break;
}
}
if let Some(key) = key {
inner.queue_jobs.remove(&key);
}
Some(job)
}
fn insert(&self, job: JobInfo) -> Uuid {
let id = job.id;
let queue = job.queue.clone();
let queue_time_id = job.next_queue_id();
let mut inner = self.inner.lock().unwrap();
inner.jobs.insert(id, job);
inner
.queue_jobs
.insert((queue.clone(), queue_time_id), (id, None));
inner.queues.entry(queue).or_default().notify(1);
id
}
}
#[async_trait::async_trait]
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
type Error = Infallible;
async fn generate_id(&self) -> Result<Uuid, Self::Error> {
let uuid = loop {
let uuid = Uuid::new_v4();
if !self.contains_job(&uuid) {
break uuid;
}
};
Ok(uuid)
/// push a job into the queue
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
Ok(self.insert(job.build()))
}
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
self.insert_job(job);
Ok(())
}
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
Ok(self.get_job(&id))
}
#[tracing::instrument(skip(self))]
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error> {
/// pop a job from the provided queue
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
loop {
let now = SystemTime::now();
let (listener, duration) = self.listener(queue.to_string());
if let Some(job) = self.try_deque(queue, now) {
if let Some(job) = self.try_pop(queue, runner_id) {
return Ok(job);
}
tracing::debug!("No job ready in queue");
let (duration, listener) = self.listener(queue, now);
tracing::debug!("waiting at most {} seconds", duration.as_secs());
if duration > Duration::from_secs(0) {
let _ = self.timer.timeout(duration, listener).await;
match self.timer.timeout(duration, listener).await {
Ok(()) => {
// listener wakeup
}
Err(()) => {
// timeout
}
}
tracing::debug!("Finished waiting, trying dequeue");
}
}
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> {
self.queue_and_notify(queue, id);
/// mark a job as being actively worked on
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> {
self.set_heartbeat(job_id, runner_id);
Ok(())
}
async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> {
self.mark_running(id, worker_id);
/// "Return" a job to the database, marking it for retry if needed
async fn complete(
&self,
ReturnJobInfo { id, result }: ReturnJobInfo,
) -> Result<(), Self::Error> {
let mut job = if let Some(job) = self.remove_job(id) {
job
} else {
return Ok(());
};
Ok(())
}
async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> {
self.purge_job(id);
match result {
JobResult::Success => {
// nothing
}
JobResult::Unregistered | JobResult::Unexecuted => {
// do stuff...
}
JobResult::Failure => {
// requeue
if job.prepare_retry() {
self.insert(job);
}
}
}
Ok(())
}

View file

@ -1,5 +1,4 @@
use crate::{Backoff, Job, MaxRetries};
use actix_rt::task::JoinHandle;
use anyhow::Error;
use serde::{de::DeserializeOwned, ser::Serialize};
use std::{
@ -11,11 +10,39 @@ use std::{
use tracing::Span;
use tracing_futures::Instrument;
/// The ActixJob trait defines parameters pertaining to an instance of background job
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// The type produced when a task is dropped before completion as a result of being deliberately
/// canceled, or it panicking
pub struct JoinError;
impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Task has been canceled")
}
}
impl std::error::Error for JoinError {}
/// The mechanism used to spawn Unsend futures, making them Send
pub trait UnsendSpawner {
/// The Handle to the job, implements a Send future with the Job's output
type Handle<T>: Future<Output = Result<T, JoinError>> + Send + Unpin
where
T: Send;
/// Spawn the unsend future producing a Send handle
fn spawn<Fut>(future: Fut) -> Self::Handle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: Send + 'static;
}
/// The UnsendJob trait defines parameters pertaining to an instance of a background job
///
/// This trait is specific to Actix, and will automatically implement the Job trait with the
/// proper translation from ?Send futures to Send futures
pub trait ActixJob: Serialize + DeserializeOwned + 'static {
/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires
/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send
/// future
pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
/// The application state provided to this job at runtime.
type State: Clone + 'static;
@ -24,6 +51,9 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static {
/// Importantly, this Future does not require Send
type Future: Future<Output = Result<(), Error>>;
/// The spawner type that will be used to spawn the unsend future
type Spawner: UnsendSpawner;
/// The name of the job
///
/// This name must be unique!!!
@ -118,42 +148,40 @@ where
impl<T> Job for T
where
T: ActixJob + std::panic::UnwindSafe,
T: UnsendJob + std::panic::UnwindSafe,
{
type State = T::State;
type Future = UnwrapFuture<JoinHandle<Result<(), Error>>>;
type Future = UnwrapFuture<<T::Spawner as UnsendSpawner>::Handle<Result<(), Error>>>;
const NAME: &'static str = <Self as ActixJob>::NAME;
const QUEUE: &'static str = <Self as ActixJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <Self as ActixJob>::MAX_RETRIES;
const BACKOFF: Backoff = <Self as ActixJob>::BACKOFF;
const TIMEOUT: i64 = <Self as ActixJob>::TIMEOUT;
const NAME: &'static str = <Self as UnsendJob>::NAME;
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
const TIMEOUT: i64 = <Self as UnsendJob>::TIMEOUT;
fn run(self, state: Self::State) -> Self::Future {
let fut = ActixJob::run(self, state);
let instrumented = fut.instrument(Span::current());
let handle = actix_rt::spawn(instrumented);
UnwrapFuture(handle)
UnwrapFuture(T::Spawner::spawn(
UnsendJob::run(self, state).instrument(Span::current()),
))
}
fn span(&self) -> Option<Span> {
ActixJob::span(self)
UnsendJob::span(self)
}
fn queue(&self) -> &str {
ActixJob::queue(self)
UnsendJob::queue(self)
}
fn max_retries(&self) -> MaxRetries {
ActixJob::max_retries(self)
UnsendJob::max_retries(self)
}
fn backoff_strategy(&self) -> Backoff {
ActixJob::backoff_strategy(self)
UnsendJob::backoff_strategy(self)
}
fn timeout(&self) -> i64 {
ActixJob::timeout(self)
UnsendJob::timeout(self)
}
}

View file

@ -326,30 +326,22 @@ impl HistogramFn for Histogram {
fn record(&self, _: f64) {}
}
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
const MONTHS: u64 = 30 * DAYS;
const SECOND: u64 = 1;
const MINUTE: u64 = 60 * SECOND;
const HOUR: u64 = 60 * MINUTE;
const DAY: u64 = 24 * HOUR;
const MONTH: u64 = 30 * DAY;
impl Default for JobStatStorage {
fn default() -> Self {
JobStatStorage {
hour: Buckets::new(
Duration::from_secs(1 * HOURS),
Duration::from_secs(3 * MINUTES),
Duration::from_secs(HOUR),
Duration::from_secs(3 * MINUTE),
20,
),
day: Buckets::new(
Duration::from_secs(1 * DAYS),
Duration::from_secs(1 * HOURS),
24,
),
month: Buckets::new(
Duration::from_secs(1 * MONTHS),
Duration::from_secs(1 * DAYS),
30,
),
day: Buckets::new(Duration::from_secs(DAY), Duration::from_secs(HOUR), 24),
month: Buckets::new(Duration::from_secs(MONTH), Duration::from_secs(DAY), 30),
total: 0,
}
}