mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-21 19:40:59 +00:00
Introduce queues to server
This commit is contained in:
parent
02406de28d
commit
0cfec96d4e
11 changed files with 808 additions and 226 deletions
|
@ -49,8 +49,8 @@ impl KvActor {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dequeue_jobs(&self, limit: usize) -> Result<Vec<JobInfo>, Error> {
|
||||
let jobs = self.storage.dequeue_job(limit)?;
|
||||
pub fn dequeue_jobs(&self, limit: usize, queue: &str) -> Result<Vec<JobInfo>, Error> {
|
||||
let jobs = self.storage.dequeue_job(limit, queue)?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ impl Handler<DequeueJobs> for KvActor {
|
|||
type Result = Result<Vec<JobInfo>, Error>;
|
||||
|
||||
fn handle(&mut self, msg: DequeueJobs, _: &mut Self::Context) -> Self::Result {
|
||||
self.dequeue_jobs(msg.0)
|
||||
self.dequeue_jobs(msg.0, &msg.1)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,7 @@ impl Message for StoreJob {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DequeueJobs(usize);
|
||||
pub struct DequeueJobs(usize, String);
|
||||
|
||||
impl Message for DequeueJobs {
|
||||
type Result = Result<Vec<JobInfo>, Error>;
|
||||
|
@ -149,7 +149,7 @@ impl ProcessorActor {
|
|||
wrap_future(
|
||||
actor
|
||||
.store
|
||||
.send(DequeueJobs(1))
|
||||
.send(DequeueJobs(1, "default".to_owned()))
|
||||
.then(coerce)
|
||||
.map_err(|e| error!("Error fetching jobs, {}", e))
|
||||
.and_then(|jobs| jobs.into_iter().next().ok_or(())),
|
||||
|
|
|
@ -11,6 +11,9 @@ pub struct JobInfo {
|
|||
/// Name of the processor that should handle this job
|
||||
processor: String,
|
||||
|
||||
/// Name of the queue that this job is a part of
|
||||
queue: String,
|
||||
|
||||
/// Arguments for a given job
|
||||
args: Value,
|
||||
|
||||
|
@ -26,13 +29,17 @@ pub struct JobInfo {
|
|||
/// How often retries should be scheduled
|
||||
backoff_strategy: Backoff,
|
||||
|
||||
/// The time this job was re-queued
|
||||
/// The time this job should be dequeued
|
||||
next_queue: Option<DateTime<Utc>>,
|
||||
|
||||
/// The time this job was last updated
|
||||
updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl JobInfo {
|
||||
pub(crate) fn new(
|
||||
processor: String,
|
||||
queue: String,
|
||||
args: Value,
|
||||
max_retries: MaxRetries,
|
||||
backoff_strategy: Backoff,
|
||||
|
@ -40,14 +47,21 @@ impl JobInfo {
|
|||
JobInfo {
|
||||
id: None,
|
||||
processor,
|
||||
queue,
|
||||
status: JobStatus::Pending,
|
||||
args,
|
||||
retry_count: 0,
|
||||
max_retries,
|
||||
next_queue: None,
|
||||
backoff_strategy,
|
||||
updated_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn updated(&mut self) {
|
||||
self.updated_at = Utc::now();
|
||||
}
|
||||
|
||||
pub(crate) fn processor(&self) -> &str {
|
||||
&self.processor
|
||||
}
|
||||
|
@ -89,6 +103,10 @@ impl JobInfo {
|
|||
self.next_queue = Some(next_queue);
|
||||
}
|
||||
|
||||
pub(crate) fn is_stale(&self) -> bool {
|
||||
self.updated_at < Utc::now() - OldDuration::days(1)
|
||||
}
|
||||
|
||||
pub(crate) fn is_ready(&self, now: DateTime<Utc>) -> bool {
|
||||
match self.next_queue {
|
||||
Some(ref time) => now > *time,
|
||||
|
@ -100,6 +118,10 @@ impl JobInfo {
|
|||
self.status == JobStatus::Failed
|
||||
}
|
||||
|
||||
pub(crate) fn is_in_queue(&self, queue: &str) -> bool {
|
||||
self.queue == queue
|
||||
}
|
||||
|
||||
pub(crate) fn pending(&mut self) {
|
||||
self.status = JobStatus::Pending;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,12 @@ pub trait Processor: Clone {
|
|||
/// This name must be unique!!! It is used to look up which processor should handle a job
|
||||
fn name() -> &'static str;
|
||||
|
||||
/// The name of the queue
|
||||
///
|
||||
/// The queue determines which workers should process which jobs. By default, all workers
|
||||
/// process all jobs, but that can be configured when starting the workers
|
||||
fn queue() -> &'static str;
|
||||
|
||||
/// Define the default number of retries for a given processor
|
||||
///
|
||||
/// Jobs can override
|
||||
|
@ -56,6 +62,10 @@ pub trait Processor: Clone {
|
|||
/// "IncrementProcessor"
|
||||
/// }
|
||||
///
|
||||
/// fn queue() -> &'static str {
|
||||
/// "default"
|
||||
/// }
|
||||
///
|
||||
/// fn max_retries() -> MaxRetries {
|
||||
/// MaxRetries::Count(1)
|
||||
/// }
|
||||
|
@ -87,6 +97,7 @@ pub trait Processor: Clone {
|
|||
) -> Result<JobInfo, Error> {
|
||||
let job = JobInfo::new(
|
||||
Self::name().to_owned(),
|
||||
Self::queue().to_owned(),
|
||||
serde_json::to_value(args)?,
|
||||
max_retries.unwrap_or(Self::max_retries()),
|
||||
backoff_strategy.unwrap_or(Self::backoff_strategy()),
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
path::PathBuf,
|
||||
str::Utf8Error,
|
||||
sync::{Arc, RwLock, RwLockWriteGuard},
|
||||
};
|
||||
|
||||
|
@ -85,7 +87,66 @@ impl Storage {
|
|||
Ok(new_id)
|
||||
}
|
||||
|
||||
pub fn dequeue_job(&self, limit: usize) -> Result<Vec<JobInfo>, Error> {
|
||||
pub fn check_stalled_jobs(&self) -> Result<(), Error> {
|
||||
let store = self.store.write()?;
|
||||
let job_bucket =
|
||||
store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
|
||||
|
||||
let lock_bucket =
|
||||
store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::job_lock()))?;
|
||||
|
||||
let buckets = Buckets::new(&store)?;
|
||||
|
||||
let mut write_txn = store.write_txn()?;
|
||||
let read_txn = store.read_txn()?;
|
||||
|
||||
self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| {
|
||||
let mut cursor = read_txn.read_cursor(&buckets.running)?;
|
||||
match cursor.get(None, CursorOp::First) {
|
||||
Ok(_) => (),
|
||||
Err(e) => match e {
|
||||
Error::NotFound => {
|
||||
return Ok(());
|
||||
}
|
||||
e => {
|
||||
return Err(e);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>;
|
||||
|
||||
let _ = cursor.iter().fold(initial_value, |acc, (key, _)| {
|
||||
acc.and_then(|inner_txn| {
|
||||
let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
|
||||
|
||||
if job.is_stale() {
|
||||
if job.increment().should_requeue() {
|
||||
let job_value = Json::to_value_buf(job)?;
|
||||
inner_txn.set(&job_bucket, key, job_value)?;
|
||||
self.queue_job(&buckets, inner_txn, key)?;
|
||||
} else {
|
||||
job.fail();
|
||||
let job_value = Json::to_value_buf(job)?;
|
||||
inner_txn.set(&job_bucket, key, job_value)?;
|
||||
self.fail_job(&buckets, inner_txn, key)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(inner_txn)
|
||||
})
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
read_txn.commit()?;
|
||||
write_txn.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dequeue_job(&self, limit: usize, queue: &str) -> Result<Vec<JobInfo>, Error> {
|
||||
let store = self.store.write()?;
|
||||
|
||||
trace!("Got store");
|
||||
|
@ -103,8 +164,11 @@ impl Storage {
|
|||
let mut txn = store.write_txn()?;
|
||||
let read_txn = store.read_txn()?;
|
||||
|
||||
let result =
|
||||
self.with_lock::<_, Vec<JobInfo>>(&lock_bucket, &mut txn, b"job-queue", |inner_txn| {
|
||||
let result = self.with_lock::<_, Vec<JobInfo>, _>(
|
||||
&lock_bucket,
|
||||
&mut txn,
|
||||
b"job-queue",
|
||||
|inner_txn| {
|
||||
let mut cursor = read_txn.read_cursor(&buckets.queued)?;
|
||||
trace!("Got cursor");
|
||||
match cursor.get(None, CursorOp::First) {
|
||||
|
@ -127,15 +191,12 @@ impl Storage {
|
|||
let now = Utc::now();
|
||||
|
||||
trace!("Got lock");
|
||||
let (_inner_txn, vec) =
|
||||
cursor
|
||||
.iter()
|
||||
.fold(initial_value, |acc, (key, _)| match acc {
|
||||
Ok((inner_txn, mut jobs)) => {
|
||||
let (_inner_txn, vec) = cursor.iter().fold(initial_value, |acc, (key, _)| {
|
||||
acc.and_then(|(inner_txn, mut jobs)| {
|
||||
if jobs.len() < limit {
|
||||
let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
|
||||
|
||||
if job.is_ready(now) {
|
||||
if job.is_ready(now) && job.is_in_queue(queue) {
|
||||
self.run_job(&buckets, inner_txn, key)?;
|
||||
|
||||
jobs.push(job);
|
||||
|
@ -143,12 +204,12 @@ impl Storage {
|
|||
}
|
||||
|
||||
Ok((inner_txn, jobs))
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
})
|
||||
})?;
|
||||
|
||||
Ok(vec)
|
||||
})?;
|
||||
},
|
||||
)?;
|
||||
|
||||
trace!("Committing");
|
||||
|
||||
|
@ -171,6 +232,8 @@ impl Storage {
|
|||
}
|
||||
};
|
||||
|
||||
job.updated();
|
||||
|
||||
trace!("Generaged job id, {}", job_id);
|
||||
|
||||
if job.is_failed() {
|
||||
|
@ -212,6 +275,81 @@ impl Storage {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_port_mapping(
|
||||
&self,
|
||||
base_port: usize,
|
||||
queues: BTreeSet<String>,
|
||||
) -> Result<BTreeMap<String, usize>, PortMapError> {
|
||||
let store = self.store.write().map_err(|e| Error::from(e))?;
|
||||
|
||||
let queue_port_bucket =
|
||||
store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::queue_port()))?;
|
||||
|
||||
let read_txn = store.read_txn()?;
|
||||
let mut write_txn = store.write_txn()?;
|
||||
|
||||
let lock_name = "lock-queue";
|
||||
|
||||
let queue_map = self.with_lock::<_, _, PortMapError>(
|
||||
&queue_port_bucket,
|
||||
&mut write_txn,
|
||||
lock_name.as_ref(),
|
||||
|write_txn| {
|
||||
let mut cursor = read_txn.read_cursor(&queue_port_bucket)?;
|
||||
|
||||
let (unused_queues, queue_map) = cursor.iter().fold(
|
||||
Ok((queues.clone(), BTreeMap::new())),
|
||||
|acc: Result<_, PortMapError>, (queue, port)| {
|
||||
acc.and_then(move |(mut queues, mut map)| {
|
||||
let port: usize = port.inner()?.to_serde();
|
||||
let queue = std::str::from_utf8(queue)?.to_owned();
|
||||
|
||||
if queue != lock_name {
|
||||
queues.remove(&queue);
|
||||
map.insert(queue, port);
|
||||
}
|
||||
|
||||
Ok((queues, map))
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
// The starting port for new queues should be one greater than the maximum port
|
||||
// number already stored in the queue map, or 2 greater than the base port.
|
||||
//
|
||||
// This is because there need to be two admin ports before the queue ports begin.
|
||||
let start_port = queue_map
|
||||
.iter()
|
||||
.map(|(_, v)| *v)
|
||||
.filter(|v| *v != 0)
|
||||
.max()
|
||||
.unwrap_or(base_port + 1)
|
||||
+ 1;
|
||||
|
||||
let (_, queue_map, _) = unused_queues.into_iter().fold(
|
||||
Ok((write_txn, queue_map, start_port)),
|
||||
|acc: Result<_, PortMapError>, queue_name| {
|
||||
acc.and_then(|(write_txn, mut queue_map, port_num)| {
|
||||
let port = Json::to_value_buf(port_num)?;
|
||||
|
||||
write_txn.set(&queue_port_bucket, queue_name.as_ref(), port)?;
|
||||
queue_map.insert(queue_name, port_num);
|
||||
|
||||
Ok((write_txn, queue_map, port_num + 1))
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(queue_map)
|
||||
},
|
||||
)?;
|
||||
|
||||
read_txn.commit()?;
|
||||
write_txn.commit()?;
|
||||
|
||||
Ok(queue_map)
|
||||
}
|
||||
|
||||
fn queue_job<'env>(
|
||||
&self,
|
||||
buckets: &'env Buckets<'env>,
|
||||
|
@ -302,15 +440,16 @@ impl Storage {
|
|||
//
|
||||
// But in the event of multiple processes running on the same machine, it is good to have some
|
||||
// way to make sure they don't step on each other's toes
|
||||
fn with_lock<'env, F, T>(
|
||||
fn with_lock<'env, F, T, E>(
|
||||
&self,
|
||||
lock_bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
|
||||
txn: &mut Txn<'env>,
|
||||
lock_key: &[u8],
|
||||
callback: F,
|
||||
) -> Result<T, Error>
|
||||
) -> Result<T, E>
|
||||
where
|
||||
F: Fn(&mut Txn<'env>) -> Result<T, Error>,
|
||||
F: Fn(&mut Txn<'env>) -> Result<T, E>,
|
||||
E: From<Error>,
|
||||
{
|
||||
let mut other_runner_id = 0;
|
||||
|
||||
|
@ -339,16 +478,16 @@ impl Storage {
|
|||
}
|
||||
Err(e) => match e {
|
||||
Error::NotFound => continue,
|
||||
e => return Err(e),
|
||||
e => return Err(e.into()),
|
||||
},
|
||||
}
|
||||
|
||||
match e {
|
||||
Error::LMDB(lmdb) => match lmdb {
|
||||
LmdbError::KeyExist => continue,
|
||||
e => return Err(Error::LMDB(e)),
|
||||
e => return Err(Error::LMDB(e).into()),
|
||||
},
|
||||
e => return Err(e),
|
||||
e => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -361,7 +500,7 @@ impl Storage {
|
|||
Ok(item)
|
||||
}
|
||||
|
||||
fn buckets() -> [&'static str; 7] {
|
||||
fn buckets() -> [&'static str; 8] {
|
||||
[
|
||||
Storage::id_store(),
|
||||
Storage::job_store(),
|
||||
|
@ -370,6 +509,7 @@ impl Storage {
|
|||
Storage::job_running(),
|
||||
Storage::job_lock(),
|
||||
Storage::job_finished(),
|
||||
Storage::queue_port(),
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -400,4 +540,29 @@ impl Storage {
|
|||
fn job_lock() -> &'static str {
|
||||
"job-lock"
|
||||
}
|
||||
|
||||
fn queue_port() -> &'static str {
|
||||
"queue-port"
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum PortMapError {
|
||||
#[fail(display = "Error in KV, {}", _0)]
|
||||
Kv(#[cause] Error),
|
||||
|
||||
#[fail(display = "Error parsing to Utf8, {}", _0)]
|
||||
Utf8(#[cause] Utf8Error),
|
||||
}
|
||||
|
||||
impl From<Error> for PortMapError {
|
||||
fn from(e: Error) -> Self {
|
||||
PortMapError::Kv(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Utf8Error> for PortMapError {
|
||||
fn from(e: Utf8Error) -> Self {
|
||||
PortMapError::Utf8(e)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,192 +0,0 @@
|
|||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{
|
||||
future::{lazy, poll_fn},
|
||||
stream::iter_ok,
|
||||
Future, Stream,
|
||||
};
|
||||
use jobs_core::{JobInfo, Storage};
|
||||
use tokio::timer::Interval;
|
||||
use tokio_threadpool::blocking;
|
||||
use tokio_zmq::{prelude::*, Multipart, Pull, Push};
|
||||
use zmq::{Context, Message};
|
||||
|
||||
use crate::coerce;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Config {
|
||||
ip: String,
|
||||
job_port: usize,
|
||||
queue_port: usize,
|
||||
runner_id: usize,
|
||||
db_path: PathBuf,
|
||||
context: Arc<Context>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn create_server(&self) -> Result<ServerConfig, Error> {
|
||||
let pusher = Push::builder(self.context.clone())
|
||||
.bind(&format!("tcp://{}:{}", self.ip, self.job_port))
|
||||
.build()?;
|
||||
|
||||
let puller = Pull::builder(self.context.clone())
|
||||
.bind(&format!("tcp://{}:{}", self.ip, self.queue_port))
|
||||
.build()?;
|
||||
|
||||
let storage = Storage::init(self.runner_id, self.db_path.clone())?;
|
||||
|
||||
let server = ServerConfig {
|
||||
pusher,
|
||||
puller,
|
||||
storage,
|
||||
config: self.clone(),
|
||||
};
|
||||
|
||||
Ok(server)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServerConfig {
|
||||
pusher: Push,
|
||||
puller: Pull,
|
||||
storage: Storage,
|
||||
// TODO: Recover from failure
|
||||
#[allow(dead_code)]
|
||||
config: Config,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn init<P: AsRef<Path>>(
|
||||
ip: &str,
|
||||
job_port: usize,
|
||||
queue_port: usize,
|
||||
runner_id: usize,
|
||||
db_path: P,
|
||||
) -> Result<Self, Error> {
|
||||
let context = Arc::new(Context::new());
|
||||
|
||||
Self::init_with_context(ip, job_port, queue_port, runner_id, db_path, context)
|
||||
}
|
||||
|
||||
pub fn init_with_context<P: AsRef<Path>>(
|
||||
ip: &str,
|
||||
job_port: usize,
|
||||
queue_port: usize,
|
||||
runner_id: usize,
|
||||
db_path: P,
|
||||
context: Arc<Context>,
|
||||
) -> Result<Self, Error> {
|
||||
let config = Config {
|
||||
ip: ip.to_owned(),
|
||||
job_port,
|
||||
queue_port,
|
||||
runner_id,
|
||||
db_path: db_path.as_ref().to_owned(),
|
||||
context,
|
||||
};
|
||||
|
||||
config.create_server()
|
||||
}
|
||||
|
||||
pub fn run(self) -> impl Future<Item = (), Error = ()> {
|
||||
lazy(|| {
|
||||
let ServerConfig {
|
||||
pusher,
|
||||
puller,
|
||||
storage,
|
||||
config: _,
|
||||
} = self;
|
||||
|
||||
let storage2 = storage.clone();
|
||||
|
||||
let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250))
|
||||
.from_err()
|
||||
.and_then(move |_| dequeue_jobs(storage.clone()))
|
||||
.flatten()
|
||||
.fold(pusher, move |pusher, multipart| {
|
||||
Box::new(push_job(pusher, multipart))
|
||||
});
|
||||
|
||||
tokio::spawn(
|
||||
fut.map(|_| ())
|
||||
.map_err(move |e| error!("Error in server, {}", e)),
|
||||
);
|
||||
|
||||
puller
|
||||
.stream()
|
||||
.from_err()
|
||||
.and_then(parse_job)
|
||||
.and_then(move |job| store_job(job, storage2.clone()))
|
||||
.or_else(|e| Ok(error!("Error storing job, {}", e)))
|
||||
.for_each(|_| Ok(()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn dequeue_jobs(
|
||||
storage: Storage,
|
||||
) -> impl Future<Item = impl Stream<Item = Multipart, Error = Error>, Error = Error> {
|
||||
poll_fn(move || {
|
||||
let storage = storage.clone();
|
||||
blocking(move || wrap_fetch_queue(storage))
|
||||
})
|
||||
.then(coerce)
|
||||
.map(|jobs| iter_ok(jobs))
|
||||
.or_else(|e| {
|
||||
error!("Error fetching jobs, {}", e);
|
||||
Ok(iter_ok(vec![]))
|
||||
})
|
||||
}
|
||||
|
||||
fn push_job(pusher: Push, message: Multipart) -> impl Future<Item = Push, Error = Error> {
|
||||
pusher.send(message).map_err(Error::from)
|
||||
}
|
||||
|
||||
fn store_job(job: JobInfo, storage: Storage) -> impl Future<Item = (), Error = Error> {
|
||||
let storage = storage.clone();
|
||||
|
||||
poll_fn(move || {
|
||||
let job = job.clone();
|
||||
let storage = storage.clone();
|
||||
|
||||
blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from)
|
||||
})
|
||||
.then(coerce)
|
||||
}
|
||||
|
||||
fn wrap_fetch_queue(storage: Storage) -> Result<Vec<Multipart>, Error> {
|
||||
let response = fetch_queue(storage)?;
|
||||
|
||||
let jobs = response
|
||||
.into_iter()
|
||||
.map(|job| {
|
||||
serde_json::to_string(&job)
|
||||
.map_err(Error::from)
|
||||
.and_then(|json| Message::from_slice(json.as_ref()).map_err(Error::from))
|
||||
.map(Multipart::from)
|
||||
})
|
||||
.collect::<Result<Vec<_>, Error>>()?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
fn fetch_queue(storage: Storage) -> Result<Vec<JobInfo>, Error> {
|
||||
storage.dequeue_job(100).map_err(Error::from)
|
||||
}
|
||||
|
||||
fn parse_job(mut multipart: Multipart) -> Result<JobInfo, Error> {
|
||||
let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;
|
||||
|
||||
let parsed = serde_json::from_slice(&unparsed_msg)?;
|
||||
|
||||
Ok(parsed)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Fail)]
|
||||
#[fail(display = "Message was empty")]
|
||||
pub struct EmptyMessage;
|
129
jobs-server-tokio/src/server/mod.rs
Normal file
129
jobs-server-tokio/src/server/mod.rs
Normal file
|
@ -0,0 +1,129 @@
|
|||
use std::{
|
||||
collections::BTreeSet,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{future::poll_fn, Future};
|
||||
use jobs_core::Storage;
|
||||
use tokio_threadpool::blocking;
|
||||
use zmq::Context;
|
||||
|
||||
use crate::coerce;
|
||||
|
||||
mod portmap;
|
||||
mod pull;
|
||||
mod push;
|
||||
mod stalled;
|
||||
|
||||
use self::{portmap::PortMapConfig, pull::PullConfig, push::PushConfig, stalled::StalledConfig};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Config {
|
||||
ip: String,
|
||||
base_port: usize,
|
||||
runner_id: usize,
|
||||
queues: BTreeSet<String>,
|
||||
db_path: PathBuf,
|
||||
context: Arc<Context>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn create_server(&self) -> impl Future<Item = (), Error = ()> {
|
||||
let runner_id = self.runner_id;
|
||||
let db_path = self.db_path.clone();
|
||||
let base_port = self.base_port;
|
||||
let queues = self.queues.clone();
|
||||
|
||||
let config = Arc::new(self.clone());
|
||||
|
||||
poll_fn(move || {
|
||||
let runner_id = runner_id;
|
||||
let db_path = db_path.clone();
|
||||
let base_port = base_port;
|
||||
let queues = queues.clone();
|
||||
|
||||
blocking(move || {
|
||||
let storage = Arc::new(Storage::init(runner_id, db_path)?);
|
||||
storage.check_stalled_jobs()?;
|
||||
let port_map = storage.get_port_mapping(base_port, queues)?;
|
||||
|
||||
Ok((storage, port_map))
|
||||
})
|
||||
})
|
||||
.from_err::<Error>()
|
||||
.then(coerce)
|
||||
.and_then(|(storage, port_map)| {
|
||||
for queue in config.queues.iter() {
|
||||
let port = port_map.get(queue).ok_or(MissingQueue(queue.to_owned()))?;
|
||||
|
||||
let address = format!("tcp://{}:{}", config.ip, port);
|
||||
|
||||
tokio::spawn(PushConfig::init(
|
||||
address,
|
||||
queue.to_owned(),
|
||||
storage.clone(),
|
||||
config.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
StalledConfig::init(storage.clone());
|
||||
|
||||
let portmap_address = format!("tcp://{}:{}", config.ip, config.base_port + 1);
|
||||
|
||||
tokio::spawn(PortMapConfig::init(
|
||||
portmap_address,
|
||||
port_map,
|
||||
config.clone(),
|
||||
));
|
||||
|
||||
let pull_address = format!("tcp://{}:{}", config.ip, config.base_port);
|
||||
|
||||
tokio::spawn(PullConfig::init(pull_address, storage, config));
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| error!("Error starting server, {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Fail)]
|
||||
#[fail(display = "Queue is missing from map, {}", _0)]
|
||||
struct MissingQueue(String);
|
||||
|
||||
pub struct ServerConfig;
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn init<P: AsRef<Path>>(
|
||||
ip: &str,
|
||||
base_port: usize,
|
||||
runner_id: usize,
|
||||
queues: BTreeSet<String>,
|
||||
db_path: P,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let context = Arc::new(Context::new());
|
||||
|
||||
Self::init_with_context(ip, base_port, runner_id, queues, db_path, context)
|
||||
}
|
||||
|
||||
pub fn init_with_context<P: AsRef<Path>>(
|
||||
ip: &str,
|
||||
base_port: usize,
|
||||
runner_id: usize,
|
||||
queues: BTreeSet<String>,
|
||||
db_path: P,
|
||||
context: Arc<Context>,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let config = Config {
|
||||
ip: ip.to_owned(),
|
||||
base_port,
|
||||
runner_id,
|
||||
queues,
|
||||
db_path: db_path.as_ref().to_owned(),
|
||||
context,
|
||||
};
|
||||
|
||||
config.create_server()
|
||||
}
|
||||
}
|
105
jobs-server-tokio/src/server/portmap.rs
Normal file
105
jobs-server-tokio/src/server/portmap.rs
Normal file
|
@ -0,0 +1,105 @@
|
|||
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{future::lazy, stream::iter_ok, Future, Stream};
|
||||
use tokio::timer::Delay;
|
||||
use tokio_zmq::{prelude::*, Multipart, Push};
|
||||
use zmq::Message;
|
||||
|
||||
use crate::server::Config;
|
||||
|
||||
pub(crate) struct PortMapConfig {
|
||||
pusher: Push,
|
||||
address: String,
|
||||
port_map: BTreeMap<String, usize>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl PortMapConfig {
|
||||
pub(crate) fn init(
|
||||
address: String,
|
||||
port_map: BTreeMap<String, usize>,
|
||||
config: Arc<Config>,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let cfg = ResetPortMapConfig {
|
||||
address,
|
||||
port_map,
|
||||
config,
|
||||
};
|
||||
|
||||
cfg.build()
|
||||
.map_err(|e| error!("Error starting pusher, {}", e))
|
||||
}
|
||||
|
||||
fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||
let reset = self.reset();
|
||||
|
||||
let PortMapConfig {
|
||||
pusher,
|
||||
address: _,
|
||||
port_map,
|
||||
config: _,
|
||||
} = self;
|
||||
|
||||
let fut = iter_ok::<_, Error>(0..)
|
||||
.and_then(move |count| {
|
||||
trace!("Pushed {} portmaps", count);
|
||||
|
||||
let s = serde_json::to_string(&port_map)?;
|
||||
let m = Message::from_slice(s.as_ref())?;
|
||||
|
||||
Ok(Multipart::from(m))
|
||||
})
|
||||
.forward(pusher.sink())
|
||||
.map(move |_| info!("portmap pusher shutting down"))
|
||||
.map_err(|e| {
|
||||
error!("Error pushing portmap, {}", e);
|
||||
|
||||
tokio::spawn(reset.rebuild());
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
}
|
||||
|
||||
fn reset(&self) -> ResetPortMapConfig {
|
||||
ResetPortMapConfig {
|
||||
address: self.address.clone(),
|
||||
port_map: self.port_map.clone(),
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ResetPortMapConfig {
|
||||
address: String,
|
||||
port_map: BTreeMap<String, usize>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl ResetPortMapConfig {
|
||||
fn rebuild(self) -> impl Future<Item = (), Error = ()> {
|
||||
Delay::new(tokio::clock::now() + Duration::from_secs(5))
|
||||
.from_err()
|
||||
.and_then(move |_| self.build())
|
||||
.map_err(|e| error!("Error restarting pusher, {}", e))
|
||||
}
|
||||
|
||||
fn build(self) -> impl Future<Item = (), Error = Error> {
|
||||
lazy(|| {
|
||||
let pusher = Push::builder(self.config.context.clone())
|
||||
.bind(&self.address)
|
||||
.build()?;
|
||||
|
||||
let config = PortMapConfig {
|
||||
pusher,
|
||||
address: self.address,
|
||||
port_map: self.port_map,
|
||||
config: self.config,
|
||||
};
|
||||
|
||||
tokio::spawn(config.run());
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
125
jobs-server-tokio/src/server/pull.rs
Normal file
125
jobs-server-tokio/src/server/pull.rs
Normal file
|
@ -0,0 +1,125 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{
|
||||
future::{lazy, poll_fn},
|
||||
Future, Stream,
|
||||
};
|
||||
use jobs_core::{JobInfo, Storage};
|
||||
use tokio::timer::Delay;
|
||||
use tokio_threadpool::blocking;
|
||||
use tokio_zmq::{prelude::*, Multipart, Pull};
|
||||
|
||||
use crate::server::{coerce, Config};
|
||||
|
||||
pub(crate) struct PullConfig {
|
||||
puller: Pull,
|
||||
address: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl PullConfig {
|
||||
pub(crate) fn init(
|
||||
address: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let cfg = ResetPullConfig {
|
||||
address,
|
||||
storage,
|
||||
config,
|
||||
};
|
||||
|
||||
cfg.build()
|
||||
.map_err(|e| error!("Error starting puller, {}", e))
|
||||
}
|
||||
|
||||
fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||
let config = self.reset();
|
||||
|
||||
let storage = self.storage.clone();
|
||||
|
||||
let fut = self
|
||||
.puller
|
||||
.stream()
|
||||
.from_err()
|
||||
.and_then(parse_job)
|
||||
.and_then(move |job| store_job(job, storage.clone()))
|
||||
.for_each(|_| Ok(()))
|
||||
.map(|_| info!("Puller is shutting down"))
|
||||
.map_err(|e| {
|
||||
error!("Error storing job, {}", e);
|
||||
|
||||
tokio::spawn(config.rebuild());
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
}
|
||||
|
||||
fn reset(&self) -> ResetPullConfig {
|
||||
ResetPullConfig {
|
||||
address: self.address.clone(),
|
||||
storage: self.storage.clone(),
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Fail)]
|
||||
#[fail(display = "Message was empty")]
|
||||
pub struct EmptyMessage;
|
||||
|
||||
fn parse_job(mut multipart: Multipart) -> Result<JobInfo, Error> {
|
||||
let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;
|
||||
|
||||
let parsed = serde_json::from_slice(&unparsed_msg)?;
|
||||
|
||||
Ok(parsed)
|
||||
}
|
||||
|
||||
fn store_job(job: JobInfo, storage: Arc<Storage>) -> impl Future<Item = (), Error = Error> {
|
||||
let storage = storage.clone();
|
||||
|
||||
poll_fn(move || {
|
||||
let job = job.clone();
|
||||
let storage = storage.clone();
|
||||
|
||||
blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from)
|
||||
})
|
||||
.then(coerce)
|
||||
}
|
||||
|
||||
struct ResetPullConfig {
|
||||
address: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl ResetPullConfig {
|
||||
fn rebuild(self) -> impl Future<Item = (), Error = ()> {
|
||||
Delay::new(tokio::clock::now() + Duration::from_secs(5))
|
||||
.from_err()
|
||||
.and_then(move |_| self.build())
|
||||
.map_err(|e| error!("Error restarting puller, {}", e))
|
||||
}
|
||||
|
||||
fn build(self) -> impl Future<Item = (), Error = Error> {
|
||||
lazy(|| {
|
||||
let puller = Pull::builder(self.config.context.clone())
|
||||
.bind(&self.address)
|
||||
.build()?;
|
||||
|
||||
let config = PullConfig {
|
||||
puller,
|
||||
address: self.address,
|
||||
storage: self.storage,
|
||||
config: self.config,
|
||||
};
|
||||
|
||||
tokio::spawn(config.run());
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
157
jobs-server-tokio/src/server/push.rs
Normal file
157
jobs-server-tokio/src/server/push.rs
Normal file
|
@ -0,0 +1,157 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{
|
||||
future::{lazy, poll_fn},
|
||||
stream::iter_ok,
|
||||
Future, Stream,
|
||||
};
|
||||
use jobs_core::{JobInfo, Storage};
|
||||
use tokio::timer::{Delay, Interval};
|
||||
use tokio_threadpool::blocking;
|
||||
use tokio_zmq::{prelude::*, Multipart, Push};
|
||||
use zmq::Message;
|
||||
|
||||
use crate::server::{coerce, Config};
|
||||
|
||||
pub(crate) struct PushConfig {
|
||||
pusher: Push,
|
||||
address: String,
|
||||
queue: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl PushConfig {
|
||||
pub(crate) fn init(
|
||||
address: String,
|
||||
queue: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let cfg = ResetPushConfig {
|
||||
address,
|
||||
queue,
|
||||
storage,
|
||||
config,
|
||||
};
|
||||
|
||||
cfg.build()
|
||||
.map_err(|e| error!("Error starting pusher, {}", e))
|
||||
}
|
||||
|
||||
fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||
let reset = self.reset();
|
||||
|
||||
let PushConfig {
|
||||
address: _,
|
||||
pusher,
|
||||
queue,
|
||||
storage,
|
||||
config: _,
|
||||
} = self;
|
||||
|
||||
let queue_2_electric_boogaloo = queue.clone();
|
||||
|
||||
let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250))
|
||||
.from_err()
|
||||
.and_then(move |_| dequeue_jobs(storage.clone(), queue.clone()))
|
||||
.flatten()
|
||||
.forward(pusher.sink())
|
||||
.map(move |_| {
|
||||
info!(
|
||||
"Pusher for queue {} is shutting down",
|
||||
queue_2_electric_boogaloo
|
||||
)
|
||||
})
|
||||
.map_err(|e| {
|
||||
error!("Error dequeuing job, {}", e);
|
||||
|
||||
tokio::spawn(reset.rebuild());
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
}
|
||||
|
||||
fn reset(&self) -> ResetPushConfig {
|
||||
ResetPushConfig {
|
||||
address: self.address.clone(),
|
||||
queue: self.queue.clone(),
|
||||
storage: self.storage.clone(),
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dequeue_jobs(
|
||||
storage: Arc<Storage>,
|
||||
queue: String,
|
||||
) -> impl Future<Item = impl Stream<Item = Multipart, Error = Error>, Error = Error> {
|
||||
poll_fn(move || {
|
||||
let storage = storage.clone();
|
||||
let queue = queue.clone();
|
||||
blocking(move || wrap_fetch_queue(storage, &queue))
|
||||
})
|
||||
.then(coerce)
|
||||
.map(|jobs| iter_ok(jobs))
|
||||
.or_else(|e| {
|
||||
error!("Error fetching jobs, {}", e);
|
||||
Ok(iter_ok(vec![]))
|
||||
})
|
||||
}
|
||||
|
||||
fn wrap_fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<Multipart>, Error> {
|
||||
let response = fetch_queue(storage, queue)?;
|
||||
|
||||
let jobs = response
|
||||
.into_iter()
|
||||
.map(|job| {
|
||||
serde_json::to_string(&job)
|
||||
.map_err(Error::from)
|
||||
.and_then(|json| Message::from_slice(json.as_ref()).map_err(Error::from))
|
||||
.map(Multipart::from)
|
||||
})
|
||||
.collect::<Result<Vec<_>, Error>>()?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
/// Dequeue up to 100 jobs belonging to `queue`, converting the storage error into
/// `failure::Error`.
fn fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<JobInfo>, Error> {
    // Upper bound on the number of jobs taken from storage per call.
    const BATCH_LIMIT: usize = 100;

    let jobs = storage.dequeue_job(BATCH_LIMIT, queue)?;

    Ok(jobs)
}
|
||||
|
||||
struct ResetPushConfig {
|
||||
address: String,
|
||||
queue: String,
|
||||
storage: Arc<Storage>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl ResetPushConfig {
|
||||
fn rebuild(self) -> impl Future<Item = (), Error = ()> {
|
||||
Delay::new(tokio::clock::now() + Duration::from_secs(5))
|
||||
.from_err()
|
||||
.and_then(move |_| self.build())
|
||||
.map_err(|e| error!("Error restarting pusher, {}", e))
|
||||
}
|
||||
|
||||
fn build(self) -> impl Future<Item = (), Error = Error> {
|
||||
lazy(|| {
|
||||
let pusher = Push::builder(self.config.context.clone())
|
||||
.bind(&self.address)
|
||||
.build()?;
|
||||
|
||||
let config = PushConfig {
|
||||
pusher,
|
||||
address: self.address,
|
||||
queue: self.queue,
|
||||
storage: self.storage,
|
||||
config: self.config,
|
||||
};
|
||||
|
||||
tokio::spawn(config.run());
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
57
jobs-server-tokio/src/server/stalled.rs
Normal file
57
jobs-server-tokio/src/server/stalled.rs
Normal file
|
@ -0,0 +1,57 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use failure::Error;
|
||||
use futures::{future::poll_fn, Future, Stream};
|
||||
use jobs_core::Storage;
|
||||
use tokio::timer::{Delay, Interval};
|
||||
use tokio_threadpool::blocking;
|
||||
|
||||
use crate::server::coerce;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct StalledConfig {
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl StalledConfig {
|
||||
pub(crate) fn init(storage: Arc<Storage>) {
|
||||
let cfg = StalledConfig { storage };
|
||||
|
||||
tokio::spawn(cfg.run());
|
||||
}
|
||||
|
||||
fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||
let reset = self.clone();
|
||||
|
||||
let StalledConfig { storage } = self;
|
||||
|
||||
let fut = Interval::new(tokio::clock::now(), Duration::from_secs(60 * 30))
|
||||
.from_err::<Error>()
|
||||
.and_then(move |_| {
|
||||
let storage = storage.clone();
|
||||
poll_fn(move || {
|
||||
let storage = storage.clone();
|
||||
blocking(move || storage.check_stalled_jobs().map_err(Error::from))
|
||||
})
|
||||
.from_err()
|
||||
})
|
||||
.then(coerce)
|
||||
.for_each(|_| Ok(()))
|
||||
.map(|_| info!("Stalled Job Checker is shutting down"))
|
||||
.map_err(|e| {
|
||||
error!("Error checking stalled jobs, {}", e);
|
||||
|
||||
tokio::spawn(reset.rebuild());
|
||||
});
|
||||
|
||||
Box::new(fut)
|
||||
}
|
||||
|
||||
fn rebuild(self) -> impl Future<Item = (), Error = ()> {
|
||||
Delay::new(tokio::clock::now() + Duration::from_secs(5))
|
||||
.from_err::<Error>()
|
||||
.map(move |_| tokio::spawn(self.run()))
|
||||
.map(|_| ())
|
||||
.map_err(|e| error!("Error restarting stalled job checker, {}", e))
|
||||
}
|
||||
}
|
|
@ -74,6 +74,7 @@ fn return_job(
|
|||
|
||||
fn try_process_job(
|
||||
storage: Storage,
|
||||
queue: String,
|
||||
processor_count: usize,
|
||||
processors: Processors,
|
||||
tx: Sender<ProcessorMessage>,
|
||||
|
@ -81,10 +82,11 @@ fn try_process_job(
|
|||
if processor_count > 0 {
|
||||
let fut = poll_fn(move || {
|
||||
let storage = storage.clone();
|
||||
let queue = queue.clone();
|
||||
|
||||
blocking(move || {
|
||||
storage
|
||||
.dequeue_job(processor_count)
|
||||
.dequeue_job(processor_count, &queue)
|
||||
.map_err(|e| error!("Error dequeuing job, {}", e))
|
||||
})
|
||||
.map_err(|e| error!("Error blocking, {}", e))
|
||||
|
@ -145,6 +147,7 @@ fn process_jobs(
|
|||
}
|
||||
ProcessorMessage::Time(_) => Either::B(Either::A(try_process_job(
|
||||
storage.clone(),
|
||||
"default".to_owned(),
|
||||
processor_count,
|
||||
processors,
|
||||
tx.clone(),
|
||||
|
|
Loading…
Reference in a new issue