diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 6621491..7e575dd 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "background-jobs-core" -description = "Core types for implementing an asynchronous jobs processor on tokio" +description = "Core types for implementing an asynchronous jobs processor" version = "0.6.0" license-file = "../LICENSE" authors = ["asonix "] diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 4c8cdf7..5b4c29f 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,24 +1,5 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use failure::Error; -use futures::Future; +use futures::{future::IntoFuture, Future}; use serde::{de::DeserializeOwned, ser::Serialize}; use crate::{Backoff, MaxRetries, Processor}; @@ -32,6 +13,9 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// The application state provided to this job at runtime. type State: Clone + 'static; + /// The result of running this operation + type Future: IntoFuture; + /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index e566b38..9e0612d 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,22 +1,3 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use chrono::{offset::Utc, DateTime, Duration as OldDuration}; use log::trace; use serde_derive::{Deserialize, Serialize}; @@ -25,6 +6,7 @@ use serde_json::Value; use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +/// Information about the sate of an attempted job pub struct ReturnJobInfo { pub(crate) id: u64, pub(crate) result: JobResult, @@ -54,6 +36,7 @@ impl ReturnJobInfo { } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +/// Information about a newly created job pub struct NewJobInfo { /// Name of the processor that should handle this job processor: String, @@ -96,10 +79,12 @@ impl NewJobInfo { } } + /// The name of the queue this job will run in pub fn queue(&self) -> &str { &self.queue } + /// Whether this job is ready to be run immediately pub fn is_ready(&self) -> bool { self.next_queue.is_none() } @@ -160,6 +145,7 @@ pub struct JobInfo { } impl JobInfo { + /// The name of the queue this job will run in pub fn queue(&self) -> &str { &self.queue } @@ -176,6 +162,7 @@ impl JobInfo { self.args.clone() } + /// The ID of this job pub fn id(&self) -> u64 { self.id } @@ -207,6 +194,7 @@ impl JobInfo { ); } + /// Whether this job is ready to be run pub fn is_ready(&self, now: DateTime) -> bool { match self.next_queue { Some(ref time) => now > *time, @@ -225,6 +213,7 @@ impl JobInfo { should_retry } + /// Whether this job is pending execution pub fn is_pending(&self) -> bool { self.status == JobStatus::Pending } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 8fc5ac4..97b509a 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -1,21 +1,10 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ +#![deny(missing_docs)] + +//! # Background Jobs Core +//! _basic types and traits for implementing a background jobs processor_ +//! +//! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs +//! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate. use failure::{Error, Fail}; use serde_derive::{Deserialize, Serialize}; @@ -53,33 +42,45 @@ pub enum JobError { } #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +/// Indicate the state of a job after an attempted run pub enum JobResult { + /// The job succeeded Success, + + /// The job failed Failure, + + /// There was no processor to run the job MissingProcessor, } impl JobResult { + /// Indicate a successful job pub fn success() -> Self { JobResult::Success } + /// Indicate a failed job pub fn failure() -> Self { JobResult::Failure } + /// Indicate that the job's processor is not present pub fn missing_processor() -> Self { JobResult::MissingProcessor } + /// Check if the job failed pub fn is_failure(&self) -> bool { *self == JobResult::Failure } + /// Check if the job succeeded pub fn is_success(&self) -> bool { *self == JobResult::Success } + /// Check if the job is missing it's processor pub fn is_missing_processor(&self) -> bool { *self == JobResult::MissingProcessor } @@ -96,33 +97,46 @@ pub enum JobStatus { } impl JobStatus { + /// The job should be queued pub fn pending() -> Self { JobStatus::Pending } + /// The job is running pub fn running() -> Self { JobStatus::Running } + /// Check if the job is ready to be queued pub fn is_pending(&self) -> bool { *self == JobStatus::Pending } + /// Check if the job is running pub fn is_running(&self) -> bool { *self == JobStatus::Running } } #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +/// Different styles for retrying jobs pub enum Backoff { /// Seconds between execution + /// + /// For example, `Backoff::Linear(5)` will retry a failed job 5 seconds after the previous + /// attempt Linear(usize), /// Base for seconds between execution + /// + /// For example, `Backoff::Exponential(2)` will retry a failed job 2 seconds after the first + /// failure, 4 seconds after the second failure, 8 seconds after the third failure, and 16 + /// seconds after the fourth failure Exponential(usize), } #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] +/// How many times a job should be retried before giving up pub enum MaxRetries { /// Keep retrying forever Infinite, diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 98188d4..5a1be8a 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -1,22 +1,3 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use chrono::{offset::Utc, DateTime}; use failure::{Error, Fail}; use futures::{ @@ -46,7 +27,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// ```rust /// use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; /// use failure::Error; -/// use futures::future::{Future, IntoFuture}; +/// use futures::future::Future; /// use log::info; /// use serde_derive::{Deserialize, Serialize}; /// @@ -55,11 +36,15 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// count: i32, /// } /// -/// impl Job<()> for MyJob { -/// fn run(self, _state: ()) -> Box + Send> { +/// impl Job for MyJob { +/// type Processor = MyProcessor; +/// type State = (); +/// type Future = Result<(), Error>; +/// +/// fn run(self, _state: Self::State) -> Self::Future { /// info!("Processing {}", self.count); /// -/// Box::new(Ok(()).into_future()) +/// Ok(()) /// } /// } /// @@ -82,6 +67,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// } /// ``` pub trait Processor: Clone { + /// The job this processor will process type Job: Job + 'static; /// The name of the processor @@ -177,7 +163,7 @@ pub trait Processor: Clone { let res = serde_json::from_value::(args); let fut = match res { - Ok(job) => Either::A(job.run(state).map_err(JobError::Processing)), + Ok(job) => Either::A(job.run(state).into_future().map_err(JobError::Processing)), Err(_) => Either::B(Err(JobError::Json).into_future()), }; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 5858eed..24735ea 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,22 +1,3 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use std::{collections::HashMap, sync::Arc}; use futures::future::{Either, Future, IntoFuture}; diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs index cd687d4..7d4df6e 100644 --- a/jobs-core/src/stats.rs +++ b/jobs-core/src/stats.rs @@ -1,34 +1,24 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use chrono::{offset::Utc, DateTime, Datelike, Timelike}; use serde_derive::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] +/// Statistics about the jobs processor pub struct Stats { + /// How many jobs are pending execution pub pending: usize, + + /// How many jobs are currently executing pub running: usize, + + /// How many jobs are permanently failed pub dead: JobStat, + + /// How many jobs have completed successfully pub complete: JobStat, } impl Stats { + /// A new, empty stats struct pub fn new() -> Self { Self::default() } @@ -83,6 +73,7 @@ impl Default for Stats { } #[derive(Clone, Debug, Deserialize, Serialize)] +/// A time-based overview of job completion and failures pub struct JobStat { this_hour: usize, today: usize, @@ -92,6 +83,7 @@ pub struct JobStat { } impl JobStat { + /// A new, empty job statistic pub fn new() -> Self { Self::default() } @@ -133,18 +125,22 @@ impl JobStat { self.this_month = 0; } + /// A count from the last hour pub fn this_hour(&self) -> usize { self.this_hour } + /// A count from the last day pub fn today(&self) -> usize { self.today } + /// A count from the last month pub fn this_month(&self) -> usize { self.this_month } + /// A total count pub fn all_time(&self) -> usize { self.all_time } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 48e4e32..4f5abdf 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,22 +1,3 @@ -/* - * This file is part of Background Jobs. - * - * Copyright © 2019 Riley Trautman - * - * Background Jobs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Background Jobs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Background Jobs. If not, see . - */ - use chrono::offset::Utc; use failure::Fail; use log::error; @@ -72,6 +53,7 @@ pub trait Storage: Clone + Send { where F: Fn(Stats) -> Stats; + /// Generate a new job based on the provided NewJobInfo fn new_job(&mut self, job: NewJobInfo) -> Result { let id = self.generate_id()?; @@ -85,6 +67,7 @@ pub trait Storage: Clone + Send { Ok(id) } + /// Fetch a job that is ready to be executed, marking it as running fn request_job(&mut self, queue: &str, runner_id: u64) -> Result, Self::Error> { match self.fetch_job_from_queue(queue)? { Some(mut job) => { @@ -107,6 +90,7 @@ pub trait Storage: Clone + Send { } } + /// "Return" a job to the database, marking it for retry if needed fn return_job( &mut self, ReturnJobInfo { id, result }: ReturnJobInfo, @@ -140,6 +124,7 @@ pub trait Storage: Clone + Send { } } +/// A default, in-memory implementation of a storage mechanism pub mod memory_storage { use super::{JobInfo, Stats}; use failure::Fail; @@ -150,6 +135,7 @@ pub mod memory_storage { }; #[derive(Clone)] + /// An In-Memory store for jobs pub struct Storage { inner: Arc>, } @@ -165,6 +151,7 @@ pub mod memory_storage { } impl Storage { + /// Create a new, empty job store pub fn new() -> Self { Storage { inner: Arc::new(Mutex::new(Inner { @@ -266,6 +253,7 @@ pub mod memory_storage { } #[derive(Clone, Debug, Fail)] + /// An error that is impossible to create pub enum Never {} impl fmt::Display for Never { @@ -273,8 +261,4 @@ pub mod memory_storage { match *self {} } } - - #[derive(Clone, Debug, Fail)] - #[fail(display = "Created too many storages, can't generate any more IDs")] - pub struct TooManyStoragesError; }