From cca9aca333bba391744f8ec10f006c360b1432aa Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 31 Oct 2019 13:46:41 -0500 Subject: [PATCH] Permit cached state in local actix workers --- jobs-actix/src/lib.rs | 4 +-- jobs-actix/src/worker.rs | 6 ++-- jobs-core/src/lib.rs | 2 +- jobs-core/src/processor_map.rs | 60 ++++++++++++++++++++++++++-------- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 9c83350..76d29a6 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -267,7 +267,7 @@ where LocalWorker::new( acc + i + 1000, key.clone(), - processors.clone(), + processors.cached(), queue_handle.inner.clone(), ) .start(); @@ -289,7 +289,7 @@ where LocalWorker::new( acc + i + 1000, key.clone(), - processors.clone(), + processors.cached(), queue_handle.inner.clone(), ) }); diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 519dfc3..e6cfa89 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -3,7 +3,7 @@ use actix::{ fut::{wrap_future, ActorFuture}, Actor, Addr, AsyncContext, Context, Handler, Message, }; -use background_jobs_core::{JobInfo, ProcessorMap}; +use background_jobs_core::{JobInfo, CachedProcessorMap}; use log::info; use crate::{RequestJob, ReturningJob}; @@ -53,7 +53,7 @@ where { id: u64, queue: String, - processors: ProcessorMap, + processors: CachedProcessorMap, server: Addr, } @@ -64,7 +64,7 @@ where State: Clone + 'static, { /// Create a new local worker - pub fn new(id: u64, queue: String, processors: ProcessorMap, server: Addr) -> Self { + pub fn new(id: u64, queue: String, processors: CachedProcessorMap, server: Addr) -> Self { LocalWorker { id, queue, diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 97b509a..8915916 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -20,7 +20,7 @@ pub use crate::{ job::Job, job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, processor::Processor, - processor_map::ProcessorMap, + processor_map::{CachedProcessorMap, ProcessorMap}, stats::{JobStat, Stats}, storage::{memory_storage, Storage}, }; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 63084d2..cb679fe 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -8,10 +8,7 @@ use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo}; /// A generic function that processes a job /// -/// Instead of storing -/// [`Processor`](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) type -/// directly, the -/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.ProcessorMap.html) +/// Instead of storing [`Processor`] type directly, the [`ProcessorMap`] /// struct stores these `ProcessFn` types that don't expose differences in Job types. pub type ProcessFn = Arc Box + Send> + Send + Sync>; @@ -20,18 +17,24 @@ pub type StateFn = Arc S + Send + Sync>; /// A type for storing the relationships between processor names and the processor itself /// -/// [`Processor`s](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) must -/// be registered with the `ProcessorMap` in the initialization phase of an application before -/// workers are spawned in order to handle queued jobs. +/// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an +/// application before workers are spawned in order to handle queued jobs. #[derive(Clone)] -pub struct ProcessorMap -where - S: Clone, -{ +pub struct ProcessorMap { inner: HashMap>, state_fn: StateFn, } +/// A type for storing the relationships between processor names and the processor itself, with the +/// state pre-cached instead of being generated from the state function each time +/// +/// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an +/// application before workers are spawned in order to handle queued jobs. +pub struct CachedProcessorMap { + inner: HashMap>, + state: S, +} + impl ProcessorMap where S: Clone + 'static, @@ -48,9 +51,7 @@ where } } - /// Register a - /// [`Processor`](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) with - /// this `ProcessorMap`. + /// Register a [`Processor`] with this `ProcessorMap`. /// /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so /// make sure to register all your processors up-front. @@ -66,6 +67,14 @@ where ); } + /// Initialize the State from the State Function + pub fn cached(&self) -> CachedProcessorMap { + CachedProcessorMap { + inner: self.inner.clone(), + state: (self.state_fn)(), + } + } + /// Process a given job /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is @@ -85,6 +94,29 @@ where } } +impl CachedProcessorMap +where + S: Clone + 'static, +{ + /// Process a given job + /// + /// This should not be called from outside implementations of a backgoround-jobs runtime. It is + /// intended for internal use. + pub fn process_job(&self, job: JobInfo) -> impl Future { + let opt = self + .inner + .get(job.processor()) + .map(|processor| process(processor, self.state.clone(), job.clone())); + + if let Some(fut) = opt { + Either::A(fut) + } else { + error!("Processor {} not present", job.processor()); + Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future()) + } + } +} + fn process( process_fn: &ProcessFn, state: S,