Prepare jobs-core for release

This commit is contained in:
asonix 2019-09-22 12:12:08 -05:00
parent 0522c83c33
commit d8d4b026fe
8 changed files with 77 additions and 143 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "background-jobs-core" name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor on tokio" description = "Core types for implementing an asynchronous jobs processor"
version = "0.6.0" version = "0.6.0"
license-file = "../LICENSE" license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]

View file

@ -1,24 +1,5 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use failure::Error; use failure::Error;
use futures::Future; use futures::{future::IntoFuture, Future};
use serde::{de::DeserializeOwned, ser::Serialize}; use serde::{de::DeserializeOwned, ser::Serialize};
use crate::{Backoff, MaxRetries, Processor}; use crate::{Backoff, MaxRetries, Processor};
@ -32,6 +13,9 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// The application state provided to this job at runtime. /// The application state provided to this job at runtime.
type State: Clone + 'static; type State: Clone + 'static;
/// The result of running this operation
type Future: IntoFuture<Item = (), Error = Error>;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
/// This should contain all the logic needed to complete a job. If that means queuing more /// This should contain all the logic needed to complete a job. If that means queuing more

View file

@ -1,22 +1,3 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use chrono::{offset::Utc, DateTime, Duration as OldDuration}; use chrono::{offset::Utc, DateTime, Duration as OldDuration};
use log::trace; use log::trace;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
@ -25,6 +6,7 @@ use serde_json::Value;
use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop};
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
/// Information about the sate of an attempted job
pub struct ReturnJobInfo { pub struct ReturnJobInfo {
pub(crate) id: u64, pub(crate) id: u64,
pub(crate) result: JobResult, pub(crate) result: JobResult,
@ -54,6 +36,7 @@ impl ReturnJobInfo {
} }
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
/// Information about a newly created job
pub struct NewJobInfo { pub struct NewJobInfo {
/// Name of the processor that should handle this job /// Name of the processor that should handle this job
processor: String, processor: String,
@ -96,10 +79,12 @@ impl NewJobInfo {
} }
} }
/// The name of the queue this job will run in
pub fn queue(&self) -> &str { pub fn queue(&self) -> &str {
&self.queue &self.queue
} }
/// Whether this job is ready to be run immediately
pub fn is_ready(&self) -> bool { pub fn is_ready(&self) -> bool {
self.next_queue.is_none() self.next_queue.is_none()
} }
@ -160,6 +145,7 @@ pub struct JobInfo {
} }
impl JobInfo { impl JobInfo {
/// The name of the queue this job will run in
pub fn queue(&self) -> &str { pub fn queue(&self) -> &str {
&self.queue &self.queue
} }
@ -176,6 +162,7 @@ impl JobInfo {
self.args.clone() self.args.clone()
} }
/// The ID of this job
pub fn id(&self) -> u64 { pub fn id(&self) -> u64 {
self.id self.id
} }
@ -207,6 +194,7 @@ impl JobInfo {
); );
} }
/// Whether this job is ready to be run
pub fn is_ready(&self, now: DateTime<Utc>) -> bool { pub fn is_ready(&self, now: DateTime<Utc>) -> bool {
match self.next_queue { match self.next_queue {
Some(ref time) => now > *time, Some(ref time) => now > *time,
@ -225,6 +213,7 @@ impl JobInfo {
should_retry should_retry
} }
/// Whether this job is pending execution
pub fn is_pending(&self) -> bool { pub fn is_pending(&self) -> bool {
self.status == JobStatus::Pending self.status == JobStatus::Pending
} }

View file

@ -1,21 +1,10 @@
/* #![deny(missing_docs)]
* This file is part of Background Jobs.
* //! # Background Jobs Core
* Copyright © 2019 Riley Trautman //! _basic types and traits for implementing a background jobs processor_
* //!
* Background Jobs is free software: you can redistribute it and/or modify //! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs
* it under the terms of the GNU General Public License as published by //! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate.
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use failure::{Error, Fail}; use failure::{Error, Fail};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
@ -53,33 +42,45 @@ pub enum JobError {
} }
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
/// Indicate the state of a job after an attempted run
pub enum JobResult { pub enum JobResult {
/// The job succeeded
Success, Success,
/// The job failed
Failure, Failure,
/// There was no processor to run the job
MissingProcessor, MissingProcessor,
} }
impl JobResult { impl JobResult {
/// Indicate a successful job
pub fn success() -> Self { pub fn success() -> Self {
JobResult::Success JobResult::Success
} }
/// Indicate a failed job
pub fn failure() -> Self { pub fn failure() -> Self {
JobResult::Failure JobResult::Failure
} }
/// Indicate that the job's processor is not present
pub fn missing_processor() -> Self { pub fn missing_processor() -> Self {
JobResult::MissingProcessor JobResult::MissingProcessor
} }
/// Check if the job failed
pub fn is_failure(&self) -> bool { pub fn is_failure(&self) -> bool {
*self == JobResult::Failure *self == JobResult::Failure
} }
/// Check if the job succeeded
pub fn is_success(&self) -> bool { pub fn is_success(&self) -> bool {
*self == JobResult::Success *self == JobResult::Success
} }
/// Check if the job is missing it's processor
pub fn is_missing_processor(&self) -> bool { pub fn is_missing_processor(&self) -> bool {
*self == JobResult::MissingProcessor *self == JobResult::MissingProcessor
} }
@ -96,33 +97,46 @@ pub enum JobStatus {
} }
impl JobStatus { impl JobStatus {
/// The job should be queued
pub fn pending() -> Self { pub fn pending() -> Self {
JobStatus::Pending JobStatus::Pending
} }
/// The job is running
pub fn running() -> Self { pub fn running() -> Self {
JobStatus::Running JobStatus::Running
} }
/// Check if the job is ready to be queued
pub fn is_pending(&self) -> bool { pub fn is_pending(&self) -> bool {
*self == JobStatus::Pending *self == JobStatus::Pending
} }
/// Check if the job is running
pub fn is_running(&self) -> bool { pub fn is_running(&self) -> bool {
*self == JobStatus::Running *self == JobStatus::Running
} }
} }
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
/// Different styles for retrying jobs
pub enum Backoff { pub enum Backoff {
/// Seconds between execution /// Seconds between execution
///
/// For example, `Backoff::Linear(5)` will retry a failed job 5 seconds after the previous
/// attempt
Linear(usize), Linear(usize),
/// Base for seconds between execution /// Base for seconds between execution
///
/// For example, `Backoff::Exponential(2)` will retry a failed job 2 seconds after the first
/// failure, 4 seconds after the second failure, 8 seconds after the third failure, and 16
/// seconds after the fourth failure
Exponential(usize), Exponential(usize),
} }
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)] #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
/// How many times a job should be retried before giving up
pub enum MaxRetries { pub enum MaxRetries {
/// Keep retrying forever /// Keep retrying forever
Infinite, Infinite,

View file

@ -1,22 +1,3 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use chrono::{offset::Utc, DateTime}; use chrono::{offset::Utc, DateTime};
use failure::{Error, Fail}; use failure::{Error, Fail};
use futures::{ use futures::{
@ -46,7 +27,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// ```rust /// ```rust
/// use background_jobs_core::{Backoff, Job, MaxRetries, Processor}; /// use background_jobs_core::{Backoff, Job, MaxRetries, Processor};
/// use failure::Error; /// use failure::Error;
/// use futures::future::{Future, IntoFuture}; /// use futures::future::Future;
/// use log::info; /// use log::info;
/// use serde_derive::{Deserialize, Serialize}; /// use serde_derive::{Deserialize, Serialize};
/// ///
@ -55,11 +36,15 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// count: i32, /// count: i32,
/// } /// }
/// ///
/// impl Job<()> for MyJob { /// impl Job for MyJob {
/// fn run(self, _state: ()) -> Box<dyn Future<Item = (), Error = Error> + Send> { /// type Processor = MyProcessor;
/// type State = ();
/// type Future = Result<(), Error>;
///
/// fn run(self, _state: Self::State) -> Self::Future {
/// info!("Processing {}", self.count); /// info!("Processing {}", self.count);
/// ///
/// Box::new(Ok(()).into_future()) /// Ok(())
/// } /// }
/// } /// }
/// ///
@ -82,6 +67,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo};
/// } /// }
/// ``` /// ```
pub trait Processor: Clone { pub trait Processor: Clone {
/// The job this processor will process
type Job: Job + 'static; type Job: Job + 'static;
/// The name of the processor /// The name of the processor
@ -177,7 +163,7 @@ pub trait Processor: Clone {
let res = serde_json::from_value::<Self::Job>(args); let res = serde_json::from_value::<Self::Job>(args);
let fut = match res { let fut = match res {
Ok(job) => Either::A(job.run(state).map_err(JobError::Processing)), Ok(job) => Either::A(job.run(state).into_future().map_err(JobError::Processing)),
Err(_) => Either::B(Err(JobError::Json).into_future()), Err(_) => Either::B(Err(JobError::Json).into_future()),
}; };

View file

@ -1,22 +1,3 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use futures::future::{Either, Future, IntoFuture}; use futures::future::{Either, Future, IntoFuture};

View file

@ -1,34 +1,24 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use chrono::{offset::Utc, DateTime, Datelike, Timelike}; use chrono::{offset::Utc, DateTime, Datelike, Timelike};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
/// Statistics about the jobs processor
pub struct Stats { pub struct Stats {
/// How many jobs are pending execution
pub pending: usize, pub pending: usize,
/// How many jobs are currently executing
pub running: usize, pub running: usize,
/// How many jobs are permanently failed
pub dead: JobStat, pub dead: JobStat,
/// How many jobs have completed successfully
pub complete: JobStat, pub complete: JobStat,
} }
impl Stats { impl Stats {
/// A new, empty stats struct
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -83,6 +73,7 @@ impl Default for Stats {
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
/// A time-based overview of job completion and failures
pub struct JobStat { pub struct JobStat {
this_hour: usize, this_hour: usize,
today: usize, today: usize,
@ -92,6 +83,7 @@ pub struct JobStat {
} }
impl JobStat { impl JobStat {
/// A new, empty job statistic
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -133,18 +125,22 @@ impl JobStat {
self.this_month = 0; self.this_month = 0;
} }
/// A count from the last hour
pub fn this_hour(&self) -> usize { pub fn this_hour(&self) -> usize {
self.this_hour self.this_hour
} }
/// A count from the last day
pub fn today(&self) -> usize { pub fn today(&self) -> usize {
self.today self.today
} }
/// A count from the last month
pub fn this_month(&self) -> usize { pub fn this_month(&self) -> usize {
self.this_month self.this_month
} }
/// A total count
pub fn all_time(&self) -> usize { pub fn all_time(&self) -> usize {
self.all_time self.all_time
} }

View file

@ -1,22 +1,3 @@
/*
* This file is part of Background Jobs.
*
* Copyright © 2019 Riley Trautman
*
* Background Jobs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Background Jobs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Background Jobs. If not, see <http://www.gnu.org/licenses/>.
*/
use chrono::offset::Utc; use chrono::offset::Utc;
use failure::Fail; use failure::Fail;
use log::error; use log::error;
@ -72,6 +53,7 @@ pub trait Storage: Clone + Send {
where where
F: Fn(Stats) -> Stats; F: Fn(Stats) -> Stats;
/// Generate a new job based on the provided NewJobInfo
fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Self::Error> { fn new_job(&mut self, job: NewJobInfo) -> Result<u64, Self::Error> {
let id = self.generate_id()?; let id = self.generate_id()?;
@ -85,6 +67,7 @@ pub trait Storage: Clone + Send {
Ok(id) Ok(id)
} }
/// Fetch a job that is ready to be executed, marking it as running
fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Self::Error> { fn request_job(&mut self, queue: &str, runner_id: u64) -> Result<Option<JobInfo>, Self::Error> {
match self.fetch_job_from_queue(queue)? { match self.fetch_job_from_queue(queue)? {
Some(mut job) => { Some(mut job) => {
@ -107,6 +90,7 @@ pub trait Storage: Clone + Send {
} }
} }
/// "Return" a job to the database, marking it for retry if needed
fn return_job( fn return_job(
&mut self, &mut self,
ReturnJobInfo { id, result }: ReturnJobInfo, ReturnJobInfo { id, result }: ReturnJobInfo,
@ -140,6 +124,7 @@ pub trait Storage: Clone + Send {
} }
} }
/// A default, in-memory implementation of a storage mechanism
pub mod memory_storage { pub mod memory_storage {
use super::{JobInfo, Stats}; use super::{JobInfo, Stats};
use failure::Fail; use failure::Fail;
@ -150,6 +135,7 @@ pub mod memory_storage {
}; };
#[derive(Clone)] #[derive(Clone)]
/// An In-Memory store for jobs
pub struct Storage { pub struct Storage {
inner: Arc<Mutex<Inner>>, inner: Arc<Mutex<Inner>>,
} }
@ -165,6 +151,7 @@ pub mod memory_storage {
} }
impl Storage { impl Storage {
/// Create a new, empty job store
pub fn new() -> Self { pub fn new() -> Self {
Storage { Storage {
inner: Arc::new(Mutex::new(Inner { inner: Arc::new(Mutex::new(Inner {
@ -266,6 +253,7 @@ pub mod memory_storage {
} }
#[derive(Clone, Debug, Fail)] #[derive(Clone, Debug, Fail)]
/// An error that is impossible to create
pub enum Never {} pub enum Never {}
impl fmt::Display for Never { impl fmt::Display for Never {
@ -273,8 +261,4 @@ pub mod memory_storage {
match *self {} match *self {}
} }
} }
#[derive(Clone, Debug, Fail)]
#[fail(display = "Created too many storages, can't generate any more IDs")]
pub struct TooManyStoragesError;
} }