update README and CHANGELOG, bump version (#79)

* README and CHANGELOG

* FUNDING

* bump version

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
This commit is contained in:
Pmarquez 2022-09-03 08:22:12 +00:00 committed by Ayrat Badykov
parent 2b0eb0627d
commit 7fb4423bf5
No known key found for this signature in database
GPG key ID: EC1148A46811EFB5
7 changed files with 132 additions and 188 deletions

4
.github/FUNDING.yml vendored
View file

@ -1,4 +1,4 @@
# These are supported funding model platforms # These are supported funding model platforms
github: [ayrat555] github: [ayrat555, pxp9]
custom: ["https://paypal.me/AyratBadykov"] custom: ["https://paypal.me/AyratBadykov", "https://paypal.me/PMR9"]

View file

@ -1,5 +1,22 @@
# Changelog # Changelog
## 0.9.0 (2022-09-2)
### [#69](https://github.com/ayrat555/fang/pull/69)
### Added
- Added cron support for scheduled tasks.
### Improved
- Major refactoring of the blocking module.
- Delete the graceful shutdown in blocking module.
- Simplify database schema.
- Re-write scheduled tasks in asynk module.
- Re-write the errors in both modules.
- Update diesel crate to 2.0 version.
- Update uuid crate to 1.1 version.
## 0.8.0 (2022-08-18) ## 0.8.0 (2022-08-18)
- Use Duration in SleepParams and schedulers - [#67](https://github.com/ayrat555/fang/pull/67) - Use Duration in SleepParams and schedulers - [#67](https://github.com/ayrat555/fang/pull/67)

View file

@ -1,6 +1,6 @@
[package] [package]
name = "fang" name = "fang"
version = "0.8.0" version = "0.9.0"
authors = ["Ayrat Badykov <ayratin555@gmail.com>" , "Pepe Márquez <pepe.marquezromero@gmail.com>"] authors = ["Ayrat Badykov <ayratin555@gmail.com>" , "Pepe Márquez <pepe.marquezromero@gmail.com>"]
description = "Background job processing library for Rust" description = "Background job processing library for Rust"
repository = "https://github.com/ayrat555/fang" repository = "https://github.com/ayrat555/fang"

293
README.md
View file

@ -18,23 +18,23 @@ Background task processing library for Rust. It uses Postgres DB as a task queue
#### Blocking feature #### Blocking feature
```toml ```toml
[dependencies] [dependencies]
fang = { version = "0.8" , features = ["blocking"], default-features = false } fang = { version = "0.9" , features = ["blocking"], default-features = false }
``` ```
#### Asynk feature #### Asynk feature
```toml ```toml
[dependencies] [dependencies]
fang = { version = "0.8" , features = ["asynk"], default-features = false } fang = { version = "0.9" , features = ["asynk"], default-features = false }
``` ```
#### Both features #### Both features
```toml ```toml
fang = { version = "0.8" } fang = { version = "0.9" }
``` ```
*Supports rustc 1.62+* *Supports rustc 1.62+*
2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql). 2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2022-08-20-151615_create_fang_tasks/up.sql).
## Usage ## Usage
@ -58,17 +58,38 @@ struct MyTask {
#[typetag::serde] #[typetag::serde]
impl Runnable for MyTask { impl Runnable for MyTask {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> { fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
println!("the number is {}", self.number); println!("the number is {}", self.number);
Ok(()) Ok(())
} }
// If you want to make the tasks of this type uniq.
fn uniq(&self) -> bool {
true
}
// This will be useful if you want to filter tasks.
// default value: "common".to_string()
fn task_type(&self) -> String {
"my_task".to_string()
}
// This will be useful if you would like to schedule tasks.
// default value: None (task is not schedule just executes when it is fetched)
fn cron(&self) -> Option<Scheduled> {
// sec min hour day of month month day of week year
// be careful works only with UTC hour.
// https://www.timeanddate.com/worldclock/timezone/utc
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
} }
``` ```
As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task. As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task.
The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it. The second parameter of the `run` function is a is an struct that implements fang::Queueable (fang::Queue for example), You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it.
#### Asynk feature #### Asynk feature
@ -98,39 +119,61 @@ impl AsyncRunnable for AsyncTask {
fn task_type(&self) -> String { fn task_type(&self) -> String {
"my-task-type".to_string() "my-task-type".to_string()
} }
// If you want to make the tasks of this type uniq.
fn uniq(&self) -> bool {
true
}
// This will be useful if you would like to schedule tasks.
// default value: None (task is not schedule just executes when it is fetched)
fn cron(&self) -> Option<Scheduled> {
// sec min hour day of month month day of week year
// be careful works only with UTC hour.
// https://www.timeanddate.com/worldclock/timezone/utc
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
} }
``` ```
In both modules, tasks can be schedule to be execute once. Use ```Scheduled::ScheduleOnce``` enum variant to schedule in specific datetime.
Datetimes and cron pattern are interpreted in UTC timezone. So you should introduce an offset to schedule in the desire hour.
Example:
If your hour is UTC + 2 and you would like to schedule at 11:00 all days, your expression will be this one.
```rust
let expression = "0 0 9 * * * *";
```
### Enqueuing a task ### Enqueuing a task
#### Blocking feature #### Blocking feature
To enqueue a task use `Queue::enqueue_task` To enqueue a task use `Queue::enqueue_task`
For Postgres Backend.
```rust ```rust
use fang::Queue; use fang::Queue;
... // create a r2d2 pool
Queue::enqueue_task(&MyTask { number: 10 }).unwrap(); // create a fang queue
let queue = Queue::builder().connection_pool(pool).build();
let task_inserted = queue.insert_task(&MyTask::new(1)).unwrap();
``` ```
The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several tasks use Postgres struct instance: `Queue::insert_task` method will insert a task with uniqueness or not it depends on `uniq` method defined in a task.
If uniq is set to true and the task is already in storage this will return the task in the storage.
```rust
let queue = Queue::new();
for id in &unsynced_feed_ids {
queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
}
```
Or you can use `PgConnection` struct:
```rust
Queue::push_task_query(pg_connection, &new_task).unwrap();
```
#### Asynk feature #### Asynk feature
To enqueue a task use `AsyncQueueable::insert_task`, To enqueue a task use `AsyncQueueable::insert_task`,
@ -150,8 +193,6 @@ let mut queue = AsyncQueue::builder()
.uri("postgres://postgres:postgres@localhost/fang") .uri("postgres://postgres:postgres@localhost/fang")
// Max number of connections that are allowed // Max number of connections that are allowed
.max_pool_size(max_pool_size) .max_pool_size(max_pool_size)
// false if would like Uniqueness in tasks
.duplicated_tasks(true)
.build(); .build();
// Always connect first in order to perform any operation // Always connect first in order to perform any operation
@ -178,29 +219,25 @@ let task_returned = queue
#### Blocking feature #### Blocking feature
Every worker runs in a separate thread. In case of panic, they are always restarted. Every worker runs in a separate thread. In case of panic, they are always restarted.
Use `WorkerPool` to start workers. `WorkerPool::new` accepts one parameter - the number of workers. Use `WorkerPool` to start workers. Use `WorkerPool::builder` to create your worker pool and run tasks.
```rust ```rust
use fang::WorkerPool; use fang::WorkerPool;
use fang::Queue;
WorkerPool::new(10).start(); // create a Queue
let mut worker_pool = WorkerPool::<Queue>::builder()
.queue(queue)
.number_of_workers(3_u32)
// if you want to run tasks of the specific kind
.task_type("my_task_type")
.build();
worker_pool.start();
``` ```
Use `shutdown` to stop worker threads, they will try to finish in-progress tasks.
```rust
use fang::WorkerPool;
worker_pool = WorkerPool::new(10).start().unwrap;
worker_pool.shutdown()
```
Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the
Simple Worker for an example implementation.
#### Asynk feature #### Asynk feature
Every worker runs in a separate tokio task. In case of panic, they are always restarted. Every worker runs in a separate tokio task. In case of panic, they are always restarted.
Use `AsyncWorkerPool` to start workers. Use `AsyncWorkerPool` to start workers.
@ -214,6 +251,8 @@ use fang::asynk::async_worker_pool::AsyncWorkerPool;
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder() let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size) .number_of_workers(max_pool_size)
.queue(queue.clone()) .queue(queue.clone())
// if you want to run tasks of the specific kind
.task_type("my_task_type")
.build(); .build();
pool.start().await; pool.start().await;
@ -222,16 +261,18 @@ pool.start().await;
Check out: Check out:
- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example - [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_async_worker) - simple async worker example - [Simple Cron Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_cron_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_async_worker) - simple async worker example
- [Simple Cron Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_cron_async_worker) - simple async worker example
- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang blocking module to synchronize feeds and deliver updates to users. - [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang blocking module to synchronize feeds and deliver updates to users.
- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses Fang asynk module to process updates from Telegram users. - [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses Fang asynk module to process updates from Telegram users and schedule weather info.
### Configuration ### Configuration
#### Blocking feature #### Blocking feature
To configure workers, instead of `WorkerPool::new` which uses default values, use `WorkerPool.new_with_params`. It accepts two parameters - the number of workers and `WorkerParams` struct. Just use `TypeBuilder` done for `WorkerPool`.
#### Asynk feature #### Asynk feature
@ -239,47 +280,6 @@ Just use `TypeBuilder` done for `AsyncWorkerPool`.
### Configuring the type of workers ### Configuring the type of workers
#### Blocking feature
You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type.
Add `task_type` method to the `Runnable` trait implementation:
```rust
...
#[typetag::serde]
impl Runnable for MyTask {
fn run(&self) -> Result<(), Error> {
println!("the number is {}", self.number);
Ok(())
}
fn task_type(&self) -> String {
"number".to_string()
}
}
```
Set `task_type` to the `WorkerParamas`:
```rust
let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());
WorkerPool::new_with_params(10, worker_params).start();
```
Without setting `task_type` workers will be executing any type of task.
#### Asynk feature
Same as Blocking feature.
Use `TypeBuilder` for `AsyncWorker`.
### Configuring retention mode ### Configuring retention mode
By default, all successfully finished tasks are removed from the DB, failed tasks aren't. By default, all successfully finished tasks are removed from the DB, failed tasks aren't.
@ -294,19 +294,7 @@ pub enum RetentionMode {
} }
``` ```
Set retention mode with `set_retention_mode`: Set retention mode with worker pools `TypeBuilder` in both modules.
#### Blocking feature
```rust
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);
WorkerPool::new_with_params(10, worker_params).start();
```
#### Asynk feature
Set it in `AsyncWorker` `TypeBuilder`.
### Configuring sleep values ### Configuring sleep values
@ -334,80 +322,9 @@ let sleep_params = SleepParams {
min_sleep_period: Duration::from_secs(2), min_sleep_period: Duration::from_secs(2),
sleep_step: Duration::from_secs(1), sleep_step: Duration::from_secs(1),
}; };
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);
WorkerPool::new_with_params(10, worker_params).start();
```
#### Asynk feature
Set it in `AsyncWorker` `TypeBuilder`.
## Periodic Tasks
Fang can add tasks to `fang_tasks` periodically. To use this feature first run [the migration with `fang_periodic_tasks` table](https://github.com/ayrat555/fang/tree/master/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql).
Usage example:
#### Blocking feature
```rust
use fang::Scheduler;
use fang::Queue;
let queue = Queue::new();
queue
.push_periodic_task(&SyncMyTask::default(), 120000)
.unwrap();
queue
.push_periodic_task(&DeliverMyTask::default(), 60000)
.unwrap();
Scheduler::start(Duration::from_secs(10), Duration::from_secs(5));
```
In the example above, `push_periodic_task` is used to save the specified task to the `fang_periodic_tasks` table which will be enqueued (saved to `fang_tasks` table) every specied number of milliseconds.
`Scheduler::start(Duration::from_secs(10), Duration::from_secs(5))` starts scheduler. It accepts two parameters:
- Db check period
- Acceptable error limit - |current_time - scheduled_time| < error
#### Asynk feature
```rust
use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
use std::time::Duration;
use chrono::Duration as OtherDuration;
// Build a AsyncQueue as before
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = queue.insert_periodic_task(
&AsyncTask { number: 1 },
schedule_in_future,
10000, // period in milliseconds
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder()
.check_period(Duration::from_secs(check_period))
.error_margin_seconds(Duration::from_secs(error_margin_seconds))
.queue(&mut queue as &mut dyn AsyncQueueable)
.build();
// Add some more task in other thread or before loop
// Scheduler Loop
scheduler.start().await.unwrap();
``` ```
Set sleep params with worker pools `TypeBuilder` in both modules.
## Contributing ## Contributing
@ -418,25 +335,35 @@ scheduler.start().await.unwrap();
5. Create new Pull Request 5. Create new Pull Request
### Running tests locally ### Running tests locally
- Install diesel_cli.
``` ```
cargo install diesel_cli cargo install diesel_cli
```
- Install docker in your machine.
docker run --rm -d --name postgres -p 5432:5432 \ - Run a Postgres docker container. (See in Makefile.)
-e POSTGRES_DB=fang \ ```
-e POSTGRES_USER=postgres \ make db
-e POSTGRES_PASSWORD=postgres \ ```
postgres:latest
DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run - Run the migrations
```
make diesel
```
// Run regular tests - Run tests
cargo test --all-features ```
make tests
```
// Run dirty/long tests, DB must be recreated afterwards - Run dirty//long tests, DB must be recreated afterwards.
cargo test --all-features -- --ignored --test-threads=1 ```
make ignored
```
docker kill postgres - Kill docker container
```
make stop
``` ```
## Authors ## Authors

View file

@ -1,5 +1,5 @@
[package] [package]
name = "simple_async_cron_worker" name = "simple_cron_async_worker"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"

View file

@ -3,7 +3,7 @@ use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_worker_pool::AsyncWorkerPool; use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::AsyncRunnable; use fang::AsyncRunnable;
use fang::NoTls; use fang::NoTls;
use simple_async_cron_worker::MyCronTask; use simple_cron_async_worker::MyCronTask;
use std::time::Duration; use std::time::Duration;
#[tokio::main] #[tokio::main]