diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index cf54e64..a961d31 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -13,5 +13,6 @@ anyhow = "1.0" async-trait = "0.1.24" background-jobs = { version = "0.8.0-alpha.0", path = "../.." } env_logger = "0.7" +futures = "0.3" sled-extensions = "0.2.0" serde = { version = "1.0", features = ["derive"] } diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 25a7ad1..3103600 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,5 +1,6 @@ use anyhow::Error; use background_jobs::{create_server, Job, MaxRetries, Processor, WorkerConfig}; +use futures::future::{ok, Ready}; const DEFAULT_QUEUE: &'static str = "default"; @@ -73,11 +74,12 @@ impl MyJob { impl Job for MyJob { type Processor = MyProcessor; type State = MyState; + type Future = Ready>; - async fn run(self, state: MyState) -> Result<(), Error> { + fn run(self, state: MyState) -> Self::Future { println!("{}: args, {:?}", state.app_name, self); - Ok(()) + ok(()) } } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 460f7c9..06023a6 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -13,9 +13,9 @@ //! ### Example //! ```rust,ignore //! use actix::System; -//! use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig}; //! use anyhow::Error; -//! use serde_derive::{Deserialize, Serialize}; +//! use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig}; +//! use futures::future::{ok, Ready}; //! //! const DEFAULT_QUEUE: &'static str = "default"; //! @@ -24,7 +24,7 @@ //! pub app_name: String, //! } //! -//! #[derive(Clone, Debug, Deserialize, Serialize)] +//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] //! pub struct MyJob { //! some_usize: usize, //! other_usize: usize, @@ -80,11 +80,12 @@ //! impl Job for MyJob { //! type Processor = MyProcessor; //! type State = MyState; +//! type Future = Ready>; //! -//! async fn run(self, state: MyState) -> Result<(), Error> { +//! async fn run(self, state: MyState) -> Self::Future { //! println!("{}: args, {:?}", state.app_name, self); //! -//! Ok(()) +//! ok(()) //! } //! } //! diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index e41dbea..f55d414 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,9 +1,9 @@ use crate::{Backoff, MaxRetries, Processor}; use anyhow::Error; use serde::{de::DeserializeOwned, ser::Serialize}; +use std::future::Future; /// The Job trait defines parameters pertaining to an instance of background job -#[async_trait::async_trait] pub trait Job: Serialize + DeserializeOwned + 'static { /// The processor this job is associated with. The job's processor can be used to create a /// JobInfo from a job, which is used to serialize the job into a storage mechanism. @@ -12,6 +12,9 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// The application state provided to this job at runtime. type State: Clone + 'static; + /// The future returned by this job + type Future: Future> + Send; + /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more @@ -21,7 +24,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// The state passed into this job is initialized at the start of the application. The state /// argument could be useful for containing a hook into something like r2d2, or the address of /// an actor in an actix-based system. - async fn run(self, state: Self::State) -> Result<(), Error>; + fn run(self, state: Self::State) -> Self::Future; /// If this job should not use the default queue for its processor, this can be overridden in /// user-code. diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index baa62e5..fb9b225 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -23,6 +23,7 @@ use std::{future::Future, pin::Pin}; /// ```rust /// use anyhow::Error; /// use background_jobs_core::{Job, Processor}; +/// use futures::future::{ok, Ready}; /// use log::info; /// /// #[derive(serde::Deserialize, serde::Serialize)] @@ -30,15 +31,15 @@ use std::{future::Future, pin::Pin}; /// count: i32, /// } /// -/// #[async_trait::async_trait] /// impl Job for MyJob { /// type Processor = MyProcessor; /// type State = (); +/// type Future = Ready>; /// -/// async fn run(self, _state: Self::State) -> Result<(), Error> { +/// fn run(self, _state: Self::State) -> Self::Future { /// info!("Processing {}", self.count); /// -/// Ok(()) +/// ok(()) /// } /// } /// diff --git a/src/lib.rs b/src/lib.rs index 9ffc255..c2d9e3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,10 +30,9 @@ //! [dependencies] //! actix = "0.8" //! background-jobs = "0.6.0" -//! failure = "0.1" +//! anyhow = "1.0" //! futures = "0.1" -//! serde = "1.0" -//! serde_drive = "1.0" +//! serde = { version = "1.0", features = ["derive"] } //! sled = "0.28" //! ``` //! @@ -42,11 +41,11 @@ //! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. //! //! ```rust,ignore +//! use anyhow::Error; //! use background_jobs::Job; -//! use serde_derive::{Deserialize, Serialize}; -//! use failure::Error; +//! use futures::future::{ok, Ready}; //! -//! #[derive(Clone, Debug, Deserialize, Serialize)] +//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] //! pub struct MyJob { //! some_usize: usize, //! other_usize: usize, @@ -64,12 +63,12 @@ //! impl Job for MyJob { //! type Processor = MyProcessor; // We'll define this later //! type State = (); -//! type Future = Result<(), Error>; +//! type Future = Ready>; //! //! fn run(self, _: Self::State) -> Self::Future { //! println!("args: {:?}", self); //! -//! Ok(()) +//! ok(()) //! } //! } //! ``` @@ -82,7 +81,8 @@ //! Let's re-define the job to care about some application state. //! //! ```rust,ignore -//! # use failure::Error; +//! use anyhow::Error; +//! //! #[derive(Clone, Debug)] //! pub struct MyState { //! pub app_name: String, @@ -99,12 +99,12 @@ //! impl Job for MyJob { //! type Processor = MyProcessor; //! type State = MyState; -//! type Future = Result<(), Error>; +//! type Future = Ready>; //! //! fn run(self, state: Self::State) -> Self::Future { //! info!("{}: args, {:?}", state.app_name, self); //! -//! Ok(()) +//! ok(()) //! } //! } //! ``` @@ -164,15 +164,12 @@ //! //! ##### Main //! ```rust,ignore -//! use actix::System; +//! use anyhow::Error; //! use background_jobs::{ServerConfig, sled_storage::Storage, WorkerConfig}; -//! use failure::Error; //! use sled::Db; //! -//! fn main() -> Result<(), Error> { -//! // First set up the Actix System to ensure we have a runtime to spawn jobs on. -//! let sys = System::new("my-actix-system"); -//! +//! #[actix_rt::main] +//! async fn main() -> Result<(), Error> { //! // Set up our Storage //! let db = Db::start_default("my-sled-db")?; //! let storage = Storage::new(db)?; @@ -192,7 +189,7 @@ //! queue_handle.queue::(MyJob::new(5, 6))?; //! //! // Block on Actix -//! sys.run()?; +//! actix_rt::signal::ctrl_c().await?; //! Ok(()) //! } //! ```