diff --git a/Cargo.toml b/Cargo.toml index 21d5430..3a152ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,9 @@ members = [ "jobs-core", "jobs-server-tokio", "jobs-tokio", - "examples/tokio-jobs-example", "examples/actix-jobs-example", + "examples/server-jobs-example", + "examples/tokio-jobs-example", ] [features] diff --git a/examples/server-jobs-example/.env b/examples/server-jobs-example/.env new file mode 100644 index 0000000..0e7a139 --- /dev/null +++ b/examples/server-jobs-example/.env @@ -0,0 +1 @@ +RUST_LOG=server_jobs_example,jobs_server_tokio=trace diff --git a/examples/server-jobs-example/Cargo.toml b/examples/server-jobs-example/Cargo.toml new file mode 100644 index 0000000..8829764 --- /dev/null +++ b/examples/server-jobs-example/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "server-jobs-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +[dependencies] +dotenv = "0.13" +env_logger = "0.5" +failure = "0.1" +futures = "0.1" +log = "0.4" +serde = "1.0" +serde_derive = "1.0" +tokio = "0.1" + +[dependencies.jobs] +version = "0.1" +path = "../.." +default-features = false +features = ["jobs-server-tokio"] diff --git a/examples/server-jobs-example/example-db/data.mdb b/examples/server-jobs-example/example-db/data.mdb new file mode 100644 index 0000000..a931091 Binary files /dev/null and b/examples/server-jobs-example/example-db/data.mdb differ diff --git a/examples/server-jobs-example/example-db/lock.mdb b/examples/server-jobs-example/example-db/lock.mdb new file mode 100644 index 0000000..377cf45 Binary files /dev/null and b/examples/server-jobs-example/example-db/lock.mdb differ diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs new file mode 100644 index 0000000..89db98b --- /dev/null +++ b/examples/server-jobs-example/src/bin/server.rs @@ -0,0 +1,13 @@ +use failure::Error; +use jobs::ServerConfig; + +fn main() -> Result<(), Error> { + dotenv::dotenv().ok(); + env_logger::init(); + + let config = ServerConfig::init("127.0.0.1", 5555, 1234, 1, "example-db")?; + + tokio::run(config.run()); + + Ok(()) +} diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs new file mode 100644 index 0000000..2830efe --- /dev/null +++ b/examples/server-jobs-example/src/bin/spawner.rs @@ -0,0 +1,25 @@ +use futures::{future::lazy, Future}; +use jobs::{Processor, SpawnerConfig}; +use server_jobs_example::{MyJobArguments, MyProcessor}; + +fn main() { + let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { + acc.push(MyJobArguments::new(x, y)); + + (y, x + y, acc) + }); + + let spawner = SpawnerConfig::new("localhost", 5555); + + tokio::run(lazy(move || { + for job in jobs { + tokio::spawn( + spawner + .queue(MyProcessor::new_job(job, None, None).unwrap()) + .map_err(|_| ()), + ); + } + + Ok(()) + })); +} diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs new file mode 100644 index 0000000..04cc94b --- /dev/null +++ b/examples/server-jobs-example/src/bin/worker.rs @@ -0,0 +1,13 @@ +use failure::Error; +use jobs::ClientConfig; +use server_jobs_example::MyProcessor; + +fn main() -> Result<(), Error> { + let mut client = ClientConfig::init(16, "localhost", 5555)?; + + client.register_processor(MyProcessor); + + tokio::run(client.run()); + + Ok(()) +} diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs new file mode 100644 index 0000000..ebd9433 --- /dev/null +++ b/examples/server-jobs-example/src/lib.rs @@ -0,0 +1,46 @@ +#[macro_use] +extern crate serde_derive; + +use failure::Error; +use futures::{future::IntoFuture, Future}; +use jobs::{Backoff, MaxRetries, Processor}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MyJobArguments { + some_usize: usize, + other_usize: usize, +} + +impl MyJobArguments { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJobArguments { + some_usize, + other_usize, + } + } +} + +#[derive(Clone, Debug)] +pub struct MyProcessor; + +impl Processor for MyProcessor { + type Arguments = MyJobArguments; + + fn name() -> &'static str { + "MyProcessor" + } + + fn max_retries() -> MaxRetries { + MaxRetries::Count(1) + } + + fn backoff_strategy() -> Backoff { + Backoff::Exponential(2) + } + + fn process(&self, args: Self::Arguments) -> Box + Send> { + println!("args: {:?}", args); + + Box::new(Ok(()).into_future()) + } +} diff --git a/examples/tokio-jobs-example/src/main.rs b/examples/tokio-jobs-example/src/main.rs index e9826c1..f784fba 100644 --- a/examples/tokio-jobs-example/src/main.rs +++ b/examples/tokio-jobs-example/src/main.rs @@ -47,21 +47,21 @@ fn main() { dotenv::dotenv().ok(); env_logger::init(); - tokio::run(lazy(|| { + let (_, _, jobs) = (1..18).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { + acc.push(MyJobArguments { + some_usize: x, + other_usize: y, + }); + + (y, x + y, acc) + }); + + tokio::run(lazy(move || { let mut runner = JobRunner::new(1234, 4, "example-db"); runner.register_processor(MyProcessor); let handle = runner.spawn(); - let (_, _, jobs) = (1..18).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { - acc.push(MyJobArguments { - some_usize: x, - other_usize: y, - }); - - (y, x + y, acc) - }); - for job in jobs { tokio::spawn( handle diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index e7861b1..10a112d 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -44,6 +44,8 @@ impl Storage { let mut manager = Manager::new(); let mut cfg = Config::default(path); + cfg.set_max_readers(18); + // Create our buckets for bucket in Storage::buckets().iter() { cfg.bucket(bucket, None); @@ -150,6 +152,8 @@ impl Storage { trace!("Committing"); + read_txn.commit()?; + txn.commit()?; trace!("Committed"); diff --git a/jobs-server-tokio/src/client.rs b/jobs-server-tokio/src/client.rs index 43a2b97..d4d71b2 100644 --- a/jobs-server-tokio/src/client.rs +++ b/jobs-server-tokio/src/client.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use failure::Error; use futures::{ @@ -6,6 +6,7 @@ use futures::{ Future, Stream, }; use jobs_core::{Processor, Processors}; +use tokio::timer::Delay; use tokio_zmq::{prelude::*, Multipart, Req}; use zmq::{Context, Message}; @@ -133,11 +134,17 @@ fn process_response( response: ServerResponse, processors: &Processors, ) -> impl Future { + let either_a = Either::A( + Delay::new(tokio::clock::now() + Duration::from_millis(500)) + .from_err() + .and_then(|_| Ok(ServerRequest::FetchJobs(1))), + ); + match response { ServerResponse::FetchJobs(jobs) => { let job = match jobs.into_iter().next() { Some(job) => job, - None => return Either::A(Ok(ServerRequest::FetchJobs(1)).into_future()), + None => return either_a, }; let fut = processors @@ -147,7 +154,10 @@ fn process_response( Either::B(fut) } - _ => return Either::A(Ok(ServerRequest::FetchJobs(1)).into_future()), + e => { + error!("Error from server, {:?}", e); + return either_a; + } } } diff --git a/jobs-server-tokio/src/lib.rs b/jobs-server-tokio/src/lib.rs index 826aace..869edbf 100644 --- a/jobs-server-tokio/src/lib.rs +++ b/jobs-server-tokio/src/lib.rs @@ -9,10 +9,12 @@ use failure::Error; mod client; mod server; +mod spawner; pub use crate::{ client::ClientConfig, server::{ServerConfig, ServerRequest, ServerResponse}, + spawner::SpawnerConfig, }; fn coerce(res: Result, F>) -> Result diff --git a/jobs-server-tokio/src/spawner.rs b/jobs-server-tokio/src/spawner.rs new file mode 100644 index 0000000..fd9d0ad --- /dev/null +++ b/jobs-server-tokio/src/spawner.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use failure::Error; +use futures::{future::IntoFuture, Future}; +use jobs_core::JobInfo; +use tokio_zmq::{prelude::*, Req}; +use zmq::{Context, Message}; + +use crate::ServerRequest; + +pub struct SpawnerConfig { + server: String, + ctx: Arc, +} + +impl SpawnerConfig { + pub fn new(server_host: &str, server_port: usize) -> Self { + let ctx = Arc::new(Context::new()); + + SpawnerConfig { + server: format!("tcp://{}:{}", server_host, server_port), + ctx, + } + } + + pub fn queue(&self, job: JobInfo) -> impl Future { + let msg = serde_json::to_string(&ServerRequest::ReturnJob(job)) + .map_err(Error::from) + .and_then(|s| { + Message::from_slice(s.as_ref()) + .map(|m| m.into()) + .map_err(Error::from) + }) + .into_future(); + + Req::builder(self.ctx.clone()) + .connect(&self.server) + .build() + .into_future() + .from_err() + .join(msg) + .and_then(move |(req, msg)| { + req.send(msg) + .from_err() + .and_then(|req| req.recv().from_err()) + .map(|_| ()) + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index 87aba3c..279fd40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,6 @@ pub use jobs_tokio::{JobRunner, ProcessorHandle}; #[cfg(feature = "jobs-actix")] pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob}; + +#[cfg(feature = "jobs-server-tokio")] +pub use jobs_server_tokio::{ClientConfig, ServerConfig, SpawnerConfig};