diff --git a/examples/server-jobs-example/.env b/examples/server-jobs-example/.env index 0e7a139..711bfdf 100644 --- a/examples/server-jobs-example/.env +++ b/examples/server-jobs-example/.env @@ -1 +1 @@ -RUST_LOG=server_jobs_example,jobs_server_tokio=trace +RUST_LOG=server_jobs_example,jobs_server_tokio=info diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs index 6d55b1d..edb5fff 100644 --- a/examples/server-jobs-example/src/bin/server.rs +++ b/examples/server-jobs-example/src/bin/server.rs @@ -1,13 +1,18 @@ use failure::Error; use jobs::ServerConfig; +use server_jobs_example::queue_set; fn main() -> Result<(), Error> { dotenv::dotenv().ok(); env_logger::init(); - let config = ServerConfig::init("127.0.0.1", 5555, 5556, 1, "example-db")?; - - tokio::run(config.run()); + tokio::run(ServerConfig::init( + "127.0.0.1", + 5555, + 1, + queue_set(), + "example-db", + )); Ok(()) } diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs index 64a24bb..522002b 100644 --- a/examples/server-jobs-example/src/bin/spawner.rs +++ b/examples/server-jobs-example/src/bin/spawner.rs @@ -3,13 +3,16 @@ use jobs::{Processor, SpawnerConfig}; use server_jobs_example::{MyJobArguments, MyProcessor}; fn main() { + dotenv::dotenv().ok(); + env_logger::init(); + let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| { acc.push(MyJobArguments::new(x, y)); (y, x + y, acc) }); - let spawner = SpawnerConfig::new("localhost", 5556); + let spawner = SpawnerConfig::new("localhost", 5555); tokio::run(lazy(move || { for job in jobs { diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs index 09ee0b6..5d2af9b 100644 --- a/examples/server-jobs-example/src/bin/worker.rs +++ b/examples/server-jobs-example/src/bin/worker.rs @@ -1,9 +1,12 @@ use failure::Error; use jobs::WorkerConfig; -use server_jobs_example::MyProcessor; +use server_jobs_example::{queue_map, MyProcessor}; fn main() -> Result<(), Error> { - let mut worker = WorkerConfig::init(16, "localhost", 5555, 5556)?; + dotenv::dotenv().ok(); + env_logger::init(); + + let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map()); worker.register_processor(MyProcessor); diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs index ebd9433..2d5ff52 100644 --- a/examples/server-jobs-example/src/lib.rs +++ b/examples/server-jobs-example/src/lib.rs @@ -1,10 +1,28 @@ #[macro_use] +extern crate log; +#[macro_use] extern crate serde_derive; +use std::collections::{BTreeMap, BTreeSet}; + use failure::Error; use futures::{future::IntoFuture, Future}; use jobs::{Backoff, MaxRetries, Processor}; +pub fn queue_map() -> BTreeMap { + let mut map = BTreeMap::new(); + map.insert("default".to_owned(), 18); + + map +} + +pub fn queue_set() -> BTreeSet { + let mut set = BTreeSet::new(); + set.insert("default".to_owned()); + + set +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct MyJobArguments { some_usize: usize, @@ -30,6 +48,10 @@ impl Processor for MyProcessor { "MyProcessor" } + fn queue() -> &'static str { + "default" + } + fn max_retries() -> MaxRetries { MaxRetries::Count(1) } @@ -39,7 +61,7 @@ impl Processor for MyProcessor { } fn process(&self, args: Self::Arguments) -> Box + Send> { - println!("args: {:?}", args); + info!("args: {:?}", args); Box::new(Ok(()).into_future()) } diff --git a/jobs-core/src/processors.rs b/jobs-core/src/processors.rs index 0c3f1ef..c055976 100644 --- a/jobs-core/src/processors.rs +++ b/jobs-core/src/processors.rs @@ -6,7 +6,7 @@ use serde_json::Value; use crate::{JobError, JobInfo, Processor}; pub type ProcessFn = - Box Box + Send> + Send>; + Box Box + Send> + Send + Sync>; pub struct Processors { inner: HashMap, diff --git a/jobs-server-tokio/src/server/mod.rs b/jobs-server-tokio/src/server/mod.rs index 76bac69..3975ffc 100644 --- a/jobs-server-tokio/src/server/mod.rs +++ b/jobs-server-tokio/src/server/mod.rs @@ -30,7 +30,7 @@ pub(crate) struct Config { } impl Config { - fn create_server(&self) -> impl Future { + fn create_server(&self) -> Box + Send> { let runner_id = self.runner_id; let db_path = self.db_path.clone(); let base_port = self.base_port; @@ -38,7 +38,7 @@ impl Config { let config = Arc::new(self.clone()); - poll_fn(move || { + let fut = poll_fn(move || { let runner_id = runner_id; let db_path = db_path.clone(); let base_port = base_port; @@ -60,6 +60,8 @@ impl Config { let address = format!("tcp://{}:{}", config.ip, port); + info!("Creating queue {} on address {}", queue, address); + tokio::spawn(PushConfig::init( address, queue.to_owned(), @@ -71,6 +73,7 @@ impl Config { StalledConfig::init(storage.clone()); let portmap_address = format!("tcp://{}:{}", config.ip, config.base_port + 1); + info!("Creating portmap on address {}", portmap_address); tokio::spawn(PortMapConfig::init( portmap_address, @@ -79,12 +82,15 @@ impl Config { )); let pull_address = format!("tcp://{}:{}", config.ip, config.base_port); + info!("Creating puller on address {}", pull_address); tokio::spawn(PullConfig::init(pull_address, storage, config)); Ok(()) }) - .map_err(|e| error!("Error starting server, {}", e)) + .map_err(|e| error!("Error starting server, {}", e)); + + Box::new(fut) } } @@ -101,7 +107,7 @@ impl ServerConfig { runner_id: usize, queues: BTreeSet, db_path: P, - ) -> impl Future { + ) -> Box + Send> { let context = Arc::new(Context::new()); Self::init_with_context(ip, base_port, runner_id, queues, db_path, context) @@ -114,7 +120,7 @@ impl ServerConfig { queues: BTreeSet, db_path: P, context: Arc, - ) -> impl Future { + ) -> Box + Send> { let config = Config { ip: ip.to_owned(), base_port, diff --git a/jobs-server-tokio/src/server/portmap.rs b/jobs-server-tokio/src/server/portmap.rs index 4b84381..b1267c2 100644 --- a/jobs-server-tokio/src/server/portmap.rs +++ b/jobs-server-tokio/src/server/portmap.rs @@ -1,15 +1,15 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use failure::Error; -use futures::{future::lazy, stream::iter_ok, Future, Stream}; +use futures::{future::lazy, Future, Stream}; use tokio::timer::Delay; -use tokio_zmq::{prelude::*, Multipart, Push}; +use tokio_zmq::{prelude::*, Multipart, Rep}; use zmq::Message; use crate::server::Config; pub(crate) struct PortMapConfig { - pusher: Push, + rep: Rep, address: String, port_map: BTreeMap, config: Arc, @@ -27,33 +27,33 @@ impl PortMapConfig { config, }; - cfg.build() - .map_err(|e| error!("Error starting pusher, {}", e)) + cfg.build().map_err(|e| error!("Error starting rep, {}", e)) } fn run(self) -> Box + Send> { let reset = self.reset(); let PortMapConfig { - pusher, + rep, address: _, port_map, config: _, } = self; - let fut = iter_ok::<_, Error>(0..) - .and_then(move |count| { - trace!("Pushed {} portmaps", count); + let (sink, stream) = rep.sink_stream().split(); + let fut = stream + .from_err::() + .and_then(move |_| { let s = serde_json::to_string(&port_map)?; let m = Message::from_slice(s.as_ref())?; Ok(Multipart::from(m)) }) - .forward(pusher.sink()) - .map(move |_| info!("portmap pusher shutting down")) + .forward(sink) + .map(move |_| info!("portmap rep shutting down")) .map_err(|e| { - error!("Error pushing portmap, {}", e); + error!("Error sending portmap, {}", e); tokio::spawn(reset.rebuild()); }); @@ -81,17 +81,17 @@ impl ResetPortMapConfig { Delay::new(tokio::clock::now() + Duration::from_secs(5)) .from_err() .and_then(move |_| self.build()) - .map_err(|e| error!("Error restarting pusher, {}", e)) + .map_err(|e| error!("Error restarting rep, {}", e)) } fn build(self) -> impl Future { lazy(|| { - let pusher = Push::builder(self.config.context.clone()) + let rep = Rep::builder(self.config.context.clone()) .bind(&self.address) .build()?; let config = PortMapConfig { - pusher, + rep, address: self.address, port_map: self.port_map, config: self.config, diff --git a/jobs-server-tokio/src/server/pull.rs b/jobs-server-tokio/src/server/pull.rs index 8157b7c..ca3a47c 100644 --- a/jobs-server-tokio/src/server/pull.rs +++ b/jobs-server-tokio/src/server/pull.rs @@ -44,8 +44,15 @@ impl PullConfig { .puller .stream() .from_err() + .map(|m| { + trace!("Handling new message"); + m + }) .and_then(parse_job) - .and_then(move |job| store_job(job, storage.clone())) + .and_then(move |job| { + trace!("Storing job, {:?}", job); + store_job(job, storage.clone()) + }) .for_each(|_| Ok(())) .map(|_| info!("Puller is shutting down")) .map_err(|e| { diff --git a/jobs-server-tokio/src/spawner.rs b/jobs-server-tokio/src/spawner.rs index d5be08b..9893f96 100644 --- a/jobs-server-tokio/src/spawner.rs +++ b/jobs-server-tokio/src/spawner.rs @@ -12,11 +12,11 @@ pub struct SpawnerConfig { } impl SpawnerConfig { - pub fn new(server_host: &str, queue_port: usize) -> Self { + pub fn new(server_host: &str, base_port: usize) -> Self { let ctx = Arc::new(Context::new()); SpawnerConfig { - server: format!("tcp://{}:{}", server_host, queue_port), + server: format!("tcp://{}:{}", server_host, base_port), ctx, } } @@ -31,12 +31,17 @@ impl SpawnerConfig { }) .into_future(); + debug!("Sending message to {}", self.server); + Push::builder(self.ctx.clone()) .connect(&self.server) .build() .into_future() .from_err() .join(msg) - .and_then(move |(req, msg)| req.send(msg).from_err().map(|_| ())) + .and_then(move |(push, msg)| { + trace!("Sending"); + push.send(msg).from_err().map(|_| trace!("sent")) + }) } } diff --git a/jobs-server-tokio/src/worker.rs b/jobs-server-tokio/src/worker.rs deleted file mode 100644 index 9551f84..0000000 --- a/jobs-server-tokio/src/worker.rs +++ /dev/null @@ -1,166 +0,0 @@ -use std::sync::Arc; - -use failure::Error; -use futures::{ - future::{lazy, Either, IntoFuture}, - Future, Stream, -}; -use jobs_core::{JobInfo, Processor, Processors}; -use tokio_zmq::{prelude::*, Multipart, Pull, Push}; -use zmq::{Context, Message}; - -struct Worker { - processors: Processors, - pull: Pull, - push: Push, -} - -impl Worker { - pub fn init( - server_host: &str, - job_port: usize, - queue_port: usize, - ctx: Arc, - ) -> Result { - let pull = Pull::builder(ctx.clone()) - .connect(&format!("tcp://{}:{}", server_host, job_port)) - .build()?; - - let push = Push::builder(ctx.clone()) - .connect(&format!("tcp://{}:{}", server_host, queue_port)) - .build()?; - - let processors = Processors::new(); - - let worker = Worker { - processors, - push, - pull, - }; - - Ok(worker) - } - - fn register_processor

(&mut self, processor: P) - where - P: Processor + Send + Sync + 'static, - { - self.processors.register_processor(processor); - } -} - -pub struct WorkerConfig { - workers: Vec, -} - -impl WorkerConfig { - pub fn init( - num_processors: usize, - server_host: &str, - job_port: usize, - queue_port: usize, - ) -> Result { - let ctx = Arc::new(Context::new()); - - let mut workers = Vec::new(); - - for _ in 0..num_processors { - let worker = Worker::init(server_host, job_port, queue_port, ctx.clone())?; - - workers.push(worker); - } - - let cfg = WorkerConfig { workers }; - - Ok(cfg) - } - - pub fn register_processor

(&mut self, processor: P) - where - P: Processor + Send + Sync + 'static, - { - for worker in self.workers.iter_mut() { - worker.register_processor(processor.clone()); - } - } - - pub fn run(self) -> impl Future { - let WorkerConfig { workers } = self; - - lazy(|| { - for worker in workers.into_iter() { - tokio::spawn(worker_future(worker)); - } - - Ok(()) - }) - } -} - -fn worker_future(worker: Worker) -> impl Future { - let Worker { - push, - pull, - processors, - } = worker; - - pull.stream() - .from_err() - .and_then(move |multipart| wrap_processing(multipart, &processors)) - .map(Some) - .or_else(|e| { - error!("Error processing job, {}", e); - Ok(None) - }) - .filter_map(|item| item) - .forward(push.sink()) - .map_err(|e: Error| error!("Error pushing job, {}", e)) - .map(|_| ()) -} - -fn serialize_request(job: JobInfo) -> Result { - let request = serde_json::to_string(&job)?; - let msg = Message::from_slice(request.as_ref())?; - - Ok(msg.into()) -} - -fn parse_multipart(mut multipart: Multipart) -> Result { - let message = multipart.pop_front().ok_or(ParseError)?; - - let parsed = serde_json::from_slice(&message)?; - - Ok(parsed) -} - -fn wrap_processing( - multipart: Multipart, - processors: &Processors, -) -> impl Future { - let msg = match parse_multipart(multipart) { - Ok(msg) => msg, - Err(e) => return Either::A(Err(e).into_future()), - }; - - let fut = process_job(msg, processors).and_then(serialize_request); - - Either::B(fut) -} - -fn process_job( - job: JobInfo, - processors: &Processors, -) -> impl Future { - processors - .process_job(job.clone()) - .map_err(|_| ProcessError) - .from_err() -} - -#[derive(Clone, Debug, Fail)] -#[fail(display = "Error parsing job")] -struct ParseError; - -#[derive(Clone, Debug, Fail)] -#[fail(display = "Error processing job")] -struct ProcessError; diff --git a/jobs-server-tokio/src/worker/config.rs b/jobs-server-tokio/src/worker/config.rs new file mode 100644 index 0000000..00130e1 --- /dev/null +++ b/jobs-server-tokio/src/worker/config.rs @@ -0,0 +1,172 @@ +use std::{sync::Arc, time::Duration}; + +use failure::Error; +use futures::{ + future::{lazy, Either, IntoFuture}, + Future, Stream, +}; +use jobs_core::{JobInfo, Processors}; +use tokio::timer::Delay; +use tokio_zmq::{prelude::*, Multipart, Pull, Push}; +use zmq::{Context, Message}; + +pub(crate) struct Worker { + pull: Pull, + push: Push, + push_address: String, + pull_address: String, + queue: String, + processors: Arc, + context: Arc, +} + +impl Worker { + pub(crate) fn init( + push_address: String, + pull_address: String, + queue: String, + processors: Arc, + context: Arc, + ) -> impl Future { + let cfg = ResetWorker { + push_address, + pull_address, + queue: queue.clone(), + processors, + context, + }; + + cfg.build() + .map_err(move |e| error!("Error starting worker for queue {}, {}", queue, e)) + } + + fn run(self) -> Box + Send> { + let reset = self.reset(); + + let Worker { + push, + pull, + push_address: _, + pull_address: _, + queue, + processors, + context: _, + } = self; + + let fut = pull + .stream() + .from_err::() + .and_then(move |multipart| wrap_processing(multipart, &processors)) + .forward(push.sink()) + .map(move |_| info!("worker for queue {} is shutting down", queue)) + .map_err(|e| { + error!("Error processing job, {}", e); + + tokio::spawn(reset.rebuild()); + }); + + Box::new(fut) + } + + fn reset(&self) -> ResetWorker { + ResetWorker { + push_address: self.push_address.clone(), + pull_address: self.pull_address.clone(), + queue: self.queue.clone(), + processors: self.processors.clone(), + context: self.context.clone(), + } + } +} + +struct ResetWorker { + push_address: String, + pull_address: String, + queue: String, + processors: Arc, + context: Arc, +} + +impl ResetWorker { + fn rebuild(self) -> impl Future { + let queue = self.queue.clone(); + + Delay::new(tokio::clock::now() + Duration::from_secs(5)) + .from_err() + .and_then(move |_| self.build()) + .map_err(move |e| error!("Error restarting worker for queue {}, {}", queue, e)) + } + + fn build(self) -> impl Future { + lazy(|| { + let push = Push::builder(self.context.clone()) + .connect(&self.push_address) + .build()?; + + let pull = Pull::builder(self.context.clone()) + .connect(&self.pull_address) + .build()?; + + let config = Worker { + push, + pull, + push_address: self.push_address, + pull_address: self.pull_address, + queue: self.queue, + processors: self.processors, + context: self.context, + }; + + tokio::spawn(config.run()); + + Ok(()) + }) + } +} + +fn serialize_request(job: JobInfo) -> Result { + let request = serde_json::to_string(&job)?; + let msg = Message::from_slice(request.as_ref())?; + + Ok(msg.into()) +} + +fn parse_multipart(mut multipart: Multipart) -> Result { + let message = multipart.pop_front().ok_or(ParseError)?; + + let parsed = serde_json::from_slice(&message)?; + + Ok(parsed) +} + +fn wrap_processing( + multipart: Multipart, + processors: &Processors, +) -> impl Future { + let msg = match parse_multipart(multipart) { + Ok(msg) => msg, + Err(e) => return Either::A(Err(e).into_future()), + }; + + let fut = process_job(msg, processors).and_then(serialize_request); + + Either::B(fut) +} + +fn process_job( + job: JobInfo, + processors: &Processors, +) -> impl Future { + processors + .process_job(job.clone()) + .map_err(|_| ProcessError) + .from_err() +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error parsing job")] +struct ParseError; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error processing job")] +struct ProcessError; diff --git a/jobs-server-tokio/src/worker/mod.rs b/jobs-server-tokio/src/worker/mod.rs new file mode 100644 index 0000000..779cf07 --- /dev/null +++ b/jobs-server-tokio/src/worker/mod.rs @@ -0,0 +1,98 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use futures::Future; +use jobs_core::{Processor, Processors}; +use zmq::Context; + +mod config; +mod portmap; + +use self::{config::Worker, portmap::PortMap}; + +pub struct WorkerConfig { + processors: Processors, + queues: BTreeMap, + server_host: String, + base_port: usize, + context: Arc, +} + +impl WorkerConfig { + pub fn new(server_host: String, base_port: usize, queues: BTreeMap) -> Self { + let context = Arc::new(Context::new()); + + Self::new_with_context(server_host, base_port, queues, context) + } + + pub fn new_with_context( + server_host: String, + base_port: usize, + queues: BTreeMap, + context: Arc, + ) -> Self { + WorkerConfig { + processors: Processors::new(), + server_host, + base_port, + queues, + context, + } + } + + pub fn register_processor

(&mut self, processor: P) + where + P: Processor + Send + Sync + 'static, + { + self.processors.register_processor(processor); + } + + pub fn run(self) -> Box + Send> { + let WorkerConfig { + processors, + server_host, + base_port, + queues, + context, + } = self; + + info!("Starting workers"); + + let processors = Arc::new(processors); + + let push_address = format!("tcp://{}:{}", server_host, base_port); + let portmap_address = format!("tcp://{}:{}", server_host, base_port + 1); + + info!("push address, {}", push_address); + info!("portmap address, {}", portmap_address); + + let fut = PortMap::init(portmap_address, context.clone()) + .and_then(move |port_map| { + info!("Got port_map, {:?}", port_map); + + for (queue, num) in queues.iter() { + let port = port_map.get(queue).ok_or(MissingQueue(queue.to_owned()))?; + + let pull_address = format!("tcp://{}:{}", server_host, port); + + for _ in 0..*num { + tokio::spawn(Worker::init( + push_address.clone(), + pull_address.clone(), + queue.to_owned(), + processors.clone(), + context.clone(), + )); + } + } + + Ok(()) + }) + .map_err(|e| error!("Error starting worker, {}", e)); + + Box::new(fut) + } +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Could not find queue, {}", _0)] +struct MissingQueue(String); diff --git a/jobs-server-tokio/src/worker/portmap.rs b/jobs-server-tokio/src/worker/portmap.rs new file mode 100644 index 0000000..f5054e3 --- /dev/null +++ b/jobs-server-tokio/src/worker/portmap.rs @@ -0,0 +1,38 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use failure::Error; +use futures::{future::lazy, Future}; +use tokio_zmq::{prelude::*, Req}; +use zmq::{Context, Message}; + +pub(crate) struct PortMap; + +impl PortMap { + pub(crate) fn init( + address: String, + context: Arc, + ) -> impl Future, Error = Error> { + lazy(move || { + let req = Req::builder(context.clone()).connect(&address).build()?; + + Ok(req) + }) + .and_then(|req| { + Message::from_slice(b"h") + .map_err(Error::from) + .map(move |msg| (req, msg.into())) + }) + .and_then(|(req, msg)| req.send(msg).and_then(|req| req.recv()).from_err()) + .and_then(|(mut multipart, _)| { + let msg = multipart.pop_front().ok_or(EmptyMessage)?; + + let map = serde_json::from_slice(&msg)?; + + Ok(map) + }) + } +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Message was empty")] +struct EmptyMessage;