From 4895400d33089c6b55f45d399e0e88dbca89a6c3 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Sat, 11 Mar 2023 22:22:25 +0100 Subject: [PATCH] Update readme --- Cargo.toml | 2 +- LICENCE | 27 ++--- README.md | 262 +++++++++++++++++---------------------------- src/lib.rs | 18 ++-- src/queries.rs | 2 + src/queue.rs | 2 +- src/store.rs | 21 +++- src/worker.rs | 30 ++---- src/worker_pool.rs | 116 +++++++++++++++++++- 9 files changed, 260 insertions(+), 220 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 32b314f..a817322 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" authors = [ "Rafael Caricio ", ] -description = "Async background task processing library with Tokio and Postgres" +description = "Async persistent background task processing for Rust applications with Tokio and PostgreSQL." repository = "https://code.caric.io/rafaelcaricio/backie" edition = "2021" license = "MIT" diff --git a/LICENCE b/LICENCE index 9fea819..9156721 100644 --- a/LICENCE +++ b/LICENCE @@ -1,21 +1,16 @@ MIT License -Copyright (c) 2022 Ayrat Badykov +Copyright (c) 2023 Rafael Caricio -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +documentation files (the “Software”), to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above 
copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index cba2f3f..02eeba2 100644 --- a/README.md +++ b/README.md @@ -1,218 +1,154 @@ # Backie 🚲 -Async background job processing library with Diesel and Tokio. It's a heavily modified fork of [fang](https://github.com/ayrat555/fang). +Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks +to be processed by workers. It's designed to be easy to use and horizontally scalable. It uses Postgres as +a storage backend and can also be extended to support other types of storage. 
-## Key Features +High-level overview of how Backie works: +- Client puts tasks on a queue +- Server starts multiple workers per queue +- Worker pulls tasks off the queue and starts processing them +- Tasks are processed concurrently by multiple workers - Here are some of the fang's key features: +Backie started as a fork of the +[fang](https://github.com/ayrat555/fang) crate, but quickly diverged significantly in its implementation. - - Async workers: Workers are started as `tokio` tasks (async workers) - - Unique tasks: Tasks are not duplicated in the queue if they are unique - - Single-purpose workers: Tasks are stored in a single table but workers can be configured to execute only tasks of a specific type - - Retries: Tasks can be retried with a custom backoff mode +## Key features -## Differences from Fang crate +Here are some of Backie's key features: -- Supports only async processing -- Supports graceful shutdown -- The connection pool for the queue is provided by the user -- Tasks status is calculated based on the database state -- Tasks have a timeout and are retried if they are not completed in time +- Async workers: Workers are started as [Tokio](https://tokio.rs/) tasks +- Application context: Tasks can access a shared user-provided application context +- Single-purpose workers: Tasks are stored together but workers are configured to execute only tasks of a specific queue +- Retries: Tasks are retried with a custom backoff mode +- Graceful shutdown: provide a future to gracefully shut down the workers; in-flight tasks are not interrupted +- Recovery of unfinished tasks: Tasks that were not finished are retried on the next worker start +- Unique tasks: Tasks are not duplicated in the queue if they provide a unique hash + +## Other planned features + +- Task timeout: Tasks are retried if they are not completed in time +- Scheduling of tasks: Tasks can be scheduled to be executed at a specific time ## Installation -1. Add this to your Cargo.toml +1. 
Add this to your `Cargo.toml` ```toml [dependencies] -backie = "0.10" +backie = "0.1" ``` -*Supports rustc 1.67+* +If you are not already using them, you will also want to include the following dependencies for defining your tasks: + +```toml +[dependencies] +async-trait = "0.1" +serde = { version = "1.0", features = ["derive"] } +diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] } +diesel-async = { version = "0.2", features = ["postgres", "bb8"] } +``` + +Those dependencies are required to use the `#[async_trait]` and `#[derive(Serialize, Deserialize)]` attributes +in your task definitions and to connect to the Postgres database. + +*Supports rustc 1.68+* 2. Create the `backie_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/rafaelcaricio/backie/blob/master/migrations/2023-03-06-151907_create_backie_tasks/up.sql). ## Usage -Every task must implement the `backie::RunnableTask` trait, Backie uses the information provided by the trait to -execute the task. +The [`BackgroundTask`] trait is used to define a task. You must implement this trait for all +tasks you want to execute. -All implementations of `RunnableTask` must have unique names per project. +One important thing to note is the associated constant [`BackgroundTask::TASK_NAME`], which **must** be unique for +the whole application. This constant is critical for reconstructing the task back from the database. + +The [`BackgroundTask::AppData`] can be used to augment the task with your application-specific contextual information. +This is useful for example to pass a database connection pool to the task or other application configuration. + +The [`BackgroundTask::run`] method is where you define the behaviour of your background task execution. This method +will be called by the task queue workers. 
```rust -use backie::RunnableTask; -use backie::task::{TaskHash, TaskType}; -use backie::queue::AsyncQueueable; -use serde::{Deserialize, Serialize}; use async_trait::async_trait; +use backie::{BackgroundTask, CurrentTask}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] -#[serde(crate = "fang::serde")] -struct MyTask { - pub number: u16, +pub struct MyTask { + info: String, } -#[typetag::serde] #[async_trait] -impl RunnableTask for MyTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { - Ok(()) - } - - // this func is optional - // Default task_type is common - fn task_type(&self) -> TaskType { - "my-task-type".into() - } +impl BackgroundTask for MyTask { + const TASK_NAME: &'static str = "my_task_unique_name"; + type AppData = (); - // If `uniq` is set to true and the task is already in the storage, it won't be inserted again - // The existing record will be returned for for any insertions operaiton - fn uniq(&self) -> Option { - None - } - - // the maximum number of retries. Set it to 0 to make it not retriable - // the default value is 20 - fn max_retries(&self) -> i32 { - 20 - } - - // backoff mode for retries - fn backoff(&self, attempt: u32) -> u32 { - u32::pow(2, attempt) - } + async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error> { + // Do something + Ok(()) + } } ``` -### Enqueuing a task - -To enqueue a task use `AsyncQueueable::create_task`. - -For Postgres backend. 
-```rust -use backie::queue::PgAsyncQueue; - -// Create an AsyncQueue -let manager = AsyncDieselConnectionManager::::new("postgres://postgres:password@localhost/backie"); -let pool = Pool::builder() - .max_size(1) - .min_idle(Some(1)) - .build(manager) - .await - .unwrap(); - -let mut queue = PgAsyncQueue::new(pool); - -// Publish the first example -let task = MyTask { number: 8 }; -let task_returned = queue - .create_task(&task) - .await - .unwrap(); -``` - ### Starting workers -Every worker runs in a separate `tokio` task. In case of panic, they are always restarted. -Use `AsyncWorkerPool` to start workers. +First, we need to create an instance of a type that implements the [`TaskStore`] trait. This is the object responsible for storing and retrieving +tasks from a database. Backie currently only supports Postgres as a storage backend via the provided +[`PgTaskStore`]. You can implement other storage backends by implementing the [`TaskStore`] trait. ```rust -use backie::worker_pool::AsyncWorkerPool; +let connection_url = "postgres://postgres:password@localhost/backie"; -// Need to create a queue -// Also insert some tasks +let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url); +let pool = Pool::builder() + .max_size(3) + .build(manager) + .await + .unwrap(); -let mut pool: AsyncWorkerPool = AsyncWorkerPool::builder() - .number_of_workers(max_pool_size) - .queue(queue.clone()) - // if you want to run tasks of the specific kind - .task_type("my_task_type".into()) - .build(); - -pool.start().await; +let task_store = PgTaskStore::new(pool); ``` -Check out: - -- [Simple Worker Example](https://github.com/rafaelcaricio/backie/tree/master/examples/simple_worker) - simple worker example - -### Configuration - -Use the `AsyncWorkerPool` builder: - - ```rust - let mut pool: AsyncWorkerPool = AsyncWorkerPool::builder() - .number_of_workers(max_pool_size) - .queue(queue.clone()) - .build(); - ``` - -### Configuring the type of workers - -### Configuring retention mode - -By default, all 
successfully finished tasks are removed from the DB, failed tasks aren't. - -There are three retention modes you can use: +Then, we can use the `task_store` to start a worker pool using the [`WorkerPool`]. The [`WorkerPool`] is responsible +for starting the workers and managing their lifecycle. ```rust -pub enum RetentionMode { - KeepAll, // doesn't remove tasks - RemoveAll, // removes all tasks - RemoveFinished, // default value -} +// Register the task types I want to use and start the worker pool +let (_, queue) = WorkerPool::new(task_store, |_|()) + .register_task_type::<MyTask>() + .configure_queue("default", 1, RetentionMode::default()) + .start(futures::future::pending::<()>()) + .await + .unwrap(); ``` -Set retention mode with worker pools `TypeBuilder` in both modules. +With that, we are defining that we want to execute instances of `MyTask` and that the `default` queue should +have 1 worker running using the default [`RetentionMode`] (remove from the database only successfully finished tasks). +We also defined in the `start` method that the worker pool should run forever. + +### Queueing tasks + +After starting the workers, we get an instance of [`Queue`] which we can use to enqueue tasks: + +```rust +let task = MyTask { info: "Hello world!".to_string() }; +queue.enqueue(task).await.unwrap(); +``` ## Contributing -1. [Fork it!](https://github.com/ayrat555/fang/fork) +1. [Fork it!](https://github.com/rafaelcaricio/backie/fork) 2. Create your feature branch (`git checkout -b my-new-feature`) 3. Commit your changes (`git commit -am 'Add some feature'`) 4. Push to the branch (`git push origin my-new-feature`) 5. Create a new Pull Request -### Running tests locally -- Install diesel_cli. -``` -cargo install diesel_cli -``` -- Install docker on your machine. +## Thanks to the authors of related crates -- Run a Postgres docker container. (See in Makefile.) 
-``` -make db -``` +I would like to thank the authors of the [Fang](https://github.com/ayrat555/fang) and [background-jobs](https://git.asonix.dog/asonix/background-jobs.git) crates, which were the main inspiration for this project. -- Run the migrations -``` -make diesel -``` - -- Run tests -``` -make tests -``` - -- Run dirty//long tests, DB must be recreated afterwards. -``` -make ignored -``` - -- Kill the docker container -``` -make stop -``` - -## Thank Fang's authors - -I would like to thank the authors of the fang crate which was the inspiration for this project. - -- Ayrat Badykov (@ayrat555) -- Pepe Márquez (@pxp9) - -[ci]: https://crates.io/crates/backie -[docs]: https://docs.rs/backie/ -[ga-test]: https://github.com/rafaelcaricio/backie/actions/workflows/rust.yml/badge.svg -[ga-style]: https://github.com/rafaelcaricio/backie/actions/workflows/style.yml/badge.svg +- Ayrat Badykov ([@ayrat555](https://github.com/ayrat555)) +- Pepe Márquez ([@pxp9](https://github.com/pxp9)) +- Riley ([@asonix](https://github.com/asonix)) diff --git a/src/lib.rs b/src/lib.rs index 551ef29..cbdf900 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,21 +19,23 @@ pub enum RetentionMode { impl Default for RetentionMode { fn default() -> Self { - Self::RemoveAll + Self::RemoveDone } } pub use runnable::BackgroundTask; pub use store::{PgTaskStore, TaskStore}; -pub use task::CurrentTask; +pub use task::{CurrentTask, Task, TaskId, TaskState}; pub use worker_pool::WorkerPool; +pub use worker::Worker; +pub use queue::Queue; pub mod errors; mod queries; -pub mod queue; -pub mod runnable; +mod queue; +mod runnable; mod schema; -pub mod store; -pub mod task; -pub mod worker; -pub mod worker_pool; +mod store; +mod task; +mod worker; +mod worker_pool; diff --git a/src/queries.rs b/src/queries.rs index e3bd774..dda0885 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -65,8 +65,10 @@ impl Task { pub(crate) async fn fetch_next_pending( connection: &mut AsyncPgConnection, queue_name: &str, + 
task_names: &Vec, ) -> Option { backie_tasks::table + .filter(backie_tasks::task_name.eq_any(task_names)) .filter(backie_tasks::scheduled_at.lt(Utc::now())) // skip tasks scheduled for the future .order(backie_tasks::created_at.asc()) // get the oldest task first .filter(backie_tasks::running_at.is_null()) // that is not marked as running already diff --git a/src/queue.rs b/src/queue.rs index 4119091..94fa656 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -17,7 +17,7 @@ impl Queue where S: TaskStore, { - pub(crate) fn new(task_store: Arc) -> Self { + pub fn new(task_store: Arc) -> Self { Queue { task_store } } diff --git a/src/store.rs b/src/store.rs index de82f50..83bd9f1 100644 --- a/src/store.rs +++ b/src/store.rs @@ -19,7 +19,11 @@ impl PgTaskStore { #[async_trait::async_trait] impl TaskStore for PgTaskStore { - async fn pull_next_task(&self, queue_name: &str) -> Result, AsyncQueueError> { + async fn pull_next_task( + &self, + queue_name: &str, + task_names: &Vec, + ) -> Result, AsyncQueueError> { let mut connection = self .pool .get() @@ -28,7 +32,7 @@ impl TaskStore for PgTaskStore { connection .transaction::, AsyncQueueError, _>(|conn| { async move { - let Some(pending_task) = Task::fetch_next_pending(conn, queue_name).await else { + let Some(pending_task) = Task::fetch_next_pending(conn, queue_name, task_names).await else { return Ok(None); }; @@ -107,11 +111,16 @@ pub mod test_store { #[async_trait::async_trait] impl TaskStore for MemoryTaskStore { - async fn pull_next_task(&self, queue_name: &str) -> Result, AsyncQueueError> { + async fn pull_next_task( + &self, + queue_name: &str, + task_names: &Vec, + ) -> Result, AsyncQueueError> { let mut tasks = self.tasks.lock().await; let mut next_task = None; for (_, task) in tasks .iter_mut() + .filter(|(_, task)| task_names.contains(&task.task_name)) .sorted_by(|a, b| a.1.created_at.cmp(&b.1.created_at)) { if task.queue_name == queue_name && task.state() == TaskState::Ready { @@ -189,7 +198,11 @@ pub mod 
test_store { #[async_trait::async_trait] pub trait TaskStore: Clone + Send + Sync + 'static { - async fn pull_next_task(&self, queue_name: &str) -> Result, AsyncQueueError>; + async fn pull_next_task( + &self, + queue_name: &str, + task_names: &Vec, + ) -> Result, AsyncQueueError>; async fn create_task(&self, new_task: NewTask) -> Result; async fn set_task_state(&self, id: TaskId, state: TaskState) -> Result<(), AsyncQueueError>; async fn remove_task(&self, id: TaskId) -> Result; diff --git a/src/worker.rs b/src/worker.rs index 3fa2bf4..a523657 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -91,6 +91,7 @@ where } pub(crate) async fn run_tasks(&mut self) -> Result<(), BackieError> { + let registered_task_names = self.task_registry.keys().cloned().collect(); loop { // Check if has to stop before pulling next task if let Some(ref shutdown) = self.shutdown { @@ -99,7 +100,11 @@ where } }; - match self.store.pull_next_task(&self.queue_name).await? { + match self + .store + .pull_next_task(&self.queue_name, ®istered_task_names) + .await? + { Some(task) => { self.run(task).await?; } @@ -127,29 +132,6 @@ where } } - // #[cfg(test)] - // pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { - // loop { - // match self.store.pull_next_task(self.queue_name.clone()).await? 
{ - // Some(task) => { - // let actual_task: Box = - // serde_json::from_value(task.payload.clone()).unwrap(); - // - // // check if task is scheduled or not - // if let Some(CronPattern(_)) = actual_task.cron() { - // // program task - // // self.queue.schedule_task(&*actual_task).await?; - // } - // // run scheduled task - // self.run(task, actual_task).await?; - // } - // None => { - // return Ok(()); - // } - // }; - // } - // } - async fn run(&self, task: Task) -> Result<(), BackieError> { let task_info = CurrentTask::new(&task); let runnable_task_caller = self diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 000c98a..57936da 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -121,7 +121,7 @@ where // TODO: grab the join handle for every worker for graceful shutdown tokio::spawn(async move { match worker.run_tasks().await { - Ok(()) => log::info!("Worker {worker_name} stopped sucessfully"), + Ok(()) => log::info!("Worker {worker_name} stopped successfully"), Err(err) => log::error!("Worker {worker_name} stopped due to error: {err}"), } }); @@ -148,10 +148,10 @@ mod tests { use crate::store::test_store::MemoryTaskStore; use crate::store::PgTaskStore; use crate::task::CurrentTask; - use anyhow::Error; use async_trait::async_trait; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; + use tokio::sync::Mutex; #[derive(Clone, Debug)] struct ApplicationContext { @@ -207,7 +207,11 @@ mod tests { type AppData = ApplicationContext; - async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Error> { + async fn run( + &self, + task: CurrentTask, + context: Self::AppData, + ) -> Result<(), anyhow::Error> { println!( "[{}] Other task with {}!", task.id(), @@ -217,6 +221,36 @@ mod tests { } } + #[derive(Clone)] + struct NotifyFinishedContext { + tx: Arc>>>, + } + + #[derive(serde::Serialize, serde::Deserialize)] + struct NotifyFinished; + + #[async_trait] + impl 
BackgroundTask for NotifyFinished { + const TASK_NAME: &'static str = "notify_finished"; + + type AppData = NotifyFinishedContext; + + async fn run( + &self, + task: CurrentTask, + context: Self::AppData, + ) -> Result<(), anyhow::Error> { + match context.tx.lock().await.take() { + None => println!("Cannot notify, already done that!"), + Some(tx) => { + tx.send(()).unwrap(); + println!("[{}] Notify finished did it's job!", task.id()) + } + }; + Ok(()) + } + } + #[tokio::test] async fn validate_all_registered_tasks_queues_are_configured() { let my_app_context = ApplicationContext::new(); @@ -283,6 +317,82 @@ mod tests { join_handle.await.unwrap(); } + #[tokio::test] + async fn test_worker_pool_stop_after_task_execute() { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let my_app_context = NotifyFinishedContext { + tx: Arc::new(Mutex::new(Some(tx))), + }; + + let (join_handle, queue) = + WorkerPool::new(memory_store().await, move |_| my_app_context.clone()) + .register_task_type::() + .configure_queue("default", 1, RetentionMode::default()) + .start(async move { + rx.await.unwrap(); + println!("Worker pool got notified to stop"); + }) + .await + .unwrap(); + + // Notifies the worker pool to stop after the task is executed + queue.enqueue(NotifyFinished).await.unwrap(); + + // This makes sure the task can run multiple times and use the shared context + queue.enqueue(NotifyFinished).await.unwrap(); + + join_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_worker_pool_try_to_run_unknown_task() { + #[derive(Clone, serde::Serialize, serde::Deserialize)] + struct UnknownTask; + + #[async_trait] + impl BackgroundTask for UnknownTask { + const TASK_NAME: &'static str = "unknown_task"; + + type AppData = NotifyFinishedContext; + + async fn run( + &self, + task: CurrentTask, + _context: Self::AppData, + ) -> Result<(), anyhow::Error> { + println!("[{}] Unknown task ran!", task.id()); + Ok(()) + } + } + + let (tx, rx) = tokio::sync::oneshot::channel(); + + 
let my_app_context = NotifyFinishedContext { + tx: Arc::new(Mutex::new(Some(tx))), + }; + + let task_store = memory_store().await; + + let (join_handle, queue) = WorkerPool::new(task_store, move |_| my_app_context.clone()) + .register_task_type::() + .configure_queue("default", 1, RetentionMode::default()) + .start(async move { + rx.await.unwrap(); + println!("Worker pool got notified to stop"); + }) + .await + .unwrap(); + + // Enqueue a task that is not registered + queue.enqueue(UnknownTask).await.unwrap(); + + // Notifies the worker pool to stop for this test + queue.enqueue(NotifyFinished).await.unwrap(); + + join_handle.await.unwrap(); + } + async fn memory_store() -> MemoryTaskStore { MemoryTaskStore::default() }