Wait all workers to stop gracefully

Rafael Caricio 2023-03-12 00:18:15 +01:00
parent eed21d265d
commit 2964dc2b88
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
3 changed files with 27 additions and 4 deletions


@@ -5,6 +5,7 @@ authors = [
     "Rafael Caricio <rafael@caricio.com>",
 ]
 description = "Async persistent background task processing for Rust applications with Tokio and PostgreSQL."
+keywords = ["async", "background", "task", "jobs", "queue", "diesel", "postgres", "postgresql", "tokio"]
 repository = "https://code.caric.io/rafaelcaricio/backie"
 edition = "2021"
 license = "MIT"


@@ -44,6 +44,7 @@ If you are not already using, you will also want to include the following dependencies:
 ```toml
 [dependencies]
 async-trait = "0.1"
+anyhow = "1"
 serde = { version = "1.0", features = ["derive"] }
 diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] }
 diesel-async = { version = "0.2", features = ["postgres", "bb8"] }
@@ -130,13 +131,19 @@ We also defined in the `start` method that the worker pool should run forever.

 ### Queueing tasks

-After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks:
+After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible
+to directly create a [`Queue`] instance from with a [`TaskStore`] instance.

 ```rust
+let queue = Queue::new(task_store);
 let task = MyTask { info: "Hello world!".to_string() };
 queue.enqueue(task).await.unwrap();
 ```

+This will enqueue the task and whenever a worker is available it will start processing it. Workers don't need to be
+started before enqueuing tasks. Workers don't need to be in the same process as the queue as long as the workers have
+access to the same underlying storage system.
+
 ## Contributing

 1. [Fork it!](https://github.com/rafaelcaricio/backie/fork)
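
The last paragraph added in this README hunk (workers and the queue only need to share the same storage, not the same process) can be made concrete with a short sketch. It reuses the `MyTask`, `task_store`, and `Queue` names from the README examples above and, like them, is a fragment rather than a complete program; the producer side never starts a worker pool:

```rust
// Producer-only process: no worker pool is started here.
// `task_store` points at the same underlying storage (e.g. the same PostgreSQL
// database) that the worker processes use.
let queue = Queue::new(task_store);

// Any worker process sharing that storage will eventually pick this task up,
// even if the workers are only started after the task has been enqueued.
queue.enqueue(MyTask { info: "Hello from another process!".to_string() }).await.unwrap();
```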


@@ -5,6 +5,7 @@ use crate::store::TaskStore;
 use crate::worker::{runnable, ExecuteTaskFn};
 use crate::worker::{StateFn, Worker};
 use crate::RetentionMode;
+use futures::future::join_all;
 use std::collections::BTreeMap;
 use std::future::Future;
 use std::sync::Arc;
@@ -106,6 +107,8 @@ where
         let (tx, rx) = tokio::sync::watch::channel(());

+        let mut worker_handles = Vec::new();
+
         // Spawn all individual workers per queue
         for (queue_name, (retention_mode, num_workers)) in self.worker_queues.iter() {
             for idx in 0..*num_workers {
@@ -118,13 +121,14 @@ where
                    Some(rx.clone()),
                );
                let worker_name = format!("worker-{queue_name}-{idx}");
-                // TODO: grab the join handle for every worker for graceful shutdown
-                tokio::spawn(async move {
+                // grabs the join handle for every worker for graceful shutdown
+                let join_handle = tokio::spawn(async move {
                    match worker.run_tasks().await {
                        Ok(()) => log::info!("Worker {worker_name} stopped successfully"),
                        Err(err) => log::error!("Worker {worker_name} stopped due to error: {err}"),
                    }
                });
+                worker_handles.push(join_handle);
            }
        }
@@ -134,7 +138,18 @@ where
                if let Err(err) = tx.send(()) {
                    log::warn!("Failed to send shutdown signal to worker pool: {}", err);
                } else {
-                    log::info!("Worker pool stopped gracefully");
+                    // Wait for all workers to finish processing
+                    let results = join_all(worker_handles)
+                        .await
+                        .into_iter()
+                        .filter(Result::is_err)
+                        .map(Result::unwrap_err)
+                        .collect::<Vec<_>>();
+                    if !results.is_empty() {
+                        log::error!("Worker pool stopped with errors: {:?}", results);
+                    } else {
+                        log::info!("Worker pool stopped gracefully");
+                    }
                }
            }),
            self.queue,
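
Taken together, the changes in this file implement a common Tokio shutdown pattern: keep the `JoinHandle` of every spawned worker, broadcast shutdown over a `tokio::sync::watch` channel, then await all handles with `futures::future::join_all` and report any join errors. The following standalone sketch shows the same pattern outside of backie (assuming the `tokio` and `futures` crates); the worker loop and the names `run_worker` and the sleep-based polling are illustrative stand-ins, not backie's actual `Worker` implementation:

```rust
use std::time::Duration;

use futures::future::join_all;
use tokio::sync::watch;

// Illustrative stand-in for a worker loop: poll for work until shutdown is signalled.
async fn run_worker(name: String, mut shutdown: watch::Receiver<()>) {
    loop {
        tokio::select! {
            // A real worker would fetch and execute tasks here.
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            // The shutdown signal arrived (or the sender was dropped): stop gracefully.
            _ = shutdown.changed() => {
                println!("{name} stopped successfully");
                return;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(());

    // Spawn the workers and keep every JoinHandle so we can wait for them later.
    let mut worker_handles = Vec::new();
    for idx in 0..4 {
        let rx = rx.clone();
        worker_handles.push(tokio::spawn(run_worker(format!("worker-{idx}"), rx)));
    }

    // ... the pool runs until a shutdown is requested ...
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Signal shutdown, then wait for every worker to finish, collecting any
    // join errors (panics or cancellations) instead of ignoring them.
    if tx.send(()).is_ok() {
        let errors: Vec<_> = join_all(worker_handles)
            .await
            .into_iter()
            .filter_map(Result::err)
            .collect();
        if errors.is_empty() {
            println!("Worker pool stopped gracefully");
        } else {
            eprintln!("Worker pool stopped with errors: {errors:?}");
        }
    }
}
```

The commit's version differs only in that the workers are backie `Worker`s spawned per queue and the outcome is reported through `log::info!`/`log::error!` rather than printed.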