Relax bounds

This commit is contained in:
asonix 2019-05-23 22:41:34 -05:00
parent 2968ba5360
commit edd63abf0f
6 changed files with 27 additions and 29 deletions

View file

@ -28,7 +28,7 @@ impl ServerConfig {
pub fn start<S>(self) -> QueueHandle<S> pub fn start<S>(self) -> QueueHandle<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
let ServerConfig { server_id, db_path } = self; let ServerConfig { server_id, db_path } = self;
@ -44,7 +44,7 @@ impl ServerConfig {
pub struct WorkerConfig<S> pub struct WorkerConfig<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
processors: ProcessorMap<S>, processors: ProcessorMap<S>,
queues: BTreeMap<String, usize>, queues: BTreeMap<String, usize>,
@ -52,11 +52,11 @@ where
impl<S> WorkerConfig<S> impl<S> WorkerConfig<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
pub fn new(state: S) -> Self { pub fn new(state_fn: impl Fn() -> S + Send + Sync + 'static) -> Self {
WorkerConfig { WorkerConfig {
processors: ProcessorMap::new(state), processors: ProcessorMap::new(Box::new(state_fn)),
queues: BTreeMap::new(), queues: BTreeMap::new(),
} }
} }
@ -95,14 +95,14 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct QueueHandle<S> pub struct QueueHandle<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
inner: Addr<Server<LocalWorker<S>>>, inner: Addr<Server<LocalWorker<S>>>,
} }
impl<S> QueueHandle<S> impl<S> QueueHandle<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
pub fn queue<P>(&self, job: P::Job) -> Result<(), Error> pub fn queue<P>(&self, job: P::Job) -> Result<(), Error>
where where

View file

@ -25,7 +25,7 @@ impl Message for ProcessJob {
pub struct LocalWorker<State> pub struct LocalWorker<State>
where where
State: Clone + Send + 'static, State: Clone + 'static,
{ {
id: usize, id: usize,
queue: String, queue: String,
@ -35,7 +35,7 @@ where
impl<State> LocalWorker<State> impl<State> LocalWorker<State>
where where
State: Clone + Send + 'static, State: Clone + 'static,
{ {
pub fn new( pub fn new(
id: usize, id: usize,
@ -54,7 +54,7 @@ where
impl<State> Actor for LocalWorker<State> impl<State> Actor for LocalWorker<State>
where where
State: Clone + Send + 'static, State: Clone + 'static,
{ {
type Context = Context<Self>; type Context = Context<Self>;
@ -66,7 +66,7 @@ where
impl<State> Handler<ProcessJob> for LocalWorker<State> impl<State> Handler<ProcessJob> for LocalWorker<State>
where where
State: Clone + Send + 'static, State: Clone + 'static,
{ {
type Result = (); type Result = ();

View file

@ -26,7 +26,7 @@ use crate::{Backoff, MaxRetries};
/// The Job trait defines parameters pertaining to an instance of background job /// The Job trait defines parameters pertaining to an instance of background job
pub trait Job<S = ()>: Serialize + DeserializeOwned pub trait Job<S = ()>: Serialize + DeserializeOwned
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///

View file

@ -83,7 +83,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// ``` /// ```
pub trait Processor<S = ()>: Clone pub trait Processor<S = ()>: Clone
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
type Job: Job<S> + 'static; type Job: Job<S> + 'static;

View file

@ -32,8 +32,11 @@ use crate::{JobError, JobInfo, Processor};
/// directly, the /// directly, the
/// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.ProcessorMap.html) /// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.ProcessorMap.html)
/// struct stores these `ProcessFn` types that don't expose differences in Job types. /// struct stores these `ProcessFn` types that don't expose differences in Job types.
pub type ProcessFn = pub type ProcessFn<S> =
Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>; Box<dyn Fn(Value, S) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;
pub type StateFn<S> = Box<dyn Fn() -> S + Send + Sync>;
/// A type for storing the relationships between processor names and the processor itself /// A type for storing the relationships between processor names and the processor itself
/// ///
@ -44,23 +47,23 @@ pub struct ProcessorMap<S>
where where
S: Clone, S: Clone,
{ {
inner: HashMap<String, ProcessFn>, inner: HashMap<String, ProcessFn<S>>,
state: S, state_fn: StateFn<S>,
} }
impl<S> ProcessorMap<S> impl<S> ProcessorMap<S>
where where
S: Clone + Send + 'static, S: Clone + 'static,
{ {
/// Intialize a `ProcessorMap` /// Intialize a `ProcessorMap`
/// ///
/// The state passed into this method will be passed to all jobs executed through this /// The state passed into this method will be passed to all jobs executed through this
/// ProcessorMap. The state argument could be useful for containing a hook into something like /// ProcessorMap. The state argument could be useful for containing a hook into something like
/// r2d2, or the address of an actor in an actix-based system. /// r2d2, or the address of an actor in an actix-based system.
pub fn new(state: S) -> Self { pub fn new(state_fn: StateFn<S>) -> Self {
ProcessorMap { ProcessorMap {
inner: HashMap::new(), inner: HashMap::new(),
state, state_fn,
} }
} }
@ -74,11 +77,9 @@ where
where where
P: Processor<S> + Send + 'static, P: Processor<S> + Send + 'static,
{ {
let state = self.state.clone();
self.inner.insert( self.inner.insert(
P::NAME.to_owned(), P::NAME.to_owned(),
Box::new(move |value| processor.process(value, state.clone())), Box::new(move |value, state| processor.process(value, state)),
); );
} }
@ -90,7 +91,7 @@ where
let opt = self let opt = self
.inner .inner
.get(job.processor()) .get(job.processor())
.map(|processor| process(processor, job.clone())); .map(|processor| process(processor, (self.state_fn)(), job.clone()));
if let Some(fut) = opt { if let Some(fut) = opt {
Either::A(fut) Either::A(fut)
@ -101,12 +102,12 @@ where
} }
} }
fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> { fn process<S>(process_fn: &ProcessFn<S>, state: S, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
let args = job.args(); let args = job.args();
let processor = job.processor().to_owned(); let processor = job.processor().to_owned();
process_fn(args).then(move |res| match res { process_fn(args, state).then(move |res| match res {
Ok(_) => { Ok(_) => {
info!("Job {} completed, {}", job.id(), processor); info!("Job {} completed, {}", job.id(), processor);
job.pass(); job.pass();

View file

@ -275,8 +275,5 @@
pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats}; pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats};
#[cfg(feature = "background-jobs-server")]
pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};
#[cfg(feature = "background-jobs-actix")] #[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig}; pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};