diff --git a/README.md b/README.md index 82fc145..22d1100 100644 --- a/README.md +++ b/README.md @@ -1,308 +1,153 @@ -

fang

- [![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style] # Backie -Background task processing library for Rust. It uses Postgres DB as a task queue. +Async background job processing library with Diesel and Tokio. It's a heavily modified fork of [fang](https://github.com/ayrat555/fang). ## Key Features Here are some of the fang's key features: - - Async and threaded workers. - Workers can be started in threads (threaded workers) or `tokio` tasks (async workers) - - Scheduled tasks. - Tasks can be scheduled at any time in the future - - Periodic (CRON) tasks. - Tasks can be scheduled using cron expressions - - Unique tasks. - Tasks are not duplicated in the queue if they are unique - - Single-purpose workers. - Tasks are stored in a single table but workers can be configured to execute only tasks of a specific type - - Retries. - Tasks can be retried with a custom backoff mode + - Async workers: Workers are started as `tokio` tasks (async workers) + - Unique tasks: Tasks are not duplicated in the queue if they are unique + - Single-purpose workers: Tasks are stored in a single table but workers can be configured to execute only tasks of a specific type + - Retries: Tasks can be retried with a custom backoff mode -## Differences from original fang +## Differences from Fang crate - Supports only async processing - Supports graceful shutdown - The connection pool for the queue is provided by the user +- Tasks status is calculated based on the database state +- Tasks have a timeout and are retried if they are not completed in time ## Installation 1. Add this to your Cargo.toml - -#### the Blocking feature ```toml [dependencies] -fang = { version = "0.10" , features = ["blocking"], default-features = false } -``` - -#### the Asynk feature -```toml -[dependencies] -fang = { version = "0.10" , features = ["asynk"], default-features = false } -``` - -#### Both features -```toml -fang = { version = "0.10" } +backie = "0.10" ``` *Supports rustc 1.67+* -2. Create the `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2022-08-20-151615_create_fang_tasks/up.sql). +2. Create the `backie_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/rafaelcaricio/backie/blob/master/migrations/2023-03-06-151907_create_backie_tasks/up.sql). ## Usage -### Defining a task +Every task must implement the `backie::RunnableTask` trait, Backie uses the information provided by the trait to +execute the task. -#### Blocking feature -Every task should implement the `fang::Runnable` trait which is used by `fang` to execute it. +All implementations of `RunnableTask` must have unique names per project. ```rust -use fang::Error; -use fang::Runnable; -use fang::typetag; -use fang::PgConnection; -use fang::serde::{Deserialize, Serialize}; +use backie::RunnableTask; +use backie::task::{TaskHash, TaskType}; +use backie::queue::AsyncQueueable; +use serde::{Deserialize, Serialize}; +use async_trait::async_trait; #[derive(Serialize, Deserialize)] #[serde(crate = "fang::serde")] struct MyTask { - pub number: u16, -} - -#[typetag::serde] -impl Runnable for MyTask { - fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { - println!("the number is {}", self.number); - - Ok(()) - } - - // If `uniq` is set to true and the task is already in the storage, it won't be inserted again - // The existing record will be returned for for any insertions operaiton - fn uniq(&self) -> bool { - true - } - - // This will be useful if you want to filter tasks. - // the default value is `common` - fn task_type(&self) -> String { - "my_task".to_string() - } - - // This will be useful if you would like to schedule tasks. - // default value is None (the task is not scheduled, it's just executed as soon as it's inserted) - fn cron(&self) -> Option { - let expression = "0/20 * * * Aug-Sep * 2022/1"; - Some(Scheduled::CronPattern(expression.to_string())) - } - - // the maximum number of retries. Set it to 0 to make it not retriable - // the default value is 20 - fn max_retries(&self) -> i32 { - 20 - } - - // backoff mode for retries - fn backoff(&self, attempt: u32) -> u32 { - u32::pow(2, attempt) - } -} -``` - -As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task. - -The second parameter of the `run` function is a struct that implements `fang::Queueable`. You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it. - - -#### Asynk feature -Every task should implement `fang::AsyncRunnable` trait which is used by `fang` to execute it. - -Be careful not to call two implementations of the AsyncRunnable trait with the same name, because it will cause a failure in the `typetag` crate. -```rust -use fang::AsyncRunnable; -use fang::asynk::async_queue::AsyncQueueable; -use fang::serde::{Deserialize, Serialize}; -use fang::async_trait; - -#[derive(Serialize, Deserialize)] -#[serde(crate = "fang::serde")] -struct AsyncTask { - pub number: u16, + pub number: u16, } #[typetag::serde] #[async_trait] -impl AsyncRunnable for AsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { - Ok(()) - } - // this func is optional - // Default task_type is common - fn task_type(&self) -> String { - "my-task-type".to_string() - } +impl RunnableTask for MyTask { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + Ok(()) + } + + // this func is optional + // Default task_type is common + fn task_type(&self) -> TaskType { + "my-task-type".into() + } + // If `uniq` is set to true and the task is already in the storage, it won't be inserted again + // The existing record will be returned for for any insertions operaiton + fn uniq(&self) -> Option { + None + } - // If `uniq` is set to true and the task is already in the storage, it won't be inserted again - // The existing record will be returned for for any insertions operaiton - fn uniq(&self) -> bool { - true - } - - // This will be useful if you would like to schedule tasks. - // default value is None (the task is not scheduled, it's just executed as soon as it's inserted) - fn cron(&self) -> Option { - let expression = "0/20 * * * Aug-Sep * 2022/1"; - Some(Scheduled::CronPattern(expression.to_string())) - } - - // the maximum number of retries. Set it to 0 to make it not retriable - // the default value is 20 - fn max_retries(&self) -> i32 { + // the maximum number of retries. Set it to 0 to make it not retriable + // the default value is 20 + fn max_retries(&self) -> i32 { 20 - } + } - // backoff mode for retries - fn backoff(&self, attempt: u32) -> u32 { + // backoff mode for retries + fn backoff(&self, attempt: u32) -> u32 { u32::pow(2, attempt) - } + } } ``` -In both modules, tasks can be scheduled to be executed once. Use `Scheduled::ScheduleOnce` enum variant. - -Datetimes and cron patterns are interpreted in the UTC timezone. So you should introduce the offset to schedule in a different timezone. - -Example: - -If your timezone is UTC + 2 and you want to schedule at 11:00: - -```rust - let expression = "0 0 9 * * * *"; -``` - - ### Enqueuing a task -#### the Blocking feature -To enqueue a task use `Queue::enqueue_task` - -```rust -use fang::Queue; - -// create a r2d2 pool - -// create a fang queue - - let queue = Queue::builder().connection_pool(pool).build(); - - let task_inserted = queue.insert_task(&MyTask::new(1)).unwrap(); - -``` - -#### the Asynk feature -To enqueue a task use `AsyncQueueable::insert_task`. +To enqueue a task use `AsyncQueueable::create_task`. For Postgres backend. ```rust -use fang::asynk::async_queue::AsyncQueue; -use fang::NoTls; -use fang::AsyncRunnable; +use backie::queue::PgAsyncQueue; // Create an AsyncQueue -let max_pool_size: u32 = 2; +let manager = AsyncDieselConnectionManager::::new("postgres://postgres:password@localhost/backie"); +let pool = Pool::builder() + .max_size(1) + .min_idle(Some(1)) + .build(manager) + .await + .unwrap(); -let mut queue = AsyncQueue::builder() - // Postgres database url - .uri("postgres://postgres:postgres@localhost/fang") - // Max number of connections that are allowed - .max_pool_size(max_pool_size) - .build(); +let mut queue = PgAsyncQueue::new(pool); -// Always connect first in order to perform any operation -queue.connect(NoTls).await.unwrap(); - -``` -As an easy example, we are using NoTls type. If for some reason you would like to encrypt Postgres requests, you can use [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) or [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/). - -```rust -// AsyncTask from the first example -let task = AsyncTask { 8 }; +// Publish the first example +let task = MyTask { number: 8 }; let task_returned = queue - .insert_task(&task as &dyn AsyncRunnable) + .create_task(&task) .await .unwrap(); ``` ### Starting workers -#### the Blocking feature -Every worker runs in a separate thread. In case of panic, they are always restarted. - -Use `WorkerPool` to start workers. Use `WorkerPool::builder` to create your worker pool and run tasks. - - -```rust -use fang::WorkerPool; -use fang::Queue; - -// create a Queue - -let mut worker_pool = WorkerPool::::builder() - .queue(queue) - .number_of_workers(3_u32) - // if you want to run tasks of the specific kind - .task_type("my_task_type") - .build(); - -worker_pool.start(); -``` - -#### the Asynk feature Every worker runs in a separate `tokio` task. In case of panic, they are always restarted. Use `AsyncWorkerPool` to start workers. ```rust -use fang::asynk::async_worker_pool::AsyncWorkerPool; +use backie::worker_pool::AsyncWorkerPool; // Need to create a queue // Also insert some tasks -let mut pool: AsyncWorkerPool> = AsyncWorkerPool::builder() +let mut pool: AsyncWorkerPool = AsyncWorkerPool::builder() .number_of_workers(max_pool_size) .queue(queue.clone()) // if you want to run tasks of the specific kind - .task_type("my_task_type") + .task_type("my_task_type".into()) .build(); pool.start().await; ``` - Check out: -- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_worker) - simple worker example -- [Simple Cron Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_cron_worker) - simple worker example -- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_async_worker) - simple async worker example -- [Simple Cron Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_cron_async_worker) - simple async worker example -- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses the Fang's blocking module to synchronize feeds and deliver updates to users. -- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses the Fang's asynk module to process updates from Telegram users and schedule weather info. +- [Simple Worker Example](https://github.com/rafaelcaricio/backie/tree/master/examples/simple_worker) - simple worker example ### Configuration -#### Blocking feature - -Just use `TypeBuilder` for `WorkerPool`. - -#### Asynk feature - -Just use `TypeBuilder` for `AsyncWorkerPool`. +Use the `AsyncWorkerPool` builder: + + ```rust + let mut pool: AsyncWorkerPool = AsyncWorkerPool::builder() + .number_of_workers(max_pool_size) + .queue(queue.clone()) + .build(); + ``` ### Configuring the type of workers @@ -322,36 +167,6 @@ pub enum RetentionMode { Set retention mode with worker pools `TypeBuilder` in both modules. -### Configuring sleep values - -#### Blocking feature - -You can use use `SleepParams` to configure sleep values: - -```rust -pub struct SleepParams { - pub sleep_period: Duration, // default value is 5 seconds - pub max_sleep_period: Duration, // default value is 15 seconds - pub min_sleep_period: Duration, // default value is 5 seconds - pub sleep_step: Duration, // default value is 5 seconds -} -``` - -If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds. - - -Use `set_sleep_params` to set it: -```rust -let sleep_params = SleepParams { - sleep_period: Duration::from_secs(2), - max_sleep_period: Duration::from_secs(6), - min_sleep_period: Duration::from_secs(2), - sleep_step: Duration::from_secs(1), -}; -``` - -Set sleep params with worker pools `TypeBuilder` in both modules. - ## Contributing 1. [Fork it!](https://github.com/ayrat555/fang/fork) @@ -392,17 +207,14 @@ make ignored make stop ``` -## Authors +## Thank Fang's authors + +I would like to thank the authors of the fang crate which was the inspiration for this project. - Ayrat Badykov (@ayrat555) - - Pepe Márquez (@pxp9) - -[s1]: https://img.shields.io/crates/v/fang.svg -[docs-badge]: https://img.shields.io/badge/docs-website-blue.svg -[ci]: https://crates.io/crates/fang -[docs]: https://docs.rs/fang/ -[ga-test]: https://github.com/ayrat555/fang/actions/workflows/rust.yml/badge.svg -[ga-style]: https://github.com/ayrat555/fang/actions/workflows/style.yml/badge.svg -[signal-hook]: https://crates.io/crates/signal-hook +[ci]: https://crates.io/crates/backie +[docs]: https://docs.rs/backie/ +[ga-test]: https://github.com/rafaelcaricio/backie/actions/workflows/rust.yml/badge.svg +[ga-style]: https://github.com/rafaelcaricio/backie/actions/workflows/style.yml/badge.svg diff --git a/examples/simple_async_worker/Cargo.toml b/examples/simple_worker/Cargo.toml similarity index 85% rename from examples/simple_async_worker/Cargo.toml rename to examples/simple_worker/Cargo.toml index 1d85f1d..efd1d1a 100644 --- a/examples/simple_async_worker/Cargo.toml +++ b/examples/simple_worker/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "simple_async_worker" +name = "simple_worker" version = "0.1.0" edition = "2021" [dependencies] -fang = { path = "../../../" } +backie = { path = "../../" } env_logger = "0.9.0" log = "0.4.0" tokio = { version = "1", features = ["full"] } diff --git a/examples/simple_async_worker/src/lib.rs b/examples/simple_worker/src/lib.rs similarity index 100% rename from examples/simple_async_worker/src/lib.rs rename to examples/simple_worker/src/lib.rs diff --git a/examples/simple_async_worker/src/main.rs b/examples/simple_worker/src/main.rs similarity index 100% rename from examples/simple_async_worker/src/main.rs rename to examples/simple_worker/src/main.rs diff --git a/logo.png b/logo.png deleted file mode 100644 index d22a761..0000000 Binary files a/logo.png and /dev/null differ diff --git a/src/errors.rs b/src/errors.rs index b9b1fdb..c472d12 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,5 +1,5 @@ -use std::fmt::Display; use serde_json::Error as SerdeError; +use std::fmt::Display; use thiserror::Error; /// Library errors diff --git a/src/queries.rs b/src/queries.rs index fa211d1..2640ef1 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,8 +1,8 @@ use crate::errors::AsyncQueueError; use crate::runnable::RunnableTask; use crate::schema::backie_tasks; -use crate::task::{NewTask, TaskId, TaskType, TaskHash}; use crate::task::Task; +use crate::task::{NewTask, TaskHash, TaskId, TaskType}; use chrono::DateTime; use chrono::Duration; use chrono::Utc; @@ -124,9 +124,7 @@ impl Task { task: Task, ) -> Result { Ok(diesel::update(&task) - .set(( - backie_tasks::running_at.eq(Utc::now()), - )) + .set((backie_tasks::running_at.eq(Utc::now()),)) .get_result::(connection) .await?) } @@ -135,17 +133,17 @@ impl Task { connection: &mut AsyncPgConnection, id: TaskId, ) -> Result { - Ok(diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id))) - .set(( - backie_tasks::done_at.eq(Utc::now()), - )) - .get_result::(connection) - .await?) + Ok( + diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id))) + .set((backie_tasks::done_at.eq(Utc::now()),)) + .get_result::(connection) + .await?, + ) } pub(crate) async fn insert( connection: &mut AsyncPgConnection, - runnable: &dyn RunnableTask + runnable: &dyn RunnableTask, ) -> Result { let payload = serde_json::to_value(runnable)?; match runnable.uniq() { @@ -161,23 +159,21 @@ impl Task { .get_result::(connection) .await?) } - Some(hash) => { - match Self::find_by_uniq_hash(connection, hash.clone()).await { - Some(task) => Ok(task), - None => { - let new_task = NewTask::builder() - .uniq_hash(Some(hash)) - .task_type(runnable.task_type()) - .payload(payload) - .build(); + Some(hash) => match Self::find_by_uniq_hash(connection, hash.clone()).await { + Some(task) => Ok(task), + None => { + let new_task = NewTask::builder() + .uniq_hash(Some(hash)) + .task_type(runnable.task_type()) + .payload(payload) + .build(); - Ok(diesel::insert_into(backie_tasks::table) - .values(new_task) - .get_result::(connection) - .await?) - } + Ok(diesel::insert_into(backie_tasks::table) + .values(new_task) + .get_result::(connection) + .await?) } - } + }, } } diff --git a/src/queue.rs b/src/queue.rs index a2ce673..58710c5 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,6 +1,6 @@ use crate::errors::AsyncQueueError; use crate::runnable::RunnableTask; -use crate::task::{Task, TaskId, TaskType, TaskHash}; +use crate::task::{Task, TaskHash, TaskId, TaskType}; use async_trait::async_trait; use diesel::result::Error::QueryBuilderError; use diesel_async::scoped_futures::ScopedFutureExt; @@ -16,7 +16,10 @@ pub trait Queueable: Send { /// /// This method returns one task of the `task_type` type. If `task_type` is `None` it will try to /// fetch a task of the type `common`. The returned task is marked as running and must be executed. - async fn pull_next_task(&mut self, kind: Option) -> Result, AsyncQueueError>; + async fn pull_next_task( + &mut self, + kind: Option, + ) -> Result, AsyncQueueError>; /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type /// created by an AsyncWorkerPool. @@ -26,7 +29,11 @@ pub trait Queueable: Send { async fn find_task_by_id(&mut self, id: TaskId) -> Result; /// Update the state of a task to failed and set an error_message. - async fn set_task_failed(&mut self, id: TaskId, error_message: &str) -> Result; + async fn set_task_failed( + &mut self, + id: TaskId, + error_message: &str, + ) -> Result; /// Update the state of a task to done. async fn set_task_done(&mut self, id: TaskId) -> Result; @@ -57,18 +64,7 @@ pub trait Queueable: Send { ) -> Result; } -/// An async queue that can be used to enqueue tasks. -/// It uses a PostgreSQL storage. It must be connected to perform any operation. -/// To connect an `AsyncQueue` to PostgreSQL database call the `connect` method. -/// A Queue can be created with the TypedBuilder. -/// -/// ```rust -/// let mut queue = AsyncQueue::builder() -/// .uri("postgres://postgres:postgres@localhost/fang") -/// .max_pool_size(max_pool_size) -/// .build(); -/// ``` -/// +/// An async queue that is used to manipulate tasks, it uses PostgreSQL as storage. #[derive(Debug, Clone)] pub struct PgAsyncQueue { pool: Pool, @@ -98,7 +94,7 @@ impl Queueable for PgAsyncQueue { return Ok(None); }; - Task::set_running(conn, pending_task).await.map(|running_task| Some(running_task)) + Task::set_running(conn, pending_task).await.map(Some) } .scope_boxed() }) @@ -157,8 +153,10 @@ impl Queueable for PgAsyncQueue { let task = Task::find_by_id(conn, id).await?; Task::set_running(conn, task).await?; Ok(()) - }.scope_boxed() - }).await + } + .scope_boxed() + }) + .await } async fn remove_task(&mut self, id: TaskId) -> Result { @@ -228,6 +226,7 @@ impl Queueable for PgAsyncQueue { #[cfg(test)] mod async_queue_tests { use super::*; + use crate::task::TaskState; use crate::Scheduled; use async_trait::async_trait; use chrono::DateTime; @@ -235,7 +234,6 @@ mod async_queue_tests { use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; use serde::{Deserialize, Serialize}; - use crate::task::TaskState; #[derive(Serialize, Deserialize)] struct AsyncTask { @@ -245,7 +243,10 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncTask { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } } @@ -258,7 +259,10 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncUniqTask { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } @@ -276,7 +280,10 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } @@ -495,7 +502,10 @@ mod async_queue_tests { assert_eq!(Some(2), number); assert_eq!(Some("AsyncTask"), type_task); - let result = test.remove_tasks_type(TaskType::from("nonexistentType")).await.unwrap(); + let result = test + .remove_tasks_type(TaskType::from("nonexistentType")) + .await + .unwrap(); assert_eq!(0, result); let result = test.remove_tasks_type(TaskType::default()).await.unwrap(); @@ -509,7 +519,10 @@ mod async_queue_tests { let pool = pool().await; let mut test = PgAsyncQueue::new(pool); - let task = test.create_task(&AsyncUniqTask { number: 1 }).await.unwrap(); + let task = test + .create_task(&AsyncUniqTask { number: 1 }) + .await + .unwrap(); let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); @@ -518,7 +531,10 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncUniqTask"), type_task); - let task = test.create_task(&AsyncUniqTask { number: 2 }).await.unwrap(); + let task = test + .create_task(&AsyncUniqTask { number: 2 }) + .await + .unwrap(); let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); @@ -527,7 +543,8 @@ mod async_queue_tests { assert_eq!(Some(2), number); assert_eq!(Some("AsyncUniqTask"), type_task); - let result = test.remove_task_by_hash(AsyncUniqTask { number: 0 }.uniq().unwrap()) + let result = test + .remove_task_by_hash(AsyncUniqTask { number: 0 }.uniq().unwrap()) .await .unwrap(); assert!(!result, "Should **not** remove task"); diff --git a/src/runnable.rs b/src/runnable.rs index 9972eaa..dde232d 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -1,9 +1,9 @@ -use std::error::Error; use crate::queue::Queueable; +use crate::task::TaskHash; +use crate::task::TaskType; use crate::Scheduled; use async_trait::async_trait; -use crate::task::{TaskType}; -use crate::task::TaskHash; +use std::error::Error; pub const RETRIES_NUMBER: i32 = 20; diff --git a/src/task.rs b/src/task.rs index eb43586..6f4f00d 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,15 +1,15 @@ -use std::borrow::Cow; -use std::fmt; -use std::fmt::Display; use crate::schema::backie_tasks; use chrono::DateTime; use chrono::Utc; use diesel::prelude::*; +use diesel_derive_newtype::DieselNewType; +use serde::Serialize; +use sha2::{Digest, Sha256}; +use std::borrow::Cow; +use std::fmt; +use std::fmt::Display; use typed_builder::TypedBuilder; use uuid::Uuid; -use serde::Serialize; -use diesel_derive_newtype::DieselNewType; -use sha2::{Digest, Sha256}; /// States of a task. #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -58,7 +58,10 @@ where pub struct TaskHash(Cow<'static, str>); impl TaskHash { - pub fn default_for_task(value: &T) -> Result where T: Serialize { + pub fn default_for_task(value: &T) -> Result + where + T: Serialize, + { let value = serde_json::to_value(value)?; let mut hasher = Sha256::new(); hasher.update(serde_json::to_string(&value)?.as_bytes()); diff --git a/src/worker.rs b/src/worker.rs index bc25493..95c248e 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,11 +1,11 @@ -use std::error::Error; use crate::errors::BackieError; use crate::queue::Queueable; use crate::runnable::RunnableTask; use crate::task::{Task, TaskType}; +use crate::RetentionMode; use crate::Scheduled::*; -use crate::{RetentionMode}; use log::error; +use std::error::Error; use typed_builder::TypedBuilder; /// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue @@ -41,7 +41,11 @@ impl AsyncWorker where Q: Queueable + Clone + Sync + 'static, { - async fn run(&mut self, task: Task, runnable: Box) -> Result<(), BackieError> { + async fn run( + &mut self, + task: Task, + runnable: Box, + ) -> Result<(), BackieError> { // TODO: catch panics let result = runnable.run(&mut self.queue).await; match result { @@ -81,7 +85,9 @@ where } Err(error) => { log::debug!("Task {} failed and kept in the database", task.id); - self.queue.set_task_failed(task.id, &format!("{}", error)).await?; + self.queue + .set_task_failed(task.id, &format!("{}", error)) + .await?; } }, RetentionMode::RemoveAll => { @@ -95,7 +101,9 @@ where } Err(error) => { log::debug!("Task {} failed and kept in the database", task.id); - self.queue.set_task_failed(task.id, &format!("{}", error)).await?; + self.queue + .set_task_failed(task.id, &format!("{}", error)) + .await?; } }, }; @@ -118,13 +126,10 @@ where pub(crate) async fn run_tasks(&mut self) { loop { // TODO: check if should stop the worker - match self - .queue - .pull_next_task(self.task_type.clone()) - .await - { + match self.queue.pull_next_task(self.task_type.clone()).await { Ok(Some(task)) => { - let actual_task: Box = serde_json::from_value(task.payload.clone()).unwrap(); + let actual_task: Box = + serde_json::from_value(task.payload.clone()).unwrap(); // check if task is scheduled or not if let Some(CronPattern(_)) = actual_task.cron() { @@ -150,11 +155,7 @@ where #[cfg(test)] pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { loop { - match self - .queue - .pull_next_task(self.task_type.clone()) - .await - { + match self.queue.pull_next_task(self.task_type.clone()).await { Ok(Some(task)) => { let actual_task: Box = serde_json::from_value(task.payload.clone()).unwrap(); @@ -185,8 +186,9 @@ where mod async_worker_tests { use super::*; use crate::errors::BackieError; - use crate::queue::Queueable; use crate::queue::PgAsyncQueue; + use crate::queue::Queueable; + use crate::task::TaskState; use crate::worker::Task; use crate::RetentionMode; use crate::Scheduled; @@ -196,7 +198,6 @@ mod async_worker_tests { use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; use serde::{Deserialize, Serialize}; - use crate::task::TaskState; #[derive(Serialize, Deserialize)] struct WorkerAsyncTask { @@ -206,7 +207,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for WorkerAsyncTask { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } } @@ -219,7 +223,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for WorkerAsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } fn cron(&self) -> Option { @@ -235,7 +242,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncFailedTask { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { let message = format!("number {} is wrong :(", self.number); Err(Box::new(BackieError { @@ -254,7 +264,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncRetryTask { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { let message = "Failed".to_string(); Err(Box::new(BackieError { @@ -273,12 +286,15 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncTaskType1 { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } fn task_type(&self) -> TaskType { - TaskType::from("type1") + "type1".into() } } @@ -288,7 +304,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl RunnableTask for AsyncTaskType2 { - async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { + async fn run( + &self, + _queueable: &mut dyn Queueable, + ) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } @@ -324,33 +343,33 @@ mod async_worker_tests { // async fn schedule_task_test() { // let pool = pool().await; // let mut test = PgAsyncQueue::new(pool); - // + // // let actual_task = WorkerAsyncTaskSchedule { number: 1 }; - // + // // let task = test.schedule_task(&actual_task).await.unwrap(); - // + // // let id = task.id; - // + // // let mut worker = AsyncWorker::::builder() // .queue(test.clone()) // .retention_mode(RetentionMode::KeepAll) // .build(); - // + // // worker.run_tasks_until_none().await.unwrap(); - // + // // let task = worker.queue.find_task_by_id(id).await.unwrap(); - // + // // assert_eq!(id, task.id); // assert_eq!(TaskState::Ready, task.state()); - // + // // tokio::time::sleep(core::time::Duration::from_secs(3)).await; - // + // // worker.run_tasks_until_none().await.unwrap(); - // + // // let task = test.find_task_by_id(id).await.unwrap(); // assert_eq!(id, task.id); // assert_eq!(TaskState::Done, task.state()); - // + // // test.remove_all_tasks().await.unwrap(); // } diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 5c705e8..c5f5407 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,7 +1,7 @@ use crate::queue::Queueable; -use crate::task::{TaskType}; +use crate::task::TaskType; use crate::worker::AsyncWorker; -use crate::{RetentionMode}; +use crate::RetentionMode; use async_recursion::async_recursion; use log::error; use typed_builder::TypedBuilder; @@ -50,7 +50,7 @@ where let join_handle = tokio::spawn(async move { let mut worker: AsyncWorker = AsyncWorker::builder() .queue(inner_pool.queue.clone()) - .retention_mode(inner_pool.retention_mode.clone()) + .retention_mode(inner_pool.retention_mode) .task_type(inner_pool.task_type.clone()) .build();