diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 8823b78..d69c615 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -49,8 +49,8 @@ impl KvActor { Ok(()) } - pub fn dequeue_jobs(&self, limit: usize) -> Result, Error> { - let jobs = self.storage.dequeue_job(limit)?; + pub fn dequeue_jobs(&self, limit: usize, queue: &str) -> Result, Error> { + let jobs = self.storage.dequeue_job(limit, queue)?; Ok(jobs) } @@ -72,7 +72,7 @@ impl Handler for KvActor { type Result = Result, Error>; fn handle(&mut self, msg: DequeueJobs, _: &mut Self::Context) -> Self::Result { - self.dequeue_jobs(msg.0) + self.dequeue_jobs(msg.0, &msg.1) } } @@ -84,7 +84,7 @@ impl Message for StoreJob { } #[derive(Debug)] -pub struct DequeueJobs(usize); +pub struct DequeueJobs(usize, String); impl Message for DequeueJobs { type Result = Result, Error>; @@ -149,7 +149,7 @@ impl ProcessorActor { wrap_future( actor .store - .send(DequeueJobs(1)) + .send(DequeueJobs(1, "default".to_owned())) .then(coerce) .map_err(|e| error!("Error fetching jobs, {}", e)) .and_then(|jobs| jobs.into_iter().next().ok_or(())), diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 90e9ae4..fae35df 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -11,6 +11,9 @@ pub struct JobInfo { /// Name of the processor that should handle this job processor: String, + /// Name of the queue that this job is a part of + queue: String, + /// Arguments for a given job args: Value, @@ -26,13 +29,17 @@ pub struct JobInfo { /// How often retries should be scheduled backoff_strategy: Backoff, - /// The time this job was re-queued + /// The time this job should be dequeued next_queue: Option>, + + /// The time this job was last updated + updated_at: DateTime, } impl JobInfo { pub(crate) fn new( processor: String, + queue: String, args: Value, max_retries: MaxRetries, backoff_strategy: Backoff, @@ -40,14 +47,21 @@ impl JobInfo { JobInfo { id: None, processor, + queue, status: JobStatus::Pending, args, retry_count: 0, max_retries, next_queue: None, backoff_strategy, + updated_at: Utc::now(), } } + + pub(crate) fn updated(&mut self) { + self.updated_at = Utc::now(); + } + pub(crate) fn processor(&self) -> &str { &self.processor } @@ -89,6 +103,10 @@ impl JobInfo { self.next_queue = Some(next_queue); } + pub(crate) fn is_stale(&self) -> bool { + self.updated_at < Utc::now() - OldDuration::days(1) + } + pub(crate) fn is_ready(&self, now: DateTime) -> bool { match self.next_queue { Some(ref time) => now > *time, @@ -100,6 +118,10 @@ impl JobInfo { self.status == JobStatus::Failed } + pub(crate) fn is_in_queue(&self, queue: &str) -> bool { + self.queue == queue + } + pub(crate) fn pending(&mut self) { self.status = JobStatus::Pending; } diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index b92045d..b7344bb 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -19,6 +19,12 @@ pub trait Processor: Clone { /// This name must be unique!!! It is used to look up which processor should handle a job fn name() -> &'static str; + /// The name of the queue + /// + /// The queue determines which workers should process which jobs. 
By default, all workers + /// process all jobs, but that can be configured when starting the workers + fn queue() -> &'static str; + /// Define the default number of retries for a given processor /// /// Jobs can override @@ -56,6 +62,10 @@ pub trait Processor: Clone { /// "IncrementProcessor" /// } /// + /// fn queue() -> &'static str { + /// "default" + /// } + /// /// fn max_retries() -> MaxRetries { /// MaxRetries::Count(1) /// } @@ -87,6 +97,7 @@ pub trait Processor: Clone { ) -> Result { let job = JobInfo::new( Self::name().to_owned(), + Self::queue().to_owned(), serde_json::to_value(args)?, max_retries.unwrap_or(Self::max_retries()), backoff_strategy.unwrap_or(Self::backoff_strategy()), diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 10a112d..cbc166b 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,5 +1,7 @@ use std::{ + collections::{BTreeMap, BTreeSet}, path::PathBuf, + str::Utf8Error, sync::{Arc, RwLock, RwLockWriteGuard}, }; @@ -85,7 +87,66 @@ impl Storage { Ok(new_id) } - pub fn dequeue_job(&self, limit: usize) -> Result, Error> { + pub fn check_stalled_jobs(&self) -> Result<(), Error> { + let store = self.store.write()?; + let job_bucket = + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; + + let lock_bucket = + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; + + let buckets = Buckets::new(&store)?; + + let mut write_txn = store.write_txn()?; + let read_txn = store.read_txn()?; + + self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| { + let mut cursor = read_txn.read_cursor(&buckets.running)?; + match cursor.get(None, CursorOp::First) { + Ok(_) => (), + Err(e) => match e { + Error::NotFound => { + return Ok(()); + } + e => { + return Err(e); + } + }, + } + + let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>; + + let _ = cursor.iter().fold(initial_value, |acc, (key, _)| { + acc.and_then(|inner_txn| { + let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + + if job.is_stale() { + if job.increment().should_requeue() { + let job_value = Json::to_value_buf(job)?; + inner_txn.set(&job_bucket, key, job_value)?; + self.queue_job(&buckets, inner_txn, key)?; + } else { + job.fail(); + let job_value = Json::to_value_buf(job)?; + inner_txn.set(&job_bucket, key, job_value)?; + self.fail_job(&buckets, inner_txn, key)?; + } + } + + Ok(inner_txn) + }) + })?; + + Ok(()) + })?; + + read_txn.commit()?; + write_txn.commit()?; + + Ok(()) + } + + pub fn dequeue_job(&self, limit: usize, queue: &str) -> Result, Error> { let store = self.store.write()?; trace!("Got store"); @@ -103,8 +164,11 @@ impl Storage { let mut txn = store.write_txn()?; let read_txn = store.read_txn()?; - let result = - self.with_lock::<_, Vec>(&lock_bucket, &mut txn, b"job-queue", |inner_txn| { + let result = self.with_lock::<_, Vec, _>( + &lock_bucket, + &mut txn, + b"job-queue", + |inner_txn| { let mut cursor = read_txn.read_cursor(&buckets.queued)?; trace!("Got cursor"); match cursor.get(None, CursorOp::First) { @@ -127,28 +191,25 @@ impl Storage { let now = Utc::now(); trace!("Got lock"); - let (_inner_txn, vec) = - cursor - .iter() - .fold(initial_value, |acc, (key, _)| match acc { - Ok((inner_txn, mut jobs)) => { - if jobs.len() < limit { - let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + let (_inner_txn, vec) = cursor.iter().fold(initial_value, |acc, (key, _)| { + acc.and_then(|(inner_txn, mut jobs)| { + if jobs.len() < limit { + let job = inner_txn.get(&job_bucket, 
&key)?.inner()?.to_serde(); - if job.is_ready(now) { - self.run_job(&buckets, inner_txn, key)?; + if job.is_ready(now) && job.is_in_queue(queue) { + self.run_job(&buckets, inner_txn, key)?; - jobs.push(job); - } - } - - Ok((inner_txn, jobs)) + jobs.push(job); } - Err(e) => Err(e), - })?; + } + + Ok((inner_txn, jobs)) + }) + })?; Ok(vec) - })?; + }, + )?; trace!("Committing"); @@ -171,6 +232,8 @@ impl Storage { } }; + job.updated(); + trace!("Generaged job id, {}", job_id); if job.is_failed() { @@ -212,6 +275,81 @@ impl Storage { Ok(()) } + pub fn get_port_mapping( + &self, + base_port: usize, + queues: BTreeSet, + ) -> Result, PortMapError> { + let store = self.store.write().map_err(|e| Error::from(e))?; + + let queue_port_bucket = + store.bucket::<&[u8], ValueBuf>>(Some(Storage::queue_port()))?; + + let read_txn = store.read_txn()?; + let mut write_txn = store.write_txn()?; + + let lock_name = "lock-queue"; + + let queue_map = self.with_lock::<_, _, PortMapError>( + &queue_port_bucket, + &mut write_txn, + lock_name.as_ref(), + |write_txn| { + let mut cursor = read_txn.read_cursor(&queue_port_bucket)?; + + let (unused_queues, queue_map) = cursor.iter().fold( + Ok((queues.clone(), BTreeMap::new())), + |acc: Result<_, PortMapError>, (queue, port)| { + acc.and_then(move |(mut queues, mut map)| { + let port: usize = port.inner()?.to_serde(); + let queue = std::str::from_utf8(queue)?.to_owned(); + + if queue != lock_name { + queues.remove(&queue); + map.insert(queue, port); + } + + Ok((queues, map)) + }) + }, + )?; + + // The starting port for new queues should be one greater than the maximum port + // number in the btree set, or 2 greater than the base port. + // + // This is because there need to be two admin ports before the queue ports begin. + let start_port = queue_map + .iter() + .map(|(_, v)| *v) + .filter(|v| *v != 0) + .max() + .unwrap_or(base_port + 1) + + 1; + + let (_, queue_map, _) = unused_queues.into_iter().fold( + Ok((write_txn, queue_map, start_port)), + |acc: Result<_, PortMapError>, queue_name| { + acc.and_then(|(write_txn, mut queue_map, port_num)| { + let port = Json::to_value_buf(port_num)?; + + write_txn.set(&queue_port_bucket, queue_name.as_ref(), port)?; + queue_map.insert(queue_name, port_num); + + Ok((write_txn, queue_map, port_num + 1)) + }) + }, + )?; + + Ok(queue_map) + }, + )?; + + read_txn.commit()?; + write_txn.commit()?; + + Ok(queue_map) + } + fn queue_job<'env>( &self, buckets: &'env Buckets<'env>, @@ -302,15 +440,16 @@ impl Storage { // // But in the event of multiple processes running on the same machine, it is good to have some // way to make sure they don't step on eachother's toes - fn with_lock<'env, F, T>( + fn with_lock<'env, F, T, E>( &self, lock_bucket: &'env Bucket<&[u8], ValueBuf>>, txn: &mut Txn<'env>, lock_key: &[u8], callback: F, - ) -> Result + ) -> Result where - F: Fn(&mut Txn<'env>) -> Result, + F: Fn(&mut Txn<'env>) -> Result, + E: From, { let mut other_runner_id = 0; @@ -339,16 +478,16 @@ impl Storage { } Err(e) => match e { Error::NotFound => continue, - e => return Err(e), + e => return Err(e.into()), }, } match e { Error::LMDB(lmdb) => match lmdb { LmdbError::KeyExist => continue, - e => return Err(Error::LMDB(e)), + e => return Err(Error::LMDB(e).into()), }, - e => return Err(e), + e => return Err(e.into()), } } } @@ -361,7 +500,7 @@ impl Storage { Ok(item) } - fn buckets() -> [&'static str; 7] { + fn buckets() -> [&'static str; 8] { [ Storage::id_store(), Storage::job_store(), @@ -370,6 +509,7 @@ impl Storage { 
Storage::job_running(), Storage::job_lock(), Storage::job_finished(), + Storage::queue_port(), ] } @@ -400,4 +540,29 @@ impl Storage { fn job_lock() -> &'static str { "job-lock" } + + fn queue_port() -> &'static str { + "queue-port" + } +} + +#[derive(Debug, Fail)] +pub enum PortMapError { + #[fail(display = "Error in KV, {}", _0)] + Kv(#[cause] Error), + + #[fail(display = "Error parsing to Utf8, {}", _0)] + Utf8(#[cause] Utf8Error), +} + +impl From for PortMapError { + fn from(e: Error) -> Self { + PortMapError::Kv(e) + } +} + +impl From for PortMapError { + fn from(e: Utf8Error) -> Self { + PortMapError::Utf8(e) + } } diff --git a/jobs-server-tokio/src/server.rs b/jobs-server-tokio/src/server.rs deleted file mode 100644 index 1ad7618..0000000 --- a/jobs-server-tokio/src/server.rs +++ /dev/null @@ -1,192 +0,0 @@ -use std::{ - path::{Path, PathBuf}, - sync::Arc, - time::Duration, -}; - -use failure::Error; -use futures::{ - future::{lazy, poll_fn}, - stream::iter_ok, - Future, Stream, -}; -use jobs_core::{JobInfo, Storage}; -use tokio::timer::Interval; -use tokio_threadpool::blocking; -use tokio_zmq::{prelude::*, Multipart, Pull, Push}; -use zmq::{Context, Message}; - -use crate::coerce; - -#[derive(Clone)] -struct Config { - ip: String, - job_port: usize, - queue_port: usize, - runner_id: usize, - db_path: PathBuf, - context: Arc, -} - -impl Config { - fn create_server(&self) -> Result { - let pusher = Push::builder(self.context.clone()) - .bind(&format!("tcp://{}:{}", self.ip, self.job_port)) - .build()?; - - let puller = Pull::builder(self.context.clone()) - .bind(&format!("tcp://{}:{}", self.ip, self.queue_port)) - .build()?; - - let storage = Storage::init(self.runner_id, self.db_path.clone())?; - - let server = ServerConfig { - pusher, - puller, - storage, - config: self.clone(), - }; - - Ok(server) - } -} - -pub struct ServerConfig { - pusher: Push, - puller: Pull, - storage: Storage, - // TODO: Recover from failure - #[allow(dead_code)] - config: Config, -} - -impl ServerConfig { - pub fn init>( - ip: &str, - job_port: usize, - queue_port: usize, - runner_id: usize, - db_path: P, - ) -> Result { - let context = Arc::new(Context::new()); - - Self::init_with_context(ip, job_port, queue_port, runner_id, db_path, context) - } - - pub fn init_with_context>( - ip: &str, - job_port: usize, - queue_port: usize, - runner_id: usize, - db_path: P, - context: Arc, - ) -> Result { - let config = Config { - ip: ip.to_owned(), - job_port, - queue_port, - runner_id, - db_path: db_path.as_ref().to_owned(), - context, - }; - - config.create_server() - } - - pub fn run(self) -> impl Future { - lazy(|| { - let ServerConfig { - pusher, - puller, - storage, - config: _, - } = self; - - let storage2 = storage.clone(); - - let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250)) - .from_err() - .and_then(move |_| dequeue_jobs(storage.clone())) - .flatten() - .fold(pusher, move |pusher, multipart| { - Box::new(push_job(pusher, multipart)) - }); - - tokio::spawn( - fut.map(|_| ()) - .map_err(move |e| error!("Error in server, {}", e)), - ); - - puller - .stream() - .from_err() - .and_then(parse_job) - .and_then(move |job| store_job(job, storage2.clone())) - .or_else(|e| Ok(error!("Error storing job, {}", e))) - .for_each(|_| Ok(())) - }) - } -} - -fn dequeue_jobs( - storage: Storage, -) -> impl Future, Error = Error> { - poll_fn(move || { - let storage = storage.clone(); - blocking(move || wrap_fetch_queue(storage)) - }) - .then(coerce) - .map(|jobs| iter_ok(jobs)) - .or_else(|e| { - 
error!("Error fetching jobs, {}", e); - Ok(iter_ok(vec![])) - }) -} - -fn push_job(pusher: Push, message: Multipart) -> impl Future { - pusher.send(message).map_err(Error::from) -} - -fn store_job(job: JobInfo, storage: Storage) -> impl Future { - let storage = storage.clone(); - - poll_fn(move || { - let job = job.clone(); - let storage = storage.clone(); - - blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from) - }) - .then(coerce) -} - -fn wrap_fetch_queue(storage: Storage) -> Result, Error> { - let response = fetch_queue(storage)?; - - let jobs = response - .into_iter() - .map(|job| { - serde_json::to_string(&job) - .map_err(Error::from) - .and_then(|json| Message::from_slice(json.as_ref()).map_err(Error::from)) - .map(Multipart::from) - }) - .collect::, Error>>()?; - - Ok(jobs) -} - -fn fetch_queue(storage: Storage) -> Result, Error> { - storage.dequeue_job(100).map_err(Error::from) -} - -fn parse_job(mut multipart: Multipart) -> Result { - let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?; - - let parsed = serde_json::from_slice(&unparsed_msg)?; - - Ok(parsed) -} - -#[derive(Clone, Debug, Fail)] -#[fail(display = "Message was empty")] -pub struct EmptyMessage; diff --git a/jobs-server-tokio/src/server/mod.rs b/jobs-server-tokio/src/server/mod.rs new file mode 100644 index 0000000..76bac69 --- /dev/null +++ b/jobs-server-tokio/src/server/mod.rs @@ -0,0 +1,129 @@ +use std::{ + collections::BTreeSet, + path::{Path, PathBuf}, + sync::Arc, +}; + +use failure::Error; +use futures::{future::poll_fn, Future}; +use jobs_core::Storage; +use tokio_threadpool::blocking; +use zmq::Context; + +use crate::coerce; + +mod portmap; +mod pull; +mod push; +mod stalled; + +use self::{portmap::PortMapConfig, pull::PullConfig, push::PushConfig, stalled::StalledConfig}; + +#[derive(Clone)] +pub(crate) struct Config { + ip: String, + base_port: usize, + runner_id: usize, + queues: BTreeSet, + db_path: PathBuf, + context: Arc, +} + +impl Config { + fn create_server(&self) -> impl Future { + let runner_id = self.runner_id; + let db_path = self.db_path.clone(); + let base_port = self.base_port; + let queues = self.queues.clone(); + + let config = Arc::new(self.clone()); + + poll_fn(move || { + let runner_id = runner_id; + let db_path = db_path.clone(); + let base_port = base_port; + let queues = queues.clone(); + + blocking(move || { + let storage = Arc::new(Storage::init(runner_id, db_path)?); + storage.check_stalled_jobs()?; + let port_map = storage.get_port_mapping(base_port, queues)?; + + Ok((storage, port_map)) + }) + }) + .from_err::() + .then(coerce) + .and_then(|(storage, port_map)| { + for queue in config.queues.iter() { + let port = port_map.get(queue).ok_or(MissingQueue(queue.to_owned()))?; + + let address = format!("tcp://{}:{}", config.ip, port); + + tokio::spawn(PushConfig::init( + address, + queue.to_owned(), + storage.clone(), + config.clone(), + )); + } + + StalledConfig::init(storage.clone()); + + let portmap_address = format!("tcp://{}:{}", config.ip, config.base_port + 1); + + tokio::spawn(PortMapConfig::init( + portmap_address, + port_map, + config.clone(), + )); + + let pull_address = format!("tcp://{}:{}", config.ip, config.base_port); + + tokio::spawn(PullConfig::init(pull_address, storage, config)); + + Ok(()) + }) + .map_err(|e| error!("Error starting server, {}", e)) + } +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Queue is missing from map, {}", _0)] +struct MissingQueue(String); + +pub struct ServerConfig; + +impl ServerConfig { + 
pub fn init>( + ip: &str, + base_port: usize, + runner_id: usize, + queues: BTreeSet, + db_path: P, + ) -> impl Future { + let context = Arc::new(Context::new()); + + Self::init_with_context(ip, base_port, runner_id, queues, db_path, context) + } + + pub fn init_with_context>( + ip: &str, + base_port: usize, + runner_id: usize, + queues: BTreeSet, + db_path: P, + context: Arc, + ) -> impl Future { + let config = Config { + ip: ip.to_owned(), + base_port, + runner_id, + queues, + db_path: db_path.as_ref().to_owned(), + context, + }; + + config.create_server() + } +} diff --git a/jobs-server-tokio/src/server/portmap.rs b/jobs-server-tokio/src/server/portmap.rs new file mode 100644 index 0000000..4b84381 --- /dev/null +++ b/jobs-server-tokio/src/server/portmap.rs @@ -0,0 +1,105 @@ +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use failure::Error; +use futures::{future::lazy, stream::iter_ok, Future, Stream}; +use tokio::timer::Delay; +use tokio_zmq::{prelude::*, Multipart, Push}; +use zmq::Message; + +use crate::server::Config; + +pub(crate) struct PortMapConfig { + pusher: Push, + address: String, + port_map: BTreeMap, + config: Arc, +} + +impl PortMapConfig { + pub(crate) fn init( + address: String, + port_map: BTreeMap, + config: Arc, + ) -> impl Future { + let cfg = ResetPortMapConfig { + address, + port_map, + config, + }; + + cfg.build() + .map_err(|e| error!("Error starting pusher, {}", e)) + } + + fn run(self) -> Box + Send> { + let reset = self.reset(); + + let PortMapConfig { + pusher, + address: _, + port_map, + config: _, + } = self; + + let fut = iter_ok::<_, Error>(0..) + .and_then(move |count| { + trace!("Pushed {} portmaps", count); + + let s = serde_json::to_string(&port_map)?; + let m = Message::from_slice(s.as_ref())?; + + Ok(Multipart::from(m)) + }) + .forward(pusher.sink()) + .map(move |_| info!("portmap pusher shutting down")) + .map_err(|e| { + error!("Error pushing portmap, {}", e); + + tokio::spawn(reset.rebuild()); + }); + + Box::new(fut) + } + + fn reset(&self) -> ResetPortMapConfig { + ResetPortMapConfig { + address: self.address.clone(), + port_map: self.port_map.clone(), + config: self.config.clone(), + } + } +} + +struct ResetPortMapConfig { + address: String, + port_map: BTreeMap, + config: Arc, +} + +impl ResetPortMapConfig { + fn rebuild(self) -> impl Future { + Delay::new(tokio::clock::now() + Duration::from_secs(5)) + .from_err() + .and_then(move |_| self.build()) + .map_err(|e| error!("Error restarting pusher, {}", e)) + } + + fn build(self) -> impl Future { + lazy(|| { + let pusher = Push::builder(self.config.context.clone()) + .bind(&self.address) + .build()?; + + let config = PortMapConfig { + pusher, + address: self.address, + port_map: self.port_map, + config: self.config, + }; + + tokio::spawn(config.run()); + + Ok(()) + }) + } +} diff --git a/jobs-server-tokio/src/server/pull.rs b/jobs-server-tokio/src/server/pull.rs new file mode 100644 index 0000000..8157b7c --- /dev/null +++ b/jobs-server-tokio/src/server/pull.rs @@ -0,0 +1,125 @@ +use std::{sync::Arc, time::Duration}; + +use failure::Error; +use futures::{ + future::{lazy, poll_fn}, + Future, Stream, +}; +use jobs_core::{JobInfo, Storage}; +use tokio::timer::Delay; +use tokio_threadpool::blocking; +use tokio_zmq::{prelude::*, Multipart, Pull}; + +use crate::server::{coerce, Config}; + +pub(crate) struct PullConfig { + puller: Pull, + address: String, + storage: Arc, + config: Arc, +} + +impl PullConfig { + pub(crate) fn init( + address: String, + storage: Arc, + config: Arc, + 
) -> impl Future { + let cfg = ResetPullConfig { + address, + storage, + config, + }; + + cfg.build() + .map_err(|e| error!("Error starting puller, {}", e)) + } + + fn run(self) -> Box + Send> { + let config = self.reset(); + + let storage = self.storage.clone(); + + let fut = self + .puller + .stream() + .from_err() + .and_then(parse_job) + .and_then(move |job| store_job(job, storage.clone())) + .for_each(|_| Ok(())) + .map(|_| info!("Puller is shutting down")) + .map_err(|e| { + error!("Error storing job, {}", e); + + tokio::spawn(config.rebuild()); + }); + + Box::new(fut) + } + + fn reset(&self) -> ResetPullConfig { + ResetPullConfig { + address: self.address.clone(), + storage: self.storage.clone(), + config: self.config.clone(), + } + } +} + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Message was empty")] +pub struct EmptyMessage; + +fn parse_job(mut multipart: Multipart) -> Result { + let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?; + + let parsed = serde_json::from_slice(&unparsed_msg)?; + + Ok(parsed) +} + +fn store_job(job: JobInfo, storage: Arc) -> impl Future { + let storage = storage.clone(); + + poll_fn(move || { + let job = job.clone(); + let storage = storage.clone(); + + blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from) + }) + .then(coerce) +} + +struct ResetPullConfig { + address: String, + storage: Arc, + config: Arc, +} + +impl ResetPullConfig { + fn rebuild(self) -> impl Future { + Delay::new(tokio::clock::now() + Duration::from_secs(5)) + .from_err() + .and_then(move |_| self.build()) + .map_err(|e| error!("Error restarting puller, {}", e)) + } + + fn build(self) -> impl Future { + lazy(|| { + let puller = Pull::builder(self.config.context.clone()) + .bind(&self.address) + .build()?; + + let config = PullConfig { + puller, + address: self.address, + storage: self.storage, + config: self.config, + }; + + tokio::spawn(config.run()); + + Ok(()) + }) + } +} diff --git a/jobs-server-tokio/src/server/push.rs b/jobs-server-tokio/src/server/push.rs new file mode 100644 index 0000000..be4cf1a --- /dev/null +++ b/jobs-server-tokio/src/server/push.rs @@ -0,0 +1,157 @@ +use std::{sync::Arc, time::Duration}; + +use failure::Error; +use futures::{ + future::{lazy, poll_fn}, + stream::iter_ok, + Future, Stream, +}; +use jobs_core::{JobInfo, Storage}; +use tokio::timer::{Delay, Interval}; +use tokio_threadpool::blocking; +use tokio_zmq::{prelude::*, Multipart, Push}; +use zmq::Message; + +use crate::server::{coerce, Config}; + +pub(crate) struct PushConfig { + pusher: Push, + address: String, + queue: String, + storage: Arc, + config: Arc, +} + +impl PushConfig { + pub(crate) fn init( + address: String, + queue: String, + storage: Arc, + config: Arc, + ) -> impl Future { + let cfg = ResetPushConfig { + address, + queue, + storage, + config, + }; + + cfg.build() + .map_err(|e| error!("Error starting pusher, {}", e)) + } + + fn run(self) -> Box + Send> { + let reset = self.reset(); + + let PushConfig { + address: _, + pusher, + queue, + storage, + config: _, + } = self; + + let queue_2_electric_boogaloo = queue.clone(); + + let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250)) + .from_err() + .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone())) + .flatten() + .forward(pusher.sink()) + .map(move |_| { + info!( + "Pusher for queue {} is shutting down", + queue_2_electric_boogaloo + ) + }) + .map_err(|e| { + error!("Error dequeuing job, {}", e); + + tokio::spawn(reset.rebuild()); + }); + + Box::new(fut) + } 
+ + fn reset(&self) -> ResetPushConfig { + ResetPushConfig { + address: self.address.clone(), + queue: self.queue.clone(), + storage: self.storage.clone(), + config: self.config.clone(), + } + } +} + +fn dequeue_jobs( + storage: Arc, + queue: String, +) -> impl Future, Error = Error> { + poll_fn(move || { + let storage = storage.clone(); + let queue = queue.clone(); + blocking(move || wrap_fetch_queue(storage, &queue)) + }) + .then(coerce) + .map(|jobs| iter_ok(jobs)) + .or_else(|e| { + error!("Error fetching jobs, {}", e); + Ok(iter_ok(vec![])) + }) +} + +fn wrap_fetch_queue(storage: Arc, queue: &str) -> Result, Error> { + let response = fetch_queue(storage, queue)?; + + let jobs = response + .into_iter() + .map(|job| { + serde_json::to_string(&job) + .map_err(Error::from) + .and_then(|json| Message::from_slice(json.as_ref()).map_err(Error::from)) + .map(Multipart::from) + }) + .collect::, Error>>()?; + + Ok(jobs) +} + +fn fetch_queue(storage: Arc, queue: &str) -> Result, Error> { + storage.dequeue_job(100, queue).map_err(Error::from) +} + +struct ResetPushConfig { + address: String, + queue: String, + storage: Arc, + config: Arc, +} + +impl ResetPushConfig { + fn rebuild(self) -> impl Future { + Delay::new(tokio::clock::now() + Duration::from_secs(5)) + .from_err() + .and_then(move |_| self.build()) + .map_err(|e| error!("Error restarting pusher, {}", e)) + } + + fn build(self) -> impl Future { + lazy(|| { + let pusher = Push::builder(self.config.context.clone()) + .bind(&self.address) + .build()?; + + let config = PushConfig { + pusher, + address: self.address, + queue: self.queue, + storage: self.storage, + config: self.config, + }; + + tokio::spawn(config.run()); + + Ok(()) + }) + } +} diff --git a/jobs-server-tokio/src/server/stalled.rs b/jobs-server-tokio/src/server/stalled.rs new file mode 100644 index 0000000..adad3c7 --- /dev/null +++ b/jobs-server-tokio/src/server/stalled.rs @@ -0,0 +1,57 @@ +use std::{sync::Arc, time::Duration}; + +use failure::Error; +use futures::{future::poll_fn, Future, Stream}; +use jobs_core::Storage; +use tokio::timer::{Delay, Interval}; +use tokio_threadpool::blocking; + +use crate::server::coerce; + +#[derive(Clone)] +pub(crate) struct StalledConfig { + storage: Arc, +} + +impl StalledConfig { + pub(crate) fn init(storage: Arc) { + let cfg = StalledConfig { storage }; + + tokio::spawn(cfg.run()); + } + + fn run(self) -> Box + Send> { + let reset = self.clone(); + + let StalledConfig { storage } = self; + + let fut = Interval::new(tokio::clock::now(), Duration::from_secs(60 * 30)) + .from_err::() + .and_then(move |_| { + let storage = storage.clone(); + poll_fn(move || { + let storage = storage.clone(); + blocking(move || storage.check_stalled_jobs().map_err(Error::from)) + }) + .from_err() + }) + .then(coerce) + .for_each(|_| Ok(())) + .map(|_| info!("Stalled Job Checker is shutting down")) + .map_err(|e| { + error!("Error checking stalled jobs, {}", e); + + tokio::spawn(reset.rebuild()); + }); + + Box::new(fut) + } + + fn rebuild(self) -> impl Future { + Delay::new(tokio::clock::now() + Duration::from_secs(5)) + .from_err::() + .map(move |_| tokio::spawn(self.run())) + .map(|_| ()) + .map_err(|e| error!("Error restarting stalled job checker, {}", e)) + } +} diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index 154e607..a869ee7 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -74,6 +74,7 @@ fn return_job( fn try_process_job( storage: Storage, + queue: String, processor_count: usize, processors: Processors, tx: Sender, @@ 
-81,10 +82,11 @@ fn try_process_job(
     if processor_count > 0 {
         let fut = poll_fn(move || {
             let storage = storage.clone();
+            let queue = queue.clone();
 
             blocking(move || {
                 storage
-                    .dequeue_job(processor_count)
+                    .dequeue_job(processor_count, &queue)
                     .map_err(|e| error!("Error dequeuing job, {}", e))
             })
             .map_err(|e| error!("Error blocking, {}", e))
@@ -145,6 +147,7 @@ fn process_jobs(
         }
         ProcessorMessage::Time(_) => Either::B(Either::A(try_process_job(
             storage.clone(),
+            "default".to_owned(),
             processor_count,
             processors,
             tx.clone(),
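
A note on the dequeue change above: `Storage::dequeue_job` now takes a queue name and only hands out jobs whose `queue` field matches it and whose scheduled delay has elapsed. The sketch below pulls that per-job predicate out of the LMDB cursor fold so it can be read in isolation; `QueuedJob` and `dequeue` are illustrative stand-ins, not types or functions from the crate.

```rust
use chrono::{DateTime, Utc};

// Illustrative stand-in for the JobInfo fields that matter here; the real
// struct also carries the processor name, args, retry counters, and backoff.
struct QueuedJob {
    queue: String,
    next_queue: Option<DateTime<Utc>>, // earliest time the job may run, if delayed
}

// The per-job filter applied inside Storage::dequeue_job: a job is handed to a
// worker only if any scheduled delay has elapsed (`is_ready`) and it belongs to
// the queue that worker asked for (`is_in_queue`), up to `limit` jobs.
fn dequeue<'a>(
    jobs: &'a [QueuedJob],
    queue: &str,
    limit: usize,
    now: DateTime<Utc>,
) -> Vec<&'a QueuedJob> {
    jobs.iter()
        .filter(|job| match job.next_queue {
            Some(ref time) => now > *time,
            None => true,
        })
        .filter(|job| job.queue == queue)
        .take(limit)
        .collect()
}
```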
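
The new `check_stalled_jobs` pass walks the running bucket and treats any job not updated for a day as stalled, then either requeues it or fails it depending on its retry budget. Below is a simplified model of that decision; the field types and the `<=` comparison are assumptions, since `increment()` and `should_requeue()` are not shown in this diff.

```rust
use chrono::{DateTime, Duration, Utc};

// Hypothetical stand-ins; the real retry bookkeeping lives on JobInfo via
// increment() and MaxRetries::should_requeue(), which this diff doesn't show.
struct RunningJob {
    updated_at: DateTime<Utc>,
    retry_count: u32,
    max_retries: Option<u32>, // None models unlimited retries
}

enum Outcome {
    LeaveAlone, // still being worked on
    Requeue,    // stalled, but retries remain
    Fail,       // stalled and out of retries
}

fn check_stalled(job: &mut RunningJob, now: DateTime<Utc>) -> Outcome {
    // is_stale: no updated_at bump for a full day.
    if job.updated_at >= now - Duration::days(1) {
        return Outcome::LeaveAlone;
    }

    // Stalled: burn a retry, then decide between requeue and failure.
    job.retry_count += 1;
    match job.max_retries {
        None => Outcome::Requeue,
        Some(max) if job.retry_count <= max => Outcome::Requeue,
        Some(_) => Outcome::Fail,
    }
}
```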
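
`get_port_mapping` decides which TCP port each queue's push socket binds. Per the comment in the diff and the addresses used in `server/mod.rs`: the job-pull socket takes `base_port`, the port-map socket takes `base_port + 1`, queues that already have a stored port keep it, and new queues are allocated upward from the highest port handed out so far. A minimal sketch of that allocation, without the LMDB persistence and locking (`assign_ports` is an illustrative name, not a function in the crate):

```rust
use std::collections::{BTreeMap, BTreeSet};

// Assign a port to every requested queue, mirroring the allocation rules in
// Storage::get_port_mapping (sketch only: the real method also persists the
// map in the "queue-port" bucket and takes a lock while doing so).
fn assign_ports(
    base_port: usize,
    existing: &BTreeMap<String, usize>,
    queues: &BTreeSet<String>,
) -> BTreeMap<String, usize> {
    // Queues that already have a port keep it.
    let mut map = existing.clone();

    // New ports start one above the highest port already handed out, or two
    // above the base port, because base_port is the job-pull socket and
    // base_port + 1 is the port-map socket.
    let mut next_port = map
        .values()
        .copied()
        .filter(|p| *p != 0)
        .max()
        .unwrap_or(base_port + 1)
        + 1;

    for queue in queues {
        if !map.contains_key(queue) {
            map.insert(queue.clone(), next_port);
            next_port += 1;
        }
    }

    map
}

fn main() {
    let queues: BTreeSet<String> = ["default", "mail"].iter().map(|s| s.to_string()).collect();
    let map = assign_ports(5550, &BTreeMap::new(), &queues);
    // 5550 = pull socket, 5551 = port-map socket, queue sockets start at 5552.
    assert_eq!(map.get("default"), Some(&5552));
    assert_eq!(map.get("mail"), Some(&5553));
}
```

Client behaviour isn't part of this diff, but the intent appears to be that a worker reads this map from the port-map socket and then connects to the port of the queue it wants to process.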