mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-05-21 21:18:08 +00:00
Compare commits
5 commits
9cb57a03f5
...
13b72ec9cc
Author | SHA1 | Date | |
---|---|---|---|
13b72ec9cc | |||
141db9afc8 | |||
8217012237 | |||
424f792cb0 | |||
048dc341bc |
15
Cargo.toml
15
Cargo.toml
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs"
|
||||
description = "asynchronous background jobs implemented with pluggable backends and runtimes"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -21,6 +21,7 @@ members = [
|
|||
"jobs-sled",
|
||||
"jobs-tokio",
|
||||
"examples/basic-example",
|
||||
"examples/error-example",
|
||||
"examples/long-example",
|
||||
"examples/managed-example",
|
||||
"examples/metrics-example",
|
||||
|
@ -43,30 +44,30 @@ completion-logging = [
|
|||
error-logging = ["background-jobs-core/error-logging"]
|
||||
|
||||
[dependencies.background-jobs-core]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-core"
|
||||
|
||||
[dependencies.background-jobs-actix]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-actix"
|
||||
optional = true
|
||||
|
||||
[dependencies.background-jobs-metrics]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-metrics"
|
||||
optional = true
|
||||
|
||||
[dependencies.background-jobs-postgres]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-postgres"
|
||||
optional = true
|
||||
|
||||
[dependencies.background-jobs-sled]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-sled"
|
||||
optional = true
|
||||
|
||||
[dependencies.background-jobs-tokio]
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
path = "jobs-tokio"
|
||||
optional = true
|
||||
|
|
15
README.md
15
README.md
|
@ -15,7 +15,6 @@ might not be the best experience.
|
|||
[dependencies]
|
||||
actix-rt = "2.2.0"
|
||||
background-jobs = "0.15.0"
|
||||
anyhow = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
```
|
||||
|
||||
|
@ -24,8 +23,7 @@ Jobs are a combination of the data required to perform an operation, and the log
|
|||
operation. They implement the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
|
||||
|
||||
```rust
|
||||
use background_jobs::Job;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{Job, BoxError};
|
||||
use std::future::{ready, Ready};
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
|
@ -45,7 +43,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = ();
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
const NAME: &'static str = "MyJob";
|
||||
|
||||
|
@ -80,7 +79,8 @@ impl MyState {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
// because otherwise one job will overwrite another job when they're being
|
||||
|
@ -126,11 +126,10 @@ With that out of the way, back to the examples:
|
|||
|
||||
##### Main
|
||||
```rust
|
||||
use background_jobs::{create_server, WorkerConfig};
|
||||
use anyhow::Error;
|
||||
use background_jobs::{create_server, actix::WorkerConfig, BoxError};
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
// Set up our Storage
|
||||
// For this example, we use the default in-memory storage mechanism
|
||||
use background_jobs::memory_storage::{ActixTimer, Storage};
|
||||
|
|
|
@ -8,8 +8,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
use actix_rt::Arbiter;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{
|
||||
actix::{Spawner, WorkerConfig},
|
||||
memory_storage::{ActixTimer, Storage},
|
||||
MaxRetries, UnsendJob as Job,
|
||||
BoxError, MaxRetries, UnsendJob as Job,
|
||||
};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
|
@ -26,7 +25,7 @@ pub struct MyJob {
|
|||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -84,7 +83,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
|
|
1
examples/error-example/.gitignore
vendored
Normal file
1
examples/error-example/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
/my-sled-db
|
19
examples/error-example/Cargo.toml
Normal file
19
examples/error-example/Cargo.toml
Normal file
|
@ -0,0 +1,19 @@
|
|||
[package]
|
||||
name = "error-example"
|
||||
version = "0.1.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
background-jobs = { version = "0.18.0", path = "../..", features = [
|
||||
"error-logging",
|
||||
"sled",
|
||||
] }
|
||||
time = "0.3"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
sled = "0.34"
|
140
examples/error-example/src/main.rs
Normal file
140
examples/error-example/src/main.rs
Normal file
|
@ -0,0 +1,140 @@
|
|||
use actix_rt::Arbiter;
|
||||
use background_jobs::{actix::WorkerConfig, sled::Storage, BoxError, Job, MaxRetries};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use tracing::info;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
const DEFAULT_QUEUE: &str = "default";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MyState {
|
||||
pub app_name: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct MyJob {
|
||||
some_usize: usize,
|
||||
other_usize: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct ErroringJob;
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
.with_env_filter(env_filter)
|
||||
.init();
|
||||
|
||||
// Set up our Storage
|
||||
let db = sled::Config::new().temporary(true).open()?;
|
||||
let storage = Storage::new(db)?;
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
// Configure and start our workers
|
||||
let queue_handle =
|
||||
WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
|
||||
.register::<ErroringJob>()
|
||||
.register::<MyJob>()
|
||||
.set_worker_count(DEFAULT_QUEUE, 16)
|
||||
.start();
|
||||
|
||||
// Queue some panicking job
|
||||
for _ in 0..32 {
|
||||
queue_handle.queue(ErroringJob).await?;
|
||||
}
|
||||
|
||||
// Queue our jobs
|
||||
queue_handle.queue(MyJob::new(1, 2)).await?;
|
||||
queue_handle.queue(MyJob::new(3, 4)).await?;
|
||||
queue_handle.queue(MyJob::new(5, 6)).await?;
|
||||
queue_handle
|
||||
.schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2))
|
||||
.await?;
|
||||
|
||||
// Block on Actix
|
||||
actix_rt::signal::ctrl_c().await?;
|
||||
|
||||
arbiter.stop();
|
||||
let _ = arbiter.join();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl MyState {
|
||||
pub fn new(app_name: &str) -> Self {
|
||||
MyState {
|
||||
app_name: app_name.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MyJob {
|
||||
pub fn new(some_usize: usize, other_usize: usize) -> Self {
|
||||
MyJob {
|
||||
some_usize,
|
||||
other_usize,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
// because otherwise one job will overwrite another job when they're being
|
||||
// registered.
|
||||
const NAME: &'static str = "MyJob";
|
||||
|
||||
// The queue that this processor belongs to
|
||||
//
|
||||
// Workers have the option to subscribe to specific queues, so this is important to
|
||||
// determine which worker will call the processor
|
||||
//
|
||||
// Jobs can optionally override the queue they're spawned on
|
||||
const QUEUE: &'static str = DEFAULT_QUEUE;
|
||||
|
||||
// The number of times background-jobs should try to retry a job before giving up
|
||||
//
|
||||
// Jobs can optionally override this value
|
||||
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
|
||||
|
||||
fn run(self, state: MyState) -> Self::Future {
|
||||
info!("{}: args, {:?}", state.app_name, self);
|
||||
|
||||
ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Boom;
|
||||
impl std::fmt::Display for Boom {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "boom")
|
||||
}
|
||||
}
|
||||
impl std::error::Error for Boom {}
|
||||
|
||||
impl Job for ErroringJob {
|
||||
type State = MyState;
|
||||
type Error = Boom;
|
||||
type Future = Ready<Result<(), Boom>>;
|
||||
|
||||
const NAME: &'static str = "ErroringJob";
|
||||
|
||||
const QUEUE: &'static str = DEFAULT_QUEUE;
|
||||
|
||||
const MAX_RETRIES: MaxRetries = MaxRetries::Count(0);
|
||||
|
||||
fn run(self, _: MyState) -> Self::Future {
|
||||
ready(Err(Boom))
|
||||
}
|
||||
}
|
|
@ -8,8 +8,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
use actix_rt::Arbiter;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{
|
||||
actix::{Spawner, WorkerConfig},
|
||||
sled::Storage,
|
||||
MaxRetries, UnsendJob as Job,
|
||||
BoxError, MaxRetries, UnsendJob as Job,
|
||||
};
|
||||
use std::{
|
||||
future::{ready, Future, Ready},
|
||||
|
@ -30,7 +29,7 @@ pub struct MyJob {
|
|||
pub struct LongJob;
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -90,7 +89,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
|
@ -120,7 +120,8 @@ impl Job for MyJob {
|
|||
|
||||
impl Job for LongJob {
|
||||
type State = MyState;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
||||
type Error = BoxError;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>>>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
const NAME: &'static str = "LongJob";
|
||||
|
|
|
@ -8,8 +8,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled"] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", features = [ "error-logging", "sled"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
use actix_rt::Arbiter;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{
|
||||
actix::{Spawner, WorkerConfig},
|
||||
sled::Storage,
|
||||
MaxRetries, UnsendJob as Job,
|
||||
BoxError, MaxRetries, UnsendJob as Job,
|
||||
};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
|
@ -29,7 +28,7 @@ pub struct MyJob {
|
|||
pub struct StopJob;
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -102,7 +101,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
|
@ -132,7 +132,8 @@ impl Job for MyJob {
|
|||
|
||||
impl Job for StopJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
const NAME: &'static str = "StopJob";
|
||||
|
|
|
@ -8,8 +8,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", features = [ "error-logging", "sled" ] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
use actix_rt::Arbiter;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{
|
||||
actix::{Spawner, WorkerConfig},
|
||||
metrics::MetricsStorage,
|
||||
sled::Storage,
|
||||
MaxRetries, UnsendJob as Job,
|
||||
BoxError, MaxRetries, UnsendJob as Job,
|
||||
};
|
||||
use std::{future::Future, pin::Pin, time::Duration};
|
||||
use tracing::info;
|
||||
|
@ -24,7 +23,7 @@ pub struct MyJob {
|
|||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"));
|
||||
|
||||
// Install the metrics subscriber
|
||||
|
@ -89,7 +88,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + 'static>>;
|
||||
type Error = BoxError;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + 'static>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
|
|
|
@ -7,8 +7,7 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio" ] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio" ] }
|
||||
time = "0.3"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use anyhow::Error;
|
||||
use background_jobs::{sled::Storage, tokio::WorkerConfig, Job, MaxRetries};
|
||||
use background_jobs::{sled::Storage, tokio::WorkerConfig, BoxError, Job, MaxRetries};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
time::{Duration, SystemTime},
|
||||
|
@ -24,7 +23,7 @@ pub struct MyJob {
|
|||
pub struct PanickingJob;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -83,7 +82,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
// because otherwise one job will overwrite another job when they're being
|
||||
|
@ -112,7 +112,8 @@ impl Job for MyJob {
|
|||
|
||||
impl Job for PanickingJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
const NAME: &'static str = "PanickingJob";
|
||||
|
||||
|
|
|
@ -7,8 +7,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.9.0"
|
||||
anyhow = "1.0.79"
|
||||
background-jobs = { version = "0.17.0", features = ["postgres"], path = "../.." }
|
||||
background-jobs = { version = "0.18.0", features = ["postgres"], path = "../.." }
|
||||
serde = { version = "1.0.195", features = ["derive"] }
|
||||
tokio = { version = "1.35.1", features = ["full"] }
|
||||
tracing = "0.1.40"
|
||||
|
|
|
@ -2,7 +2,7 @@ use actix_rt::Arbiter;
|
|||
use background_jobs::{
|
||||
actix::{Spawner, WorkerConfig},
|
||||
postgres::Storage,
|
||||
MaxRetries, UnsendJob as Job,
|
||||
BoxError, MaxRetries, UnsendJob as Job,
|
||||
};
|
||||
// use background_jobs_sled_storage::Storage;
|
||||
use std::{
|
||||
|
@ -26,7 +26,7 @@ pub struct MyJob {
|
|||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -90,7 +90,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<anyhow::Result<()>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
type Spawner = Spawner;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
|
|
|
@ -7,8 +7,7 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio"] }
|
||||
background-jobs = { version = "0.18.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
use anyhow::Error;
|
||||
use background_jobs::{
|
||||
memory_storage::{Storage, TokioTimer},
|
||||
tokio::WorkerConfig,
|
||||
Job, MaxRetries,
|
||||
BoxError, Job, MaxRetries,
|
||||
};
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
|
@ -25,7 +24,7 @@ pub struct MyJob {
|
|||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
async fn main() -> Result<(), BoxError> {
|
||||
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
|
@ -79,7 +78,8 @@ impl MyJob {
|
|||
|
||||
impl Job for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
type Error = BoxError;
|
||||
type Future = Ready<Result<(), BoxError>>;
|
||||
|
||||
// The name of the job. It is super important that each job has a unique name,
|
||||
// because otherwise one job will overwrite another job when they're being
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-actix"
|
||||
description = "in-process jobs processor based on Actix"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -11,9 +11,8 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-rt = "2.5.1"
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.24"
|
||||
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
|
||||
background-jobs-core = { version = "0.18.0", path = "../jobs-core" }
|
||||
metrics = "0.22.0"
|
||||
tracing = "0.1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
|
@ -12,8 +12,7 @@
|
|||
//!
|
||||
//! ### Example
|
||||
//! ```rust
|
||||
//! use anyhow::Error;
|
||||
//! use background_jobs_core::{Backoff, Job, MaxRetries};
|
||||
//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError};
|
||||
//! use background_jobs_actix::{ActixTimer, WorkerConfig};
|
||||
//! use std::future::{ready, Ready};
|
||||
//!
|
||||
|
@ -31,7 +30,7 @@
|
|||
//! }
|
||||
//!
|
||||
//! #[actix_rt::main]
|
||||
//! async fn main() -> Result<(), Error> {
|
||||
//! async fn main() -> Result<(), BoxError> {
|
||||
//! // Set up our Storage
|
||||
//! // For this example, we use the default in-memory storage mechanism
|
||||
//! use background_jobs_core::memory_storage::Storage;
|
||||
|
@ -72,7 +71,7 @@
|
|||
//!
|
||||
//! impl Job for MyJob {
|
||||
//! type State = MyState;
|
||||
//! type Future = Ready<Result<(), Error>>;
|
||||
//! type Future = Ready<Result<(), BoxError>>;
|
||||
//!
|
||||
//! // The name of the job. It is super important that each job has a unique name,
|
||||
//! // because otherwise one job will overwrite another job when they're being
|
||||
|
@ -114,9 +113,8 @@
|
|||
//! ```
|
||||
|
||||
use actix_rt::{Arbiter, ArbiterHandle};
|
||||
use anyhow::Error;
|
||||
use background_jobs_core::{
|
||||
memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage,
|
||||
memory_storage::Timer, new_job, new_scheduled_job, BoxError, Job, ProcessorMap, Storage,
|
||||
};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
|
@ -469,7 +467,7 @@ impl QueueHandle {
|
|||
///
|
||||
/// This job will be sent to the server for storage, and will execute whenever a worker for the
|
||||
/// job's queue is free to do so.
|
||||
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
|
||||
pub async fn queue<J>(&self, job: J) -> Result<(), BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
@ -482,7 +480,7 @@ impl QueueHandle {
|
|||
///
|
||||
/// This job will be sent to the server for storage, and will execute after the specified time
|
||||
/// and when a worker for the job's queue is free to do so.
|
||||
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
|
||||
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
use anyhow::Error;
|
||||
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
|
||||
use background_jobs_core::{BoxError, JobInfo, NewJobInfo, ReturnJobInfo, Storage};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait ActixStorage {
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Error>;
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, BoxError>;
|
||||
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>;
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, BoxError>;
|
||||
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error>;
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError>;
|
||||
|
||||
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error>;
|
||||
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), BoxError>;
|
||||
}
|
||||
|
||||
pub(crate) struct StorageWrapper<S>(pub(crate) S)
|
||||
|
@ -24,19 +23,19 @@ where
|
|||
S: Storage + Send + Sync,
|
||||
S::Error: Send + Sync + 'static,
|
||||
{
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Error> {
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, BoxError> {
|
||||
Ok(self.0.push(job).await?)
|
||||
}
|
||||
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error> {
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, BoxError> {
|
||||
Ok(self.0.pop(queue, runner_id).await?)
|
||||
}
|
||||
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error> {
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError> {
|
||||
Ok(self.0.heartbeat(job_id, runner_id).await?)
|
||||
}
|
||||
|
||||
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error> {
|
||||
async fn complete(&self, ret: ReturnJobInfo) -> Result<(), BoxError> {
|
||||
self.0.complete(ret).await?;
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-core"
|
||||
description = "Core types for implementing an asynchronous jobs processor"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -18,7 +18,6 @@ completion-logging = []
|
|||
error-logging = []
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.24"
|
||||
event-listener = "4"
|
||||
metrics = "0.22.0"
|
||||
|
|
71
jobs-core/src/box_error.rs
Normal file
71
jobs-core/src/box_error.rs
Normal file
|
@ -0,0 +1,71 @@
|
|||
/// A simple error box that provides no additional formatting utilities
|
||||
pub struct BoxError {
|
||||
error: Box<dyn std::error::Error + Send + Sync>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for BoxError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.error.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BoxError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.error.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> From<E> for BoxError
|
||||
where
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
fn from(error: E) -> Self {
|
||||
BoxError {
|
||||
error: Box::new(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BoxError> for Box<dyn std::error::Error + Send + Sync> {
|
||||
fn from(value: BoxError) -> Self {
|
||||
value.error
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BoxError> for Box<dyn std::error::Error + Send> {
|
||||
fn from(value: BoxError) -> Self {
|
||||
value.error
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BoxError> for Box<dyn std::error::Error> {
|
||||
fn from(value: BoxError) -> Self {
|
||||
value.error
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<dyn std::error::Error + Send + Sync> for BoxError {
|
||||
fn as_ref(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
|
||||
self.error.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<dyn std::error::Error + Send> for BoxError {
|
||||
fn as_ref(&self) -> &(dyn std::error::Error + Send + 'static) {
|
||||
self.error.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<dyn std::error::Error> for BoxError {
|
||||
fn as_ref(&self) -> &(dyn std::error::Error + 'static) {
|
||||
self.error.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for BoxError {
|
||||
type Target = dyn std::error::Error + Send + Sync;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.error.as_ref()
|
||||
}
|
||||
}
|
|
@ -1,5 +1,4 @@
|
|||
use crate::{Backoff, JobError, MaxRetries, NewJobInfo};
|
||||
use anyhow::Error;
|
||||
use crate::{Backoff, BoxError, JobError, MaxRetries, NewJobInfo};
|
||||
use serde::{de::DeserializeOwned, ser::Serialize};
|
||||
use serde_json::Value;
|
||||
use std::{future::Future, pin::Pin, time::SystemTime};
|
||||
|
@ -15,8 +14,7 @@ use tracing::{Instrument, Span};
|
|||
/// ### Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use anyhow::Error;
|
||||
/// use background_jobs_core::{Job, new_job};
|
||||
/// use background_jobs_core::{Job, new_job, BoxError};
|
||||
/// use tracing::info;
|
||||
/// use std::future::{ready, Ready};
|
||||
///
|
||||
|
@ -27,7 +25,8 @@ use tracing::{Instrument, Span};
|
|||
///
|
||||
/// impl Job for MyJob {
|
||||
/// type State = ();
|
||||
/// type Future = Ready<Result<(), Error>>;
|
||||
/// type Error = BoxError;
|
||||
/// type Future = Ready<Result<(), BoxError>>;
|
||||
///
|
||||
/// const NAME: &'static str = "MyJob";
|
||||
///
|
||||
|
@ -38,7 +37,7 @@ use tracing::{Instrument, Span};
|
|||
/// }
|
||||
/// }
|
||||
///
|
||||
/// fn main() -> Result<(), Error> {
|
||||
/// fn main() -> Result<(), BoxError> {
|
||||
/// let job = new_job(MyJob { count: 1234 })?;
|
||||
///
|
||||
/// Ok(())
|
||||
|
@ -48,8 +47,11 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
|
|||
/// The application state provided to this job at runtime.
|
||||
type State: Clone + 'static;
|
||||
|
||||
/// The error type this job returns
|
||||
type Error: Into<BoxError>;
|
||||
|
||||
/// The future returned by this job
|
||||
type Future: Future<Output = Result<(), Error>> + Send;
|
||||
type Future: Future<Output = Result<(), Self::Error>> + Send;
|
||||
|
||||
/// The name of the job
|
||||
///
|
||||
|
@ -127,7 +129,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
|
|||
}
|
||||
|
||||
/// A provided method to create a new JobInfo from provided arguments
|
||||
pub fn new_job<J>(job: J) -> Result<NewJobInfo, Error>
|
||||
pub fn new_job<J>(job: J) -> Result<NewJobInfo, BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
@ -144,7 +146,7 @@ where
|
|||
}
|
||||
|
||||
/// Create a NewJobInfo to schedule a job to be performed after a certain time
|
||||
pub fn new_scheduled_job<J>(job: J, after: SystemTime) -> Result<NewJobInfo, Error>
|
||||
pub fn new_scheduled_job<J>(job: J, after: SystemTime) -> Result<NewJobInfo, BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
@ -175,13 +177,13 @@ where
|
|||
Box::pin(async move {
|
||||
let (fut, span) = res?;
|
||||
|
||||
if let Some(span) = span {
|
||||
fut.instrument(span).await?;
|
||||
let res = if let Some(span) = span {
|
||||
fut.instrument(span).await
|
||||
} else {
|
||||
fut.await?;
|
||||
}
|
||||
fut.await
|
||||
};
|
||||
|
||||
Ok(())
|
||||
res.map_err(Into::into).map_err(JobError::Processing)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -6,8 +6,7 @@
|
|||
//! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs
|
||||
//! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate.
|
||||
|
||||
use anyhow::Error;
|
||||
|
||||
mod box_error;
|
||||
mod catch_unwind;
|
||||
mod job;
|
||||
mod job_info;
|
||||
|
@ -16,6 +15,7 @@ mod storage;
|
|||
mod unsend_job;
|
||||
|
||||
pub use crate::{
|
||||
box_error::BoxError,
|
||||
job::{new_job, new_scheduled_job, process, Job},
|
||||
job_info::{JobInfo, NewJobInfo, ReturnJobInfo},
|
||||
processor_map::{CachedProcessorMap, ProcessorMap},
|
||||
|
@ -28,8 +28,8 @@ pub use unsend_job::{JoinError, UnsendJob, UnsendSpawner};
|
|||
/// The error type returned by the `process` method
|
||||
pub enum JobError {
|
||||
/// Some error occurred while processing the job
|
||||
#[error("Error performing job: {0}")]
|
||||
Processing(#[from] Error),
|
||||
#[error("{0}")]
|
||||
Processing(#[from] BoxError),
|
||||
|
||||
/// Creating a `Job` type from the provided `serde_json::Value` failed
|
||||
#[error("Could not make JSON value from arguments")]
|
||||
|
|
|
@ -188,10 +188,11 @@ where
|
|||
ReturnJobInfo::pass(id)
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let display = format!("{}", e);
|
||||
let debug = format!("{:?}", e);
|
||||
let display = format!("{e}");
|
||||
span.record("exception.message", &tracing::field::display(&display));
|
||||
let debug = format!("{e:?}");
|
||||
span.record("exception.details", &tracing::field::display(&debug));
|
||||
|
||||
#[cfg(feature = "error-logging")]
|
||||
tracing::warn!("Job {queue}: {name}-{id} errored");
|
||||
ReturnJobInfo::fail(id)
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use crate::{Backoff, Job, MaxRetries};
|
||||
use anyhow::Error;
|
||||
use crate::{Backoff, BoxError, Job, MaxRetries};
|
||||
use serde::{de::DeserializeOwned, ser::Serialize};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
|
@ -45,10 +44,13 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
|
|||
/// The application state provided to this job at runtime.
|
||||
type State: Clone + 'static;
|
||||
|
||||
/// The error type this job returns
|
||||
type Error: Into<BoxError>;
|
||||
|
||||
/// The future returned by this job
|
||||
///
|
||||
/// Importantly, this Future does not require Send
|
||||
type Future: Future<Output = Result<(), Error>>;
|
||||
type Future: Future<Output = Result<(), Self::Error>>;
|
||||
|
||||
/// The spawner type that will be used to spawn the unsend future
|
||||
type Spawner: UnsendSpawner;
|
||||
|
@ -148,7 +150,8 @@ where
|
|||
T: UnsendJob,
|
||||
{
|
||||
type State = T::State;
|
||||
type Future = UnwrapFuture<<T::Spawner as UnsendSpawner>::Handle<Result<(), Error>>>;
|
||||
type Error = BoxError;
|
||||
type Future = UnwrapFuture<<T::Spawner as UnsendSpawner>::Handle<Result<(), Self::Error>>>;
|
||||
|
||||
const NAME: &'static str = <Self as UnsendJob>::NAME;
|
||||
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
|
||||
|
@ -158,7 +161,8 @@ where
|
|||
|
||||
fn run(self, state: Self::State) -> Self::Future {
|
||||
UnwrapFuture(T::Spawner::spawn(
|
||||
UnsendJob::run(self, state).instrument(Span::current()),
|
||||
async move { UnsendJob::run(self, state).await.map_err(Into::into) }
|
||||
.instrument(Span::current()),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-metrics"
|
||||
description = "Metrics subscriber for accessing metrics produced by background jobs"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -13,7 +13,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
async-trait = "0.1.24"
|
||||
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
|
||||
background-jobs-core = { version = "0.18.0", path = "../jobs-core" }
|
||||
metrics = "0.22.0"
|
||||
metrics-util = "0.16.0"
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-postgres"
|
||||
description = "Postgres storage backend for background-jobs"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -14,7 +14,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
async-trait = "0.1.24"
|
||||
background-jobs-core = { version = "0.17.0-beta.1", path = "../jobs-core" }
|
||||
background-jobs-core = { version = "0.18.0", path = "../jobs-core" }
|
||||
barrel = { version = "0.7.0", features = ["pg"] }
|
||||
dashmap = "5.5.3"
|
||||
deadpool = { version = "0.9", features = ["rt_tokio_1"] }
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-sled"
|
||||
description = "Sled storage backend for background-jobs"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -13,7 +13,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
async-trait = "0.1.24"
|
||||
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
|
||||
background-jobs-core = { version = "0.18.0", path = "../jobs-core" }
|
||||
bincode = "1.2"
|
||||
sled = "0.34"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-tokio"
|
||||
description = "in-process jobs processor based on Tokio"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
license = "AGPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||
|
@ -12,9 +12,8 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.79"
|
||||
async-trait = "0.1.77"
|
||||
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
|
||||
background-jobs-core = { version = "0.18.0", path = "../jobs-core" }
|
||||
metrics = "0.22.0"
|
||||
serde = "1.0.195"
|
||||
serde_json = "1.0.111"
|
||||
|
|
|
@ -12,8 +12,7 @@
|
|||
//!
|
||||
//! ### Example
|
||||
//! ```rust
|
||||
//! use anyhow::Error;
|
||||
//! use background_jobs_core::{Backoff, Job, MaxRetries};
|
||||
//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError};
|
||||
//! use background_jobs_tokio::{TokioTimer, WorkerConfig};
|
||||
//! use std::future::{ready, Ready};
|
||||
//!
|
||||
|
@ -31,7 +30,7 @@
|
|||
//! }
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() -> Result<(), Error> {
|
||||
//! async fn main() -> Result<(), BoxError> {
|
||||
//! // Set up our Storage
|
||||
//! // For this example, we use the default in-memory storage mechanism
|
||||
//! use background_jobs_core::memory_storage::Storage;
|
||||
|
@ -74,7 +73,7 @@
|
|||
//!
|
||||
//! impl Job for MyJob {
|
||||
//! type State = MyState;
|
||||
//! type Future = Ready<Result<(), Error>>;
|
||||
//! type Future = Ready<Result<(), BoxError>>;
|
||||
//!
|
||||
//! // The name of the job. It is super important that each job has a unique name,
|
||||
//! // because otherwise one job will overwrite another job when they're being
|
||||
|
@ -115,9 +114,9 @@
|
|||
//! }
|
||||
//! ```
|
||||
|
||||
use anyhow::Error;
|
||||
use background_jobs_core::{
|
||||
memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait,
|
||||
memory_storage::Timer, new_job, new_scheduled_job, BoxError, Job, ProcessorMap,
|
||||
Storage as StorageTrait,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
|
@ -314,7 +313,7 @@ impl QueueHandle {
|
|||
///
|
||||
/// This job will be sent to the server for storage, and will execute whenever a worker for the
|
||||
/// job's queue is free to do so.
|
||||
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
|
||||
pub async fn queue<J>(&self, job: J) -> Result<(), BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
@ -327,7 +326,7 @@ impl QueueHandle {
|
|||
///
|
||||
/// This job will be sent to the server for storage, and will execute after the specified time
|
||||
/// and when a worker for the job's queue is free to do so.
|
||||
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
|
||||
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), BoxError>
|
||||
where
|
||||
J: Job,
|
||||
{
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait};
|
||||
use background_jobs_core::{BoxError, JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait TokioStorage: Send + Sync {
|
||||
async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid>;
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, BoxError>;
|
||||
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo>;
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, BoxError>;
|
||||
|
||||
async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> anyhow::Result<()>;
|
||||
async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> Result<(), BoxError>;
|
||||
|
||||
async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()>;
|
||||
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), BoxError>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -26,19 +26,22 @@ impl<S> TokioStorage for StorageWrapper<S>
|
|||
where
|
||||
S: StorageTrait + Send + Sync + 'static,
|
||||
{
|
||||
async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid> {
|
||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, BoxError> {
|
||||
self.0.push(job).await.map_err(From::from)
|
||||
}
|
||||
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo> {
|
||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, BoxError> {
|
||||
self.0.pop(queue, runner_id).await.map_err(From::from)
|
||||
}
|
||||
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> anyhow::Result<()> {
|
||||
self.0.heartbeat(job_id, runner_id).await.map_err(From::from)
|
||||
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError> {
|
||||
self.0
|
||||
.heartbeat(job_id, runner_id)
|
||||
.await
|
||||
.map_err(From::from)
|
||||
}
|
||||
|
||||
async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()> {
|
||||
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), BoxError> {
|
||||
self.0
|
||||
.complete(return_job_info)
|
||||
.await
|
||||
|
|
20
src/lib.rs
20
src/lib.rs
|
@ -29,7 +29,6 @@
|
|||
//! ```toml
|
||||
//! [dependencies]
|
||||
//! actix-rt = "2.6.0"
|
||||
//! anyhow = "1.0"
|
||||
//! background-jobs = "0.15.0"
|
||||
//! serde = { version = "1.0", features = ["derive"] }
|
||||
//! ```
|
||||
|
@ -39,8 +38,7 @@
|
|||
//! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`.
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! use anyhow::Error;
|
||||
//! use background_jobs::Job;
|
||||
//! use background_jobs::[Job, BoxError};
|
||||
//! use std::future::{ready, Ready};
|
||||
//!
|
||||
//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
|
@ -60,7 +58,8 @@
|
|||
//!
|
||||
//! impl Job for MyJob {
|
||||
//! type State = ();
|
||||
//! type Future = Ready<Result<(), Error>>;
|
||||
//! type Error = BoxError;
|
||||
//! type Future = Ready<Result<(), BoxError>>;
|
||||
//!
|
||||
//! const NAME: &'static str = "MyJob";
|
||||
//!
|
||||
|
@ -80,8 +79,7 @@
|
|||
//! Let's re-define the job to care about some application state.
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! use anyhow::Error;
|
||||
//! use background_jobs::Job;
|
||||
//! use background_jobs::[Job, BoxError};
|
||||
//! use std::future::{ready, Ready};
|
||||
//!
|
||||
//! #[derive(Clone, Debug)]
|
||||
|
@ -99,7 +97,8 @@
|
|||
//!
|
||||
//! impl Job for MyJob {
|
||||
//! type State = MyState;
|
||||
//! type Future = Ready<Result<(), Error>>;
|
||||
//! type Error = BoxError;
|
||||
//! type Future = Ready<Result<(), BoxError>>;
|
||||
//!
|
||||
//! const NAME: &'static str = "MyJob";
|
||||
//!
|
||||
|
@ -123,11 +122,10 @@
|
|||
//!
|
||||
//! ##### Main
|
||||
//! ```rust,ignore
|
||||
//! use anyhow::Error;
|
||||
//! use background_jobs::{ServerConfig, memory_storage::Storage, WorkerConfig};
|
||||
//! use background_jobs::{ServerConfig, memory_storage::Storage, actix::WorkerConfig, BoxError};
|
||||
//!
|
||||
//! #[actix_rt::main]
|
||||
//! async fn main() -> Result<(), Error> {
|
||||
//! async fn main() -> Result<(), BoxError> {
|
||||
//! // Set up our Storage
|
||||
//! let storage = Storage::new();
|
||||
//!
|
||||
|
@ -173,7 +171,7 @@
|
|||
//! | `completion-logging` | Enables a tracing event that occurs whenever a job completes |
|
||||
//! | `error-logging` | Enables a tracing event that occurs whenever a job fails |
|
||||
|
||||
pub use background_jobs_core::{Backoff, Job, MaxRetries, UnsendJob, UnsendSpawner};
|
||||
pub use background_jobs_core::{Backoff, BoxError, Job, MaxRetries, UnsendJob, UnsendSpawner};
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
pub mod metrics {
|
||||
|
|
Loading…
Reference in a new issue