Update to new futures, new actix

This commit is contained in:
asonix 2020-03-20 21:31:03 -05:00
parent 147a15b2fe
commit 74ac3a9b61
21 changed files with 529 additions and 647 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs"
description = "Background Jobs implemented with sled, actix, and futures"
version = "0.7.0"
version = "0.8.0-alpha.0"
license-file = "LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -21,15 +21,15 @@ members = [
default = ["background-jobs-actix", "background-jobs-sled-storage"]
[dependencies.background-jobs-core]
version = "0.6"
version = "0.7.0"
path = "jobs-core"
[dependencies.background-jobs-actix]
version = "0.6"
version = "0.7.0-alpha.0"
path = "jobs-actix"
optional = true
[dependencies.background-jobs-sled-storage]
version = "0.3.0"
version = "0.4.0-alpha.0"
path = "jobs-sled"
optional = true

View file

@ -7,10 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix = "0.8"
actix = "0.10.0-alpha.2"
actix-rt = "1.0.0"
anyhow = "1.0"
async-trait = "0.1.24"
background-jobs = { version = "0.7.0", path = "../.." }
failure = "0.1"
futures = "0.1"
env_logger = "0.7"
sled-extensions = "0.2.0"
serde = "1.0"
serde_derive = "1.0"
serde = { version = "1.0", features = ["derive"] }

View file

@ -1,7 +1,5 @@
use actix::System;
use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig};
use failure::Error;
use serde_derive::{Deserialize, Serialize};
use anyhow::Error;
use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig};
const DEFAULT_QUEUE: &'static str = "default";
@ -10,7 +8,7 @@ pub struct MyState {
pub app_name: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
some_usize: usize,
other_usize: usize,
@ -19,10 +17,9 @@ pub struct MyJob {
#[derive(Clone, Debug)]
pub struct MyProcessor;
fn main() -> Result<(), Error> {
// First set up the Actix System to ensure we have a runtime to spawn jobs on.
let sys = System::new("my-actix-system");
#[actix_rt::main]
async fn main() -> Result<(), Error> {
env_logger::init();
// Set up our Storage
// For this example, we use the default in-memory storage mechanism
use background_jobs::memory_storage::Storage;
@ -37,7 +34,7 @@ fn main() -> Result<(), Error> {
*/
// Start the application server. This guards access to to the jobs store
let queue_handle = ServerConfig::new(storage).thread_count(8).start();
let queue_handle = create_server(storage);
// Configure and start our workers
WorkerConfig::new(move || MyState::new("My App"))
@ -51,7 +48,7 @@ fn main() -> Result<(), Error> {
queue_handle.queue(MyJob::new(5, 6))?;
// Block on Actix
sys.run()?;
actix_rt::signal::ctrl_c().await?;
Ok(())
}
@ -72,12 +69,12 @@ impl MyJob {
}
}
#[async_trait::async_trait]
impl Job for MyJob {
type Processor = MyProcessor;
type State = MyState;
type Future = Result<(), Error>;
fn run(self, state: MyState) -> Self::Future {
async fn run(self, state: MyState) -> Result<(), Error> {
println!("{}: args, {:?}", state.app_name, self);
Ok(())

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-actix"
description = "in-process jobs processor based on Actix"
version = "0.6.1"
version = "0.7.0-alpha.0"
license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -10,14 +10,17 @@ readme = "../README.md"
edition = "2018"
[dependencies]
actix = "0.8"
background-jobs-core = { version = "0.6", path = "../jobs-core" }
actix = "0.10.0-alpha.2"
actix-rt = "1.0.0"
anyhow = "1.0"
async-trait = "0.1.24"
background-jobs-core = { version = "0.7", path = "../jobs-core" }
chrono = "0.4"
failure = "0.1"
futures = "0.1"
log = "0.4"
num_cpus = "1.10.0"
rand = "0.7.0"
serde = "1.0"
serde_derive = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "0.2.13", features = ["sync"] }

View file

@ -1,55 +1,30 @@
use std::time::Duration;
use super::{Job, QueueHandle};
use actix::{Actor, AsyncContext, Context};
use crate::{Job, QueueHandle};
use actix::{
clock::{interval_at, Duration, Instant},
Arbiter,
};
use log::error;
/// A type used to schedule recurring jobs.
///
/// ```rust,ignore
/// let server = ServerConfig::new(storage).start();
/// Every::new(server, Duration::from_secs(60 * 30), MyJob::new()).start();
/// every(server, Duration::from_secs(60 * 30), MyJob::new());
/// ```
pub struct Every<J>
pub fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
where
J: Job + Clone + 'static,
J: Job + Clone,
{
spawner: QueueHandle,
duration: Duration,
job: J,
}
Arbiter::spawn(async move {
let mut interval = interval_at(Instant::now(), duration);
impl<J> Every<J>
where
J: Job + Clone + 'static,
{
/// Create a new Every actor
pub fn new(spawner: QueueHandle, duration: Duration, job: J) -> Self {
Every {
spawner,
duration,
job,
}
}
}
loop {
interval.tick().await;
impl<J> Actor for Every<J>
where
J: Job + Clone + 'static,
{
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
match self.spawner.queue(self.job.clone()) {
Ok(_) => (),
match spawner.queue(job.clone()) {
Err(_) => error!("Failed to queue job"),
_ => (),
};
ctx.run_interval(self.duration.clone(), move |actor, _| {
match actor.spawner.queue(actor.job.clone()) {
Ok(_) => (),
Err(_) => error!("Failed to queue job"),
}
});
}
}

View file

@ -15,7 +15,6 @@
//! use actix::System;
//! use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig};
//! use failure::Error;
//! use futures::{future::ok, Future};
//! use serde_derive::{Deserialize, Serialize};
//!
//! const DEFAULT_QUEUE: &'static str = "default";
@ -34,10 +33,8 @@
//! #[derive(Clone, Debug)]
//! pub struct MyProcessor;
//!
//! fn main() -> Result<(), Error> {
//! // First set up the Actix System to ensure we have a runtime to spawn jobs on.
//! let sys = System::new("my-actix-system");
//!
//! #[actix_rt::main]
//! async fn main() -> Result<(), Error> {
//! // Set up our Storage
//! // For this example, we use the default in-memory storage mechanism
//! use background_jobs::memory_storage::Storage;
@ -57,8 +54,8 @@
//! queue_handle.queue(MyJob::new(3, 4))?;
//! queue_handle.queue(MyJob::new(5, 6))?;
//!
//! // Block on Actix
//! sys.run()?;
//! actix_rt::signal::ctrl_c().await?;
//!
//! Ok(())
//! }
//!
@ -79,12 +76,12 @@
//! }
//! }
//!
//! #[async_trait::async_trait]
//! impl Job for MyJob {
//! type Processor = MyProcessor;
//! type State = MyState;
//! type Future = Result<(), Error>;
//!
//! fn run(self, state: MyState) -> Self::Future {
//! async fn run(self, state: MyState) -> Result<(), Error> {
//! println!("{}: args, {:?}", state.app_name, self);
//!
//! Ok(())
@ -120,84 +117,32 @@
//! }
//! ```
use actix::Arbiter;
use anyhow::Error;
use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage};
use log::error;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use actix::{Actor, Addr, Arbiter, SyncArbiter};
use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage};
use failure::{Error, Fail};
use futures::{future::IntoFuture, Future};
mod every;
mod pinger;
mod server;
mod storage;
mod worker;
pub use self::{every::Every, server::Server, worker::LocalWorker};
use self::{every::every, server::Server, worker::local_worker};
use self::{
pinger::Pinger,
server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob},
storage::{ActixStorage, StorageWrapper},
worker::Worker,
};
/// The configuration for a jobs server
/// Create a new Server
///
/// The server guards access to the storage backend, and keeps job information properly
/// up-to-date when workers request jobs to process
pub struct ServerConfig<S> {
storage: S,
threads: usize,
}
impl<S> ServerConfig<S>
/// In previous versions of this library, the server itself was run on it's own dedicated threads
/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
/// primitives, the Server has become an object that gets shared between client threads.
///
/// This method should only be called once.
pub fn create_server<S>(storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
S::Error: Fail,
{
/// Create a new ServerConfig
pub fn new(storage: S) -> Self {
ServerConfig {
storage,
threads: num_cpus::get(),
}
}
/// Set the number of threads to use for the server.
///
/// This is not related to the number of workers or the number of worker threads. This is
/// purely how many threads will be used to manage access to the job store.
///
/// By default, this is the number of processor cores available to the application. On systems
/// with logical cores (such as Intel hyperthreads), this will be the total number of logical
/// cores.
///
/// In certain cases, it may be beneficial to limit the server process count to 1.
///
/// When using actix-web, any configuration performed inside `HttpServer::new` closure will
/// happen on each thread started by the web server. In order to reduce the number of running
/// threads, one job server can be started per web server thread.
///
/// Another case to use a single server is if your job store has not locking guarantee, and you
/// want to enforce that no job can be requested more than once. The default storage
/// implementation does provide this guarantee, but other implementations may not.
pub fn thread_count(mut self, threads: usize) -> Self {
self.threads = threads;
self
}
/// Spin up the server processes
pub fn start(self) -> QueueHandle {
let ServerConfig { storage, threads } = self;
let server = SyncArbiter::start(threads, move || {
Server::new(StorageWrapper(storage.clone()))
});
Pinger::new(server.clone(), threads).start();
QueueHandle { inner: server }
QueueHandle {
inner: Server::new(storage),
}
}
@ -240,7 +185,6 @@ where
where
P: Processor<Job = J> + Send + Sync + 'static,
J: Job<State = State>,
<J::Future as IntoFuture>::Future: Send,
{
self.queues.insert(P::QUEUE.to_owned(), 4);
self.processors.register_processor(processor);
@ -264,13 +208,12 @@ where
self.queues.into_iter().fold(0, |acc, (key, count)| {
(0..count).for_each(|i| {
LocalWorker::new(
local_worker(
acc + i + 1000,
key.clone(),
processors.cached(),
queue_handle.inner.clone(),
)
.start();
);
});
acc + count
@ -285,13 +228,13 @@ where
let processors = processors.clone();
let queue_handle = queue_handle.clone();
let key = key.clone();
LocalWorker::start_in_arbiter(arbiter, move |_| {
LocalWorker::new(
arbiter.exec_fn(move || {
local_worker(
acc + i + 1000,
key.clone(),
processors.cached(),
queue_handle.inner.clone(),
)
);
});
});
@ -306,7 +249,7 @@ where
/// application to spawn jobs.
#[derive(Clone)]
pub struct QueueHandle {
inner: Addr<Server>,
inner: Server,
}
impl QueueHandle {
@ -318,7 +261,13 @@ impl QueueHandle {
where
J: Job,
{
self.inner.do_send(NewJob(J::Processor::new_job(job)?));
let job = J::Processor::new_job(job)?;
let server = self.inner.clone();
actix::spawn(async move {
if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e);
}
});
Ok(())
}
@ -330,21 +279,11 @@ impl QueueHandle {
where
J: Job + Clone + 'static,
{
Every::new(self.clone(), duration, job).start();
every(self.clone(), duration, job);
}
/// Return an overview of the processor's statistics
pub fn get_stats(&self) -> Box<dyn Future<Item = Stats, Error = Error> + Send> {
Box::new(self.inner.send(GetStats).then(coerce))
}
}
fn coerce<I, E, F>(res: Result<Result<I, E>, F>) -> Result<I, E>
where
E: From<F>,
{
match res {
Ok(inner) => inner,
Err(e) => Err(e.into()),
pub async fn get_stats(&self) -> Result<Stats, Error> {
self.inner.get_stats().await
}
}

View file

@ -1,27 +0,0 @@
use actix::{Actor, Addr, AsyncContext, Context};
use std::time::Duration;
use crate::{CheckDb, Server};
pub struct Pinger {
server: Addr<Server>,
threads: usize,
}
impl Pinger {
pub fn new(server: Addr<Server>, threads: usize) -> Self {
Pinger { server, threads }
}
}
impl Actor for Pinger {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(1), |actor, _| {
for _ in 0..actor.threads {
actor.server.do_send(CheckDb);
}
});
}
}

View file

@ -1,150 +1,122 @@
use std::collections::{HashMap, VecDeque};
use crate::{
storage::{ActixStorage, StorageWrapper},
worker::Worker,
};
use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{error, trace};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use tokio::sync::Mutex;
use actix::{Actor, Handler, Message, SyncContext};
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats};
use failure::Error;
use log::trace;
use serde_derive::Deserialize;
use crate::{ActixStorage, Worker};
#[derive(Clone)]
pub(crate) struct ServerCache {
cache: Arc<Mutex<HashMap<String, VecDeque<Box<dyn Worker + Send>>>>>,
}
/// The server Actor
///
/// This server guards access to Thee storage, and keeps a list of workers that are waiting for
/// jobs to process
pub struct Server {
storage: Box<dyn ActixStorage + Send>,
cache: HashMap<String, VecDeque<Box<dyn Worker + Send>>>,
#[derive(Clone)]
pub(crate) struct Server {
storage: Arc<dyn ActixStorage + Send + Sync>,
cache: ServerCache,
}
impl Server {
pub(crate) fn new(storage: impl ActixStorage + Send + 'static) -> Self {
/// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(storage: S) -> Self
where
S: Storage + Sync + 'static,
{
Server {
storage: Box::new(storage),
cache: HashMap::new(),
storage: Arc::new(StorageWrapper(storage)),
cache: ServerCache::new(),
}
}
}
impl Actor for Server {
type Context = SyncContext<Self>;
}
pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> {
let queue = job.queue().to_owned();
let ready = job.is_ready();
self.storage.new_job(job).await?;
#[derive(Clone, Debug, Deserialize)]
pub struct NewJob(pub(crate) NewJobInfo);
if !ready {
return Ok(());
}
#[derive(Clone, Debug, Deserialize)]
pub struct ReturningJob(pub(crate) ReturnJobInfo);
pub struct RequestJob(pub(crate) Box<dyn Worker + Send + 'static>);
pub struct CheckDb;
pub struct GetStats;
impl Message for NewJob {
type Result = Result<(), Error>;
}
impl Message for ReturningJob {
type Result = Result<(), Error>;
}
impl Message for RequestJob {
type Result = Result<(), Error>;
}
impl Message for CheckDb {
type Result = ();
}
impl Message for GetStats {
type Result = Result<Stats, Error>;
}
impl Handler<NewJob> for Server {
type Result = Result<(), Error>;
fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result {
let queue = msg.0.queue().to_owned();
let ready = msg.0.is_ready();
self.storage.new_job(msg.0)?;
if ready {
let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new());
if let Some(worker) = entry.pop_front() {
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()) {
worker.process_job(job);
if let Some(worker) = self.cache.pop(queue.clone()).await {
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?;
}
} else {
entry.push_back(worker);
}
self.cache.push(queue, worker).await;
}
}
Ok(())
}
}
impl Handler<ReturningJob> for Server {
type Result = Result<(), Error>;
fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result {
self.storage.return_job(msg.0).map_err(|e| e.into())
}
}
impl Handler<RequestJob> for Server {
type Result = Result<(), Error>;
fn handle(&mut self, RequestJob(worker): RequestJob, _: &mut Self::Context) -> Self::Result {
pub(crate) async fn request_job(
&self,
worker: Box<dyn Worker + Send + 'static>,
) -> Result<(), Error> {
trace!("Worker {} requested job", worker.id());
let job = self.storage.request_job(worker.queue(), worker.id())?;
if let Some(job) = job {
worker.process_job(job.clone());
if let Ok(Some(job)) = self.storage.request_job(worker.queue(), worker.id()).await {
if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await?;
}
} else {
trace!(
"storing worker {} for queue {}",
worker.id(),
worker.queue()
);
let entry = self
.cache
.entry(worker.queue().to_owned())
.or_insert(VecDeque::new());
entry.push_back(worker);
self.cache.push(worker.queue().to_owned(), worker).await;
}
Ok(())
}
}
impl Handler<CheckDb> for Server {
type Result = ();
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {
Ok(self.storage.return_job(job).await?)
}
fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
trace!("Checkdb");
for (queue, workers) in self.cache.iter_mut() {
while !workers.is_empty() {
if let Some(worker) = workers.pop_front() {
if let Ok(Some(job)) = self.storage.request_job(queue, worker.id()) {
worker.process_job(job);
} else {
workers.push_back(worker);
break;
}
}
}
}
pub(crate) async fn get_stats(&self) -> Result<Stats, Error> {
Ok(self.storage.get_stats().await?)
}
}
impl Handler<GetStats> for Server {
type Result = Result<Stats, Error>;
impl ServerCache {
fn new() -> Self {
ServerCache {
cache: Arc::new(Mutex::new(HashMap::new())),
}
}
fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result {
self.storage.get_stats().map_err(|e| e.into())
async fn push(&self, queue: String, worker: Box<dyn Worker + Send>) {
let mut cache = self.cache.lock().await;
let entry = cache.entry(queue).or_insert(VecDeque::new());
entry.push_back(worker);
}
async fn pop(&self, queue: String) -> Option<Box<dyn Worker + Send>> {
let mut cache = self.cache.lock().await;
let mut vec_deque = cache.remove(&queue)?;
let item = vec_deque.pop_front()?;
if !vec_deque.is_empty() {
cache.insert(queue, vec_deque);
}
Some(item)
}
}

View file

@ -1,39 +1,41 @@
use anyhow::Error;
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage};
use failure::{Error, Fail};
#[async_trait::async_trait]
pub(crate) trait ActixStorage {
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Error>;
async fn new_job(&self, job: NewJobInfo) -> Result<u64, Error>;
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error>;
async fn request_job(&self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error>;
fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error>;
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>;
fn get_stats(&self) -> Result<Stats, Error>;
async fn get_stats(&self) -> Result<Stats, Error>;
}
pub(crate) struct StorageWrapper<S>(pub(crate) S)
where
S: Storage,
S::Error: Fail;
S: Storage + Send + Sync,
S::Error: Send + Sync + 'static;
#[async_trait::async_trait]
impl<S> ActixStorage for StorageWrapper<S>
where
S: Storage,
S::Error: Fail,
S: Storage + Send + Sync,
S::Error: Send + Sync + 'static,
{
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Error> {
self.0.new_job(job).map_err(Error::from)
async fn new_job(&self, job: NewJobInfo) -> Result<u64, Error> {
Ok(self.0.new_job(job).await?)
}
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error> {
self.0.request_job(queue, runner_id).map_err(Error::from)
async fn request_job(&self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error> {
Ok(self.0.request_job(queue, runner_id).await?)
}
fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error> {
self.0.return_job(ret).map_err(Error::from)
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> {
Ok(self.0.return_job(ret).await?)
}
fn get_stats(&self) -> Result<Stats, Error> {
self.0.get_stats().map_err(Error::from)
async fn get_stats(&self) -> Result<Stats, Error> {
Ok(self.0.get_stats().await?)
}
}

View file

@ -1,38 +1,34 @@
use actix::{
dev::ToEnvelope,
fut::{wrap_future, ActorFuture},
Actor, Addr, AsyncContext, Context, Handler, Message,
};
use background_jobs_core::{JobInfo, CachedProcessorMap};
use log::info;
use crate::{RequestJob, ReturningJob};
use crate::Server;
use background_jobs_core::{CachedProcessorMap, JobInfo};
use log::{debug, error, warn};
use tokio::sync::mpsc::{channel, Sender};
#[async_trait::async_trait]
pub trait Worker {
fn process_job(&self, job: JobInfo);
async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo>;
fn id(&self) -> u64;
fn queue(&self) -> &str;
}
pub struct LocalWorkerHandle<W>
where
W: Actor + Handler<ProcessJob>,
W::Context: ToEnvelope<W, ProcessJob>,
{
addr: Addr<W>,
#[derive(Clone)]
pub(crate) struct LocalWorkerHandle {
tx: Sender<JobInfo>,
id: u64,
queue: String,
}
impl<W> Worker for LocalWorkerHandle<W>
where
W: Actor + Handler<ProcessJob>,
W::Context: ToEnvelope<W, ProcessJob>,
{
fn process_job(&self, job: JobInfo) {
self.addr.do_send(ProcessJob(job));
#[async_trait::async_trait]
impl Worker for LocalWorkerHandle {
async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo> {
match self.tx.clone().send(job).await {
Err(e) => {
error!("Unable to send job");
Err(e.0)
}
_ => Ok(()),
}
}
fn id(&self) -> u64 {
@ -44,79 +40,39 @@ where
}
}
/// A worker that runs on the same system as the jobs server
pub struct LocalWorker<S, State>
where
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
State: Clone + 'static,
{
pub(crate) fn local_worker<State>(
id: u64,
queue: String,
processors: CachedProcessorMap<State>,
server: Addr<S>,
}
impl<S, State> LocalWorker<S, State>
where
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
server: Server,
) where
State: Clone + 'static,
{
/// Create a new local worker
pub fn new(id: u64, queue: String, processors: CachedProcessorMap<State>, server: Addr<S>) -> Self {
LocalWorker {
let (tx, mut rx) = channel(16);
let handle = LocalWorkerHandle {
tx: tx.clone(),
id,
queue,
processors,
server,
queue: queue.clone(),
};
actix::spawn(async move {
debug!("Beginning worker loop for {}", id);
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
error!("Couldn't request first job, bailing, {}", e);
return;
}
while let Some(job) = rx.recv().await {
let return_job = processors.process_job(job).await;
if let Err(e) = server.return_job(return_job).await {
error!("Error returning job, {}", e);
}
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
error!("Error requesting job, {}", e);
break;
}
}
}
impl<S, State> Actor for LocalWorker<S, State>
where
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
State: Clone + 'static,
{
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.server.do_send(RequestJob(Box::new(LocalWorkerHandle {
id: self.id,
queue: self.queue.clone(),
addr: ctx.address(),
})));
}
}
pub struct ProcessJob(JobInfo);
impl Message for ProcessJob {
type Result = ();
}
impl<S, State> Handler<ProcessJob> for LocalWorker<S, State>
where
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
State: Clone + 'static,
{
type Result = ();
fn handle(&mut self, ProcessJob(job): ProcessJob, ctx: &mut Self::Context) -> Self::Result {
info!("Worker {} processing job {}", self.id, job.id());
let fut =
wrap_future::<_, Self>(self.processors.process_job(job)).map(|job, actor, ctx| {
actor.server.do_send(ReturningJob(job));
actor.server.do_send(RequestJob(Box::new(LocalWorkerHandle {
id: actor.id,
queue: actor.queue.clone(),
addr: ctx.address(),
})));
warn!("Worker {} closing", id);
});
ctx.spawn(fut);
}
}

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor"
version = "0.6.1"
version = "0.7.0"
license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -10,10 +10,11 @@ readme = "../README.md"
edition = "2018"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.24"
chrono = { version = "0.4", features = ["serde"] }
failure = "0.1"
futures = "0.1.21"
futures = "0.3.4"
log = "0.4"
serde = "1.0"
serde_derive = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

View file

@ -1,10 +1,9 @@
use failure::Error;
use futures::future::IntoFuture;
use crate::{Backoff, MaxRetries, Processor};
use anyhow::Error;
use serde::{de::DeserializeOwned, ser::Serialize};
use crate::{Backoff, MaxRetries, Processor};
/// The Job trait defines parameters pertaining to an instance of background job
#[async_trait::async_trait]
pub trait Job: Serialize + DeserializeOwned + 'static {
/// The processor this job is associated with. The job's processor can be used to create a
/// JobInfo from a job, which is used to serialize the job into a storage mechanism.
@ -13,9 +12,6 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// The application state provided to this job at runtime.
type State: Clone + 'static;
/// The result of running this operation
type Future: IntoFuture<Item = (), Error = Error>;
/// Users of this library must define what it means to run a job.
///
/// This should contain all the logic needed to complete a job. If that means queuing more
@ -25,7 +21,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// The state passed into this job is initialized at the start of the application. The state
/// argument could be useful for containing a hook into something like r2d2, or the address of
/// an actor in an actix-based system.
fn run(self, state: Self::State) -> Self::Future;
async fn run(self, state: Self::State) -> Result<(), Error>;
/// If this job should not use the default queue for its processor, this can be overridden in
/// user-code.

View file

@ -1,11 +1,9 @@
use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
use chrono::{offset::Utc, DateTime, Duration as OldDuration};
use log::trace;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
/// Information about the sate of an attempted job
pub struct ReturnJobInfo {
pub(crate) id: u64,
@ -35,7 +33,7 @@ impl ReturnJobInfo {
}
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
/// Information about a newly created job
pub struct NewJobInfo {
/// Name of the processor that should handle this job
@ -105,7 +103,7 @@ impl NewJobInfo {
}
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
/// Metadata pertaining to a job that exists within the background_jobs system
///
/// Although exposed publically, this type should only really be handled by the library itself, and
@ -167,6 +165,14 @@ impl JobInfo {
self.id
}
/// Convert a JobInfo into a ReturnJobInfo without executing it
pub fn unexecuted(self) -> ReturnJobInfo {
ReturnJobInfo {
id: self.id,
result: JobResult::Unexecuted,
}
}
pub(crate) fn increment(&mut self) -> ShouldStop {
self.updated();
self.retry_count += 1;

View file

@ -6,8 +6,7 @@
//! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs
//! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate.
use failure::{Error, Fail};
use serde_derive::{Deserialize, Serialize};
use anyhow::Error;
mod job;
mod job_info;
@ -25,23 +24,23 @@ pub use crate::{
storage::{memory_storage, Storage},
};
#[derive(Debug, Fail)]
#[derive(Debug, thiserror::Error)]
/// The error type returned by a `Processor`'s `process` method
pub enum JobError {
/// Some error occurred while processing the job
#[fail(display = "Error performing job: {}", _0)]
Processing(#[cause] Error),
#[error("Error performing job: {0}")]
Processing(#[from] Error),
/// Creating a `Job` type from the provided `serde_json::Value` failed
#[fail(display = "Could not make JSON value from arguments")]
#[error("Could not make JSON value from arguments")]
Json,
/// No processor was present to handle a given job
#[fail(display = "No processor available for job")]
#[error("No processor available for job")]
MissingProcessor,
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Indicate the state of a job after an attempted run
pub enum JobResult {
/// The job succeeded
@ -52,6 +51,9 @@ pub enum JobResult {
/// There was no processor to run the job
MissingProcessor,
/// The worker requesting this job closed
Unexecuted,
}
impl JobResult {
@ -86,7 +88,7 @@ impl JobResult {
}
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Set the status of a job when storing it
pub enum JobStatus {
/// Job should be queued
@ -118,7 +120,7 @@ impl JobStatus {
}
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Different styles for retrying jobs
pub enum Backoff {
/// Seconds between execution
@ -135,7 +137,7 @@ pub enum Backoff {
Exponential(usize),
}
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// How many times a job should be retried before giving up
pub enum MaxRetries {
/// Keep retrying forever
@ -176,3 +178,9 @@ impl ShouldStop {
*self == ShouldStop::Requeue
}
}
impl From<serde_json::error::Error> for JobError {
fn from(_: serde_json::error::Error) -> Self {
JobError::Json
}
}

View file

@ -1,12 +1,8 @@
use chrono::{offset::Utc, DateTime};
use failure::{Error, Fail};
use futures::{
future::{Either, IntoFuture},
Future,
};
use serde_json::Value;
use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
use anyhow::Error;
use chrono::{offset::Utc, DateTime};
use serde_json::Value;
use std::{future::Future, pin::Pin};
/// ## The Processor trait
///
@ -25,11 +21,12 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// ### Example
///
/// ```rust
/// use anyhow::Error;
/// use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
/// use failure::Error;
/// use futures::future::Future;
/// use futures::future::{ok, Ready};
/// use log::info;
/// use serde_derive::{Deserialize, Serialize};
/// use std::future::Future;
///
/// #[derive(Deserialize, Serialize)]
/// struct MyJob {
@ -39,12 +36,12 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// impl Job for MyJob {
/// type Processor = MyProcessor;
/// type State = ();
/// type Future = Result<(), Error>;
/// type Future = Ready<Result<(), Error>>;
///
/// fn run(self, _state: Self::State) -> Self::Future {
/// info!("Processing {}", self.count);
///
/// Ok(())
/// ok(())
/// }
/// }
///
@ -133,20 +130,18 @@ pub trait Processor: Clone {
/// &self,
/// args: Value,
/// state: S
/// ) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
/// ) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> {
/// let res = serde_json::from_value::<Self::Job>(args);
///
/// let fut = match res {
/// Ok(job) => {
/// // Perform some custom pre-job logic
/// Either::A(job.run(state).map_err(JobError::Processing))
/// },
/// Err(_) => Either::B(Err(JobError::Json).into_future()),
/// };
/// Box::pin(async move {
/// let job = res.map_err(|_| JobError::Json)?;
/// // Perform some custom pre-job locic
///
/// job.run(state).await.map_err(JobError::Processing)?;
///
/// Box::new(fut.and_then(|_| {
/// // Perform some custom post-job logic
/// }))
/// Ok(())
/// })
/// }
/// ```
///
@ -157,21 +152,19 @@ pub trait Processor: Clone {
&self,
args: Value,
state: <Self::Job as Job>::State,
) -> Box<dyn Future<Item = (), Error = JobError> + Send>
where
<<Self::Job as Job>::Future as IntoFuture>::Future: Send,
{
let res = serde_json::from_value::<Self::Job>(args);
) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> {
// Call run on the job here because State isn't Send, but the future produced by job IS
// Send
let res = serde_json::from_value::<Self::Job>(args).map(move |job| job.run(state));
let fut = match res {
Ok(job) => Either::A(job.run(state).into_future().map_err(JobError::Processing)),
Err(_) => Either::B(Err(JobError::Json).into_future()),
};
Box::pin(async move {
res?.await?;
Box::new(fut)
Ok(())
})
}
}
#[derive(Clone, Debug, Fail)]
#[fail(display = "Failed to to turn job into value")]
#[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to to turn job into value")]
pub struct ToJson;

View file

@ -1,17 +1,15 @@
use std::{collections::HashMap, sync::Arc};
use futures::future::{Either, Future, IntoFuture};
use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo};
use log::{error, info};
use serde_json::Value;
use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
/// A generic function that processes a job
///
/// Instead of storing [`Processor`] type directly, the [`ProcessorMap`]
/// struct stores these `ProcessFn` types that don't expose differences in Job types.
pub type ProcessFn<S> =
Arc<dyn Fn(Value, S) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send + Sync>;
pub type ProcessFn<S> = Arc<
dyn Fn(Value, S) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send + Sync,
>;
pub type StateFn<S> = Arc<dyn Fn() -> S + Send + Sync>;
@ -59,7 +57,6 @@ where
where
P: Processor<Job = J> + Sync + Send + 'static,
J: Job<State = S>,
<J::Future as IntoFuture>::Future: Send,
{
self.inner.insert(
P::NAME.to_owned(),
@ -79,17 +76,17 @@ where
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = ReturnJobInfo, Error = ()> {
pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo {
let opt = self
.inner
.get(job.processor())
.map(|processor| process(processor, (self.state_fn)(), job.clone()));
if let Some(fut) = opt {
Either::A(fut)
fut.await
} else {
error!("Processor {} not present", job.processor());
Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future())
ReturnJobInfo::missing_processor(job.id())
}
}
}
@ -102,38 +99,29 @@ where
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
pub fn process_job(&self, job: JobInfo) -> impl Future<Item = ReturnJobInfo, Error = ()> {
let opt = self
.inner
.get(job.processor())
.map(|processor| process(processor, self.state.clone(), job.clone()));
if let Some(fut) = opt {
Either::A(fut)
pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo {
if let Some(processor) = self.inner.get(job.processor()) {
process(processor, self.state.clone(), job).await
} else {
error!("Processor {} not present", job.processor());
Either::B(Ok(ReturnJobInfo::missing_processor(job.id())).into_future())
ReturnJobInfo::missing_processor(job.id())
}
}
}
fn process<S>(
process_fn: &ProcessFn<S>,
state: S,
job: JobInfo,
) -> impl Future<Item = ReturnJobInfo, Error = ()> {
async fn process<S>(process_fn: &ProcessFn<S>, state: S, job: JobInfo) -> ReturnJobInfo {
let args = job.args();
let id = job.id();
let processor = job.processor().to_owned();
process_fn(args, state).then(move |res| match res {
match process_fn(args, state).await {
Ok(_) => {
info!("Job {} completed, {}", id, processor);
Ok(ReturnJobInfo::pass(id))
ReturnJobInfo::pass(id)
}
Err(e) => {
error!("Job {} errored, {}, {}", id, processor, e);
Ok(ReturnJobInfo::fail(id))
ReturnJobInfo::fail(id)
}
}
})
}

View file

@ -1,7 +1,6 @@
use chrono::{offset::Utc, DateTime, Datelike, Timelike};
use serde_derive::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// Statistics about the jobs processor
pub struct Stats {
/// How many jobs are pending execution
@ -72,7 +71,7 @@ impl Default for Stats {
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// A time-based overview of job completion and failures
pub struct JobStat {
this_hour: usize,

View file

@ -1,6 +1,6 @@
use chrono::offset::Utc;
use failure::Fail;
use log::error;
use std::error::Error;
use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
@ -10,72 +10,77 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
/// HashMaps and uses counting to assign IDs. If jobs must be persistent across application
/// restarts, look into the [`sled-backed`](https://github.com/spacejam/sled) implementation from
/// the `background-jobs-sled-storage` crate.
#[async_trait::async_trait]
pub trait Storage: Clone + Send {
/// The error type used by the storage mechansim.
type Error: Fail;
type Error: Error + Send + Sync;
/// This method generates unique IDs for jobs
fn generate_id(&mut self) -> Result<u64, Self::Error>;
async fn generate_id(&self) -> Result<u64, Self::Error>;
/// This method should store the supplied job
///
/// The supplied job _may already be present_. The implementation should overwrite the stored
/// job with the new job so that future calls to `fetch_job` return the new one.
fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error>;
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error>;
/// This method should return the job with the given ID regardless of what state the job is in.
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>, Self::Error>;
async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>, Self::Error>;
/// This should fetch a job ready to be processed from the queue
///
/// If a job is not ready, is currently running, or is not in the requested queue, this method
/// should not return it. If no jobs meet these criteria, this method should return Ok(None)
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>, Self::Error>;
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error>;
/// This method tells the storage mechanism to mark the given job as being in the provided
/// queue
fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error>;
async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error>;
/// This method tells the storage mechanism to mark a given job as running
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<(), Self::Error>;
async fn run_job(&self, id: u64, runner_id: u64) -> Result<(), Self::Error>;
/// This method tells the storage mechanism to remove the job
///
/// This happens when a job has been completed or has failed too many times
fn delete_job(&mut self, id: u64) -> Result<(), Self::Error>;
async fn delete_job(&self, id: u64) -> Result<(), Self::Error>;
/// This method returns the current statistics, or Stats::default() if none exists.
fn get_stats(&self) -> Result<Stats, Self::Error>;
async fn get_stats(&self) -> Result<Stats, Self::Error>;
/// This method fetches the existing statistics or Stats::default(), and stores the result of
/// calling `update_stats` on it.
fn update_stats<F>(&mut self, f: F) -> Result<(), Self::Error>
async fn update_stats<F>(&self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats;
F: Fn(Stats) -> Stats + Send + 'static;
/// Generate a new job based on the provided NewJobInfo
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Self::Error> {
let id = self.generate_id()?;
async fn new_job(&self, job: NewJobInfo) -> Result<u64, Self::Error> {
let id = self.generate_id().await?;
let job = job.with_id(id);
let queue = job.queue().to_owned();
self.save_job(job)?;
self.queue_job(&queue, id)?;
self.update_stats(Stats::new_job)?;
self.save_job(job).await?;
self.queue_job(&queue, id).await?;
self.update_stats(Stats::new_job).await?;
Ok(id)
}
/// Fetch a job that is ready to be executed, marking it as running
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Self::Error> {
match self.fetch_job_from_queue(queue)? {
async fn request_job(
&self,
queue: &str,
runner_id: u64,
) -> Result<Option<JobInfo>, Self::Error> {
match self.fetch_job_from_queue(queue).await? {
Some(mut job) => {
if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) {
job.run();
self.run_job(job.id(), runner_id)?;
self.save_job(job.clone())?;
self.update_stats(Stats::run_job)?;
self.run_job(job.id(), runner_id).await?;
self.save_job(job.clone()).await?;
self.update_stats(Stats::run_job).await?;
Ok(Some(job))
} else {
@ -91,35 +96,35 @@ pub trait Storage: Clone + Send {
}
/// "Return" a job to the database, marking it for retry if needed
fn return_job(
&mut self,
async fn return_job(
&self,
ReturnJobInfo { id, result }: ReturnJobInfo,
) -> Result<(), Self::Error> {
if result.is_failure() {
if let Some(mut job) = self.fetch_job(id)? {
if let Some(mut job) = self.fetch_job(id).await? {
if job.needs_retry() {
self.queue_job(job.queue(), id)?;
self.save_job(job)?;
self.update_stats(Stats::retry_job)
self.queue_job(job.queue(), id).await?;
self.save_job(job).await?;
self.update_stats(Stats::retry_job).await
} else {
self.delete_job(id)?;
self.update_stats(Stats::fail_job)
self.delete_job(id).await?;
self.update_stats(Stats::fail_job).await
}
} else {
Ok(())
}
} else if result.is_missing_processor() {
if let Some(mut job) = self.fetch_job(id)? {
if let Some(mut job) = self.fetch_job(id).await? {
job.pending();
self.queue_job(job.queue(), id)?;
self.save_job(job)?;
self.update_stats(Stats::retry_job)
self.queue_job(job.queue(), id).await?;
self.save_job(job).await?;
self.update_stats(Stats::retry_job).await
} else {
Ok(())
}
} else {
self.delete_job(id)?;
self.update_stats(Stats::complete_job)
self.delete_job(id).await?;
self.update_stats(Stats::complete_job).await
}
}
}
@ -127,12 +132,8 @@ pub trait Storage: Clone + Send {
/// A default, in-memory implementation of a storage mechanism
pub mod memory_storage {
use super::{JobInfo, Stats};
use failure::Fail;
use std::{
collections::HashMap,
fmt,
sync::{Arc, Mutex},
};
use futures::lock::Mutex;
use std::{collections::HashMap, convert::Infallible, sync::Arc};
#[derive(Clone)]
/// An In-Memory store for jobs
@ -166,30 +167,31 @@ pub mod memory_storage {
}
}
#[async_trait::async_trait]
impl super::Storage for Storage {
type Error = Never;
type Error = Infallible;
fn generate_id(&mut self) -> Result<u64, Self::Error> {
let mut inner = self.inner.lock().unwrap();
async fn generate_id(&self) -> Result<u64, Self::Error> {
let mut inner = self.inner.lock().await;
let id = inner.count;
inner.count = inner.count.wrapping_add(1);
Ok(id)
}
fn save_job(&mut self, job: JobInfo) -> Result<(), Self::Error> {
self.inner.lock().unwrap().jobs.insert(job.id(), job);
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
self.inner.lock().await.jobs.insert(job.id(), job);
Ok(())
}
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>, Self::Error> {
let j = self.inner.lock().unwrap().jobs.get(&id).map(|j| j.clone());
async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>, Self::Error> {
let j = self.inner.lock().await.jobs.get(&id).map(|j| j.clone());
Ok(j)
}
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
let mut inner = self.inner.lock().unwrap();
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, Self::Error> {
let mut inner = self.inner.lock().await;
let j = inner
.queues
@ -210,25 +212,21 @@ pub mod memory_storage {
Ok(j)
}
fn queue_job(&mut self, queue: &str, id: u64) -> Result<(), Self::Error> {
self.inner
.lock()
.unwrap()
.queues
.insert(id, queue.to_owned());
async fn queue_job(&self, queue: &str, id: u64) -> Result<(), Self::Error> {
self.inner.lock().await.queues.insert(id, queue.to_owned());
Ok(())
}
fn run_job(&mut self, id: u64, worker_id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
async fn run_job(&self, id: u64, worker_id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().await;
inner.worker_ids.insert(id, worker_id);
inner.worker_ids_inverse.insert(worker_id, id);
Ok(())
}
fn delete_job(&mut self, id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
async fn delete_job(&self, id: u64) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().await;
inner.jobs.remove(&id);
inner.queues.remove(&id);
if let Some(worker_id) = inner.worker_ids.remove(&id) {
@ -237,28 +235,18 @@ pub mod memory_storage {
Ok(())
}
fn get_stats(&self) -> Result<Stats, Self::Error> {
Ok(self.inner.lock().unwrap().stats.clone())
async fn get_stats(&self) -> Result<Stats, Self::Error> {
Ok(self.inner.lock().await.stats.clone())
}
fn update_stats<F>(&mut self, f: F) -> Result<(), Self::Error>
async fn update_stats<F>(&self, f: F) -> Result<(), Self::Error>
where
F: Fn(Stats) -> Stats,
F: Fn(Stats) -> Stats + Send,
{
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock().await;
inner.stats = (f)(inner.stats.clone());
Ok(())
}
}
#[derive(Clone, Debug, Fail)]
/// An error that is impossible to create
pub enum Never {}
impl fmt::Display for Never {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
match *self {}
}
}
}

View file

@ -11,6 +11,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
background-jobs-core = { version = "0.6", path = "../jobs-core" }
actix-threadpool = "0.3.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.7", path = "../jobs-core" }
chrono = "0.4"
sled-extensions = { version = "0.2", features = ["bincode", "cbor"] }
thiserror = "1.0"

View file

@ -13,11 +13,25 @@
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
//! ```
use actix_threadpool::{run, BlockingError};
use background_jobs_core::{JobInfo, Stats, Storage};
use chrono::offset::Utc;
use sled_extensions::{bincode::Tree, cbor, Db, DbExt};
pub use sled_extensions::{Error, Result};
/// The error produced by sled storage calls
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error in the database
#[error("Error in sled extensions, {0}")]
Sled(sled_extensions::Error),
/// Error executing db operation
#[error("Blocking operation was canceled")]
Canceled,
}
/// A simple alias for Result<T, Error>
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
/// The Sled-backed storage implementation
@ -31,34 +45,57 @@ pub struct SledStorage {
db: Db,
}
#[async_trait::async_trait]
impl Storage for SledStorage {
type Error = Error;
fn generate_id(&mut self) -> Result<u64> {
Ok(self.db.generate_id()?)
async fn generate_id(&self) -> Result<u64> {
let this = self.clone();
Ok(run(move || Ok(this.db.generate_id()?) as sled_extensions::Result<u64>).await?)
}
fn save_job(&mut self, job: JobInfo) -> Result<()> {
self.jobinfo
async fn save_job(&self, job: JobInfo) -> Result<()> {
let this = self.clone();
Ok(run(move || {
this.jobinfo
.insert(job_key(job.id()).as_bytes(), job)
.map(|_| ())
})
.await?)
}
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>> {
self.jobinfo.get(job_key(id))
async fn fetch_job(&self, id: u64) -> Result<Option<JobInfo>> {
let this = self.clone();
Ok(run(move || this.jobinfo.get(job_key(id))).await?)
}
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
let queue_tree = self.queue.clone();
let job_tree = self.jobinfo.clone();
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>> {
let this = self.clone();
let queue = queue.to_owned();
self.lock_queue(queue, move || {
Ok(run(move || {
let queue_tree = this.queue.clone();
let job_tree = this.jobinfo.clone();
let queue2 = queue.clone();
this.lock_queue(&queue2, move || {
let now = Utc::now();
let job = queue_tree
.iter()
.filter_map(|res| res.ok())
.filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None })
.filter_map(
|(id, in_queue)| {
if queue == in_queue {
Some(id)
} else {
None
}
},
)
.filter_map(|id| job_tree.get(id).ok())
.filter_map(|opt| opt)
.filter(|job| job.is_ready(now))
@ -68,49 +105,73 @@ impl Storage for SledStorage {
queue_tree.remove(&job_key(job.id()))?;
}
Ok(job)
Ok(job) as sled_extensions::Result<Option<JobInfo>>
})
})
.await?)
}
fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> {
if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? {
self.running.remove(&runner_key(runner_id))?;
async fn queue_job(&self, queue: &str, id: u64) -> Result<()> {
let this = self.clone();
let queue = queue.to_owned();
Ok(run(move || {
if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? {
this.running.remove(&runner_key(runner_id))?;
}
self.queue
.insert(job_key(id).as_bytes(), queue.to_owned())
.map(|_| ())
this.queue.insert(job_key(id).as_bytes(), queue).map(|_| ())
})
.await?)
}
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> {
self.queue.remove(job_key(id))?;
self.running.insert(runner_key(runner_id).as_bytes(), id)?;
self.running_inverse
async fn run_job(&self, id: u64, runner_id: u64) -> Result<()> {
let this = self.clone();
Ok(run(move || {
this.queue.remove(job_key(id))?;
this.running.insert(runner_key(runner_id).as_bytes(), id)?;
this.running_inverse
.insert(job_key(id).as_bytes(), runner_id)?;
Ok(())
Ok(()) as Result<()>
})
.await?)
}
fn delete_job(&mut self, id: u64) -> Result<()> {
self.jobinfo.remove(&job_key(id))?;
self.queue.remove(&job_key(id))?;
async fn delete_job(&self, id: u64) -> Result<()> {
let this = self.clone();
if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? {
self.running.remove(&runner_key(runner_id))?;
Ok(run(move || {
this.jobinfo.remove(&job_key(id))?;
this.queue.remove(&job_key(id))?;
if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? {
this.running.remove(&runner_key(runner_id))?;
}
Ok(())
Ok(()) as Result<()>
})
.await?)
}
fn get_stats(&self) -> Result<Stats> {
Ok(self.stats.get("stats")?.unwrap_or(Stats::default()))
async fn get_stats(&self) -> Result<Stats> {
let this = self.clone();
Ok(
run(move || Ok(this.stats.get("stats")?.unwrap_or(Stats::default())) as Result<Stats>)
.await?,
)
}
fn update_stats<F>(&mut self, f: F) -> Result<()>
async fn update_stats<F>(&self, f: F) -> Result<()>
where
F: Fn(Stats) -> Stats,
F: Fn(Stats) -> Stats + Send + 'static,
{
self.stats.fetch_and_update("stats", |opt| {
let this = self.clone();
Ok(run(move || {
this.stats.fetch_and_update("stats", move |opt| {
let stats = match opt {
Some(stats) => stats,
None => Stats::default(),
@ -119,7 +180,9 @@ impl Storage for SledStorage {
Some((f)(stats))
})?;
Ok(())
Ok(()) as Result<()>
})
.await?)
}
}
@ -137,9 +200,9 @@ impl SledStorage {
})
}
fn lock_queue<T, F>(&self, queue: &str, f: F) -> Result<T>
fn lock_queue<T, F>(&self, queue: &str, f: F) -> sled_extensions::Result<T>
where
F: Fn() -> Result<T>,
F: Fn() -> sled_extensions::Result<T>,
{
let id = self.db.generate_id()?;
@ -168,3 +231,22 @@ fn job_key(id: u64) -> String {
fn runner_key(runner_id: u64) -> String {
format!("runner-{}", runner_id)
}
impl<T> From<BlockingError<T>> for Error
where
Error: From<T>,
T: std::fmt::Debug,
{
fn from(e: BlockingError<T>) -> Self {
match e {
BlockingError::Error(e) => e.into(),
BlockingError::Canceled => Error::Canceled,
}
}
}
impl From<sled_extensions::Error> for Error {
fn from(e: sled_extensions::Error) -> Self {
Error::Sled(e)
}
}

View file

@ -211,7 +211,7 @@ pub use background_jobs_core::{
};
#[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{Every, QueueHandle, ServerConfig, WorkerConfig};
pub use background_jobs_actix::{create_server, QueueHandle, WorkerConfig};
#[cfg(feature = "background-jobs-sled-storage")]
pub mod sled_storage {