Merge pull request #21 from ayrat555/ayrat555/async

Async processing
This commit is contained in:
Ayrat Badykov 2022-08-04 18:27:15 +03:00 committed by GitHub
commit 28da86e878
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 2263 additions and 99 deletions

3
.gitignore vendored
View file

@ -1,2 +1,3 @@
/target **/target
Cargo.lock Cargo.lock
src/schema.rs

View file

@ -1,24 +1,70 @@
[package] [package]
name = "fang" name = "fang"
version = "0.6.0" version = "0.6.0"
authors = ["Ayrat Badykov <ayratin555@gmail.com>"] authors = ["Ayrat Badykov <ayratin555@gmail.com>" , "Pepe Márquez <pepe.marquezromero@gmail.com>"]
description = "Background job processing library for Rust" description = "Background job processing library for Rust"
repository = "https://github.com/ayrat555/fang" repository = "https://github.com/ayrat555/fang"
edition = "2018" edition = "2021"
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
rust-version = "1.62" rust-version = "1.62"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["blocking", "asynk"]
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"]
[dependencies] [dependencies]
diesel = { version = "1.4", features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"] }
diesel-derive-enum = { version = "1", features = ["postgres"] }
dotenv = "0.15"
uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4" chrono = "0.4"
serde_json = "1"
typetag = "0.2"
log = "0.4" log = "0.4"
serde = { version = "1", features = ["derive"] } serde = "1"
serde_derive = "1.0.141"
serde_json = "1"
thiserror = "1.0" thiserror = "1.0"
typetag = "0.2"
uuid = { version = "0.8", features = ["v4"] }
[dependencies.diesel]
version = "1.4"
features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"]
optional = true
[dependencies.diesel-derive-enum]
version = "1"
features = ["postgres"]
optional = true
[dependencies.dotenv]
version = "0.15"
optional = true
[dependencies.bb8-postgres]
version = "0.8"
features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"]
optional = true
[dependencies.postgres-types]
version = "0.2"
features = ["derive"]
optional = true
[dependencies.tokio]
version = "1.20"
features = ["rt", "time", "macros"]
optional = true
[dependencies.async-trait]
version = "0.1"
optional = true
[dependencies.typed-builder]
version = "0.10"
optional = true
[dependencies.async-recursion]
version = "1"
optional = true

205
README.md
View file

@ -4,7 +4,7 @@
# Fang # Fang
Background job processing library for Rust. It uses Postgres DB as a task queue. Background task processing library for Rust. It uses Postgres DB as a task queue.
## Installation ## Installation
@ -12,36 +12,43 @@ Background job processing library for Rust. It uses Postgres DB as a task queue.
1. Add this to your Cargo.toml 1. Add this to your Cargo.toml
### Blocking
```toml ```toml
[dependencies] [dependencies]
fang = "0.6" fang = { version = "0.6" , features = ["blocking"]}
serde = { version = "1.0", features = ["derive"] }
``` ```
### Async
```toml
[dependencies]
fang = { version = "0.6" , features = ["asynk"]}
```
*Supports rustc 1.62+* *Supports rustc 1.62+*
2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql). 2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql).
## Usage ## Usage
### Defining a job ### Defining a task
Every job should implement `fang::Runnable` trait which is used by `fang` to execute it. #### Blocking
Every task in blocking should implement `fang::Runnable` trait which is used by `fang` to execute it.
```rust ```rust
use fang::Error; use fang::Error;
use fang::Runnable; use fang::Runnable;
use fang::typetag; use fang::typetag;
use fang::PgConnection; use fang::PgConnection;
use serde::{Deserialize, Serialize}; use fang::serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct Job { #[serde(crate = "fang::serde")]
struct MyTask {
pub number: u16, pub number: u16,
} }
#[typetag::serde] #[typetag::serde]
impl Runnable for Job { impl Runnable for MyTask {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> { fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
println!("the number is {}", self.number); println!("the number is {}", self.number);
@ -50,13 +57,44 @@ impl Runnable for Job {
} }
``` ```
As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the job. As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task.
The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the job queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it. The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it.
### Enqueuing a job
To enqueue a job use `Queue::enqueue_task` #### Async
Every task in async should implement `fang::AsyncRunnable` trait which is used by `fang` to execute it.
Also, be careful not to give two `AsyncRunnable` implementations the same type name — serde (via typetag) will fail to deserialize them.
```rust
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
use fang::asynk::async_runnable::Error;
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
// this func is optional to impl
// Default task-type it is common
fn task_type(&self) -> String {
"my-task-type".to_string()
}
}
```
### Enqueuing a task
#### Blocking
To enqueue a task in blocking use `Queue::enqueue_task`
```rust ```rust
@ -64,17 +102,17 @@ use fang::Queue;
... ...
Queue::enqueue_task(&Job { number: 10 }).unwrap(); Queue::enqueue_task(&MyTask { number: 10 }).unwrap();
``` ```
The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several jobs use Postgres struct instance: The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several tasks use Postgres struct instance:
```rust ```rust
let queue = Queue::new(); let queue = Queue::new();
for id in &unsynced_feed_ids { for id in &unsynced_feed_ids {
queue.push_task(&SyncFeedJob { feed_id: *id }).unwrap(); queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
} }
``` ```
@ -82,14 +120,56 @@ for id in &unsynced_feed_ids {
Or you can use `PgConnection` struct: Or you can use `PgConnection` struct:
```rust ```rust
Queue::push_task_query(pg_connection, &new_job).unwrap(); Queue::push_task_query(pg_connection, &new_task).unwrap();
```
#### Async
To enqueue a task in async use `AsyncQueueable::insert_task`
Depending on which backend you prefer, you will need to use the corresponding queue type.
For Postgres backend.
```rust
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// Create a AsyncQueue
let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
// Postgres database url
.uri("postgres://postgres:postgres@localhost/fang")
// Max number of connections that are allowed
.max_pool_size(max_pool_size)
// set to false if you would like tasks to be unique (no duplicates)
.duplicated_tasks(true)
.build();
// Always connect first in order to perform any operation
queue.connect(NoTls).await.unwrap();
```
This simple example uses the `NoTls` type. If you would like to encrypt Postgres traffic, you can implement a TLS type instead.
It is well documented for [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) and [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/)
```rust
// AsyncTask from first example
let task = AsyncTask { number: 8 };
let task_returned = queue
.insert_task(&task as &dyn AsyncRunnable)
.await
.unwrap();
``` ```
### Starting workers ### Starting workers
#### Blocking
Every worker runs in a separate thread. In case of panic, they are always restarted. Every worker runs in a separate thread. In case of panic, they are always restarted.
Use `WorkerPool` to start workers. `WorkerPool::new` accepts one parameter - the number of workers. Use `WorkerPool` for blocking to start workers. `WorkerPool::new` accepts one parameter - the number of workers.
```rust ```rust
@ -112,17 +192,45 @@ worker_pool.shutdown()
Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the
Simple Worker for an example implementation. Simple Worker for an example implementation.
#### Async
Every worker runs in a separate tokio thread. In case of panic, they are always restarted.
Use `AsyncWorkerPool` for async to start workers.
```rust
use fang::asynk::async_worker_pool::AsyncWorkerPool;
// Need to create a queue like before
// Also insert some tasks
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size)
.queue(queue.clone())
.build();
pool.start().await;
```
Check out: Check out:
- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example - [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_async_worker) - simple async worker example
- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users. - [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users.
### Configuration ### Configuration
#### Blocking
To configure workers, instead of `WorkerPool::new` which uses default values, use `WorkerPool.new_with_params`. It accepts two parameters - the number of workers and `WorkerParams` struct. To configure workers, instead of `WorkerPool::new` which uses default values, use `WorkerPool.new_with_params`. It accepts two parameters - the number of workers and `WorkerParams` struct.
#### Async
Just use `TypeBuilder` done for `AsyncWorkerPool`.
### Configuring the type of workers ### Configuring the type of workers
#### Blocking
You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type. You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type.
Add `task_type` method to the `Runnable` trait implementation: Add `task_type` method to the `Runnable` trait implementation:
@ -131,7 +239,7 @@ Add `task_type` method to the `Runnable` trait implementation:
... ...
#[typetag::serde] #[typetag::serde]
impl Runnable for Job { impl Runnable for MyTask {
fn run(&self) -> Result<(), Error> { fn run(&self) -> Result<(), Error> {
println!("the number is {}", self.number); println!("the number is {}", self.number);
@ -156,6 +264,12 @@ WorkerPool::new_with_params(10, worker_params).start();
Without setting `task_type` workers will be executing any type of task. Without setting `task_type` workers will be executing any type of task.
#### Async
Same as Blocking.
Use `TypeBuilder` for `AsyncWorker`.
### Configuring retention mode ### Configuring retention mode
By default, all successfully finished tasks are removed from the DB, failed tasks aren't. By default, all successfully finished tasks are removed from the DB, failed tasks aren't.
@ -172,12 +286,17 @@ pub enum RetentionMode {
Set retention mode with `set_retention_mode`: Set retention mode with `set_retention_mode`:
#### Blocking
```rust ```rust
let mut worker_params = WorkerParams::new(); let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll); worker_params.set_retention_mode(RetentionMode::RemoveAll);
WorkerPool::new_with_params(10, worker_params).start(); WorkerPool::new_with_params(10, worker_params).start();
``` ```
#### Async
Set it in `AsyncWorker` `TypeBuilder`.
### Configuring sleep values ### Configuring sleep values
@ -189,7 +308,7 @@ pub struct SleepParams {
pub max_sleep_period: u64, \\ default value is 15 pub max_sleep_period: u64, \\ default value is 15
pub min_sleep_period: u64, \\ default value is 5 pub min_sleep_period: u64, \\ default value is 5
pub sleep_step: u64, \\ default value is 5 pub sleep_step: u64, \\ default value is 5
}p }
``` ```
If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds. If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds.
@ -208,6 +327,9 @@ worker_params.set_sleep_params(sleep_params);
WorkerPool::new_with_params(10, worker_params).start(); WorkerPool::new_with_params(10, worker_params).start();
``` ```
#### Async
Set it in `AsyncWorker` `TypeBuilder`.
## Periodic Tasks ## Periodic Tasks
@ -215,6 +337,8 @@ Fang can add tasks to `fang_tasks` periodically. To use this feature first run [
Usage example: Usage example:
#### Blocking
```rust ```rust
use fang::Scheduler; use fang::Scheduler;
use fang::Queue; use fang::Queue;
@ -222,11 +346,11 @@ use fang::Queue;
let queue = Queue::new(); let queue = Queue::new();
queue queue
.push_periodic_task(&SyncJob::default(), 120) .push_periodic_task(&SyncMyTask::default(), 120)
.unwrap(); .unwrap();
queue queue
.push_periodic_task(&DeliverJob::default(), 60) .push_periodic_task(&DeliverMyTask::default(), 60)
.unwrap(); .unwrap();
Scheduler::start(10, 5); Scheduler::start(10, 5);
@ -238,6 +362,39 @@ In the example above, `push_periodic_task` is used to save the specified task to
- Db check period in seconds - Db check period in seconds
- Acceptable error limit in seconds - |current_time - scheduled_time| < error - Acceptable error limit in seconds - |current_time - scheduled_time| < error
#### Async
```rust
use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
// Build a AsyncQueue as before
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = queue.insert_periodic_task(
&AsyncTask { number: 1 },
schedule_in_future,
10,
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder()
.check_period(check_period)
.error_margin_seconds(error_margin_seconds)
.queue(&mut queue as &mut dyn AsyncQueueable)
.build();
// Add some more task in other thread or before loop
// Scheduler Loop
scheduler.start().await.unwrap();
```
## Contributing ## Contributing
1. [Fork it!](https://github.com/ayrat555/fang/fork) 1. [Fork it!](https://github.com/ayrat555/fang/fork)
@ -268,9 +425,11 @@ cargo test --all-features -- --ignored --test-threads=1
docker kill postgres docker kill postgres
``` ```
## Author ## Authors
Ayrat Badykov (@ayrat555) - Ayrat Badykov (@ayrat555)
- Pepe Márquez (@pxp9)
[s1]: https://img.shields.io/crates/v/fang.svg [s1]: https://img.shields.io/crates/v/fang.svg

View file

@ -0,0 +1,12 @@
[package]
name = "simple_async_worker"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
fang = { path = "../../" , features = ["asynk"]}
env_logger = "0.9.0"
log = "0.4.0"
tokio = { version = "1", features = ["full"] }

View file

@ -0,0 +1,71 @@
use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::Error;
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::AsyncRunnable;
use std::time::Duration;
/// Demo task: logs its number and enqueues a successor with `number + 1`.
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyTask {
    // Counter carried from one task generation to the next.
    pub number: u16,
}
impl MyTask {
    /// Creates a task carrying `number`.
    pub fn new(number: u16) -> Self {
        Self { number }
    }
}
/// Demo task that always panics in `run` — used to show how the worker
/// pool handles a failing task.
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyFailingTask {
    // Counter carried from one task generation to the next.
    pub number: u16,
}
impl MyFailingTask {
    /// Creates a task carrying `number`.
    pub fn new(number: u16) -> Self {
        Self { number }
    }
}
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyTask {
    /// Enqueues a follow-up task with `number + 1`, logs the current
    /// number, then sleeps 3 seconds to simulate work.
    async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        let new_task = MyTask::new(self.number + 1);
        // Demo code: unwrap is acceptable here, a real task should
        // propagate the insertion error instead.
        queue
            .insert_task(&new_task as &dyn AsyncRunnable)
            .await
            .unwrap();
        log::info!("the current number is {}", self.number);
        tokio::time::sleep(Duration::from_secs(3)).await;
        Ok(())
    }
}
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyFailingTask {
    /// Same flow as `MyTask::run`, but always panics at the end —
    /// presumably to demonstrate worker behavior on task failure.
    async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        let new_task = MyFailingTask::new(self.number + 1);
        queue
            .insert_task(&new_task as &dyn AsyncRunnable)
            .await
            .unwrap();
        log::info!("the current number is {}", self.number);
        tokio::time::sleep(Duration::from_secs(3)).await;
        // Branching on a runtime bool keeps the trailing `Ok(())` from
        // being reported as unreachable code, while still always panicking.
        let b = true;
        if b {
            panic!("Hello!");
        } else {
            Ok(())
        }
    }
}

View file

@ -0,0 +1,54 @@
use fang::asynk::async_queue::AsyncQueue;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::AsyncRunnable;
use fang::NoTls;
use simple_async_worker::MyFailingTask;
use simple_async_worker::MyTask;
use std::time::Duration;
/// Example entry point: builds an async queue, starts a worker pool,
/// seeds three tasks (one of which always fails), then keeps the
/// process alive while workers run.
#[tokio::main]
async fn main() {
    env_logger::init();
    log::info!("Starting...");
    // Max Postgres connections for the queue's bb8 pool.
    let max_pool_size: u32 = 2;
    let mut queue = AsyncQueue::builder()
        .uri("postgres://postgres:postgres@localhost/fang")
        .max_pool_size(max_pool_size)
        .duplicated_tasks(true)
        .build();
    // connect() must run before any insert/fetch; panics here mean the
    // database URI above is unreachable.
    queue.connect(NoTls).await.unwrap();
    log::info!("Queue connected...");
    let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
        .number_of_workers(10_u32)
        .queue(queue.clone())
        .build();
    log::info!("Pool created ...");
    pool.start().await;
    log::info!("Workers started ...");
    let task1 = MyTask::new(0);
    let task2 = MyTask::new(20_000);
    let task3 = MyFailingTask::new(50_000);
    queue
        .insert_task(&task1 as &dyn AsyncRunnable)
        .await
        .unwrap();
    queue
        .insert_task(&task2 as &dyn AsyncRunnable)
        .await
        .unwrap();
    queue
        .insert_task(&task3 as &dyn AsyncRunnable)
        .await
        .unwrap();
    // Keep main alive so the spawned workers can process tasks.
    tokio::time::sleep(Duration::from_secs(100)).await;
}

View file

@ -6,8 +6,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
fang = { path = "../../" } fang = { path = "../../" , features = ["blocking"]}
serde = { version = "1.0", features = ["derive"] }
signal-hook = "0.3.10" signal-hook = "0.3.10"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.9.0" env_logger = "0.9.0"

View file

@ -1,14 +1,14 @@
use fang::serde::{Deserialize, Serialize};
use fang::typetag; use fang::typetag;
use fang::Error; use fang::Error;
use fang::PgConnection; use fang::PgConnection;
use fang::Queue; use fang::Queue;
use fang::Runnable; use fang::Runnable;
use serde::Deserialize;
use serde::Serialize;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyJob { pub struct MyJob {
pub number: u16, pub number: u16,
pub current_thread_name: String, pub current_thread_name: String,

993
src/asynk/async_queue.rs Normal file
View file

@ -0,0 +1,993 @@
use crate::asynk::async_runnable::Error as FangError;
use crate::AsyncRunnable;
use async_trait::async_trait;
use bb8_postgres::bb8::Pool;
use bb8_postgres::bb8::RunError;
use bb8_postgres::tokio_postgres::row::Row;
#[cfg(test)]
use bb8_postgres::tokio_postgres::tls::NoTls;
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use bb8_postgres::tokio_postgres::Socket;
use bb8_postgres::tokio_postgres::Transaction;
use bb8_postgres::PostgresConnectionManager;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use postgres_types::{FromSql, ToSql};
use thiserror::Error;
use typed_builder::TypedBuilder;
use uuid::Uuid;
const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql");
const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql");
const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql");
const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql");
const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql");
const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql");
const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql");
const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql");
#[cfg(test)]
const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql");
#[cfg(test)]
const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql");
pub const DEFAULT_TASK_TYPE: &str = "common";
/// Lifecycle state of a row in `fang_tasks`, mapped to the Postgres
/// enum type `fang_task_state`.
#[derive(Debug, Eq, PartialEq, Clone, ToSql, FromSql)]
#[postgres(name = "fang_task_state")]
pub enum FangTaskState {
    /// Inserted but not yet picked up by a worker.
    #[postgres(name = "new")]
    New,
    /// Claimed by a worker and currently executing.
    #[postgres(name = "in_progress")]
    InProgress,
    /// Execution ended with an error (see `Task::error_message`).
    #[postgres(name = "failed")]
    Failed,
    /// Execution completed successfully.
    #[postgres(name = "finished")]
    Finished,
}
impl Default for FangTaskState {
    // Tasks start their lifecycle as `New`.
    fn default() -> Self {
        FangTaskState::New
    }
}
/// In-memory representation of a `fang_tasks` row.
#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)]
pub struct Task {
    #[builder(setter(into))]
    pub id: Uuid,
    /// Serialized `AsyncRunnable` payload (serde_json, tagged by typetag).
    #[builder(setter(into))]
    pub metadata: serde_json::Value,
    /// Set when the task failed; `None` otherwise.
    #[builder(setter(into))]
    pub error_message: Option<String>,
    // Defaults to FangTaskState::New via the builder.
    #[builder(default, setter(into))]
    pub state: FangTaskState,
    /// Routing key used by workers to select tasks (see DEFAULT_TASK_TYPE).
    #[builder(setter(into))]
    pub task_type: String,
    #[builder(setter(into))]
    pub created_at: DateTime<Utc>,
    #[builder(setter(into))]
    pub updated_at: DateTime<Utc>,
}
/// In-memory representation of a periodic (recurring) task row.
#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)]
pub struct PeriodicTask {
    #[builder(setter(into))]
    pub id: Uuid,
    /// Serialized `AsyncRunnable` payload for each scheduled run.
    #[builder(setter(into))]
    pub metadata: serde_json::Value,
    /// Interval between runs, in seconds.
    #[builder(setter(into))]
    pub period_in_seconds: i32,
    /// Next scheduled execution time; `None` if not yet scheduled.
    #[builder(setter(into))]
    pub scheduled_at: Option<DateTime<Utc>>,
    #[builder(setter(into))]
    pub created_at: DateTime<Utc>,
    #[builder(setter(into))]
    pub updated_at: DateTime<Utc>,
}
/// Errors produced by async queue operations.
#[derive(Debug, Error)]
pub enum AsyncQueueError {
    /// Failure obtaining a connection from the bb8 pool.
    #[error(transparent)]
    PoolError(#[from] RunError<bb8_postgres::tokio_postgres::Error>),
    /// Error returned by Postgres itself.
    #[error(transparent)]
    PgError(#[from] bb8_postgres::tokio_postgres::Error),
    /// Task metadata could not be (de)serialized.
    #[error(transparent)]
    SerdeError(#[from] serde_json::Error),
    /// A statement affected a different number of rows than expected.
    #[error("returned invalid result (expected {expected:?}, found {found:?})")]
    ResultError { expected: u64, found: u64 },
    /// An operation was attempted before `AsyncQueue::connect`.
    #[error(
        "AsyncQueue is not connected :( , call connect() method first and then perform operations"
    )]
    NotConnectedError,
}
impl From<AsyncQueueError> for FangError {
    /// Converts a queue error into the generic fang error, capturing the
    /// queue error's `Debug` rendering as the description text.
    fn from(error: AsyncQueueError) -> Self {
        FangError {
            description: format!("{:?}", error),
        }
    }
}
/// Backend-agnostic interface over the task queue, implemented by
/// `AsyncQueue` (pooled connections) and the test-only `AsyncQueueTest`
/// (single transaction).
#[async_trait]
pub trait AsyncQueueable: Send {
    /// Fetches the next task of `task_type` (or the default type when
    /// `None`) and marks it `InProgress`; `Ok(None)` when no task is found.
    async fn fetch_and_touch_task(
        &mut self,
        task_type: Option<String>,
    ) -> Result<Option<Task>, AsyncQueueError>;
    /// Serializes `task` and inserts it as a new row.
    async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
    /// Deletes every task; returns the number of rows removed.
    async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
    /// Deletes the given task by id; returns the number of rows removed.
    async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;
    /// Deletes all tasks of `task_type`; returns the rows removed.
    async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
    /// Sets the task's state column; returns the updated row.
    async fn update_task_state(
        &mut self,
        task: Task,
        state: FangTaskState,
    ) -> Result<Task, AsyncQueueError>;
    /// Marks the task `Failed` and records `error_message`.
    async fn fail_task(&mut self, task: Task, error_message: &str)
        -> Result<Task, AsyncQueueError>;
    /// Returns periodic tasks due within `error_margin_seconds` of now,
    /// or `None` when there are none.
    async fn fetch_periodic_tasks(
        &mut self,
        error_margin_seconds: i64,
    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError>;
    /// Inserts a periodic task first scheduled at `timestamp`, repeating
    /// every `period` seconds.
    async fn insert_periodic_task(
        &mut self,
        task: &dyn AsyncRunnable,
        timestamp: DateTime<Utc>,
        period: i32,
    ) -> Result<PeriodicTask, AsyncQueueError>;
    /// Advances a periodic task's `scheduled_at` by its period.
    async fn schedule_next_task(
        &mut self,
        periodic_task: PeriodicTask,
    ) -> Result<PeriodicTask, AsyncQueueError>;
}
/// Postgres-backed task queue using a bb8 connection pool.
///
/// Construct with `AsyncQueue::builder()`, then call `connect` before
/// performing any operation — `pool` and `connected` are builder-skipped
/// and only populated by `connect`.
#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    // Set by `connect`; `None` until then.
    #[builder(default=None, setter(skip))]
    pool: Option<Pool<PostgresConnectionManager<Tls>>>,
    /// Postgres connection string.
    #[builder(setter(into))]
    uri: String,
    /// Maximum number of pooled connections.
    #[builder(setter(into))]
    max_pool_size: u32,
    /// When false, inserts go through `insert_task_if_not_exist_query`
    /// so identical tasks are not duplicated; when true duplicates are allowed.
    #[builder(default = false, setter(into))]
    duplicated_tasks: bool,
    // Flipped to true by `connect`; checked by `check_if_connection`.
    #[builder(default = false, setter(skip))]
    connected: bool,
}
/// Test-only queue that runs every operation on a single caller-provided
/// transaction instead of a connection pool, so tests can roll back.
#[cfg(test)]
#[derive(TypedBuilder)]
pub struct AsyncQueueTest<'a> {
    #[builder(setter(into))]
    pub transaction: Transaction<'a>,
    /// Mirrors `AsyncQueue::duplicated_tasks` (see that field's docs).
    #[builder(default = false, setter(into))]
    pub duplicated_tasks: bool,
}
#[cfg(test)]
impl<'a> AsyncQueueTest<'a> {
    /// Test helper: loads a task row by id.
    pub async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
        let row: Row = self
            .transaction
            .query_one(FIND_TASK_BY_ID_QUERY, &[&id])
            .await?;
        // Row-mapping helpers live on AsyncQueue; NoTls is only a type
        // parameter here — no connection is created.
        let task = AsyncQueue::<NoTls>::row_to_task(row);
        Ok(task)
    }
    /// Test helper: loads a periodic task row by id.
    pub async fn find_periodic_task_by_id(
        &mut self,
        id: Uuid,
    ) -> Result<PeriodicTask, AsyncQueueError> {
        let row: Row = self
            .transaction
            .query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id])
            .await?;
        let task = AsyncQueue::<NoTls>::row_to_periodic_task(row);
        Ok(task)
    }
}
// Test implementation: every operation delegates to the shared
// `AsyncQueue` query helpers, executed on the wrapped transaction
// rather than a pooled connection.
#[cfg(test)]
#[async_trait]
impl AsyncQueueable for AsyncQueueTest<'_> {
    async fn fetch_and_touch_task(
        &mut self,
        task_type: Option<String>,
    ) -> Result<Option<Task>, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(transaction, task_type).await?;
        Ok(task)
    }
    async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
        let transaction = &mut self.transaction;
        // typetag serializes the concrete task type tag alongside its fields.
        let metadata = serde_json::to_value(task)?;
        // Choose plain insert vs. uniqueness-preserving insert based on
        // the duplicated_tasks flag.
        let task: Task = if self.duplicated_tasks {
            AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, &task.task_type()).await?
        } else {
            AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
                transaction,
                metadata,
                &task.task_type(),
            )
            .await?
        };
        Ok(task)
    }
    async fn schedule_next_task(
        &mut self,
        periodic_task: PeriodicTask,
    ) -> Result<PeriodicTask, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let periodic_task =
            AsyncQueue::<NoTls>::schedule_next_task_query(transaction, periodic_task).await?;
        Ok(periodic_task)
    }
    async fn insert_periodic_task(
        &mut self,
        task: &dyn AsyncRunnable,
        timestamp: DateTime<Utc>,
        period: i32,
    ) -> Result<PeriodicTask, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let metadata = serde_json::to_value(task)?;
        let periodic_task = AsyncQueue::<NoTls>::insert_periodic_task_query(
            transaction,
            metadata,
            timestamp,
            period,
        )
        .await?;
        Ok(periodic_task)
    }
    async fn fetch_periodic_tasks(
        &mut self,
        error_margin_seconds: i64,
    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let periodic_task =
            AsyncQueue::<NoTls>::fetch_periodic_tasks_query(transaction, error_margin_seconds)
                .await?;
        Ok(periodic_task)
    }
    async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let result = AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await?;
        Ok(result)
    }
    async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let result = AsyncQueue::<NoTls>::remove_task_query(transaction, task).await?;
        Ok(result)
    }
    async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let result = AsyncQueue::<NoTls>::remove_tasks_type_query(transaction, task_type).await?;
        Ok(result)
    }
    async fn update_task_state(
        &mut self,
        task: Task,
        state: FangTaskState,
    ) -> Result<Task, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let task = AsyncQueue::<NoTls>::update_task_state_query(transaction, task, state).await?;
        Ok(task)
    }
    async fn fail_task(
        &mut self,
        task: Task,
        error_message: &str,
    ) -> Result<Task, AsyncQueueError> {
        let transaction = &mut self.transaction;
        let task = AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await?;
        Ok(task)
    }
}
impl<Tls> AsyncQueue<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
/// Guard for queue operations: `Ok(())` once `connect` has succeeded,
/// otherwise `AsyncQueueError::NotConnectedError`.
pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> {
    if !self.connected {
        return Err(AsyncQueueError::NotConnectedError);
    }
    Ok(())
}
/// Builds the bb8 connection pool from `uri`/`max_pool_size` and marks
/// the queue connected. Must be called before any other operation.
pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> {
    let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?;
    let pool = Pool::builder()
        .max_size(self.max_pool_size)
        .build(manager)
        .await?;
    self.pool = Some(pool);
    self.connected = true;
    Ok(())
}
/// Deletes every row in `fang_tasks`; returns the rows-affected count
/// (the `None` presumably disables the expected-row-count check).
async fn remove_all_tasks_query(
    transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> {
    Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await
}
/// Deletes the row with `task.id`; `Some(1)` presumably makes
/// `execute_query` error unless exactly one row was removed.
async fn remove_task_query(
    transaction: &mut Transaction<'_>,
    task: Task,
) -> Result<u64, AsyncQueueError> {
    Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
}
/// Deletes all rows of the given `task_type`; any number of removed
/// rows is accepted.
async fn remove_tasks_type_query(
    transaction: &mut Transaction<'_>,
    task_type: &str,
) -> Result<u64, AsyncQueueError> {
    Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await
}
/// Marks `task` as `Failed`, storing `error_message` and a fresh
/// `updated_at`; returns the updated row as re-read from the database.
async fn fail_task_query(
    transaction: &mut Transaction<'_>,
    task: Task,
    error_message: &str,
) -> Result<Task, AsyncQueueError> {
    let updated_at = Utc::now();
    let row: Row = transaction
        .query_one(
            FAIL_TASK_QUERY,
            &[
                &FangTaskState::Failed,
                &error_message,
                &updated_at,
                &task.id,
            ],
        )
        .await?;
    let failed_task = Self::row_to_task(row);
    Ok(failed_task)
}
/// Fetches one task of `task_type` (defaulting to DEFAULT_TASK_TYPE) and
/// transitions it to `InProgress` within the same transaction.
///
/// NOTE(review): any fetch error — including genuine DB failures, not
/// only "no matching row" — is mapped to `Ok(None)`; confirm intended.
async fn fetch_and_touch_task_query(
    transaction: &mut Transaction<'_>,
    task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
    let task_type = task_type.unwrap_or_else(|| DEFAULT_TASK_TYPE.to_string());
    match Self::get_task_type_query(transaction, &task_type).await {
        Ok(found_task) => {
            // Claim the task before returning it; an error here does
            // propagate (unlike fetch errors above).
            let touched =
                Self::update_task_state_query(transaction, found_task, FangTaskState::InProgress)
                    .await?;
            Ok(Some(touched))
        }
        Err(_) => Ok(None),
    }
}
/// Fetches one pending task of the given type; errors if none exists.
async fn get_task_type_query(
    transaction: &mut Transaction<'_>,
    task_type: &str,
) -> Result<Task, AsyncQueueError> {
    let row = transaction
        .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type])
        .await?;
    Ok(Self::row_to_task(row))
}
/// Persists a new state for the task together with a refreshed
/// `updated_at`, returning the updated row.
async fn update_task_state_query(
    transaction: &mut Transaction<'_>,
    task: Task,
    state: FangTaskState,
) -> Result<Task, AsyncQueueError> {
    let now = Utc::now();
    let row = transaction
        .query_one(UPDATE_TASK_STATE_QUERY, &[&state, &now, &task.id])
        .await?;
    Ok(Self::row_to_task(row))
}
/// Inserts a new task row (INSERT … RETURNING) and maps it to a `Task`.
async fn insert_task_query(
    transaction: &mut Transaction<'_>,
    metadata: serde_json::Value,
    task_type: &str,
) -> Result<Task, AsyncQueueError> {
    let row = transaction
        .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
        .await?;
    Ok(Self::row_to_task(row))
}
// Computes the next run time for a periodic task (now + period) and
// persists it, returning the updated row.
async fn schedule_next_task_query(
transaction: &mut Transaction<'_>,
periodic_task: PeriodicTask,
) -> Result<PeriodicTask, AsyncQueueError> {
let updated_at = Utc::now();
// Next run = now + the task's configured period.
let scheduled_at = updated_at + Duration::seconds(periodic_task.period_in_seconds.into());
// NOTE(review): the parameter list contains only `scheduled_at` and
// `updated_at` — `periodic_task.id` is never bound. Unless
// SCHEDULE_NEXT_TASK_QUERY selects the target row some other way, this may
// update the wrong row(s); confirm against the query constant.
let row: Row = transaction
.query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at])
.await?;
let periodic_task = Self::row_to_periodic_task(row);
Ok(periodic_task)
}
async fn insert_periodic_task_query(
transaction: &mut Transaction<'_>,
metadata: serde_json::Value,
timestamp: DateTime<Utc>,
period: i32,
) -> Result<PeriodicTask, AsyncQueueError> {
let row: Row = transaction
.query_one(
INSERT_PERIODIC_TASK_QUERY,
&[&metadata, &timestamp, &period],
)
.await?;
let periodic_task = Self::row_to_periodic_task(row);
Ok(periodic_task)
}
/// Selects periodic tasks scheduled within ± `error_margin_seconds` of now.
/// Returns `None` (not an empty `Vec`) when nothing is due — callers rely on
/// that distinction.
async fn fetch_periodic_tasks_query(
    transaction: &mut Transaction<'_>,
    error_margin_seconds: i64,
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
    let current_time = Utc::now();
    // Hoist the margin so it is built once for both bounds.
    let margin = Duration::seconds(error_margin_seconds);
    let low_limit = current_time - margin;
    let high_limit = current_time + margin;
    let rows: Vec<Row> = transaction
        .query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit])
        .await?;
    // Pass the function directly instead of wrapping it in a closure
    // (clippy::redundant_closure).
    let periodic_tasks: Vec<PeriodicTask> =
        rows.into_iter().map(Self::row_to_periodic_task).collect();
    if periodic_tasks.is_empty() {
        Ok(None)
    } else {
        Ok(Some(periodic_tasks))
    }
}
/// Runs a statement and, when `expected_result_count` is given, verifies
/// the affected-row count matches; otherwise returns the count as-is.
async fn execute_query(
    transaction: &mut Transaction<'_>,
    query: &str,
    params: &[&(dyn ToSql + Sync)],
    expected_result_count: Option<u64>,
) -> Result<u64, AsyncQueueError> {
    let affected = transaction.execute(query, params).await?;
    match expected_result_count {
        Some(expected) if expected != affected => Err(AsyncQueueError::ResultError {
            expected,
            found: affected,
        }),
        _ => Ok(affected),
    }
}
/// Deduplicates on metadata: returns the existing task when one with
/// identical metadata is found, otherwise inserts a new one.
async fn insert_task_if_not_exist_query(
    transaction: &mut Transaction<'_>,
    metadata: serde_json::Value,
    task_type: &str,
) -> Result<Task, AsyncQueueError> {
    if let Some(existing) = Self::find_task_by_metadata_query(transaction, &metadata).await {
        Ok(existing)
    } else {
        Self::insert_task_query(transaction, metadata, task_type).await
    }
}
/// Looks up a task whose metadata matches exactly. Any error (including
/// "no rows") maps to `None`: the caller only needs existence, so the
/// explicit match is replaced with the idiomatic `.map(..).ok()` chain.
async fn find_task_by_metadata_query(
    transaction: &mut Transaction<'_>,
    metadata: &serde_json::Value,
) -> Option<Task> {
    transaction
        .query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata])
        .await
        .map(Self::row_to_task)
        .ok()
}
/// Maps a database row to a `PeriodicTask` via its typed builder.
fn row_to_periodic_task(row: Row) -> PeriodicTask {
    let id: Uuid = row.get("id");
    let metadata: serde_json::Value = row.get("metadata");
    let period_in_seconds: i32 = row.get("period_in_seconds");
    // `scheduled_at` is nullable: `.ok()` maps NULL / retrieval failure to
    // None instead of the explicit match (clippy would suggest the same).
    let scheduled_at: Option<DateTime<Utc>> = row.try_get("scheduled_at").ok();
    let created_at: DateTime<Utc> = row.get("created_at");
    let updated_at: DateTime<Utc> = row.get("updated_at");
    PeriodicTask::builder()
        .id(id)
        .metadata(metadata)
        .period_in_seconds(period_in_seconds)
        .scheduled_at(scheduled_at)
        .created_at(created_at)
        .updated_at(updated_at)
        .build()
}
/// Maps a database row to a `Task` via its typed builder.
fn row_to_task(row: Row) -> Task {
    let id: Uuid = row.get("id");
    let metadata: serde_json::Value = row.get("metadata");
    // `error_message` is nullable: `.ok()` maps NULL / retrieval failure to
    // None instead of the explicit match (clippy would suggest the same).
    let error_message: Option<String> = row.try_get("error_message").ok();
    let state: FangTaskState = row.get("state");
    let task_type: String = row.get("task_type");
    let created_at: DateTime<Utc> = row.get("created_at");
    let updated_at: DateTime<Utc> = row.get("updated_at");
    Task::builder()
        .id(id)
        .metadata(metadata)
        .error_message(error_message)
        .state(state)
        .task_type(task_type)
        .created_at(created_at)
        .updated_at(updated_at)
        .build()
}
}
// `AsyncQueueable` implementation backed by the bb8 connection pool.
// Every method follows the same shape: verify `connect()` was called, check
// out a pooled connection, run the matching `*_query` helper inside a fresh
// transaction, and commit before returning. The `pool.as_ref().unwrap()`
// calls are safe because `check_if_connection()` succeeds only after
// `connect()` has populated the pool.
#[async_trait]
impl<Tls> AsyncQueueable for AsyncQueue<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
// Fetches the next task of `task_type` (or the default type) and marks it
// InProgress within the same transaction.
async fn fetch_and_touch_task(
&mut self,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?;
transaction.commit().await?;
Ok(task)
}
// Serializes the runnable to JSON and inserts it; when duplicated tasks are
// disallowed, an existing task with identical metadata is reused instead.
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let metadata = serde_json::to_value(task)?;
let task: Task = if self.duplicated_tasks {
Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await?
} else {
Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type())
.await?
};
transaction.commit().await?;
Ok(task)
}
// Inserts a periodic task with its first run time and period (seconds).
async fn insert_periodic_task(
&mut self,
task: &dyn AsyncRunnable,
timestamp: DateTime<Utc>,
period: i32,
) -> Result<PeriodicTask, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let metadata = serde_json::to_value(task)?;
let periodic_task =
Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?;
transaction.commit().await?;
Ok(periodic_task)
}
// Advances a periodic task's `scheduled_at` by its period.
async fn schedule_next_task(
&mut self,
periodic_task: PeriodicTask,
) -> Result<PeriodicTask, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?;
transaction.commit().await?;
Ok(periodic_task)
}
// Returns periodic tasks due within ± `error_margin_seconds`, or None.
async fn fetch_periodic_tasks(
&mut self,
error_margin_seconds: i64,
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let periodic_task =
Self::fetch_periodic_tasks_query(&mut transaction, error_margin_seconds).await?;
transaction.commit().await?;
Ok(periodic_task)
}
// Deletes all tasks; returns the number of removed rows.
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_all_tasks_query(&mut transaction).await?;
transaction.commit().await?;
Ok(result)
}
// Deletes a single task by id.
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_task_query(&mut transaction, task).await?;
transaction.commit().await?;
Ok(result)
}
// Deletes all tasks of the given type.
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
transaction.commit().await?;
Ok(result)
}
// Sets a task's state and refreshes `updated_at`.
async fn update_task_state(
&mut self,
task: Task,
state: FangTaskState,
) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let task = Self::update_task_state_query(&mut transaction, task, state).await?;
transaction.commit().await?;
Ok(task)
}
// Marks a task failed, recording the error message.
async fn fail_task(
&mut self,
task: Task,
error_message: &str,
) -> Result<Task, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
transaction.commit().await?;
Ok(task)
}
}
// Integration tests for the async queue. They require a running Postgres at
// postgres://postgres:postgres@localhost/fang. Each test drives the
// `AsyncQueueTest` wrapper inside a transaction that is rolled back at the
// end, so the database is left unchanged.
#[cfg(test)]
mod async_queue_tests {
use super::AsyncQueueTest;
use super::AsyncQueueable;
use super::FangTaskState;
use super::Task;
use crate::asynk::AsyncRunnable;
use crate::asynk::Error;
use async_trait::async_trait;
use bb8_postgres::bb8::Pool;
use bb8_postgres::tokio_postgres::NoTls;
use bb8_postgres::PostgresConnectionManager;
use serde::{Deserialize, Serialize};
// Minimal runnable used as a fixture; `typetag` adds a "type" field to its
// serialized metadata, which the assertions below check.
#[derive(Serialize, Deserialize)]
struct AsyncTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
}
#[tokio::test]
async fn insert_task_creates_new_task() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
test.transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn update_task_state_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let id = task.id;
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let finished_task = test
.update_task_state(task, FangTaskState::Finished)
.await
.unwrap();
assert_eq!(id, finished_task.id);
assert_eq!(FangTaskState::Finished, finished_task.state);
test.transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn failed_task_query_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let id = task.id;
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let failed_task = test.fail_task(task, "Some error").await.unwrap();
assert_eq!(id, failed_task.id);
assert_eq!(Some("Some error"), failed_task.error_message.as_deref());
assert_eq!(FangTaskState::Failed, failed_task.state);
test.transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn remove_all_tasks_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = insert_task(&mut test, &AsyncTask { number: 2 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let result = test.remove_all_tasks().await.unwrap();
assert_eq!(2, result);
test.transaction.rollback().await.unwrap();
}
// Tasks must come back in insertion order (1 then 2) and be marked
// InProgress by fetch_and_touch.
#[tokio::test]
async fn fetch_and_touch_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = insert_task(&mut test, &AsyncTask { number: 2 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = test.fetch_and_touch_task(None).await.unwrap().unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = test.fetch_and_touch_task(None).await.unwrap().unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
test.transaction.rollback().await.unwrap();
}
// Removing a non-matching type deletes nothing; the default "common" type
// matches both inserted tasks.
#[tokio::test]
async fn remove_tasks_type_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = insert_task(&mut test, &AsyncTask { number: 2 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let result = test.remove_tasks_type("mytype").await.unwrap();
assert_eq!(0, result);
let result = test.remove_tasks_type("common").await.unwrap();
assert_eq!(2, result);
test.transaction.rollback().await.unwrap();
}
// Test helpers.
async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task {
test.insert_task(task).await.unwrap()
}
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
let pg_mgr = PostgresConnectionManager::new_from_stringlike(
"postgres://postgres:postgres@localhost/fang",
NoTls,
)
.unwrap();
Pool::builder().build(pg_mgr).await.unwrap()
}
}

View file

@ -0,0 +1,19 @@
use crate::asynk::async_queue::AsyncQueueable;
use async_trait::async_trait;
/// Default task type assigned when `AsyncRunnable::task_type` is not overridden.
const COMMON_TYPE: &str = "common";
/// Error returned by `AsyncRunnable::run`; carries a human-readable message.
#[derive(Debug)]
pub struct Error {
pub description: String,
}
/// A unit of background work executable by the async workers.
///
/// `typetag` tags the serialized form with a `"type"` field so boxed trait
/// objects can round-trip through the task's JSON `metadata`.
#[typetag::serde(tag = "type")]
#[async_trait]
pub trait AsyncRunnable: Send + Sync {
/// Executes the task against the given queue backend.
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>;
/// Groups tasks so workers can subscribe to a subset; defaults to "common".
fn task_type(&self) -> String {
COMMON_TYPE.to_string()
}
}

View file

@ -0,0 +1,267 @@
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_queue::PeriodicTask;
use crate::asynk::AsyncRunnable;
use crate::asynk::Error;
use async_recursion::async_recursion;
use log::error;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use typed_builder::TypedBuilder;
// Periodically polls the queue for due periodic tasks and enqueues a
// concrete task for each one.
#[derive(TypedBuilder, Clone)]
pub struct Scheduler<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
// Seconds between scheduling passes.
#[builder(setter(into))]
pub check_period: u64,
// Tolerance window (seconds) when deciding whether a task is due.
#[builder(setter(into))]
pub error_margin_seconds: u64,
// Queue backend used to fetch periodic tasks and insert runnable tasks.
#[builder(setter(into))]
pub queue: AQueue,
// Counts how many times the schedule loop has been restarted.
#[builder(default = 0, setter(into))]
pub number_of_restarts: u32,
}
impl<AQueue> Scheduler<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
#[async_recursion(?Send)]
pub async fn start(&mut self) -> Result<(), Error> {
let join_handle: JoinHandle<Result<(), Error>> = self.schedule_loop().await;
match join_handle.await {
Err(err) => {
error!(
"Scheduler panicked, restarting {:?}. Number of restarts {}",
err, self.number_of_restarts
);
self.number_of_restarts += 1;
sleep(Duration::from_secs(1)).await;
self.start().await
}
Ok(task_res) => match task_res {
Err(err) => {
error!(
"Scheduler failed, restarting {:?}. Number of restarts {}",
err, self.number_of_restarts
);
self.number_of_restarts += 1;
self.start().await
}
Ok(_) => {
error!(
"Scheduler stopped. restarting. Number of restarts {}",
self.number_of_restarts
);
self.number_of_restarts += 1;
self.start().await
}
},
}
}
pub async fn schedule_loop(&mut self) -> JoinHandle<Result<(), Error>> {
let mut scheduler = self.clone();
tokio::spawn(async move {
let sleep_duration = Duration::from_secs(scheduler.check_period);
loop {
scheduler.schedule().await?;
sleep(sleep_duration).await;
}
})
}
pub async fn schedule(&mut self) -> Result<(), Error> {
if let Some(tasks) = self
.queue
.fetch_periodic_tasks(self.error_margin_seconds as i64)
.await?
{
for task in tasks {
self.process_task(task).await?;
}
};
Ok(())
}
async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
match task.scheduled_at {
None => {
self.queue.schedule_next_task(task).await?;
}
Some(_) => {
let actual_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap();
self.queue.insert_task(&*actual_task).await?;
self.queue.schedule_next_task(task).await?;
}
}
Ok(())
}
}
// Test-only scheduler variant that borrows a queue trait object instead of
// owning a cloneable queue, so tests can drive a transaction-backed queue.
#[cfg(test)]
#[derive(TypedBuilder)]
pub struct SchedulerTest<'a> {
// Seconds between scheduling passes.
#[builder(setter(into))]
pub check_period: u64,
// Tolerance window (seconds) when deciding whether a task is due.
#[builder(setter(into))]
pub error_margin_seconds: u64,
// Borrowed queue backend under test.
#[builder(setter(into))]
pub queue: &'a mut dyn AsyncQueueable,
// Restart counter, unused by the test loop but mirrors the real struct.
#[builder(default = 0, setter(into))]
pub number_of_restarts: u32,
}
#[cfg(test)]
impl<'a> SchedulerTest<'a> {
// Like the production schedule loop, but returns after the first batch of
// due tasks has been processed instead of looping forever.
async fn schedule_test(&mut self) -> Result<(), Error> {
let sleep_duration = Duration::from_secs(self.check_period);
loop {
match self
.queue
.fetch_periodic_tasks(self.error_margin_seconds as i64)
.await?
{
Some(tasks) => {
for task in tasks {
self.process_task(task).await?;
}
return Ok(());
}
None => {
sleep(sleep_duration).await;
}
};
}
}
// Mirrors `Scheduler::process_task`. NOTE(review): this `unwrap` panics on
// malformed metadata; acceptable in test-only code.
async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
match task.scheduled_at {
None => {
self.queue.schedule_next_task(task).await?;
}
Some(_) => {
let actual_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap();
self.queue.insert_task(&*actual_task).await?;
self.queue.schedule_next_task(task).await?;
}
}
Ok(())
}
}
// Integration tests for the scheduler; require a local Postgres at
// postgres://postgres:postgres@localhost/fang.
#[cfg(test)]
mod async_scheduler_tests {
use super::SchedulerTest;
use crate::asynk::async_queue::AsyncQueueTest;
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_queue::PeriodicTask;
use crate::asynk::AsyncRunnable;
use crate::asynk::Error;
use async_trait::async_trait;
use bb8_postgres::bb8::Pool;
use bb8_postgres::tokio_postgres::NoTls;
use bb8_postgres::PostgresConnectionManager;
use chrono::DateTime;
use chrono::Duration as OtherDuration;
use chrono::Utc;
use serde::{Deserialize, Serialize};
// Fixture runnable with a dedicated "schedule" task type.
#[derive(Serialize, Deserialize)]
struct AsyncScheduledTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncScheduledTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
fn task_type(&self) -> String {
"schedule".to_string()
}
}
// Inserts a periodic task due 5s in the future with a 2s error margin,
// runs one scheduling pass, and verifies a concrete task was enqueued.
#[tokio::test]
async fn schedules_tasks() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = insert_periodic_task(
&mut test,
&AsyncScheduledTask { number: 1 },
schedule_in_future,
10,
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = SchedulerTest::builder()
.check_period(check_period)
.error_margin_seconds(error_margin_seconds)
.queue(&mut test as &mut dyn AsyncQueueable)
.build();
// Scheduler start tricky not loop :)
scheduler.schedule_test().await.unwrap();
let task = scheduler
.queue
.fetch_and_touch_task(Some("schedule".to_string()))
.await
.unwrap()
.unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let runnable_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap();
assert_eq!("schedule", runnable_task.task_type());
assert_eq!(Some("AsyncScheduledTask"), type_task);
assert_eq!(Some(1), number);
}
// Test helpers.
async fn insert_periodic_task(
test: &mut AsyncQueueTest<'_>,
task: &dyn AsyncRunnable,
timestamp: DateTime<Utc>,
period_in_seconds: i32,
) -> PeriodicTask {
test.insert_periodic_task(task, timestamp, period_in_seconds)
.await
.unwrap()
}
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
let pg_mgr = PostgresConnectionManager::new_from_stringlike(
"postgres://postgres:postgres@localhost/fang",
NoTls,
)
.unwrap();
Pool::builder().build(pg_mgr).await.unwrap()
}
}

419
src/asynk/async_worker.rs Normal file
View file

@ -0,0 +1,419 @@
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_queue::FangTaskState;
use crate::asynk::async_queue::Task;
use crate::asynk::async_queue::DEFAULT_TASK_TYPE;
use crate::asynk::async_runnable::AsyncRunnable;
use crate::asynk::Error;
use crate::{RetentionMode, SleepParams};
use log::error;
use std::time::Duration;
use typed_builder::TypedBuilder;
// Executes tasks of one task type pulled from the queue, applying the
// configured sleep/backoff and retention policies.
#[derive(TypedBuilder)]
pub struct AsyncWorker<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
// Queue backend tasks are fetched from and finalized against.
#[builder(setter(into))]
pub queue: AQueue,
// Only tasks of this type are fetched; defaults to the common type.
#[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))]
pub task_type: String,
// Backoff configuration used when the queue is empty or errors.
#[builder(default, setter(into))]
pub sleep_params: SleepParams,
// What to do with a task after it finishes or fails.
#[builder(default, setter(into))]
pub retention_mode: RetentionMode,
}
impl<AQueue> AsyncWorker<AQueue>
where
    AQueue: AsyncQueueable + Clone + Sync + 'static,
{
    /// Executes a single task, then applies the configured retention policy.
    pub async fn run(&mut self, task: Task) -> Result<(), Error> {
        let result = self.execute_task(task).await;
        self.finalize_task(result).await
    }
    /// Deserializes and runs the task. Returns the task back on success, or
    /// the task together with an error description on failure.
    async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
        // Fix: malformed metadata used to `unwrap` and kill the worker task;
        // it is now reported as a task failure so the retention policy (fail
        // or remove) can be applied like any other error.
        let actual_task: Box<dyn AsyncRunnable> =
            match serde_json::from_value(task.metadata.clone()) {
                Ok(runnable) => runnable,
                Err(error) => return Err((task, error.to_string())),
            };
        let task_result = actual_task.run(&mut self.queue).await;
        match task_result {
            Ok(()) => Ok(task),
            Err(error) => Err((task, error.description)),
        }
    }
    /// Applies the retention policy to a finished or failed task.
    async fn finalize_task(&mut self, result: Result<Task, (Task, String)>) -> Result<(), Error> {
        match self.retention_mode {
            RetentionMode::KeepAll => match result {
                Ok(task) => {
                    self.queue
                        .update_task_state(task, FangTaskState::Finished)
                        .await?;
                    Ok(())
                }
                Err((task, error)) => {
                    self.queue.fail_task(task, &error).await?;
                    Ok(())
                }
            },
            // Outcome is irrelevant here: the task is removed either way, so
            // the two previously-identical arms are collapsed.
            RetentionMode::RemoveAll => match result {
                Ok(task) | Err((task, _)) => {
                    self.queue.remove_task(task).await?;
                    Ok(())
                }
            },
            RetentionMode::RemoveFinished => match result {
                Ok(task) => {
                    self.queue.remove_task(task).await?;
                    Ok(())
                }
                Err((task, error)) => {
                    self.queue.fail_task(task, &error).await?;
                    Ok(())
                }
            },
        }
    }
    /// Sleeps for the current backoff period, increasing it for next time.
    pub async fn sleep(&mut self) {
        self.sleep_params.maybe_increase_sleep_period();
        tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await;
    }
    /// Main worker loop: fetch, run, and back off when the queue is empty or
    /// fetching fails. Never returns under normal operation.
    pub async fn run_tasks(&mut self) -> Result<(), Error> {
        loop {
            match self
                .queue
                .fetch_and_touch_task(Some(self.task_type.clone()))
                .await
            {
                Ok(Some(task)) => {
                    // Work arrived: reset the backoff before executing.
                    self.sleep_params.maybe_reset_sleep_period();
                    self.run(task).await?
                }
                Ok(None) => {
                    self.sleep().await;
                }
                Err(error) => {
                    error!("Failed to fetch a task {:?}", error);
                    self.sleep().await;
                }
            };
        }
    }
}
// Test-only worker variant that borrows a queue trait object so tests can
// drive a transaction-backed queue.
#[cfg(test)]
#[derive(TypedBuilder)]
pub struct AsyncWorkerTest<'a> {
// Borrowed queue backend under test.
#[builder(setter(into))]
pub queue: &'a mut dyn AsyncQueueable,
// Only tasks of this type are fetched; defaults to the common type.
#[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))]
pub task_type: String,
// Backoff configuration used when the queue is empty or errors.
#[builder(default, setter(into))]
pub sleep_params: SleepParams,
// What to do with a task after it finishes or fails.
#[builder(default, setter(into))]
pub retention_mode: RetentionMode,
}
#[cfg(test)]
impl<'a> AsyncWorkerTest<'a> {
// Executes a single task, then applies the retention policy.
pub async fn run(&mut self, task: Task) -> Result<(), Error> {
let result = self.execute_task(task).await;
self.finalize_task(result).await
}
// Deserializes and runs the task. NOTE(review): this `unwrap` panics on
// malformed metadata; acceptable in test-only code.
async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
let actual_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap();
let task_result = actual_task.run(self.queue).await;
match task_result {
Ok(()) => Ok(task),
Err(error) => Err((task, error.description)),
}
}
// Mirrors the production `finalize_task` retention handling.
async fn finalize_task(&mut self, result: Result<Task, (Task, String)>) -> Result<(), Error> {
match self.retention_mode {
RetentionMode::KeepAll => match result {
Ok(task) => {
self.queue
.update_task_state(task, FangTaskState::Finished)
.await?;
Ok(())
}
Err((task, error)) => {
self.queue.fail_task(task, &error).await?;
Ok(())
}
},
RetentionMode::RemoveAll => match result {
Ok(task) => {
self.queue.remove_task(task).await?;
Ok(())
}
Err((task, _error)) => {
self.queue.remove_task(task).await?;
Ok(())
}
},
RetentionMode::RemoveFinished => match result {
Ok(task) => {
self.queue.remove_task(task).await?;
Ok(())
}
Err((task, error)) => {
self.queue.fail_task(task, &error).await?;
Ok(())
}
},
}
}
// Sleeps for the current backoff period, increasing it for next time.
pub async fn sleep(&mut self) {
self.sleep_params.maybe_increase_sleep_period();
tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await;
}
// Like the production loop, but returns once the queue has no more tasks
// of the configured type — lets tests drain the queue deterministically.
pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> {
loop {
match self
.queue
.fetch_and_touch_task(Some(self.task_type.clone()))
.await
{
Ok(Some(task)) => {
self.sleep_params.maybe_reset_sleep_period();
self.run(task).await?
}
Ok(None) => {
return Ok(());
}
Err(error) => {
error!("Failed to fetch a task {:?}", error);
self.sleep().await;
}
};
}
}
}
// Integration tests for the async worker; require a local Postgres at
// postgres://postgres:postgres@localhost/fang. Each test rolls back its
// transaction so the database is left unchanged.
#[cfg(test)]
mod async_worker_tests {
use super::AsyncWorkerTest;
use crate::asynk::async_queue::AsyncQueueTest;
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_queue::FangTaskState;
use crate::asynk::async_worker::Task;
use crate::asynk::AsyncRunnable;
use crate::asynk::Error;
use crate::RetentionMode;
use async_trait::async_trait;
use bb8_postgres::bb8::Pool;
use bb8_postgres::tokio_postgres::NoTls;
use bb8_postgres::PostgresConnectionManager;
use serde::{Deserialize, Serialize};
// Fixture that always succeeds.
#[derive(Serialize, Deserialize)]
struct WorkerAsyncTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for WorkerAsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
}
// Fixture that always fails with a message derived from its number.
#[derive(Serialize, Deserialize)]
struct AsyncFailedTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncFailedTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
let message = format!("number {} is wrong :(", self.number);
Err(Error {
description: message,
})
}
}
// Fixtures with distinct task types, for type-filtering tests.
#[derive(Serialize, Deserialize)]
struct AsyncTaskType1 {}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTaskType1 {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
fn task_type(&self) -> String {
"type1".to_string()
}
}
#[derive(Serialize, Deserialize)]
struct AsyncTaskType2 {}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTaskType2 {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
fn task_type(&self) -> String {
"type2".to_string()
}
}
#[tokio::test]
async fn execute_and_finishes_task() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await;
let id = task.id;
let mut worker = AsyncWorkerTest::builder()
.queue(&mut test as &mut dyn AsyncQueueable)
.retention_mode(RetentionMode::KeepAll)
.build();
worker.run(task).await.unwrap();
let task_finished = test.find_task_by_id(id).await.unwrap();
assert_eq!(id, task_finished.id);
assert_eq!(FangTaskState::Finished, task_finished.state);
test.transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn saves_error_for_failed_task() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await;
let id = task.id;
let mut worker = AsyncWorkerTest::builder()
.queue(&mut test as &mut dyn AsyncQueueable)
.retention_mode(RetentionMode::KeepAll)
.build();
worker.run(task).await.unwrap();
let task_finished = test.find_task_by_id(id).await.unwrap();
assert_eq!(id, task_finished.id);
assert_eq!(FangTaskState::Failed, task_finished.state);
assert_eq!(
"number 1 is wrong :(".to_string(),
task_finished.error_message.unwrap()
);
test.transaction.rollback().await.unwrap();
}
// A worker bound to "type1" must finish both type1 tasks and leave the
// type2 task untouched (still New).
#[tokio::test]
async fn executes_task_only_of_specific_type() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await;
let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await;
let task2 = insert_task(&mut test, &AsyncTaskType2 {}).await;
let id1 = task1.id;
let id12 = task12.id;
let id2 = task2.id;
let mut worker = AsyncWorkerTest::builder()
.queue(&mut test as &mut dyn AsyncQueueable)
.task_type("type1".to_string())
.retention_mode(RetentionMode::KeepAll)
.build();
worker.run_tasks_until_none().await.unwrap();
let task1 = test.find_task_by_id(id1).await.unwrap();
let task12 = test.find_task_by_id(id12).await.unwrap();
let task2 = test.find_task_by_id(id2).await.unwrap();
assert_eq!(id1, task1.id);
assert_eq!(id12, task12.id);
assert_eq!(id2, task2.id);
assert_eq!(FangTaskState::Finished, task1.state);
assert_eq!(FangTaskState::Finished, task12.state);
assert_eq!(FangTaskState::New, task2.state);
test.transaction.rollback().await.unwrap();
}
// With the default retention mode, finished type1 tasks are removed from
// the queue while the untouched type2 task remains fetchable.
#[tokio::test]
async fn remove_when_finished() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await;
let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await;
let task2 = insert_task(&mut test, &AsyncTaskType2 {}).await;
let _id1 = task1.id;
let _id12 = task12.id;
let id2 = task2.id;
let mut worker = AsyncWorkerTest::builder()
.queue(&mut test as &mut dyn AsyncQueueable)
.task_type("type1".to_string())
.build();
worker.run_tasks_until_none().await.unwrap();
let task = test
.fetch_and_touch_task(Some("type1".to_string()))
.await
.unwrap();
assert_eq!(None, task);
let task2 = test
.fetch_and_touch_task(Some("type2".to_string()))
.await
.unwrap()
.unwrap();
assert_eq!(id2, task2.id);
test.transaction.rollback().await.unwrap();
}
// Test helpers.
async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task {
test.insert_task(task).await.unwrap()
}
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
let pg_mgr = PostgresConnectionManager::new_from_stringlike(
"postgres://postgres:postgres@localhost/fang",
NoTls,
)
.unwrap();
Pool::builder().build(pg_mgr).await.unwrap()
}
}

View file

@ -0,0 +1,75 @@
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_worker::AsyncWorker;
use crate::asynk::Error;
use crate::{RetentionMode, SleepParams};
use async_recursion::async_recursion;
use log::error;
use tokio::task::JoinHandle;
use typed_builder::TypedBuilder;
/// A pool of async workers that all pull tasks from the same queue.
///
/// Built with the generated `AsyncWorkerPool::builder()` (via `TypedBuilder`);
/// `queue` and `number_of_workers` are required, the rest default.
/// The pool is `Clone` so each spawned worker supervisor can own a copy.
#[derive(TypedBuilder, Clone)]
pub struct AsyncWorkerPool<AQueue>
where
    AQueue: AsyncQueueable + Clone + Sync + 'static,
{
    /// Queue the workers fetch tasks from; cloned once per worker.
    #[builder(setter(into))]
    pub queue: AQueue,
    /// Backoff configuration used by each worker when the queue is empty.
    #[builder(default, setter(into))]
    pub sleep_params: SleepParams,
    /// What to do with tasks after they run (keep / remove / remove finished).
    #[builder(default, setter(into))]
    pub retention_mode: RetentionMode,
    /// Number of worker tasks spawned by [`AsyncWorkerPool::start`].
    #[builder(setter(into))]
    pub number_of_workers: u32,
}
impl<AQueue> AsyncWorkerPool<AQueue>
where
    AQueue: AsyncQueueable + Clone + Sync + 'static,
{
    /// Start `number_of_workers` workers.
    ///
    /// Each worker runs in its own detached tokio task under a supervisor
    /// that restarts it if the worker task fails; the join handles are
    /// intentionally dropped (fire-and-forget).
    pub async fn start(&mut self) {
        for idx in 0..self.number_of_workers {
            let pool = self.clone();
            tokio::spawn(Self::supervise_task(pool, 0, idx));
        }
    }

    /// Run one worker and restart it whenever its task fails.
    ///
    /// `restarts` is the number of restarts already performed (callers pass
    /// 0). Implemented as a loop instead of `#[async_recursion]` recursion:
    /// the recursive version awaited its own call, keeping one nested boxed
    /// future alive per restart, so a crash-looping worker grew memory
    /// without bound.
    pub async fn supervise_task(pool: AsyncWorkerPool<AQueue>, restarts: u64, worker_number: u32) {
        let mut restarts = restarts;

        loop {
            // Matches the recursive version, which incremented on entry.
            restarts += 1;

            let join_handle = Self::spawn_worker(
                pool.queue.clone(),
                pool.sleep_params.clone(),
                pool.retention_mode.clone(),
            )
            .await;

            // Awaiting the handle yields Err only if the worker task
            // panicked or was cancelled; a worker returning Err(_) is NOT
            // restarted — same behavior as the original implementation.
            if (join_handle.await).is_ok() {
                break;
            }

            error!(
                "Worker {} stopped. Restarting. the number of restarts {}",
                worker_number, restarts,
            );
        }
    }

    /// Spawn a single worker onto the tokio runtime and return its handle.
    pub async fn spawn_worker(
        queue: AQueue,
        sleep_params: SleepParams,
        retention_mode: RetentionMode,
    ) -> JoinHandle<Result<(), Error>> {
        tokio::spawn(async move { Self::run_worker(queue, sleep_params, retention_mode).await })
    }

    /// Build an `AsyncWorker` from the given parts and drive its task loop
    /// until it returns.
    pub async fn run_worker(
        queue: AQueue,
        sleep_params: SleepParams,
        retention_mode: RetentionMode,
    ) -> Result<(), Error> {
        let mut worker: AsyncWorker<AQueue> = AsyncWorker::builder()
            .queue(queue)
            .sleep_params(sleep_params)
            .retention_mode(retention_mode)
            .build();

        worker.run_tasks().await
    }
}

8
src/asynk/mod.rs Normal file
View file

@ -0,0 +1,8 @@
// Async (tokio-based) half of the crate, gated behind the `asynk` feature.
pub mod async_queue;
pub mod async_runnable;
pub mod async_scheduler;
pub mod async_worker;
pub mod async_worker_pool;

// Re-export the task trait and its error type at `crate::asynk::` level.
pub use async_runnable::AsyncRunnable;
pub use async_runnable::Error;

View file

@ -0,0 +1 @@
-- Set a task's state, error message and update timestamp by id,
-- returning the full updated row.
UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at

View file

@ -0,0 +1 @@
-- Fetch periodic tasks scheduled inside the [$1, $2] window, plus any that
-- have never been scheduled (scheduled_at IS NULL).
SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL

View file

@ -0,0 +1 @@
-- Fetch the oldest 'new' task of the given type. FOR UPDATE SKIP LOCKED
-- row-locks the fetched row and skips rows already locked, so concurrent
-- workers never grab the same task.
SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = $1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED

View file

@ -0,0 +1 @@
-- Look up a single periodic task by its id.
SELECT * FROM fang_periodic_tasks WHERE id = $1

View file

@ -0,0 +1 @@
-- Look up a single task by its id.
SELECT * FROM fang_tasks WHERE id = $1

View file

@ -0,0 +1 @@
-- Find one task whose metadata matches exactly (duplicate detection).
SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1

View file

@ -0,0 +1 @@
-- Insert a periodic task and return the full created row.
INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_seconds") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at

View file

@ -0,0 +1 @@
-- Insert a task and return the full created row
-- (state/timestamps come from column defaults).
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at

View file

@ -0,0 +1 @@
-- Delete every periodic task (no WHERE clause: intentional full wipe).
DELETE FROM "fang_periodic_tasks"

View file

@ -0,0 +1 @@
-- Delete every task (no WHERE clause: intentional full wipe).
DELETE FROM "fang_tasks"

View file

@ -0,0 +1 @@
-- Delete a single task by id.
DELETE FROM "fang_tasks" WHERE id = $1

View file

@ -0,0 +1 @@
-- Delete all tasks of the given type.
DELETE FROM "fang_tasks" WHERE task_type = $1

View file

@ -0,0 +1 @@
-- Set scheduled_at/updated_at on periodic tasks, returning the updated rows.
-- NOTE(review): there is no WHERE clause, so this updates EVERY row in
-- fang_periodic_tasks -- confirm that is intentional (compare the task
-- update query, which filters by id).
UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at

View file

@ -0,0 +1 @@
-- Set a task's state and update timestamp by id, returning the full
-- updated row.
UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at

View file

@ -2,6 +2,7 @@ use crate::error::FangError;
use crate::queue::Queue; use crate::queue::Queue;
use crate::queue::Task; use crate::queue::Task;
use crate::worker_pool::{SharedState, WorkerState}; use crate::worker_pool::{SharedState, WorkerState};
use crate::{RetentionMode, SleepParams};
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::r2d2::{ConnectionManager, PooledConnection};
use log::error; use log::error;
@ -16,46 +17,6 @@ pub struct Executor {
shared_state: Option<SharedState>, shared_state: Option<SharedState>,
} }
#[derive(Clone)]
pub enum RetentionMode {
KeepAll,
RemoveAll,
RemoveFinished,
}
#[derive(Clone)]
pub struct SleepParams {
pub sleep_period: u64,
pub max_sleep_period: u64,
pub min_sleep_period: u64,
pub sleep_step: u64,
}
impl SleepParams {
pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period;
}
}
pub fn maybe_increase_sleep_period(&mut self) {
if self.sleep_period < self.max_sleep_period {
self.sleep_period += self.sleep_step;
}
}
}
impl Default for SleepParams {
fn default() -> Self {
SleepParams {
sleep_period: 5,
max_sleep_period: 15,
min_sleep_period: 5,
sleep_step: 5,
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Error { pub struct Error {
pub description: String, pub description: String,
@ -185,10 +146,10 @@ mod executor_tests {
use super::Executor; use super::Executor;
use super::RetentionMode; use super::RetentionMode;
use super::Runnable; use super::Runnable;
use crate::queue::NewTask;
use crate::queue::Queue; use crate::queue::Queue;
use crate::schema::FangTaskState; use crate::schema::FangTaskState;
use crate::typetag; use crate::typetag;
use crate::NewTask;
use diesel::connection::Connection; use diesel::connection::Connection;
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::r2d2::{ConnectionManager, PooledConnection};

13
src/blocking/mod.rs Normal file
View file

@ -0,0 +1,13 @@
// Blocking (diesel-based) half of the crate, gated behind the `blocking`
// feature.
pub mod error;
pub mod executor;
pub mod queue;
pub mod scheduler;
pub mod schema;
pub mod worker_pool;

// Flatten the submodules' public items at `crate::blocking::` level for
-- backward-compatible paths.
pub use error::FangError;
pub use executor::*;
pub use queue::*;
pub use scheduler::*;
pub use schema::*;
pub use worker_pool::*;

View file

@ -1,6 +1,6 @@
use crate::executor::Runnable; use crate::executor::Runnable;
use crate::queue::PeriodicTask;
use crate::queue::Queue; use crate::queue::Queue;
use crate::PeriodicTask;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -82,9 +82,9 @@ mod task_scheduler_tests {
use crate::executor::Error; use crate::executor::Error;
use crate::executor::Runnable; use crate::executor::Runnable;
use crate::queue::Queue; use crate::queue::Queue;
use crate::queue::Task;
use crate::schema::fang_tasks; use crate::schema::fang_tasks;
use crate::typetag; use crate::typetag;
use crate::Task;
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::prelude::*; use diesel::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -2,9 +2,8 @@ use crate::diesel::r2d2;
use crate::diesel::PgConnection; use crate::diesel::PgConnection;
use crate::error::FangError; use crate::error::FangError;
use crate::executor::Executor; use crate::executor::Executor;
use crate::executor::RetentionMode;
use crate::executor::SleepParams;
use crate::queue::Queue; use crate::queue::Queue;
use crate::{RetentionMode, SleepParams};
use log::error; use log::error;
use log::info; use log::info;
use std::collections::HashMap; use std::collections::HashMap;
@ -223,12 +222,12 @@ mod task_pool_tests {
use super::WorkerParams; use super::WorkerParams;
use super::WorkerPool; use super::WorkerPool;
use crate::executor::Error; use crate::executor::Error;
use crate::executor::RetentionMode;
use crate::executor::Runnable; use crate::executor::Runnable;
use crate::queue::Queue; use crate::queue::Queue;
use crate::queue::Task;
use crate::schema::{fang_tasks, FangTaskState}; use crate::schema::{fang_tasks, FangTaskState};
use crate::typetag; use crate::typetag;
use crate::RetentionMode;
use crate::Task;
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::prelude::*; use diesel::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -1,24 +1,78 @@
#![allow(clippy::nonstandard_macro_braces)]
#![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::extra_unused_lifetimes)]
/// What to do with a task after a worker has executed it.
#[derive(Clone, Debug)]
pub enum RetentionMode {
    /// Keep every task row, whatever the outcome.
    KeepAll,
    /// Remove every executed task row.
    RemoveAll,
    /// Remove only tasks that finished successfully.
    RemoveFinished,
}

impl Default for RetentionMode {
    /// Executed tasks are removed unless configured otherwise.
    fn default() -> Self {
        Self::RemoveAll
    }
}
/// Backoff configuration for a worker polling an empty queue.
///
/// All values are in seconds.
#[derive(Clone, Debug)]
pub struct SleepParams {
    /// Current sleep period; moves between `min_sleep_period` and
    /// `max_sleep_period`.
    pub sleep_period: u64,
    /// Upper bound for the backoff. NOTE: `maybe_increase_sleep_period`
    /// can overshoot this bound when `sleep_step` does not divide the gap
    /// evenly — it only checks the bound before adding the step.
    pub max_sleep_period: u64,
    /// Value the period is reset to when work is found.
    pub min_sleep_period: u64,
    /// Increment applied on each idle poll.
    pub sleep_step: u64,
}

impl SleepParams {
    /// Snap the current period back to the configured minimum.
    pub fn maybe_reset_sleep_period(&mut self) {
        let needs_reset = self.sleep_period != self.min_sleep_period;
        if needs_reset {
            self.sleep_period = self.min_sleep_period;
        }
    }

    /// Lengthen the current period by one step while it is still below the
    /// configured maximum.
    pub fn maybe_increase_sleep_period(&mut self) {
        let below_cap = self.sleep_period < self.max_sleep_period;
        if below_cap {
            self.sleep_period += self.sleep_step;
        }
    }
}

impl Default for SleepParams {
    /// Start at 5s, back off in 5s steps up to 15s.
    fn default() -> Self {
        SleepParams {
            sleep_period: 5,
            max_sleep_period: 15,
            min_sleep_period: 5,
            sleep_step: 5,
        }
    }
}
#[macro_use] #[macro_use]
#[cfg(feature = "blocking")]
extern crate diesel; extern crate diesel;
pub mod error;
pub mod executor;
pub mod queue;
pub mod scheduler;
pub mod schema;
pub mod worker_pool;
pub use error::FangError;
pub use executor::*;
pub use queue::*;
pub use scheduler::*;
pub use schema::*;
pub use worker_pool::*;
#[doc(hidden)] #[doc(hidden)]
#[cfg(feature = "blocking")]
pub use diesel::pg::PgConnection; pub use diesel::pg::PgConnection;
#[doc(hidden)] #[doc(hidden)]
pub use typetag; pub use typetag;
#[doc(hidden)]
pub extern crate serde;
#[doc(hidden)]
pub use serde_derive::{Deserialize, Serialize};
#[cfg(feature = "blocking")]
pub mod blocking;
#[cfg(feature = "blocking")]
pub use blocking::*;
#[cfg(feature = "asynk")]
pub mod asynk;
#[cfg(feature = "asynk")]
pub use asynk::*;
#[cfg(feature = "asynk")]
#[doc(hidden)]
pub use bb8_postgres::tokio_postgres::tls::NoTls;
#[cfg(feature = "asynk")]
#[doc(hidden)]
pub use async_trait::async_trait;