mirror of https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00

commit d266315f1f (parent 6e79341b38): Add jobs-actix

20 changed files with 881 additions and 226 deletions
Cargo.toml (12 changes)

@@ -11,6 +11,7 @@ edition = "2018"

 [workspace]
 members = [
+  "jobs-actix",
   "jobs-core",
   "jobs-server",
   "examples/server-jobs-example",
@@ -18,12 +19,19 @@ members = [

 [features]
 default = ["background-jobs-server", "background-jobs-server/tokio-zmq"]
+no_unix = ["background-jobs-server", "background-jobs-server/futures-zmq"]
+actix = ["background-jobs-actix"]

 [dependencies.background-jobs-core]
-version = "0.3"
+version = "0.4"
 path = "jobs-core"

 [dependencies.background-jobs-server]
-version = "0.3"
+version = "0.4"
 path = "jobs-server"
 optional = true
+
+[dependencies.background-jobs-actix]
+version = "0.4"
+path = "jobs-actix"
+optional = true
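For downstream crates the new backend is opt-in via the actix feature. A consumer manifest might look roughly like this (a sketch: the facade crate's own version bump is not visible in this diff, so the "0.4" below is an assumption):

    [dependencies.background-jobs]
    version = "0.4"            # assumed; not shown in this commit
    default-features = false
    features = ["actix"]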
examples/server-jobs-example/Cargo.toml

@@ -17,5 +17,3 @@ tokio = "0.1"
 [dependencies.background-jobs]
 version = "0.3"
 path = "../.."
-default-features = false
-features = ["background-jobs-server"]
@@ -26,9 +26,9 @@ fn main() -> Result<(), Error> {
     env_logger::init();

     tokio::run(ServerConfig::init(
+        1,
         "127.0.0.1",
         5555,
-        1,
         queue_set(),
         "example-db",
     ));
jobs-actix/Cargo.toml (new file, 16 lines)

@@ -0,0 +1,16 @@
+[package]
+name = "background-jobs-actix"
+version = "0.4.0"
+authors = ["asonix <asonix@asonix.dog>"]
+edition = "2018"
+
+[dependencies]
+actix = "0.7"
+background-jobs-core = { version = "0.4", path = "../jobs-core" }
+chrono = "0.4"
+failure = "0.1"
+futures = "0.1"
+log = "0.4"
+serde = "1.0"
+serde_derive = "1.0"
+serde_json = "1.0"
jobs-actix/src/lib.rs (new file, 113 lines)

@@ -0,0 +1,113 @@
+use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
+
+use actix::{Actor, Addr, SyncArbiter};
+use background_jobs_core::{Processor, ProcessorMap, Storage};
+use failure::Error;
+
+mod pinger;
+mod server;
+mod worker;
+pub use self::{server::Server, worker::LocalWorker};
+
+use self::{
+    pinger::Pinger,
+    server::{CheckDb, EitherJob, RequestJob},
+    worker::ProcessJob,
+};
+
+pub struct ServerConfig {
+    server_id: usize,
+    db_path: PathBuf,
+}
+
+impl ServerConfig {
+    pub fn new(server_id: usize, db_path: PathBuf) -> Self {
+        ServerConfig { server_id, db_path }
+    }
+
+    pub fn start<S>(self) -> QueueHandle<S>
+    where
+        S: Clone + Send + Sync + 'static,
+    {
+        let ServerConfig { server_id, db_path } = self;
+
+        let server = SyncArbiter::start(1, move || {
+            Server::new(server_id, Storage::init(db_path.clone()).unwrap())
+        });
+
+        Pinger::new(server.clone()).start();
+
+        QueueHandle { inner: server }
+    }
+}
+
+pub struct WorkerConfig<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    processors: ProcessorMap<S>,
+    queues: BTreeMap<String, usize>,
+}
+
+impl<S> WorkerConfig<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    pub fn new(state: S) -> Self {
+        WorkerConfig {
+            processors: ProcessorMap::new(state),
+            queues: BTreeMap::new(),
+        }
+    }
+
+    pub fn register<P>(&mut self, processor: P)
+    where
+        P: Processor<S> + Send + Sync + 'static,
+    {
+        self.queues.insert(P::QUEUE.to_owned(), 4);
+        self.processors.register_processor(processor);
+    }
+
+    pub fn set_processor_count(&mut self, queue: &str, count: usize) {
+        self.queues.insert(queue.to_owned(), count);
+    }
+
+    pub fn start(self, queue_handle: QueueHandle<S>) {
+        let processors = Arc::new(self.processors);
+
+        self.queues.into_iter().fold(0, |acc, (key, count)| {
+            (0..count).for_each(|i| {
+                LocalWorker::new(
+                    acc + i + 1000,
+                    key.clone(),
+                    processors.clone(),
+                    queue_handle.inner.clone(),
+                )
+                .start();
+            });
+
+            acc + count
+        });
+    }
+}
+
+#[derive(Clone)]
+pub struct QueueHandle<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    inner: Addr<Server<LocalWorker<S>>>,
+}
+
+impl<S> QueueHandle<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    pub fn queue<P>(&self, job: P::Job) -> Result<(), Error>
+    where
+        P: Processor<S>,
+    {
+        self.inner.do_send(EitherJob::New(P::new_job(job)?));
+        Ok(())
+    }
+}
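Taken together, the public surface of jobs-actix is small: ServerConfig starts the storage-owning Server on a SyncArbiter, WorkerConfig registers processors and spawns LocalWorkers, and the cloneable QueueHandle is what application code uses to enqueue jobs. A minimal wiring sketch (assuming a hypothetical MyProcessor implementing Processor<()> and a job value my_job, run inside a running actix System):

    // Sketch only; MyProcessor and my_job are illustrative stand-ins.
    let queue_handle = ServerConfig::new(1, "my-db".into()).start::<()>();

    let mut workers = WorkerConfig::new(());
    workers.register(MyProcessor);
    workers.set_processor_count(MyProcessor::QUEUE, 8);
    workers.start(queue_handle.clone());

    queue_handle.queue::<MyProcessor>(my_job)?;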
jobs-actix/src/pinger.rs (new file, 35 lines)

@@ -0,0 +1,35 @@
+use std::time::Duration;
+
+use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext};
+
+use crate::{CheckDb, ProcessJob, Server};
+
+pub struct Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    server: Addr<Server<W>>,
+}
+
+impl<W> Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(server: Addr<Server<W>>) -> Self {
+        Pinger { server }
+    }
+}
+
+impl<W> Actor for Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+    Server<W>: Actor<Context = SyncContext<Server<W>>> + Handler<CheckDb>,
+{
+    type Context = Context<Self>;
+
+    fn started(&mut self, ctx: &mut Self::Context) {
+        ctx.run_interval(Duration::from_secs(1), |actor, _| {
+            actor.server.do_send(CheckDb);
+        });
+    }
+}
jobs-actix/src/server.rs (new file, 249 lines)

@@ -0,0 +1,249 @@
+use std::collections::{HashMap, VecDeque};
+
+use actix::{Actor, Addr, Context, Handler, Message, SyncContext};
+use background_jobs_core::{JobInfo, NewJobInfo, Storage};
+use failure::Error;
+use log::{debug, trace};
+use serde_derive::Deserialize;
+
+use crate::ProcessJob;
+
+#[derive(Clone, Debug, Deserialize)]
+pub enum EitherJob {
+    New(NewJobInfo),
+    Existing(JobInfo),
+}
+
+impl Message for EitherJob {
+    type Result = Result<(), Error>;
+}
+
+pub struct RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    worker_id: usize,
+    queue: String,
+    addr: Addr<W>,
+}
+
+impl<W> RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(worker_id: usize, queue: &str, addr: Addr<W>) -> Self {
+        RequestJob {
+            worker_id,
+            queue: queue.to_owned(),
+            addr,
+        }
+    }
+}
+
+impl<W> Message for RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+}
+
+pub struct CheckDb;
+
+impl Message for CheckDb {
+    type Result = Result<(), Error>;
+}
+
+struct Cache<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    workers: VecDeque<RequestJob<W>>,
+    jobs: VecDeque<JobInfo>,
+}
+
+impl<W> Cache<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    fn new() -> Self {
+        Cache {
+            workers: VecDeque::new(),
+            jobs: VecDeque::new(),
+        }
+    }
+}
+
+pub struct Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    server_id: usize,
+    storage: Storage,
+    cache: HashMap<String, Cache<W>>,
+    cache_size: usize,
+}
+
+impl<W> Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(server_id: usize, storage: Storage) -> Self {
+        Server {
+            server_id,
+            storage,
+            cache: HashMap::new(),
+            cache_size: 25,
+        }
+    }
+
+    pub fn set_cache_size(&mut self, cache_size: usize) {
+        self.cache_size = cache_size;
+    }
+
+    fn populate(&mut self, queue: &str) -> Result<bool, Error> {
+        trace!("Populating queue {}", queue);
+        let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
+
+        if entry.jobs.is_empty() {
+            let new_jobs = self
+                .storage
+                .stage_jobs(self.cache_size, queue, self.server_id)?;
+            let empty = new_jobs.is_empty();
+
+            debug!("Retrieved {} jobs from storage", new_jobs.len());
+            trace!("{:?}", new_jobs.iter().map(|j| j.id()).collect::<Vec<_>>());
+
+            new_jobs
+                .into_iter()
+                .for_each(|job| entry.jobs.push_back(job));
+            Ok(!empty)
+        } else {
+            Ok(true)
+        }
+    }
+}
+
+impl<W> Actor for Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    type Context = SyncContext<Self>;
+
+    fn started(&mut self, _: &mut Self::Context) {
+        self.storage.requeue_staged_jobs(self.server_id).unwrap();
+        self.storage.check_stalled_jobs(self.server_id).unwrap();
+    }
+}
+
+impl<W> Handler<EitherJob> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, msg: EitherJob, _: &mut Self::Context) -> Self::Result {
+        let mut job = match msg {
+            EitherJob::New(new_job) => {
+                let job = self.storage.assign_id(new_job, self.server_id)?;
+                debug!("Created job {}, {:?}", job.id(), job);
+                job
+            }
+            EitherJob::Existing(job) => job,
+        };
+
+        let retry_now = job.is_pending() || (job.needs_retry() && job.retry_ready());
+
+        if job.is_pending() && !retry_now {
+            trace!("Storing job {} for later processing", job.id());
+        }
+        self.storage.store_job(job.clone(), self.server_id)?;
+
+        if retry_now {
+            let entry = self
+                .cache
+                .entry(job.queue().to_owned())
+                .or_insert(Cache::new());
+
+            if let Some(worker) = entry.workers.pop_front() {
+                debug!("Retrying job {} on worker {}", job.id(), worker.worker_id);
+                worker.addr.do_send(ProcessJob::new(job.clone()));
+                job.set_running();
+                self.storage.store_job(job, worker.worker_id)?;
+            } else if entry.jobs.len() < self.cache_size {
+                entry.jobs.push_back(job);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<W> Handler<RequestJob<W>> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, msg: RequestJob<W>, _: &mut Self::Context) -> Self::Result {
+        trace!("Worker {} requested job", msg.worker_id);
+        self.populate(&msg.queue)?;
+
+        let job = self
+            .cache
+            .get_mut(&msg.queue)
+            .and_then(|cache| cache.jobs.pop_front());
+
+        if let Some(mut job) = job {
+            msg.addr.do_send(ProcessJob::new(job.clone()));
+            job.set_running();
+            self.storage.store_job(job, msg.worker_id)?;
+        } else {
+            trace!("storing worker {} for queue {}", msg.worker_id, msg.queue);
+            let entry = self.cache.entry(msg.queue.clone()).or_insert(Cache::new());
+            entry.workers.push_back(msg);
+        }
+
+        Ok(())
+    }
+}
+
+impl<W> Handler<CheckDb> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
+        trace!("Checkdb");
+        let queues: Vec<String> = self.cache.keys().cloned().collect();
+
+        let mut todo = Vec::new();
+
+        for queue in queues {
+            if self.populate(&queue)? {
+                debug!("Cached jobs for {}", queue);
+            }
+
+            let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
+
+            let min_len = entry.jobs.len().min(entry.workers.len());
+
+            entry
+                .jobs
+                .drain(..min_len)
+                .zip(entry.workers.drain(..min_len))
+                .for_each(|pair| {
+                    todo.push(pair);
+                });
+        }
+
+        for (mut job, worker) in todo {
+            debug!("Sending job {} to worker {}", job.id(), worker.worker_id);
+            worker.addr.do_send(ProcessJob::new(job.clone()));
+            job.set_running();
+            self.storage.store_job(job, worker.worker_id)?;
+        }
+
+        Ok(())
+    }
+}
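The heart of Server is the per-queue Cache pairing: a job with no idle worker waits in jobs, a worker with no ready job waits in workers, and Handler<CheckDb> periodically drains matched pairs. The pairing logic in isolation (a simplified, self-contained sketch, not the actor code itself):

    use std::collections::VecDeque;

    // Simplified stand-ins for RequestJob<W> and JobInfo.
    struct Waiting { worker_id: usize }
    struct Job { id: usize }

    struct Cache {
        workers: VecDeque<Waiting>,
        jobs: VecDeque<Job>,
    }

    impl Cache {
        // On job arrival: hand it to a waiting worker, else buffer it.
        fn push_job(&mut self, job: Job) -> Option<(Job, Waiting)> {
            match self.workers.pop_front() {
                Some(w) => Some((job, w)),
                None => { self.jobs.push_back(job); None }
            }
        }

        // On worker request: hand over a buffered job, else buffer the worker.
        fn push_worker(&mut self, w: Waiting) -> Option<(Job, Waiting)> {
            match self.jobs.pop_front() {
                Some(job) => Some((job, w)),
                None => { self.workers.push_back(w); None }
            }
        }
    }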
jobs-actix/src/worker.rs (new file, 85 lines)

@@ -0,0 +1,85 @@
+use std::sync::Arc;
+
+use actix::{
+    fut::{wrap_future, ActorFuture},
+    Actor, Addr, AsyncContext, Context, Handler, Message,
+};
+use background_jobs_core::{JobInfo, ProcessorMap};
+use log::info;
+
+use crate::{EitherJob, RequestJob, Server};
+
+pub struct ProcessJob {
+    job: JobInfo,
+}
+
+impl ProcessJob {
+    pub fn new(job: JobInfo) -> Self {
+        ProcessJob { job }
+    }
+}
+
+impl Message for ProcessJob {
+    type Result = ();
+}
+
+pub struct LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    id: usize,
+    queue: String,
+    processors: Arc<ProcessorMap<State>>,
+    server: Addr<Server<LocalWorker<State>>>,
+}
+
+impl<State> LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    pub fn new(
+        id: usize,
+        queue: String,
+        processors: Arc<ProcessorMap<State>>,
+        server: Addr<Server<Self>>,
+    ) -> Self {
+        LocalWorker {
+            id,
+            queue,
+            processors,
+            server,
+        }
+    }
+}
+
+impl<State> Actor for LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    type Context = Context<Self>;
+
+    fn started(&mut self, ctx: &mut Self::Context) {
+        self.server
+            .do_send(RequestJob::new(self.id, &self.queue, ctx.address()));
+    }
+}
+
+impl<State> Handler<ProcessJob> for LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    type Result = ();
+
+    fn handle(&mut self, msg: ProcessJob, ctx: &mut Self::Context) -> Self::Result {
+        info!("Worker {} processing job {}", self.id, msg.job.id());
+        let fut =
+            wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| {
+                actor.server.do_send(EitherJob::Existing(job));
+                actor
+                    .server
+                    .do_send(RequestJob::new(actor.id, &actor.queue, ctx.address()));
+            });
+
+        ctx.spawn(fut);
+    }
+}
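LocalWorker's Handler<ProcessJob> shows the actix 0.7 idiom for doing async work from a handler: wrap_future lifts a plain futures-0.1 future into an ActorFuture, so the continuation regains access to the actor and its context. The bare pattern (a sketch against actix 0.7's public API):

    use actix::{
        fut::{wrap_future, ActorFuture},
        Actor, AsyncContext, Context,
    };
    use futures::future;

    struct MyActor;

    impl Actor for MyActor {
        type Context = Context<Self>;

        fn started(&mut self, ctx: &mut Self::Context) {
            // Lift a plain future into the actor's context; the actor and
            // ctx are available again once the future resolves.
            let fut = wrap_future::<_, Self>(future::ok::<u32, ()>(42))
                .map(|value, _actor, _ctx| {
                    println!("resolved with {}", value);
                });
            ctx.spawn(fut);
        }
    }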
jobs-core/Cargo.toml

@@ -1,7 +1,7 @@
 [package]
 name = "background-jobs-core"
 description = "Core types for implementing an asynchronous jobs processor on tokio"
-version = "0.3.2"
+version = "0.4.0"
 license = "GPL-3.0"
 authors = ["asonix <asonix@asonix.dog>"]
 repository = "https://git.asonix.dog/asonix/background-jobs"
jobs-core/src/job_info.rs

@@ -18,10 +18,71 @@
 */

 use chrono::{offset::Utc, DateTime, Duration as OldDuration};
+use log::trace;
+use serde_derive::{Deserialize, Serialize};
 use serde_json::Value;

 use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};

+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct NewJobInfo {
+    /// Name of the processor that should handle this job
+    processor: String,
+
+    /// Name of the queue that this job is a part of
+    queue: String,
+
+    /// Arguments for a given job
+    args: Value,
+
+    /// the initial MaxRetries value, for comparing to the current retry count
+    max_retries: MaxRetries,
+
+    /// How often retries should be scheduled
+    backoff_strategy: Backoff,
+
+    /// The time this job should be dequeued
+    next_queue: Option<DateTime<Utc>>,
+}
+
+impl NewJobInfo {
+    pub(crate) fn schedule(&mut self, time: DateTime<Utc>) {
+        self.next_queue = Some(time);
+    }
+
+    pub(crate) fn new(
+        processor: String,
+        queue: String,
+        args: Value,
+        max_retries: MaxRetries,
+        backoff_strategy: Backoff,
+    ) -> Self {
+        NewJobInfo {
+            processor,
+            queue,
+            args,
+            max_retries,
+            next_queue: None,
+            backoff_strategy,
+        }
+    }
+
+    pub(crate) fn with_id(self, id: usize) -> JobInfo {
+        JobInfo {
+            id,
+            processor: self.processor,
+            queue: self.queue,
+            status: JobStatus::Pending,
+            args: self.args,
+            retry_count: 0,
+            max_retries: self.max_retries,
+            next_queue: self.next_queue,
+            backoff_strategy: self.backoff_strategy,
+            updated_at: Utc::now(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 /// Metadata pertaining to a job that exists within the background_jobs system
 ///
@@ -30,8 +91,8 @@ use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
 /// [Processor](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html)'s
 /// new_job method.
 pub struct JobInfo {
-    /// ID of the job, None means an ID has not been set
-    id: Option<usize>,
+    /// ID of the job
+    id: usize,

     /// Name of the processor that should handle this job
     processor: String,
@@ -62,25 +123,8 @@ pub struct JobInfo {
 }

 impl JobInfo {
-    pub(crate) fn new(
-        processor: String,
-        queue: String,
-        args: Value,
-        max_retries: MaxRetries,
-        backoff_strategy: Backoff,
-    ) -> Self {
-        JobInfo {
-            id: None,
-            processor,
-            queue,
-            status: JobStatus::Pending,
-            args,
-            retry_count: 0,
-            max_retries,
-            next_queue: None,
-            backoff_strategy,
-            updated_at: Utc::now(),
-        }
+    pub fn queue(&self) -> &str {
+        &self.queue
     }

     pub(crate) fn updated(&mut self) {
@@ -99,14 +143,8 @@ impl JobInfo {
         self.status.clone()
     }

-    pub(crate) fn id(&self) -> Option<usize> {
-        self.id.clone()
-    }
-
-    pub(crate) fn set_id(&mut self, id: usize) {
-        if self.id.is_none() {
-            self.id = Some(id);
-        }
+    pub fn id(&self) -> usize {
+        self.id
     }

     pub(crate) fn increment(&mut self) -> ShouldStop {
@@ -126,10 +164,13 @@ impl JobInfo {
         };

         self.next_queue = Some(next_queue);
-    }

-    pub(crate) fn schedule(&mut self, time: DateTime<Utc>) {
-        self.next_queue = Some(time);
+        trace!(
+            "Now {}, Next queue {}, ready {}",
+            now,
+            next_queue,
+            self.is_ready(now),
+        );
     }

     pub(crate) fn is_stale(&self) -> bool {
@@ -147,6 +188,25 @@ impl JobInfo {
         self.status == JobStatus::Failed
     }

+    pub fn needs_retry(&mut self) -> bool {
+        let should_retry = self.is_failed() && self.increment().should_requeue();
+
+        if should_retry {
+            self.pending();
+            self.next_queue();
+        }
+
+        should_retry
+    }
+
+    pub fn retry_ready(&self) -> bool {
+        self.is_ready(Utc::now())
+    }
+
+    pub fn is_pending(&self) -> bool {
+        self.status == JobStatus::Pending
+    }
+
     pub(crate) fn is_in_queue(&self, queue: &str) -> bool {
         self.queue == queue
     }
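The key change in this file is the split between NewJobInfo (which has no id field at all) and JobInfo (whose id is now a plain usize), so a half-initialized job is no longer representable. The intended lifecycle, sketched with a hypothetical processor and job value:

    // Sketch of the id-assignment flow introduced by this commit;
    // MyProcessor and my_job are illustrative stand-ins.
    let new_job: NewJobInfo = MyProcessor::new_job(my_job)?;    // no id yet
    let job: JobInfo = storage.assign_id(new_job, server_id)?;  // NewJobInfo::with_id(id) inside
    assert!(job.is_pending());
    println!("job {} queued on {}", job.id(), job.queue());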
jobs-core/src/lib.rs

@@ -17,14 +17,8 @@
 * along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
 */

-#[macro_use]
-extern crate failure;
-#[macro_use]
-extern crate log;
-#[macro_use]
-extern crate serde_derive;
-
-use failure::Error;
+use failure::{Error, Fail};
+use serde_derive::{Deserialize, Serialize};

 mod job;
 mod job_info;
@@ -33,7 +27,10 @@ mod processor_map;
 mod storage;

 pub use crate::{
-    job::Job, job_info::JobInfo, processor::Processor, processor_map::ProcessorMap,
+    job::Job,
+    job_info::{JobInfo, NewJobInfo},
+    processor::Processor,
+    processor_map::ProcessorMap,
     storage::Storage,
 };
jobs-core/src/processor.rs

@@ -18,14 +18,14 @@
 */

 use chrono::{offset::Utc, DateTime};
-use failure::Error;
+use failure::{Error, Fail};
 use futures::{
     future::{Either, IntoFuture},
     Future,
 };
 use serde_json::Value;

-use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
+use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};

 /// ## The Processor trait
 ///
@@ -85,7 +85,7 @@ pub trait Processor<S = ()>: Clone
 where
     S: Clone + Send + Sync + 'static,
 {
-    type Job: Job<S>;
+    type Job: Job<S> + 'static;

     /// The name of the processor
     ///
@@ -112,15 +112,15 @@ where
     ///
     /// This is required for spawning jobs, since it enforces the relationship between the job and
     /// the Processor that should handle it.
-    fn new_job(job: Self::Job) -> Result<JobInfo, Error> {
+    fn new_job(job: Self::Job) -> Result<NewJobInfo, Error> {
         let queue = job.queue().unwrap_or(Self::QUEUE).to_owned();
         let max_retries = job.max_retries().unwrap_or(Self::MAX_RETRIES);
         let backoff_strategy = job.backoff_strategy().unwrap_or(Self::BACKOFF_STRATEGY);

-        let job = JobInfo::new(
+        let job = NewJobInfo::new(
             Self::NAME.to_owned(),
             queue,
-            serde_json::to_value(job)?,
+            serde_json::to_value(job).map_err(|_| ToJson)?,
             max_retries,
             backoff_strategy,
         );
@@ -129,7 +129,7 @@ where
     }

     /// Create a JobInfo to schedule a job to be performed after a certain time
-    fn new_scheduled_job(job: Self::Job, after: DateTime<Utc>) -> Result<JobInfo, Error> {
+    fn new_scheduled_job(job: Self::Job, after: DateTime<Utc>) -> Result<NewJobInfo, Error> {
         let mut job = Self::new_job(job)?;
         job.schedule(after);
@@ -187,3 +187,7 @@ where
         Box::new(fut)
     }
 }
+
+#[derive(Clone, Debug, Fail)]
+#[fail(display = "Failed to to turn job into value")]
+pub struct ToJson;
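For implementors, new_job returning NewJobInfo changes nothing except the new Job: ... + 'static bound, which owned job types already satisfy. A skeletal processor under this trait (illustrative names; MyJob stands in for some type implementing Job<()>, and the MaxRetries::Count / Backoff::Exponential variants are assumed from the crate's public enums, not shown in this diff):

    use background_jobs_core::{Backoff, MaxRetries, Processor};

    #[derive(Clone)]
    struct MyProcessor; // illustrative

    impl Processor<()> for MyProcessor {
        type Job = MyJob; // hypothetical type implementing Job<()>

        const NAME: &'static str = "MyProcessor";
        const QUEUE: &'static str = "default";
        const MAX_RETRIES: MaxRetries = MaxRetries::Count(5);
        const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
    }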
jobs-core/src/processor_map.rs

@@ -20,6 +20,7 @@
 use std::collections::HashMap;

 use futures::future::{Either, Future, IntoFuture};
+use log::{error, info};
 use serde_json::Value;

 use crate::{JobError, JobInfo, Processor};
@@ -107,12 +108,12 @@ fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobIn

     process_fn(args).then(move |res| match res {
         Ok(_) => {
-            info!("Job completed, {}", processor);
+            info!("Job {} completed, {}", job.id(), processor);
             job.pass();
             Ok(job)
         }
         Err(e) => {
-            error!("Job errored, {}, {}", processor, e);
+            error!("Job {} errored, {}, {}", job.id(), processor, e);
             job.fail();
             Ok(job)
         }
jobs-core/src/storage.rs

@@ -18,17 +18,19 @@
 */

 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, BTreeSet, HashMap},
     path::PathBuf,
     str::Utf8Error,
     sync::{Arc, RwLock, RwLockWriteGuard},
 };

 use chrono::offset::Utc;
+use failure::Fail;
 use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
 use lmdb::Error as LmdbError;
+use log::{info, trace};

-use crate::{JobInfo, JobStatus};
+use crate::{JobInfo, JobStatus, NewJobInfo};

 struct Buckets<'a> {
     queued: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
@@ -61,16 +63,15 @@ impl<'a> Buckets<'a> {
 /// None of the methods in this module are intended to be used outside of a background-jobs
 /// runtime.
 pub struct Storage {
-    runner_id: usize,
     store: Arc<RwLock<Store>>,
 }

 impl Storage {
-    pub fn new(runner_id: usize, store: Arc<RwLock<Store>>) -> Self {
-        Storage { runner_id, store }
+    pub fn new(store: Arc<RwLock<Store>>) -> Self {
+        Storage { store }
     }

-    pub fn init(runner_id: usize, path: PathBuf) -> Result<Self, Error> {
+    pub fn init(path: PathBuf) -> Result<Self, Error> {
         let mut manager = Manager::new();
         let mut cfg = Config::default(path);
@@ -83,17 +84,17 @@ impl Storage {

         let handle = manager.open(cfg)?;

-        Ok(Storage::new(runner_id, handle))
+        Ok(Storage::new(handle))
     }

-    pub fn get_new_id(&self) -> Result<usize, Error> {
+    pub fn get_new_id(&self, runner_id: usize) -> Result<usize, Error> {
         let store = self.store.write()?;

         let bucket = store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::id_store()))?;

         let mut txn = store.write_txn()?;

-        let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", |txn| {
+        let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", runner_id, |txn| {
             let id = match txn.get(&bucket, b"current-id") {
                 Ok(id) => id.inner()?.to_serde(),
                 Err(e) => match e {
@@ -115,7 +116,7 @@ impl Storage {
         Ok(new_id)
     }

-    pub fn requeue_staged_jobs(&self) -> Result<(), Error> {
+    pub fn requeue_staged_jobs(&self, runner_id: usize) -> Result<(), Error> {
         let store = self.store.write()?;
         let job_bucket =
             store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
@@ -128,7 +129,12 @@ impl Storage {
         let mut write_txn = store.write_txn()?;
         let read_txn = store.read_txn()?;

-        self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| {
+        self.with_lock::<_, (), _>(
+            &lock_bucket,
+            &mut write_txn,
+            b"job-queue",
+            runner_id,
+            |inner_txn| {
             let mut cursor = read_txn.read_cursor(&buckets.staged)?;
             match cursor.get(None, CursorOp::First) {
                 Ok(_) => (),
@@ -150,14 +156,15 @@ impl Storage {

                 let job_value = Json::to_value_buf(job)?;
                 inner_txn.set(&job_bucket, key, job_value)?;
-                self.queue_job(&buckets, inner_txn, key)?;
+                self.queue_job(&buckets, inner_txn, key, runner_id)?;

                 Ok(inner_txn)
             })
         })?;

             Ok(())
-        })?;
+            },
+        )?;

         read_txn.commit()?;
         write_txn.commit()?;
@@ -165,7 +172,7 @@ impl Storage {
         Ok(())
     }

-    pub fn check_stalled_jobs(&self) -> Result<(), Error> {
+    pub fn check_stalled_jobs(&self, runner_id: usize) -> Result<(), Error> {
         let store = self.store.write()?;
         let job_bucket =
             store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
@@ -178,7 +185,12 @@ impl Storage {
         let mut write_txn = store.write_txn()?;
         let read_txn = store.read_txn()?;

-        self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| {
+        self.with_lock::<_, (), _>(
+            &lock_bucket,
+            &mut write_txn,
+            b"job-queue",
+            runner_id,
+            |inner_txn| {
             let mut cursor = read_txn.read_cursor(&buckets.running)?;
             match cursor.get(None, CursorOp::First) {
                 Ok(_) => (),
@@ -202,12 +214,12 @@ impl Storage {
                 if job.increment().should_requeue() {
                     let job_value = Json::to_value_buf(job)?;
                     inner_txn.set(&job_bucket, key, job_value)?;
-                    self.queue_job(&buckets, inner_txn, key)?;
+                    self.queue_job(&buckets, inner_txn, key, runner_id)?;
                 } else {
                     job.fail();
                     let job_value = Json::to_value_buf(job)?;
                     inner_txn.set(&job_bucket, key, job_value)?;
-                    self.fail_job(&buckets, inner_txn, key)?;
+                    self.fail_job(&buckets, inner_txn, key, runner_id)?;
                 }
             }
@@ -216,7 +228,8 @@ impl Storage {
             })?;

             Ok(())
-        })?;
+            },
+        )?;

         read_txn.commit()?;
         write_txn.commit()?;
@@ -224,7 +237,12 @@ impl Storage {
         Ok(())
     }

-    pub fn stage_jobs(&self, limit: usize, queue: &str) -> Result<Vec<JobInfo>, Error> {
+    pub fn stage_jobs(
+        &self,
+        limit: usize,
+        queue: &str,
+        runner_id: usize,
+    ) -> Result<Vec<JobInfo>, Error> {
         let store = self.store.write()?;

         trace!("Got store");
@@ -246,11 +264,34 @@ impl Storage {
             &lock_bucket,
             &mut txn,
             b"job-queue",
+            runner_id,
             |inner_txn| {
+                trace!("Got lock");
+
+                let mut jobs = HashMap::new();
                 let mut cursor = read_txn.read_cursor(&buckets.queued)?;
+                let now = Utc::now();
+
                 trace!("Got cursor");
                 match cursor.get(None, CursorOp::First) {
-                    Ok(_) => (),
+                    Ok((maybe_key, _)) => {
+                        if let Some(key) = maybe_key {
+                            match inner_txn.get(&job_bucket, &key) {
+                                Ok(job) => {
+                                    let mut job = job.inner()?.to_serde();
+                                    if job.is_ready(now) && job.is_in_queue(queue) {
+                                        job.stage();
+                                        self.stage_job(&buckets, inner_txn, key, runner_id)?;
+                                        jobs.insert(job.id(), job);
+                                    }
+                                }
+                                Err(e) => match e {
+                                    Error::NotFound => (),
+                                    err => return Err(err),
+                                },
+                            }
+                        }
+                    }
                     Err(e) => match e {
                         Error::NotFound => {
                             trace!("No items in queue");
@@ -264,22 +305,18 @@ impl Storage {
                 trace!("Set cursor to first");

                 let initial_value =
-                    Ok((inner_txn, Vec::new())) as Result<(&mut Txn, Vec<JobInfo>), Error>;
+                    Ok((inner_txn, jobs)) as Result<(&mut Txn, HashMap<usize, JobInfo>), Error>;

-                let now = Utc::now();
-
-                trace!("Got lock");
-
-                let (_inner_txn, vec) = cursor.iter().fold(initial_value, |acc, (key, _)| {
+                let (_inner_txn, mut hm) = cursor.iter().fold(initial_value, |acc, (key, _)| {
                     acc.and_then(|(inner_txn, mut jobs)| {
                         if jobs.len() < limit {
                             let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();

-                            job.stage();
-
                             if job.is_ready(now) && job.is_in_queue(queue) {
-                                self.stage_job(&buckets, inner_txn, key)?;
+                                job.stage();
+                                self.stage_job(&buckets, inner_txn, key, runner_id)?;

-                                jobs.push(job);
+                                jobs.insert(job.id(), job);
                             }
                         }
@@ -287,7 +324,7 @@ impl Storage {
                     })
                 })?;

-                Ok(vec)
+                Ok(hm.drain().map(|(_, v)| v).collect())
             },
         )?;
@@ -302,26 +339,18 @@ impl Storage {
         Ok(result)
     }

-    pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> {
-        let job_id = match job.id() {
-            Some(id) => id.to_string(),
-            None => {
-                let id = self.get_new_id()?;
-                job.set_id(id);
-                id.to_string()
-            }
-        };
+    pub fn assign_id(&self, job: NewJobInfo, runner_id: usize) -> Result<JobInfo, Error> {
+        let id = self.get_new_id(runner_id)?;
+        let job = job.with_id(id);
+        trace!("Generaged job id, {}", job.id());
+        Ok(job)
+    }

+    pub fn store_job(&self, mut job: JobInfo, runner_id: usize) -> Result<(), Error> {
+        let job_id = job.id().to_string();
         job.updated();

-        trace!("Generaged job id, {}", job_id);
-
-        if job.is_failed() {
-            if job.increment().should_requeue() {
-                job.pending();
-                job.next_queue();
-            }
-        }
+        job.needs_retry();

         let status = job.status();
         let job_value = Json::to_value_buf(job)?;
@@ -341,11 +370,13 @@ impl Storage {
         trace!("Set value");

         match status {
-            JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id.as_ref())?,
+            JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Finished => {
+                self.finish_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?
+            }
         }

         trace!("Committing");
@@ -360,6 +391,7 @@ impl Storage {
         &self,
         base_port: usize,
         queues: BTreeSet<String>,
+        runner_id: usize,
     ) -> Result<BTreeMap<String, usize>, PortMapError> {
         let store = self.store.write().map_err(|e| Error::from(e))?;
@@ -375,6 +407,7 @@ impl Storage {
             &queue_port_bucket,
             &mut write_txn,
             lock_name.as_ref(),
+            runner_id,
             |write_txn| {
                 let mut cursor = read_txn.read_cursor(&queue_port_bucket)?;
@@ -436,8 +469,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.staged, txn, id)?;
+        self.add_job_to(&buckets.staged, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
@@ -451,8 +485,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.queued, txn, id)?;
+        self.add_job_to(&buckets.queued, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
@@ -466,8 +501,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.failed, txn, id)?;
+        self.add_job_to(&buckets.failed, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
@@ -481,8 +517,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.running, txn, id)?;
+        self.add_job_to(&buckets.running, txn, id, runner_id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
@@ -496,8 +533,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.finished, txn, id)?;
+        self.add_job_to(&buckets.finished, txn, id, runner_id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
@@ -511,8 +549,9 @@ impl Storage {
         bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        txn.set(bucket, id, Json::to_value_buf(self.runner_id)?)?;
+        txn.set(bucket, id, Json::to_value_buf(runner_id)?)?;
         trace!("Set value");

         Ok(())
@@ -545,16 +584,17 @@ impl Storage {
         lock_bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
         txn: &mut Txn<'env>,
         lock_key: &[u8],
+        runner_id: usize,
         callback: F,
     ) -> Result<T, E>
     where
         F: Fn(&mut Txn<'env>) -> Result<T, E>,
         E: From<Error>,
     {
-        let mut other_runner_id = 0;
+        let mut other_runner_id = 10;

         loop {
-            let lock_value = Json::to_value_buf(self.runner_id)?;
+            let lock_value = Json::to_value_buf(runner_id)?;

             let mut inner_txn = txn.txn()?;
             let res = inner_txn.set_no_overwrite(lock_bucket, lock_key, lock_value);
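with_lock now receives runner_id as an argument instead of reading self.runner_id, which is what lets a single Storage value act on behalf of many runners. The protocol itself is unchanged: write your runner id under the lock key with a no-overwrite set, and retry while another id holds it. The same protocol over a plain map, as a self-contained sketch:

    use std::collections::HashMap;

    // Toy analogue of the lmdb lock bucket: lock key -> holder's runner id.
    fn try_lock(bucket: &mut HashMap<Vec<u8>, usize>, key: &[u8], runner_id: usize) -> bool {
        // set_no_overwrite semantics: only succeed if the key is absent.
        if bucket.contains_key(key) {
            return false;
        }
        bucket.insert(key.to_vec(), runner_id);
        true
    }

    fn unlock(bucket: &mut HashMap<Vec<u8>, usize>, key: &[u8]) {
        bucket.remove(key);
    }

    fn main() {
        let mut bucket = HashMap::new();
        assert!(try_lock(&mut bucket, b"job-queue", 1));
        assert!(!try_lock(&mut bucket, b"job-queue", 2)); // second runner must retry
        unlock(&mut bucket, b"job-queue");
        assert!(try_lock(&mut bucket, b"job-queue", 2));
    }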
jobs-server/Cargo.toml

@@ -1,7 +1,7 @@
 [package]
 name = "background-jobs-server"
 description = "Jobs processor server based on ZeroMQ"
-version = "0.3.1"
+version = "0.4.0"
 license = "GPL-3.0"
 authors = ["asonix <asonix@asonix.dog>"]
 repository = "https://git.asonix.dog/asonix/background-jobs"
@@ -20,10 +20,10 @@ tokio-threadpool = "0.1"
 zmq = "0.8"

 [features]
-default = ["tokio-zmq"]
+default = []

 [dependencies.background-jobs-core]
-version = "0.3"
+version = "0.4"
 path = "../jobs-core"

 [dependencies.tokio-zmq]
@@ -41,9 +41,9 @@ use self::{portmap::PortMapConfig, pull::PullConfig, push::PushConfig, stalled::

 #[derive(Clone)]
 pub(crate) struct Config {
+    server_id: usize,
     ip: String,
     base_port: usize,
-    runner_id: usize,
     queues: BTreeSet<String>,
     db_path: PathBuf,
     context: Arc<Context>,
@@ -51,31 +51,30 @@ pub(crate) struct Config {

 impl Config {
     fn create_server(&self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
-        let runner_id = self.runner_id;
         let db_path = self.db_path.clone();
         let base_port = self.base_port;
         let queues = self.queues.clone();
+        let server_id = self.server_id;

         let config = Arc::new(self.clone());

         let fut = poll_fn(move || {
-            let runner_id = runner_id;
             let db_path = db_path.clone();
             let base_port = base_port;
             let queues = queues.clone();

             blocking(move || {
-                let storage = Arc::new(Storage::init(runner_id, db_path)?);
-                storage.requeue_staged_jobs()?;
-                storage.check_stalled_jobs()?;
-                let port_map = storage.get_port_mapping(base_port, queues)?;
+                let storage = Arc::new(Storage::init(db_path)?);
+                storage.requeue_staged_jobs(server_id)?;
+                storage.check_stalled_jobs(server_id)?;
+                let port_map = storage.get_port_mapping(base_port, queues, server_id)?;

                 Ok((storage, port_map))
             })
         })
         .from_err::<Error>()
         .then(coerce)
-        .and_then(|(storage, port_map)| {
+        .and_then(move |(storage, port_map)| {
             for queue in config.queues.iter() {
                 let port = port_map.get(queue).ok_or(MissingQueue(queue.to_owned()))?;
@@ -84,6 +83,7 @@ impl Config {
                 info!("Creating queue {} on address {}", queue, address);

                 tokio::spawn(PushConfig::init(
+                    server_id,
                     address,
                     queue.to_owned(),
                     storage.clone(),
@@ -91,7 +91,7 @@ impl Config {
                 ));
             }

-            StalledConfig::init(storage.clone());
+            StalledConfig::init(server_id, storage.clone());

             let portmap_address = format!("tcp://{}:{}", config.ip, config.base_port + 1);
             info!("Creating portmap on address {}", portmap_address);
@@ -105,7 +105,7 @@ impl Config {
             let pull_address = format!("tcp://{}:{}", config.ip, config.base_port);
             info!("Creating puller on address {}", pull_address);

-            tokio::spawn(PullConfig::init(pull_address, storage, config));
+            tokio::spawn(PullConfig::init(server_id, pull_address, storage, config));

             Ok(())
         })
@@ -169,15 +169,15 @@ impl ServerConfig {
     /// This method returns a future that, when run, spawns all of the server's required futures
     /// onto tokio. Therefore, this can only be used from tokio.
     pub fn init<P: AsRef<Path>>(
+        server_id: usize,
         ip: &str,
         base_port: usize,
-        runner_id: usize,
         queues: BTreeSet<String>,
         db_path: P,
     ) -> Box<dyn Future<Item = (), Error = ()> + Send> {
         let context = Arc::new(Context::new());

-        Self::init_with_context(ip, base_port, runner_id, queues, db_path, context)
+        Self::init_with_context(server_id, ip, base_port, queues, db_path, context)
     }

     /// The same as `ServerConfig::init()`, but with a provided ZeroMQ Context.
@@ -188,17 +188,17 @@ impl ServerConfig {
     /// If you're running the Server, Worker, and Spawner in the same application, you should share
     /// a ZeroMQ context between them.
     pub fn init_with_context<P: AsRef<Path>>(
+        server_id: usize,
         ip: &str,
         base_port: usize,
-        runner_id: usize,
         queues: BTreeSet<String>,
         db_path: P,
         context: Arc<Context>,
     ) -> Box<dyn Future<Item = (), Error = ()> + Send> {
         let config = Config {
+            server_id,
             ip: ip.to_owned(),
             base_port,
-            runner_id,
             queues,
             db_path: db_path.as_ref().to_owned(),
             context,
@@ -19,12 +19,13 @@

 use std::{sync::Arc, time::Duration};

-use background_jobs_core::{JobInfo, Storage};
+use background_jobs_core::{JobInfo, NewJobInfo, Storage};
 use failure::{Error, Fail};
 use futures::{future::poll_fn, Future, Stream};
 #[cfg(feature = "futures-zmq")]
 use futures_zmq::{prelude::*, Multipart, Pull};
 use log::{error, info, trace};
+use serde_derive::Deserialize;
 use tokio::timer::Delay;
 use tokio_threadpool::blocking;
 #[cfg(feature = "tokio-zmq")]
@@ -32,7 +33,15 @@ use tokio_zmq::{prelude::*, Multipart, Pull};

 use crate::server::{coerce, Config};

+#[derive(Clone, Debug, Deserialize)]
+#[serde(untagged)]
+enum EitherJob {
+    New(NewJobInfo),
+    Existing(JobInfo),
+}
+
 pub(crate) struct PullConfig {
+    server_id: usize,
     puller: Pull,
     address: String,
     storage: Arc<Storage>,
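A note on `#[serde(untagged)]`: serde tries the variants in declaration order and keeps the first one that deserializes, so the split between `New` and `Existing` relies on the two payload shapes being distinguishable (for instance, by whether an id is present). A self-contained sketch of the mechanism, using hypothetical stand-in types rather than the real `NewJobInfo`/`JobInfo` (assumes serde_json):

```rust
use serde_derive::Deserialize;

// Hypothetical stand-ins; the real types live in background-jobs-core.
#[derive(Debug, Deserialize)]
struct Existing {
    id: u64,
    queue: String,
}

#[derive(Debug, Deserialize)]
struct New {
    queue: String,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Either {
    // Listed first so a payload carrying an id can't also match New
    // (serde ignores unknown fields by default, so order matters here).
    Existing(Existing),
    New(New),
}

fn main() {
    let with_id: Either = serde_json::from_str(r#"{"id": 3, "queue": "default"}"#).unwrap();
    let without: Either = serde_json::from_str(r#"{"queue": "default"}"#).unwrap();
    println!("{:?} / {:?}", with_id, without); // Existing(..) / New(..)
}
```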
@@ -41,11 +50,13 @@ pub(crate) struct PullConfig {

 impl PullConfig {
     pub(crate) fn init(
+        server_id: usize,
         address: String,
         storage: Arc<Storage>,
         config: Arc<Config>,
     ) -> impl Future<Item = (), Error = ()> {
         let cfg = ResetPullConfig {
+            server_id,
             address,
             storage,
             config,
@@ -57,6 +68,7 @@ impl PullConfig {

     fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
         let config = self.reset();
+        let server_id = self.server_id;

         let storage = self.storage.clone();
@@ -71,7 +83,7 @@ impl PullConfig {
             .and_then(parse_job)
             .and_then(move |job| {
                 trace!("Storing job, {:?}", job);
-                store_job(job, storage.clone())
+                store_job(job, storage.clone(), server_id)
             })
             .for_each(|_| Ok(()))
             .map(|_| info!("Puller is shutting down"))
@@ -86,6 +98,7 @@ impl PullConfig {

     fn reset(&self) -> ResetPullConfig {
         ResetPullConfig {
+            server_id: self.server_id,
             address: self.address.clone(),
             storage: self.storage.clone(),
             config: self.config.clone(),
@@ -97,7 +110,7 @@ impl PullConfig {
 #[fail(display = "Message was empty")]
 pub struct EmptyMessage;

-fn parse_job(mut multipart: Multipart) -> Result<JobInfo, Error> {
+fn parse_job(mut multipart: Multipart) -> Result<EitherJob, Error> {
     let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;

     let parsed = serde_json::from_slice(&unparsed_msg)?;
@@ -105,19 +118,32 @@ fn parse_job(mut multipart: Multipart) -> Result<JobInfo, Error> {
     Ok(parsed)
 }

-fn store_job(job: JobInfo, storage: Arc<Storage>) -> impl Future<Item = (), Error = Error> {
+fn store_job(
+    job: EitherJob,
+    storage: Arc<Storage>,
+    server_id: usize,
+) -> impl Future<Item = (), Error = Error> {
     let storage = storage.clone();

     poll_fn(move || {
         let job = job.clone();
         let storage = storage.clone();

-        blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from)
+        blocking(move || {
+            let job = match job {
+                EitherJob::New(new_job) => storage.assign_id(new_job, server_id)?,
+                EitherJob::Existing(job) => job,
+            };
+
+            storage.store_job(job, server_id).map_err(Error::from)
+        })
+        .map_err(Error::from)
     })
     .then(coerce)
 }

 struct ResetPullConfig {
+    server_id: usize,
     address: String,
     storage: Arc<Storage>,
     config: Arc<Config>,
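The reworked `store_job` keeps the crate's recurring pattern for storage calls: `tokio_threadpool::blocking` returns `NotReady` until a blocking slot frees up, so wrapping it in `poll_fn` turns that retry into ordinary future polling, and the closure's `Result` surfaces as the future's item (flattened afterwards by `.then(coerce)`). A generic sketch of the pattern under the same tokio 0.1 / futures 0.1 stack (the helper name is mine, not part of this diff):

```rust
use failure::Error;
use futures::{future::poll_fn, Future};
use tokio_threadpool::blocking;

// Runs `f` on the thread pool's blocking section. `f` is `Fn` rather than
// `FnOnce` because poll_fn may invoke the closure several times before a
// blocking slot becomes available.
fn run_blocking<F, T>(f: F) -> impl Future<Item = T, Error = Error>
where
    F: Fn() -> Result<T, Error>,
{
    poll_fn(move || blocking(&f).map_err(Error::from))
        // blocking() yields the closure's Result as the Item; flatten it,
        // as the real code does with `.then(coerce)`.
        .and_then(|result| result)
}
```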
@@ -137,6 +163,7 @@ impl ResetPullConfig {
             .build()
             .map(|puller| {
                 let config = PullConfig {
+                    server_id: self.server_id,
                     puller,
                     address: self.address,
                     storage: self.storage,
@@ -34,6 +34,7 @@ use zmq::Message;
 use crate::server::{coerce, Config};

 pub(crate) struct PushConfig {
+    server_id: usize,
     pusher: Push,
     address: String,
     queue: String,
@@ -43,12 +44,14 @@ pub(crate) struct PushConfig {

 impl PushConfig {
     pub(crate) fn init(
+        server_id: usize,
         address: String,
         queue: String,
         storage: Arc<Storage>,
         config: Arc<Config>,
     ) -> impl Future<Item = (), Error = ()> {
         let cfg = ResetPushConfig {
+            server_id,
             address,
             queue,
             storage,
@@ -63,6 +66,7 @@ impl PushConfig {
         let reset = self.reset();

         let PushConfig {
+            server_id,
             address: _,
             pusher,
             queue,
@@ -74,7 +78,7 @@ impl PushConfig {

         let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250))
             .from_err()
-            .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone()))
+            .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone(), server_id))
             .flatten()
             .forward(pusher.sink(25))
             .map(move |_| {
@@ -94,6 +98,7 @@ impl PushConfig {

     fn reset(&self) -> ResetPushConfig {
         ResetPushConfig {
+            server_id: self.server_id,
             address: self.address.clone(),
             queue: self.queue.clone(),
             storage: self.storage.clone(),
@@ -105,11 +110,12 @@ impl PushConfig {
 fn dequeue_jobs(
     storage: Arc<Storage>,
     queue: String,
+    server_id: usize,
 ) -> impl Future<Item = impl Stream<Item = Multipart, Error = Error>, Error = Error> {
     poll_fn(move || {
         let storage = storage.clone();
         let queue = queue.clone();
-        blocking(move || wrap_fetch_queue(storage, &queue))
+        blocking(move || wrap_fetch_queue(storage, &queue, server_id))
     })
     .then(coerce)
     .map(|jobs| iter_ok(jobs))
@@ -119,8 +125,12 @@ fn dequeue_jobs(
     })
 }

-fn wrap_fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<Multipart>, Error> {
-    let response = fetch_queue(storage, queue)?;
+fn wrap_fetch_queue(
+    storage: Arc<Storage>,
+    queue: &str,
+    server_id: usize,
+) -> Result<Vec<Multipart>, Error> {
+    let response = fetch_queue(storage, queue, server_id)?;

     let jobs = response
         .into_iter()
@@ -135,11 +145,18 @@ fn wrap_fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<Multipart>
     Ok(jobs)
 }

-fn fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<JobInfo>, Error> {
-    storage.stage_jobs(100, queue).map_err(Error::from)
+fn fetch_queue(
+    storage: Arc<Storage>,
+    queue: &str,
+    server_id: usize,
+) -> Result<Vec<JobInfo>, Error> {
+    storage
+        .stage_jobs(100, queue, server_id)
+        .map_err(Error::from)
 }

 struct ResetPushConfig {
+    server_id: usize,
     address: String,
     queue: String,
     storage: Arc<Storage>,
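Taken together, the push side is one stream pipeline: a 250ms `Interval` tick stages up to 100 jobs for this `server_id`, each batch becomes a stream, `flatten` splices the batches together, and `forward` drains them into the ZeroMQ push socket. A self-contained sketch of that shape (futures 0.1 / tokio 0.1; an mpsc channel stands in for the socket, and the string batches for staged jobs):

```rust
use std::time::Duration;

use futures::{future, stream::iter_ok, sync::mpsc, Future, Sink, Stream};
use tokio::timer::Interval;

fn main() {
    let (tx, rx) = mpsc::channel::<&'static str>(25);

    let pusher = Interval::new(tokio::clock::now(), Duration::from_millis(250))
        .take(2) // bounded so the sketch terminates; the real loop runs forever
        .map_err(|_| ())
        // Pretend "dequeue": each tick yields one batch as a stream.
        .and_then(|_| future::ok(iter_ok::<_, ()>(vec!["job-a", "job-b"])))
        .flatten()
        .forward(tx.sink_map_err(|_| ()))
        .map(|_| ());

    // Consumer side: the channel plays the role of the push socket.
    let printer = rx.for_each(|job| {
        println!("pushed {}", job);
        Ok(())
    });

    tokio::run(pusher.join(printer).map(|_| ()));
}
```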
@@ -161,6 +178,7 @@ impl ResetPushConfig {
             .build()
             .map(|pusher| {
                 let config = PushConfig {
+                    server_id: self.server_id,
                     pusher,
                     address: self.address,
                     queue: self.queue,
@@ -30,12 +30,13 @@ use crate::server::coerce;

 #[derive(Clone)]
 pub(crate) struct StalledConfig {
+    server_id: usize,
     storage: Arc<Storage>,
 }

 impl StalledConfig {
-    pub(crate) fn init(storage: Arc<Storage>) {
-        let cfg = StalledConfig { storage };
+    pub(crate) fn init(server_id: usize, storage: Arc<Storage>) {
+        let cfg = StalledConfig { server_id, storage };

         tokio::spawn(cfg.run());
     }
@@ -43,7 +44,7 @@ impl StalledConfig {
     fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
         let reset = self.clone();

-        let StalledConfig { storage } = self;
+        let StalledConfig { server_id, storage } = self;

         let fut = Interval::new(tokio::clock::now(), Duration::from_secs(60 * 30))
             .from_err::<Error>()
@@ -51,7 +52,7 @@ impl StalledConfig {
                 let storage = storage.clone();
                 poll_fn(move || {
                     let storage = storage.clone();
-                    blocking(move || storage.check_stalled_jobs().map_err(Error::from))
+                    blocking(move || storage.check_stalled_jobs(server_id).map_err(Error::from))
                 })
                 .from_err()
             })
@@ -242,3 +242,6 @@ pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};

 #[cfg(feature = "background-jobs-server")]
 pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};
+
+#[cfg(feature = "background-jobs-actix")]
+pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};
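With this, the facade crate exposes the actix backend under the same names as the ZeroMQ server backend, so the two features are presumably meant to be enabled one at a time (both features re-exporting `ServerConfig` and `WorkerConfig` at the crate root would collide). A hypothetical consumer built with `default-features = false` and the `actix` feature would import:

```rust
// Hypothetical downstream import; resolves to the actix-backed types when
// only the `actix` feature of background-jobs is enabled.
use background_jobs::{QueueHandle, ServerConfig, WorkerConfig};
```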