diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 7d999ac..fa88cae 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -11,6 +11,6 @@ actix = "0.8" background-jobs = { version = "0.6.0", path = "../.." } failure = "0.1" futures = "0.1" +sled-extensions = { version = "0.1.0", git = "https://git.asonix.dog/asonix/sled-extensions" } serde = "1.0" serde_derive = "1.0" -sled = "0.24" diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 04a0e9a..5c26657 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -31,9 +31,9 @@ fn main() -> Result<(), Error> { /* // Optionally, a storage backend using the Sled database is provided - use sled::Db; use background_jobs::sled_storage::Storage; - let db = Db::start_default("my-sled-db")?; + use sled_extensions::Db; + let db = Db::open("my-sled-db")?; let storage = Storage::new(db)?; */ diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 8aef3f8..98188d4 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -66,7 +66,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// #[derive(Clone)] /// struct MyProcessor; /// -/// impl Processor<()> for MyProcessor { +/// impl Processor for MyProcessor { /// type Job = MyJob; /// /// const NAME: &'static str = "IncrementProcessor"; diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 6f69e8d..7b0f302 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -13,7 +13,4 @@ edition = "2018" [dependencies] background-jobs-core = { version = "0.6", path = "../jobs-core" } chrono = "0.4" -failure = "0.1" -sled = "0.26" -serde = "1.0" -serde_json = "1.0" +sled-extensions = { version = "0.1", git = "https://git.asonix.dog/asonix/sled-extensions", features = ["bincode", "cbor"] } diff --git a/jobs-sled/src/error.rs b/jobs-sled/src/error.rs deleted file mode 100644 index d320058..0000000 --- a/jobs-sled/src/error.rs +++ /dev/null @@ -1,21 +0,0 @@ -use failure::Fail; - -pub type Result = std::result::Result; - -#[derive(Clone, Debug, Fail)] -pub enum Error { - #[fail(display = "Error in database: {}", _0)] - Sled(#[cause] sled::Error), - - #[fail(display = "Failed to deserialize data")] - Deserialize, - - #[fail(display = "Failed to serialize data")] - Serialize, -} - -impl From for Error { - fn from(e: sled::Error) -> Self { - Error::Sled(e) - } -} diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 9cd467a..d66b020 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -1,37 +1,37 @@ use background_jobs_core::{JobInfo, Stats, Storage}; use chrono::offset::Utc; +use sled_extensions::{BincodeTree, CborTree, Db}; -mod error; -mod sled_wrappers; +pub use sled_extensions::Error; -pub use error::Error; - -use self::{error::Result, sled_wrappers::Tree}; +type Result = std::result::Result; #[derive(Clone)] pub struct SledStorage { - jobinfo: Tree, - running: Tree, - running_inverse: Tree, - queue: Tree, - stats: Tree, - lock: Tree, - db: sled::Db, + jobinfo: CborTree, + running: BincodeTree, + running_inverse: BincodeTree, + queue: BincodeTree, + stats: BincodeTree, + lock: BincodeTree, + db: Db, } impl Storage for SledStorage { type Error = Error; fn generate_id(&mut self) -> Result { - self.db.generate_id().map_err(Error::from) + self.db.generate_id() } fn save_job(&mut self, job: JobInfo) -> Result<()> { - self.jobinfo.insert(&job_key(job.id()), job).map(|_| ()) + self.jobinfo + .insert(job_key(job.id()).as_bytes(), job) + .map(|_| ()) } fn fetch_job(&mut self, id: u64) -> Result> { - self.jobinfo.get(&job_key(id)) + self.jobinfo.get(job_key(id)) } fn fetch_job_from_queue(&mut self, queue: &str) -> Result> { @@ -64,14 +64,15 @@ impl Storage for SledStorage { } self.queue - .insert(&job_key(id), queue.to_owned()) + .insert(job_key(id).as_bytes(), queue.to_owned()) .map(|_| ()) } fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> { - self.queue.remove(&job_key(id))?; - self.running.insert(&runner_key(runner_id), id)?; - self.running_inverse.insert(&job_key(id), runner_id)?; + self.queue.remove(job_key(id))?; + self.running.insert(runner_key(runner_id).as_bytes(), id)?; + self.running_inverse + .insert(job_key(id).as_bytes(), runner_id)?; Ok(()) } @@ -109,14 +110,14 @@ impl Storage for SledStorage { } impl SledStorage { - pub fn new(db: sled::Db) -> Result { + pub fn new(db: Db) -> Result { Ok(SledStorage { - jobinfo: open_tree(&db, "background-jobs-jobinfo")?, - running: open_tree(&db, "background-jobs-running")?, - running_inverse: open_tree(&db, "background-jobs-running-inverse")?, - queue: open_tree(&db, "background-jobs-queue")?, - stats: open_tree(&db, "background-jobs-stats")?, - lock: open_tree(&db, "background-jobs-lock")?, + jobinfo: db.open_cbor_tree("background-jobs-jobinfo")?, + running: db.open_bincode_tree("background-jobs-running")?, + running_inverse: db.open_bincode_tree("background-jobs-running-inverse")?, + queue: db.open_bincode_tree("background-jobs-queue")?, + stats: db.open_bincode_tree("background-jobs-stats")?, + lock: db.open_bincode_tree("background-jobs-lock")?, db, }) } @@ -152,10 +153,3 @@ fn job_key(id: u64) -> String { fn runner_key(runner_id: u64) -> String { format!("runner-{}", runner_id) } - -fn open_tree(db: &sled::Db, name: &str) -> sled::Result> -where - T: serde::de::DeserializeOwned + serde::ser::Serialize, -{ - db.open_tree(name).map(Tree::new) -} diff --git a/jobs-sled/src/sled_wrappers.rs b/jobs-sled/src/sled_wrappers.rs deleted file mode 100644 index 5a4034a..0000000 --- a/jobs-sled/src/sled_wrappers.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::{marker::PhantomData, sync::Arc}; - -use crate::{Error, Result}; - -#[derive(Clone)] -pub struct Tree(Arc, PhantomData); - -impl Tree -where - T: serde::de::DeserializeOwned + serde::ser::Serialize, -{ - pub(crate) fn new(t: Arc) -> Self { - Tree(t, PhantomData) - } - - pub(crate) fn iter(&self) -> Iter { - Iter::new(self.0.iter()) - } - - pub(crate) fn get(&self, key: K) -> Result> - where - K: AsRef<[u8]>, - { - match self.0.get(key)? { - Some(vec) => serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some), - None => Ok(None), - } - } - - pub(crate) fn insert(&self, key: &str, value: T) -> Result> { - let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?; - - Ok(self.0.insert(key, vec)?.map(move |_| value)) - } - - pub(crate) fn remove(&self, key: &str) -> Result> { - match self.0.remove(key)? { - Some(vec) => serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some), - None => Ok(None), - } - } - - pub(crate) fn fetch_and_update(&self, key: &str, f: F) -> Result> - where - F: Fn(Option) -> Option, - { - let final_opt = self.0.fetch_and_update(key, |opt| { - let new_opt = match opt { - Some(vec) => { - let t = serde_json::from_slice(&vec).map(Some).unwrap_or(None); - - (f)(t) - } - None => (f)(None), - }; - - match new_opt { - Some(t) => serde_json::to_vec(&t).map(Some).unwrap_or(None), - None => None, - } - })?; - - match final_opt { - Some(vec) => serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some), - None => Ok(None), - } - } -} - -pub(crate) struct Iter<'a, T>(sled::Iter<'a>, PhantomData); - -impl<'a, T> Iter<'a, T> { - fn new(i: sled::Iter<'a>) -> Self { - Iter(i, PhantomData) - } -} - -impl<'a, T> Iterator for Iter<'a, T> -where - T: serde::de::DeserializeOwned, -{ - type Item = Result<(sled::IVec, T)>; - - fn next(&mut self) -> Option { - self.0.next().map(|res| { - res.map_err(Error::from).and_then(|(k, v)| { - serde_json::from_slice(&v) - .map(|item| (k, item)) - .map_err(|_| Error::Deserialize) - }) - }) - } -} - -impl<'a, T> DoubleEndedIterator for Iter<'a, T> -where - T: serde::de::DeserializeOwned, -{ - fn next_back(&mut self) -> Option { - self.0.next_back().map(|res| { - res.map_err(Error::from).and_then(|(k, v)| { - serde_json::from_slice(&v) - .map(|item| (k, item)) - .map_err(|_| Error::Deserialize) - }) - }) - } -}