From df932f669968c7363cd4a54a55a7598f41a6dd22 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 25 May 2019 15:32:14 -0500 Subject: [PATCH] Organize sled --- jobs-sled/src/error.rs | 21 ++++ jobs-sled/src/lib.rs | 169 ++++----------------------------- jobs-sled/src/sled_wrappers.rs | 124 ++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 151 deletions(-) create mode 100644 jobs-sled/src/error.rs create mode 100644 jobs-sled/src/sled_wrappers.rs diff --git a/jobs-sled/src/error.rs b/jobs-sled/src/error.rs new file mode 100644 index 0000000..d320058 --- /dev/null +++ b/jobs-sled/src/error.rs @@ -0,0 +1,21 @@ +use failure::Fail; + +pub type Result = std::result::Result; + +#[derive(Clone, Debug, Fail)] +pub enum Error { + #[fail(display = "Error in database: {}", _0)] + Sled(#[cause] sled::Error), + + #[fail(display = "Failed to deserialize data")] + Deserialize, + + #[fail(display = "Failed to serialize data")] + Serialize, +} + +impl From for Error { + fn from(e: sled::Error) -> Self { + Error::Sled(e) + } +} diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index e121781..183a66f 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -1,6 +1,14 @@ use background_jobs_core::{JobInfo, Storage, Stats}; -use failure::Fail; -use std::{marker::PhantomData, sync::Arc}; + +mod error; +mod sled_wrappers; + +pub use error::Error; + +use self::{ + error::Result, + sled_wrappers::Tree, +}; #[derive(Clone)] pub struct SledStorage { @@ -88,14 +96,6 @@ impl Storage for SledStorage { } } -fn job_key(id: u64) -> String { - format!("job-{}", id) -} - -fn runner_key(runner_id: u64) -> String { - format!("runner-{}", runner_id) -} - impl SledStorage { pub fn new(db: sled::Db) -> Result { Ok(SledStorage { @@ -109,6 +109,14 @@ impl SledStorage { } } +fn job_key(id: u64) -> String { + format!("job-{}", id) +} + +fn runner_key(runner_id: u64) -> String { + format!("runner-{}", runner_id) +} + fn open_tree(db: &sled::Db, name: &str) -> sled::Result> where T: serde::de::DeserializeOwned + serde::ser::Serialize, @@ -116,144 +124,3 @@ where db.open_tree(name).map(Tree::new) } - -#[derive(Clone)] -struct Tree(Arc, PhantomData); - -impl Tree -where - T: serde::de::DeserializeOwned + serde::ser::Serialize, -{ - fn new(t: Arc) -> Self { - Tree(t, PhantomData) - } - - fn iter(&self) -> Iter { - Iter::new(self.0.iter()) - } - - fn get(&self, key: K) -> Result> - where - K: AsRef<[u8]> - { - match self.0.get(key)? { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, - None => Ok(None), - } - } - - fn set(&self, key: &str, value: T) -> Result> { - let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?; - - Ok(self.0.set(key, vec)?.map(move |_| value)) - } - - fn del(&self, key: &str) -> Result> { - match self.0.del(key)? { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, - None => Ok(None), - } - } - - fn fetch_and_update(&self, key: &str, f: F) -> Result> - where - F: Fn(Option) -> Option, - { - let final_opt = self.0.fetch_and_update(key, |opt| { - let new_opt = match opt { - Some(vec) => { - let t = serde_json::from_slice(&vec) - .map(Some) - .unwrap_or(None); - - (f)(t) - }, - None => (f)(None), - }; - - match new_opt { - Some(t) => serde_json::to_vec(&t) - .map(Some) - .unwrap_or(None), - None => None, - } - })?; - - match final_opt { - Some(vec) => { - serde_json::from_slice(&vec) - .map_err(|_| Error::Deserialize) - .map(Some) - }, - None => Ok(None), - } - } -} - -struct Iter<'a, T>(sled::Iter<'a>, PhantomData); - -impl<'a, T> Iter<'a, T> { - fn new(i: sled::Iter<'a>) -> Self { - Iter(i, PhantomData) - } -} - -#[derive(Clone, Debug, Fail)] -pub enum Error { - #[fail(display = "Error in database: {}", _0)] - Sled(#[cause] sled::Error), - - #[fail(display = "Failed to deserialize data")] - Deserialize, - - #[fail(display = "Failed to serialize data")] - Serialize, -} - -type Result = std::result::Result; - -impl<'a, T> Iterator for Iter<'a, T> -where - T: serde::de::DeserializeOwned -{ - type Item = Result<(Vec, T)>; - - fn next(&mut self) -> Option { - self.0.next().map(|res| { - res.map_err(Error::from).and_then(|(k, v)| { - serde_json::from_slice(&v) - .map(|item| (k, item)) - .map_err(|_| Error::Deserialize) - }) - }) - } -} - -impl<'a, T> DoubleEndedIterator for Iter<'a, T> -where - T: serde::de::DeserializeOwned -{ - fn next_back(&mut self) -> Option { - self.0.next_back().map(|res| { - res.map_err(Error::from).and_then(|(k, v)| { - serde_json::from_slice(&v) - .map(|item| (k, item)) - .map_err(|_| Error::Deserialize) - }) - }) - } -} - -impl From for Error { - fn from(e: sled::Error) -> Self { - Error::Sled(e) - } -} diff --git a/jobs-sled/src/sled_wrappers.rs b/jobs-sled/src/sled_wrappers.rs new file mode 100644 index 0000000..e3190a4 --- /dev/null +++ b/jobs-sled/src/sled_wrappers.rs @@ -0,0 +1,124 @@ +use std::{marker::PhantomData, sync::Arc}; + +use crate::{Error, Result}; + +#[derive(Clone)] +pub struct Tree(Arc, PhantomData); + +impl Tree +where + T: serde::de::DeserializeOwned + serde::ser::Serialize, +{ + pub(crate) fn new(t: Arc) -> Self { + Tree(t, PhantomData) + } + + pub(crate) fn iter(&self) -> Iter { + Iter::new(self.0.iter()) + } + + pub(crate) fn get(&self, key: K) -> Result> + where + K: AsRef<[u8]> + { + match self.0.get(key)? { + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } + + pub(crate) fn set(&self, key: &str, value: T) -> Result> { + let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?; + + Ok(self.0.set(key, vec)?.map(move |_| value)) + } + + pub(crate) fn del(&self, key: &str) -> Result> { + match self.0.del(key)? { + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } + + pub(crate) fn fetch_and_update(&self, key: &str, f: F) -> Result> + where + F: Fn(Option) -> Option, + { + let final_opt = self.0.fetch_and_update(key, |opt| { + let new_opt = match opt { + Some(vec) => { + let t = serde_json::from_slice(&vec) + .map(Some) + .unwrap_or(None); + + (f)(t) + }, + None => (f)(None), + }; + + match new_opt { + Some(t) => serde_json::to_vec(&t) + .map(Some) + .unwrap_or(None), + None => None, + } + })?; + + match final_opt { + Some(vec) => { + serde_json::from_slice(&vec) + .map_err(|_| Error::Deserialize) + .map(Some) + }, + None => Ok(None), + } + } +} + +pub(crate) struct Iter<'a, T>(sled::Iter<'a>, PhantomData); + +impl<'a, T> Iter<'a, T> { + fn new(i: sled::Iter<'a>) -> Self { + Iter(i, PhantomData) + } +} + +impl<'a, T> Iterator for Iter<'a, T> +where + T: serde::de::DeserializeOwned +{ + type Item = Result<(Vec, T)>; + + fn next(&mut self) -> Option { + self.0.next().map(|res| { + res.map_err(Error::from).and_then(|(k, v)| { + serde_json::from_slice(&v) + .map(|item| (k, item)) + .map_err(|_| Error::Deserialize) + }) + }) + } +} + +impl<'a, T> DoubleEndedIterator for Iter<'a, T> +where + T: serde::de::DeserializeOwned +{ + fn next_back(&mut self) -> Option { + self.0.next_back().map(|res| { + res.map_err(Error::from).and_then(|(k, v)| { + serde_json::from_slice(&v) + .map(|item| (k, item)) + .map_err(|_| Error::Deserialize) + }) + }) + } +}