mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-12 13:21:07 +00:00
Hide generics behind Box<dyn Trait>
This commit is contained in:
parent
dfba0cf7f2
commit
6cd5344b7b
15 changed files with 249 additions and 234 deletions
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs"
|
||||
description = "Background Jobs implemented with sled, actix, and futures"
|
||||
version = "0.5.1"
|
||||
version = "0.6.0"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
@ -25,11 +25,11 @@ version = "0.5"
|
|||
path = "jobs-core"
|
||||
|
||||
[dependencies.background-jobs-actix]
|
||||
version = "0.5"
|
||||
version = "0.6"
|
||||
path = "jobs-actix"
|
||||
optional = true
|
||||
|
||||
[dependencies.background-jobs-sled-storage]
|
||||
version = "0.1.1"
|
||||
version = "0.1.3"
|
||||
path = "jobs-sled"
|
||||
optional = true
|
||||
|
|
|
@ -8,7 +8,7 @@ edition = "2018"
|
|||
|
||||
[dependencies]
|
||||
actix = "0.8"
|
||||
background-jobs = { version = "0.5.1", path = "../.." }
|
||||
background-jobs = { version = "0.6.0", path = "../.." }
|
||||
failure = "0.1"
|
||||
futures = "0.1"
|
||||
serde = "1.0"
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
use actix::System;
|
||||
use background_jobs::{ServerConfig, SledStorage, WorkerConfig, Processor, Job, Backoff, MaxRetries};
|
||||
use background_jobs::{
|
||||
Backoff, Job, MaxRetries, Processor, ServerConfig, SledStorage, WorkerConfig,
|
||||
};
|
||||
use failure::Error;
|
||||
use futures::{Future, future::ok};
|
||||
use futures::{future::ok, Future};
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use sled::Db;
|
||||
|
||||
|
@ -27,16 +29,16 @@ fn main() -> Result<(), Error> {
|
|||
let db = Db::start_default("my-sled-db")?;
|
||||
let storage = SledStorage::new(db)?;
|
||||
|
||||
let queue_handle = ServerConfig::new(storage).start();
|
||||
let queue_handle = ServerConfig::new(storage).thread_count(2).start();
|
||||
|
||||
let mut worker_config = WorkerConfig::new(move || MyState::new("My App"));
|
||||
worker_config.register(MyProcessor);
|
||||
worker_config.set_processor_count(DEFAULT_QUEUE, 16);
|
||||
worker_config.start(queue_handle.clone());
|
||||
WorkerConfig::new(move || MyState::new("My App"))
|
||||
.register(MyProcessor)
|
||||
.set_processor_count(DEFAULT_QUEUE, 16)
|
||||
.start(queue_handle.clone());
|
||||
|
||||
queue_handle.queue::<MyProcessor>(MyJob::new(1, 2))?;
|
||||
queue_handle.queue::<MyProcessor>(MyJob::new(3, 4))?;
|
||||
queue_handle.queue::<MyProcessor>(MyJob::new(5, 6))?;
|
||||
queue_handle.queue::<MyProcessor, _>(MyJob::new(1, 2))?;
|
||||
queue_handle.queue::<MyProcessor, _>(MyJob::new(3, 4))?;
|
||||
queue_handle.queue::<MyProcessor, _>(MyJob::new(5, 6))?;
|
||||
|
||||
sys.run()?;
|
||||
Ok(())
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-actix"
|
||||
description = "in-process jobs processor based on Actix"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
@ -15,6 +15,7 @@ chrono = "0.4"
|
|||
failure = "0.1"
|
||||
futures = "0.1"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10.0"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
|
|
|
@ -7,34 +7,64 @@ use futures::Future;
|
|||
|
||||
mod pinger;
|
||||
mod server;
|
||||
mod storage;
|
||||
mod worker;
|
||||
pub use self::{server::Server, worker::LocalWorker};
|
||||
|
||||
use self::{
|
||||
pinger::Pinger,
|
||||
server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob},
|
||||
worker::ProcessJob,
|
||||
storage::{ActixStorage, StorageWrapper},
|
||||
worker::Worker,
|
||||
};
|
||||
|
||||
pub struct ServerConfig<S> {
|
||||
storage: S,
|
||||
threads: usize,
|
||||
}
|
||||
|
||||
impl<S> ServerConfig<S>
|
||||
where
|
||||
S: Storage + Sync + 'static,
|
||||
{
|
||||
/// Create a new ServerConfig
|
||||
pub fn new(storage: S) -> Self {
|
||||
ServerConfig { storage }
|
||||
ServerConfig {
|
||||
storage,
|
||||
threads: num_cpus::get(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start<State>(self) -> QueueHandle<S, State>
|
||||
where
|
||||
State: Clone + 'static,
|
||||
{
|
||||
let ServerConfig { storage } = self;
|
||||
/// Set the number of threads to use for the server.
|
||||
///
|
||||
/// This is not related to the number of workers or the number of worker threads. This is
|
||||
/// purely how many threads will be used to manage access to the job store.
|
||||
///
|
||||
/// By default, this is the number of processor cores available to the application. On systems
|
||||
/// with logical cores (such as Intel hyperthreads), this will be the total number of logical
|
||||
/// cores.
|
||||
///
|
||||
/// In certain cases, it may be beneficial to limit the server process count to 1.
|
||||
///
|
||||
/// When using actix-web, any configuration performed inside `HttpServer::new` closure will
|
||||
/// happen on each thread started by the web server. In order to reduce the number of running
|
||||
/// threads, one job server can be started per web server thread.
|
||||
///
|
||||
/// Another case to use a single server is if your job store has not locking guarantee, and you
|
||||
/// want to enforce that no job can be requested more than once. The default storage
|
||||
/// implementation does provide this guarantee, but other implementations may not.
|
||||
pub fn thread_count(mut self, threads: usize) -> Self {
|
||||
self.threads = threads;
|
||||
self
|
||||
}
|
||||
|
||||
let server = SyncArbiter::start(4, move || Server::new(storage.clone()));
|
||||
/// Spin up the server processes
|
||||
pub fn start(self) -> QueueHandle {
|
||||
let ServerConfig { storage, threads } = self;
|
||||
|
||||
let server = SyncArbiter::start(threads, move || {
|
||||
Server::new(StorageWrapper(storage.clone()))
|
||||
});
|
||||
|
||||
Pinger::new(server.clone()).start();
|
||||
|
||||
|
@ -61,22 +91,21 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn register<P>(&mut self, processor: P)
|
||||
pub fn register<P>(mut self, processor: P) -> Self
|
||||
where
|
||||
P: Processor<State> + Send + 'static,
|
||||
{
|
||||
self.queues.insert(P::QUEUE.to_owned(), 4);
|
||||
self.processors.register_processor(processor);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn set_processor_count(&mut self, queue: &str, count: u64) {
|
||||
pub fn set_processor_count(mut self, queue: &str, count: u64) -> Self {
|
||||
self.queues.insert(queue.to_owned(), count);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn start<S>(self, queue_handle: QueueHandle<S, State>)
|
||||
where
|
||||
S: Storage + 'static,
|
||||
{
|
||||
pub fn start(self, queue_handle: QueueHandle) {
|
||||
let processors = Arc::new(self.processors);
|
||||
|
||||
self.queues.into_iter().fold(0, |acc, (key, count)| {
|
||||
|
@ -96,22 +125,15 @@ where
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct QueueHandle<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
inner: Addr<Server<S, LocalWorker<S, State>>>,
|
||||
pub struct QueueHandle {
|
||||
inner: Addr<Server>,
|
||||
}
|
||||
|
||||
impl<S, State> QueueHandle<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
pub fn queue<P>(&self, job: P::Job) -> Result<(), Error>
|
||||
impl QueueHandle {
|
||||
pub fn queue<P, State>(&self, job: P::Job) -> Result<(), Error>
|
||||
where
|
||||
P: Processor<State>,
|
||||
State: Clone,
|
||||
{
|
||||
self.inner.do_send(NewJob(P::new_job(job)?));
|
||||
Ok(())
|
||||
|
|
|
@ -1,34 +1,19 @@
|
|||
use actix::{Actor, Addr, AsyncContext, Context};
|
||||
use std::time::Duration;
|
||||
|
||||
use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext};
|
||||
use background_jobs_core::Storage;
|
||||
use crate::{CheckDb, Server};
|
||||
|
||||
use crate::{CheckDb, ProcessJob, Server};
|
||||
|
||||
pub struct Pinger<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
server: Addr<Server<S, W>>,
|
||||
pub struct Pinger {
|
||||
server: Addr<Server>,
|
||||
}
|
||||
|
||||
impl<S, W> Pinger<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
pub fn new(server: Addr<Server<S, W>>) -> Self {
|
||||
impl Pinger {
|
||||
pub fn new(server: Addr<Server>) -> Self {
|
||||
Pinger { server }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, W> Actor for Pinger<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
Server<S, W>: Actor<Context = SyncContext<Server<S, W>>> + Handler<CheckDb>,
|
||||
{
|
||||
impl Actor for Pinger {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
|
|
|
@ -1,12 +1,30 @@
|
|||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use actix::{Actor, Addr, Context, Handler, Message, SyncContext};
|
||||
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
|
||||
use actix::{Actor, Handler, Message, SyncContext};
|
||||
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats};
|
||||
use failure::Error;
|
||||
use log::trace;
|
||||
use serde_derive::Deserialize;
|
||||
|
||||
use crate::ProcessJob;
|
||||
use crate::{ActixStorage, Worker};
|
||||
|
||||
pub struct Server {
|
||||
storage: Box<dyn ActixStorage + Send>,
|
||||
cache: HashMap<String, VecDeque<Box<dyn Worker + Send>>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub(crate) fn new(storage: impl ActixStorage + Send + 'static) -> Self {
|
||||
Server {
|
||||
storage: Box::new(storage),
|
||||
cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Server {
|
||||
type Context = SyncContext<Self>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct NewJob(pub(crate) NewJobInfo);
|
||||
|
@ -14,6 +32,12 @@ pub struct NewJob(pub(crate) NewJobInfo);
|
|||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct ReturningJob(pub(crate) ReturnJobInfo);
|
||||
|
||||
pub struct RequestJob(pub(crate) Box<dyn Worker + Send + 'static>);
|
||||
|
||||
pub struct CheckDb;
|
||||
|
||||
pub struct GetStats;
|
||||
|
||||
impl Message for NewJob {
|
||||
type Result = Result<(), Error>;
|
||||
}
|
||||
|
@ -22,82 +46,19 @@ impl Message for ReturningJob {
|
|||
type Result = Result<(), Error>;
|
||||
}
|
||||
|
||||
pub struct RequestJob<W>
|
||||
where
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
worker_id: u64,
|
||||
queue: String,
|
||||
addr: Addr<W>,
|
||||
}
|
||||
|
||||
impl<W> RequestJob<W>
|
||||
where
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
pub fn new(worker_id: u64, queue: &str, addr: Addr<W>) -> Self {
|
||||
RequestJob {
|
||||
worker_id,
|
||||
queue: queue.to_owned(),
|
||||
addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<W> Message for RequestJob<W>
|
||||
where
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
impl Message for RequestJob {
|
||||
type Result = Result<(), Error>;
|
||||
}
|
||||
|
||||
pub struct CheckDb;
|
||||
|
||||
impl Message for CheckDb {
|
||||
type Result = Result<(), Error>;
|
||||
}
|
||||
|
||||
pub struct GetStats;
|
||||
|
||||
impl Message for GetStats {
|
||||
type Result = Result<Stats, Error>;
|
||||
}
|
||||
|
||||
pub struct Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
storage: S,
|
||||
cache: HashMap<String, VecDeque<RequestJob<W>>>,
|
||||
}
|
||||
|
||||
impl<S, W> Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
pub fn new(storage: S) -> Self {
|
||||
Server {
|
||||
storage,
|
||||
cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, W> Actor for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
{
|
||||
type Context = SyncContext<Self>;
|
||||
}
|
||||
|
||||
impl<S, W> Handler<NewJob> for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
|
||||
{
|
||||
impl Handler<NewJob> for Server {
|
||||
type Result = Result<(), Error>;
|
||||
|
||||
fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result {
|
||||
|
@ -106,16 +67,13 @@ where
|
|||
self.storage.new_job(msg.0)?;
|
||||
|
||||
if ready {
|
||||
let entry = self
|
||||
.cache
|
||||
.entry(queue.clone())
|
||||
.or_insert(VecDeque::new());
|
||||
let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new());
|
||||
|
||||
if let Some(request) = entry.pop_front() {
|
||||
if let Some(job) = self.storage.request_job(&queue, request.worker_id)? {
|
||||
request.addr.do_send(ProcessJob::new(job));
|
||||
if let Some(worker) = entry.pop_front() {
|
||||
if let Some(job) = self.storage.request_job(&queue, worker.id())? {
|
||||
worker.process_job(job);
|
||||
} else {
|
||||
entry.push_back(request);
|
||||
entry.push_back(worker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,11 +82,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, W> Handler<ReturningJob> for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
|
||||
{
|
||||
impl Handler<ReturningJob> for Server {
|
||||
type Result = Result<(), Error>;
|
||||
|
||||
fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result {
|
||||
|
@ -136,37 +90,33 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, W> Handler<RequestJob<W>> for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
|
||||
{
|
||||
impl Handler<RequestJob> for Server {
|
||||
type Result = Result<(), Error>;
|
||||
|
||||
fn handle(&mut self, msg: RequestJob<W>, _: &mut Self::Context) -> Self::Result {
|
||||
trace!("Worker {} requested job", msg.worker_id);
|
||||
let job = self.storage.request_job(&msg.queue, msg.worker_id)?;
|
||||
fn handle(&mut self, RequestJob(worker): RequestJob, _: &mut Self::Context) -> Self::Result {
|
||||
trace!("Worker {} requested job", worker.id());
|
||||
let job = self.storage.request_job(worker.queue(), worker.id())?;
|
||||
|
||||
if let Some(job) = job {
|
||||
msg.addr.do_send(ProcessJob::new(job.clone()));
|
||||
worker.process_job(job.clone());
|
||||
} else {
|
||||
trace!("storing worker {} for queue {}", msg.worker_id, msg.queue);
|
||||
trace!(
|
||||
"storing worker {} for queue {}",
|
||||
worker.id(),
|
||||
worker.queue()
|
||||
);
|
||||
let entry = self
|
||||
.cache
|
||||
.entry(msg.queue.to_owned())
|
||||
.entry(worker.queue().to_owned())
|
||||
.or_insert(VecDeque::new());
|
||||
entry.push_back(msg);
|
||||
entry.push_back(worker);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, W> Handler<CheckDb> for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
|
||||
{
|
||||
impl Handler<CheckDb> for Server {
|
||||
type Result = Result<(), Error>;
|
||||
|
||||
fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result {
|
||||
|
@ -174,11 +124,11 @@ where
|
|||
|
||||
for (queue, workers) in self.cache.iter_mut() {
|
||||
while !workers.is_empty() {
|
||||
if let Some(request) = workers.pop_front() {
|
||||
if let Some(job) = self.storage.request_job(queue, request.worker_id)? {
|
||||
request.addr.do_send(ProcessJob::new(job));
|
||||
if let Some(worker) = workers.pop_front() {
|
||||
if let Some(job) = self.storage.request_job(queue, worker.id())? {
|
||||
worker.process_job(job);
|
||||
} else {
|
||||
workers.push_back(request);
|
||||
workers.push_back(worker);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -189,11 +139,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, W> Handler<GetStats> for Server<S, W>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
W: Actor<Context = Context<W>> + Handler<ProcessJob>,
|
||||
{
|
||||
impl Handler<GetStats> for Server {
|
||||
type Result = Result<Stats, Error>;
|
||||
|
||||
fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result {
|
||||
|
|
39
jobs-actix/src/storage.rs
Normal file
39
jobs-actix/src/storage.rs
Normal file
|
@ -0,0 +1,39 @@
|
|||
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage};
|
||||
use failure::{Error, Fail};
|
||||
|
||||
pub(crate) trait ActixStorage {
|
||||
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Error>;
|
||||
|
||||
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error>;
|
||||
|
||||
fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error>;
|
||||
|
||||
fn get_stats(&self) -> Result<Stats, Error>;
|
||||
}
|
||||
|
||||
pub(crate) struct StorageWrapper<S, E>(pub(crate) S)
|
||||
where
|
||||
S: Storage<Error = E>,
|
||||
E: Fail;
|
||||
|
||||
impl<S, E> ActixStorage for StorageWrapper<S, E>
|
||||
where
|
||||
S: Storage<Error = E>,
|
||||
E: Fail,
|
||||
{
|
||||
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Error> {
|
||||
self.0.new_job(job).map_err(Error::from)
|
||||
}
|
||||
|
||||
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Error> {
|
||||
self.0.request_job(queue, runner_id).map_err(Error::from)
|
||||
}
|
||||
|
||||
fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error> {
|
||||
self.0.return_job(ret).map_err(Error::from)
|
||||
}
|
||||
|
||||
fn get_stats(&self) -> Result<Stats, Error> {
|
||||
self.0.get_stats().map_err(Error::from)
|
||||
}
|
||||
}
|
|
@ -1,49 +1,74 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use actix::{
|
||||
dev::ToEnvelope,
|
||||
fut::{wrap_future, ActorFuture},
|
||||
Actor, Addr, AsyncContext, Context, Handler, Message,
|
||||
};
|
||||
use background_jobs_core::{JobInfo, ProcessorMap, Storage};
|
||||
use background_jobs_core::{JobInfo, ProcessorMap};
|
||||
use log::info;
|
||||
|
||||
use crate::{RequestJob, ReturningJob, Server};
|
||||
use crate::{RequestJob, ReturningJob};
|
||||
|
||||
pub struct ProcessJob {
|
||||
job: JobInfo,
|
||||
pub trait Worker {
|
||||
fn process_job(&self, job: JobInfo);
|
||||
|
||||
fn id(&self) -> u64;
|
||||
|
||||
fn queue(&self) -> &str;
|
||||
}
|
||||
|
||||
impl ProcessJob {
|
||||
pub fn new(job: JobInfo) -> Self {
|
||||
ProcessJob { job }
|
||||
pub struct LocalWorkerHandle<W>
|
||||
where
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
W::Context: ToEnvelope<W, ProcessJob>,
|
||||
{
|
||||
addr: Addr<W>,
|
||||
id: u64,
|
||||
queue: String,
|
||||
}
|
||||
|
||||
impl<W> Worker for LocalWorkerHandle<W>
|
||||
where
|
||||
W: Actor + Handler<ProcessJob>,
|
||||
W::Context: ToEnvelope<W, ProcessJob>,
|
||||
{
|
||||
fn process_job(&self, job: JobInfo) {
|
||||
self.addr.do_send(ProcessJob(job));
|
||||
}
|
||||
}
|
||||
|
||||
impl Message for ProcessJob {
|
||||
type Result = ();
|
||||
fn id(&self) -> u64 {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn queue(&self) -> &str {
|
||||
&self.queue
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LocalWorker<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
|
||||
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
id: u64,
|
||||
queue: String,
|
||||
processors: Arc<ProcessorMap<State>>,
|
||||
server: Addr<Server<S, LocalWorker<S, State>>>,
|
||||
server: Addr<S>,
|
||||
}
|
||||
|
||||
impl<S, State> LocalWorker<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
|
||||
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
id: u64,
|
||||
queue: String,
|
||||
processors: Arc<ProcessorMap<State>>,
|
||||
server: Addr<Server<S, Self>>,
|
||||
server: Addr<S>,
|
||||
) -> Self {
|
||||
LocalWorker {
|
||||
id,
|
||||
|
@ -56,32 +81,45 @@ where
|
|||
|
||||
impl<S, State> Actor for LocalWorker<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
|
||||
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.server
|
||||
.do_send(RequestJob::new(self.id, &self.queue, ctx.address()));
|
||||
self.server.do_send(RequestJob(Box::new(LocalWorkerHandle {
|
||||
id: self.id,
|
||||
queue: self.queue.clone(),
|
||||
addr: ctx.address(),
|
||||
})));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProcessJob(JobInfo);
|
||||
|
||||
impl Message for ProcessJob {
|
||||
type Result = ();
|
||||
}
|
||||
|
||||
impl<S, State> Handler<ProcessJob> for LocalWorker<S, State>
|
||||
where
|
||||
S: Storage + 'static,
|
||||
S: Actor + Handler<ReturningJob> + Handler<RequestJob>,
|
||||
S::Context: ToEnvelope<S, ReturningJob> + ToEnvelope<S, RequestJob>,
|
||||
State: Clone + 'static,
|
||||
{
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: ProcessJob, ctx: &mut Self::Context) -> Self::Result {
|
||||
info!("Worker {} processing job {}", self.id, msg.job.id());
|
||||
fn handle(&mut self, ProcessJob(job): ProcessJob, ctx: &mut Self::Context) -> Self::Result {
|
||||
info!("Worker {} processing job {}", self.id, job.id());
|
||||
let fut =
|
||||
wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| {
|
||||
wrap_future::<_, Self>(self.processors.process_job(job)).map(|job, actor, ctx| {
|
||||
actor.server.do_send(ReturningJob(job));
|
||||
actor
|
||||
.server
|
||||
.do_send(RequestJob::new(actor.id, &actor.queue, ctx.address()));
|
||||
actor.server.do_send(RequestJob(Box::new(LocalWorkerHandle {
|
||||
id: actor.id,
|
||||
queue: actor.queue.clone(),
|
||||
addr: ctx.address(),
|
||||
})));
|
||||
});
|
||||
|
||||
ctx.spawn(fut);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-core"
|
||||
description = "Core types for implementing an asynchronous jobs processor on tokio"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
|
|
@ -78,11 +78,7 @@ pub trait Storage: Clone + Send {
|
|||
Ok(id)
|
||||
}
|
||||
|
||||
fn request_job(
|
||||
&mut self,
|
||||
queue: &str,
|
||||
runner_id: u64,
|
||||
) -> Result<Option<JobInfo>, Self::Error> {
|
||||
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Self::Error> {
|
||||
match self.fetch_job_from_queue(queue)? {
|
||||
Some(mut job) => {
|
||||
if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-sled-storage"
|
||||
description = "Sled storage backend for background-jobs"
|
||||
version = "0.1.2"
|
||||
version = "0.1.3"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use background_jobs_core::{JobInfo, Storage, Stats};
|
||||
use background_jobs_core::{JobInfo, Stats, Storage};
|
||||
use chrono::offset::Utc;
|
||||
|
||||
mod error;
|
||||
|
@ -6,10 +6,7 @@ mod sled_wrappers;
|
|||
|
||||
pub use error::Error;
|
||||
|
||||
use self::{
|
||||
error::Result,
|
||||
sled_wrappers::Tree,
|
||||
};
|
||||
use self::{error::Result, sled_wrappers::Tree};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SledStorage {
|
||||
|
@ -128,7 +125,7 @@ impl SledStorage {
|
|||
{
|
||||
let id = self.db.generate_id()?;
|
||||
|
||||
let mut prev;
|
||||
let mut prev;
|
||||
while {
|
||||
prev = self.lock.fetch_and_update(queue, move |opt| match opt {
|
||||
Some(_) => opt,
|
||||
|
@ -160,4 +157,3 @@ where
|
|||
{
|
||||
db.open_tree(name).map(Tree::new)
|
||||
}
|
||||
|
||||
|
|
|
@ -19,14 +19,12 @@ where
|
|||
|
||||
pub(crate) fn get<K>(&self, key: K) -> Result<Option<T>>
|
||||
where
|
||||
K: AsRef<[u8]>
|
||||
K: AsRef<[u8]>,
|
||||
{
|
||||
match self.0.get(key)? {
|
||||
Some(vec) => {
|
||||
serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some)
|
||||
},
|
||||
Some(vec) => serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -39,11 +37,9 @@ where
|
|||
|
||||
pub(crate) fn del(&self, key: &str) -> Result<Option<T>> {
|
||||
match self.0.del(key)? {
|
||||
Some(vec) => {
|
||||
serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some)
|
||||
},
|
||||
Some(vec) => serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -55,29 +51,23 @@ where
|
|||
let final_opt = self.0.fetch_and_update(key, |opt| {
|
||||
let new_opt = match opt {
|
||||
Some(vec) => {
|
||||
let t = serde_json::from_slice(&vec)
|
||||
.map(Some)
|
||||
.unwrap_or(None);
|
||||
let t = serde_json::from_slice(&vec).map(Some).unwrap_or(None);
|
||||
|
||||
(f)(t)
|
||||
},
|
||||
}
|
||||
None => (f)(None),
|
||||
};
|
||||
|
||||
match new_opt {
|
||||
Some(t) => serde_json::to_vec(&t)
|
||||
.map(Some)
|
||||
.unwrap_or(None),
|
||||
Some(t) => serde_json::to_vec(&t).map(Some).unwrap_or(None),
|
||||
None => None,
|
||||
}
|
||||
})?;
|
||||
|
||||
match final_opt {
|
||||
Some(vec) => {
|
||||
serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some)
|
||||
},
|
||||
Some(vec) => serde_json::from_slice(&vec)
|
||||
.map_err(|_| Error::Deserialize)
|
||||
.map(Some),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +83,7 @@ impl<'a, T> Iter<'a, T> {
|
|||
|
||||
impl<'a, T> Iterator for Iter<'a, T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
type Item = Result<(Vec<u8>, T)>;
|
||||
|
||||
|
@ -110,7 +100,7 @@ where
|
|||
|
||||
impl<'a, T> DoubleEndedIterator for Iter<'a, T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
fn next_back(&mut self) -> Option<Self::Item> {
|
||||
self.0.next_back().map(|res| {
|
||||
|
|
|
@ -203,4 +203,4 @@ pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Sta
|
|||
pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};
|
||||
|
||||
#[cfg(feature = "background-jobs-sled-storage")]
|
||||
pub use background_jobs_sled_storage::{SledStorage, Error as SledStorageError};
|
||||
pub use background_jobs_sled_storage::{Error as SledStorageError, SledStorage};
|
||||
|
|
Loading…
Reference in a new issue