diff --git a/Cargo.toml b/Cargo.toml index b6069ac..8a857dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,9 @@ members = [ "jobs-actix", "jobs-core", "jobs-sled", - "examples/actix-example", + "examples/basic-example", "examples/managed-example", + "examples/panic-example", ] [features] diff --git a/README.md b/README.md index 18e839b..52bc8bf 100644 --- a/README.md +++ b/README.md @@ -136,14 +136,11 @@ async fn main() -> Result<(), Error> { use background_jobs::memory_storage::Storage; let storage = Storage::new(); - // Start the application server. This guards access to to the jobs store - let queue_handle = create_server(storage); - // Configure and start our workers - WorkerConfig::new(move || MyState::new("My App")) + let queue_handle = WorkerConfig::new(move || MyState::new("My App")) .register::() .set_worker_count(DEFAULT_QUEUE, 16) - .start(queue_handle.clone()); + .start(storage); // Queue our jobs queue_handle.queue(MyJob::new(1, 2))?; diff --git a/examples/actix-example/.gitignore b/examples/basic-example/.gitignore similarity index 100% rename from examples/actix-example/.gitignore rename to examples/basic-example/.gitignore diff --git a/examples/actix-example/Cargo.toml b/examples/basic-example/Cargo.toml similarity index 96% rename from examples/actix-example/Cargo.toml rename to examples/basic-example/Cargo.toml index b912c18..838c189 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "actix-example" +name = "basic-example" version = "0.1.0" authors = ["asonix "] edition = "2021" diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs new file mode 100644 index 0000000..7d1fa89 --- /dev/null +++ b/examples/basic-example/src/main.rs @@ -0,0 +1,105 @@ +use actix_rt::Arbiter; +use anyhow::Error; +use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; +use background_jobs_sled_storage::Storage; +use chrono::{Duration, Utc}; +use std::future::{ready, Ready}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +const DEFAULT_QUEUE: &str = "default"; + +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +#[actix_rt::main] +async fn main() -> Result<(), Error> { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::fmt::fmt() + .with_env_filter(env_filter) + .init(); + + // Set up our Storage + let db = sled::Config::new().temporary(true).open()?; + let storage = Storage::new(db)?; + + let arbiter = Arbiter::new(); + + // Configure and start our workers + let queue_handle = WorkerConfig::new(move || MyState::new("My App")) + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start_in_arbiter(&arbiter.handle(), storage); + + // Queue our jobs + queue_handle.queue(MyJob::new(1, 2)).await?; + queue_handle.queue(MyJob::new(3, 4)).await?; + queue_handle.queue(MyJob::new(5, 6)).await?; + queue_handle + .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .await?; + + // Block on Actix + actix_rt::signal::ctrl_c().await?; + + arbiter.stop(); + let _ = arbiter.join(); + + Ok(()) +} + +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +#[async_trait::async_trait] +impl Job for MyJob { + type State = MyState; + type Future = Ready>; + + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being + // registered. + const NAME: &'static str = "MyJob"; + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + const QUEUE: &'static str = DEFAULT_QUEUE; + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + fn run(self, state: MyState) -> Self::Future { + info!("{}: args, {:?}", state.app_name, self); + + ready(Ok(())) + } +} diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index e8b72c5..5c90707 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -1,6 +1,6 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{ActixJob as Job, Manager, MaxRetries, WorkerConfig}; +use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; use chrono::{Duration, Utc}; use std::future::{ready, Ready}; @@ -36,20 +36,17 @@ async fn main() -> Result<(), Error> { let storage = Storage::new(db)?; // Configure and start our workers - let worker_config = WorkerConfig::new(move || MyState::new("My App")) + let manager = WorkerConfig::new(move || MyState::new("My App")) .register::() .register::() - .set_worker_count(DEFAULT_QUEUE, 16); - - // Start the application server. This guards access to to the jobs store - let manager = Manager::new(storage, worker_config); + .set_worker_count(DEFAULT_QUEUE, 16) + .managed(storage); // Queue our jobs - manager.queue_handle().queue(MyJob::new(1, 2)).await?; - manager.queue_handle().queue(MyJob::new(3, 4)).await?; - manager.queue_handle().queue(MyJob::new(5, 6)).await?; + manager.queue(MyJob::new(1, 2)).await?; + manager.queue(MyJob::new(3, 4)).await?; + manager.queue(MyJob::new(5, 6)).await?; manager - .queue_handle() .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) .await?; @@ -57,17 +54,16 @@ async fn main() -> Result<(), Error> { actix_rt::signal::ctrl_c().await?; // kill the current arbiter - manager.queue_handle().queue(StopJob).await?; + manager.queue(StopJob).await?; // Block on Actix actix_rt::signal::ctrl_c().await?; // See that the workers have respawned - manager.queue_handle().queue(MyJob::new(1, 2)).await?; - manager.queue_handle().queue(MyJob::new(3, 4)).await?; - manager.queue_handle().queue(MyJob::new(5, 6)).await?; + manager.queue(MyJob::new(1, 2)).await?; + manager.queue(MyJob::new(3, 4)).await?; + manager.queue(MyJob::new(5, 6)).await?; manager - .queue_handle() .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) .await?; diff --git a/examples/panic-example/.gitignore b/examples/panic-example/.gitignore new file mode 100644 index 0000000..23f168a --- /dev/null +++ b/examples/panic-example/.gitignore @@ -0,0 +1 @@ +/my-sled-db diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml new file mode 100644 index 0000000..8bb4c43 --- /dev/null +++ b/examples/panic-example/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "panic-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-rt = "2.0.0" +anyhow = "1.0" +async-trait = "0.1.24" +background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +chrono = "0.4" +tracing = "0.1" +tracing-subscriber = { version = "0.2", features = ["fmt"] } +serde = { version = "1.0", features = ["derive"] } +sled = "0.34" diff --git a/examples/actix-example/src/main.rs b/examples/panic-example/src/main.rs similarity index 89% rename from examples/actix-example/src/main.rs rename to examples/panic-example/src/main.rs index 2932afc..9c62572 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/panic-example/src/main.rs @@ -1,6 +1,6 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{create_server_in_arbiter, ActixJob as Job, MaxRetries, WorkerConfig}; +use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; use chrono::{Duration, Utc}; use std::future::{ready, Ready}; @@ -37,15 +37,12 @@ async fn main() -> Result<(), Error> { let arbiter = Arbiter::new(); - // Start the application server. This guards access to to the jobs store - let queue_handle = create_server_in_arbiter(arbiter.handle(), storage); - // Configure and start our workers - WorkerConfig::new(move || MyState::new("My App")) + let queue_handle = WorkerConfig::new(move || MyState::new("My App")) .register::() .register::() .set_worker_count(DEFAULT_QUEUE, 16) - .start_in_arbiter(&arbiter.handle(), queue_handle.clone()); + .start_in_arbiter(&arbiter.handle(), storage); // Queue some panicking job for _ in 0..32 { diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 8c22055..e44716b 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -36,14 +36,11 @@ //! use background_jobs::memory_storage::Storage; //! let storage = Storage::new(); //! -//! // Start the application server. This guards access to to the jobs store -//! let queue_handle = create_server(storage); -//! //! // Configure and start our workers -//! WorkerConfig::new(move || MyState::new("My App")) +//! let queue_handle = WorkerConfig::new(move || MyState::new("My App")) //! .register::() //! .set_worker_count(DEFAULT_QUEUE, 16) -//! .start(queue_handle.clone()); +//! .start(storage); //! //! // Queue our jobs //! queue_handle.queue(MyJob::new(1, 2))?; @@ -151,7 +148,7 @@ impl Manager { /// /// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it /// spins up another one and spawns the workers again - pub fn new(storage: S, worker_config: WorkerConfig) -> Self + fn new(storage: S, worker_config: WorkerConfig) -> Self where S: Storage + Sync + 'static, State: Clone, @@ -188,7 +185,8 @@ impl Manager { drop(drop_notifier); // Assume arbiter is dead if we were notified - if arbiter.spawn(async {}) { + let online = arbiter.spawn(async {}); + if online { panic!("Arbiter should be dead by now"); } @@ -214,6 +212,14 @@ impl Manager { } } +impl Deref for Manager { + type Target = QueueHandle; + + fn deref(&self) -> &Self::Target { + &self.queue_handle + } +} + impl Deref for ArbiterDropper { type Target = Arbiter; @@ -255,23 +261,7 @@ impl Drop for DropNotifier { /// In previous versions of this library, the server itself was run on it's own dedicated threads /// and guarded access to jobs via messages. Since we now have futures-aware synchronization /// primitives, the Server has become an object that gets shared between client threads. -/// -/// This method will panic if not called from an actix runtime -pub fn create_server(storage: S) -> QueueHandle -where - S: Storage + Sync + 'static, -{ - create_server_in_arbiter(Arbiter::current(), storage) -} - -/// Create a new Server -/// -/// In previous versions of this library, the server itself was run on it's own dedicated threads -/// and guarded access to jobs via messages. Since we now have futures-aware synchronization -/// primitives, the Server has become an object that gets shared between client threads. -/// -/// This method will panic if not called from an actix runtime -pub fn create_server_in_arbiter(arbiter: ArbiterHandle, storage: S) -> QueueHandle +fn create_server_in_arbiter(arbiter: ArbiterHandle, storage: S) -> QueueHandle where S: Storage + Sync + 'static, { @@ -285,7 +275,7 @@ where /// In previous versions of this library, the server itself was run on it's own dedicated threads /// and guarded access to jobs via messages. Since we now have futures-aware synchronization /// primitives, the Server has become an object that gets shared between client threads. -pub fn create_server_managed(storage: S) -> QueueHandle +fn create_server_managed(storage: S) -> QueueHandle where S: Storage + Sync + 'static, { @@ -352,17 +342,28 @@ where /// Start the workers in the current arbiter /// /// This method will panic if not called from an actix runtime - pub fn start(self, queue_handle: QueueHandle) { - self.start_in_arbiter(&Arbiter::current(), queue_handle) + pub fn start(self, storage: S) -> QueueHandle { + self.start_in_arbiter(&Arbiter::current(), storage) } /// Start the workers in the provided arbiter - pub fn start_in_arbiter(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) { - self.start_managed(arbiter, queue_handle, &()) + pub fn start_in_arbiter( + self, + arbiter: &ArbiterHandle, + storage: S, + ) -> QueueHandle { + let queue_handle = create_server_in_arbiter(arbiter.clone(), storage); + self.start_managed(arbiter, queue_handle.clone(), &()); + queue_handle + } + + /// Start the workers on a managed arbiter, and return the manager struct + pub fn managed(self, storage: S) -> Manager { + Manager::new(storage, self) } /// Start a workers in a managed way - pub fn start_managed( + fn start_managed( &self, arbiter: &ArbiterHandle, queue_handle: QueueHandle, diff --git a/src/lib.rs b/src/lib.rs index 525c182..e2c10a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,14 +132,11 @@ //! // Set up our Storage //! let storage = Storage::new(); //! -//! // Start the application server. This guards access to to the jobs store -//! let queue_handle = ServerConfig::new(storage).start(); -//! //! // Configure and start our workers -//! WorkerConfig::new(move || MyState::new("My App")) +//! let queue_handle = WorkerConfig::new(move || MyState::new("My App")) //! .register::() //! .set_processor_count(DEFAULT_QUEUE, 16) -//! .start(queue_handle.clone()); +//! .start(storage); //! //! // Queue our jobs //! queue_handle.queue(MyJob::new(1, 2))?; @@ -172,6 +169,4 @@ pub mod dev { } #[cfg(feature = "background-jobs-actix")] -pub use background_jobs_actix::{ - create_server, create_server_in_arbiter, ActixJob, Manager, QueueHandle, WorkerConfig, -}; +pub use background_jobs_actix::{ActixJob, Manager, QueueHandle, WorkerConfig};