diff --git a/Cargo.toml b/Cargo.toml index a817322..c5a41d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ authors = [ "Rafael Caricio ", ] description = "Async persistent background task processing for Rust applications with Tokio and PostgreSQL." +keywords = ["async", "background", "task", "jobs", "queue", "diesel", "postgres", "postgresql", "tokio"] repository = "https://code.caric.io/rafaelcaricio/backie" edition = "2021" license = "MIT" diff --git a/README.md b/README.md index 90a171d..da7bfc6 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ If you are not already using, you will also want to include the following depend ```toml [dependencies] async-trait = "0.1" +anyhow = "1" serde = { version = "1.0", features = ["derive"] } diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] } diesel-async = { version = "0.2", features = ["postgres", "bb8"] } @@ -130,13 +131,19 @@ We also defined in the `start` method that the worker pool should run forever. ### Queueing tasks -After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks: +After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible +to directly create a [`Queue`] instance from with a [`TaskStore`] instance. ```rust +let queue = Queue::new(task_store); let task = MyTask { info: "Hello world!".to_string() }; queue.enqueue(task).await.unwrap(); ``` +This will enqueue the task and whenever a worker is available it will start processing it. Workers don't need to be +started before enqueuing tasks. Workers don't need to be in the same process as the queue as long as the workers have +access to the same underlying storage system. + ## Contributing 1. [Fork it!](https://github.com/rafaelcaricio/backie/fork) diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 57936da..05edf36 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -5,6 +5,7 @@ use crate::store::TaskStore; use crate::worker::{runnable, ExecuteTaskFn}; use crate::worker::{StateFn, Worker}; use crate::RetentionMode; +use futures::future::join_all; use std::collections::BTreeMap; use std::future::Future; use std::sync::Arc; @@ -106,6 +107,8 @@ where let (tx, rx) = tokio::sync::watch::channel(()); + let mut worker_handles = Vec::new(); + // Spawn all individual workers per queue for (queue_name, (retention_mode, num_workers)) in self.worker_queues.iter() { for idx in 0..*num_workers { @@ -118,13 +121,14 @@ where Some(rx.clone()), ); let worker_name = format!("worker-{queue_name}-{idx}"); - // TODO: grab the join handle for every worker for graceful shutdown - tokio::spawn(async move { + // grabs the join handle for every worker for graceful shutdown + let join_handle = tokio::spawn(async move { match worker.run_tasks().await { Ok(()) => log::info!("Worker {worker_name} stopped successfully"), Err(err) => log::error!("Worker {worker_name} stopped due to error: {err}"), } }); + worker_handles.push(join_handle); } } @@ -134,7 +138,18 @@ where if let Err(err) = tx.send(()) { log::warn!("Failed to send shutdown signal to worker pool: {}", err); } else { - log::info!("Worker pool stopped gracefully"); + // Wait for all workers to finish processing + let results = join_all(worker_handles) + .await + .into_iter() + .filter(Result::is_err) + .map(Result::unwrap_err) + .collect::>(); + if !results.is_empty() { + log::error!("Worker pool stopped with errors: {:?}", results); + } else { + log::info!("Worker pool stopped gracefully"); + } } }), self.queue,