mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00
Organize sled
This commit is contained in:
parent
5b13908886
commit
df932f6699
3 changed files with 163 additions and 151 deletions
21
jobs-sled/src/error.rs
Normal file
21
jobs-sled/src/error.rs
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
use failure::Fail;
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Fail)]
|
||||||
|
pub enum Error {
|
||||||
|
#[fail(display = "Error in database: {}", _0)]
|
||||||
|
Sled(#[cause] sled::Error),
|
||||||
|
|
||||||
|
#[fail(display = "Failed to deserialize data")]
|
||||||
|
Deserialize,
|
||||||
|
|
||||||
|
#[fail(display = "Failed to serialize data")]
|
||||||
|
Serialize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<sled::Error> for Error {
|
||||||
|
fn from(e: sled::Error) -> Self {
|
||||||
|
Error::Sled(e)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,14 @@
|
||||||
use background_jobs_core::{JobInfo, Storage, Stats};
|
use background_jobs_core::{JobInfo, Storage, Stats};
|
||||||
use failure::Fail;
|
|
||||||
use std::{marker::PhantomData, sync::Arc};
|
mod error;
|
||||||
|
mod sled_wrappers;
|
||||||
|
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
|
use self::{
|
||||||
|
error::Result,
|
||||||
|
sled_wrappers::Tree,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SledStorage {
|
pub struct SledStorage {
|
||||||
|
@ -88,14 +96,6 @@ impl Storage for SledStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn job_key(id: u64) -> String {
|
|
||||||
format!("job-{}", id)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn runner_key(runner_id: u64) -> String {
|
|
||||||
format!("runner-{}", runner_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SledStorage {
|
impl SledStorage {
|
||||||
pub fn new(db: sled::Db) -> Result<Self> {
|
pub fn new(db: sled::Db) -> Result<Self> {
|
||||||
Ok(SledStorage {
|
Ok(SledStorage {
|
||||||
|
@ -109,6 +109,14 @@ impl SledStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn job_key(id: u64) -> String {
|
||||||
|
format!("job-{}", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn runner_key(runner_id: u64) -> String {
|
||||||
|
format!("runner-{}", runner_id)
|
||||||
|
}
|
||||||
|
|
||||||
fn open_tree<T>(db: &sled::Db, name: &str) -> sled::Result<Tree<T>>
|
fn open_tree<T>(db: &sled::Db, name: &str) -> sled::Result<Tree<T>>
|
||||||
where
|
where
|
||||||
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
||||||
|
@ -116,144 +124,3 @@ where
|
||||||
db.open_tree(name).map(Tree::new)
|
db.open_tree(name).map(Tree::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct Tree<T>(Arc<sled::Tree>, PhantomData<T>);
|
|
||||||
|
|
||||||
impl<T> Tree<T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
|
||||||
{
|
|
||||||
fn new(t: Arc<sled::Tree>) -> Self {
|
|
||||||
Tree(t, PhantomData)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn iter(&self) -> Iter<T> {
|
|
||||||
Iter::new(self.0.iter())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get<K>(&self, key: K) -> Result<Option<T>>
|
|
||||||
where
|
|
||||||
K: AsRef<[u8]>
|
|
||||||
{
|
|
||||||
match self.0.get(key)? {
|
|
||||||
Some(vec) => {
|
|
||||||
serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some)
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set(&self, key: &str, value: T) -> Result<Option<T>> {
|
|
||||||
let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?;
|
|
||||||
|
|
||||||
Ok(self.0.set(key, vec)?.map(move |_| value))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn del(&self, key: &str) -> Result<Option<T>> {
|
|
||||||
match self.0.del(key)? {
|
|
||||||
Some(vec) => {
|
|
||||||
serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some)
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn fetch_and_update<F>(&self, key: &str, f: F) -> Result<Option<T>>
|
|
||||||
where
|
|
||||||
F: Fn(Option<T>) -> Option<T>,
|
|
||||||
{
|
|
||||||
let final_opt = self.0.fetch_and_update(key, |opt| {
|
|
||||||
let new_opt = match opt {
|
|
||||||
Some(vec) => {
|
|
||||||
let t = serde_json::from_slice(&vec)
|
|
||||||
.map(Some)
|
|
||||||
.unwrap_or(None);
|
|
||||||
|
|
||||||
(f)(t)
|
|
||||||
},
|
|
||||||
None => (f)(None),
|
|
||||||
};
|
|
||||||
|
|
||||||
match new_opt {
|
|
||||||
Some(t) => serde_json::to_vec(&t)
|
|
||||||
.map(Some)
|
|
||||||
.unwrap_or(None),
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
|
|
||||||
match final_opt {
|
|
||||||
Some(vec) => {
|
|
||||||
serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some)
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Iter<'a, T>(sled::Iter<'a>, PhantomData<T>);
|
|
||||||
|
|
||||||
impl<'a, T> Iter<'a, T> {
|
|
||||||
fn new(i: sled::Iter<'a>) -> Self {
|
|
||||||
Iter(i, PhantomData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Fail)]
|
|
||||||
pub enum Error {
|
|
||||||
#[fail(display = "Error in database: {}", _0)]
|
|
||||||
Sled(#[cause] sled::Error),
|
|
||||||
|
|
||||||
#[fail(display = "Failed to deserialize data")]
|
|
||||||
Deserialize,
|
|
||||||
|
|
||||||
#[fail(display = "Failed to serialize data")]
|
|
||||||
Serialize,
|
|
||||||
}
|
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
|
||||||
|
|
||||||
impl<'a, T> Iterator for Iter<'a, T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned
|
|
||||||
{
|
|
||||||
type Item = Result<(Vec<u8>, T)>;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next().map(|res| {
|
|
||||||
res.map_err(Error::from).and_then(|(k, v)| {
|
|
||||||
serde_json::from_slice(&v)
|
|
||||||
.map(|item| (k, item))
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, T> DoubleEndedIterator for Iter<'a, T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned
|
|
||||||
{
|
|
||||||
fn next_back(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next_back().map(|res| {
|
|
||||||
res.map_err(Error::from).and_then(|(k, v)| {
|
|
||||||
serde_json::from_slice(&v)
|
|
||||||
.map(|item| (k, item))
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<sled::Error> for Error {
|
|
||||||
fn from(e: sled::Error) -> Self {
|
|
||||||
Error::Sled(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
124
jobs-sled/src/sled_wrappers.rs
Normal file
124
jobs-sled/src/sled_wrappers.rs
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
use std::{marker::PhantomData, sync::Arc};
|
||||||
|
|
||||||
|
use crate::{Error, Result};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Tree<T>(Arc<sled::Tree>, PhantomData<T>);
|
||||||
|
|
||||||
|
impl<T> Tree<T>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
||||||
|
{
|
||||||
|
pub(crate) fn new(t: Arc<sled::Tree>) -> Self {
|
||||||
|
Tree(t, PhantomData)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn iter(&self) -> Iter<T> {
|
||||||
|
Iter::new(self.0.iter())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get<K>(&self, key: K) -> Result<Option<T>>
|
||||||
|
where
|
||||||
|
K: AsRef<[u8]>
|
||||||
|
{
|
||||||
|
match self.0.get(key)? {
|
||||||
|
Some(vec) => {
|
||||||
|
serde_json::from_slice(&vec)
|
||||||
|
.map_err(|_| Error::Deserialize)
|
||||||
|
.map(Some)
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set(&self, key: &str, value: T) -> Result<Option<T>> {
|
||||||
|
let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?;
|
||||||
|
|
||||||
|
Ok(self.0.set(key, vec)?.map(move |_| value))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn del(&self, key: &str) -> Result<Option<T>> {
|
||||||
|
match self.0.del(key)? {
|
||||||
|
Some(vec) => {
|
||||||
|
serde_json::from_slice(&vec)
|
||||||
|
.map_err(|_| Error::Deserialize)
|
||||||
|
.map(Some)
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn fetch_and_update<F>(&self, key: &str, f: F) -> Result<Option<T>>
|
||||||
|
where
|
||||||
|
F: Fn(Option<T>) -> Option<T>,
|
||||||
|
{
|
||||||
|
let final_opt = self.0.fetch_and_update(key, |opt| {
|
||||||
|
let new_opt = match opt {
|
||||||
|
Some(vec) => {
|
||||||
|
let t = serde_json::from_slice(&vec)
|
||||||
|
.map(Some)
|
||||||
|
.unwrap_or(None);
|
||||||
|
|
||||||
|
(f)(t)
|
||||||
|
},
|
||||||
|
None => (f)(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
match new_opt {
|
||||||
|
Some(t) => serde_json::to_vec(&t)
|
||||||
|
.map(Some)
|
||||||
|
.unwrap_or(None),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
match final_opt {
|
||||||
|
Some(vec) => {
|
||||||
|
serde_json::from_slice(&vec)
|
||||||
|
.map_err(|_| Error::Deserialize)
|
||||||
|
.map(Some)
|
||||||
|
},
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct Iter<'a, T>(sled::Iter<'a>, PhantomData<T>);
|
||||||
|
|
||||||
|
impl<'a, T> Iter<'a, T> {
|
||||||
|
fn new(i: sled::Iter<'a>) -> Self {
|
||||||
|
Iter(i, PhantomData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Iterator for Iter<'a, T>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned
|
||||||
|
{
|
||||||
|
type Item = Result<(Vec<u8>, T)>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.0.next().map(|res| {
|
||||||
|
res.map_err(Error::from).and_then(|(k, v)| {
|
||||||
|
serde_json::from_slice(&v)
|
||||||
|
.map(|item| (k, item))
|
||||||
|
.map_err(|_| Error::Deserialize)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> DoubleEndedIterator for Iter<'a, T>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned
|
||||||
|
{
|
||||||
|
fn next_back(&mut self) -> Option<Self::Item> {
|
||||||
|
self.0.next_back().map(|res| {
|
||||||
|
res.map_err(Error::from).and_then(|(k, v)| {
|
||||||
|
serde_json::from_slice(&v)
|
||||||
|
.map(|item| (k, item))
|
||||||
|
.map_err(|_| Error::Deserialize)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue