Permit cached state in local actix workers

This commit is contained in:
asonix 2019-10-31 13:46:41 -05:00
parent f254b24814
commit cca9aca333
4 changed files with 52 additions and 20 deletions

View file

@ -267,7 +267,7 @@ where
LocalWorker::new( LocalWorker::new(
acc + i + 1000, acc + i + 1000,
key.clone(), key.clone(),
processors.clone(), processors.cached(),
queue_handle.inner.clone(), queue_handle.inner.clone(),
) )
.start(); .start();
@ -289,7 +289,7 @@ where
LocalWorker::new( LocalWorker::new(
acc + i + 1000, acc + i + 1000,
key.clone(), key.clone(),
processors.clone(), processors.cached(),
queue_handle.inner.clone(), queue_handle.inner.clone(),
) )
}); });

View file

@ -3,7 +3,7 @@ use actix::{
fut::{wrap_future, ActorFuture}, fut::{wrap_future, ActorFuture},
Actor, Addr, AsyncContext, Context, Handler, Message, Actor, Addr, AsyncContext, Context, Handler, Message,
}; };
use background_jobs_core::{JobInfo, ProcessorMap}; use background_jobs_core::{JobInfo, CachedProcessorMap};
use log::info; use log::info;
use crate::{RequestJob, ReturningJob}; use crate::{RequestJob, ReturningJob};
@ -53,7 +53,7 @@ where
{ {
id: u64, id: u64,
queue: String, queue: String,
processors: ProcessorMap<State>, processors: CachedProcessorMap<State>,
server: Addr<S>, server: Addr<S>,
} }
@ -64,7 +64,7 @@ where
State: Clone + 'static, State: Clone + 'static,
{ {
/// Create a new local worker /// Create a new local worker
pub fn new(id: u64, queue: String, processors: ProcessorMap<State>, server: Addr<S>) -> Self { pub fn new(id: u64, queue: String, processors: CachedProcessorMap<State>, server: Addr<S>) -> Self {
LocalWorker { LocalWorker {
id, id,
queue, queue,

View file

@ -20,7 +20,7 @@ pub use crate::{
job::Job, job::Job,
job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, job_info::{JobInfo, NewJobInfo, ReturnJobInfo},
processor::Processor, processor::Processor,
processor_map::ProcessorMap, processor_map::{CachedProcessorMap, ProcessorMap},
stats::{JobStat, Stats}, stats::{JobStat, Stats},
storage::{memory_storage, Storage}, storage::{memory_storage, Storage},
}; };

View file

@ -8,10 +8,7 @@ use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo};
/// A generic function that processes a job /// A generic function that processes a job
/// ///
/// Instead of storing /// Instead of storing [`Processor`] type directly, the [`ProcessorMap`]
/// [`Processor`](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) type
/// directly, the
/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.ProcessorMap.html)
/// struct stores these `ProcessFn` types that don't expose differences in Job types. /// struct stores these `ProcessFn` types that don't expose differences in Job types.
pub type ProcessFn<S> = pub type ProcessFn<S> =
Arc<dyn Fn(Value, S) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>; Arc<dyn Fn(Value, S) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>;
@ -20,18 +17,24 @@ pub type StateFn<S> = Arc<dyn Fn() -> S + Send + Sync>;
/// A type for storing the relationships between processor names and the processor itself /// A type for storing the relationships between processor names and the processor itself
/// ///
/// [`Processor`s](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) must /// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an
/// be registered with the `ProcessorMap` in the initialization phase of an application before /// application before workers are spawned in order to handle queued jobs.
/// workers are spawned in order to handle queued jobs.
#[derive(Clone)] #[derive(Clone)]
pub struct ProcessorMap<S> pub struct ProcessorMap<S> {
where
S: Clone,
{
inner: HashMap<String, ProcessFn<S>>, inner: HashMap<String, ProcessFn<S>>,
state_fn: StateFn<S>, state_fn: StateFn<S>,
} }
/// A type for storing the relationships between processor names and the processor itself, with the
/// state pre-cached instead of being generated from the state function each time
///
/// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an
/// application before workers are spawned in order to handle queued jobs.
pub struct CachedProcessorMap<S> {
inner: HashMap<String, ProcessFn<S>>,
state: S,
}
impl<S> ProcessorMap<S> impl<S> ProcessorMap<S>
where where
S: Clone + 'static, S: Clone + 'static,
@ -48,9 +51,7 @@ where
} }
} }
/// Register a /// Register a [`Processor`] with this `ProcessorMap`.
/// [`Processor`](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html) with
/// this `ProcessorMap`.
/// ///
/// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so
/// make sure to register all your processors up-front. /// make sure to register all your processors up-front.
@ -66,6 +67,14 @@ where
); );
} }
/// Initialize the State from the State Function
pub fn cached(&self) -> CachedProcessorMap<S> {
CachedProcessorMap {
inner: self.inner.clone(),
state: (self.state_fn)(),
}
}
/// Process a given job /// Process a given job
/// ///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// This should not be called from outside implementations of a backgoround-jobs runtime. It is
@ -85,6 +94,29 @@ where
} }
} }
impl<S> CachedProcessorMap<S>
where
S: Clone + 'static,
{
/// Process a given job
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = ReturnJobInfo, Error = ()> {
let opt = self
.inner
.get(job.processor())
.map(|processor| process(processor, self.state.clone(), job.clone()));
if let Some(fut) = opt {
Either::A(fut)
} else {
error!("Processor {} not present", job.processor());
Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future())
}
}
}
fn process<S>( fn process<S>(
process_fn: &ProcessFn<S>, process_fn: &ProcessFn<S>,
state: S, state: S,