From d266315f1fe7e79e197fd6e249eee7bd8e72c3d6 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 16 Dec 2018 12:43:44 -0600
Subject: [PATCH] Add jobs-actix

---
 Cargo.toml                                     |  12 +-
 examples/server-jobs-example/Cargo.toml        |   2 -
 examples/server-jobs-example/src/bin/server.rs |   2 +-
 jobs-actix/Cargo.toml                          |  16 +
 jobs-actix/src/lib.rs                          | 113 +++++++
 jobs-actix/src/pinger.rs                       |  35 ++
 jobs-actix/src/server.rs                       | 249 ++++++++++++++
 jobs-actix/src/worker.rs                       |  85 +++++
 jobs-core/Cargo.toml                           |   2 +-
 jobs-core/src/job_info.rs                      | 124 +++++--
 jobs-core/src/lib.rs                           |  15 +-
 jobs-core/src/processor.rs                     |  18 +-
 jobs-core/src/processor_map.rs                 |   5 +-
 jobs-core/src/storage.rs                       | 316 ++++++++++--------
 jobs-server/Cargo.toml                         |   6 +-
 jobs-server/src/server/mod.rs                  |  28 +-
 jobs-server/src/server/pull.rs                 |  37 +-
 jobs-server/src/server/push.rs                 |  30 +-
 jobs-server/src/server/stalled.rs              |   9 +-
 src/lib.rs                                     |   3 +
 20 files changed, 881 insertions(+), 226 deletions(-)
 create mode 100644 jobs-actix/Cargo.toml
 create mode 100644 jobs-actix/src/lib.rs
 create mode 100644 jobs-actix/src/pinger.rs
 create mode 100644 jobs-actix/src/server.rs
 create mode 100644 jobs-actix/src/worker.rs

diff --git a/Cargo.toml b/Cargo.toml
index 2a587ba..9d3f5a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ edition = "2018"
 
 [workspace]
 members = [
+  "jobs-actix",
   "jobs-core",
   "jobs-server",
   "examples/server-jobs-example",
@@ -18,12 +19,19 @@ members = [
 
 [features]
 default = ["background-jobs-server", "background-jobs-server/tokio-zmq"]
+no_unix = ["background-jobs-server", "background-jobs-server/futures-zmq"]
+actix = ["background-jobs-actix"]
 
 [dependencies.background-jobs-core]
-version = "0.3"
+version = "0.4"
 path = "jobs-core"
 
 [dependencies.background-jobs-server]
-version = "0.3"
+version = "0.4"
 path = "jobs-server"
 optional = true
+
+[dependencies.background-jobs-actix]
+version = "0.4"
+path = "jobs-actix"
+optional = true
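With the feature additions above, the backend is selectable from the umbrella crate. A hedged sketch of a downstream manifest (the version number is illustrative, not from this patch):

    # Hypothetical consumer Cargo.toml: opt out of the default ZeroMQ server
    # and pull in the new actix backend instead.
    [dependencies.background-jobs]
    version = "0.4"
    default-features = false
    features = ["actix"]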
diff --git a/examples/server-jobs-example/Cargo.toml b/examples/server-jobs-example/Cargo.toml
index 82f725a..7c19745 100644
--- a/examples/server-jobs-example/Cargo.toml
+++ b/examples/server-jobs-example/Cargo.toml
@@ -17,5 +17,3 @@ tokio = "0.1"
 [dependencies.background-jobs]
 version = "0.3"
 path = "../.."
-default-features = false
-features = ["background-jobs-server"]

diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs
index 62b76b3..df0c7c2 100644
--- a/examples/server-jobs-example/src/bin/server.rs
+++ b/examples/server-jobs-example/src/bin/server.rs
@@ -26,9 +26,9 @@ fn main() -> Result<(), Error> {
     env_logger::init();
 
     tokio::run(ServerConfig::init(
+        1,
         "127.0.0.1",
         5555,
-        1,
         queue_set(),
         "example-db",
     ));

diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml
new file mode 100644
index 0000000..f7d04fd
--- /dev/null
+++ b/jobs-actix/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "background-jobs-actix"
+version = "0.4.0"
+authors = ["asonix "]
+edition = "2018"
+
+[dependencies]
+actix = "0.7"
+background-jobs-core = { version = "0.4", path = "../jobs-core" }
+chrono = "0.4"
+failure = "0.1"
+futures = "0.1"
+log = "0.4"
+serde = "1.0"
+serde_derive = "1.0"
+serde_json = "1.0"

diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
new file mode 100644
index 0000000..a85b8eb
--- /dev/null
+++ b/jobs-actix/src/lib.rs
@@ -0,0 +1,113 @@
+use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
+
+use actix::{Actor, Addr, SyncArbiter};
+use background_jobs_core::{Processor, ProcessorMap, Storage};
+use failure::Error;
+
+mod pinger;
+mod server;
+mod worker;
+pub use self::{server::Server, worker::LocalWorker};
+
+use self::{
+    pinger::Pinger,
+    server::{CheckDb, EitherJob, RequestJob},
+    worker::ProcessJob,
+};
+
+pub struct ServerConfig {
+    server_id: usize,
+    db_path: PathBuf,
+}
+
+impl ServerConfig {
+    pub fn new(server_id: usize, db_path: PathBuf) -> Self {
+        ServerConfig { server_id, db_path }
+    }
+
+    pub fn start<S>(self) -> QueueHandle<S>
+    where
+        S: Clone + Send + Sync + 'static,
+    {
+        let ServerConfig { server_id, db_path } = self;
+
+        let server = SyncArbiter::start(1, move || {
+            Server::new(server_id, Storage::init(db_path.clone()).unwrap())
+        });
+
+        Pinger::new(server.clone()).start();
+
+        QueueHandle { inner: server }
+    }
+}
+
+pub struct WorkerConfig<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    processors: ProcessorMap<S>,
+    queues: BTreeMap<String, usize>,
+}
+
+impl<S> WorkerConfig<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    pub fn new(state: S) -> Self {
+        WorkerConfig {
+            processors: ProcessorMap::new(state),
+            queues: BTreeMap::new(),
+        }
+    }
+
+    pub fn register<P>(&mut self, processor: P)
+    where
+        P: Processor<S> + Send + Sync + 'static,
+    {
+        self.queues.insert(P::QUEUE.to_owned(), 4);
+        self.processors.register_processor(processor);
+    }
+
+    pub fn set_processor_count(&mut self, queue: &str, count: usize) {
+        self.queues.insert(queue.to_owned(), count);
+    }
+
+    pub fn start(self, queue_handle: QueueHandle<S>) {
+        let processors = Arc::new(self.processors);
+
+        self.queues.into_iter().fold(0, |acc, (key, count)| {
+            (0..count).for_each(|i| {
+                LocalWorker::new(
+                    acc + i + 1000,
+                    key.clone(),
+                    processors.clone(),
+                    queue_handle.inner.clone(),
+                )
+                .start();
+            });
+
+            acc + count
+        });
+    }
+}
+
+#[derive(Clone)]
+pub struct QueueHandle<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    inner: Addr<Server<LocalWorker<S>>>,
+}
+
+impl<S> QueueHandle<S>
+where
+    S: Clone + Send + Sync + 'static,
+{
+    pub fn queue<P>(&self, job: P::Job) -> Result<(), Error>
+    where
+        P: Processor<S>,
+    {
+        self.inner.do_send(EitherJob::New(P::new_job(job)?));
+        Ok(())
+    }
+}
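Taken together, the types above give a purely in-process deployment with no ZeroMQ sockets. A minimal, hedged wiring sketch (MyProcessor and MyJob are illustrative user-defined types, not items from this patch; state is a unit here):

    fn main() -> Result<(), failure::Error> {
        let sys = actix::System::new("jobs-example");

        // One Server actor on a SyncArbiter owning the LMDB-backed Storage,
        // plus the Pinger that ticks CheckDb once per second.
        let queue_handle = ServerConfig::new(1, "my-db".into()).start::<()>();

        // Each registered queue gets 4 workers unless overridden.
        let mut workers = WorkerConfig::new(());
        workers.register(MyProcessor);
        workers.set_processor_count("default", 8);
        workers.start(queue_handle.clone());

        // Enqueue a job through the typed handle.
        queue_handle.queue::<MyProcessor>(MyJob::new())?;

        sys.run();
        Ok(())
    }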
diff --git a/jobs-actix/src/pinger.rs b/jobs-actix/src/pinger.rs
new file mode 100644
index 0000000..dde21de
--- /dev/null
+++ b/jobs-actix/src/pinger.rs
@@ -0,0 +1,35 @@
+use std::time::Duration;
+
+use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext};
+
+use crate::{CheckDb, ProcessJob, Server};
+
+pub struct Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    server: Addr<Server<W>>,
+}
+
+impl<W> Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(server: Addr<Server<W>>) -> Self {
+        Pinger { server }
+    }
+}
+
+impl<W> Actor for Pinger<W>
+where
+    W: Actor + Handler<ProcessJob>,
+    Server<W>: Actor<Context = SyncContext<Server<W>>> + Handler<CheckDb>,
+{
+    type Context = Context<Self>;
+
+    fn started(&mut self, ctx: &mut Self::Context) {
+        ctx.run_interval(Duration::from_secs(1), |actor, _| {
+            actor.server.do_send(CheckDb);
+        });
+    }
+}

diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs
new file mode 100644
index 0000000..d70033c
--- /dev/null
+++ b/jobs-actix/src/server.rs
@@ -0,0 +1,249 @@
+use std::collections::{HashMap, VecDeque};
+
+use actix::{Actor, Addr, Context, Handler, Message, SyncContext};
+use background_jobs_core::{JobInfo, NewJobInfo, Storage};
+use failure::Error;
+use log::{debug, trace};
+use serde_derive::Deserialize;
+
+use crate::ProcessJob;
+
+#[derive(Clone, Debug, Deserialize)]
+pub enum EitherJob {
+    New(NewJobInfo),
+    Existing(JobInfo),
+}
+
+impl Message for EitherJob {
+    type Result = Result<(), Error>;
+}
+
+pub struct RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    worker_id: usize,
+    queue: String,
+    addr: Addr<W>,
+}
+
+impl<W> RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(worker_id: usize, queue: &str, addr: Addr<W>) -> Self {
+        RequestJob {
+            worker_id,
+            queue: queue.to_owned(),
+            addr,
+        }
+    }
+}
+
+impl<W> Message for RequestJob<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+}
+
+pub struct CheckDb;
+
+impl Message for CheckDb {
+    type Result = Result<(), Error>;
+}
+
+struct Cache<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    workers: VecDeque<RequestJob<W>>,
+    jobs: VecDeque<JobInfo>,
+}
+
+impl<W> Cache<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    fn new() -> Self {
+        Cache {
+            workers: VecDeque::new(),
+            jobs: VecDeque::new(),
+        }
+    }
+}
+
+pub struct Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    server_id: usize,
+    storage: Storage,
+    cache: HashMap<String, Cache<W>>,
+    cache_size: usize,
+}
+
+impl<W> Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    pub fn new(server_id: usize, storage: Storage) -> Self {
+        Server {
+            server_id,
+            storage,
+            cache: HashMap::new(),
+            cache_size: 25,
+        }
+    }
+
+    pub fn set_cache_size(&mut self, cache_size: usize) {
+        self.cache_size = cache_size;
+    }
+
+    fn populate(&mut self, queue: &str) -> Result<bool, Error> {
+        trace!("Populating queue {}", queue);
+        let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
+
+        if entry.jobs.is_empty() {
+            let new_jobs = self
+                .storage
+                .stage_jobs(self.cache_size, queue, self.server_id)?;
+            let empty = new_jobs.is_empty();
+
+            debug!("Retrieved {} jobs from storage", new_jobs.len());
+            trace!("{:?}", new_jobs.iter().map(|j| j.id()).collect::<Vec<usize>>());
+
+            new_jobs
+                .into_iter()
+                .for_each(|job| entry.jobs.push_back(job));
+
+            Ok(!empty)
+        } else {
+            Ok(true)
+        }
+    }
+}
+
+impl<W> Actor for Server<W>
+where
+    W: Actor + Handler<ProcessJob>,
+{
+    type Context = SyncContext<Self>;
+
+    fn started(&mut self, _: &mut Self::Context) {
+        self.storage.requeue_staged_jobs(self.server_id).unwrap();
+        self.storage.check_stalled_jobs(self.server_id).unwrap();
+    }
+}
+
+impl<W> Handler<EitherJob> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, msg: EitherJob, _: &mut Self::Context) -> Self::Result {
+        let mut job = match msg {
+            EitherJob::New(new_job) => {
+                let job = self.storage.assign_id(new_job, self.server_id)?;
+                debug!("Created job {}, {:?}", job.id(), job);
+                job
+            }
+            EitherJob::Existing(job) => job,
+        };
+
+        let retry_now = job.is_pending() || (job.needs_retry() && job.retry_ready());
+
+        if job.is_pending() && !retry_now {
+            trace!("Storing job {} for later processing", job.id());
+        }
+        self.storage.store_job(job.clone(), self.server_id)?;
+
+        if retry_now {
+            let entry = self
+                .cache
+                .entry(job.queue().to_owned())
+                .or_insert(Cache::new());
+
+            if let Some(worker) = entry.workers.pop_front() {
+                debug!("Retrying job {} on worker {}", job.id(), worker.worker_id);
+                worker.addr.do_send(ProcessJob::new(job.clone()));
+                job.set_running();
+                self.storage.store_job(job, worker.worker_id)?;
+            } else if entry.jobs.len() < self.cache_size {
+                entry.jobs.push_back(job);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<W> Handler<RequestJob<W>> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, msg: RequestJob<W>, _: &mut Self::Context) -> Self::Result {
+        trace!("Worker {} requested job", msg.worker_id);
+        self.populate(&msg.queue)?;
+
+        let job = self
+            .cache
+            .get_mut(&msg.queue)
+            .and_then(|cache| cache.jobs.pop_front());
+
+        if let Some(mut job) = job {
+            msg.addr.do_send(ProcessJob::new(job.clone()));
+            job.set_running();
+            self.storage.store_job(job, msg.worker_id)?;
+        } else {
+            trace!("storing worker {} for queue {}", msg.worker_id, msg.queue);
+            let entry = self.cache.entry(msg.queue.clone()).or_insert(Cache::new());
+            entry.workers.push_back(msg);
+        }
+
+        Ok(())
+    }
+}
+
+impl<W> Handler<CheckDb> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<(), Error>;
+
+    fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
+        trace!("Checkdb");
+        let queues: Vec<String> = self.cache.keys().cloned().collect();
+
+        let mut todo = Vec::new();
+
+        for queue in queues {
+            if self.populate(&queue)? {
+                debug!("Cached jobs for {}", queue);
+            }
+
+            let entry = self.cache.entry(queue.to_owned()).or_insert(Cache::new());
+
+            let min_len = entry.jobs.len().min(entry.workers.len());
+
+            entry
+                .jobs
+                .drain(..min_len)
+                .zip(entry.workers.drain(..min_len))
+                .for_each(|pair| {
+                    todo.push(pair);
+                });
+        }
+
+        for (mut job, worker) in todo {
+            debug!("Sending job {} to worker {}", job.id(), worker.worker_id);
+            worker.addr.do_send(ProcessJob::new(job.clone()));
+            job.set_running();
+            self.storage.store_job(job, worker.worker_id)?;
+        }
+
+        Ok(())
+    }
+}

diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs
new file mode 100644
index 0000000..baf0dde
--- /dev/null
+++ b/jobs-actix/src/worker.rs
@@ -0,0 +1,85 @@
+use std::sync::Arc;
+
+use actix::{
+    fut::{wrap_future, ActorFuture},
+    Actor, Addr, AsyncContext, Context, Handler, Message,
+};
+use background_jobs_core::{JobInfo, ProcessorMap};
+use log::info;
+
+use crate::{EitherJob, RequestJob, Server};
+
+pub struct ProcessJob {
+    job: JobInfo,
+}
+
+impl ProcessJob {
+    pub fn new(job: JobInfo) -> Self {
+        ProcessJob { job }
+    }
+}
+
+impl Message for ProcessJob {
+    type Result = ();
+}
+
+pub struct LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    id: usize,
+    queue: String,
+    processors: Arc<ProcessorMap<State>>,
+    server: Addr<Server<LocalWorker<State>>>,
+}
+
+impl<State> LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    pub fn new(
+        id: usize,
+        queue: String,
+        processors: Arc<ProcessorMap<State>>,
+        server: Addr<Server<Self>>,
+    ) -> Self {
+        LocalWorker {
+            id,
+            queue,
+            processors,
+            server,
+        }
+    }
+}
+
+impl<State> Actor for LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    type Context = Context<Self>;
+
+    fn started(&mut self, ctx: &mut Self::Context) {
+        self.server
+            .do_send(RequestJob::new(self.id, &self.queue, ctx.address()));
+    }
+}
+
+impl<State> Handler<ProcessJob> for LocalWorker<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    type Result = ();
+
+    fn handle(&mut self, msg: ProcessJob, ctx: &mut Self::Context) -> Self::Result {
+        info!("Worker {} processing job {}", self.id, msg.job.id());
+        let fut =
+            wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| {
+                actor.server.do_send(EitherJob::Existing(job));
+                actor
+                    .server
+                    .do_send(RequestJob::new(actor.id, &actor.queue, ctx.address()));
+            });
+
+        ctx.spawn(fut);
+    }
+}
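The worker loop is demand-driven: a LocalWorker announces itself with RequestJob, processes whatever ProcessJob arrives, reports the finished JobInfo back as EitherJob::Existing, and immediately requests more work. The only thing user code supplies is a registered processor. A hedged sketch against the Processor trait as it appears later in this patch (MyJob is an illustrative type assumed to implement the unchanged Job trait):

    use background_jobs_core::{Backoff, MaxRetries, Processor};

    #[derive(Clone)]
    struct MyProcessor;

    impl Processor<()> for MyProcessor {
        // MyJob is assumed to implement Job<()> elsewhere; not part of this patch.
        type Job = MyJob;

        const NAME: &'static str = "MyProcessor";
        const QUEUE: &'static str = "default";
        const MAX_RETRIES: MaxRetries = MaxRetries::Count(3);
        const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
    }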
diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml
index 138d115..6bfeba4 100644
--- a/jobs-core/Cargo.toml
+++ b/jobs-core/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "background-jobs-core"
 description = "Core types for implementing an asynchronous jobs processor on tokio"
-version = "0.3.2"
+version = "0.4.0"
 license = "GPL-3.0"
 authors = ["asonix "]
 repository = "https://git.asonix.dog/asonix/background-jobs"

diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs
index a2567dd..769b3c9 100644
--- a/jobs-core/src/job_info.rs
+++ b/jobs-core/src/job_info.rs
@@ -18,10 +18,71 @@
  */
 
 use chrono::{offset::Utc, DateTime, Duration as OldDuration};
+use log::trace;
+use serde_derive::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
 
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct NewJobInfo {
+    /// Name of the processor that should handle this job
+    processor: String,
+
+    /// Name of the queue that this job is a part of
+    queue: String,
+
+    /// Arguments for a given job
+    args: Value,
+
+    /// the initial MaxRetries value, for comparing to the current retry count
+    max_retries: MaxRetries,
+
+    /// How often retries should be scheduled
+    backoff_strategy: Backoff,
+
+    /// The time this job should be dequeued
+    next_queue: Option<DateTime<Utc>>,
+}
+
+impl NewJobInfo {
+    pub(crate) fn schedule(&mut self, time: DateTime<Utc>) {
+        self.next_queue = Some(time);
+    }
+
+    pub(crate) fn new(
+        processor: String,
+        queue: String,
+        args: Value,
+        max_retries: MaxRetries,
+        backoff_strategy: Backoff,
+    ) -> Self {
+        NewJobInfo {
+            processor,
+            queue,
+            args,
+            max_retries,
+            next_queue: None,
+            backoff_strategy,
+        }
+    }
+
+    pub(crate) fn with_id(self, id: usize) -> JobInfo {
+        JobInfo {
+            id,
+            processor: self.processor,
+            queue: self.queue,
+            status: JobStatus::Pending,
+            args: self.args,
+            retry_count: 0,
+            max_retries: self.max_retries,
+            next_queue: self.next_queue,
+            backoff_strategy: self.backoff_strategy,
+            updated_at: Utc::now(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 /// Metadata pertaining to a job that exists within the background_jobs system
 ///
@@ -30,8 +91,8 @@ use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
 /// [Processor](https://docs.rs/background-jobs/0.3.0/background_jobs/trait.Processor.html)'s
 /// new_job method.
 pub struct JobInfo {
-    /// ID of the job, None means an ID has not been set
-    id: Option<usize>,
+    /// ID of the job
+    id: usize,
 
     /// Name of the processor that should handle this job
     processor: String,
@@ -62,25 +123,8 @@ pub struct JobInfo {
 }
 
 impl JobInfo {
-    pub(crate) fn new(
-        processor: String,
-        queue: String,
-        args: Value,
-        max_retries: MaxRetries,
-        backoff_strategy: Backoff,
-    ) -> Self {
-        JobInfo {
-            id: None,
-            processor,
-            queue,
-            status: JobStatus::Pending,
-            args,
-            retry_count: 0,
-            max_retries,
-            next_queue: None,
-            backoff_strategy,
-            updated_at: Utc::now(),
-        }
+    pub fn queue(&self) -> &str {
+        &self.queue
     }
 
     pub(crate) fn updated(&mut self) {
@@ -99,14 +143,8 @@ impl JobInfo {
         self.status.clone()
     }
 
-    pub(crate) fn id(&self) -> Option<usize> {
-        self.id.clone()
-    }
-
-    pub(crate) fn set_id(&mut self, id: usize) {
-        if self.id.is_none() {
-            self.id = Some(id);
-        }
+    pub fn id(&self) -> usize {
+        self.id
     }
 
     pub(crate) fn increment(&mut self) -> ShouldStop {
@@ -126,10 +164,13 @@ impl JobInfo {
         };
 
         self.next_queue = Some(next_queue);
-    }
 
-    pub(crate) fn schedule(&mut self, time: DateTime<Utc>) {
-        self.next_queue = Some(time);
+        trace!(
+            "Now {}, Next queue {}, ready {}",
+            now,
+            next_queue,
+            self.is_ready(now),
+        );
     }
 
     pub(crate) fn is_stale(&self) -> bool {
@@ -147,6 +188,25 @@ impl JobInfo {
         self.status == JobStatus::Failed
     }
 
+    pub fn needs_retry(&mut self) -> bool {
+        let should_retry = self.is_failed() && self.increment().should_requeue();
+
+        if should_retry {
+            self.pending();
+            self.next_queue();
+        }
+
+        should_retry
+    }
+
+    pub fn retry_ready(&self) -> bool {
+        self.is_ready(Utc::now())
+    }
+
+    pub fn is_pending(&self) -> bool {
+        self.status == JobStatus::Pending
+    }
+
     pub(crate) fn is_in_queue(&self, queue: &str) -> bool {
         self.queue == queue
     }

diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs
index cb152d8..9d0d6ab 100644
--- a/jobs-core/src/lib.rs
+++ b/jobs-core/src/lib.rs
@@ -17,14 +17,8 @@
  * along with Background Jobs.  If not, see <https://www.gnu.org/licenses/>.
  */
 
-#[macro_use]
-extern crate failure;
-#[macro_use]
-extern crate log;
-#[macro_use]
-extern crate serde_derive;
-
-use failure::Error;
+use failure::{Error, Fail};
+use serde_derive::{Deserialize, Serialize};
 
 mod job;
 mod job_info;
@@ -33,7 +27,10 @@ mod processor_map;
 mod storage;
 
 pub use crate::{
-    job::Job, job_info::JobInfo, processor::Processor, processor_map::ProcessorMap,
+    job::Job,
+    job_info::{JobInfo, NewJobInfo},
+    processor::Processor,
+    processor_map::ProcessorMap,
     storage::Storage,
 };

diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs
index 3bfa831..8798eee 100644
--- a/jobs-core/src/processor.rs
+++ b/jobs-core/src/processor.rs
@@ -18,14 +18,14 @@
  */
 
 use chrono::{offset::Utc, DateTime};
-use failure::Error;
+use failure::{Error, Fail};
 use futures::{
     future::{Either, IntoFuture},
     Future,
 };
 use serde_json::Value;
 
-use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
+use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
 
 /// ## The Processor trait
 ///
@@ -85,7 +85,7 @@ pub trait Processor<S>: Clone
 where
     S: Clone + Send + Sync + 'static,
 {
-    type Job: Job<S>;
+    type Job: Job<S> + 'static;
 
     /// The name of the processor
     ///
@@ -112,15 +112,15 @@ where
     ///
     /// This is required for spawning jobs, since it enforces the relationship between the job and
     /// the Processor that should handle it.
-    fn new_job(job: Self::Job) -> Result<JobInfo, Error> {
+    fn new_job(job: Self::Job) -> Result<NewJobInfo, Error> {
         let queue = job.queue().unwrap_or(Self::QUEUE).to_owned();
         let max_retries = job.max_retries().unwrap_or(Self::MAX_RETRIES);
         let backoff_strategy = job.backoff_strategy().unwrap_or(Self::BACKOFF_STRATEGY);
 
-        let job = JobInfo::new(
+        let job = NewJobInfo::new(
             Self::NAME.to_owned(),
             queue,
-            serde_json::to_value(job)?,
+            serde_json::to_value(job).map_err(|_| ToJson)?,
             max_retries,
             backoff_strategy,
         );
@@ -129,7 +129,7 @@ where
     }
 
     /// Create a JobInfo to schedule a job to be performed after a certain time
-    fn new_scheduled_job(job: Self::Job, after: DateTime<Utc>) -> Result<JobInfo, Error> {
+    fn new_scheduled_job(job: Self::Job, after: DateTime<Utc>) -> Result<NewJobInfo, Error> {
         let mut job = Self::new_job(job)?;
         job.schedule(after);
 
@@ -187,3 +187,7 @@ where
         Box::new(fut)
     }
 }
+
+#[derive(Clone, Debug, Fail)]
+#[fail(display = "Failed to turn job into value")]
+pub struct ToJson;

diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs
index 3beecf4..fe9e867 100644
--- a/jobs-core/src/processor_map.rs
+++ b/jobs-core/src/processor_map.rs
@@ -20,6 +20,7 @@
 use std::collections::HashMap;
 
 use futures::future::{Either, Future, IntoFuture};
+use log::{error, info};
 use serde_json::Value;
 
 use crate::{JobError, JobInfo, Processor};
@@ -107,12 +108,12 @@ fn process<S>(process_fn: &ProcessFn<S>, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()>
-            info!("Job completed, {}", processor);
+            info!("Job {} completed, {}", job.id(), processor);
             job.pass();
             Ok(job)
         }
         Err(e) => {
-            error!("Job errored, {}, {}", processor, e);
+            error!("Job {} errored, {}, {}", job.id(), processor, e);
             job.fail();
             Ok(job)
         }
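Because new_job and new_scheduled_job now return NewJobInfo rather than JobInfo, a delayed job can be described before any storage round-trip assigns it an id. A short sketch using the trait methods above (MyProcessor and MyJob remain illustrative names):

    use background_jobs_core::NewJobInfo;
    use chrono::{offset::Utc, Duration};
    use failure::Error;

    // The id is assigned later by Storage::assign_id; until then the job is pure data.
    fn five_minutes_out(job: MyJob) -> Result<NewJobInfo, Error> {
        MyProcessor::new_scheduled_job(job, Utc::now() + Duration::minutes(5))
    }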
diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs
index f4d2aae..c79f387 100644
--- a/jobs-core/src/storage.rs
+++ b/jobs-core/src/storage.rs
@@ -18,17 +18,19 @@
  */
 
 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, BTreeSet, HashMap},
     path::PathBuf,
     str::Utf8Error,
     sync::{Arc, RwLock, RwLockWriteGuard},
 };
 
 use chrono::offset::Utc;
+use failure::Fail;
 use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
 use lmdb::Error as LmdbError;
+use log::{info, trace};
 
-use crate::{JobInfo, JobStatus};
+use crate::{JobInfo, JobStatus, NewJobInfo};
 
 struct Buckets<'a> {
     queued: Bucket<'a, &'a [u8], ValueBuf<Json<usize>>>,
@@ -61,16 +63,15 @@ impl<'a> Buckets<'a> {
 /// None of the methods in this module are intended to be used outside of a background-jobs
 /// runtime.
 pub struct Storage {
-    runner_id: usize,
     store: Arc<RwLock<Store>>,
 }
 
 impl Storage {
-    pub fn new(runner_id: usize, store: Arc<RwLock<Store>>) -> Self {
-        Storage { runner_id, store }
+    pub fn new(store: Arc<RwLock<Store>>) -> Self {
+        Storage { store }
     }
 
-    pub fn init(runner_id: usize, path: PathBuf) -> Result<Self, Error> {
+    pub fn init(path: PathBuf) -> Result<Self, Error> {
         let mut manager = Manager::new();
         let mut cfg = Config::default(path);
@@ -83,17 +84,17 @@ impl Storage {
 
         let handle = manager.open(cfg)?;
 
-        Ok(Storage::new(runner_id, handle))
+        Ok(Storage::new(handle))
     }
 
-    pub fn get_new_id(&self) -> Result<usize, Error> {
+    pub fn get_new_id(&self, runner_id: usize) -> Result<usize, Error> {
         let store = self.store.write()?;
 
         let bucket = store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::id_store()))?;
 
         let mut txn = store.write_txn()?;
 
-        let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", |txn| {
+        let new_id = self.with_lock(&bucket, &mut txn, b"id-lock", runner_id, |txn| {
             let id = match txn.get(&bucket, b"current-id") {
                 Ok(id) => id.inner()?.to_serde(),
                 Err(e) => match e {
@@ -115,7 +116,7 @@ impl Storage {
         Ok(new_id)
     }
 
-    pub fn requeue_staged_jobs(&self) -> Result<(), Error> {
+    pub fn requeue_staged_jobs(&self, runner_id: usize) -> Result<(), Error> {
         let store = self.store.write()?;
         let job_bucket =
             store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
@@ -128,95 +129,42 @@ impl Storage {
         let mut write_txn = store.write_txn()?;
         let read_txn = store.read_txn()?;
 
-        self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| {
-            let mut cursor = read_txn.read_cursor(&buckets.staged)?;
-            match cursor.get(None, CursorOp::First) {
-                Ok(_) => (),
-                Err(e) => match e {
-                    Error::NotFound => {
-                        return Ok(());
-                    }
-                    e => {
-                        return Err(e);
-                    }
-                },
-            }
-
-            let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>;
-
-            let _ = cursor.iter().fold(initial_value, |acc, (key, _)| {
-                acc.and_then(|inner_txn| {
-                    let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
-
-                    let job_value = Json::to_value_buf(job)?;
-                    inner_txn.set(&job_bucket, key, job_value)?;
-                    self.queue_job(&buckets, inner_txn, key)?;
-
-                    Ok(inner_txn)
-                })
-            })?;
-
-            Ok(())
-        })?;
-
-        read_txn.commit()?;
-        write_txn.commit()?;
-
-        Ok(())
-    }
-
-    pub fn check_stalled_jobs(&self) -> Result<(), Error> {
-        let store = self.store.write()?;
-        let job_bucket =
-            store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
-
-        let lock_bucket =
-            store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::job_lock()))?;
-
-        let buckets = Buckets::new(&store)?;
-
-        let mut write_txn = store.write_txn()?;
-        let read_txn = store.read_txn()?;
-
-        self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| {
-            let mut cursor = read_txn.read_cursor(&buckets.running)?;
-            match cursor.get(None, CursorOp::First) {
-                Ok(_) => (),
-                Err(e) => match e {
-                    Error::NotFound => {
-                        return Ok(());
-                    }
-                    e => {
-                        return Err(e);
-                    }
-                },
-            }
-
-            let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>;
-
-            let _ = cursor.iter().fold(initial_value, |acc, (key, _)| {
-                acc.and_then(|inner_txn| {
-                    let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
-
-                    if job.is_stale() {
-                        if job.increment().should_requeue() {
-                            let job_value = Json::to_value_buf(job)?;
-                            inner_txn.set(&job_bucket, key, job_value)?;
-                            self.queue_job(&buckets, inner_txn, key)?;
-                        } else {
-                            job.fail();
-                            let job_value = Json::to_value_buf(job)?;
-                            inner_txn.set(&job_bucket, key, job_value)?;
-                            self.fail_job(&buckets, inner_txn, key)?;
-                        }
-                    }
-
-                    Ok(inner_txn)
-                })
-            })?;
-
-            Ok(())
-        })?;
+        self.with_lock::<_, (), _>(
+            &lock_bucket,
+            &mut write_txn,
+            b"job-queue",
+            runner_id,
+            |inner_txn| {
+                let mut cursor = read_txn.read_cursor(&buckets.staged)?;
+                match cursor.get(None, CursorOp::First) {
+                    Ok(_) => (),
+                    Err(e) => match e {
+                        Error::NotFound => {
+                            return Ok(());
+                        }
+                        e => {
+                            return Err(e);
+                        }
+                    },
+                }
+
+                let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>;
+
+                let _ = cursor.iter().fold(initial_value, |acc, (key, _)| {
+                    acc.and_then(|inner_txn| {
+                        let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
+
+                        let job_value = Json::to_value_buf(job)?;
+                        inner_txn.set(&job_bucket, key, job_value)?;
+                        self.queue_job(&buckets, inner_txn, key, runner_id)?;
+
+                        Ok(inner_txn)
+                    })
+                })?;
+
+                Ok(())
+            },
+        )?;
 
         read_txn.commit()?;
         write_txn.commit()?;
@@ -224,7 +172,77 @@ impl Storage {
         Ok(())
     }
 
-    pub fn stage_jobs(&self, limit: usize, queue: &str) -> Result<Vec<JobInfo>, Error> {
+    pub fn check_stalled_jobs(&self, runner_id: usize) -> Result<(), Error> {
+        let store = self.store.write()?;
+        let job_bucket =
+            store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
+
+        let lock_bucket =
+            store.bucket::<&[u8], ValueBuf<Json<usize>>>(Some(Storage::job_lock()))?;
+
+        let buckets = Buckets::new(&store)?;
+
+        let mut write_txn = store.write_txn()?;
+        let read_txn = store.read_txn()?;
+
+        self.with_lock::<_, (), _>(
+            &lock_bucket,
+            &mut write_txn,
+            b"job-queue",
+            runner_id,
+            |inner_txn| {
+                let mut cursor = read_txn.read_cursor(&buckets.running)?;
+                match cursor.get(None, CursorOp::First) {
+                    Ok(_) => (),
+                    Err(e) => match e {
+                        Error::NotFound => {
+                            return Ok(());
+                        }
+                        e => {
+                            return Err(e);
+                        }
+                    },
+                }
+
+                let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>;
+
+                let _ = cursor.iter().fold(initial_value, |acc, (key, _)| {
+                    acc.and_then(|inner_txn| {
+                        let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
+
+                        if job.is_stale() {
+                            if job.increment().should_requeue() {
+                                let job_value = Json::to_value_buf(job)?;
+                                inner_txn.set(&job_bucket, key, job_value)?;
+                                self.queue_job(&buckets, inner_txn, key, runner_id)?;
+                            } else {
+                                job.fail();
+                                let job_value = Json::to_value_buf(job)?;
+                                inner_txn.set(&job_bucket, key, job_value)?;
+                                self.fail_job(&buckets, inner_txn, key, runner_id)?;
+                            }
+                        }
+
+                        Ok(inner_txn)
+                    })
+                })?;
+
+                Ok(())
+            },
+        )?;
+
+        read_txn.commit()?;
+        write_txn.commit()?;
+
+        Ok(())
+    }
+
+    pub fn stage_jobs(
+        &self,
+        limit: usize,
+        queue: &str,
+        runner_id: usize,
+    ) -> Result<Vec<JobInfo>, Error> {
         let store = self.store.write()?;
 
         trace!("Got store");
@@ -246,11 +264,34 @@ impl Storage {
             &lock_bucket,
             &mut txn,
             b"job-queue",
+            runner_id,
             |inner_txn| {
+                trace!("Got lock");
+
+                let mut jobs = HashMap::new();
                 let mut cursor = read_txn.read_cursor(&buckets.queued)?;
+                let now = Utc::now();
+                trace!("Got cursor");
                 match cursor.get(None, CursorOp::First) {
-                    Ok(_) => (),
+                    Ok((maybe_key, _)) => {
+                        if let Some(key) = maybe_key {
+                            match inner_txn.get(&job_bucket, &key) {
+                                Ok(job) => {
+                                    let mut job = job.inner()?.to_serde();
+                                    if job.is_ready(now) && job.is_in_queue(queue) {
+                                        job.stage();
+                                        self.stage_job(&buckets, inner_txn, key, runner_id)?;
+                                        jobs.insert(job.id(), job);
+                                    }
+                                }
+                                Err(e) => match e {
+                                    Error::NotFound => (),
+                                    err => return Err(err),
+                                },
+                            }
+                        }
+                    }
                     Err(e) => match e {
                         Error::NotFound => {
                             trace!("No items in queue");
@@ -264,22 +305,18 @@ impl Storage {
                 trace!("Set cursor to first");
 
                 let initial_value =
-                    Ok((inner_txn, Vec::new())) as Result<(&mut Txn, Vec<JobInfo>), Error>;
+                    Ok((inner_txn, jobs)) as Result<(&mut Txn, HashMap<usize, JobInfo>), Error>;
 
-                let now = Utc::now();
-
-                trace!("Got lock");
-                let (_inner_txn, vec) = cursor.iter().fold(initial_value, |acc, (key, _)| {
+                let (_inner_txn, mut hm) = cursor.iter().fold(initial_value, |acc, (key, _)| {
                     acc.and_then(|(inner_txn, mut jobs)| {
                         if jobs.len() < limit {
                             let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde();
-                            job.stage();
-
                             if job.is_ready(now) && job.is_in_queue(queue) {
-                                self.stage_job(&buckets, inner_txn, key)?;
+                                job.stage();
+                                self.stage_job(&buckets, inner_txn, key, runner_id)?;
 
-                                jobs.push(job);
+                                jobs.insert(job.id(), job);
                             }
                         }
 
@@ -287,7 +324,7 @@ impl Storage {
                     })
                 })?;
 
-                Ok(vec)
+                Ok(hm.drain().map(|(_, v)| v).collect())
             },
         )?;
@@ -302,26 +339,18 @@ impl Storage {
         Ok(result)
     }
 
-    pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> {
-        let job_id = match job.id() {
-            Some(id) => id.to_string(),
-            None => {
-                let id = self.get_new_id()?;
-                job.set_id(id);
-                id.to_string()
-            }
-        };
+    pub fn assign_id(&self, job: NewJobInfo, runner_id: usize) -> Result<JobInfo, Error> {
+        let id = self.get_new_id(runner_id)?;
+        let job = job.with_id(id);
+        trace!("Generated job id, {}", job.id());
+        Ok(job)
+    }
 
+    pub fn store_job(&self, mut job: JobInfo, runner_id: usize) -> Result<(), Error> {
+        let job_id = job.id().to_string();
 
         job.updated();
-        trace!("Generaged job id, {}", job_id);
-
-        if job.is_failed() {
-            if job.increment().should_requeue() {
-                job.pending();
-                job.next_queue();
-            }
-        }
+        job.needs_retry();
 
         let status = job.status();
         let job_value = Json::to_value_buf(job)?;
@@ -341,11 +370,13 @@ impl Storage {
         trace!("Set value");
 
         match status {
-            JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref())?,
-            JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id.as_ref())?,
+            JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Finished => {
+                self.finish_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?
+            }
         }
 
         trace!("Committing");
@@ -360,6 +391,7 @@ impl Storage {
         &self,
         base_port: usize,
         queues: BTreeSet<String>,
+        runner_id: usize,
     ) -> Result<BTreeMap<String, usize>, PortMapError> {
         let store = self.store.write().map_err(|e| Error::from(e))?;
@@ -375,6 +407,7 @@ impl Storage {
             &queue_port_bucket,
             &mut write_txn,
             lock_name.as_ref(),
+            runner_id,
             |write_txn| {
                 let mut cursor = read_txn.read_cursor(&queue_port_bucket)?;
@@ -436,8 +469,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.staged, txn, id)?;
+        self.add_job_to(&buckets.staged, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
@@ -451,8 +485,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.queued, txn, id)?;
+        self.add_job_to(&buckets.queued, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
@@ -466,8 +501,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.failed, txn, id)?;
+        self.add_job_to(&buckets.failed, txn, id, runner_id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
@@ -481,8 +517,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.running, txn, id)?;
+        self.add_job_to(&buckets.running, txn, id, runner_id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
         self.delete_job_from(&buckets.finished, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
@@ -496,8 +533,9 @@ impl Storage {
         buckets: &'env Buckets<'env>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        self.add_job_to(&buckets.finished, txn, id)?;
+        self.add_job_to(&buckets.finished, txn, id, runner_id)?;
         self.delete_job_from(&buckets.running, txn, id)?;
         self.delete_job_from(&buckets.staged, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
@@ -511,8 +549,9 @@ impl Storage {
         bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
         txn: &mut Txn<'env>,
         id: &[u8],
+        runner_id: usize,
     ) -> Result<(), Error> {
-        txn.set(bucket, id, Json::to_value_buf(self.runner_id)?)?;
+        txn.set(bucket, id, Json::to_value_buf(runner_id)?)?;
         trace!("Set value");
 
         Ok(())
@@ -545,16 +584,17 @@ impl Storage {
         lock_bucket: &'env Bucket<&[u8], ValueBuf<Json<usize>>>,
         txn: &mut Txn<'env>,
         lock_key: &[u8],
+        runner_id: usize,
         callback: F,
     ) -> Result<T, E>
     where
        F: Fn(&mut Txn<'env>) -> Result<T, E>,
        E: From<Error>,
     {
-        let mut other_runner_id = 0;
+        let mut other_runner_id = 10;
 
         loop {
-            let lock_value = Json::to_value_buf(self.runner_id)?;
+            let lock_value = Json::to_value_buf(runner_id)?;
 
             let mut inner_txn = txn.txn()?;
             let res = inner_txn.set_no_overwrite(lock_bucket, lock_key, lock_value);
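Moving runner_id off of Storage and into every call means one Storage (and its underlying LMDB store) can be shared by callers acting as different runners. A hedged usage sketch of the reworked API (Error here is the kv crate's error type used throughout storage.rs; the id is illustrative):

    use background_jobs_core::Storage;
    use kv::Error;
    use std::path::PathBuf;

    fn demo() -> Result<(), Error> {
        let server_id = 1; // illustrative runner id
        let storage = Storage::init(PathBuf::from("example-db"))?;

        // Recover work this runner had staged or left running before a restart.
        storage.requeue_staged_jobs(server_id)?;
        storage.check_stalled_jobs(server_id)?;

        // Stage up to 25 jobs from the "default" queue for this runner.
        let staged = storage.stage_jobs(25, "default", server_id)?;
        println!("staged {} jobs", staged.len());
        Ok(())
    }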
"https://git.asonix.dog/asonix/background-jobs" @@ -20,10 +20,10 @@ tokio-threadpool = "0.1" zmq = "0.8" [features] -default = ["tokio-zmq"] +default = [] [dependencies.background-jobs-core] -version = "0.3" +version = "0.4" path = "../jobs-core" [dependencies.tokio-zmq] diff --git a/jobs-server/src/server/mod.rs b/jobs-server/src/server/mod.rs index d3d7c8f..6b4e6c1 100644 --- a/jobs-server/src/server/mod.rs +++ b/jobs-server/src/server/mod.rs @@ -41,9 +41,9 @@ use self::{portmap::PortMapConfig, pull::PullConfig, push::PushConfig, stalled:: #[derive(Clone)] pub(crate) struct Config { + server_id: usize, ip: String, base_port: usize, - runner_id: usize, queues: BTreeSet, db_path: PathBuf, context: Arc, @@ -51,31 +51,30 @@ pub(crate) struct Config { impl Config { fn create_server(&self) -> Box + Send> { - let runner_id = self.runner_id; let db_path = self.db_path.clone(); let base_port = self.base_port; let queues = self.queues.clone(); + let server_id = self.server_id; let config = Arc::new(self.clone()); let fut = poll_fn(move || { - let runner_id = runner_id; let db_path = db_path.clone(); let base_port = base_port; let queues = queues.clone(); blocking(move || { - let storage = Arc::new(Storage::init(runner_id, db_path)?); - storage.requeue_staged_jobs()?; - storage.check_stalled_jobs()?; - let port_map = storage.get_port_mapping(base_port, queues)?; + let storage = Arc::new(Storage::init(db_path)?); + storage.requeue_staged_jobs(server_id)?; + storage.check_stalled_jobs(server_id)?; + let port_map = storage.get_port_mapping(base_port, queues, server_id)?; Ok((storage, port_map)) }) }) .from_err::() .then(coerce) - .and_then(|(storage, port_map)| { + .and_then(move |(storage, port_map)| { for queue in config.queues.iter() { let port = port_map.get(queue).ok_or(MissingQueue(queue.to_owned()))?; @@ -84,6 +83,7 @@ impl Config { info!("Creating queue {} on address {}", queue, address); tokio::spawn(PushConfig::init( + server_id, address, queue.to_owned(), storage.clone(), @@ -91,7 +91,7 @@ impl Config { )); } - StalledConfig::init(storage.clone()); + StalledConfig::init(server_id, storage.clone()); let portmap_address = format!("tcp://{}:{}", config.ip, config.base_port + 1); info!("Creating portmap on address {}", portmap_address); @@ -105,7 +105,7 @@ impl Config { let pull_address = format!("tcp://{}:{}", config.ip, config.base_port); info!("Creating puller on address {}", pull_address); - tokio::spawn(PullConfig::init(pull_address, storage, config)); + tokio::spawn(PullConfig::init(server_id, pull_address, storage, config)); Ok(()) }) @@ -169,15 +169,15 @@ impl ServerConfig { /// This method returns a future that, when run, spawns all of the server's required futures /// onto tokio. Therefore, this can only be used from tokio. pub fn init>( + server_id: usize, ip: &str, base_port: usize, - runner_id: usize, queues: BTreeSet, db_path: P, ) -> Box + Send> { let context = Arc::new(Context::new()); - Self::init_with_context(ip, base_port, runner_id, queues, db_path, context) + Self::init_with_context(server_id, ip, base_port, queues, db_path, context) } /// The same as `ServerConfig::init()`, but with a provided ZeroMQ Context. @@ -188,17 +188,17 @@ impl ServerConfig { /// If you're running the Server, Worker, and Spawner in the same application, you should share /// a ZeroMQ context between them. 
pub fn init_with_context>( + server_id: usize, ip: &str, base_port: usize, - runner_id: usize, queues: BTreeSet, db_path: P, context: Arc, ) -> Box + Send> { let config = Config { + server_id, ip: ip.to_owned(), base_port, - runner_id, queues, db_path: db_path.as_ref().to_owned(), context, diff --git a/jobs-server/src/server/pull.rs b/jobs-server/src/server/pull.rs index 9dd9ae4..11431a6 100644 --- a/jobs-server/src/server/pull.rs +++ b/jobs-server/src/server/pull.rs @@ -19,12 +19,13 @@ use std::{sync::Arc, time::Duration}; -use background_jobs_core::{JobInfo, Storage}; +use background_jobs_core::{JobInfo, NewJobInfo, Storage}; use failure::{Error, Fail}; use futures::{future::poll_fn, Future, Stream}; #[cfg(feature = "futures-zmq")] use futures_zmq::{prelude::*, Multipart, Pull}; use log::{error, info, trace}; +use serde_derive::Deserialize; use tokio::timer::Delay; use tokio_threadpool::blocking; #[cfg(feature = "tokio-zmq")] @@ -32,7 +33,15 @@ use tokio_zmq::{prelude::*, Multipart, Pull}; use crate::server::{coerce, Config}; +#[derive(Clone, Debug, Deserialize)] +#[serde(untagged)] +enum EitherJob { + New(NewJobInfo), + Existing(JobInfo), +} + pub(crate) struct PullConfig { + server_id: usize, puller: Pull, address: String, storage: Arc, @@ -41,11 +50,13 @@ pub(crate) struct PullConfig { impl PullConfig { pub(crate) fn init( + server_id: usize, address: String, storage: Arc, config: Arc, ) -> impl Future { let cfg = ResetPullConfig { + server_id, address, storage, config, @@ -57,6 +68,7 @@ impl PullConfig { fn run(self) -> Box + Send> { let config = self.reset(); + let server_id = self.server_id; let storage = self.storage.clone(); @@ -71,7 +83,7 @@ impl PullConfig { .and_then(parse_job) .and_then(move |job| { trace!("Storing job, {:?}", job); - store_job(job, storage.clone()) + store_job(job, storage.clone(), server_id) }) .for_each(|_| Ok(())) .map(|_| info!("Puller is shutting down")) @@ -86,6 +98,7 @@ impl PullConfig { fn reset(&self) -> ResetPullConfig { ResetPullConfig { + server_id: self.server_id, address: self.address.clone(), storage: self.storage.clone(), config: self.config.clone(), @@ -97,7 +110,7 @@ impl PullConfig { #[fail(display = "Message was empty")] pub struct EmptyMessage; -fn parse_job(mut multipart: Multipart) -> Result { +fn parse_job(mut multipart: Multipart) -> Result { let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?; let parsed = serde_json::from_slice(&unparsed_msg)?; @@ -105,19 +118,32 @@ fn parse_job(mut multipart: Multipart) -> Result { Ok(parsed) } -fn store_job(job: JobInfo, storage: Arc) -> impl Future { +fn store_job( + job: EitherJob, + storage: Arc, + server_id: usize, +) -> impl Future { let storage = storage.clone(); poll_fn(move || { let job = job.clone(); let storage = storage.clone(); - blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from) + blocking(move || { + let job = match job { + EitherJob::New(new_job) => storage.assign_id(new_job, server_id)?, + EitherJob::Existing(job) => job, + }; + + storage.store_job(job, server_id).map_err(Error::from) + }) + .map_err(Error::from) }) .then(coerce) } struct ResetPullConfig { + server_id: usize, address: String, storage: Arc, config: Arc, @@ -137,6 +163,7 @@ impl ResetPullConfig { .build() .map(|puller| { let config = PullConfig { + server_id: self.server_id, puller, address: self.address, storage: self.storage, diff --git a/jobs-server/src/server/push.rs b/jobs-server/src/server/push.rs index fc4f4f9..9a5ea2e 100644 --- a/jobs-server/src/server/push.rs +++ 
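The #[serde(untagged)] attribute on EitherJob keeps the wire format tag-free: serde_json tries each variant's structure in declaration order and picks the first one that fits the payload. A minimal sketch of the decode step that parse_job relies on:

    fn decode(raw: &[u8]) -> Result<EitherJob, serde_json::Error> {
        // Untagged enums carry no discriminant; the JSON shape alone decides
        // whether this is a brand-new job or an existing one being reported back.
        serde_json::from_slice(raw)
    }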
diff --git a/jobs-server/src/server/push.rs b/jobs-server/src/server/push.rs
index fc4f4f9..9a5ea2e 100644
--- a/jobs-server/src/server/push.rs
+++ b/jobs-server/src/server/push.rs
@@ -34,6 +34,7 @@ use zmq::Message;
 use crate::server::{coerce, Config};
 
 pub(crate) struct PushConfig {
+    server_id: usize,
     pusher: Push,
     address: String,
     queue: String,
@@ -43,12 +44,14 @@ pub(crate) struct PushConfig {
 
 impl PushConfig {
     pub(crate) fn init(
+        server_id: usize,
         address: String,
         queue: String,
         storage: Arc<Storage>,
         config: Arc<Config>,
     ) -> impl Future<Item = (), Error = ()> {
         let cfg = ResetPushConfig {
+            server_id,
             address,
             queue,
             storage,
@@ -63,6 +66,7 @@ impl PushConfig {
         let reset = self.reset();
 
         let PushConfig {
+            server_id,
             address: _,
             pusher,
             queue,
@@ -74,7 +78,7 @@ impl PushConfig {
 
         let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250))
             .from_err()
-            .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone()))
+            .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone(), server_id))
             .flatten()
             .forward(pusher.sink(25))
             .map(move |_| {
@@ -94,6 +98,7 @@ impl PushConfig {
 
     fn reset(&self) -> ResetPushConfig {
         ResetPushConfig {
+            server_id: self.server_id,
             address: self.address.clone(),
             queue: self.queue.clone(),
             storage: self.storage.clone(),
@@ -105,11 +110,12 @@ impl PushConfig {
 fn dequeue_jobs(
     storage: Arc<Storage>,
     queue: String,
+    server_id: usize,
 ) -> impl Future<Item = impl Stream<Item = Multipart, Error = Error>, Error = Error> {
     poll_fn(move || {
         let storage = storage.clone();
         let queue = queue.clone();
-        blocking(move || wrap_fetch_queue(storage, &queue))
+        blocking(move || wrap_fetch_queue(storage, &queue, server_id))
     })
     .then(coerce)
     .map(|jobs| iter_ok(jobs))
@@ -119,8 +125,12 @@ fn dequeue_jobs(
     })
 }
 
-fn wrap_fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<Multipart>, Error> {
-    let response = fetch_queue(storage, queue)?;
+fn wrap_fetch_queue(
+    storage: Arc<Storage>,
+    queue: &str,
+    server_id: usize,
+) -> Result<Vec<Multipart>, Error> {
+    let response = fetch_queue(storage, queue, server_id)?;
 
     let jobs = response
         .into_iter()
@@ -135,11 +145,18 @@ fn wrap_fetch_queue(
     Ok(jobs)
 }
 
-fn fetch_queue(storage: Arc<Storage>, queue: &str) -> Result<Vec<JobInfo>, Error> {
-    storage.stage_jobs(100, queue).map_err(Error::from)
+fn fetch_queue(
+    storage: Arc<Storage>,
+    queue: &str,
+    server_id: usize,
+) -> Result<Vec<JobInfo>, Error> {
+    storage
+        .stage_jobs(100, queue, server_id)
+        .map_err(Error::from)
 }
 
 struct ResetPushConfig {
+    server_id: usize,
     address: String,
     queue: String,
     storage: Arc<Storage>,
@@ -161,6 +178,7 @@ impl ResetPushConfig {
             .build()
             .map(|pusher| {
                 let config = PushConfig {
+                    server_id: self.server_id,
                     pusher,
                     address: self.address,
                     queue: self.queue,

diff --git a/jobs-server/src/server/stalled.rs b/jobs-server/src/server/stalled.rs
index 8cb0791..c357eb3 100644
--- a/jobs-server/src/server/stalled.rs
+++ b/jobs-server/src/server/stalled.rs
@@ -30,12 +30,13 @@ use crate::server::coerce;
 
 #[derive(Clone)]
 pub(crate) struct StalledConfig {
+    server_id: usize,
     storage: Arc<Storage>,
 }
 
 impl StalledConfig {
-    pub(crate) fn init(storage: Arc<Storage>) {
-        let cfg = StalledConfig { storage };
+    pub(crate) fn init(server_id: usize, storage: Arc<Storage>) {
+        let cfg = StalledConfig { server_id, storage };
 
         tokio::spawn(cfg.run());
     }
@@ -43,7 +44,7 @@ impl StalledConfig {
     fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
         let reset = self.clone();
 
-        let StalledConfig { storage } = self;
+        let StalledConfig { server_id, storage } = self;
 
         let fut = Interval::new(tokio::clock::now(), Duration::from_secs(60 * 30))
             .from_err::<Error>()
@@ -51,7 +52,7 @@ impl StalledConfig {
                 let storage = storage.clone();
                 poll_fn(move || {
                     let storage = storage.clone();
-                    blocking(move || storage.check_stalled_jobs().map_err(Error::from))
+                    blocking(move || storage.check_stalled_jobs(server_id).map_err(Error::from))
                 })
                 .from_err()
             })

diff --git a/src/lib.rs b/src/lib.rs
index 324f794..affe66f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -242,3 +242,6 @@ pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
 
 #[cfg(feature = "background-jobs-server")]
 pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};
+
+#[cfg(feature = "background-jobs-actix")]
+pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};
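With the re-exports above, switching a consumer between backends is mostly a Cargo feature change; the top-level names stay the same. Note that the two cfg-gated blocks export overlapping names (ServerConfig, WorkerConfig), so a build should enable only one backend at a time. A hedged sketch of the import side:

    // Assuming `default-features = false, features = ["actix"]` in Cargo.toml,
    // these names resolve to the background-jobs-actix implementations rather
    // than the ZeroMQ ones.
    use background_jobs::{QueueHandle, ServerConfig, WorkerConfig};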