Move job back to named Future associated type

This commit is contained in:
asonix 2020-03-21 09:44:38 -05:00
parent 4514db49ee
commit 2fe8e9885c
6 changed files with 35 additions and 30 deletions

View file

@ -13,5 +13,6 @@ anyhow = "1.0"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs = { version = "0.8.0-alpha.0", path = "../.." } background-jobs = { version = "0.8.0-alpha.0", path = "../.." }
env_logger = "0.7" env_logger = "0.7"
futures = "0.3"
sled-extensions = "0.2.0" sled-extensions = "0.2.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View file

@ -1,5 +1,6 @@
use anyhow::Error; use anyhow::Error;
use background_jobs::{create_server, Job, MaxRetries, Processor, WorkerConfig}; use background_jobs::{create_server, Job, MaxRetries, Processor, WorkerConfig};
use futures::future::{ok, Ready};
const DEFAULT_QUEUE: &'static str = "default"; const DEFAULT_QUEUE: &'static str = "default";
@ -73,11 +74,12 @@ impl MyJob {
impl Job for MyJob { impl Job for MyJob {
type Processor = MyProcessor; type Processor = MyProcessor;
type State = MyState; type State = MyState;
type Future = Ready<Result<(), Error>>;
async fn run(self, state: MyState) -> Result<(), Error> { fn run(self, state: MyState) -> Self::Future {
println!("{}: args, {:?}", state.app_name, self); println!("{}: args, {:?}", state.app_name, self);
Ok(()) ok(())
} }
} }

View file

@ -13,9 +13,9 @@
//! ### Example //! ### Example
//! ```rust,ignore //! ```rust,ignore
//! use actix::System; //! use actix::System;
//! use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig};
//! use anyhow::Error; //! use anyhow::Error;
//! use serde_derive::{Deserialize, Serialize}; //! use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig};
//! use futures::future::{ok, Ready};
//! //!
//! const DEFAULT_QUEUE: &'static str = "default"; //! const DEFAULT_QUEUE: &'static str = "default";
//! //!
@ -24,7 +24,7 @@
//! pub app_name: String, //! pub app_name: String,
//! } //! }
//! //!
//! #[derive(Clone, Debug, Deserialize, Serialize)] //! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! pub struct MyJob { //! pub struct MyJob {
//! some_usize: usize, //! some_usize: usize,
//! other_usize: usize, //! other_usize: usize,
@ -80,11 +80,12 @@
//! impl Job for MyJob { //! impl Job for MyJob {
//! type Processor = MyProcessor; //! type Processor = MyProcessor;
//! type State = MyState; //! type State = MyState;
//! type Future = Ready<Result<(), Error>>;
//! //!
//! async fn run(self, state: MyState) -> Result<(), Error> { //! async fn run(self, state: MyState) -> Self::Future {
//! println!("{}: args, {:?}", state.app_name, self); //! println!("{}: args, {:?}", state.app_name, self);
//! //!
//! Ok(()) //! ok(())
//! } //! }
//! } //! }
//! //!

View file

@ -1,9 +1,9 @@
use crate::{Backoff, MaxRetries, Processor}; use crate::{Backoff, MaxRetries, Processor};
use anyhow::Error; use anyhow::Error;
use serde::{de::DeserializeOwned, ser::Serialize}; use serde::{de::DeserializeOwned, ser::Serialize};
use std::future::Future;
/// The Job trait defines parameters pertaining to an instance of background job /// The Job trait defines parameters pertaining to an instance of background job
#[async_trait::async_trait]
pub trait Job: Serialize + DeserializeOwned + 'static { pub trait Job: Serialize + DeserializeOwned + 'static {
/// The processor this job is associated with. The job's processor can be used to create a /// The processor this job is associated with. The job's processor can be used to create a
/// JobInfo from a job, which is used to serialize the job into a storage mechanism. /// JobInfo from a job, which is used to serialize the job into a storage mechanism.
@ -12,6 +12,9 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// The application state provided to this job at runtime. /// The application state provided to this job at runtime.
type State: Clone + 'static; type State: Clone + 'static;
/// The future returned by this job
type Future: Future<Output = Result<(), Error>> + Send;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
/// This should contain all the logic needed to complete a job. If that means queuing more /// This should contain all the logic needed to complete a job. If that means queuing more
@ -21,7 +24,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// The state passed into this job is initialized at the start of the application. The state /// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of /// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system. /// an actor in an actix-based system.
async fn run(self, state: Self::State) -> Result<(), Error>; fn run(self, state: Self::State) -> Self::Future;
/// If this job should not use the default queue for its processor, this can be overridden in /// If this job should not use the default queue for its processor, this can be overridden in
/// user-code. /// user-code.

View file

@ -23,6 +23,7 @@ use std::{future::Future, pin::Pin};
/// ```rust /// ```rust
/// use anyhow::Error; /// use anyhow::Error;
/// use background_jobs_core::{Job, Processor}; /// use background_jobs_core::{Job, Processor};
/// use futures::future::{ok, Ready};
/// use log::info; /// use log::info;
/// ///
/// #[derive(serde::Deserialize, serde::Serialize)] /// #[derive(serde::Deserialize, serde::Serialize)]
@ -30,15 +31,15 @@ use std::{future::Future, pin::Pin};
/// count: i32, /// count: i32,
/// } /// }
/// ///
/// #[async_trait::async_trait]
/// impl Job for MyJob { /// impl Job for MyJob {
/// type Processor = MyProcessor; /// type Processor = MyProcessor;
/// type State = (); /// type State = ();
/// type Future = Ready<Result<(), Error>>;
/// ///
/// async fn run(self, _state: Self::State) -> Result<(), Error> { /// fn run(self, _state: Self::State) -> Self::Future {
/// info!("Processing {}", self.count); /// info!("Processing {}", self.count);
/// ///
/// Ok(()) /// ok(())
/// } /// }
/// } /// }
/// ///

View file

@ -30,10 +30,9 @@
//! [dependencies] //! [dependencies]
//! actix = "0.8" //! actix = "0.8"
//! background-jobs = "0.6.0" //! background-jobs = "0.6.0"
//! failure = "0.1" //! anyhow = "1.0"
//! futures = "0.1" //! futures = "0.1"
//! serde = "1.0" //! serde = { version = "1.0", features = ["derive"] }
//! serde_drive = "1.0"
//! sled = "0.28" //! sled = "0.28"
//! ``` //! ```
//! //!
@ -42,11 +41,11 @@
//! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. //! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
//! //!
//! ```rust,ignore //! ```rust,ignore
//! use anyhow::Error;
//! use background_jobs::Job; //! use background_jobs::Job;
//! use serde_derive::{Deserialize, Serialize}; //! use futures::future::{ok, Ready};
//! use failure::Error;
//! //!
//! #[derive(Clone, Debug, Deserialize, Serialize)] //! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! pub struct MyJob { //! pub struct MyJob {
//! some_usize: usize, //! some_usize: usize,
//! other_usize: usize, //! other_usize: usize,
@ -64,12 +63,12 @@
//! impl Job for MyJob { //! impl Job for MyJob {
//! type Processor = MyProcessor; // We'll define this later //! type Processor = MyProcessor; // We'll define this later
//! type State = (); //! type State = ();
//! type Future = Result<(), Error>; //! type Future = Ready<Result<(), Error>>;
//! //!
//! fn run(self, _: Self::State) -> Self::Future { //! fn run(self, _: Self::State) -> Self::Future {
//! println!("args: {:?}", self); //! println!("args: {:?}", self);
//! //!
//! Ok(()) //! ok(())
//! } //! }
//! } //! }
//! ``` //! ```
@ -82,7 +81,8 @@
//! Let's re-define the job to care about some application state. //! Let's re-define the job to care about some application state.
//! //!
//! ```rust,ignore //! ```rust,ignore
//! # use failure::Error; //! use anyhow::Error;
//!
//! #[derive(Clone, Debug)] //! #[derive(Clone, Debug)]
//! pub struct MyState { //! pub struct MyState {
//! pub app_name: String, //! pub app_name: String,
@ -99,12 +99,12 @@
//! impl Job for MyJob { //! impl Job for MyJob {
//! type Processor = MyProcessor; //! type Processor = MyProcessor;
//! type State = MyState; //! type State = MyState;
//! type Future = Result<(), Error>; //! type Future = Ready<Result<(), Error>>;
//! //!
//! fn run(self, state: Self::State) -> Self::Future { //! fn run(self, state: Self::State) -> Self::Future {
//! info!("{}: args, {:?}", state.app_name, self); //! info!("{}: args, {:?}", state.app_name, self);
//! //!
//! Ok(()) //! ok(())
//! } //! }
//! } //! }
//! ``` //! ```
@ -164,15 +164,12 @@
//! //!
//! ##### Main //! ##### Main
//! ```rust,ignore //! ```rust,ignore
//! use actix::System; //! use anyhow::Error;
//! use background_jobs::{ServerConfig, sled_storage::Storage, WorkerConfig}; //! use background_jobs::{ServerConfig, sled_storage::Storage, WorkerConfig};
//! use failure::Error;
//! use sled::Db; //! use sled::Db;
//! //!
//! fn main() -> Result<(), Error> { //! #[actix_rt::main]
//! // First set up the Actix System to ensure we have a runtime to spawn jobs on. //! async fn main() -> Result<(), Error> {
//! let sys = System::new("my-actix-system");
//!
//! // Set up our Storage //! // Set up our Storage
//! let db = Db::start_default("my-sled-db")?; //! let db = Db::start_default("my-sled-db")?;
//! let storage = Storage::new(db)?; //! let storage = Storage::new(db)?;
@ -192,7 +189,7 @@
//! queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?; //! queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
//! //!
//! // Block on Actix //! // Block on Actix
//! sys.run()?; //! actix_rt::signal::ctrl_c().await?;
//! Ok(()) //! Ok(())
//! } //! }
//! ``` //! ```