diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
index a85b8eb..5fb8a93 100644
--- a/jobs-actix/src/lib.rs
+++ b/jobs-actix/src/lib.rs
@@ -1,8 +1,9 @@
 use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
 
 use actix::{Actor, Addr, SyncArbiter};
-use background_jobs_core::{Processor, ProcessorMap, Storage};
+use background_jobs_core::{Processor, ProcessorMap, Stats, Storage};
 use failure::Error;
+use futures::Future;
 
 mod pinger;
 mod server;
@@ -11,7 +12,7 @@
 pub use self::{server::Server, worker::LocalWorker};
 use self::{
     pinger::Pinger,
-    server::{CheckDb, EitherJob, RequestJob},
+    server::{CheckDb, EitherJob, GetStats, RequestJob},
     worker::ProcessJob,
 };
 
@@ -110,4 +111,18 @@ where
         self.inner.do_send(EitherJob::New(P::new_job(job)?));
         Ok(())
     }
+
+    pub fn get_stats(&self) -> Box<dyn Future<Item = Stats, Error = Error> + Send> {
+        Box::new(self.inner.send(GetStats).then(coerce))
+    }
+}
+
+fn coerce<I, E, F>(res: Result<Result<I, E>, F>) -> Result<I, E>
+where
+    E: From<F>,
+{
+    match res {
+        Ok(inner) => inner,
+        Err(e) => Err(e.into()),
+    }
 }
diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs
index d70033c..d3ee3cc 100644
--- a/jobs-actix/src/server.rs
+++ b/jobs-actix/src/server.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, VecDeque};
 
 use actix::{Actor, Addr, Context, Handler, Message, SyncContext};
-use background_jobs_core::{JobInfo, NewJobInfo, Storage};
+use background_jobs_core::{JobInfo, NewJobInfo, Stats, Storage};
 use failure::Error;
 use log::{debug, trace};
 use serde_derive::Deserialize;
@@ -53,6 +53,12 @@ impl Message for CheckDb {
     type Result = Result<(), Error>;
 }
 
+pub struct GetStats;
+
+impl Message for GetStats {
+    type Result = Result<Stats, Error>;
+}
+
 struct Cache<W>
 where
     W: Actor + Handler<ProcessJob>,
@@ -247,3 +253,14 @@ where
         Ok(())
     }
 }
+
+impl<W> Handler<GetStats> for Server<W>
+where
+    W: Actor<Context = Context<W>> + Handler<ProcessJob>,
+{
+    type Result = Result<Stats, Error>;
+
+    fn handle(&mut self, _: GetStats, _: &mut Self::Context) -> Self::Result {
+        Ok(self.storage.get_stats()?)
+    }
+}
diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs
index 9d0d6ab..b59c1a3 100644
--- a/jobs-core/src/lib.rs
+++ b/jobs-core/src/lib.rs
@@ -31,7 +31,7 @@ pub use crate::{
     job_info::{JobInfo, NewJobInfo},
     processor::Processor,
     processor_map::ProcessorMap,
-    storage::Storage,
+    storage::{JobStat, Stat, Stats, Storage},
 };
 
 #[derive(Debug, Fail)]
diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs
index c79f387..3bd38a8 100644
--- a/jobs-core/src/storage.rs
+++ b/jobs-core/src/storage.rs
@@ -21,14 +21,15 @@ use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
     path::PathBuf,
     str::Utf8Error,
-    sync::{Arc, RwLock, RwLockWriteGuard},
+    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
 };
 
-use chrono::offset::Utc;
+use chrono::{offset::Utc, DateTime, Datelike, Timelike};
 use failure::Fail;
 use kv::{json::Json, Bucket, Config, CursorOp, Error, Manager, Serde, Store, Txn, ValueBuf};
 use lmdb::Error as LmdbError;
 use log::{info, trace};
+use serde_derive::{Deserialize, Serialize};
 
 use crate::{JobInfo, JobStatus, NewJobInfo};
 
@@ -38,6 +39,7 @@ struct Buckets<'a> {
     staged: Bucket<'a, &'a [u8], ValueBuf<Json<JobInfo>>>,
     failed: Bucket<'a, &'a [u8], ValueBuf<Json<JobInfo>>>,
     finished: Bucket<'a, &'a [u8], ValueBuf<Json<JobInfo>>>,
+    stats: Bucket<'a, &'a [u8], ValueBuf<Json<Stat>>>,
 }
 
 impl<'a> Buckets<'a> {
@@ -48,6 +50,20 @@ impl<'a> Buckets<'a> {
             staged: store.bucket(Some(Storage::job_staged()))?,
             failed: store.bucket(Some(Storage::job_failed()))?,
             finished: store.bucket(Some(Storage::job_finished()))?,
+            stats: store.bucket(Some(Storage::stats_store()))?,
+        };
+
+        Ok(b)
+    }
+
+    fn new_readonly(store: &'a RwLockReadGuard<Store>) -> Result<Self, Error> {
+        let b = Buckets {
+            queued: store.bucket(Some(Storage::job_queue()))?,
+            running: store.bucket(Some(Storage::job_running()))?,
+            staged: store.bucket(Some(Storage::job_staged()))?,
+            failed: store.bucket(Some(Storage::job_failed()))?,
+            finished: store.bucket(Some(Storage::job_finished()))?,
+            stats: store.bucket(Some(Storage::stats_store()))?,
         };
         Ok(b)
     }
@@ -216,10 +232,7 @@ impl Storage {
                 inner_txn.set(&job_bucket, key, job_value)?;
                 self.queue_job(&buckets, inner_txn, key, runner_id)?;
             } else {
-                job.fail();
-                let job_value = Json::to_value_buf(job)?;
-                inner_txn.set(&job_bucket, key, job_value)?;
-                self.fail_job(&buckets, inner_txn, key, runner_id)?;
+                self.fail_job(&buckets, &job_bucket, inner_txn, key, runner_id)?;
             }
         }
 
@@ -359,23 +372,26 @@ impl Storage {
         let store = self.store.write()?;
         trace!("Got store");
 
-        let bucket = store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
+        let job_bucket =
+            store.bucket::<&[u8], ValueBuf<Json<JobInfo>>>(Some(Storage::job_store()))?;
         trace!("Got bucket");
 
         let buckets = Buckets::new(&store)?;
 
         let mut txn = store.write_txn()?;
         trace!("Opened write txn");
 
-        txn.set(&bucket, job_id.to_string().as_ref(), job_value)?;
+        txn.set(&job_bucket, job_id.to_string().as_ref(), job_value)?;
         trace!("Set value");
         match status {
             JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
             JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
             JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
-            JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?,
+            JobStatus::Failed => {
+                self.fail_job(&buckets, &job_bucket, &mut txn, job_id.as_ref(), runner_id)?
+            }
             JobStatus::Finished => {
-                self.finish_job(&buckets, &mut txn, job_id.as_ref(), runner_id)?
+                self.finish_job(&buckets, &job_bucket, &mut txn, job_id.as_ref(), runner_id)?
             }
         }
 
@@ -496,22 +512,6 @@ impl Storage {
         Ok(())
     }
 
-    fn fail_job<'env>(
-        &self,
-        buckets: &'env Buckets<'env>,
-        txn: &mut Txn<'env>,
-        id: &[u8],
-        runner_id: usize,
-    ) -> Result<(), Error> {
-        self.add_job_to(&buckets.failed, txn, id, runner_id)?;
-        self.delete_job_from(&buckets.finished, txn, id)?;
-        self.delete_job_from(&buckets.running, txn, id)?;
-        self.delete_job_from(&buckets.staged, txn, id)?;
-        self.delete_job_from(&buckets.queued, txn, id)?;
-
-        Ok(())
-    }
-
     fn run_job<'env>(
         &self,
         buckets: &'env Buckets<'env>,
@@ -528,9 +528,32 @@ impl Storage {
         Ok(())
     }
 
+    fn fail_job<'env>(
+        &self,
+        buckets: &'env Buckets<'env>,
+        job_store: &'env Bucket<&[u8], ValueBuf<Json<JobInfo>>>,
+        txn: &mut Txn<'env>,
+        id: &[u8],
+        runner_id: usize,
+    ) -> Result<(), Error> {
+        self.add_job_to(&buckets.failed, txn, id, runner_id)?;
+        self.delete_job_from(&buckets.finished, txn, id)?;
+        self.delete_job_from(&buckets.running, txn, id)?;
+        self.delete_job_from(&buckets.staged, txn, id)?;
+        self.delete_job_from(&buckets.queued, txn, id)?;
+        txn.del(job_store, id)?;
+
+        Stat::get_dead(&buckets.stats, txn)?
+            .fail_job()
+            .save(&buckets.stats, txn)?;
+
+        Ok(())
+    }
+
     fn finish_job<'env>(
         &self,
         buckets: &'env Buckets<'env>,
+        job_store: &'env Bucket<&[u8], ValueBuf<Json<JobInfo>>>,
         txn: &mut Txn<'env>,
         id: &[u8],
         runner_id: usize,
@@ -540,6 +563,11 @@ impl Storage {
         self.delete_job_from(&buckets.staged, txn, id)?;
         self.delete_job_from(&buckets.failed, txn, id)?;
         self.delete_job_from(&buckets.queued, txn, id)?;
+        txn.del(job_store, id)?;
+
+        Stat::get_finished(&buckets.stats, txn)?
+            .finish_job()
+            .save(&buckets.stats, txn)?;
 
         Ok(())
     }
@@ -575,6 +603,36 @@ impl Storage {
         Ok(())
     }
 
+    pub fn get_stats(&self) -> Result<Stats, Error> {
+        let store = self.store.read()?;
+        let buckets = Buckets::new_readonly(&store)?;
+
+        let mut txn = store.read_txn()?;
+
+        let stats = {
+            let dead = Stat::get_dead(&buckets.stats, &mut txn)?.inner_stat();
+            let complete = Stat::get_finished(&buckets.stats, &mut txn)?.inner_stat();
+
+            let mut queued_cursor = txn.read_cursor(&buckets.queued)?;
+            let mut staged_cursor = txn.read_cursor(&buckets.staged)?;
+
+            let pending = queued_cursor.iter().count() + staged_cursor.iter().count();
+
+            let mut running_cursor = txn.read_cursor(&buckets.running)?;
+            let running = running_cursor.iter().count();
+
+            Stats {
+                dead,
+                complete,
+                pending,
+                running,
+            }
+        };
+
+        txn.commit()?;
+        Ok(stats)
+    }
+
     // In all likelihood, this function is not necessary
     //
     // But in the event of multiple processes running on the same machine, it is good to have some
@@ -640,7 +698,7 @@ impl Storage {
         Ok(item)
     }
 
-    fn buckets() -> [&'static str; 9] {
+    fn buckets() -> [&'static str; 10] {
         [
             Storage::id_store(),
             Storage::job_store(),
@@ -651,6 +709,7 @@ impl Storage {
             Storage::job_lock(),
             Storage::job_finished(),
             Storage::queue_port(),
+            Storage::stats_store(),
         ]
     }
 
@@ -689,6 +748,10 @@ impl Storage {
     fn queue_port() -> &'static str {
         "queue-port"
    }
+
+    fn stats_store() -> &'static str {
+        "stats-store"
+    }
 }
 
 #[derive(Debug, Fail)]
@@ -711,3 +774,179 @@ impl From<Utf8Error> for PortMapError {
         PortMapError::Utf8(e)
     }
 }
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct Stats {
+    pub pending: usize,
+    pub running: usize,
+    pub dead: JobStat,
+    pub complete: JobStat,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub enum Stat {
+    DeadJobs(JobStat),
+    CompletedJobs(JobStat),
+}
+
+impl Stat {
+    fn get_finished<'env>(
+        bucket: &'env Bucket<&[u8], ValueBuf<Json<Stat>>>,
+        txn: &mut Txn<'env>,
+    ) -> Result<Self, Error> {
+        Self::get(bucket, txn, Self::completed_jobs()).map(|opt| match opt {
+            Some(stat) => stat,
+            None => Stat::CompletedJobs(JobStat::new()),
+        })
+    }
+
+    fn get_dead<'env>(
+        bucket: &'env Bucket<&[u8], ValueBuf<Json<Stat>>>,
+        txn: &mut Txn<'env>,
+    ) -> Result<Self, Error> {
+        Self::get(bucket, txn, Self::dead_jobs()).map(|opt| match opt {
+            Some(stat) => stat,
+            None => Stat::DeadJobs(JobStat::new()),
+        })
+    }
+
+    fn get<'env>(
+        bucket: &'env Bucket<&[u8], ValueBuf<Json<Stat>>>,
+        txn: &mut Txn<'env>,
+        key: &str,
+    ) -> Result<Option<Stat>, Error> {
+        match txn.get(bucket, key.as_ref()) {
+            Ok(stat) => Ok(Some(stat.inner()?.to_serde())),
+            Err(e) => match e {
+                Error::NotFound => Ok(None),
+                err => return Err(err),
+            },
+        }
+    }
+
+    fn name(&self) -> &str {
+        match *self {
+            Stat::DeadJobs(_) => Stat::dead_jobs(),
+            Stat::CompletedJobs(_) => Stat::completed_jobs(),
+        }
+    }
+
+    fn finish_job(self) -> Self {
+        match self {
+            Stat::CompletedJobs(mut job_stat) => {
+                job_stat.increment();
+                Stat::CompletedJobs(job_stat)
+            }
+            other => other,
+        }
+    }
+
+    fn fail_job(self) -> Self {
+        match self {
+            Stat::DeadJobs(mut job_stat) => {
+                job_stat.increment();
+                Stat::DeadJobs(job_stat)
+            }
+            other => other,
+        }
+    }
+
+    fn inner_stat(self) -> JobStat {
+        match self {
+            Stat::DeadJobs(job_stat) => job_stat,
+            Stat::CompletedJobs(job_stat) => job_stat,
+        }
+    }
+
+    fn dead_jobs() -> &'static str {
+        "DeadJobs"
+    }
+
+    fn completed_jobs() -> &'static str {
+        "CompletedJobs"
+    }
+
+    fn save<'env>(
+        self,
+        bucket: &'env Bucket<&[u8], ValueBuf<Json<Stat>>>,
+        txn: &mut Txn<'env>,
+    ) -> Result<(), Error> {
+        let name = self.name().to_owned();
+        txn.set(bucket, name.as_ref(), Json::to_value_buf(self)?)?;
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct JobStat {
+    this_hour: usize,
+    today: usize,
+    this_month: usize,
+    all_time: usize,
+    updated_at: DateTime<Utc>,
+}
+
+impl JobStat {
+    fn new() -> Self {
+        JobStat {
+            this_hour: 0,
+            today: 0,
+            this_month: 0,
+            all_time: 0,
+            updated_at: Utc::now(),
+        }
+    }
+
+    fn increment(&mut self) {
+        self.this_hour += 1;
+        self.today += 1;
+        self.this_month += 1;
+        self.all_time += 1;
+
+        self.tick();
+    }
+
+    fn tick(&mut self) {
+        let now = Utc::now();
+
+        if now.month() != self.updated_at.month() {
+            self.next_month();
+        } else if now.day() != self.updated_at.day() {
+            self.next_day();
+        } else if now.hour() != self.updated_at.hour() {
+            self.next_hour();
+        }
+
+        self.updated_at = now;
+    }
+
+    fn next_hour(&mut self) {
+        self.this_hour = 0;
+    }
+
+    fn next_day(&mut self) {
+        self.next_hour();
+        self.today = 0;
+    }
+
+    fn next_month(&mut self) {
+        self.next_day();
+        self.this_month = 0;
+    }
+
+    pub fn this_hour(&self) -> usize {
+        self.this_hour
+    }
+
+    pub fn today(&self) -> usize {
+        self.today
+    }
+
+    pub fn this_month(&self) -> usize {
+        self.this_month
+    }
+
+    pub fn all_time(&self) -> usize {
+        self.all_time
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 860accd..7ac2395 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -273,7 +273,7 @@
 //! `background-jobs-core` crate, which provides the LMDB storage, Processor and Job traits, as well as some
 //! other useful types for implementing a jobs processor.
 
-pub use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
+pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats};
 
 #[cfg(feature = "background-jobs-server")]
 pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig};
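
Usage note: a minimal sketch of how the new stats future might be driven from an actix 0.7 / futures 0.1 application. `QueueHandle` is a stand-in name here, since the wrapper type that owns `inner` and gains `get_stats` is not named in the hunks above.

    // Illustrative only: `QueueHandle` stands in for the type whose impl
    // block gains `get_stats` in this patch.
    use futures::Future;

    fn log_stats(handle: &QueueHandle) {
        // `get_stats` resolves to a `Stats` snapshot; spawn it on the system.
        actix::spawn(
            handle
                .get_stats()
                .map(|stats| {
                    println!(
                        "pending: {}, running: {}, complete (all time): {}, dead (all time): {}",
                        stats.pending,
                        stats.running,
                        stats.complete.all_time(),
                        stats.dead.all_time(),
                    );
                })
                .map_err(|e| eprintln!("failed to fetch stats: {}", e)),
        );
    }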