Handle imports and dependencies (#56)

* imports, exports and README

* serde_json not necessay

* deleting tokio_postgres

* authors

* blocking example

* Update fang_examples/simple_async_worker/src/lib.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
This commit is contained in:
Pmarquez 2022-08-03 08:37:53 +00:00 committed by GitHub
parent 140b19e6e4
commit cf2ce19c97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 193 additions and 24 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "fang" name = "fang"
version = "0.6.0" version = "0.6.0"
authors = ["Ayrat Badykov <ayratin555@gmail.com>"] authors = ["Ayrat Badykov <ayratin555@gmail.com>" , "Pepe Márquez <pepe.marquezromero@gmail.com>"]
description = "Background job processing library for Rust" description = "Background job processing library for Rust"
repository = "https://github.com/ayrat555/fang" repository = "https://github.com/ayrat555/fang"
edition = "2021" edition = "2021"
@ -19,7 +19,8 @@ asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-build
[dependencies] [dependencies]
chrono = "0.4" chrono = "0.4"
log = "0.4" log = "0.4"
serde = { version = "1", features = ["derive"] } serde = "1"
serde_derive = "1.0.141"
serde_json = "1" serde_json = "1"
thiserror = "1.0" thiserror = "1.0"
typetag = "0.2" typetag = "0.2"
@ -69,4 +70,4 @@ version = "1"
optional = true optional = true
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.20", features = ["macros"] } tokio = { version = "1.20", features = ["macros"] }

177
README.md
View file

@ -12,12 +12,17 @@ Background task processing library for Rust. It uses Postgres DB as a task queue
1. Add this to your Cargo.toml 1. Add this to your Cargo.toml
### Blocking
```toml ```toml
[dependencies] [dependencies]
fang = "0.6" fang = { version = "0.6" , features = ["blocking"]}
serde = { version = "1.0", features = ["derive"] }
``` ```
### Async
```toml
[dependencies]
fang = { version = "0.6" , features = ["asynk"]}
```
*Supports rustc 1.62+* *Supports rustc 1.62+*
2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql). 2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql).
@ -26,16 +31,18 @@ serde = { version = "1.0", features = ["derive"] }
### Defining a task ### Defining a task
Every task should implement `fang::Runnable` trait which is used by `fang` to execute it. #### Blocking
Every task in blocking should implement `fang::Runnable` trait which is used by `fang` to execute it.
```rust ```rust
use fang::Error; use fang::Error;
use fang::Runnable; use fang::Runnable;
use fang::typetag; use fang::typetag;
use fang::PgConnection; use fang::PgConnection;
use serde::{Deserialize, Serialize}; use fang::serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask { struct MyTask {
pub number: u16, pub number: u16,
} }
@ -54,9 +61,40 @@ As you can see from the example above, the trait implementation has `#[typetag::
The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it. The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it.
#### Async
Every task in async should implement `fang::AsyncRunnable` trait which is used by `fang` to execute it.
Also be careful to not to call with the same name two impl of AsyncRunnable, because will cause a fail with serde.
```rust
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
// this func is optional to impl
// Default task-type it is common
fn task_type(&self) -> String {
"my-task-type".to_string()
}
}
```
### Enqueuing a task ### Enqueuing a task
To enqueue a task use `Queue::enqueue_task` #### Blocking
To enqueue a task in blocking use `Queue::enqueue_task`
```rust ```rust
@ -85,11 +123,53 @@ Or you can use `PgConnection` struct:
Queue::push_task_query(pg_connection, &new_task).unwrap(); Queue::push_task_query(pg_connection, &new_task).unwrap();
``` ```
#### Async
To enqueue a task in async use `AsyncQueueable::insert_task`
depending of the backend that you prefer you will need to do it with a specific queue.
For Postgres backend.
```rust
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// Create a AsyncQueue
let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
// Postgres database url
.uri("postgres://postgres:postgres@localhost/fang")
// Max number of connections that are allowed
.max_pool_size(max_pool_size)
// false if would like Uniqueness in tasks
.duplicated_tasks(true)
.build();
// Always connect first in order to perform any operation
queue.connect(NoTls).await.unwrap();
```
For easy example we are using NoTls type, if for some reason you would like to encrypt postgres traffic.
You can implement a Tls type.
It is well documented for [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) and [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/)
```rust
// AsyncTask from first example
let task = AsyncTask { 8 };
let task_returned = queue
.insert_task(&task as &dyn AsyncRunnable)
.await
.unwrap();
```
### Starting workers ### Starting workers
#### Blocking
Every worker runs in a separate thread. In case of panic, they are always restarted. Every worker runs in a separate thread. In case of panic, they are always restarted.
Use `WorkerPool` to start workers. `WorkerPool::new` accepts one parameter - the number of workers. Use `WorkerPool` for blocking to start workers. `WorkerPool::new` accepts one parameter - the number of workers.
```rust ```rust
@ -112,17 +192,45 @@ worker_pool.shutdown()
Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the
Simple Worker for an example implementation. Simple Worker for an example implementation.
#### Async
Every worker runs in a separate tokio thread. In case of panic, they are always restarted.
Use `AsyncWorkerPool` for async to start workers.
```rust
use fang::asynk::async_worker_pool::AsyncWorkerPool;
// Need to create a queue like before
// Also insert some tasks
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size)
.queue(queue.clone())
.build();
pool.start().await;
```
Check out: Check out:
- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example - [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_async_worker) - simple async worker example
- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users. - [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users.
### Configuration ### Configuration
#### Blocking
To configure workers, instead of `WorkerPool::new` which uses default values, use `WorkerPool.new_with_params`. It accepts two parameters - the number of workers and `WorkerParams` struct. To configure workers, instead of `WorkerPool::new` which uses default values, use `WorkerPool.new_with_params`. It accepts two parameters - the number of workers and `WorkerParams` struct.
#### Async
Just use `TypeBuilder` done for `AsyncWorkerPool`.
### Configuring the type of workers ### Configuring the type of workers
#### Blocking
You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type. You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type.
Add `task_type` method to the `Runnable` trait implementation: Add `task_type` method to the `Runnable` trait implementation:
@ -156,6 +264,12 @@ WorkerPool::new_with_params(10, worker_params).start();
Without setting `task_type` workers will be executing any type of task. Without setting `task_type` workers will be executing any type of task.
#### Async
Same as Blocking.
Use `TypeBuilder` for `AsyncWorker`.
### Configuring retention mode ### Configuring retention mode
By default, all successfully finished tasks are removed from the DB, failed tasks aren't. By default, all successfully finished tasks are removed from the DB, failed tasks aren't.
@ -172,12 +286,17 @@ pub enum RetentionMode {
Set retention mode with `set_retention_mode`: Set retention mode with `set_retention_mode`:
#### Blocking
```rust ```rust
let mut worker_params = WorkerParams::new(); let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll); worker_params.set_retention_mode(RetentionMode::RemoveAll);
WorkerPool::new_with_params(10, worker_params).start(); WorkerPool::new_with_params(10, worker_params).start();
``` ```
#### Async
Set it in `AsyncWorker` `TypeBuilder`.
### Configuring sleep values ### Configuring sleep values
@ -189,7 +308,7 @@ pub struct SleepParams {
pub max_sleep_period: u64, \\ default value is 15 pub max_sleep_period: u64, \\ default value is 15
pub min_sleep_period: u64, \\ default value is 5 pub min_sleep_period: u64, \\ default value is 5
pub sleep_step: u64, \\ default value is 5 pub sleep_step: u64, \\ default value is 5
}p }
``` ```
If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds. If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds.
@ -208,6 +327,9 @@ worker_params.set_sleep_params(sleep_params);
WorkerPool::new_with_params(10, worker_params).start(); WorkerPool::new_with_params(10, worker_params).start();
``` ```
#### Async
Set it in `AsyncWorker` `TypeBuilder`.
## Periodic Tasks ## Periodic Tasks
@ -215,6 +337,8 @@ Fang can add tasks to `fang_tasks` periodically. To use this feature first run [
Usage example: Usage example:
#### Blocking
```rust ```rust
use fang::Scheduler; use fang::Scheduler;
use fang::Queue; use fang::Queue;
@ -238,6 +362,39 @@ In the example above, `push_periodic_task` is used to save the specified task to
- Db check period in seconds - Db check period in seconds
- Acceptable error limit in seconds - |current_time - scheduled_time| < error - Acceptable error limit in seconds - |current_time - scheduled_time| < error
#### Async
```rust
use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
// Build a AsyncQueue as before
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = queue.insert_periodic_task(
&AsyncTask { number: 1 },
schedule_in_future,
10,
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder()
.check_period(check_period)
.error_margin_seconds(error_margin_seconds)
.queue(&mut queue as &mut dyn AsyncQueueable)
.build();
// Add some more task in other thread or before loop
// Scheduler Loop
scheduler.start().await.unwrap();
```
## Contributing ## Contributing
1. [Fork it!](https://github.com/ayrat555/fang/fork) 1. [Fork it!](https://github.com/ayrat555/fang/fork)
@ -268,9 +425,11 @@ cargo test --all-features -- --ignored --test-threads=1
docker kill postgres docker kill postgres
``` ```
## Author ## Authors
Ayrat Badykov (@ayrat555) - Ayrat Badykov (@ayrat555)
- Pepe Márquez (@pxp9)
[s1]: https://img.shields.io/crates/v/fang.svg [s1]: https://img.shields.io/crates/v/fang.svg

View file

@ -9,8 +9,4 @@ edition = "2021"
fang = { path = "../../" , features = ["asynk"]} fang = { path = "../../" , features = ["asynk"]}
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.0" log = "0.4.0"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
serde_json = "1"
async-trait = "0.1"
tokio-postgres = "0.7"

View file

@ -1,13 +1,13 @@
use async_trait::async_trait; use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable; use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::Error; use fang::asynk::async_runnable::Error;
use fang::serde::{Deserialize, Serialize};
use fang::typetag; use fang::typetag;
use fang::AsyncRunnable; use fang::AsyncRunnable;
use serde::Deserialize;
use serde::Serialize;
use std::time::Duration; use std::time::Duration;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyTask { pub struct MyTask {
pub number: u16, pub number: u16,
} }
@ -22,7 +22,7 @@ impl MyTask {
#[typetag::serde] #[typetag::serde]
impl AsyncRunnable for MyTask { impl AsyncRunnable for MyTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
log::info!("the curreny number is {}", self.number); log::info!("the current number is {}", self.number);
tokio::time::sleep(Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(3)).await;
let new_task = MyTask::new(self.number + 1); let new_task = MyTask::new(self.number + 1);

View file

@ -2,9 +2,9 @@ use fang::asynk::async_queue::AsyncQueue;
use fang::asynk::async_queue::AsyncQueueable; use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_worker_pool::AsyncWorkerPool; use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::AsyncRunnable; use fang::AsyncRunnable;
use fang::NoTls;
use simple_async_worker::MyTask; use simple_async_worker::MyTask;
use std::time::Duration; use std::time::Duration;
use tokio_postgres::NoTls;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {

View file

@ -7,7 +7,6 @@ edition = "2018"
[dependencies] [dependencies]
fang = { path = "../../" , features = ["blocking"]} fang = { path = "../../" , features = ["blocking"]}
serde = { version = "1.0", features = ["derive"] }
signal-hook = "0.3.10" signal-hook = "0.3.10"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.9.0" env_logger = "0.9.0"

View file

@ -1,14 +1,14 @@
use fang::serde::{Deserialize, Serialize};
use fang::typetag; use fang::typetag;
use fang::Error; use fang::Error;
use fang::PgConnection; use fang::PgConnection;
use fang::Queue; use fang::Queue;
use fang::Runnable; use fang::Runnable;
use serde::Deserialize;
use serde::Serialize;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyJob { pub struct MyJob {
pub number: u16, pub number: u16,
pub current_thread_name: String, pub current_thread_name: String,

View file

@ -52,6 +52,12 @@ pub use diesel::pg::PgConnection;
#[doc(hidden)] #[doc(hidden)]
pub use typetag; pub use typetag;
#[doc(hidden)]
pub extern crate serde;
#[doc(hidden)]
pub use serde_derive::{Deserialize, Serialize};
#[cfg(feature = "blocking")] #[cfg(feature = "blocking")]
pub mod blocking; pub mod blocking;
#[cfg(feature = "blocking")] #[cfg(feature = "blocking")]
@ -62,3 +68,11 @@ pub mod asynk;
#[cfg(feature = "asynk")] #[cfg(feature = "asynk")]
pub use asynk::*; pub use asynk::*;
#[cfg(feature = "asynk")]
#[doc(hidden)]
pub use bb8_postgres::tokio_postgres::tls::NoTls;
#[cfg(feature = "asynk")]
#[doc(hidden)]
pub use async_trait::async_trait;