mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-24 13:01:00 +00:00
Add documentation, license info, Make API cleaner
This commit is contained in:
parent
c23aa0961d
commit
cf6ede67ea
24 changed files with 1192 additions and 191 deletions
180
README.md
180
README.md
|
@ -1,3 +1,179 @@
|
||||||
# background-jobs
|
# Background Jobs
|
||||||
|
|
||||||
a job processor for Rust based on Futures
|
This crate provides tooling required to run some processes asynchronously from a usually
|
||||||
|
synchronous application. The standard example of this is Web Services, where certain things
|
||||||
|
need to be processed, but processing them while a user is waiting for their browser to respond
|
||||||
|
might not be the best experience.
|
||||||
|
|
||||||
|
### Usage
|
||||||
|
#### Add Background Jobs to your project
|
||||||
|
```toml
|
||||||
|
[dependencies]
|
||||||
|
background-jobs = "0.1"
|
||||||
|
failure = "0.1"
|
||||||
|
futures = "0.1"
|
||||||
|
tokio = "0.1"
|
||||||
|
```
|
||||||
|
|
||||||
|
#### To get started with Background Jobs, first you should define a job.
|
||||||
|
Jobs are a combination of the data required to perform an operation, and the logic of that
|
||||||
|
operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
pub struct MyJob {
|
||||||
|
some_usize: usize,
|
||||||
|
other_usize: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MyJob {
|
||||||
|
pub fn new(some_usize: usize, other_usize: usize) -> Self {
|
||||||
|
MyJob {
|
||||||
|
some_usize,
|
||||||
|
other_usize,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for MyJob {
|
||||||
|
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
||||||
|
info!("args: {:?}", self);
|
||||||
|
|
||||||
|
Box::new(Ok(()).into_future())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Next, define a Processor.
|
||||||
|
Processors are types that define default attributes for jobs, as well as containing some logic
|
||||||
|
used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct MyProcessor;
|
||||||
|
|
||||||
|
impl Processor for MyProcessor {
|
||||||
|
// The kind of job this processor should execute
|
||||||
|
type Job = MyJob;
|
||||||
|
|
||||||
|
// The name of the processor. It is super important that each processor has a unique name,
|
||||||
|
// because otherwise one processor will overwrite another processor when they're being
|
||||||
|
// registered.
|
||||||
|
fn name() -> &'static str {
|
||||||
|
"MyProcessor"
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queue that this processor belongs to
|
||||||
|
//
|
||||||
|
// Workers have the option to subscribe to specific queues, so this is important to
|
||||||
|
// determine which worker will call the processor
|
||||||
|
//
|
||||||
|
// Jobs can optionally override the queue they're spawned on
|
||||||
|
fn queue() -> &'static str {
|
||||||
|
DEFAULT_QUEUE
|
||||||
|
}
|
||||||
|
|
||||||
|
// The number of times background-jobs should try to retry a job before giving up
|
||||||
|
//
|
||||||
|
// Jobs can optionally override this value
|
||||||
|
fn max_retries() -> MaxRetries {
|
||||||
|
MaxRetries::Count(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The logic to determine how often to retry this job if it fails
|
||||||
|
//
|
||||||
|
// Jobs can optionally override this value
|
||||||
|
fn backoff_strategy() -> Backoff {
|
||||||
|
Backoff::Exponential(2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Running jobs
|
||||||
|
By default, this crate ships with the `background-jobs-server` feature enabled. This uses the
|
||||||
|
`background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for
|
||||||
|
spawning new jobs.
|
||||||
|
|
||||||
|
##### Starting the job server
|
||||||
|
```rust
|
||||||
|
use background_jobs::ServerConfig;
|
||||||
|
use failure::Error;
|
||||||
|
use server_jobs_example::queue_set;
|
||||||
|
|
||||||
|
fn main() -> Result<(), Error> {
|
||||||
|
// Run our job server
|
||||||
|
tokio::run(ServerConfig::init(
|
||||||
|
"127.0.0.1",
|
||||||
|
5555,
|
||||||
|
1,
|
||||||
|
queue_set(),
|
||||||
|
"example-db",
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
```
|
||||||
|
##### Starting the job worker
|
||||||
|
```rust
|
||||||
|
use background_jobs::WorkerConfig;
|
||||||
|
use failure::Error;
|
||||||
|
use server_jobs_example::{queue_map, MyProcessor};
|
||||||
|
|
||||||
|
fn main() -> Result<(), Error> {
|
||||||
|
// Create the worker config
|
||||||
|
let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map());
|
||||||
|
|
||||||
|
// Register our processor
|
||||||
|
worker.register_processor(MyProcessor);
|
||||||
|
|
||||||
|
// Spin up the workers
|
||||||
|
tokio::run(worker.run());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
```
|
||||||
|
##### Queuing jobs
|
||||||
|
```rust
|
||||||
|
use background_jobs::SpawnerConfig;
|
||||||
|
use futures::{future::lazy, Future};
|
||||||
|
use server_jobs_example::{MyJob, MyProcessor};
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
// Create 50 new jobs, each with two consecutive values of the fibonacci sequence
|
||||||
|
let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
|
||||||
|
acc.push(MyJob::new(x, y));
|
||||||
|
|
||||||
|
(y, x + y, acc)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create the spawner
|
||||||
|
let spawner = SpawnerConfig::new("localhost", 5555);
|
||||||
|
|
||||||
|
// Queue each job
|
||||||
|
tokio::run(lazy(move || {
|
||||||
|
for job in jobs {
|
||||||
|
tokio::spawn(spawner.queue::<MyProcessor>(job).map_err(|_| ()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Not using a ZeroMQ based client/server model
|
||||||
|
If you want to create your own jobs processor based on this idea, you can depend on the
|
||||||
|
`background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
|
||||||
|
other useful types for implementing a jobs processor.
|
||||||
|
|
||||||
|
### Contributing
|
||||||
|
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the GPLv3.
|
||||||
|
|
||||||
|
### License
|
||||||
|
|
||||||
|
Copyright © 2018 Riley Trautman
|
||||||
|
|
||||||
|
Background Jobs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||||
|
|
||||||
|
Background Jobs is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. This file is part of Background Jobs.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License along with Background Jobs. If not, see [http://www.gnu.org/licenses/](http://www.gnu.org/licenses/).
|
||||||
|
|
2
TODO
2
TODO
|
@ -1,2 +1,4 @@
|
||||||
1.
|
1.
|
||||||
Gracefull Shutdown
|
Gracefull Shutdown
|
||||||
|
2.
|
||||||
|
Periodically check staged jobs
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use background_jobs::ServerConfig;
|
use background_jobs::ServerConfig;
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use server_jobs_example::queue_set;
|
use server_jobs_example::queue_set;
|
||||||
|
|
|
@ -1,13 +1,32 @@
|
||||||
use background_jobs::{Processor, SpawnerConfig};
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use background_jobs::SpawnerConfig;
|
||||||
use futures::{future::lazy, Future};
|
use futures::{future::lazy, Future};
|
||||||
use server_jobs_example::{MyJobArguments, MyProcessor};
|
use server_jobs_example::{MyJob, MyProcessor};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
|
let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
|
||||||
acc.push(MyJobArguments::new(x, y));
|
acc.push(MyJob::new(x, y));
|
||||||
|
|
||||||
(y, x + y, acc)
|
(y, x + y, acc)
|
||||||
});
|
});
|
||||||
|
@ -16,11 +35,7 @@ fn main() {
|
||||||
|
|
||||||
tokio::run(lazy(move || {
|
tokio::run(lazy(move || {
|
||||||
for job in jobs {
|
for job in jobs {
|
||||||
tokio::spawn(
|
tokio::spawn(spawner.queue::<MyProcessor>(job).map_err(|_| ()));
|
||||||
spawner
|
|
||||||
.queue(MyProcessor::new_job(job, None, None).unwrap())
|
|
||||||
.map_err(|_| ()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use background_jobs::WorkerConfig;
|
use background_jobs::WorkerConfig;
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use server_jobs_example::{queue_map, MyProcessor};
|
use server_jobs_example::{queue_map, MyProcessor};
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -5,51 +24,58 @@ extern crate serde_derive;
|
||||||
|
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
|
|
||||||
use background_jobs::{Backoff, MaxRetries, Processor};
|
use background_jobs::{Backoff, Job, MaxRetries, Processor};
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use futures::{future::IntoFuture, Future};
|
use futures::{future::IntoFuture, Future};
|
||||||
|
|
||||||
|
const DEFAULT_QUEUE: &'static str = "default";
|
||||||
|
|
||||||
pub fn queue_map() -> BTreeMap<String, usize> {
|
pub fn queue_map() -> BTreeMap<String, usize> {
|
||||||
let mut map = BTreeMap::new();
|
let mut map = BTreeMap::new();
|
||||||
map.insert("default".to_owned(), 18);
|
map.insert(DEFAULT_QUEUE.to_owned(), 18);
|
||||||
|
|
||||||
map
|
map
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn queue_set() -> BTreeSet<String> {
|
pub fn queue_set() -> BTreeSet<String> {
|
||||||
let mut set = BTreeSet::new();
|
queue_map().keys().cloned().collect()
|
||||||
set.insert("default".to_owned());
|
|
||||||
|
|
||||||
set
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
pub struct MyJobArguments {
|
pub struct MyJob {
|
||||||
some_usize: usize,
|
some_usize: usize,
|
||||||
other_usize: usize,
|
other_usize: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MyJobArguments {
|
impl MyJob {
|
||||||
pub fn new(some_usize: usize, other_usize: usize) -> Self {
|
pub fn new(some_usize: usize, other_usize: usize) -> Self {
|
||||||
MyJobArguments {
|
MyJob {
|
||||||
some_usize,
|
some_usize,
|
||||||
other_usize,
|
other_usize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Job for MyJob {
|
||||||
|
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
||||||
|
info!("args: {:?}", self);
|
||||||
|
|
||||||
|
Box::new(Ok(()).into_future())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct MyProcessor;
|
pub struct MyProcessor;
|
||||||
|
|
||||||
impl Processor for MyProcessor {
|
impl Processor for MyProcessor {
|
||||||
type Arguments = MyJobArguments;
|
type Job = MyJob;
|
||||||
|
|
||||||
fn name() -> &'static str {
|
fn name() -> &'static str {
|
||||||
"MyProcessor"
|
"MyProcessor"
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue() -> &'static str {
|
fn queue() -> &'static str {
|
||||||
"default"
|
DEFAULT_QUEUE
|
||||||
}
|
}
|
||||||
|
|
||||||
fn max_retries() -> MaxRetries {
|
fn max_retries() -> MaxRetries {
|
||||||
|
@ -59,10 +85,4 @@ impl Processor for MyProcessor {
|
||||||
fn backoff_strategy() -> Backoff {
|
fn backoff_strategy() -> Backoff {
|
||||||
Backoff::Exponential(2)
|
Backoff::Exponential(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
|
||||||
info!("args: {:?}", args);
|
|
||||||
|
|
||||||
Box::new(Ok(()).into_future())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
56
jobs-core/src/job.rs
Normal file
56
jobs-core/src/job.rs
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use failure::Error;
|
||||||
|
use futures::Future;
|
||||||
|
use serde::{de::DeserializeOwned, ser::Serialize};
|
||||||
|
|
||||||
|
use crate::{Backoff, MaxRetries};
|
||||||
|
|
||||||
|
/// The Job trait defines parameters pertaining to an instance of background job
|
||||||
|
pub trait Job: Serialize + DeserializeOwned {
|
||||||
|
/// Users of this library must define what it means to run a job.
|
||||||
|
///
|
||||||
|
/// This should contain all the logic needed to complete a job. If that means queuing more
|
||||||
|
/// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
|
||||||
|
/// processes, that logic should all be called from inside this method.
|
||||||
|
fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send>;
|
||||||
|
|
||||||
|
/// If this job should not use the default queue for its processor, this can be overridden in
|
||||||
|
/// user-code.
|
||||||
|
///
|
||||||
|
/// Jobs will only be processed by processors that are registered, and if a queue is supplied
|
||||||
|
/// here that is not associated with a valid processor for this job, it will never be
|
||||||
|
/// processed.
|
||||||
|
fn queue(&self) -> Option<&str> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If this job should not use the default maximum retry count for its processor, this can be
|
||||||
|
/// overridden in user-code.
|
||||||
|
fn max_retries(&self) -> Option<MaxRetries> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If this job should not use the default backoff strategy for its processor, this can be
|
||||||
|
/// overridden in user-code.
|
||||||
|
fn backoff_strategy(&self) -> Option<Backoff> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,34 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use chrono::{offset::Utc, DateTime, Duration as OldDuration};
|
use chrono::{offset::Utc, DateTime, Duration as OldDuration};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
|
use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
|
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
|
||||||
|
/// Metadata pertaining to a job that exists within the background_jobs system
|
||||||
|
///
|
||||||
|
/// Although exposed publically, this type should only really be handled by the library itself, and
|
||||||
|
/// is impossible to create outside of a
|
||||||
|
/// [Processor](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor)'s new_job
|
||||||
|
/// method.
|
||||||
pub struct JobInfo {
|
pub struct JobInfo {
|
||||||
/// ID of the job, None means an ID has not been set
|
/// ID of the job, None means an ID has not been set
|
||||||
id: Option<usize>,
|
id: Option<usize>,
|
||||||
|
@ -126,7 +151,11 @@ impl JobInfo {
|
||||||
self.status = JobStatus::Staged;
|
self.status = JobStatus::Staged;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&mut self) {
|
/// This method sets the Job's status to running
|
||||||
|
///
|
||||||
|
/// Touching this outside of the background_jobs crates is dangerous, since these libraries
|
||||||
|
/// rely on the state of the job being correct.
|
||||||
|
pub fn set_running(&mut self) {
|
||||||
self.status = JobStatus::Running;
|
self.status = JobStatus::Running;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate failure;
|
extern crate failure;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -7,27 +26,35 @@ extern crate serde_derive;
|
||||||
|
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
|
|
||||||
|
mod job;
|
||||||
mod job_info;
|
mod job_info;
|
||||||
mod processor;
|
mod processor;
|
||||||
mod processors;
|
mod processor_map;
|
||||||
mod storage;
|
mod storage;
|
||||||
|
|
||||||
pub use crate::{
|
pub use crate::{
|
||||||
job_info::JobInfo, processor::Processor, processors::Processors, storage::Storage,
|
job::Job, job_info::JobInfo, processor::Processor, processor_map::ProcessorMap,
|
||||||
|
storage::Storage,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Fail)]
|
#[derive(Debug, Fail)]
|
||||||
|
/// The error type returned by a `Processor`'s `process` method
|
||||||
pub enum JobError {
|
pub enum JobError {
|
||||||
|
/// Some error occurred while processing the job
|
||||||
#[fail(display = "Error performing job: {}", _0)]
|
#[fail(display = "Error performing job: {}", _0)]
|
||||||
Processing(#[cause] Error),
|
Processing(#[cause] Error),
|
||||||
|
|
||||||
|
/// Creating a `Job` type from the provided `serde_json::Value` failed
|
||||||
#[fail(display = "Could not make JSON value from arguments")]
|
#[fail(display = "Could not make JSON value from arguments")]
|
||||||
Json,
|
Json,
|
||||||
|
|
||||||
|
/// No processor was present to handle a given job
|
||||||
#[fail(display = "No processor available for job")]
|
#[fail(display = "No processor available for job")]
|
||||||
MissingProcessor,
|
MissingProcessor,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the status of a job when storing it
|
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
|
||||||
|
/// Set the status of a job when storing it
|
||||||
pub enum JobStatus {
|
pub enum JobStatus {
|
||||||
/// Job should be queued
|
/// Job should be queued
|
||||||
Pending,
|
Pending,
|
||||||
|
@ -79,12 +106,17 @@ impl MaxRetries {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||||
|
/// A type that represents whether a job should be requeued
|
||||||
pub enum ShouldStop {
|
pub enum ShouldStop {
|
||||||
|
/// The job has hit the maximum allowed number of retries, and should be failed permanently
|
||||||
LimitReached,
|
LimitReached,
|
||||||
|
|
||||||
|
/// The job is allowed to be put back into the job queue
|
||||||
Requeue,
|
Requeue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldStop {
|
impl ShouldStop {
|
||||||
|
/// A boolean representation of this state
|
||||||
pub fn should_requeue(&self) -> bool {
|
pub fn should_requeue(&self) -> bool {
|
||||||
*self == ShouldStop::Requeue
|
*self == ShouldStop::Requeue
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,28 +1,107 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{Either, IntoFuture},
|
future::{Either, IntoFuture},
|
||||||
Future,
|
Future,
|
||||||
};
|
};
|
||||||
use serde::{de::DeserializeOwned, ser::Serialize};
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::{Backoff, JobError, JobInfo, MaxRetries};
|
use crate::{Backoff, Job, JobError, JobInfo, MaxRetries};
|
||||||
|
|
||||||
/// The Processor trait
|
/// ## The Processor trait
|
||||||
///
|
///
|
||||||
/// Processors define the logic for executing jobs
|
/// Processors define the logic spawning jobs such as
|
||||||
|
/// - The job's name
|
||||||
|
/// - The job's default queue
|
||||||
|
/// - The job's default maximum number of retries
|
||||||
|
/// - The job's [backoff
|
||||||
|
/// strategy](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Backoff)
|
||||||
|
///
|
||||||
|
/// Processors also provide the default mechanism for running a job, and the only mechanism for
|
||||||
|
/// creating a [JobInfo](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.JobInfo),
|
||||||
|
/// which is the type required for queuing jobs to be executed.
|
||||||
|
///
|
||||||
|
/// ### Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
|
||||||
|
/// use failure::Error;
|
||||||
|
/// use futures::future::{Future, IntoFuture};
|
||||||
|
/// use log::info;
|
||||||
|
/// use serde_derive::{Deserialize, Serialize};
|
||||||
|
///
|
||||||
|
/// #[derive(Deserialize, Serialize)]
|
||||||
|
/// struct MyJob {
|
||||||
|
/// count: i32,
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl Job for MyJob {
|
||||||
|
/// fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
||||||
|
/// info!("Processing {}", self.count);
|
||||||
|
///
|
||||||
|
/// Box::new(Ok(()).into_future())
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// #[derive(Clone)]
|
||||||
|
/// struct MyProcessor;
|
||||||
|
///
|
||||||
|
/// impl Processor for MyProcessor {
|
||||||
|
/// type Job = MyJob;
|
||||||
|
///
|
||||||
|
/// fn name() -> &'static str {
|
||||||
|
/// "IncrementProcessor"
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// fn queue() -> &'static str {
|
||||||
|
/// "default"
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// fn max_retries() -> MaxRetries {
|
||||||
|
/// MaxRetries::Count(1)
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// fn backoff_strategy() -> Backoff {
|
||||||
|
/// Backoff::Exponential(2)
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// fn main() -> Result<(), Error> {
|
||||||
|
/// let job = MyProcessor::new_job(MyJob { count: 1234 })?;
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub trait Processor: Clone {
|
pub trait Processor: Clone {
|
||||||
type Arguments: Serialize + DeserializeOwned;
|
type Job: Job;
|
||||||
|
|
||||||
/// The name of the processor
|
/// The name of the processor
|
||||||
///
|
///
|
||||||
/// This name must be unique!!! It is used to look up which processor should handle a job
|
/// This name must be unique!!! It is used to look up which processor should handle a job
|
||||||
fn name() -> &'static str;
|
fn name() -> &'static str;
|
||||||
|
|
||||||
/// The name of the queue
|
/// The name of the default queue for jobs created with this processor
|
||||||
///
|
///
|
||||||
/// The queue determines which workers should process which jobs. By default, all workers
|
/// This can be overridden on an individual-job level, but if a non-existant queue is supplied,
|
||||||
/// process all jobs, but that can be configured when starting the workers
|
/// the job will never be processed.
|
||||||
fn queue() -> &'static str;
|
fn queue() -> &'static str;
|
||||||
|
|
||||||
/// Define the default number of retries for a given processor
|
/// Define the default number of retries for a given processor
|
||||||
|
@ -35,83 +114,59 @@ pub trait Processor: Clone {
|
||||||
/// Jobs can override
|
/// Jobs can override
|
||||||
fn backoff_strategy() -> Backoff;
|
fn backoff_strategy() -> Backoff;
|
||||||
|
|
||||||
/// Defines how jobs for this processor are processed
|
/// A provided method to create a new JobInfo from provided arguments
|
||||||
///
|
///
|
||||||
/// Please do not perform blocking operations in the process method except if put behind
|
/// This is required for spawning jobs, since it enforces the relationship between the job and
|
||||||
/// tokio's `blocking` abstraction
|
/// the Processor that should handle it.
|
||||||
fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send>;
|
fn new_job(job: Self::Job) -> Result<JobInfo, Error> {
|
||||||
|
let queue = job.queue().unwrap_or(Self::queue()).to_owned();
|
||||||
|
let max_retries = job.max_retries().unwrap_or(Self::max_retries());
|
||||||
|
let backoff_strategy = job.backoff_strategy().unwrap_or(Self::backoff_strategy());
|
||||||
|
|
||||||
/// A provided method to create a new Job from provided arguments
|
|
||||||
///
|
|
||||||
/// ### Example
|
|
||||||
///
|
|
||||||
/// ```rust
|
|
||||||
/// #[macro_use]
|
|
||||||
/// extern crate log;
|
|
||||||
///
|
|
||||||
/// use jobs::{Processor, MaxRetries};
|
|
||||||
/// use failure::Error;
|
|
||||||
/// use futures::future::{Future, IntoFuture};
|
|
||||||
///
|
|
||||||
/// struct MyProcessor;
|
|
||||||
///
|
|
||||||
/// impl Processor for MyProcessor {
|
|
||||||
/// type Arguments = i32;
|
|
||||||
///
|
|
||||||
/// fn name() -> &'static str {
|
|
||||||
/// "IncrementProcessor"
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn queue() -> &'static str {
|
|
||||||
/// "default"
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn max_retries() -> MaxRetries {
|
|
||||||
/// MaxRetries::Count(1)
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn backoff_strategy() -> Backoff {
|
|
||||||
/// Backoff::Exponential(2)
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn process(
|
|
||||||
/// &self,
|
|
||||||
/// args: Self::Arguments,
|
|
||||||
/// ) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
|
||||||
/// info!("Processing {}", args);
|
|
||||||
///
|
|
||||||
/// Box::new(Ok(()).into_future())
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn main() -> Result<(), Error> {
|
|
||||||
/// let job = MyProcessor::new_job(1234, None)?;
|
|
||||||
///
|
|
||||||
/// Ok(())
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
fn new_job(
|
|
||||||
args: Self::Arguments,
|
|
||||||
max_retries: Option<MaxRetries>,
|
|
||||||
backoff_strategy: Option<Backoff>,
|
|
||||||
) -> Result<JobInfo, Error> {
|
|
||||||
let job = JobInfo::new(
|
let job = JobInfo::new(
|
||||||
Self::name().to_owned(),
|
Self::name().to_owned(),
|
||||||
Self::queue().to_owned(),
|
queue,
|
||||||
serde_json::to_value(args)?,
|
serde_json::to_value(job)?,
|
||||||
max_retries.unwrap_or(Self::max_retries()),
|
max_retries,
|
||||||
backoff_strategy.unwrap_or(Self::backoff_strategy()),
|
backoff_strategy,
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(job)
|
Ok(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A provided method to coerce arguments into the expected type
|
/// A provided method to coerce arguments into the expected type and run the job
|
||||||
fn do_processing(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
|
///
|
||||||
let res = serde_json::from_value::<Self::Arguments>(args);
|
/// Advanced users may want to override this method in order to provide their own custom
|
||||||
|
/// before/after logic for certain job processors
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
|
||||||
|
/// let res = serde_json::from_value::<Self::Job>(args);
|
||||||
|
///
|
||||||
|
/// let fut = match res {
|
||||||
|
/// Ok(job) => {
|
||||||
|
/// // Perform some custom pre-job logic
|
||||||
|
/// Either::A(job.run().map_err(JobError::Processing))
|
||||||
|
/// },
|
||||||
|
/// Err(_) => Either::B(Err(JobError::Json).into_future()),
|
||||||
|
/// };
|
||||||
|
///
|
||||||
|
/// Box::new(fut.and_then(|_| {
|
||||||
|
/// // Perform some custom post-job logic
|
||||||
|
/// }))
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Patterns like this could be useful if you want to use the same job type for multiple
|
||||||
|
/// scenarios. Defining the `process` method for multiple `Processor`s with different
|
||||||
|
/// before/after logic for the same
|
||||||
|
/// [`Job`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Job) type is
|
||||||
|
/// supported.
|
||||||
|
fn process(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
|
||||||
|
let res = serde_json::from_value::<Self::Job>(args);
|
||||||
|
|
||||||
let fut = match res {
|
let fut = match res {
|
||||||
Ok(item) => Either::A(self.process(item).map_err(JobError::Processing)),
|
Ok(job) => Either::A(job.run().map_err(JobError::Processing)),
|
||||||
Err(_) => Either::B(Err(JobError::Json).into_future()),
|
Err(_) => Either::B(Err(JobError::Json).into_future()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
112
jobs-core/src/processor_map.rs
Normal file
112
jobs-core/src/processor_map.rs
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use futures::future::{Either, Future, IntoFuture};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::{JobError, JobInfo, Processor};
|
||||||
|
|
||||||
|
/// A generic function that processes a job
|
||||||
|
///
|
||||||
|
/// Instead of storing
|
||||||
|
/// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) type
|
||||||
|
/// directly, the
|
||||||
|
/// [`ProcessorMap`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.ProcessorMap)
|
||||||
|
/// struct stores these `ProcessFn` types that don't expose differences in Job types.
|
||||||
|
pub type ProcessFn =
|
||||||
|
Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>;
|
||||||
|
|
||||||
|
/// A type for storing the relationships between processor names and the processor itself
|
||||||
|
///
|
||||||
|
/// [`Processor`s](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) must be
|
||||||
|
/// registered with the `ProcessorMap` in the initialization phase of an application before
|
||||||
|
/// workers are spawned in order to handle queued jobs.
|
||||||
|
pub struct ProcessorMap {
|
||||||
|
inner: HashMap<String, ProcessFn>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessorMap {
|
||||||
|
/// Intialize a `ProcessorMap`
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Default::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a
|
||||||
|
/// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor) with
|
||||||
|
/// this `ProcessorMap`.
|
||||||
|
///
|
||||||
|
/// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so
|
||||||
|
/// make sure to register all your processors up-front.
|
||||||
|
pub fn register_processor<P>(&mut self, processor: P)
|
||||||
|
where
|
||||||
|
P: Processor + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
self.inner.insert(
|
||||||
|
P::name().to_owned(),
|
||||||
|
Box::new(move |value| processor.process(value)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process a given job
|
||||||
|
///
|
||||||
|
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
|
||||||
|
/// intended for internal use.
|
||||||
|
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
|
||||||
|
let opt = self
|
||||||
|
.inner
|
||||||
|
.get(job.processor())
|
||||||
|
.map(|processor| process(processor, job.clone()));
|
||||||
|
|
||||||
|
if let Some(fut) = opt {
|
||||||
|
Either::A(fut)
|
||||||
|
} else {
|
||||||
|
error!("Processor {} not present", job.processor());
|
||||||
|
Either::B(Ok(job).into_future())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ProcessorMap {
|
||||||
|
fn default() -> Self {
|
||||||
|
ProcessorMap {
|
||||||
|
inner: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
|
||||||
|
let args = job.args();
|
||||||
|
|
||||||
|
let processor = job.processor().to_owned();
|
||||||
|
|
||||||
|
process_fn(args).then(move |res| match res {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("Job completed, {}", processor);
|
||||||
|
job.pass();
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Job errored, {}, {}", processor, e);
|
||||||
|
job.fail();
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -1,70 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use futures::future::{Either, Future, IntoFuture};
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use crate::{JobError, JobInfo, Processor};
|
|
||||||
|
|
||||||
pub type ProcessFn =
|
|
||||||
Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>;
|
|
||||||
|
|
||||||
pub struct Processors {
|
|
||||||
inner: HashMap<String, ProcessFn>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Processors {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Default::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn register_processor<P>(&mut self, processor: P)
|
|
||||||
where
|
|
||||||
P: Processor + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
self.inner.insert(
|
|
||||||
P::name().to_owned(),
|
|
||||||
Box::new(move |value| processor.do_processing(value)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
|
|
||||||
let opt = self
|
|
||||||
.inner
|
|
||||||
.get(job.processor())
|
|
||||||
.map(|processor| process(processor, job.clone()));
|
|
||||||
|
|
||||||
if let Some(fut) = opt {
|
|
||||||
Either::A(fut)
|
|
||||||
} else {
|
|
||||||
error!("Processor {} not present", job.processor());
|
|
||||||
Either::B(Ok(job).into_future())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Processors {
|
|
||||||
fn default() -> Self {
|
|
||||||
Processors {
|
|
||||||
inner: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
|
|
||||||
let args = job.args();
|
|
||||||
|
|
||||||
let processor = job.processor().to_owned();
|
|
||||||
|
|
||||||
process_fn(args).then(move |res| match res {
|
|
||||||
Ok(_) => {
|
|
||||||
info!("Job completed, {}", processor);
|
|
||||||
job.pass();
|
|
||||||
Ok(job)
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Job errored, {}, {}", processor, e);
|
|
||||||
job.fail();
|
|
||||||
Ok(job)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::{BTreeMap, BTreeSet},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
@ -34,6 +53,13 @@ impl<'a> Buckets<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
/// All the logic to interact with the persisted data is defined on this type.
|
||||||
|
///
|
||||||
|
/// Perhapse in the future this will be made generic, but for now it is hard-coded to use LMDB to
|
||||||
|
/// store job information.
|
||||||
|
///
|
||||||
|
/// None of the methods in this module are intended to be used outside of a background-jobs
|
||||||
|
/// runtime.
|
||||||
pub struct Storage {
|
pub struct Storage {
|
||||||
runner_id: usize,
|
runner_id: usize,
|
||||||
store: Arc<RwLock<Store>>,
|
store: Arc<RwLock<Store>>,
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
|
|
||||||
mod server;
|
mod server;
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeSet,
|
collections::BTreeSet,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
|
@ -100,9 +119,55 @@ impl Config {
|
||||||
#[fail(display = "Queue is missing from map, {}", _0)]
|
#[fail(display = "Queue is missing from map, {}", _0)]
|
||||||
struct MissingQueue(String);
|
struct MissingQueue(String);
|
||||||
|
|
||||||
|
/// The entry point for creating a background-jobs server
|
||||||
|
///
|
||||||
|
/// `ServerConfig` is used to spin up the infrastructure to manage queueing and storing jobs, but
|
||||||
|
/// it does not provide functionality to execute jobs. For that, you must create a
|
||||||
|
/// [`Worker`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.WorkerConfig)
|
||||||
|
/// that will connect to the running server.
|
||||||
|
///
|
||||||
|
/// This type doesn't have any associated data, but is used as a proxy for starting the
|
||||||
|
/// background-jobs runtime.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use std::collections::BTreeSet;
|
||||||
|
/// use background_jobs_server::ServerConfig;
|
||||||
|
/// use failure::Error;
|
||||||
|
///
|
||||||
|
/// fn main() -> Result<(), Error> {
|
||||||
|
/// let mut queue_set = BTreeSet::new();
|
||||||
|
/// queue_set.insert("default".to_owned());
|
||||||
|
///
|
||||||
|
/// let start_server = ServerConfig::init(
|
||||||
|
/// "127.0.0.1",
|
||||||
|
/// 5555,
|
||||||
|
/// 1,
|
||||||
|
/// queue_set,
|
||||||
|
/// "example-db",
|
||||||
|
/// );
|
||||||
|
///
|
||||||
|
/// # let _ = start_server;
|
||||||
|
/// // Comment out the start so we don't run the full server in doctests
|
||||||
|
/// // tokio::run(start_server)
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub struct ServerConfig;
|
pub struct ServerConfig;
|
||||||
|
|
||||||
impl ServerConfig {
|
impl ServerConfig {
|
||||||
|
/// Create a new background-jobs Server that binds to the provided `ip` with ports starting at
|
||||||
|
/// `base_port`.
|
||||||
|
///
|
||||||
|
/// The smallest background-jobs server will bind to 3 ports. Each port serves a different
|
||||||
|
/// purpose:
|
||||||
|
/// - `base_port` is the port that jobs are sent to the server on
|
||||||
|
/// - `base_port` + 1 is the port that the server uses to advertise which queues are available
|
||||||
|
/// - `base_port` + n is bound for an individual queue of jobs that the server pushes to
|
||||||
|
/// workers.
|
||||||
|
///
|
||||||
|
/// This method returns a future that, when run, spawns all of the server's required futures
|
||||||
|
/// onto tokio. Therefore, this can only be used from tokio.
|
||||||
pub fn init<P: AsRef<Path>>(
|
pub fn init<P: AsRef<Path>>(
|
||||||
ip: &str,
|
ip: &str,
|
||||||
base_port: usize,
|
base_port: usize,
|
||||||
|
@ -115,6 +180,13 @@ impl ServerConfig {
|
||||||
Self::init_with_context(ip, base_port, runner_id, queues, db_path, context)
|
Self::init_with_context(ip, base_port, runner_id, queues, db_path, context)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The same as `ServerConfig::init()`, but with a provided ZeroMQ Context.
|
||||||
|
///
|
||||||
|
/// This can be useful if you have other uses of ZeroMQ in your application, and want to share
|
||||||
|
/// a context with your dependencies.
|
||||||
|
///
|
||||||
|
/// If you're running the Server, Worker, and Spawner in the same application, you should share
|
||||||
|
/// a ZeroMQ context between them.
|
||||||
pub fn init_with_context<P: AsRef<Path>>(
|
pub fn init_with_context<P: AsRef<Path>>(
|
||||||
ip: &str,
|
ip: &str,
|
||||||
base_port: usize,
|
base_port: usize,
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use background_jobs_core::{JobInfo, Storage};
|
use background_jobs_core::{JobInfo, Storage};
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use background_jobs_core::{JobInfo, Storage};
|
use background_jobs_core::{JobInfo, Storage};
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use background_jobs_core::Storage;
|
use background_jobs_core::Storage;
|
||||||
|
|
|
@ -1,6 +1,25 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use background_jobs_core::JobInfo;
|
use background_jobs_core::Processor;
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use futures::{future::IntoFuture, Future};
|
use futures::{future::IntoFuture, Future};
|
||||||
#[cfg(feature = "futures-zmq")]
|
#[cfg(feature = "futures-zmq")]
|
||||||
|
@ -10,24 +29,58 @@ use log::{debug, trace};
|
||||||
use tokio_zmq::{prelude::*, Push};
|
use tokio_zmq::{prelude::*, Push};
|
||||||
use zmq::{Context, Message};
|
use zmq::{Context, Message};
|
||||||
|
|
||||||
|
/// SpawnerConfig is the only part of this library required to actually exist in your application.
|
||||||
|
///
|
||||||
|
/// This type is used to queue new jobs into the `background-jobs` server.
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// let spawner = SpawnerConfig::new("localhost", 5555);
|
||||||
|
///
|
||||||
|
/// tokio::spawn(
|
||||||
|
/// spawner
|
||||||
|
/// .queue::<MyProcessor>(job)
|
||||||
|
/// .map_err(|_| ()),
|
||||||
|
/// );
|
||||||
|
/// ```
|
||||||
pub struct SpawnerConfig {
|
pub struct SpawnerConfig {
|
||||||
server: String,
|
server: String,
|
||||||
ctx: Arc<Context>,
|
ctx: Arc<Context>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpawnerConfig {
|
impl SpawnerConfig {
|
||||||
|
/// Create a `SpawnerConfig`
|
||||||
|
///
|
||||||
|
/// - `server_host` is the hostname or IP address of the host that the server is running on
|
||||||
|
/// - `base_port` is the same `base_port` from the server config. The spawner will only ever
|
||||||
|
/// need to communicate over `base_port`
|
||||||
pub fn new(server_host: &str, base_port: usize) -> Self {
|
pub fn new(server_host: &str, base_port: usize) -> Self {
|
||||||
let ctx = Arc::new(Context::new());
|
let ctx = Arc::new(Context::new());
|
||||||
|
|
||||||
|
Self::new_with_context(server_host, base_port, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The same as `SpawnerConfig::new()`, but with a provided ZeroMQ Context.
|
||||||
|
///
|
||||||
|
/// This can be useful if you have other uses of ZeroMQ in your application, and want to share
|
||||||
|
/// a context with your dependencies.
|
||||||
|
///
|
||||||
|
/// If you're running the Server, Worker, and Spawner in the same application, you should share
|
||||||
|
/// a ZeroMQ context between them.
|
||||||
|
pub fn new_with_context(server_host: &str, base_port: usize, ctx: Arc<Context>) -> Self {
|
||||||
SpawnerConfig {
|
SpawnerConfig {
|
||||||
server: format!("tcp://{}:{}", server_host, base_port),
|
server: format!("tcp://{}:{}", server_host, base_port),
|
||||||
ctx,
|
ctx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn queue(&self, job: JobInfo) -> impl Future<Item = (), Error = Error> {
|
/// Queue a job to be executed in the background
|
||||||
let msg = serde_json::to_string(&job)
|
pub fn queue<P>(&self, job: P::Job) -> impl Future<Item = (), Error = Error>
|
||||||
|
where
|
||||||
|
P: Processor,
|
||||||
|
{
|
||||||
|
let msg = P::new_job(job)
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
|
.and_then(|job| serde_json::to_string(&job).map_err(Error::from))
|
||||||
.and_then(|s| {
|
.and_then(|s| {
|
||||||
Message::from_slice(s.as_ref())
|
Message::from_slice(s.as_ref())
|
||||||
.map(|m| m.into())
|
.map(|m| m.into())
|
||||||
|
@ -40,7 +93,6 @@ impl SpawnerConfig {
|
||||||
Push::builder(self.ctx.clone())
|
Push::builder(self.ctx.clone())
|
||||||
.connect(&self.server)
|
.connect(&self.server)
|
||||||
.build()
|
.build()
|
||||||
.into_future()
|
|
||||||
.from_err()
|
.from_err()
|
||||||
.join(msg)
|
.join(msg)
|
||||||
.and_then(move |(push, msg)| {
|
.and_then(move |(push, msg)| {
|
||||||
|
|
|
@ -1,6 +1,25 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use background_jobs_core::{JobInfo, Processors};
|
use background_jobs_core::{JobInfo, ProcessorMap};
|
||||||
use failure::{Error, Fail};
|
use failure::{Error, Fail};
|
||||||
use futures::{
|
use futures::{
|
||||||
sync::mpsc::{channel, Sender},
|
sync::mpsc::{channel, Sender},
|
||||||
|
@ -21,7 +40,7 @@ pub(crate) struct Worker {
|
||||||
push_address: String,
|
push_address: String,
|
||||||
pull_address: String,
|
pull_address: String,
|
||||||
queue: String,
|
queue: String,
|
||||||
processors: Arc<Processors>,
|
processors: Arc<ProcessorMap>,
|
||||||
context: Arc<Context>,
|
context: Arc<Context>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,7 +49,7 @@ impl Worker {
|
||||||
push_address: String,
|
push_address: String,
|
||||||
pull_address: String,
|
pull_address: String,
|
||||||
queue: String,
|
queue: String,
|
||||||
processors: Arc<Processors>,
|
processors: Arc<ProcessorMap>,
|
||||||
context: Arc<Context>,
|
context: Arc<Context>,
|
||||||
) -> impl Future<Item = (), Error = ()> {
|
) -> impl Future<Item = (), Error = ()> {
|
||||||
let cfg = ResetWorker {
|
let cfg = ResetWorker {
|
||||||
|
@ -103,7 +122,7 @@ struct ResetWorker {
|
||||||
push_address: String,
|
push_address: String,
|
||||||
pull_address: String,
|
pull_address: String,
|
||||||
queue: String,
|
queue: String,
|
||||||
processors: Arc<Processors>,
|
processors: Arc<ProcessorMap>,
|
||||||
context: Arc<Context>,
|
context: Arc<Context>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,7 +187,7 @@ fn report_running(
|
||||||
mut job: JobInfo,
|
mut job: JobInfo,
|
||||||
push: Sender<JobInfo>,
|
push: Sender<JobInfo>,
|
||||||
) -> impl Future<Item = JobInfo, Error = Error> {
|
) -> impl Future<Item = JobInfo, Error = Error> {
|
||||||
job.run();
|
job.set_running();
|
||||||
|
|
||||||
push.send(job.clone())
|
push.send(job.clone())
|
||||||
.map(move |_| job)
|
.map(move |_| job)
|
||||||
|
@ -177,7 +196,7 @@ fn report_running(
|
||||||
|
|
||||||
fn process_job(
|
fn process_job(
|
||||||
job: JobInfo,
|
job: JobInfo,
|
||||||
processors: &Processors,
|
processors: &ProcessorMap,
|
||||||
) -> impl Future<Item = JobInfo, Error = Error> {
|
) -> impl Future<Item = JobInfo, Error = Error> {
|
||||||
processors
|
processors
|
||||||
.process_job(job.clone())
|
.process_job(job.clone())
|
||||||
|
|
|
@ -1,6 +1,25 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
use background_jobs_core::{Processor, Processors};
|
use background_jobs_core::{Processor, ProcessorMap};
|
||||||
use failure::Fail;
|
use failure::Fail;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
@ -11,8 +30,35 @@ mod portmap;
|
||||||
|
|
||||||
use self::{config::Worker, portmap::PortMap};
|
use self::{config::Worker, portmap::PortMap};
|
||||||
|
|
||||||
|
/// The entry point for creating a background-jobs worker.
|
||||||
|
///
|
||||||
|
/// A worker handles the processing of jobs, but not the queueing or storing of jobs. It connects
|
||||||
|
/// to a server (crated with
|
||||||
|
/// [`ServerConfig`](https://docs.rs/background-jobs-server/0.1.0/background_jobs_server/struct.ServerConfig))
|
||||||
|
/// and receives work from there.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use std::collections::BTreeMap;
|
||||||
|
/// use background_jobs_server::WorkerConfig;
|
||||||
|
/// use failure::Error;
|
||||||
|
///
|
||||||
|
/// fn main() -> Result<(), Error> {
|
||||||
|
/// let mut queue_map = BTreeMap::new();
|
||||||
|
/// queue_map.insert("default".to_owned(), 10);
|
||||||
|
///
|
||||||
|
/// let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map);
|
||||||
|
///
|
||||||
|
/// // Register a processor
|
||||||
|
/// // worker.register_processor(MyProcessor);
|
||||||
|
///
|
||||||
|
/// // Run the workers
|
||||||
|
/// // tokio::run(worker.run());
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub struct WorkerConfig {
|
pub struct WorkerConfig {
|
||||||
processors: Processors,
|
processors: ProcessorMap,
|
||||||
queues: BTreeMap<String, usize>,
|
queues: BTreeMap<String, usize>,
|
||||||
server_host: String,
|
server_host: String,
|
||||||
base_port: usize,
|
base_port: usize,
|
||||||
|
@ -20,12 +66,29 @@ pub struct WorkerConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerConfig {
|
impl WorkerConfig {
|
||||||
|
/// Create a new worker
|
||||||
|
///
|
||||||
|
/// This method takes three arguments
|
||||||
|
/// - `server_host` is the hostname, or IP address, of the background-jobs server.
|
||||||
|
/// - `base_port` is the same value from the `ServerConfig` initialization. It dictates the
|
||||||
|
/// port the worker uses to return jobs to the server. The worker is guaranteed to connect
|
||||||
|
/// to at least 2 other ports on the server when functioning properly, `base_port` + 1, and
|
||||||
|
/// `base_port` + n.
|
||||||
|
/// - queues is a mapping between the name of a queue, and the number of workers that should
|
||||||
|
/// be started to process jobs in that queue.
|
||||||
pub fn new(server_host: String, base_port: usize, queues: BTreeMap<String, usize>) -> Self {
|
pub fn new(server_host: String, base_port: usize, queues: BTreeMap<String, usize>) -> Self {
|
||||||
let context = Arc::new(Context::new());
|
let context = Arc::new(Context::new());
|
||||||
|
|
||||||
Self::new_with_context(server_host, base_port, queues, context)
|
Self::new_with_context(server_host, base_port, queues, context)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The same as `WorkerConfig::new()`, but with a provided ZeroMQ Context.
|
||||||
|
///
|
||||||
|
/// This can be useful if you have other uses of ZeroMQ in your application, and want to share
|
||||||
|
/// a context with your dependencies.
|
||||||
|
///
|
||||||
|
/// If you're running the Server, Worker, and Spawner in the same application, you should share
|
||||||
|
/// a ZeroMQ context between them.
|
||||||
pub fn new_with_context(
|
pub fn new_with_context(
|
||||||
server_host: String,
|
server_host: String,
|
||||||
base_port: usize,
|
base_port: usize,
|
||||||
|
@ -33,7 +96,7 @@ impl WorkerConfig {
|
||||||
context: Arc<Context>,
|
context: Arc<Context>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
WorkerConfig {
|
WorkerConfig {
|
||||||
processors: Processors::new(),
|
processors: ProcessorMap::new(),
|
||||||
server_host,
|
server_host,
|
||||||
base_port,
|
base_port,
|
||||||
queues,
|
queues,
|
||||||
|
@ -41,6 +104,10 @@ impl WorkerConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register a processor with this worker
|
||||||
|
///
|
||||||
|
/// For more information, see
|
||||||
|
/// [`Processor`](https://docs.rs/background-jobs/0.1.0/background_jobs/struct.Processor).
|
||||||
pub fn register_processor<P>(&mut self, processor: P)
|
pub fn register_processor<P>(&mut self, processor: P)
|
||||||
where
|
where
|
||||||
P: Processor + Send + Sync + 'static,
|
P: Processor + Send + Sync + 'static,
|
||||||
|
@ -48,6 +115,10 @@ impl WorkerConfig {
|
||||||
self.processors.register_processor(processor);
|
self.processors.register_processor(processor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start the workers
|
||||||
|
///
|
||||||
|
/// This method returns a future that, when run, spawns all of the worker's required futures
|
||||||
|
/// onto tokio. Therefore, this can only be used from tokio.
|
||||||
pub fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
pub fn run(self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||||
let WorkerConfig {
|
let WorkerConfig {
|
||||||
processors,
|
processors,
|
||||||
|
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Background Jobs.
|
||||||
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
use failure::{Error, Fail};
|
use failure::{Error, Fail};
|
||||||
|
|
188
src/lib.rs
188
src/lib.rs
|
@ -1,6 +1,188 @@
|
||||||
pub use background_jobs_core::{
|
/*
|
||||||
Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors, ShouldStop, Storage,
|
* This file is part of Background Jobs.
|
||||||
};
|
*
|
||||||
|
* Copyright © 2018 Riley Trautman
|
||||||
|
*
|
||||||
|
* Background Jobs is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Background Jobs is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//! # Background Jobs
|
||||||
|
//!
|
||||||
|
//! This crate provides tooling required to run some processes asynchronously from a usually
|
||||||
|
//! synchronous application. The standard example of this is Web Services, where certain things
|
||||||
|
//! need to be processed, but processing them while a user is waiting for their browser to respond
|
||||||
|
//! might not be the best experience.
|
||||||
|
//!
|
||||||
|
//! ### Usage
|
||||||
|
//! #### Add Background Jobs to your project
|
||||||
|
//! ```toml
|
||||||
|
//! [dependencies]
|
||||||
|
//! background-jobs = "0.1"
|
||||||
|
//! failure = "0.1"
|
||||||
|
//! futures = "0.1"
|
||||||
|
//! tokio = "0.1"
|
||||||
|
//! ```
|
||||||
|
//! #### To get started with Background Jobs, first you should define a job.
|
||||||
|
//! Jobs are a combination of the data required to perform an operation, and the logic of that
|
||||||
|
//! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
|
||||||
|
//!
|
||||||
|
//! ```rust,ignore
|
||||||
|
//! #[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
//! pub struct MyJob {
|
||||||
|
//! some_usize: usize,
|
||||||
|
//! other_usize: usize,
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! impl MyJob {
|
||||||
|
//! pub fn new(some_usize: usize, other_usize: usize) -> Self {
|
||||||
|
//! MyJob {
|
||||||
|
//! some_usize,
|
||||||
|
//! other_usize,
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! impl Job for MyJob {
|
||||||
|
//! fn run(self) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
||||||
|
//! info!("args: {:?}", self);
|
||||||
|
//!
|
||||||
|
//! Box::new(Ok(()).into_future())
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! #### Next, define a Processor.
|
||||||
|
//! Processors are types that define default attributes for jobs, as well as containing some logic
|
||||||
|
//! used internally to perform the job. Processors must implement `Proccessor` and `Clone`.
|
||||||
|
//!
|
||||||
|
//! ```rust,ignore
|
||||||
|
//! #[derive(Clone, Debug)]
|
||||||
|
//! pub struct MyProcessor;
|
||||||
|
//!
|
||||||
|
//! impl Processor for MyProcessor {
|
||||||
|
//! // The kind of job this processor should execute
|
||||||
|
//! type Job = MyJob;
|
||||||
|
//!
|
||||||
|
//! // The name of the processor. It is super important that each processor has a unique name,
|
||||||
|
//! // because otherwise one processor will overwrite another processor when they're being
|
||||||
|
//! // registered.
|
||||||
|
//! fn name() -> &'static str {
|
||||||
|
//! "MyProcessor"
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! // The queue that this processor belongs to
|
||||||
|
//! //
|
||||||
|
//! // Workers have the option to subscribe to specific queues, so this is important to
|
||||||
|
//! // determine which worker will call the processor
|
||||||
|
//! //
|
||||||
|
//! // Jobs can optionally override the queue they're spawned on
|
||||||
|
//! fn queue() -> &'static str {
|
||||||
|
//! DEFAULT_QUEUE
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! // The number of times background-jobs should try to retry a job before giving up
|
||||||
|
//! //
|
||||||
|
//! // Jobs can optionally override this value
|
||||||
|
//! fn max_retries() -> MaxRetries {
|
||||||
|
//! MaxRetries::Count(1)
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! // The logic to determine how often to retry this job if it fails
|
||||||
|
//! //
|
||||||
|
//! // Jobs can optionally override this value
|
||||||
|
//! fn backoff_strategy() -> Backoff {
|
||||||
|
//! Backoff::Exponential(2)
|
||||||
|
//! }
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! #### Running jobs
|
||||||
|
//! By default, this crate ships with the `background-jobs-server` feature enabled. This uses the
|
||||||
|
//! `background-jobs-server` crate to spin up a Server and Workers, and provides a mechanism for
|
||||||
|
//! spawning new jobs.
|
||||||
|
//!
|
||||||
|
//! ##### Starting the job server
|
||||||
|
//! ```rust,ignore
|
||||||
|
//! use background_jobs::ServerConfig;
|
||||||
|
//! use failure::Error;
|
||||||
|
//! use server_jobs_example::queue_set;
|
||||||
|
//!
|
||||||
|
//! fn main() -> Result<(), Error> {
|
||||||
|
//! // Run our job server
|
||||||
|
//! tokio::run(ServerConfig::init(
|
||||||
|
//! "127.0.0.1",
|
||||||
|
//! 5555,
|
||||||
|
//! 1,
|
||||||
|
//! queue_set(),
|
||||||
|
//! "example-db",
|
||||||
|
//! ));
|
||||||
|
//!
|
||||||
|
//! Ok(())
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//! ##### Starting the job worker
|
||||||
|
//! ```rust,ignore
|
||||||
|
//! use background_jobs::WorkerConfig;
|
||||||
|
//! use failure::Error;
|
||||||
|
//! use server_jobs_example::{queue_map, MyProcessor};
|
||||||
|
//!
|
||||||
|
//! fn main() -> Result<(), Error> {
|
||||||
|
//! // Create the worker config
|
||||||
|
//! let mut worker = WorkerConfig::new("localhost".to_owned(), 5555, queue_map());
|
||||||
|
//!
|
||||||
|
//! // Register our processor
|
||||||
|
//! worker.register_processor(MyProcessor);
|
||||||
|
//!
|
||||||
|
//! // Spin up the workers
|
||||||
|
//! tokio::run(worker.run());
|
||||||
|
//!
|
||||||
|
//! Ok(())
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//! ##### Queuing jobs
|
||||||
|
//! ```rust,ignore
|
||||||
|
//! use background_jobs::SpawnerConfig;
|
||||||
|
//! use futures::{future::lazy, Future};
|
||||||
|
//! use server_jobs_example::{MyJob, MyProcessor};
|
||||||
|
//!
|
||||||
|
//! fn main() {
|
||||||
|
//! // Create 50 new jobs, each with two consecutive values of the fibonacci sequence
|
||||||
|
//! let (_, _, jobs) = (1..50).fold((0, 1, Vec::new()), |(x, y, mut acc), _| {
|
||||||
|
//! acc.push(MyJob::new(x, y));
|
||||||
|
//!
|
||||||
|
//! (y, x + y, acc)
|
||||||
|
//! });
|
||||||
|
//!
|
||||||
|
//! // Create the spawner
|
||||||
|
//! let spawner = SpawnerConfig::new("localhost", 5555);
|
||||||
|
//!
|
||||||
|
//! // Queue each job
|
||||||
|
//! tokio::run(lazy(move || {
|
||||||
|
//! for job in jobs {
|
||||||
|
//! tokio::spawn(spawner.queue::<MyProcessor>(job).map_err(|_| ()));
|
||||||
|
//! }
|
||||||
|
//!
|
||||||
|
//! Ok(())
|
||||||
|
//! }));
|
||||||
|
//! }
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! If you want to create your own jobs processor based on this idea, you can depend on the
|
||||||
|
//! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
|
||||||
|
//! other useful types for implementing a jobs processor.
|
||||||
|
|
||||||
|
pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
|
||||||
|
|
||||||
#[cfg(feature = "background-jobs-server")]
|
#[cfg(feature = "background-jobs-server")]
|
||||||
pub use background_jobs_server::{ServerConfig, SpawnerConfig, WorkerConfig};
|
pub use background_jobs_server::{ServerConfig, SpawnerConfig, WorkerConfig};
|
||||||
|
|
Loading…
Reference in a new issue