mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-25 05:21:00 +00:00
Remove actix-job, make ActixSpawner public
This commit is contained in:
parent
0cd0f91369
commit
d1d578aa97
3 changed files with 12 additions and 170 deletions
|
@ -1,10 +1,9 @@
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
|
||||||
use anyhow::Error;
|
use background_jobs_core::{JoinError, UnsendSpawner};
|
||||||
use background_jobs_core::{Backoff, JoinError, MaxRetries, UnsendJob, UnsendSpawner};
|
|
||||||
use serde::{de::DeserializeOwned, ser::Serialize};
|
|
||||||
use tracing::Span;
|
|
||||||
|
|
||||||
|
/// Provide a spawner for actix-based systems for Unsend Jobs
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
pub struct ActixSpawner;
|
pub struct ActixSpawner;
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -42,161 +41,3 @@ impl<T> Drop for ActixHandle<T> {
|
||||||
self.0.abort();
|
self.0.abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The UnsendJob trait defines parameters pertaining to an instance of a background job
|
|
||||||
///
|
|
||||||
/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires
|
|
||||||
/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send
|
|
||||||
/// future
|
|
||||||
pub trait ActixJob: Serialize + DeserializeOwned + 'static {
|
|
||||||
/// The application state provided to this job at runtime.
|
|
||||||
type State: Clone + 'static;
|
|
||||||
|
|
||||||
/// The future returned by this job
|
|
||||||
///
|
|
||||||
/// Importantly, this Future does not require Send
|
|
||||||
type Future: Future<Output = Result<(), Error>>;
|
|
||||||
|
|
||||||
/// The name of the job
|
|
||||||
///
|
|
||||||
/// This name must be unique!!!
|
|
||||||
const NAME: &'static str;
|
|
||||||
|
|
||||||
/// The name of the default queue for this job
|
|
||||||
///
|
|
||||||
/// This can be overridden on an individual-job level, but if a non-existant queue is supplied,
|
|
||||||
/// the job will never be processed.
|
|
||||||
const QUEUE: &'static str = "default";
|
|
||||||
|
|
||||||
/// Define the default number of retries for this job
|
|
||||||
///
|
|
||||||
/// Defaults to Count(5)
|
|
||||||
/// Jobs can override
|
|
||||||
const MAX_RETRIES: MaxRetries = MaxRetries::Count(5);
|
|
||||||
|
|
||||||
/// Define the default backoff strategy for this job
|
|
||||||
///
|
|
||||||
/// Defaults to Exponential(2)
|
|
||||||
/// Jobs can override
|
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(2);
|
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds a job should be allowed to run before being
|
|
||||||
/// considered dead.
|
|
||||||
///
|
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
|
||||||
/// completed.
|
|
||||||
///
|
|
||||||
/// Defaults to 15 seconds
|
|
||||||
/// Jobs can override
|
|
||||||
const TIMEOUT: i64 = 15_000;
|
|
||||||
|
|
||||||
/// Users of this library must define what it means to run a job.
|
|
||||||
///
|
|
||||||
/// This should contain all the logic needed to complete a job. If that means queuing more
|
|
||||||
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
|
|
||||||
/// processes, that logic should all be called from inside this method.
|
|
||||||
///
|
|
||||||
/// The state passed into this job is initialized at the start of the application. The state
|
|
||||||
/// argument could be useful for containing a hook into something like r2d2, or the address of
|
|
||||||
/// an actor in an actix-based system.
|
|
||||||
fn run(self, state: Self::State) -> Self::Future;
|
|
||||||
|
|
||||||
/// Generate a Span that the job will be processed within
|
|
||||||
fn span(&self) -> Option<Span> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If this job should not use it's default queue, this can be overridden in
|
|
||||||
/// user-code.
|
|
||||||
fn queue(&self) -> &str {
|
|
||||||
Self::QUEUE
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If this job should not use it's default maximum retry count, this can be
|
|
||||||
/// overridden in user-code.
|
|
||||||
fn max_retries(&self) -> MaxRetries {
|
|
||||||
Self::MAX_RETRIES
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If this job should not use it's default backoff strategy, this can be
|
|
||||||
/// overridden in user-code.
|
|
||||||
fn backoff_strategy(&self) -> Backoff {
|
|
||||||
Self::BACKOFF
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds this job should be allowed to run before being
|
|
||||||
/// considered dead.
|
|
||||||
///
|
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
|
||||||
/// completed.
|
|
||||||
fn timeout(&self) -> i64 {
|
|
||||||
Self::TIMEOUT
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Provide helper methods for queuing ActixJobs
|
|
||||||
pub trait ActixJobExt: ActixJob {
|
|
||||||
/// Turn an ActixJob into a type that implements Job
|
|
||||||
fn into_job(self) -> ActixJobWrapper<Self>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
ActixJobWrapper(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> ActixJobExt for T where T: ActixJob {}
|
|
||||||
|
|
||||||
impl<T> From<T> for ActixJobWrapper<T>
|
|
||||||
where
|
|
||||||
T: ActixJob,
|
|
||||||
{
|
|
||||||
fn from(value: T) -> Self {
|
|
||||||
ActixJobWrapper(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
|
||||||
// A wrapper for ActixJob implementing UnsendJob with an ActixSpawner
|
|
||||||
pub struct ActixJobWrapper<T>(T);
|
|
||||||
|
|
||||||
impl<T> UnsendJob for ActixJobWrapper<T>
|
|
||||||
where
|
|
||||||
T: ActixJob,
|
|
||||||
{
|
|
||||||
type State = <T as ActixJob>::State;
|
|
||||||
|
|
||||||
type Future = <T as ActixJob>::Future;
|
|
||||||
|
|
||||||
type Spawner = ActixSpawner;
|
|
||||||
|
|
||||||
const NAME: &'static str = <T as ActixJob>::NAME;
|
|
||||||
const QUEUE: &'static str = <T as ActixJob>::QUEUE;
|
|
||||||
const MAX_RETRIES: MaxRetries = <T as ActixJob>::MAX_RETRIES;
|
|
||||||
const BACKOFF: Backoff = <T as ActixJob>::BACKOFF;
|
|
||||||
const TIMEOUT: i64 = <T as ActixJob>::TIMEOUT;
|
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
|
||||||
<T as ActixJob>::run(self.0, state)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn span(&self) -> Option<Span> {
|
|
||||||
self.0.span()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn queue(&self) -> &str {
|
|
||||||
self.0.queue()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn max_retries(&self) -> MaxRetries {
|
|
||||||
self.0.max_retries()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn backoff_strategy(&self) -> Backoff {
|
|
||||||
self.0.backoff_strategy()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn timeout(&self) -> i64 {
|
|
||||||
self.0.timeout()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use crate::{ActixJob, QueueHandle};
|
use crate::QueueHandle;
|
||||||
use actix_rt::time::{interval_at, Instant};
|
use actix_rt::time::{interval_at, Instant};
|
||||||
|
use background_jobs_core::Job;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
/// A type used to schedule recurring jobs.
|
/// A type used to schedule recurring jobs.
|
||||||
|
@ -10,7 +11,7 @@ use std::time::Duration;
|
||||||
/// ```
|
/// ```
|
||||||
pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
|
pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
|
||||||
where
|
where
|
||||||
J: ActixJob + Clone + Send,
|
J: Job + Clone + Send,
|
||||||
{
|
{
|
||||||
let mut interval = interval_at(Instant::now(), duration);
|
let mut interval = interval_at(Instant::now(), duration);
|
||||||
|
|
||||||
|
|
|
@ -136,7 +136,7 @@ mod worker;
|
||||||
|
|
||||||
use self::{every::every, server::Server};
|
use self::{every::every, server::Server};
|
||||||
|
|
||||||
pub use actix_job::{ActixJob, ActixJobExt};
|
pub use actix_job::ActixSpawner;
|
||||||
|
|
||||||
/// A timer implementation for the Memory Storage backend
|
/// A timer implementation for the Memory Storage backend
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -472,9 +472,9 @@ impl QueueHandle {
|
||||||
/// job's queue is free to do so.
|
/// job's queue is free to do so.
|
||||||
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
|
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
J: ActixJob,
|
J: Job,
|
||||||
{
|
{
|
||||||
let job = new_job(job.into_job())?;
|
let job = new_job(job)?;
|
||||||
self.inner.push(job).await?;
|
self.inner.push(job).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -485,9 +485,9 @@ impl QueueHandle {
|
||||||
/// and when a worker for the job's queue is free to do so.
|
/// and when a worker for the job's queue is free to do so.
|
||||||
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
|
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
J: ActixJob,
|
J: Job,
|
||||||
{
|
{
|
||||||
let job = new_scheduled_job(job.into_job(), after)?;
|
let job = new_scheduled_job(job, after)?;
|
||||||
self.inner.push(job).await?;
|
self.inner.push(job).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -498,7 +498,7 @@ impl QueueHandle {
|
||||||
/// processed whenever workers are free to do so.
|
/// processed whenever workers are free to do so.
|
||||||
pub fn every<J>(&self, duration: Duration, job: J)
|
pub fn every<J>(&self, duration: Duration, job: J)
|
||||||
where
|
where
|
||||||
J: ActixJob + Clone + Send + 'static,
|
J: Job + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
actix_rt::spawn(every(self.clone(), duration, job));
|
actix_rt::spawn(every(self.clone(), duration, job));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue