mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00
Update sled
This commit is contained in:
parent
366a328e9f
commit
9c8a8dcfc9
7 changed files with 32 additions and 176 deletions
|
@ -11,6 +11,6 @@ actix = "0.8"
|
||||||
background-jobs = { version = "0.6.0", path = "../.." }
|
background-jobs = { version = "0.6.0", path = "../.." }
|
||||||
failure = "0.1"
|
failure = "0.1"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
|
sled-extensions = { version = "0.1.0", git = "https://git.asonix.dog/asonix/sled-extensions" }
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
sled = "0.24"
|
|
||||||
|
|
|
@ -31,9 +31,9 @@ fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// Optionally, a storage backend using the Sled database is provided
|
// Optionally, a storage backend using the Sled database is provided
|
||||||
use sled::Db;
|
|
||||||
use background_jobs::sled_storage::Storage;
|
use background_jobs::sled_storage::Storage;
|
||||||
let db = Db::start_default("my-sled-db")?;
|
use sled_extensions::Db;
|
||||||
|
let db = Db::open("my-sled-db")?;
|
||||||
let storage = Storage::new(db)?;
|
let storage = Storage::new(db)?;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
|
||||||
/// #[derive(Clone)]
|
/// #[derive(Clone)]
|
||||||
/// struct MyProcessor;
|
/// struct MyProcessor;
|
||||||
///
|
///
|
||||||
/// impl Processor<()> for MyProcessor {
|
/// impl Processor for MyProcessor {
|
||||||
/// type Job = MyJob;
|
/// type Job = MyJob;
|
||||||
///
|
///
|
||||||
/// const NAME: &'static str = "IncrementProcessor";
|
/// const NAME: &'static str = "IncrementProcessor";
|
||||||
|
|
|
@ -13,7 +13,4 @@ edition = "2018"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
background-jobs-core = { version = "0.6", path = "../jobs-core" }
|
background-jobs-core = { version = "0.6", path = "../jobs-core" }
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
failure = "0.1"
|
sled-extensions = { version = "0.1", git = "https://git.asonix.dog/asonix/sled-extensions", features = ["bincode", "cbor"] }
|
||||||
sled = "0.26"
|
|
||||||
serde = "1.0"
|
|
||||||
serde_json = "1.0"
|
|
||||||
|
|
|
@ -1,21 +0,0 @@
|
||||||
use failure::Fail;
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Fail)]
|
|
||||||
pub enum Error {
|
|
||||||
#[fail(display = "Error in database: {}", _0)]
|
|
||||||
Sled(#[cause] sled::Error),
|
|
||||||
|
|
||||||
#[fail(display = "Failed to deserialize data")]
|
|
||||||
Deserialize,
|
|
||||||
|
|
||||||
#[fail(display = "Failed to serialize data")]
|
|
||||||
Serialize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<sled::Error> for Error {
|
|
||||||
fn from(e: sled::Error) -> Self {
|
|
||||||
Error::Sled(e)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,37 +1,37 @@
|
||||||
use background_jobs_core::{JobInfo, Stats, Storage};
|
use background_jobs_core::{JobInfo, Stats, Storage};
|
||||||
use chrono::offset::Utc;
|
use chrono::offset::Utc;
|
||||||
|
use sled_extensions::{BincodeTree, CborTree, Db};
|
||||||
|
|
||||||
mod error;
|
pub use sled_extensions::Error;
|
||||||
mod sled_wrappers;
|
|
||||||
|
|
||||||
pub use error::Error;
|
type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
use self::{error::Result, sled_wrappers::Tree};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SledStorage {
|
pub struct SledStorage {
|
||||||
jobinfo: Tree<JobInfo>,
|
jobinfo: CborTree<JobInfo>,
|
||||||
running: Tree<u64>,
|
running: BincodeTree<u64>,
|
||||||
running_inverse: Tree<u64>,
|
running_inverse: BincodeTree<u64>,
|
||||||
queue: Tree<String>,
|
queue: BincodeTree<String>,
|
||||||
stats: Tree<Stats>,
|
stats: BincodeTree<Stats>,
|
||||||
lock: Tree<u64>,
|
lock: BincodeTree<u64>,
|
||||||
db: sled::Db,
|
db: Db,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Storage for SledStorage {
|
impl Storage for SledStorage {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn generate_id(&mut self) -> Result<u64> {
|
fn generate_id(&mut self) -> Result<u64> {
|
||||||
self.db.generate_id().map_err(Error::from)
|
self.db.generate_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_job(&mut self, job: JobInfo) -> Result<()> {
|
fn save_job(&mut self, job: JobInfo) -> Result<()> {
|
||||||
self.jobinfo.insert(&job_key(job.id()), job).map(|_| ())
|
self.jobinfo
|
||||||
|
.insert(job_key(job.id()).as_bytes(), job)
|
||||||
|
.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>> {
|
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>> {
|
||||||
self.jobinfo.get(&job_key(id))
|
self.jobinfo.get(job_key(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
|
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
|
||||||
|
@ -64,14 +64,15 @@ impl Storage for SledStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.queue
|
self.queue
|
||||||
.insert(&job_key(id), queue.to_owned())
|
.insert(job_key(id).as_bytes(), queue.to_owned())
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> {
|
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> {
|
||||||
self.queue.remove(&job_key(id))?;
|
self.queue.remove(job_key(id))?;
|
||||||
self.running.insert(&runner_key(runner_id), id)?;
|
self.running.insert(runner_key(runner_id).as_bytes(), id)?;
|
||||||
self.running_inverse.insert(&job_key(id), runner_id)?;
|
self.running_inverse
|
||||||
|
.insert(job_key(id).as_bytes(), runner_id)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -109,14 +110,14 @@ impl Storage for SledStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SledStorage {
|
impl SledStorage {
|
||||||
pub fn new(db: sled::Db) -> Result<Self> {
|
pub fn new(db: Db) -> Result<Self> {
|
||||||
Ok(SledStorage {
|
Ok(SledStorage {
|
||||||
jobinfo: open_tree(&db, "background-jobs-jobinfo")?,
|
jobinfo: db.open_cbor_tree("background-jobs-jobinfo")?,
|
||||||
running: open_tree(&db, "background-jobs-running")?,
|
running: db.open_bincode_tree("background-jobs-running")?,
|
||||||
running_inverse: open_tree(&db, "background-jobs-running-inverse")?,
|
running_inverse: db.open_bincode_tree("background-jobs-running-inverse")?,
|
||||||
queue: open_tree(&db, "background-jobs-queue")?,
|
queue: db.open_bincode_tree("background-jobs-queue")?,
|
||||||
stats: open_tree(&db, "background-jobs-stats")?,
|
stats: db.open_bincode_tree("background-jobs-stats")?,
|
||||||
lock: open_tree(&db, "background-jobs-lock")?,
|
lock: db.open_bincode_tree("background-jobs-lock")?,
|
||||||
db,
|
db,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -152,10 +153,3 @@ fn job_key(id: u64) -> String {
|
||||||
fn runner_key(runner_id: u64) -> String {
|
fn runner_key(runner_id: u64) -> String {
|
||||||
format!("runner-{}", runner_id)
|
format!("runner-{}", runner_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn open_tree<T>(db: &sled::Db, name: &str) -> sled::Result<Tree<T>>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
|
||||||
{
|
|
||||||
db.open_tree(name).map(Tree::new)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,114 +0,0 @@
|
||||||
use std::{marker::PhantomData, sync::Arc};
|
|
||||||
|
|
||||||
use crate::{Error, Result};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Tree<T>(Arc<sled::Tree>, PhantomData<T>);
|
|
||||||
|
|
||||||
impl<T> Tree<T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned + serde::ser::Serialize,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(t: Arc<sled::Tree>) -> Self {
|
|
||||||
Tree(t, PhantomData)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn iter(&self) -> Iter<T> {
|
|
||||||
Iter::new(self.0.iter())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn get<K>(&self, key: K) -> Result<Option<T>>
|
|
||||||
where
|
|
||||||
K: AsRef<[u8]>,
|
|
||||||
{
|
|
||||||
match self.0.get(key)? {
|
|
||||||
Some(vec) => serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some),
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn insert(&self, key: &str, value: T) -> Result<Option<T>> {
|
|
||||||
let vec = serde_json::to_vec(&value).map_err(|_| Error::Serialize)?;
|
|
||||||
|
|
||||||
Ok(self.0.insert(key, vec)?.map(move |_| value))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn remove(&self, key: &str) -> Result<Option<T>> {
|
|
||||||
match self.0.remove(key)? {
|
|
||||||
Some(vec) => serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some),
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn fetch_and_update<F>(&self, key: &str, f: F) -> Result<Option<T>>
|
|
||||||
where
|
|
||||||
F: Fn(Option<T>) -> Option<T>,
|
|
||||||
{
|
|
||||||
let final_opt = self.0.fetch_and_update(key, |opt| {
|
|
||||||
let new_opt = match opt {
|
|
||||||
Some(vec) => {
|
|
||||||
let t = serde_json::from_slice(&vec).map(Some).unwrap_or(None);
|
|
||||||
|
|
||||||
(f)(t)
|
|
||||||
}
|
|
||||||
None => (f)(None),
|
|
||||||
};
|
|
||||||
|
|
||||||
match new_opt {
|
|
||||||
Some(t) => serde_json::to_vec(&t).map(Some).unwrap_or(None),
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
|
|
||||||
match final_opt {
|
|
||||||
Some(vec) => serde_json::from_slice(&vec)
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
.map(Some),
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct Iter<'a, T>(sled::Iter<'a>, PhantomData<T>);
|
|
||||||
|
|
||||||
impl<'a, T> Iter<'a, T> {
|
|
||||||
fn new(i: sled::Iter<'a>) -> Self {
|
|
||||||
Iter(i, PhantomData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, T> Iterator for Iter<'a, T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
type Item = Result<(sled::IVec, T)>;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next().map(|res| {
|
|
||||||
res.map_err(Error::from).and_then(|(k, v)| {
|
|
||||||
serde_json::from_slice(&v)
|
|
||||||
.map(|item| (k, item))
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, T> DoubleEndedIterator for Iter<'a, T>
|
|
||||||
where
|
|
||||||
T: serde::de::DeserializeOwned,
|
|
||||||
{
|
|
||||||
fn next_back(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next_back().map(|res| {
|
|
||||||
res.map_err(Error::from).and_then(|(k, v)| {
|
|
||||||
serde_json::from_slice(&v)
|
|
||||||
.map(|item| (k, item))
|
|
||||||
.map_err(|_| Error::Deserialize)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue