Make jobs processor accessible over a network

This commit is contained in:
asonix 2018-11-07 23:47:11 -06:00
parent 77fa5c0df7
commit e70a6b80e4
15 changed files with 202 additions and 14 deletions

View file

@ -10,8 +10,9 @@ members = [
"jobs-core", "jobs-core",
"jobs-server-tokio", "jobs-server-tokio",
"jobs-tokio", "jobs-tokio",
"examples/tokio-jobs-example",
"examples/actix-jobs-example", "examples/actix-jobs-example",
"examples/server-jobs-example",
"examples/tokio-jobs-example",
] ]
[features] [features]

View file

@ -0,0 +1 @@
RUST_LOG=server_jobs_example,jobs_server_tokio=trace

View file

@ -0,0 +1,21 @@
# Example crate showing the networked jobs processor: a server binary,
# a spawner that queues jobs, and a worker that processes them.
[package]
name = "server-jobs-example"
version = "0.1.0"
authors = ["asonix <asonix@asonix.dog>"]
edition = "2018"
# dotenv + env_logger drive the RUST_LOG configuration from the .env file;
# futures/tokio provide the 0.1-era async runtime used by the examples.
[dependencies]
dotenv = "0.13"
env_logger = "0.5"
failure = "0.1"
futures = "0.1"
log = "0.4"
serde = "1.0"
serde_derive = "1.0"
tokio = "0.1"
# Pull the jobs facade from the workspace root with only the
# networked (server-tokio) backend enabled.
[dependencies.jobs]
version = "0.1"
path = "../.."
default-features = false
features = ["jobs-server-tokio"]

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,13 @@
use failure::Error;
use jobs::ServerConfig;
/// Entry point for the example job server.
///
/// Loads `.env`, initializes logging, then builds a `ServerConfig`
/// and runs it to completion on the tokio runtime.
fn main() -> Result<(), Error> {
    // Best-effort: a missing .env file is not an error.
    dotenv::dotenv().ok();
    env_logger::init();

    // Presumably host/port plus storage parameters — the spawner and
    // worker examples connect to port 5555; see jobs-server-tokio for
    // the meaning of the remaining arguments.
    let server = ServerConfig::init("127.0.0.1", 5555, 1234, 1, "example-db")?;
    tokio::run(server.run());

    Ok(())
}

View file

@ -0,0 +1,25 @@
use futures::{future::lazy, Future};
use jobs::{Processor, SpawnerConfig};
use server_jobs_example::{MyJobArguments, MyProcessor};
/// Queue a batch of example jobs against a running job server.
///
/// Generates 49 Fibonacci pairs as job arguments, then connects a
/// spawner to localhost:5555 and queues one job per pair.
fn main() {
    // The fold walks the Fibonacci sequence: each step records the
    // current pair (x, y) and advances it to (y, x + y).
    let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
        acc.push(MyJobArguments::new(x, y));
        (y, x + y, acc)
    });

    let spawner = SpawnerConfig::new("localhost", 5555);

    tokio::run(lazy(move || {
        for job in jobs {
            // Presumably new_job only fails if the arguments cannot be
            // serialized — treat that as a programmer error with a
            // message instead of a bare unwrap.
            let job = MyProcessor::new_job(job, None, None).expect("failed to build job");

            // Log queue failures instead of silently dropping them.
            tokio::spawn(
                spawner
                    .queue(job)
                    .map_err(|e| eprintln!("failed to queue job, {}", e)),
            );
        }

        Ok(())
    }));
}

View file

@ -0,0 +1,13 @@
use failure::Error;
use jobs::ClientConfig;
use server_jobs_example::MyProcessor;
fn main() -> Result<(), Error> {
let mut client = ClientConfig::init(16, "localhost", 5555)?;
client.register_processor(MyProcessor);
tokio::run(client.run());
Ok(())
}

View file

@ -0,0 +1,46 @@
#[macro_use]
extern crate serde_derive;
use failure::Error;
use futures::{future::IntoFuture, Future};
use jobs::{Backoff, MaxRetries, Processor};
/// Payload handed to `MyProcessor` for each queued job.
///
/// Field names are part of the serialized wire format (serde derives),
/// so they must not be renamed.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MyJobArguments {
    some_usize: usize,
    other_usize: usize,
}

impl MyJobArguments {
    /// Bundle two values into a job argument payload.
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        Self {
            some_usize,
            other_usize,
        }
    }
}
#[derive(Clone, Debug)]
pub struct MyProcessor;
impl Processor for MyProcessor {
type Arguments = MyJobArguments;
fn name() -> &'static str {
"MyProcessor"
}
fn max_retries() -> MaxRetries {
MaxRetries::Count(1)
}
fn backoff_strategy() -> Backoff {
Backoff::Exponential(2)
}
fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send> {
println!("args: {:?}", args);
Box::new(Ok(()).into_future())
}
}

View file

@ -47,21 +47,21 @@ fn main() {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
env_logger::init(); env_logger::init();
tokio::run(lazy(|| { let (_, _, jobs) = (1..18).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
acc.push(MyJobArguments {
some_usize: x,
other_usize: y,
});
(y, x + y, acc)
});
tokio::run(lazy(move || {
let mut runner = JobRunner::new(1234, 4, "example-db"); let mut runner = JobRunner::new(1234, 4, "example-db");
runner.register_processor(MyProcessor); runner.register_processor(MyProcessor);
let handle = runner.spawn(); let handle = runner.spawn();
let (_, _, jobs) = (1..18).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
acc.push(MyJobArguments {
some_usize: x,
other_usize: y,
});
(y, x + y, acc)
});
for job in jobs { for job in jobs {
tokio::spawn( tokio::spawn(
handle handle

View file

@ -44,6 +44,8 @@ impl Storage {
let mut manager = Manager::new(); let mut manager = Manager::new();
let mut cfg = Config::default(path); let mut cfg = Config::default(path);
cfg.set_max_readers(18);
// Create our buckets // Create our buckets
for bucket in Storage::buckets().iter() { for bucket in Storage::buckets().iter() {
cfg.bucket(bucket, None); cfg.bucket(bucket, None);
@ -150,6 +152,8 @@ impl Storage {
trace!("Committing"); trace!("Committing");
read_txn.commit()?;
txn.commit()?; txn.commit()?;
trace!("Committed"); trace!("Committed");

View file

@ -1,4 +1,4 @@
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use failure::Error; use failure::Error;
use futures::{ use futures::{
@ -6,6 +6,7 @@ use futures::{
Future, Stream, Future, Stream,
}; };
use jobs_core::{Processor, Processors}; use jobs_core::{Processor, Processors};
use tokio::timer::Delay;
use tokio_zmq::{prelude::*, Multipart, Req}; use tokio_zmq::{prelude::*, Multipart, Req};
use zmq::{Context, Message}; use zmq::{Context, Message};
@ -133,11 +134,17 @@ fn process_response(
response: ServerResponse, response: ServerResponse,
processors: &Processors, processors: &Processors,
) -> impl Future<Item = ServerRequest, Error = Error> { ) -> impl Future<Item = ServerRequest, Error = Error> {
let either_a = Either::A(
Delay::new(tokio::clock::now() + Duration::from_millis(500))
.from_err()
.and_then(|_| Ok(ServerRequest::FetchJobs(1))),
);
match response { match response {
ServerResponse::FetchJobs(jobs) => { ServerResponse::FetchJobs(jobs) => {
let job = match jobs.into_iter().next() { let job = match jobs.into_iter().next() {
Some(job) => job, Some(job) => job,
None => return Either::A(Ok(ServerRequest::FetchJobs(1)).into_future()), None => return either_a,
}; };
let fut = processors let fut = processors
@ -147,7 +154,10 @@ fn process_response(
Either::B(fut) Either::B(fut)
} }
_ => return Either::A(Ok(ServerRequest::FetchJobs(1)).into_future()), e => {
error!("Error from server, {:?}", e);
return either_a;
}
} }
} }

View file

@ -9,10 +9,12 @@ use failure::Error;
mod client; mod client;
mod server; mod server;
mod spawner;
pub use crate::{ pub use crate::{
client::ClientConfig, client::ClientConfig,
server::{ServerConfig, ServerRequest, ServerResponse}, server::{ServerConfig, ServerRequest, ServerResponse},
spawner::SpawnerConfig,
}; };
fn coerce<T, F>(res: Result<Result<T, Error>, F>) -> Result<T, Error> fn coerce<T, F>(res: Result<Result<T, Error>, F>) -> Result<T, Error>

View file

@ -0,0 +1,49 @@
use std::sync::Arc;
use failure::Error;
use futures::{future::IntoFuture, Future};
use jobs_core::JobInfo;
use tokio_zmq::{prelude::*, Req};
use zmq::{Context, Message};
use crate::ServerRequest;
pub struct SpawnerConfig {
server: String,
ctx: Arc<Context>,
}
impl SpawnerConfig {
pub fn new(server_host: &str, server_port: usize) -> Self {
let ctx = Arc::new(Context::new());
SpawnerConfig {
server: format!("tcp://{}:{}", server_host, server_port),
ctx,
}
}
pub fn queue(&self, job: JobInfo) -> impl Future<Item = (), Error = Error> {
let msg = serde_json::to_string(&ServerRequest::ReturnJob(job))
.map_err(Error::from)
.and_then(|s| {
Message::from_slice(s.as_ref())
.map(|m| m.into())
.map_err(Error::from)
})
.into_future();
Req::builder(self.ctx.clone())
.connect(&self.server)
.build()
.into_future()
.from_err()
.join(msg)
.and_then(move |(req, msg)| {
req.send(msg)
.from_err()
.and_then(|req| req.recv().from_err())
.map(|_| ())
})
}
}

View file

@ -7,3 +7,6 @@ pub use jobs_tokio::{JobRunner, ProcessorHandle};
#[cfg(feature = "jobs-actix")] #[cfg(feature = "jobs-actix")]
pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob}; pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob};
#[cfg(feature = "jobs-server-tokio")]
pub use jobs_server_tokio::{ClientConfig, ServerConfig, SpawnerConfig};