diff --git a/CHANGELOG.md b/CHANGELOG.md index 050a6a5..14c1ea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Not released + +### Added + +- Add retries for tasks - [#92](https://github.com/ayrat555/fang/pull/92) + ## 0.9.1 (2022-09-14) ### Added diff --git a/README.md b/README.md index e8d1870..d070208 100644 --- a/README.md +++ b/README.md @@ -7,21 +7,34 @@ Background task processing library for Rust. It uses Postgres DB as a task queue. ## Features -- Asynk feature uses `tokio`. Workers are started in tokio tasks. -- Blocking feature uses `std::thread`. Workers are started in a separated threads. + + Here are some of the fang's key features: + + - Async and threaded workers + Workers can be started in threads (threaded workers) or `tokio` tasks (async workers) + - Scheduled tasks + Tasks can be scheduled at any time in the future + - Periodic (CRON) tasks + Tasks can be scheduled using cron expressions + - Unique tasks + Tasks are not duplicated in the queue if they are unique + - Single-purpose workers + Tasks are stored in a single table but workers can execute only tasks of the specific type + - Retries + Tasks can be retried with a custom backoff mode ## Installation 1. Add this to your Cargo.toml -#### Blocking feature +#### the Blocking feature ```toml [dependencies] fang = { version = "0.9" , features = ["blocking"], default-features = false } ``` -#### Asynk feature +#### the Asynk feature ```toml [dependencies] fang = { version = "0.9" , features = ["asynk"], default-features = false } @@ -34,14 +47,14 @@ fang = { version = "0.9" } *Supports rustc 1.62+* -2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2022-08-20-151615_create_fang_tasks/up.sql). +2. Create the `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2022-08-20-151615_create_fang_tasks/up.sql). ## Usage ### Defining a task #### Blocking feature -Every task should implement `fang::Runnable` trait which is used by `fang` to execute it. +Every task should implement the `fang::Runnable` trait which is used by `fang` to execute it. ```rust use fang::Error; @@ -64,38 +77,47 @@ impl Runnable for MyTask { Ok(()) } - // If you want to make the tasks of this type uniq. + // If `uniq` is set to true and the task is already in the storage, it won't be inserted again + // The existing record will be returned for for any insertions operaiton fn uniq(&self) -> bool { true } // This will be useful if you want to filter tasks. - // default value: "common".to_string() + // the default value is `common` fn task_type(&self) -> String { "my_task".to_string() } // This will be useful if you would like to schedule tasks. - // default value: None (task is not schedule just executes when it is fetched) + // default value is None (the task is not scheduled, it's just executed as soon as it's inserted) fn cron(&self) -> Option { - // sec min hour day of month month day of week year - // be careful works only with UTC hour. - // https://www.timeanddate.com/worldclock/timezone/utc let expression = "0/20 * * * Aug-Sep * 2022/1"; Some(Scheduled::CronPattern(expression.to_string())) } + + // the maximum number of retries. Set it to 0 to make it not retriable + // the default value is 20 + fn max_retries(&self) -> i32 { + 20 + } + + // backoff mode for retries + fn backoff(&self, attempt: u32) -> u32 { + u32::pow(2, attempt) + } } ``` As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task. -The second parameter of the `run` function is a is an struct that implements fang::Queueable (fang::Queue for example), You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it. +The second parameter of the `run` function is a struct that implements `fang::Queueable`. You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it. #### Asynk feature Every task should implement `fang::AsyncRunnable` trait which is used by `fang` to execute it. -Also be careful to not to call with the same name two impl of AsyncRunnable, because will cause a fail with typetag. +Be careful not to call two implementations of the AsyncRunnable trait with the same name, because it will cause a failure in the `typetag` crate. ```rust use fang::AsyncRunnable; use fang::asynk::async_queue::AsyncQueueable; @@ -114,37 +136,46 @@ impl AsyncRunnable for AsyncTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) } - // this func is optional to impl - // Default task-type it is common + // this func is optional + // Default task_type is common fn task_type(&self) -> String { "my-task-type".to_string() } - // If you want to make the tasks of this type uniq. + // If `uniq` is set to true and the task is already in the storage, it won't be inserted again + // The existing record will be returned for for any insertions operaiton fn uniq(&self) -> bool { true } // This will be useful if you would like to schedule tasks. - // default value: None (task is not schedule just executes when it is fetched) + // default value is None (the task is not scheduled, it's just executed as soon as it's inserted) fn cron(&self) -> Option { - // sec min hour day of month month day of week year - // be careful works only with UTC hour. - // https://www.timeanddate.com/worldclock/timezone/utc let expression = "0/20 * * * Aug-Sep * 2022/1"; Some(Scheduled::CronPattern(expression.to_string())) } + + // the maximum number of retries. Set it to 0 to make it not retriable + // the default value is 20 + fn max_retries(&self) -> i32 { + 20 + } + + // backoff mode for retries + fn backoff(&self, attempt: u32) -> u32 { + u32::pow(2, attempt) + } } ``` -In both modules, tasks can be schedule to be execute once. Use ```Scheduled::ScheduleOnce``` enum variant to schedule in specific datetime. +In both modules, tasks can be scheduled to be executed once. Use `Scheduled::ScheduleOnce` enum variant. -Datetimes and cron pattern are interpreted in UTC timezone. So you should introduce an offset to schedule in the desire hour. +Datetimes and cron patterns are interpreted in the UTC timezone. So you should introduce the offset to schedule in a different timezone. Example: -If your hour is UTC + 2 and you would like to schedule at 11:00 all days, your expression will be this one. +If your timezone is UTC + 2 and you want to schedule at 11:00: ```rust let expression = "0 0 9 * * * *"; @@ -153,11 +184,9 @@ If your hour is UTC + 2 and you would like to schedule at 11:00 all days, your e ### Enqueuing a task -#### Blocking feature +#### the Blocking feature To enqueue a task use `Queue::enqueue_task` - -For Postgres Backend. ```rust use fang::Queue; @@ -171,13 +200,8 @@ use fang::Queue; ``` -`Queue::insert_task` method will insert a task with uniqueness or not it depends on `uniq` method defined in a task. -If uniq is set to true and the task is already in storage this will return the task in the storage. - - -#### Asynk feature -To enqueue a task use `AsyncQueueable::insert_task`, -depending of the backend that you prefer you will need to do it with a specific queue. +#### the Asynk feature +To enqueue a task use `AsyncQueueable::insert_task`. For Postgres backend. ```rust @@ -185,7 +209,7 @@ use fang::asynk::async_queue::AsyncQueue; use fang::NoTls; use fang::AsyncRunnable; -// Create a AsyncQueue +// Create an AsyncQueue let max_pool_size: u32 = 2; let mut queue = AsyncQueue::builder() @@ -199,14 +223,10 @@ let mut queue = AsyncQueue::builder() queue.connect(NoTls).await.unwrap(); ``` -For easy example we are using NoTls type, if for some reason you would like to encrypt postgres traffic. - -You can implement a Tls type. - -It is well documented for [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) and [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/) +As an easy example, we are using NoTls type. If for some reason you would like to encrypt Postgres requests, you can use [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) or [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/). ```rust -// AsyncTask from first example +// AsyncTask from the first example let task = AsyncTask { 8 }; let task_returned = queue .insert_task(&task as &dyn AsyncRunnable) @@ -216,7 +236,7 @@ let task_returned = queue ### Starting workers -#### Blocking feature +#### the Blocking feature Every worker runs in a separate thread. In case of panic, they are always restarted. Use `WorkerPool` to start workers. Use `WorkerPool::builder` to create your worker pool and run tasks. @@ -238,8 +258,8 @@ let mut worker_pool = WorkerPool::::builder() worker_pool.start(); ``` -#### Asynk feature -Every worker runs in a separate tokio task. In case of panic, they are always restarted. +#### the Asynk feature +Every worker runs in a separate `tokio` task. In case of panic, they are always restarted. Use `AsyncWorkerPool` to start workers. ```rust @@ -265,18 +285,18 @@ Check out: - [Simple Cron Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_cron_worker) - simple worker example - [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_async_worker) - simple async worker example - [Simple Cron Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_cron_async_worker) - simple async worker example -- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang blocking module to synchronize feeds and deliver updates to users. -- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses Fang asynk module to process updates from Telegram users and schedule weather info. +- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses the Fang's blocking module to synchronize feeds and deliver updates to users. +- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses the Fang's asynk module to process updates from Telegram users and schedule weather info. ### Configuration #### Blocking feature -Just use `TypeBuilder` done for `WorkerPool`. +Just use `TypeBuilder` for `WorkerPool`. #### Asynk feature -Just use `TypeBuilder` done for `AsyncWorkerPool`. +Just use `TypeBuilder` for `AsyncWorkerPool`. ### Configuring the type of workers @@ -300,7 +320,7 @@ Set retention mode with worker pools `TypeBuilder` in both modules. #### Blocking feature -You can use use `SleepParams` to confugure sleep values: +You can use use `SleepParams` to configure sleep values: ```rust pub struct SleepParams { @@ -332,14 +352,14 @@ Set sleep params with worker pools `TypeBuilder` in both modules. 2. Create your feature branch (`git checkout -b my-new-feature`) 3. Commit your changes (`git commit -am 'Add some feature'`) 4. Push to the branch (`git push origin my-new-feature`) -5. Create new Pull Request +5. Create a new Pull Request ### Running tests locally - Install diesel_cli. ``` cargo install diesel_cli ``` -- Install docker in your machine. +- Install docker on your machine. - Run a Postgres docker container. (See in Makefile.) ``` @@ -361,7 +381,7 @@ make tests make ignored ``` -- Kill docker container +- Kill the docker container ``` make stop ``` diff --git a/docs/content/_index.md b/docs/content/_index.md index 16c97c3..9b3b262 100644 --- a/docs/content/_index.md +++ b/docs/content/_index.md @@ -51,3 +51,8 @@ content = 'Tasks are not duplicated in the queue if they are unique' title = "Single-purpose workers" content = 'Tasks are stored in a single table but workers can execute only tasks of the specific type' +++ + +[[extra.list]] +title = "Retries" +content = 'Tasks can be retried with a custom backoff mode' ++++ diff --git a/src/lib.rs b/src/lib.rs index edb2b18..aac3f44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ #![doc = include_str!("../README.md")] -#![allow(clippy::extra_unused_lifetimes)] use std::time::Duration; use thiserror::Error; @@ -30,6 +29,7 @@ impl Default for RetentionMode { RetentionMode::RemoveAll } } + #[derive(Clone, Debug)] pub struct SleepParams { pub sleep_period: Duration,