diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 9dd8c36..f876f14 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-sled-storage" description = "Sled storage backend for background-jobs" -version = "0.8.0-alpha.0" +version = "0.9.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -11,10 +11,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-threadpool = "0.3.1" +actix-rt = "2.0.0" async-trait = "0.1.24" background-jobs-core = { version = "0.9.0", path = "../jobs-core" } +bincode = "1.2" chrono = "0.4" -sled-extensions = { version = "0.3.0-alpha.0", features = ["bincode", "cbor"], git = "https://git.asonix.dog/Aardwolf/sled-extensions" } +sled = "0.34" +serde_cbor = "0.11" thiserror = "1.0" +tokio = { version = "1", default-features = false, features = ["rt"] } uuid = { version = "0.8.1", features = ["v4", "serde"] } diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index e371fe1..77f52ca 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -13,10 +13,11 @@ //! let queue_handle = ServerConfig::new(storage).thread_count(8).start(); //! ``` -use actix_threadpool::{run, BlockingError}; -use background_jobs_core::{JobInfo, Stats, Storage}; +use actix_rt::task::spawn_blocking; +use background_jobs_core::{JobInfo, Stats}; use chrono::offset::Utc; -use sled_extensions::{bincode::Tree, cbor, Db, DbExt}; +use sled::{Db, Tree}; +use tokio::task::JoinError; use uuid::Uuid; /// The error produced by sled storage calls @@ -24,7 +25,11 @@ use uuid::Uuid; pub enum Error { /// Error in the database #[error("Error in sled extensions, {0}")] - Sled(sled_extensions::Error), + Sled(#[from] sled::Error), + + /// Error storing or retrieving job info + #[error("Error transforming job info, {0}")] + Cbor(#[from] serde_cbor::Error), /// Error executing db operation #[error("Blocking operation was canceled")] @@ -36,140 +41,171 @@ pub type Result = std::result::Result; #[derive(Clone)] /// The Sled-backed storage implementation -pub struct SledStorage { - jobinfo: cbor::Tree, - running: Tree, - running_inverse: Tree, - queue: Tree, - stats: Tree, - lock: Tree, +pub struct Storage { + id: Tree, + jobinfo: Tree, + running: Tree, + running_inverse: Tree, + queue: Tree, + stats: Tree, db: Db, } #[async_trait::async_trait] -impl Storage for SledStorage { +impl background_jobs_core::Storage for Storage { type Error = Error; async fn generate_id(&self) -> Result { let this = self.clone(); - Ok(run(move || { - let uuid = loop { - let uuid = Uuid::new_v4(); + Ok(spawn_blocking(move || { + let mut uuid; + while { + uuid = Uuid::new_v4(); - if !this.jobinfo.contains_key(job_key(uuid))? { - break uuid; - } - }; + this.id + .compare_and_swap( + uuid.as_bytes(), + None as Option<&[u8]>, + Some(uuid.as_bytes()), + )? + .is_err() + } {} - Ok(uuid) as sled_extensions::Result + Ok(uuid) as Result }) - .await?) + .await??) } async fn save_job(&self, job: JobInfo) -> Result<()> { let this = self.clone(); - Ok(run(move || { - this.jobinfo - .insert(job_key(job.id()).as_bytes(), job) - .map(|_| ()) + Ok(spawn_blocking(move || { + let job_vec = serde_cbor::to_vec(&job)?; + + this.jobinfo.insert(job.id().as_bytes(), job_vec)?; + + Ok(()) as Result<_> }) - .await?) + .await??) } async fn fetch_job(&self, id: Uuid) -> Result> { let this = self.clone(); - Ok(run(move || this.jobinfo.get(job_key(id))).await?) + Ok(spawn_blocking(move || { + if let Some(job_ivec) = this.jobinfo.get(id.as_bytes())? { + let job: JobInfo = serde_cbor::from_slice(&job_ivec)?; + Ok(Some(job)) as Result<_> + } else { + Ok(None) + } + }) + .await??) } async fn fetch_job_from_queue(&self, queue: &str) -> Result> { let this = self.clone(); let queue = queue.to_owned(); - Ok(run(move || { - let queue_tree = this.queue.clone(); - let job_tree = this.jobinfo.clone(); - let queue2 = queue.clone(); + Ok(spawn_blocking(move || { + let mut job; - this.lock_queue(&queue2, move || { - let now = Utc::now(); + let now = Utc::now(); - let job = queue_tree + while { + let job_opt = this + .queue .iter() .filter_map(|res| res.ok()) - .filter_map( - |(id, in_queue)| { - if queue == in_queue { - Some(id) - } else { - None - } - }, - ) - .filter_map(|id| job_tree.get(id).ok()) + .filter_map(|(id, in_queue)| { + if queue.as_bytes() == in_queue.as_ref() { + Some(id) + } else { + None + } + }) + .filter_map(|id| this.jobinfo.get(id).ok()) .filter_map(|opt| opt) - .find(|job| job.is_ready(now) && job.is_pending(now)); + .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok()) + .find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now)); - if let Some(ref job) = job { - queue_tree.remove(&job_key(job.id()))?; - } + job = if let Some(job) = job_opt { + job + } else { + return Ok(None); + }; - Ok(job) as sled_extensions::Result> - }) + this.queue.remove(job.id().as_bytes())?.is_none() + } {} + + Ok(Some(job)) as Result> }) - .await?) + .await??) } async fn queue_job(&self, queue: &str, id: Uuid) -> Result<()> { let this = self.clone(); let queue = queue.to_owned(); - Ok(run(move || { - if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { - this.running.remove(&runner_key(runner_id))?; + Ok(spawn_blocking(move || { + if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? { + this.running.remove(runner_id)?; } - this.queue.insert(job_key(id).as_bytes(), queue).map(|_| ()) + this.queue.insert(id.as_bytes(), queue.as_bytes())?; + + Ok(()) as Result<_> }) - .await?) + .await??) } async fn run_job(&self, id: Uuid, runner_id: Uuid) -> Result<()> { let this = self.clone(); - Ok(run(move || { - this.queue.remove(job_key(id))?; - this.running.insert(runner_key(runner_id).as_bytes(), id)?; + Ok(spawn_blocking(move || { + this.queue.remove(id.as_bytes())?; + this.running.insert(runner_id.as_bytes(), id.as_bytes())?; this.running_inverse - .insert(job_key(id).as_bytes(), runner_id)?; + .insert(id.as_bytes(), runner_id.as_bytes())?; Ok(()) as Result<()> }) - .await?) + .await??) } async fn delete_job(&self, id: Uuid) -> Result<()> { let this = self.clone(); - Ok(run(move || { - this.jobinfo.remove(&job_key(id))?; - this.queue.remove(&job_key(id))?; + Ok(spawn_blocking(move || { + this.jobinfo.remove(id.as_bytes())?; + this.queue.remove(id.as_bytes())?; + this.id.remove(id.as_bytes())?; - if let Some(runner_id) = this.running_inverse.remove(&job_key(id))? { - this.running.remove(&runner_key(runner_id))?; + if let Some(runner_id) = this.running_inverse.remove(id.as_bytes())? { + this.running.remove(runner_id)?; } Ok(()) as Result<()> }) - .await?) + .await??) } async fn get_stats(&self) -> Result { let this = self.clone(); - Ok(run(move || Ok(this.stats.get("stats")?.unwrap_or_default()) as Result).await?) + let stats = spawn_blocking(move || { + let stats = if let Some(stats_ivec) = this.stats.get("stats")? { + bincode::deserialize(&stats_ivec).unwrap_or_default() + } else { + Stats::default() + }; + + Ok(stats) as Result + }) + .await??; + + Ok(stats) } async fn update_stats(&self, f: F) -> Result<()> @@ -178,83 +214,43 @@ impl Storage for SledStorage { { let this = self.clone(); - Ok(run(move || { + Ok(spawn_blocking(move || { this.stats.fetch_and_update("stats", move |opt| { - let stats = match opt { - Some(stats) => stats, - None => Stats::default(), + let stats = if let Some(stats_ivec) = opt { + bincode::deserialize(&stats_ivec).unwrap_or_default() + } else { + Stats::default() }; - Some((f)(stats)) + let new_stats = (f)(stats); + + let stats_vec = bincode::serialize(&new_stats).ok()?; + Some(stats_vec) })?; Ok(()) as Result<()> }) - .await?) + .await??) } } -impl SledStorage { +impl Storage { /// Create a new Storage struct pub fn new(db: Db) -> Result { - Ok(SledStorage { - jobinfo: db.open_cbor_tree("background-jobs-jobinfo")?, - running: db.open_bincode_tree("background-jobs-running")?, - running_inverse: db.open_bincode_tree("background-jobs-running-inverse")?, - queue: db.open_bincode_tree("background-jobs-queue")?, - stats: db.open_bincode_tree("background-jobs-stats")?, - lock: db.open_bincode_tree("background-jobs-lock")?, + Ok(Storage { + id: db.open_tree("background-jobs-id")?, + jobinfo: db.open_tree("background-jobs-jobinfo")?, + running: db.open_tree("background-jobs-running")?, + running_inverse: db.open_tree("background-jobs-running-inverse")?, + queue: db.open_tree("background-jobs-queue")?, + stats: db.open_tree("background-jobs-stats")?, db, }) } +} - fn lock_queue(&self, queue: &str, f: F) -> sled_extensions::Result - where - F: Fn() -> sled_extensions::Result, - { - let id = Uuid::new_v4(); - - let mut prev; - while { - prev = self.lock.fetch_and_update(queue, move |opt| match opt { - Some(_) => opt, - None => Some(id), - })?; - - prev.is_some() - } {} - - let res = (f)(); - - self.lock.fetch_and_update(queue, |_| None)?; - - res - } -} - -fn job_key(id: Uuid) -> String { - format!("job-{}", id) -} - -fn runner_key(runner_id: Uuid) -> String { - format!("runner-{}", runner_id) -} - -impl From> for Error -where - Error: From, - T: std::fmt::Debug, -{ - fn from(e: BlockingError) -> Self { - match e { - BlockingError::Error(e) => e.into(), - BlockingError::Canceled => Error::Canceled, - } - } -} - -impl From for Error { - fn from(e: sled_extensions::Error) -> Self { - Error::Sled(e) +impl From for Error { + fn from(_: JoinError) -> Self { + Error::Canceled } }