From 6665ced67191edac62714e7a65872f6c9969f67f Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 13 Jan 2024 16:58:48 -0500 Subject: [PATCH] Add tokio example, update example imports --- examples/basic-example/Cargo.toml | 5 +- examples/basic-example/src/main.rs | 6 +- examples/long-example/Cargo.toml | 5 +- examples/long-example/src/main.rs | 11 ++- examples/managed-example/Cargo.toml | 5 +- examples/managed-example/src/main.rs | 11 ++- examples/metrics-example/Cargo.toml | 5 +- examples/metrics-example/src/main.rs | 11 +-- examples/panic-example/Cargo.toml | 5 +- examples/panic-example/src/main.rs | 3 +- examples/postgres-example/Cargo.toml | 2 +- examples/postgres-example/src/main.rs | 6 +- examples/tokio-example/.gitignore | 1 + examples/tokio-example/Cargo.toml | 16 ++++ examples/tokio-example/src/main.rs | 105 ++++++++++++++++++++++++++ 15 files changed, 154 insertions(+), 43 deletions(-) create mode 100644 examples/tokio-example/.gitignore create mode 100644 examples/tokio-example/Cargo.toml create mode 100644 examples/tokio-example/src/main.rs diff --git a/examples/basic-example/Cargo.toml b/examples/basic-example/Cargo.toml index 8184ecd..787a960 100644 --- a/examples/basic-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -9,10 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.17.0", path = "../..", features = [ - "error-logging", -] } -background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs index 9849737..ae28c10 100644 --- a/examples/basic-example/src/main.rs +++ b/examples/basic-example/src/main.rs @@ -1,10 +1,10 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ + actix::{Spawner, WorkerConfig}, memory_storage::{ActixTimer, Storage}, - ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig, + MaxRetries, UnsendJob as Job, }; -// use background_jobs_sled_storage::Storage; use std::{ future::{ready, Ready}, time::{Duration, SystemTime}, @@ -85,7 +85,7 @@ impl MyJob { impl Job for MyJob { type State = MyState; type Future = Ready>; - type Spawner = ActixSpawner; + type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml index 80abbc7..fccbcb6 100644 --- a/examples/long-example/Cargo.toml +++ b/examples/long-example/Cargo.toml @@ -9,10 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.17.0", path = "../..", features = [ - "error-logging", -] } -background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/long-example/src/main.rs b/examples/long-example/src/main.rs index 2066f18..a23b885 100644 --- a/examples/long-example/src/main.rs +++ b/examples/long-example/src/main.rs @@ -1,7 +1,10 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig}; -use background_jobs_sled_storage::Storage; +use background_jobs::{ + actix::{Spawner, WorkerConfig}, + sled::Storage, + MaxRetries, UnsendJob as Job, +}; use std::{ future::{ready, Future, Ready}, pin::Pin, @@ -88,7 +91,7 @@ impl MyJob { impl Job for MyJob { type State = MyState; type Future = Ready>; - type Spawner = ActixSpawner; + type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being @@ -118,7 +121,7 @@ impl Job for MyJob { impl Job for LongJob { type State = MyState; type Future = Pin>>>; - type Spawner = ActixSpawner; + type Spawner = Spawner; const NAME: &'static str = "LongJob"; diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml index 37ce76c..85a2a53 100644 --- a/examples/managed-example/Cargo.toml +++ b/examples/managed-example/Cargo.toml @@ -9,10 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.17.0", path = "../..", features = [ - "error-logging", -] } -background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index e5a8736..c5c912e 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -1,7 +1,10 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig}; -use background_jobs_sled_storage::Storage; +use background_jobs::{ + actix::{Spawner, WorkerConfig}, + sled::Storage, + MaxRetries, UnsendJob as Job, +}; use std::{ future::{ready, Ready}, time::{Duration, SystemTime}, @@ -100,7 +103,7 @@ impl MyJob { impl Job for MyJob { type State = MyState; type Future = Ready>; - type Spawner = ActixSpawner; + type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being @@ -130,7 +133,7 @@ impl Job for MyJob { impl Job for StopJob { type State = MyState; type Future = Ready>; - type Spawner = ActixSpawner; + type Spawner = Spawner; const NAME: &'static str = "StopJob"; const QUEUE: &'static str = DEFAULT_QUEUE; diff --git a/examples/metrics-example/Cargo.toml b/examples/metrics-example/Cargo.toml index edfa0f1..dbe91ff 100644 --- a/examples/metrics-example/Cargo.toml +++ b/examples/metrics-example/Cargo.toml @@ -9,10 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.17.0", path = "../..", features = [ - "error-logging", -] } -background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/metrics-example/src/main.rs b/examples/metrics-example/src/main.rs index 45c21bd..e5c966a 100644 --- a/examples/metrics-example/src/main.rs +++ b/examples/metrics-example/src/main.rs @@ -1,14 +1,11 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ + actix::{Spawner, WorkerConfig}, metrics::MetricsStorage, - ActixSpawner, - MaxRetries, - // memory_storage::{ActixTimer, Storage}, - UnsendJob as Job, - WorkerConfig, + sled::Storage, + MaxRetries, UnsendJob as Job, }; -use background_jobs_sled_storage::Storage; use std::{future::Future, pin::Pin, time::Duration}; use tracing::info; use tracing_subscriber::EnvFilter; @@ -93,7 +90,7 @@ impl MyJob { impl Job for MyJob { type State = MyState; type Future = Pin> + 'static>>; - type Spawner = ActixSpawner; + type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml index 25927ba..1fc95e2 100644 --- a/examples/panic-example/Cargo.toml +++ b/examples/panic-example/Cargo.toml @@ -9,10 +9,7 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -background-jobs = { version = "0.17.0", path = "../..", features = [ - "error-logging", -] } -background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } time = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/examples/panic-example/src/main.rs b/examples/panic-example/src/main.rs index ab73d6e..8fc1ad5 100644 --- a/examples/panic-example/src/main.rs +++ b/examples/panic-example/src/main.rs @@ -1,7 +1,6 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{Job, MaxRetries, WorkerConfig}; -use background_jobs_sled_storage::Storage; +use background_jobs::{actix::WorkerConfig, sled::Storage, Job, MaxRetries}; use std::{ future::{ready, Ready}, time::{Duration, SystemTime}, diff --git a/examples/postgres-example/Cargo.toml b/examples/postgres-example/Cargo.toml index a2ffa74..990742b 100644 --- a/examples/postgres-example/Cargo.toml +++ b/examples/postgres-example/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] actix-rt = "2.9.0" anyhow = "1.0.79" -background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." } +background-jobs = { version = "0.17.0", features = ["postgres"], path = "../.." } serde = { version = "1.0.195", features = ["derive"] } tokio = { version = "1.35.1", features = ["full"] } tracing = "0.1.40" diff --git a/examples/postgres-example/src/main.rs b/examples/postgres-example/src/main.rs index c41375b..6d0e03a 100644 --- a/examples/postgres-example/src/main.rs +++ b/examples/postgres-example/src/main.rs @@ -1,6 +1,8 @@ use actix_rt::Arbiter; use background_jobs::{ - postgres::Storage, ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig, + actix::{Spawner, WorkerConfig}, + postgres::Storage, + MaxRetries, UnsendJob as Job, }; // use background_jobs_sled_storage::Storage; use std::{ @@ -89,7 +91,7 @@ impl MyJob { impl Job for MyJob { type State = MyState; type Future = Ready>; - type Spawner = ActixSpawner; + type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being diff --git a/examples/tokio-example/.gitignore b/examples/tokio-example/.gitignore new file mode 100644 index 0000000..23f168a --- /dev/null +++ b/examples/tokio-example/.gitignore @@ -0,0 +1 @@ +/my-sled-db diff --git a/examples/tokio-example/Cargo.toml b/examples/tokio-example/Cargo.toml new file mode 100644 index 0000000..7e370c1 --- /dev/null +++ b/examples/tokio-example/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "tokio-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio"] } +tokio = { version = "1", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } +serde = { version = "1.0", features = ["derive"] } +sled = "0.34" diff --git a/examples/tokio-example/src/main.rs b/examples/tokio-example/src/main.rs new file mode 100644 index 0000000..53a388a --- /dev/null +++ b/examples/tokio-example/src/main.rs @@ -0,0 +1,105 @@ +use anyhow::Error; +use background_jobs::{ + memory_storage::{Storage, TokioTimer}, + tokio::WorkerConfig, + Job, MaxRetries, +}; +use std::{ + future::{ready, Ready}, + time::{Duration, SystemTime}, +}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +const DEFAULT_QUEUE: &str = "default"; + +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::fmt::fmt() + .with_env_filter(env_filter) + .init(); + + // Set up our Storage + // let db = sled::Config::new().temporary(true).open()?; + let storage = Storage::new(TokioTimer); + + // Configure and start our workers + let queue_handle = WorkerConfig::new(storage, |_| MyState::new("My App")) + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start(); + + // Queue our jobs + queue_handle.queue(MyJob::new(1, 2)).await?; + queue_handle.queue(MyJob::new(3, 4)).await?; + queue_handle.queue(MyJob::new(5, 6)).await?; + for i in 0..20 { + queue_handle + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(i)) + .await?; + } + + // Block on Tokio + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +impl Job for MyJob { + type State = MyState; + type Future = Ready>; + + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being + // registered. + const NAME: &'static str = "MyJob"; + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + const QUEUE: &'static str = DEFAULT_QUEUE; + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + fn run(self, state: MyState) -> Self::Future { + info!("{}: args, {:?}", state.app_name, self); + + ready(Ok(())) + } +}