From 6cd5344b7bce4df059907de98eecb05f3cad1d14 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 27 May 2019 12:29:11 -0500 Subject: [PATCH] Hide generics behind Box --- Cargo.toml | 6 +- examples/actix-example/Cargo.toml | 2 +- examples/actix-example/src/main.rs | 22 ++-- jobs-actix/Cargo.toml | 3 +- jobs-actix/src/lib.rs | 74 ++++++++----- jobs-actix/src/pinger.rs | 29 ++---- jobs-actix/src/server.rs | 160 ++++++++++------------------- jobs-actix/src/storage.rs | 39 +++++++ jobs-actix/src/worker.rs | 86 +++++++++++----- jobs-core/Cargo.toml | 2 +- jobs-core/src/storage.rs | 6 +- jobs-sled/Cargo.toml | 2 +- jobs-sled/src/lib.rs | 10 +- jobs-sled/src/sled_wrappers.rs | 40 +++----- src/lib.rs | 2 +- 15 files changed, 249 insertions(+), 234 deletions(-) create mode 100644 jobs-actix/src/storage.rs diff --git a/Cargo.toml b/Cargo.toml index 6744002..817e44e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with sled, actix, and futures" -version = "0.5.1" +version = "0.6.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -25,11 +25,11 @@ version = "0.5" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.5" +version = "0.6" path = "jobs-actix" optional = true [dependencies.background-jobs-sled-storage] -version = "0.1.1" +version = "0.1.3" path = "jobs-sled" optional = true diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 71a3aea..7d999ac 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] actix = "0.8" -background-jobs = { version = "0.5.1", path = "../.." } +background-jobs = { version = "0.6.0", path = "../.." } failure = "0.1" futures = "0.1" serde = "1.0" diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 8ae165a..942fe8e 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,7 +1,9 @@ use actix::System; -use background_jobs::{ServerConfig, SledStorage, WorkerConfig, Processor, Job, Backoff, MaxRetries}; +use background_jobs::{ + Backoff, Job, MaxRetries, Processor, ServerConfig, SledStorage, WorkerConfig, +}; use failure::Error; -use futures::{Future, future::ok}; +use futures::{future::ok, Future}; use serde_derive::{Deserialize, Serialize}; use sled::Db; @@ -27,16 +29,16 @@ fn main() -> Result<(), Error> { let db = Db::start_default("my-sled-db")?; let storage = SledStorage::new(db)?; - let queue_handle = ServerConfig::new(storage).start(); + let queue_handle = ServerConfig::new(storage).thread_count(2).start(); - let mut worker_config = WorkerConfig::new(move || MyState::new("My App")); - worker_config.register(MyProcessor); - worker_config.set_processor_count(DEFAULT_QUEUE, 16); - worker_config.start(queue_handle.clone()); + WorkerConfig::new(move || MyState::new("My App")) + .register(MyProcessor) + .set_processor_count(DEFAULT_QUEUE, 16) + .start(queue_handle.clone()); - queue_handle.queue::(MyJob::new(1, 2))?; - queue_handle.queue::(MyJob::new(3, 4))?; - queue_handle.queue::(MyJob::new(5, 6))?; + queue_handle.queue::(MyJob::new(1, 2))?; + queue_handle.queue::(MyJob::new(3, 4))?; + queue_handle.queue::(MyJob::new(5, 6))?; sys.run()?; Ok(()) diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 771990a..e2953f0 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.5.0" +version = "0.6.0" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -15,6 +15,7 @@ chrono = "0.4" failure = "0.1" futures = "0.1" log = "0.4" +num_cpus = "1.10.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 657da9e..c75fd54 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -7,34 +7,64 @@ use futures::Future; mod pinger; mod server; +mod storage; mod worker; pub use self::{server::Server, worker::LocalWorker}; use self::{ pinger::Pinger, server::{CheckDb, GetStats, NewJob, RequestJob, ReturningJob}, - worker::ProcessJob, + storage::{ActixStorage, StorageWrapper}, + worker::Worker, }; pub struct ServerConfig { storage: S, + threads: usize, } impl ServerConfig where S: Storage + Sync + 'static, { + /// Create a new ServerConfig pub fn new(storage: S) -> Self { - ServerConfig { storage } + ServerConfig { + storage, + threads: num_cpus::get(), + } } - pub fn start(self) -> QueueHandle - where - State: Clone + 'static, - { - let ServerConfig { storage } = self; + /// Set the number of threads to use for the server. + /// + /// This is not related to the number of workers or the number of worker threads. This is + /// purely how many threads will be used to manage access to the job store. + /// + /// By default, this is the number of processor cores available to the application. On systems + /// with logical cores (such as Intel hyperthreads), this will be the total number of logical + /// cores. + /// + /// In certain cases, it may be beneficial to limit the server process count to 1. + /// + /// When using actix-web, any configuration performed inside `HttpServer::new` closure will + /// happen on each thread started by the web server. In order to reduce the number of running + /// threads, one job server can be started per web server thread. + /// + /// Another case to use a single server is if your job store has not locking guarantee, and you + /// want to enforce that no job can be requested more than once. The default storage + /// implementation does provide this guarantee, but other implementations may not. + pub fn thread_count(mut self, threads: usize) -> Self { + self.threads = threads; + self + } - let server = SyncArbiter::start(4, move || Server::new(storage.clone())); + /// Spin up the server processes + pub fn start(self) -> QueueHandle { + let ServerConfig { storage, threads } = self; + + let server = SyncArbiter::start(threads, move || { + Server::new(StorageWrapper(storage.clone())) + }); Pinger::new(server.clone()).start(); @@ -61,22 +91,21 @@ where } } - pub fn register

(&mut self, processor: P) + pub fn register

(mut self, processor: P) -> Self where P: Processor + Send + 'static, { self.queues.insert(P::QUEUE.to_owned(), 4); self.processors.register_processor(processor); + self } - pub fn set_processor_count(&mut self, queue: &str, count: u64) { + pub fn set_processor_count(mut self, queue: &str, count: u64) -> Self { self.queues.insert(queue.to_owned(), count); + self } - pub fn start(self, queue_handle: QueueHandle) - where - S: Storage + 'static, - { + pub fn start(self, queue_handle: QueueHandle) { let processors = Arc::new(self.processors); self.queues.into_iter().fold(0, |acc, (key, count)| { @@ -96,22 +125,15 @@ where } #[derive(Clone)] -pub struct QueueHandle -where - S: Storage + 'static, - State: Clone + 'static, -{ - inner: Addr>>, +pub struct QueueHandle { + inner: Addr, } -impl QueueHandle -where - S: Storage + 'static, - State: Clone + 'static, -{ - pub fn queue

(&self, job: P::Job) -> Result<(), Error> +impl QueueHandle { + pub fn queue(&self, job: P::Job) -> Result<(), Error> where P: Processor, + State: Clone, { self.inner.do_send(NewJob(P::new_job(job)?)); Ok(()) diff --git a/jobs-actix/src/pinger.rs b/jobs-actix/src/pinger.rs index 78a188c..8ad8911 100644 --- a/jobs-actix/src/pinger.rs +++ b/jobs-actix/src/pinger.rs @@ -1,34 +1,19 @@ +use actix::{Actor, Addr, AsyncContext, Context}; use std::time::Duration; -use actix::{Actor, Addr, AsyncContext, Context, Handler, SyncContext}; -use background_jobs_core::Storage; +use crate::{CheckDb, Server}; -use crate::{CheckDb, ProcessJob, Server}; - -pub struct Pinger -where - S: Storage + 'static, - W: Actor + Handler, -{ - server: Addr>, +pub struct Pinger { + server: Addr, } -impl Pinger -where - S: Storage + 'static, - W: Actor + Handler, -{ - pub fn new(server: Addr>) -> Self { +impl Pinger { + pub fn new(server: Addr) -> Self { Pinger { server } } } -impl Actor for Pinger -where - S: Storage + 'static, - W: Actor + Handler, - Server: Actor>> + Handler, -{ +impl Actor for Pinger { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 7f12d3f..7765cfe 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -1,12 +1,30 @@ use std::collections::{HashMap, VecDeque}; -use actix::{Actor, Addr, Context, Handler, Message, SyncContext}; -use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; +use actix::{Actor, Handler, Message, SyncContext}; +use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats}; use failure::Error; use log::trace; use serde_derive::Deserialize; -use crate::ProcessJob; +use crate::{ActixStorage, Worker}; + +pub struct Server { + storage: Box, + cache: HashMap>>, +} + +impl Server { + pub(crate) fn new(storage: impl ActixStorage + Send + 'static) -> Self { + Server { + storage: Box::new(storage), + cache: HashMap::new(), + } + } +} + +impl Actor for Server { + type Context = SyncContext; +} #[derive(Clone, Debug, Deserialize)] pub struct NewJob(pub(crate) NewJobInfo); @@ -14,6 +32,12 @@ pub struct NewJob(pub(crate) NewJobInfo); #[derive(Clone, Debug, Deserialize)] pub struct ReturningJob(pub(crate) ReturnJobInfo); +pub struct RequestJob(pub(crate) Box); + +pub struct CheckDb; + +pub struct GetStats; + impl Message for NewJob { type Result = Result<(), Error>; } @@ -22,82 +46,19 @@ impl Message for ReturningJob { type Result = Result<(), Error>; } -pub struct RequestJob -where - W: Actor + Handler, -{ - worker_id: u64, - queue: String, - addr: Addr, -} - -impl RequestJob -where - W: Actor + Handler, -{ - pub fn new(worker_id: u64, queue: &str, addr: Addr) -> Self { - RequestJob { - worker_id, - queue: queue.to_owned(), - addr, - } - } -} - -impl Message for RequestJob -where - W: Actor + Handler, -{ +impl Message for RequestJob { type Result = Result<(), Error>; } -pub struct CheckDb; - impl Message for CheckDb { type Result = Result<(), Error>; } -pub struct GetStats; - impl Message for GetStats { type Result = Result; } -pub struct Server -where - S: Storage + 'static, - W: Actor + Handler, -{ - storage: S, - cache: HashMap>>, -} - -impl Server -where - S: Storage + 'static, - W: Actor + Handler, -{ - pub fn new(storage: S) -> Self { - Server { - storage, - cache: HashMap::new(), - } - } -} - -impl Actor for Server -where - S: Storage + 'static, - W: Actor + Handler, -{ - type Context = SyncContext; -} - -impl Handler for Server -where - S: Storage + 'static, - W: Actor> + Handler, -{ +impl Handler for Server { type Result = Result<(), Error>; fn handle(&mut self, msg: NewJob, _: &mut Self::Context) -> Self::Result { @@ -106,16 +67,13 @@ where self.storage.new_job(msg.0)?; if ready { - let entry = self - .cache - .entry(queue.clone()) - .or_insert(VecDeque::new()); + let entry = self.cache.entry(queue.clone()).or_insert(VecDeque::new()); - if let Some(request) = entry.pop_front() { - if let Some(job) = self.storage.request_job(&queue, request.worker_id)? { - request.addr.do_send(ProcessJob::new(job)); + if let Some(worker) = entry.pop_front() { + if let Some(job) = self.storage.request_job(&queue, worker.id())? { + worker.process_job(job); } else { - entry.push_back(request); + entry.push_back(worker); } } } @@ -124,11 +82,7 @@ where } } -impl Handler for Server -where - S: Storage + 'static, - W: Actor> + Handler, -{ +impl Handler for Server { type Result = Result<(), Error>; fn handle(&mut self, msg: ReturningJob, _: &mut Self::Context) -> Self::Result { @@ -136,37 +90,33 @@ where } } -impl Handler> for Server -where - S: Storage + 'static, - W: Actor> + Handler, -{ +impl Handler for Server { type Result = Result<(), Error>; - fn handle(&mut self, msg: RequestJob, _: &mut Self::Context) -> Self::Result { - trace!("Worker {} requested job", msg.worker_id); - let job = self.storage.request_job(&msg.queue, msg.worker_id)?; + fn handle(&mut self, RequestJob(worker): RequestJob, _: &mut Self::Context) -> Self::Result { + trace!("Worker {} requested job", worker.id()); + let job = self.storage.request_job(worker.queue(), worker.id())?; if let Some(job) = job { - msg.addr.do_send(ProcessJob::new(job.clone())); + worker.process_job(job.clone()); } else { - trace!("storing worker {} for queue {}", msg.worker_id, msg.queue); + trace!( + "storing worker {} for queue {}", + worker.id(), + worker.queue() + ); let entry = self .cache - .entry(msg.queue.to_owned()) + .entry(worker.queue().to_owned()) .or_insert(VecDeque::new()); - entry.push_back(msg); + entry.push_back(worker); } Ok(()) } } -impl Handler for Server -where - S: Storage + 'static, - W: Actor> + Handler, -{ +impl Handler for Server { type Result = Result<(), Error>; fn handle(&mut self, _: CheckDb, _: &mut Self::Context) -> Self::Result { @@ -174,11 +124,11 @@ where for (queue, workers) in self.cache.iter_mut() { while !workers.is_empty() { - if let Some(request) = workers.pop_front() { - if let Some(job) = self.storage.request_job(queue, request.worker_id)? { - request.addr.do_send(ProcessJob::new(job)); + if let Some(worker) = workers.pop_front() { + if let Some(job) = self.storage.request_job(queue, worker.id())? { + worker.process_job(job); } else { - workers.push_back(request); + workers.push_back(worker); break; } } @@ -189,11 +139,7 @@ where } } -impl Handler for Server -where - S: Storage + 'static, - W: Actor> + Handler, -{ +impl Handler for Server { type Result = Result; fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result { diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs new file mode 100644 index 0000000..76b39f2 --- /dev/null +++ b/jobs-actix/src/storage.rs @@ -0,0 +1,39 @@ +use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage}; +use failure::{Error, Fail}; + +pub(crate) trait ActixStorage { + fn new_job(&mut self, job: NewJobInfo) -> Result; + + fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Error>; + + fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error>; + + fn get_stats(&self) -> Result; +} + +pub(crate) struct StorageWrapper(pub(crate) S) +where + S: Storage, + E: Fail; + +impl ActixStorage for StorageWrapper +where + S: Storage, + E: Fail, +{ + fn new_job(&mut self, job: NewJobInfo) -> Result { + self.0.new_job(job).map_err(Error::from) + } + + fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Error> { + self.0.request_job(queue, runner_id).map_err(Error::from) + } + + fn return_job(&mut self, ret: ReturnJobInfo) -> Result<(), Error> { + self.0.return_job(ret).map_err(Error::from) + } + + fn get_stats(&self) -> Result { + self.0.get_stats().map_err(Error::from) + } +} diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 7c73247..afcbae3 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,49 +1,74 @@ use std::sync::Arc; use actix::{ + dev::ToEnvelope, fut::{wrap_future, ActorFuture}, Actor, Addr, AsyncContext, Context, Handler, Message, }; -use background_jobs_core::{JobInfo, ProcessorMap, Storage}; +use background_jobs_core::{JobInfo, ProcessorMap}; use log::info; -use crate::{RequestJob, ReturningJob, Server}; +use crate::{RequestJob, ReturningJob}; -pub struct ProcessJob { - job: JobInfo, +pub trait Worker { + fn process_job(&self, job: JobInfo); + + fn id(&self) -> u64; + + fn queue(&self) -> &str; } -impl ProcessJob { - pub fn new(job: JobInfo) -> Self { - ProcessJob { job } +pub struct LocalWorkerHandle +where + W: Actor + Handler, + W::Context: ToEnvelope, +{ + addr: Addr, + id: u64, + queue: String, +} + +impl Worker for LocalWorkerHandle +where + W: Actor + Handler, + W::Context: ToEnvelope, +{ + fn process_job(&self, job: JobInfo) { + self.addr.do_send(ProcessJob(job)); } -} -impl Message for ProcessJob { - type Result = (); + fn id(&self) -> u64 { + self.id + } + + fn queue(&self) -> &str { + &self.queue + } } pub struct LocalWorker where - S: Storage + 'static, + S: Actor + Handler + Handler, + S::Context: ToEnvelope + ToEnvelope, State: Clone + 'static, { id: u64, queue: String, processors: Arc>, - server: Addr>>, + server: Addr, } impl LocalWorker where - S: Storage + 'static, + S: Actor + Handler + Handler, + S::Context: ToEnvelope + ToEnvelope, State: Clone + 'static, { pub fn new( id: u64, queue: String, processors: Arc>, - server: Addr>, + server: Addr, ) -> Self { LocalWorker { id, @@ -56,32 +81,45 @@ where impl Actor for LocalWorker where - S: Storage + 'static, + S: Actor + Handler + Handler, + S::Context: ToEnvelope + ToEnvelope, State: Clone + 'static, { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - self.server - .do_send(RequestJob::new(self.id, &self.queue, ctx.address())); + self.server.do_send(RequestJob(Box::new(LocalWorkerHandle { + id: self.id, + queue: self.queue.clone(), + addr: ctx.address(), + }))); } } +pub struct ProcessJob(JobInfo); + +impl Message for ProcessJob { + type Result = (); +} + impl Handler for LocalWorker where - S: Storage + 'static, + S: Actor + Handler + Handler, + S::Context: ToEnvelope + ToEnvelope, State: Clone + 'static, { type Result = (); - fn handle(&mut self, msg: ProcessJob, ctx: &mut Self::Context) -> Self::Result { - info!("Worker {} processing job {}", self.id, msg.job.id()); + fn handle(&mut self, ProcessJob(job): ProcessJob, ctx: &mut Self::Context) -> Self::Result { + info!("Worker {} processing job {}", self.id, job.id()); let fut = - wrap_future::<_, Self>(self.processors.process_job(msg.job)).map(|job, actor, ctx| { + wrap_future::<_, Self>(self.processors.process_job(job)).map(|job, actor, ctx| { actor.server.do_send(ReturningJob(job)); - actor - .server - .do_send(RequestJob::new(actor.id, &actor.queue, ctx.address())); + actor.server.do_send(RequestJob(Box::new(LocalWorkerHandle { + id: actor.id, + queue: actor.queue.clone(), + addr: ctx.address(), + }))); }); ctx.spawn(fut); diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index c7536b3..dbd7103 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor on tokio" -version = "0.5.0" +version = "0.5.1" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 6d23117..8b45a6f 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -78,11 +78,7 @@ pub trait Storage: Clone + Send { Ok(id) } - fn request_job( - &mut self, - queue: &str, - runner_id: u64, - ) -> Result, Self::Error> { + fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Self::Error> { match self.fetch_job_from_queue(queue)? { Some(mut job) => { if job.is_pending() && job.is_ready(Utc::now()) && job.is_in_queue(queue) { diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 4d39fe7..cd6379e 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-sled-storage" description = "Sled storage backend for background-jobs" -version = "0.1.2" +version = "0.1.3" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 187e21c..176d256 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -1,4 +1,4 @@ -use background_jobs_core::{JobInfo, Storage, Stats}; +use background_jobs_core::{JobInfo, Stats, Storage}; use chrono::offset::Utc; mod error; @@ -6,10 +6,7 @@ mod sled_wrappers; pub use error::Error; -use self::{ - error::Result, - sled_wrappers::Tree, -}; +use self::{error::Result, sled_wrappers::Tree}; #[derive(Clone)] pub struct SledStorage { @@ -128,7 +125,7 @@ impl SledStorage { { let id = self.db.generate_id()?; - let mut prev; + let mut prev; while { prev = self.lock.fetch_and_update(queue, move |opt| match opt { Some(_) => opt, @@ -160,4 +157,3 @@ where { db.open_tree(name).map(Tree::new) } - diff --git a/jobs-sled/src/sled_wrappers.rs b/jobs-sled/src/sled_wrappers.rs index e3190a4..faaec9e 100644 --- a/jobs-sled/src/sled_wrappers.rs +++ b/jobs-sled/src/sled_wrappers.rs @@ -19,14 +19,12 @@ where pub(crate) fn get(&self, key: K) -> Result> where - K: AsRef<[u8]> + K: AsRef<[u8]>, { match self.0.get(key)? { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, + Some(vec) => serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some), None => Ok(None), } } @@ -39,11 +37,9 @@ where pub(crate) fn del(&self, key: &str) -> Result> { match self.0.del(key)? { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, + Some(vec) => serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some), None => Ok(None), } } @@ -55,29 +51,23 @@ where let final_opt = self.0.fetch_and_update(key, |opt| { let new_opt = match opt { Some(vec) => { - let t = serde_json::from_slice(&vec) - .map(Some) - .unwrap_or(None); + let t = serde_json::from_slice(&vec).map(Some).unwrap_or(None); (f)(t) - }, + } None => (f)(None), }; match new_opt { - Some(t) => serde_json::to_vec(&t) - .map(Some) - .unwrap_or(None), + Some(t) => serde_json::to_vec(&t).map(Some).unwrap_or(None), None => None, } })?; match final_opt { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, + Some(vec) => serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some), None => Ok(None), } } @@ -93,7 +83,7 @@ impl<'a, T> Iter<'a, T> { impl<'a, T> Iterator for Iter<'a, T> where - T: serde::de::DeserializeOwned + T: serde::de::DeserializeOwned, { type Item = Result<(Vec, T)>; @@ -110,7 +100,7 @@ where impl<'a, T> DoubleEndedIterator for Iter<'a, T> where - T: serde::de::DeserializeOwned + T: serde::de::DeserializeOwned, { fn next_back(&mut self) -> Option { self.0.next_back().map(|res| { diff --git a/src/lib.rs b/src/lib.rs index a17a060..a8db3c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -203,4 +203,4 @@ pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Sta pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig}; #[cfg(feature = "background-jobs-sled-storage")] -pub use background_jobs_sled_storage::{SledStorage, Error as SledStorageError}; +pub use background_jobs_sled_storage::{Error as SledStorageError, SledStorage};