diff --git a/Cargo.toml b/Cargo.toml index 086e53c..fee7624 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,22 +6,23 @@ edition = "2018" [workspace] members = [ + "jobs-actix", "jobs-core", - "jobs-executor", "jobs-tokio", - "examples/process-jobs", + "examples/tokio-jobs-example", + "examples/actix-jobs-example", ] [features] -default = ["jobs-tokio"] +default = ["jobs-tokio", "jobs-actix"] [dependencies.jobs-core] version = "0.1" path = "jobs-core" -[dependencies.jobs-executor] +[dependencies.jobs-actix] version = "0.1" -path = "jobs-executor" +path = "jobs-actix" optional = true [dependencies.jobs-tokio] diff --git a/examples/actix-jobs-example/.env b/examples/actix-jobs-example/.env new file mode 100644 index 0000000..41e2ab9 --- /dev/null +++ b/examples/actix-jobs-example/.env @@ -0,0 +1 @@ +RUST_LOG=actix_jobs_example,jobs_actix=trace diff --git a/examples/process-jobs/.gitignore b/examples/actix-jobs-example/.gitignore similarity index 100% rename from examples/process-jobs/.gitignore rename to examples/actix-jobs-example/.gitignore diff --git a/examples/actix-jobs-example/Cargo.toml b/examples/actix-jobs-example/Cargo.toml new file mode 100644 index 0000000..7ca1edc --- /dev/null +++ b/examples/actix-jobs-example/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "actix-jobs-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +[dependencies] +actix = "0.7" +dotenv = "0.13" +env_logger = "0.5" +failure = "0.1" +futures = "0.1" +log = "0.4" +serde = "1.0" +serde_derive = "1.0" + +[dependencies.jobs] +version = "0.1" +path = "../.." +default-features = false +features = ["jobs-actix"] diff --git a/examples/actix-jobs-example/src/main.rs b/examples/actix-jobs-example/src/main.rs new file mode 100644 index 0000000..6b63441 --- /dev/null +++ b/examples/actix-jobs-example/src/main.rs @@ -0,0 +1,99 @@ +#[macro_use] +extern crate log; +#[macro_use] +extern crate serde_derive; + +use failure::Error; +use futures::{future::IntoFuture, Future}; +use jobs::{Backoff, JobsBuilder, MaxRetries, Processor, QueueJob}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct MyJobArguments { + some_usize: usize, + other_usize: usize, +} + +#[derive(Clone, Debug)] +struct MyProcessor; + +impl Processor for MyProcessor { + type Arguments = MyJobArguments; + + fn name() -> &'static str { + "MyProcessor" + } + + fn max_retries() -> MaxRetries { + MaxRetries::Count(1) + } + + fn backoff_strategy() -> Backoff { + Backoff::Exponential(2) + } + + fn process(&self, args: Self::Arguments) -> Box + Send> { + info!("args: {:?}", args); + + Box::new(Ok(()).into_future()) + } +} + +fn main() -> Result<(), Error> { + dotenv::dotenv().ok(); + env_logger::init(); + + let sys = actix::System::new("jobs-system"); + + let mut builder = JobsBuilder::new(1234, 4, "example-db"); + + builder.register_processor(MyProcessor); + + let jobs_actor = builder.build()?; + + let jobs = vec![ + MyJobArguments { + some_usize: 0, + other_usize: 1, + }, + MyJobArguments { + some_usize: 1, + other_usize: 2, + }, + MyJobArguments { + some_usize: 3, + other_usize: 5, + }, + MyJobArguments { + some_usize: 8, + other_usize: 13, + }, + MyJobArguments { + some_usize: 21, + other_usize: 34, + }, + MyJobArguments { + some_usize: 55, + other_usize: 89, + }, + MyJobArguments { + some_usize: 144, + other_usize: 233, + }, + MyJobArguments { + some_usize: 377, + other_usize: 610, + }, + MyJobArguments { + some_usize: 987, + other_usize: 1597, + }, + ]; + + for job in jobs { + jobs_actor.do_send(QueueJob(MyProcessor::new_job(job, None, None)?)); + } + + let _ = sys.run(); + + Ok(()) +} diff --git a/examples/process-jobs/.env b/examples/process-jobs/.env deleted file mode 100644 index 0d3ea5c..0000000 --- a/examples/process-jobs/.env +++ /dev/null @@ -1 +0,0 @@ -RUST_LOG=jobs_tokio,process_jobs=trace diff --git a/examples/process-jobs/src/main.rs b/examples/process-jobs/src/main.rs deleted file mode 100644 index 5e64d37..0000000 --- a/examples/process-jobs/src/main.rs +++ /dev/null @@ -1,115 +0,0 @@ -#[macro_use] -extern crate serde_derive; - -use std::time::Duration; - -use failure::Error; -use futures::{ - future::{lazy, IntoFuture}, - Future, -}; -use jobs::{Backoff, JobRunner, MaxRetries, Processor}; - -#[derive(Clone, Debug, Deserialize, Serialize)] -struct MyJobArguments { - some_usize: usize, - other_usize: usize, -} - -struct MyProcessor; - -impl Processor for MyProcessor { - type Arguments = MyJobArguments; - - fn name() -> &'static str { - "MyProcessor" - } - - fn max_retries() -> MaxRetries { - MaxRetries::Count(1) - } - - fn backoff_strategy() -> Backoff { - Backoff::Exponential(2) - } - - fn process(&self, args: Self::Arguments) -> Box + Send> { - println!("args: {:?}", args); - - Box::new(Ok(()).into_future()) - } -} - -fn main() { - dotenv::dotenv().ok(); - env_logger::init(); - - tokio::run( - lazy(|| { - let mut runner = JobRunner::new(1234, 4, "example-db"); - runner.register_processor(MyProcessor); - - let handle = runner.spawn(); - - let jobs = vec![ - MyJobArguments { - some_usize: 0, - other_usize: 1, - }, - MyJobArguments { - some_usize: 1, - other_usize: 2, - }, - MyJobArguments { - some_usize: 3, - other_usize: 5, - }, - MyJobArguments { - some_usize: 8, - other_usize: 13, - }, - MyJobArguments { - some_usize: 21, - other_usize: 34, - }, - MyJobArguments { - some_usize: 55, - other_usize: 89, - }, - MyJobArguments { - some_usize: 144, - other_usize: 233, - }, - MyJobArguments { - some_usize: 377, - other_usize: 610, - }, - MyJobArguments { - some_usize: 987, - other_usize: 1597, - }, - ]; - - let _: Vec<_> = jobs - .into_iter() - .map(|job| { - tokio::spawn( - handle - .queue(MyProcessor::new_job(job, None, None).unwrap()) - .then(|_| Ok(())), - ); - }) - .collect(); - - Ok(handle) - }) - .and_then(|handle| { - tokio::timer::Delay::new(tokio::clock::now() + Duration::from_secs(2)) - .map(move |_| { - let _ = handle; - () - }) - .map_err(|_| ()) - }), - ); -} diff --git a/examples/tokio-jobs-example/.env b/examples/tokio-jobs-example/.env new file mode 100644 index 0000000..eb6e3b7 --- /dev/null +++ b/examples/tokio-jobs-example/.env @@ -0,0 +1 @@ +RUST_LOG=tokio_jobs_example,jobs_tokio=trace diff --git a/examples/tokio-jobs-example/.gitignore b/examples/tokio-jobs-example/.gitignore new file mode 100644 index 0000000..ac25cf6 --- /dev/null +++ b/examples/tokio-jobs-example/.gitignore @@ -0,0 +1 @@ +example-db diff --git a/examples/process-jobs/Cargo.toml b/examples/tokio-jobs-example/Cargo.toml similarity index 85% rename from examples/process-jobs/Cargo.toml rename to examples/tokio-jobs-example/Cargo.toml index 1348753..0ab0fa4 100644 --- a/examples/process-jobs/Cargo.toml +++ b/examples/tokio-jobs-example/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "process-jobs" +name = "tokio-jobs-example" version = "0.1.0" authors = ["asonix "] edition = "2018" @@ -17,4 +17,5 @@ tokio = "0.1" [dependencies.jobs] version = "0.1" path = "../.." +default-features = false features = ["jobs-tokio"] diff --git a/examples/tokio-jobs-example/src/main.rs b/examples/tokio-jobs-example/src/main.rs new file mode 100644 index 0000000..c77b907 --- /dev/null +++ b/examples/tokio-jobs-example/src/main.rs @@ -0,0 +1,110 @@ +#[macro_use] +extern crate log; +#[macro_use] +extern crate serde_derive; + +use std::time::Duration; + +use failure::Error; +use futures::{ + future::{lazy, IntoFuture}, + Future, +}; +use jobs::{Backoff, JobRunner, MaxRetries, Processor}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct MyJobArguments { + some_usize: usize, + other_usize: usize, +} + +#[derive(Clone, Debug)] +struct MyProcessor; + +impl Processor for MyProcessor { + type Arguments = MyJobArguments; + + fn name() -> &'static str { + "MyProcessor" + } + + fn max_retries() -> MaxRetries { + MaxRetries::Count(1) + } + + fn backoff_strategy() -> Backoff { + Backoff::Exponential(2) + } + + fn process(&self, args: Self::Arguments) -> Box + Send> { + info!("args: {:?}", args); + + Box::new(Ok(()).into_future()) + } +} + +fn main() { + dotenv::dotenv().ok(); + env_logger::init(); + + tokio::run(lazy(|| { + let mut runner = JobRunner::new(1234, 4, "example-db"); + runner.register_processor(MyProcessor); + + let handle = runner.spawn(); + + let jobs = vec![ + MyJobArguments { + some_usize: 0, + other_usize: 1, + }, + MyJobArguments { + some_usize: 1, + other_usize: 2, + }, + MyJobArguments { + some_usize: 3, + other_usize: 5, + }, + MyJobArguments { + some_usize: 8, + other_usize: 13, + }, + MyJobArguments { + some_usize: 21, + other_usize: 34, + }, + MyJobArguments { + some_usize: 55, + other_usize: 89, + }, + MyJobArguments { + some_usize: 144, + other_usize: 233, + }, + MyJobArguments { + some_usize: 377, + other_usize: 610, + }, + MyJobArguments { + some_usize: 987, + other_usize: 1597, + }, + ]; + + for job in jobs { + tokio::spawn( + handle + .queue(MyProcessor::new_job(job, None, None).unwrap()) + .then(|_| Ok(())), + ); + } + + tokio::timer::Delay::new(tokio::clock::now() + Duration::from_secs(1)) + .map(move |_| { + let _ = handle; + () + }) + .map_err(|_| ()) + })); +} diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml new file mode 100644 index 0000000..083191b --- /dev/null +++ b/jobs-actix/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "jobs-actix" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +[dependencies] +actix = "0.7" +failure = "0.1" +futures = "0.1" +log = "0.4" + +[dependencies.jobs-core] +version = "0.1" +path = "../jobs-core" diff --git a/jobs-executor/LICENSE b/jobs-actix/LICENSE similarity index 100% rename from jobs-executor/LICENSE rename to jobs-actix/LICENSE diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs new file mode 100644 index 0000000..7d74213 --- /dev/null +++ b/jobs-actix/src/lib.rs @@ -0,0 +1,267 @@ +#[macro_use] +extern crate failure; +#[macro_use] +extern crate log; + +use std::{ + fs::create_dir_all, + path::{Path, PathBuf}, + time::Duration, +}; + +use actix::{ + fut::wrap_future, utils::IntervalFunc, Actor, ActorFuture, ActorStream, Addr, AsyncContext, + Context, ContextFutureSpawner, Handler, Message, ResponseFuture, SyncArbiter, SyncContext, +}; +use failure::Error; +use futures::Future; +use jobs_core::{storage::Storage, JobInfo, Processor, Processors}; + +fn coerce(res: Result, F>) -> Result +where + F: Into, +{ + match res { + Ok(inner_res) => inner_res, + Err(f) => Err(f.into()), + } +} + +#[derive(Clone)] +pub struct KvActor { + storage: Storage, +} + +impl KvActor { + pub fn init(runner_id: usize, db_path: PathBuf) -> Result { + create_dir_all(db_path.clone())?; + + let storage = Storage::init(runner_id, db_path)?; + + let actor = KvActor { storage }; + + Ok(actor) + } + + pub fn store_job(&self, job: JobInfo) -> Result<(), Error> { + self.storage.store_job(job)?; + + Ok(()) + } + + pub fn dequeue_jobs(&self, limit: usize) -> Result, Error> { + let jobs = self.storage.dequeue_job(limit)?; + + Ok(jobs) + } +} + +impl Actor for KvActor { + type Context = SyncContext; +} + +impl Handler for KvActor { + type Result = Result<(), Error>; + + fn handle(&mut self, msg: StoreJob, _: &mut Self::Context) -> Self::Result { + self.store_job(msg.0) + } +} + +impl Handler for KvActor { + type Result = Result, Error>; + + fn handle(&mut self, msg: DequeueJobs, _: &mut Self::Context) -> Self::Result { + self.dequeue_jobs(msg.0) + } +} + +#[derive(Debug)] +pub struct StoreJob(JobInfo); + +impl Message for StoreJob { + type Result = Result<(), Error>; +} + +#[derive(Debug)] +pub struct DequeueJobs(usize); + +impl Message for DequeueJobs { + type Result = Result, Error>; +} + +pub struct JobsActor { + store: Addr, +} + +impl JobsActor { + fn new(store: Addr) -> Self { + JobsActor { store } + } + + fn store_job(&mut self, job: JobInfo) -> impl Future { + self.store.send(StoreJob(job)).then(coerce) + } +} + +impl Actor for JobsActor { + type Context = Context; +} + +impl Handler for JobsActor { + type Result = ResponseFuture<(), Error>; + + fn handle(&mut self, msg: QueueJob, _: &mut Self::Context) -> Self::Result { + Box::new(self.store_job(msg.0)) + } +} + +#[derive(Debug)] +pub struct QueueJob(pub JobInfo); + +impl Message for QueueJob { + type Result = Result<(), Error>; +} + +pub struct ProcessorActor { + processors: Processors, + store: Addr, +} + +impl ProcessorActor { + fn new(processors: Processors, store: Addr) -> Self { + ProcessorActor { processors, store } + } + + fn process_job(&mut self, ctx: &mut Context) { + Self::fetch_job((), self, ctx) + .and_then(Self::run_job) + .and_then(Self::return_job) + .map(Self::call_process_job) + .spawn(ctx); + } + + fn fetch_job( + _: (), + actor: &mut Self, + _: &mut Context, + ) -> impl ActorFuture { + wrap_future( + actor + .store + .send(DequeueJobs(1)) + .then(coerce) + .map_err(|e| error!("Error fetching jobs, {}", e)) + .and_then(|jobs| jobs.into_iter().next().ok_or(())), + ) + } + + fn run_job( + job: JobInfo, + actor: &mut Self, + _: &mut Context, + ) -> impl ActorFuture { + wrap_future(actor.processors.process_job(job)) + } + + fn return_job( + job: JobInfo, + actor: &mut Self, + _: &mut Context, + ) -> impl ActorFuture { + wrap_future( + actor + .store + .send(StoreJob(job)) + .then(coerce) + .map_err(|e| error!("Error returning jobs, {}", e)), + ) + } + + fn call_process_job(_: (), _: &mut Self, ctx: &mut Context) { + ctx.address().do_send(ProcessJob); + } +} + +impl Actor for ProcessorActor { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + IntervalFunc::new(Duration::from_millis(500), Self::process_job) + .finish() + .spawn(ctx); + } +} + +impl Handler for ProcessorActor { + type Result = (); + + fn handle(&mut self, _: ProcessJob, ctx: &mut Self::Context) -> Self::Result { + self.process_job(ctx) + } +} + +struct ProcessJob; + +impl Message for ProcessJob { + type Result = (); +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "No jobs to process")] +pub struct NoJobs; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error processing jobs")] +pub struct ProcessError; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error in Interval")] +pub struct IntervalError; + +pub struct JobsBuilder { + num_processors: usize, + runner_id: usize, + db_path: PathBuf, + processors: Vec, +} + +impl JobsBuilder { + pub fn new>(runner_id: usize, num_processors: usize, db_path: P) -> Self { + JobsBuilder { + num_processors, + runner_id, + db_path: db_path.as_ref().to_owned(), + processors: (0..num_processors).map(|_| Processors::new()).collect(), + } + } + + pub fn register_processor

(&mut self, processor: P) + where + P: Processor + Send + Sync + 'static, + { + for processors in self.processors.iter_mut() { + processors.register_processor(processor.clone()); + } + } + + pub fn build(self) -> Result, Error> { + let JobsBuilder { + num_processors, + runner_id, + db_path, + processors, + } = self; + + let kv_actor = KvActor::init(runner_id, db_path)?; + let store = SyncArbiter::start(num_processors + 1, move || kv_actor.clone()); + + for processors in processors { + ProcessorActor::new(processors, store.clone()).start(); + } + + let actor = JobsActor::new(store).start(); + + Ok(actor) + } +} diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index f426d2c..b8c9ad2 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -28,7 +28,7 @@ pub enum JobError { /// The Processor trait /// /// Processors define the logic for executing jobs -pub trait Processor { +pub trait Processor: Clone { type Arguments: Serialize + DeserializeOwned; /// The name of the processor diff --git a/jobs-executor/Cargo.toml b/jobs-executor/Cargo.toml deleted file mode 100644 index 93a62c1..0000000 --- a/jobs-executor/Cargo.toml +++ /dev/null @@ -1,7 +0,0 @@ -[package] -name = "jobs-executor" -version = "0.1.0" -authors = ["asonix "] -edition = "2018" - -[dependencies] diff --git a/jobs-executor/src/lib.rs b/jobs-executor/src/lib.rs deleted file mode 100644 index 31e1bb2..0000000 --- a/jobs-executor/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index 0f41f6e..e00e4ed 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -242,11 +242,3 @@ impl JobRunner { }) } } - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/src/lib.rs b/src/lib.rs index 37a447d..977ffdb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,6 @@ pub use jobs_core::{ #[cfg(feature = "jobs-tokio")] pub use jobs_tokio::{JobRunner, ProcessorHandle}; + +#[cfg(feature = "jobs-actix")] +pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob};