jobs-sled: revive it

This commit is contained in:
asonix 2021-02-04 12:37:29 -06:00
parent 928b6adb9b
commit bd51c9d73d
2 changed files with 132 additions and 133 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-sled-storage" name = "background-jobs-sled-storage"
description = "Sled storage backend for background-jobs" description = "Sled storage backend for background-jobs"
version = "0.8.0-alpha.0" version = "0.9.0"
license-file = "../LICENSE" license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs" repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -11,10 +11,13 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix-threadpool = "0.3.1" actix-rt = "2.0.0"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs-core = { version = "0.9.0", path = "../jobs-core" } background-jobs-core = { version = "0.9.0", path = "../jobs-core" }
bincode = "1.2"
chrono = "0.4" chrono = "0.4"
sled-extensions = { version = "0.3.0-alpha.0", features = ["bincode", "cbor"], git = "https://git.asonix.dog/Aardwolf/sled-extensions" } sled = "0.34"
serde_cbor = "0.11"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt"] }
uuid = { version = "0.8.1", features = ["v4", "serde"] } uuid = { version = "0.8.1", features = ["v4", "serde"] }

View file

@ -13,10 +13,11 @@
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start(); //! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
//! ``` //! ```
use actix_threadpool::{run, BlockingError}; use actix_rt::task::spawn_blocking;
use background_jobs_core::{JobInfo, Stats, Storage}; use background_jobs_core::{JobInfo, Stats};
use chrono::offset::Utc; use chrono::offset::Utc;
use sled_extensions::{bincode::Tree, cbor, Db, DbExt}; use sled::{Db, Tree};
use tokio::task::JoinError;
use uuid::Uuid; use uuid::Uuid;
/// The error produced by sled storage calls /// The error produced by sled storage calls
@ -24,7 +25,11 @@ use uuid::Uuid;
pub enum Error { pub enum Error {
/// Error in the database /// Error in the database
#[error("Error in sled extensions, {0}")] #[error("Error in sled extensions, {0}")]
Sled(sled_extensions::Error), Sled(#[from] sled::Error),
/// Error storing or retrieving job info
#[error("Error transforming job info, {0}")]
Cbor(#[from] serde_cbor::Error),
/// Error executing db operation /// Error executing db operation
#[error("Blocking operation was canceled")] #[error("Blocking operation was canceled")]
@ -36,140 +41,171 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)] #[derive(Clone)]
/// The Sled-backed storage implementation /// The Sled-backed storage implementation
pub struct SledStorage { pub struct Storage {
jobinfo: cbor::Tree<JobInfo>, id: Tree,
running: Tree<Uuid>, jobinfo: Tree,
running_inverse: Tree<Uuid>, running: Tree,
queue: Tree<String>, running_inverse: Tree,
stats: Tree<Stats>, queue: Tree,
lock: Tree<Uuid>, stats: Tree,
db: Db, db: Db,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Storage for SledStorage { impl background_jobs_core::Storage for Storage {
type Error = Error; type Error = Error;
async fn generate_id(&self) -> Result<Uuid> { async fn generate_id(&self) -> Result<Uuid> {
let this = self.clone(); let this = self.clone();
Ok(run(move || { Ok(spawn_blocking(move || {
let uuid = loop { let mut uuid;
let uuid = Uuid::new_v4(); while {
uuid = Uuid::new_v4();
if !this.jobinfo.contains_key(job_key(uuid))? { this.id
break uuid; .compare_and_swap(
} uuid.as_bytes(),
}; None as Option<&[u8]>,
Some(uuid.as_bytes()),
)?
.is_err()
} {}
Ok(uuid) as sled_extensions::Result<Uuid> Ok(uuid) as Result<Uuid>
}) })
.await?) .await??)
} }
async fn save_job(&self, job: JobInfo) -> Result<()> { async fn save_job(&self, job: JobInfo) -> Result<()> {
let this = self.clone(); let this = self.clone();
Ok(run(move || { Ok(spawn_blocking(move || {
this.jobinfo let job_vec = serde_cbor::to_vec(&job)?;
.insert(job_key(job.id()).as_bytes(), job)
.map(|_| ()) this.jobinfo.insert(job.id().as_bytes(), job_vec)?;
Ok(()) as Result<_>
}) })
.await?) .await??)
} }
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>> { async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>> {
let this = self.clone(); let this = self.clone();
Ok(run(move || this.jobinfo.get(job_key(id))).await?) Ok(spawn_blocking(move || {
if let Some(job_ivec) = this.jobinfo.get(id.as_bytes())? {
let job: JobInfo = serde_cbor::from_slice(&job_ivec)?;
Ok(Some(job)) as Result<_>
} else {
Ok(None)
}
})
.await??)
} }
async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>> { async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>> {
let this = self.clone(); let this = self.clone();
let queue = queue.to_owned(); let queue = queue.to_owned();
Ok(run(move || { Ok(spawn_blocking(move || {
let queue_tree = this.queue.clone(); let mut job;
let job_tree = this.jobinfo.clone();
let queue2 = queue.clone();
this.lock_queue(&queue2, move || { let now = Utc::now();
let now = Utc::now();
let job = queue_tree while {
let job_opt = this
.queue
.iter() .iter()
.filter_map(|res| res.ok()) .filter_map(|res| res.ok())
.filter_map( .filter_map(|(id, in_queue)| {
|(id, in_queue)| { if queue.as_bytes() == in_queue.as_ref() {
if queue == in_queue { Some(id)
Some(id) } else {
} else { None
None }
} })
}, .filter_map(|id| this.jobinfo.get(id).ok())
)
.filter_map(|id| job_tree.get(id).ok())
.filter_map(|opt| opt) .filter_map(|opt| opt)
.find(|job| job.is_ready(now) && job.is_pending(now)); .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
.find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now));
if let Some(ref job) = job { job = if let Some(job) = job_opt {
queue_tree.remove(&job_key(job.id()))?; job
} } else {
return Ok(None);
};
Ok(job) as sled_extensions::Result<Option<JobInfo>> this.queue.remove(job.id().as_bytes())?.is_none()
}) } {}
Ok(Some(job)) as Result<Option<JobInfo>>
}) })
.await?) .await??)
} }
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> { async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> {
let this = self.clone(); let this = self.clone();
let queue = queue.to_owned(); let queue = queue.to_owned();
Ok(run(move || { Ok(spawn_blocking(move || {
if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? {
this.running.remove(&runner_key(runner_id))?; this.running.remove(runner_id)?;
} }
this.queue.insert(job_key(id).as_bytes(), queue).map(|_| ()) this.queue.insert(id.as_bytes(), queue.as_bytes())?;
Ok(()) as Result<_>
}) })
.await?) .await??)
} }
async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> { async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> {
let this = self.clone(); let this = self.clone();
Ok(run(move || { Ok(spawn_blocking(move || {
this.queue.remove(job_key(id))?; this.queue.remove(id.as_bytes())?;
this.running.insert(runner_key(runner_id).as_bytes(), id)?; this.running.insert(runner_id.as_bytes(), id.as_bytes())?;
this.running_inverse this.running_inverse
.insert(job_key(id).as_bytes(), runner_id)?; .insert(id.as_bytes(), runner_id.as_bytes())?;
Ok(()) as Result<()> Ok(()) as Result<()>
}) })
.await?) .await??)
} }
async fn delete_job(&self, id: Uuid) -> Result<()> { async fn delete_job(&self, id: Uuid) -> Result<()> {
let this = self.clone(); let this = self.clone();
Ok(run(move || { Ok(spawn_blocking(move || {
this.jobinfo.remove(&job_key(id))?; this.jobinfo.remove(id.as_bytes())?;
this.queue.remove(&job_key(id))?; this.queue.remove(id.as_bytes())?;
this.id.remove(id.as_bytes())?;
if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? {
this.running.remove(&runner_key(runner_id))?; this.running.remove(runner_id)?;
} }
Ok(()) as Result<()> Ok(()) as Result<()>
}) })
.await?) .await??)
} }
async fn get_stats(&self) -> Result<Stats> { async fn get_stats(&self) -> Result<Stats> {
let this = self.clone(); let this = self.clone();
Ok(run(move || Ok(this.stats.get("stats")?.unwrap_or_default()) as Result<Stats>).await?) let stats = spawn_blocking(move || {
let stats = if let Some(stats_ivec) = this.stats.get("stats")? {
bincode::deserialize(&stats_ivec).unwrap_or_default()
} else {
Stats::default()
};
Ok(stats) as Result<Stats>
})
.await??;
Ok(stats)
} }
async fn update_stats<F>(&self, f: F) -> Result<()> async fn update_stats<F>(&self, f: F) -> Result<()>
@ -178,83 +214,43 @@ impl Storage for SledStorage {
{ {
let this = self.clone(); let this = self.clone();
Ok(run(move || { Ok(spawn_blocking(move || {
this.stats.fetch_and_update("stats", move |opt| { this.stats.fetch_and_update("stats", move |opt| {
let stats = match opt { let stats = if let Some(stats_ivec) = opt {
Some(stats) => stats, bincode::deserialize(&stats_ivec).unwrap_or_default()
None => Stats::default(), } else {
Stats::default()
}; };
Some((f)(stats)) let new_stats = (f)(stats);
let stats_vec = bincode::serialize(&new_stats).ok()?;
Some(stats_vec)
})?; })?;
Ok(()) as Result<()> Ok(()) as Result<()>
}) })
.await?) .await??)
} }
} }
impl SledStorage { impl Storage {
/// Create a new Storage struct /// Create a new Storage struct
pub fn new(db: Db) -> Result<Self> { pub fn new(db: Db) -> Result<Self> {
Ok(SledStorage { Ok(Storage {
jobinfo: db.open_cbor_tree("background-jobs-jobinfo")?, id: db.open_tree("background-jobs-id")?,
running: db.open_bincode_tree("background-jobs-running")?, jobinfo: db.open_tree("background-jobs-jobinfo")?,
running_inverse: db.open_bincode_tree("background-jobs-running-inverse")?, running: db.open_tree("background-jobs-running")?,
queue: db.open_bincode_tree("background-jobs-queue")?, running_inverse: db.open_tree("background-jobs-running-inverse")?,
stats: db.open_bincode_tree("background-jobs-stats")?, queue: db.open_tree("background-jobs-queue")?,
lock: db.open_bincode_tree("background-jobs-lock")?, stats: db.open_tree("background-jobs-stats")?,
db, db,
}) })
} }
}
fn lock_queue<T, F>(&self, queue: &str, f: F) -> sled_extensions::Result<T> impl From<JoinError> for Error {
where fn from(_: JoinError) -> Self {
F: Fn() -> sled_extensions::Result<T>, Error::Canceled
{
let id = Uuid::new_v4();
let mut prev;
while {
prev = self.lock.fetch_and_update(queue, move |opt| match opt {
Some(_) => opt,
None => Some(id),
})?;
prev.is_some()
} {}
let res = (f)();
self.lock.fetch_and_update(queue, |_| None)?;
res
}
}
fn job_key(id: Uuid) -> String {
format!("job-{}", id)
}
fn runner_key(runner_id: Uuid) -> String {
format!("runner-{}", runner_id)
}
impl<T> From<BlockingError<T>> for Error
where
Error: From<T>,
T: std::fmt::Debug,
{
fn from(e: BlockingError<T>) -> Self {
match e {
BlockingError::Error(e) => e.into(),
BlockingError::Canceled => Error::Canceled,
}
}
}
impl From<sled_extensions::Error> for Error {
fn from(e: sled_extensions::Error) -> Self {
Error::Sled(e)
} }
} }