From ca1c07366692c44fce6ae351a4a6762089660a20 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 30 Mar 2020 10:36:49 -0500 Subject: [PATCH] Add ActixJob to handle spawinging ?Send futures --- jobs-actix/Cargo.toml | 2 +- jobs-actix/src/lib.rs | 2 + jobs-core/Cargo.toml | 6 +++ jobs-core/src/actix_job.rs | 101 +++++++++++++++++++++++++++++++++++++ jobs-core/src/lib.rs | 5 ++ src/lib.rs | 2 +- 6 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 jobs-core/src/actix_job.rs diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 9e2c8ea..9539ac8 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -14,7 +14,7 @@ actix = "0.10.0-alpha.2" actix-rt = "1.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs-core = { version = "0.7", path = "../jobs-core" } +background-jobs-core = { version = "0.7", path = "../jobs-core", features = ["with-actix"] } chrono = "0.4" log = "0.4" num_cpus = "1.10.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 9e78ffe..79189a6 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -139,6 +139,8 @@ mod worker; use self::{every::every, server::Server, worker::local_worker}; +pub use background_jobs_core::ActixJob; + /// Create a new Server /// /// In previous versions of this library, the server itself was run on it's own dedicated threads diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 7951cff..ab2750e 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -9,7 +9,12 @@ keywords = ["jobs", "processor"] readme = "../README.md" edition = "2018" +[features] +default = [] +with-actix = ["actix", "tokio"] + [dependencies] +actix = { version = "0.10.0-alpha.1", optional = true } anyhow = "1.0" async-trait = "0.1.24" chrono = { version = "0.4", features = ["serde"] } @@ -18,4 +23,5 @@ log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" +tokio = { version = "0.2.13", optional = true } uuid = { version = "0.8.1", features = ["serde", "v4"] } diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs new file mode 100644 index 0000000..fef73ce --- /dev/null +++ b/jobs-core/src/actix_job.rs @@ -0,0 +1,101 @@ +use crate::{Backoff, Job, MaxRetries, Processor}; +use anyhow::Error; +use log::error; +use serde::{de::DeserializeOwned, ser::Serialize}; +use std::{future::Future, pin::Pin}; +use tokio::sync::oneshot; + +/// The ActixJob trait defines parameters pertaining to an instance of background job +/// +/// This trait is specific to Actix, and will automatically implement the Job trait with the +/// proper translation from ?Send futures to Send futures +pub trait ActixJob: Serialize + DeserializeOwned + 'static { + /// The processor this job is associated with. The job's processor can be used to create a + /// JobInfo from a job, which is used to serialize the job into a storage mechanism. + type Processor: Processor; + + /// The application state provided to this job at runtime. + type State: Clone + 'static; + + /// The future returned by this job + type Future: Future>; + + /// Users of this library must define what it means to run a job. + /// + /// This should contain all the logic needed to complete a job. If that means queuing more + /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy + /// processes, that logic should all be called from inside this method. + /// + /// The state passed into this job is initialized at the start of the application. The state + /// argument could be useful for containing a hook into something like r2d2, or the address of + /// an actor in an actix-based system. + fn run(self, state: Self::State) -> Self::Future; + + /// If this job should not use the default queue for its processor, this can be overridden in + /// user-code. + /// + /// Jobs will only be processed by processors that are registered, and if a queue is supplied + /// here that is not associated with a valid processor for this job, it will never be + /// processed. + fn queue(&self) -> Option<&str> { + None + } + + /// If this job should not use the default maximum retry count for its processor, this can be + /// overridden in user-code. + fn max_retries(&self) -> Option { + None + } + + /// If this job should not use the default backoff strategy for its processor, this can be + /// overridden in user-code. + fn backoff_strategy(&self) -> Option { + None + } + + /// Define the maximum number of milliseconds this job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + fn timeout(&self) -> Option { + None + } +} + +impl Job for T +where + T: ActixJob, +{ + type Processor = T::Processor; + type State = T::State; + type Future = Pin> + Send>>; + + fn run(self, state: Self::State) -> Self::Future { + let (tx, rx) = oneshot::channel(); + + actix::spawn(async move { + if let Err(_) = tx.send(ActixJob::run(self, state).await) { + error!("Job dropped"); + } + }); + + Box::pin(async move { rx.await? }) + } + + fn queue(&self) -> Option<&str> { + ActixJob::queue(self) + } + + fn max_retries(&self) -> Option { + ActixJob::max_retries(self) + } + + fn backoff_strategy(&self) -> Option { + ActixJob::backoff_strategy(self) + } + + fn timeout(&self) -> Option { + ActixJob::timeout(self) + } +} diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 4f97057..7bffecc 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -8,6 +8,8 @@ use anyhow::Error; +#[cfg(feature = "with-actix")] +mod actix_job; mod job; mod job_info; mod processor; @@ -24,6 +26,9 @@ pub use crate::{ storage::{memory_storage, Storage}, }; +#[cfg(feature = "with-actix")] +pub use actix_job::ActixJob; + #[derive(Debug, thiserror::Error)] /// The error type returned by a `Processor`'s `process` method pub enum JobError { diff --git a/src/lib.rs b/src/lib.rs index c2d9e3e..dea4633 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,7 +208,7 @@ pub use background_jobs_core::{ }; #[cfg(feature = "background-jobs-actix")] -pub use background_jobs_actix::{create_server, QueueHandle, WorkerConfig}; +pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig}; #[cfg(feature = "background-jobs-sled-storage")] pub mod sled_storage {