Mirror of https://git.asonix.dog/asonix/background-jobs.git (synced 2024-11-25 05:21:00 +00:00)
Prepare jobs-actix for release

commit 7e06ad981f (parent d8d4b026fe)
7 changed files with 142 additions and 4 deletions
@@ -23,6 +23,7 @@ impl<J> Every<J>
 where
     J: Job + Clone + 'static,
 {
+    /// Create a new Every actor
     pub fn new(spawner: QueueHandle, duration: Duration, job: J) -> Self {
         Every {
             spawner,
@@ -1,9 +1,131 @@
+#![deny(missing_docs)]
+
+//! # An Actix-based Jobs Processor
+//!
+//! This library will spin up as many actors as requested for each processor to process jobs
+//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
+//! order to achieve parallel execution, multiple Arbiters must be in use.
+//!
+//! The thread count is used to spawn Synchronous Actors to handle the storage of job
+//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be
+//! used. By default, the number of cores of the running system is used.
+//!
+//! ### Example
+//! ```rust
+//! use actix::System;
+//! use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig};
+//! use failure::Error;
+//! use futures::{future::ok, Future};
+//! use serde_derive::{Deserialize, Serialize};
+//!
+//! const DEFAULT_QUEUE: &'static str = "default";
+//!
+//! #[derive(Clone, Debug)]
+//! pub struct MyState {
+//!     pub app_name: String,
+//! }
+//!
+//! #[derive(Clone, Debug, Deserialize, Serialize)]
+//! pub struct MyJob {
+//!     some_usize: usize,
+//!     other_usize: usize,
+//! }
+//!
+//! #[derive(Clone, Debug)]
+//! pub struct MyProcessor;
+//!
+//! fn main() -> Result<(), Error> {
+//!     // First set up the Actix System to ensure we have a runtime to spawn jobs on.
+//!     let sys = System::new("my-actix-system");
+//!
+//!     // Set up our Storage
+//!     // For this example, we use the default in-memory storage mechanism
+//!     use background_jobs::memory_storage::Storage;
+//!     let storage = Storage::new();
+//!
+//!     // Start the application server. This guards access to the jobs store
+//!     let queue_handle = ServerConfig::new(storage).thread_count(8).start();
+//!
+//!     // Configure and start our workers
+//!     WorkerConfig::new(move || MyState::new("My App"))
+//!         .register(MyProcessor)
+//!         .set_processor_count(DEFAULT_QUEUE, 16)
+//!         .start(queue_handle.clone());
+//!
+//!     // Queue our jobs
+//!     queue_handle.queue(MyJob::new(1, 2))?;
+//!     queue_handle.queue(MyJob::new(3, 4))?;
+//!     queue_handle.queue(MyJob::new(5, 6))?;
+//!
+//!     // Block on Actix
+//!     sys.run()?;
+//!     Ok(())
+//! }
+//!
+//! impl MyState {
+//!     pub fn new(app_name: &str) -> Self {
+//!         MyState {
+//!             app_name: app_name.to_owned(),
+//!         }
+//!     }
+//! }
+//!
+//! impl MyJob {
+//!     pub fn new(some_usize: usize, other_usize: usize) -> Self {
+//!         MyJob {
+//!             some_usize,
+//!             other_usize,
+//!         }
+//!     }
+//! }
+//!
+//! impl Job for MyJob {
+//!     type Processor = MyProcessor;
+//!     type State = MyState;
+//!     type Future = Result<(), Error>;
+//!
+//!     fn run(self, state: MyState) -> Self::Future {
+//!         println!("{}: args, {:?}", state.app_name, self);
+//!
+//!         Ok(())
+//!     }
+//! }
+//!
+//! impl Processor for MyProcessor {
+//!     // The kind of job this processor should execute
+//!     type Job = MyJob;
+//!
+//!     // The name of the processor. It is super important that each processor has a unique name,
+//!     // because otherwise one processor will overwrite another processor when they're being
+//!     // registered.
+//!     const NAME: &'static str = "MyProcessor";
+//!
+//!     // The queue that this processor belongs to
+//!     //
+//!     // Workers have the option to subscribe to specific queues, so this is important to
+//!     // determine which worker will call the processor
+//!     //
+//!     // Jobs can optionally override the queue they're spawned on
+//!     const QUEUE: &'static str = DEFAULT_QUEUE;
+//!
+//!     // The number of times background-jobs should try to retry a job before giving up
+//!     //
+//!     // Jobs can optionally override this value
+//!     const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
+//!
+//!     // The logic to determine how often to retry this job if it fails
+//!     //
+//!     // Jobs can optionally override this value
+//!     const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
+//! }
+//! ```
+
 use std::{collections::BTreeMap, sync::Arc, time::Duration};

 use actix::{Actor, Addr, Arbiter, SyncArbiter};
 use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage};
 use failure::{Error, Fail};
-use futures::Future;
+use futures::{future::IntoFuture, Future};

 mod every;
 mod pinger;
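As a quick illustration of the thread-count guidance in the new module docs: the count is set on `ServerConfig` before calling `start()`, exactly as the doc example above does with `thread_count(8)`. The following is a minimal sketch, reusing the in-memory storage from that example; a real single-connection backend is where a count of 1 would actually matter.

```rust
use actix::System;
use background_jobs::{memory_storage::Storage, ServerConfig};

fn main() {
    // A System is needed so the server actors have a runtime to spawn onto.
    let _sys = System::new("thread-count-example");

    // One synchronous storage actor: the setting to reach for when the
    // storage backend cannot be driven from several threads at once.
    let storage = Storage::new();
    let _queue_handle = ServerConfig::new(storage).thread_count(1).start();

    // Register workers and queue jobs here, then block with `_sys.run()`
    // exactly as the module example above does.
}
```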
@@ -20,6 +142,10 @@ use self::{
     worker::Worker,
 };

+/// The configuration for a jobs server
+///
+/// The server guards access to the storage backend, and keeps job information properly
+/// up-to-date when workers request jobs to process
 pub struct ServerConfig<S> {
     storage: S,
     threads: usize,
@@ -114,6 +240,7 @@ where
     where
         P: Processor<Job = J> + Send + Sync + 'static,
         J: Job<State = State>,
+        <J::Future as IntoFuture>::Future: Send,
     {
         self.queues.insert(P::QUEUE.to_owned(), 4);
         self.processors.register_processor(processor);
@@ -8,6 +8,10 @@ use serde_derive::Deserialize;

 use crate::{ActixStorage, Worker};

+/// The server Actor
+///
+/// This server guards access to the storage, and keeps a list of workers that are waiting for
+/// jobs to process
 pub struct Server {
     storage: Box<dyn ActixStorage + Send>,
     cache: HashMap<String, VecDeque<Box<dyn Worker + Send>>>,
@@ -44,6 +44,7 @@ where
     }
 }

+/// A worker that runs on the same system as the jobs server
 pub struct LocalWorker<S, State>
 where
     S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
@@ -62,6 +63,7 @@ where
     S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
     State: Clone + 'static,
 {
+    /// Create a new local worker
     pub fn new(id: u64, queue: String, processors: ProcessorMap<State>, server: Addr<S>) -> Self {
         LocalWorker {
             id,
@@ -1,5 +1,5 @@
 use failure::Error;
-use futures::{future::IntoFuture, Future};
+use futures::future::IntoFuture;
 use serde::{de::DeserializeOwned, ser::Serialize};

 use crate::{Backoff, MaxRetries, Processor};
@@ -25,7 +25,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
     /// The state passed into this job is initialized at the start of the application. The state
     /// argument could be useful for containing a hook into something like r2d2, or the address of
     /// an actor in an actix-based system.
-    fn run(self, state: Self::State) -> Box<dyn Future<Item = (), Error = Error> + Send>;
+    fn run(self, state: Self::State) -> Self::Future;

     /// If this job should not use the default queue for its processor, this can be overridden in
     /// user-code.
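For readers skimming this hunk: the old `run` forced every job to return a boxed future, while the new signature returns the associated `Self::Future`, which only has to implement `IntoFuture`. Synchronous jobs can therefore return a plain `Result` (as the module example above does), and asynchronous jobs can keep returning a boxed future. Below is a minimal sketch of the latter, using hypothetical `DelayedJob`/`DelayedProcessor` types and assuming the re-exports shown in the doc example; it is an illustration, not part of the commit.

```rust
use background_jobs::{Backoff, Job, MaxRetries, Processor};
use failure::Error;
use futures::{future, Future};
use serde_derive::{Deserialize, Serialize};

// Hypothetical job and processor, here only to illustrate the new signature.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DelayedJob {
    message: String,
}

#[derive(Clone, Debug)]
pub struct DelayedProcessor;

impl Job for DelayedJob {
    type Processor = DelayedProcessor;
    type State = ();
    // Any IntoFuture is acceptable; a boxed future keeps the job asynchronous,
    // which is what the old signature forced on every implementation.
    type Future = Box<dyn Future<Item = (), Error = Error> + Send>;

    fn run(self, _state: Self::State) -> Self::Future {
        // Stand-in for real asynchronous work (an HTTP call, a DB write, ...).
        let fut = future::ok::<(), Error>(()).map(move |_| {
            println!("running job: {}", self.message);
        });

        Box::new(fut)
    }
}

impl Processor for DelayedProcessor {
    type Job = DelayedJob;
    const NAME: &'static str = "DelayedProcessor";
    const QUEUE: &'static str = "default";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

fn main() {
    let job = DelayedJob { message: "hello".to_owned() };

    // Drive the returned future to completion synchronously, just to show the
    // shape of the API; in practice the worker actors do this.
    assert!(job.run(()).wait().is_ok());
}
```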
@@ -159,7 +159,10 @@ pub trait Processor: Clone {
         &self,
         args: Value,
         state: <Self::Job as Job>::State,
-    ) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
+    ) -> Box<dyn Future<Item = (), Error = JobError> + Send>
+    where
+        <<Self::Job as Job>::Future as IntoFuture>::Future: Send,
+    {
         let res = serde_json::from_value::<Self::Job>(args);

         let fut = match res {
@@ -58,6 +58,7 @@ where
     where
         P: Processor<Job = J> + Sync + Send + 'static,
         J: Job<State = S>,
+        <J::Future as IntoFuture>::Future: Send,
     {
         self.inner.insert(
             P::NAME.to_owned(),
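The `<J::Future as IntoFuture>::Future: Send` bound added in several hunks above exists because, once a job's return value is converted with `into_future()` and boxed behind `dyn Future + Send`, the concrete future itself has to be `Send`. A rough, self-contained sketch of that requirement follows, with a hypothetical `box_job_future` helper standing in for the real `Processor::process` plumbing (which uses `JobError` rather than `failure::Error`).

```rust
use failure::Error;
use futures::{future::IntoFuture, Future};

// Hypothetical helper that mirrors the idea behind this change: turn whatever
// the job returned into a boxed, sendable future.
fn box_job_future<T>(job_output: T) -> Box<dyn Future<Item = (), Error = Error> + Send>
where
    T: IntoFuture<Item = (), Error = Error>,
    // Without this bound, the concrete future could not be put behind
    // `dyn Future + Send`, which is why the diff adds it to the trait bounds.
    T::Future: Send + 'static,
{
    Box::new(job_output.into_future())
}

fn main() {
    // In futures 0.1, Result implements IntoFuture, so a synchronous job's
    // Result return value can flow through the same code path.
    let fut = box_job_future(Ok::<(), Error>(()));
    assert!(fut.wait().is_ok());
}
```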