From ffa61b3c33bd7994b7173272b717cef3f00394fc Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 27 May 2019 19:23:25 -0500 Subject: [PATCH] Add Every, a tool to create recurring jobs --- jobs-actix/src/every.rs | 54 +++++++++++++++++++++++++++++++++++++++++ jobs-actix/src/lib.rs | 4 ++- src/lib.rs | 2 +- 3 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 jobs-actix/src/every.rs diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs new file mode 100644 index 0000000..d826db6 --- /dev/null +++ b/jobs-actix/src/every.rs @@ -0,0 +1,54 @@ +use std::time::Duration; + +use super::{Job, QueueHandle}; +use actix::{Actor, AsyncContext, Context}; +use log::error; + +/// A type used to schedule recurring jobs. +/// +/// ```rust,ignore +/// let server = ServerConfig::new(storage).start(); +/// Every::new(server, Duration::from_secs(60 * 30), MyJob::new()).start(); +/// ``` +pub struct Every +where + J: Job + Clone + 'static, +{ + spawner: QueueHandle, + duration: Duration, + job: J, +} + +impl Every +where + J: Job + Clone + 'static, +{ + pub fn new(spawner: QueueHandle, duration: Duration, job: J) -> Self { + Every { + spawner, + duration, + job, + } + } +} + +impl Actor for Every +where + J: Job + Clone + 'static, +{ + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + match self.spawner.queue(self.job.clone()) { + Ok(_) => (), + Err(_) => error!("Failed to queue job"), + }; + + ctx.run_interval(self.duration.clone(), move |actor, _| { + match actor.spawner.queue(actor.job.clone()) { + Ok(_) => (), + Err(_) => error!("Failed to queue job"), + } + }); + } +} diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 8a24ed2..8d3e3f2 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -5,11 +5,13 @@ use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; use failure::Error; use futures::Future; +mod every; mod pinger; mod server; mod storage; mod worker; -pub use self::{server::Server, worker::LocalWorker}; + +pub use self::{every::Every, server::Server, worker::LocalWorker}; use self::{ pinger::Pinger, diff --git a/src/lib.rs b/src/lib.rs index b503938..c5a733a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -202,7 +202,7 @@ pub use background_jobs_core::{ }; #[cfg(feature = "background-jobs-actix")] -pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig}; +pub use background_jobs_actix::{Every, QueueHandle, ServerConfig, WorkerConfig}; #[cfg(feature = "background-jobs-sled-storage")] pub mod sled_storage {