Allow customization of the pulling interval per queue

This commit is contained in:
Rafael Caricio 2023-03-12 17:15:40 +01:00
parent 82e6ef6dac
commit 10e01390b8
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
9 changed files with 217 additions and 180 deletions

View file

@ -12,9 +12,6 @@ license = "MIT"
readme = "README.md"
rust-version = "1.67"
[lib]
doctest = false
[dependencies]
chrono = "0.4"
log = "0.4"

View file

@ -17,13 +17,14 @@ Backie started as a fork of
Here are some of the Backie's key features:
- Async workers: Workers are started as [Tokio](https://tokio.rs/) tasks
- Application context: Tasks can access an shared user-provided application context
- Single-purpose workers: Tasks are stored together but workers are configured to execute only tasks of a specific queue
- Retries: Tasks are retried with a custom backoff mode
- Graceful shutdown: provide a future to gracefully shutdown the workers, on-the-fly tasks are not interrupted
- Recovery of unfinished tasks: Tasks that were not finished are retried on the next worker start
- Unique tasks: Tasks are not duplicated in the queue if they provide a unique hash
- **Guaranteed execution**: at least one execution of a task
- **Async workers**: Workers are started as [Tokio](https://tokio.rs/) tasks
- **Application context**: Tasks can access an shared user-provided application context
- **Single-purpose workers**: Tasks are stored together but workers are configured to execute only tasks of a specific queue
- **Retries**: Tasks are retried with a custom backoff mode
- **Graceful shutdown**: provide a future to gracefully shutdown the workers, on-the-fly tasks are not interrupted
- **Recovery of unfinished tasks**: Tasks that were not finished are retried on the next worker start
- **Unique tasks**: Tasks are not duplicated in the queue if they provide a unique hash
## Other planned features
@ -99,50 +100,19 @@ First, we need to create a [`TaskStore`] trait instance. This is the object resp
tasks from a database. Backie currently only supports Postgres as a storage backend via the provided
[`PgTaskStore`]. You can implement other storage backends by implementing the [`TaskStore`] trait.
```rust
let connection_url = "postgres://postgres:password@localhost/backie";
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url);
let pool = Pool::builder()
.max_size(3)
.build(manager)
.await
.unwrap();
let task_store = PgTaskStore::new(pool);
```
Then, we can use the `task_store` to start a worker pool using the [`WorkerPool`]. The [`WorkerPool`] is responsible
for starting the workers and managing their lifecycle.
```rust
// Register the task types I want to use and start the worker pool
let (_, queue) = WorkerPool::new(task_store, |_|())
.register_task_type::<MyTask>()
.configure_queue("default", 1, RetentionMode::default())
.start(futures::future::pending::<()>())
.await
.unwrap();
```
With that, we are defining that we want to execute instances of `MyTask` and that the `default` queue should
have 1 worker running using the default [`RetentionMode`] (remove from the database only successfully finished tasks).
We also defined in the `start` method that the worker pool should run forever.
A full example of starting a worker pool can be found in the [examples directory](https://github.com/rafaelcaricio/backie/blob/main/examples/simple_worker/src/main.rs).
### Queueing tasks
After stating the workers we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible
After stating the workers, we get an instance of [`Queue`] which we can use to enqueue tasks. It is also possible
to directly create a [`Queue`] instance from with a [`TaskStore`] instance.
```rust
let queue = Queue::new(task_store);
let task = MyTask { info: "Hello world!".to_string() };
queue.enqueue(task).await.unwrap();
```
This will enqueue the task and whenever a worker is available it will start processing it. Workers don't need to be
This will enqueue the task and whenever a worker is available it will start processing. Workers don't need to be
started before enqueuing tasks. Workers don't need to be in the same process as the queue as long as the workers have
access to the same underlying storage system.
access to the same underlying storage system. This enables horizontal scaling of the workers.
## Contributing

View file

@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
backie = { path = "../../" }
anyhow = "1"
env_logger = "0.9.0"
env_logger = "0.10"
log = "0.4.0"
tokio = { version = "1", features = ["full"] }
diesel-async = { version = "0.2", features = ["postgres", "bb8"] }

View file

@ -1,97 +0,0 @@
use async_trait::async_trait;
use backie::{BackgroundTask, CurrentTask};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct MyApplicationContext {
app_name: String,
}
impl MyApplicationContext {
pub fn new(app_name: &str) -> Self {
Self {
app_name: app_name.to_string(),
}
}
}
#[derive(Serialize, Deserialize)]
pub struct MyTask {
pub number: u16,
}
impl MyTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[derive(Serialize, Deserialize)]
pub struct MyFailingTask {
pub number: u16,
}
impl MyFailingTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[async_trait]
impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task";
type AppData = MyApplicationContext;
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> {
// let new_task = MyTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
log::info!(
"[{}] Hello from {}! the current number is {}",
task.id(),
ctx.app_name,
self.number
);
tokio::time::sleep(Duration::from_secs(3)).await;
log::info!("[{}] done..", task.id());
Ok(())
}
}
#[async_trait]
impl BackgroundTask for MyFailingTask {
const TASK_NAME: &'static str = "my_failing_task";
type AppData = MyApplicationContext;
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> {
// let new_task = MyFailingTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
// task.id();
// task.keep_alive().await?;
// task.previous_error();
// task.retry_count();
log::info!("[{}] the current number is {}", task.id(), self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
log::info!("[{}] done..", task.id());
//
// let b = true;
//
// if b {
// panic!("Hello!");
// } else {
// Ok(())
// }
Ok(())
}
}

View file

@ -1,9 +1,96 @@
use backie::{PgTaskStore, RetentionMode, WorkerPool};
use async_trait::async_trait;
use backie::{BackgroundTask, CurrentTask};
use backie::{PgTaskStore, Queue, WorkerPool};
use diesel_async::pg::AsyncPgConnection;
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use simple_worker::MyApplicationContext;
use simple_worker::MyFailingTask;
use simple_worker::MyTask;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct MyApplicationContext {
app_name: String,
}
impl MyApplicationContext {
pub fn new(app_name: &str) -> Self {
Self {
app_name: app_name.to_string(),
}
}
}
#[derive(Serialize, Deserialize)]
pub struct MyTask {
pub number: u16,
}
impl MyTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[async_trait]
impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task";
type AppData = MyApplicationContext;
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> {
// let new_task = MyTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
log::info!(
"[{}] Hello from {}! the current number is {}",
task.id(),
ctx.app_name,
self.number
);
tokio::time::sleep(Duration::from_secs(3)).await;
log::info!("[{}] done..", task.id());
Ok(())
}
}
#[derive(Serialize, Deserialize)]
pub struct MyFailingTask {
pub number: u16,
}
impl MyFailingTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[async_trait]
impl BackgroundTask for MyFailingTask {
const TASK_NAME: &'static str = "my_failing_task";
type AppData = MyApplicationContext;
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> {
// let new_task = MyFailingTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
// task.id();
// task.keep_alive().await?;
// task.previous_error();
// task.retry_count();
log::info!("[{}] the current number is {}", task.id(), self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
log::info!("[{}] done..", task.id());
Ok(())
}
}
#[tokio::main]
async fn main() {
@ -30,10 +117,11 @@ async fn main() {
let my_app_context = MyApplicationContext::new("Backie Example App");
// Register the task types I want to use and start the worker pool
let (join_handle, queue) = WorkerPool::new(task_store, move |_| my_app_context.clone())
let (join_handle, _queue) =
WorkerPool::new(task_store.clone(), move |_| my_app_context.clone())
.register_task_type::<MyTask>()
.register_task_type::<MyFailingTask>()
.configure_queue("default", 3, RetentionMode::RemoveDone)
.configure_queue("default".into())
.start(async move {
let _ = rx.changed().await;
})
@ -46,6 +134,7 @@ async fn main() {
let task2 = MyTask::new(20_000);
let task3 = MyFailingTask::new(50_000);
let queue = Queue::new(Arc::new(task_store)); // or use the `queue` instance returned by the worker pool
queue.enqueue(task1).await.unwrap();
queue.enqueue(task2).await.unwrap();
queue.enqueue(task3).await.unwrap();

View file

@ -5,7 +5,7 @@
/// All possible options for retaining tasks in the db after their execution.
///
/// The default mode is [`RetentionMode::RemoveAll`]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub enum RetentionMode {
/// Keep all tasks
KeepAll,
@ -28,7 +28,7 @@ pub use runnable::BackgroundTask;
pub use store::{PgTaskStore, TaskStore};
pub use task::{CurrentTask, Task, TaskId, TaskState};
pub use worker::Worker;
pub use worker_pool::WorkerPool;
pub use worker_pool::{QueueConfig, WorkerPool};
pub mod errors;
mod queries;

View file

@ -17,7 +17,7 @@ use serde::{de::DeserializeOwned, ser::Serialize};
///
///
/// # Example
/// ```rust
/// ```
/// use async_trait::async_trait;
/// use backie::{BackgroundTask, CurrentTask};
/// use serde::{Deserialize, Serialize};
@ -25,6 +25,7 @@ use serde::{de::DeserializeOwned, ser::Serialize};
/// #[derive(Serialize, Deserialize)]
/// pub struct MyTask {}
///
/// #[async_trait]
/// impl BackgroundTask for MyTask {
/// const TASK_NAME: &'static str = "my_task_unique_name";
/// type AppData = ();

View file

@ -9,6 +9,7 @@ use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
pub type ExecuteTaskFn<AppData> = Arc<
@ -59,6 +60,8 @@ where
retention_mode: RetentionMode,
pull_interval: Duration,
task_registry: BTreeMap<String, ExecuteTaskFn<AppData>>,
app_data_fn: StateFn<AppData>,
@ -76,6 +79,7 @@ where
store: Arc<S>,
queue_name: String,
retention_mode: RetentionMode,
pull_interval: Duration,
task_registry: BTreeMap<String, ExecuteTaskFn<AppData>>,
app_data_fn: StateFn<AppData>,
shutdown: Option<tokio::sync::watch::Receiver<()>>,
@ -84,6 +88,7 @@ where
store,
queue_name,
retention_mode,
pull_interval,
task_registry,
app_data_fn,
shutdown,
@ -120,11 +125,11 @@ where
log::info!("Shutting down worker");
return Ok(());
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)).fuse() => {}
_ = tokio::time::sleep(self.pull_interval).fuse() => {}
}
}
None => {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(self.pull_interval).await;
}
};
}

View file

@ -9,6 +9,7 @@ use futures::future::join_all;
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
#[derive(Clone)]
@ -37,7 +38,7 @@ where
queue_tasks: BTreeMap<String, Vec<String>>,
/// Number of workers that will be spawned per queue.
worker_queues: BTreeMap<String, (RetentionMode, u32)>,
worker_queues: BTreeMap<String, QueueConfig>,
}
impl<AppData, S> WorkerPool<AppData, S>
@ -80,14 +81,8 @@ where
self
}
pub fn configure_queue(
mut self,
queue_name: impl ToString,
num_workers: u32,
retention_mode: RetentionMode,
) -> Self {
self.worker_queues
.insert(queue_name.to_string(), (retention_mode, num_workers));
pub fn configure_queue(mut self, config: QueueConfig) -> Self {
self.worker_queues.insert(config.name.clone(), config);
self
}
@ -110,12 +105,13 @@ where
let mut worker_handles = Vec::new();
// Spawn all individual workers per queue
for (queue_name, (retention_mode, num_workers)) in self.worker_queues.iter() {
for idx in 0..*num_workers {
for (queue_name, queue_config) in self.worker_queues.iter() {
for idx in 0..queue_config.num_workers {
let mut worker: Worker<AppData, S> = Worker::new(
self.task_store.clone(),
queue_name.clone(),
*retention_mode,
queue_config.retention_mode,
queue_config.pull_interval,
self.task_registry.clone(),
self.application_data_fn.clone(),
Some(rx.clone()),
@ -157,6 +153,80 @@ where
}
}
/// Configuration for a queue.
///
/// This is used to configure the number of workers, the retention mode, and the pulling interval
/// for a queue.
///
/// # Examples
///
/// Example of configuring a queue with all options:
/// ```
/// # use backie::QueueConfig;
/// # use backie::RetentionMode;
/// # use std::time::Duration;
/// let config = QueueConfig::new("default")
/// .num_workers(5)
/// .retention_mode(RetentionMode::KeepAll)
/// .pull_interval(Duration::from_secs(1));
/// ```
/// Example of queue configuration with default options:
/// ```
/// # use backie::QueueConfig;
/// let config = QueueConfig::new("default");
/// // Also possible to use the `From` trait:
/// let config: QueueConfig = "default".into();
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct QueueConfig {
name: String,
num_workers: u32,
retention_mode: RetentionMode,
pull_interval: Duration,
}
impl QueueConfig {
/// Create a new queue configuration.
pub fn new(name: impl ToString) -> Self {
Self {
name: name.to_string(),
num_workers: 1,
retention_mode: RetentionMode::default(),
pull_interval: Duration::from_secs(1),
}
}
/// Set the number of workers for this queue.
pub fn num_workers(mut self, num_workers: u32) -> Self {
self.num_workers = num_workers;
self
}
/// Set the retention mode for this queue.
pub fn retention_mode(mut self, retention_mode: RetentionMode) -> Self {
self.retention_mode = retention_mode;
self
}
/// Set the pull interval for this queue.
///
/// This is the interval at which the queue will be checking for new tasks by calling
/// the backend storage.
pub fn pull_interval(mut self, pull_interval: Duration) -> Self {
self.pull_interval = pull_interval;
self
}
}
impl<S> From<S> for QueueConfig
where
S: ToString,
{
fn from(name: S) -> Self {
Self::new(name.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -263,7 +333,7 @@ mod tests {
let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(GreetingTask::QUEUE, 1, RetentionMode::RemoveDone)
.configure_queue(GreetingTask::QUEUE.into())
.start(futures::future::ready(()))
.await
.unwrap();
@ -286,8 +356,8 @@ mod tests {
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.register_task_type::<OtherTask>()
.configure_queue("default", 1, RetentionMode::default())
.configure_queue("other_queue", 1, RetentionMode::default())
.configure_queue("default".into())
.configure_queue("other_queue".into())
.start(futures::future::ready(()))
.await
.unwrap();
@ -348,7 +418,7 @@ mod tests {
let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<NotifyFinished>()
.configure_queue("default", 1, RetentionMode::default())
.configure_queue("default".into())
.start(async move {
rx.await.unwrap();
println!("Worker pool got notified to stop");
@ -437,7 +507,7 @@ mod tests {
move |_| my_app_context.clone()
})
.register_task_type::<NotifyStopDuringRun>()
.configure_queue("default", 1, RetentionMode::default())
.configure_queue("default".into())
.start(async move {
rx.await.unwrap();
println!("Worker pool got notified to stop");
@ -530,7 +600,7 @@ mod tests {
move |_| player_context.clone()
})
.register_task_type::<KeepAliveTask>()
.configure_queue("default", 1, RetentionMode::default())
.configure_queue("default".into())
.start(async move {
should_stop.await.unwrap();
println!("Worker pool got notified to stop");
@ -575,7 +645,9 @@ mod tests {
let (join_handle, _queue) =
WorkerPool::new(pg_task_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(GreetingTask::QUEUE, 1, RetentionMode::RemoveDone)
.configure_queue(
QueueConfig::new(GreetingTask::QUEUE).retention_mode(RetentionMode::RemoveDone),
)
.start(futures::future::ready(()))
.await
.unwrap();