Update readme with backie info

This commit is contained in:
Rafael Caricio 2023-03-07 17:52:26 +01:00
parent c1fcc87885
commit 6ed9513baf
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
12 changed files with 212 additions and 365 deletions

334
README.md
View file

@ -1,308 +1,153 @@
<p align="center"><img src="https://raw.githubusercontent.com/ayrat555/fang/master/logo.png" alt="fang" height="300px"></p>
[![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style] [![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style]
# Backie # Backie
Background task processing library for Rust. It uses Postgres DB as a task queue. Async background job processing library with Diesel and Tokio. It's a heavily modified fork of [fang](https://github.com/ayrat555/fang).
## Key Features ## Key Features
Here are some of the fang's key features: Here are some of the fang's key features:
- Async and threaded workers. - Async workers: Workers are started as `tokio` tasks (async workers)
Workers can be started in threads (threaded workers) or `tokio` tasks (async workers) - Unique tasks: Tasks are not duplicated in the queue if they are unique
- Scheduled tasks. - Single-purpose workers: Tasks are stored in a single table but workers can be configured to execute only tasks of a specific type
Tasks can be scheduled at any time in the future - Retries: Tasks can be retried with a custom backoff mode
- Periodic (CRON) tasks.
Tasks can be scheduled using cron expressions
- Unique tasks.
Tasks are not duplicated in the queue if they are unique
- Single-purpose workers.
Tasks are stored in a single table but workers can be configured to execute only tasks of a specific type
- Retries.
Tasks can be retried with a custom backoff mode
## Differences from original fang ## Differences from Fang crate
- Supports only async processing - Supports only async processing
- Supports graceful shutdown - Supports graceful shutdown
- The connection pool for the queue is provided by the user - The connection pool for the queue is provided by the user
- Tasks status is calculated based on the database state
- Tasks have a timeout and are retried if they are not completed in time
## Installation ## Installation
1. Add this to your Cargo.toml 1. Add this to your Cargo.toml
#### the Blocking feature
```toml ```toml
[dependencies] [dependencies]
fang = { version = "0.10" , features = ["blocking"], default-features = false } backie = "0.10"
```
#### the Asynk feature
```toml
[dependencies]
fang = { version = "0.10" , features = ["asynk"], default-features = false }
```
#### Both features
```toml
fang = { version = "0.10" }
``` ```
*Supports rustc 1.67+* *Supports rustc 1.67+*
2. Create the `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2022-08-20-151615_create_fang_tasks/up.sql). 2. Create the `backie_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/rafaelcaricio/backie/blob/master/migrations/2023-03-06-151907_create_backie_tasks/up.sql).
## Usage ## Usage
### Defining a task Every task must implement the `backie::RunnableTask` trait, Backie uses the information provided by the trait to
execute the task.
#### Blocking feature All implementations of `RunnableTask` must have unique names per project.
Every task should implement the `fang::Runnable` trait which is used by `fang` to execute it.
```rust ```rust
use fang::Error; use backie::RunnableTask;
use fang::Runnable; use backie::task::{TaskHash, TaskType};
use fang::typetag; use backie::queue::AsyncQueueable;
use fang::PgConnection; use serde::{Deserialize, Serialize};
use fang::serde::{Deserialize, Serialize}; use async_trait::async_trait;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")] #[serde(crate = "fang::serde")]
struct MyTask { struct MyTask {
pub number: u16, pub number: u16,
}
#[typetag::serde]
impl Runnable for MyTask {
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
println!("the number is {}", self.number);
Ok(())
}
// If `uniq` is set to true and the task is already in the storage, it won't be inserted again
// The existing record will be returned for for any insertions operaiton
fn uniq(&self) -> bool {
true
}
// This will be useful if you want to filter tasks.
// the default value is `common`
fn task_type(&self) -> String {
"my_task".to_string()
}
// This will be useful if you would like to schedule tasks.
// default value is None (the task is not scheduled, it's just executed as soon as it's inserted)
fn cron(&self) -> Option<Scheduled> {
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
// the maximum number of retries. Set it to 0 to make it not retriable
// the default value is 20
fn max_retries(&self) -> i32 {
20
}
// backoff mode for retries
fn backoff(&self, attempt: u32) -> u32 {
u32::pow(2, attempt)
}
}
```
As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task.
The second parameter of the `run` function is a struct that implements `fang::Queueable`. You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. If you don't need it, just ignore it.
#### Asynk feature
Every task should implement `fang::AsyncRunnable` trait which is used by `fang` to execute it.
Be careful not to call two implementations of the AsyncRunnable trait with the same name, because it will cause a failure in the `typetag` crate.
```rust
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
pub number: u16,
} }
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl AsyncRunnable for AsyncTask { impl RunnableTask for MyTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())
} }
// this func is optional
// Default task_type is common // this func is optional
fn task_type(&self) -> String { // Default task_type is common
"my-task-type".to_string() fn task_type(&self) -> TaskType {
} "my-task-type".into()
}
// If `uniq` is set to true and the task is already in the storage, it won't be inserted again
// The existing record will be returned for for any insertions operaiton
fn uniq(&self) -> Option<TaskHash> {
None
}
// If `uniq` is set to true and the task is already in the storage, it won't be inserted again // the maximum number of retries. Set it to 0 to make it not retriable
// The existing record will be returned for for any insertions operaiton // the default value is 20
fn uniq(&self) -> bool { fn max_retries(&self) -> i32 {
true
}
// This will be useful if you would like to schedule tasks.
// default value is None (the task is not scheduled, it's just executed as soon as it's inserted)
fn cron(&self) -> Option<Scheduled> {
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
// the maximum number of retries. Set it to 0 to make it not retriable
// the default value is 20
fn max_retries(&self) -> i32 {
20 20
} }
// backoff mode for retries // backoff mode for retries
fn backoff(&self, attempt: u32) -> u32 { fn backoff(&self, attempt: u32) -> u32 {
u32::pow(2, attempt) u32::pow(2, attempt)
} }
} }
``` ```
In both modules, tasks can be scheduled to be executed once. Use `Scheduled::ScheduleOnce` enum variant.
Datetimes and cron patterns are interpreted in the UTC timezone. So you should introduce the offset to schedule in a different timezone.
Example:
If your timezone is UTC + 2 and you want to schedule at 11:00:
```rust
let expression = "0 0 9 * * * *";
```
### Enqueuing a task ### Enqueuing a task
#### the Blocking feature To enqueue a task use `AsyncQueueable::create_task`.
To enqueue a task use `Queue::enqueue_task`
```rust
use fang::Queue;
// create a r2d2 pool
// create a fang queue
let queue = Queue::builder().connection_pool(pool).build();
let task_inserted = queue.insert_task(&MyTask::new(1)).unwrap();
```
#### the Asynk feature
To enqueue a task use `AsyncQueueable::insert_task`.
For Postgres backend. For Postgres backend.
```rust ```rust
use fang::asynk::async_queue::AsyncQueue; use backie::queue::PgAsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// Create an AsyncQueue // Create an AsyncQueue
let max_pool_size: u32 = 2; let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new("postgres://postgres:password@localhost/backie");
let pool = Pool::builder()
.max_size(1)
.min_idle(Some(1))
.build(manager)
.await
.unwrap();
let mut queue = AsyncQueue::builder() let mut queue = PgAsyncQueue::new(pool);
// Postgres database url
.uri("postgres://postgres:postgres@localhost/fang")
// Max number of connections that are allowed
.max_pool_size(max_pool_size)
.build();
// Always connect first in order to perform any operation // Publish the first example
queue.connect(NoTls).await.unwrap(); let task = MyTask { number: 8 };
```
As an easy example, we are using NoTls type. If for some reason you would like to encrypt Postgres requests, you can use [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) or [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/).
```rust
// AsyncTask from the first example
let task = AsyncTask { 8 };
let task_returned = queue let task_returned = queue
.insert_task(&task as &dyn AsyncRunnable) .create_task(&task)
.await .await
.unwrap(); .unwrap();
``` ```
### Starting workers ### Starting workers
#### the Blocking feature
Every worker runs in a separate thread. In case of panic, they are always restarted.
Use `WorkerPool` to start workers. Use `WorkerPool::builder` to create your worker pool and run tasks.
```rust
use fang::WorkerPool;
use fang::Queue;
// create a Queue
let mut worker_pool = WorkerPool::<Queue>::builder()
.queue(queue)
.number_of_workers(3_u32)
// if you want to run tasks of the specific kind
.task_type("my_task_type")
.build();
worker_pool.start();
```
#### the Asynk feature
Every worker runs in a separate `tokio` task. In case of panic, they are always restarted. Every worker runs in a separate `tokio` task. In case of panic, they are always restarted.
Use `AsyncWorkerPool` to start workers. Use `AsyncWorkerPool` to start workers.
```rust ```rust
use fang::asynk::async_worker_pool::AsyncWorkerPool; use backie::worker_pool::AsyncWorkerPool;
// Need to create a queue // Need to create a queue
// Also insert some tasks // Also insert some tasks
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder() let mut pool: AsyncWorkerPool<PgAsyncQueue> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size) .number_of_workers(max_pool_size)
.queue(queue.clone()) .queue(queue.clone())
// if you want to run tasks of the specific kind // if you want to run tasks of the specific kind
.task_type("my_task_type") .task_type("my_task_type".into())
.build(); .build();
pool.start().await; pool.start().await;
``` ```
Check out: Check out:
- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_worker) - simple worker example - [Simple Worker Example](https://github.com/rafaelcaricio/backie/tree/master/examples/simple_worker) - simple worker example
- [Simple Cron Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/blocking/simple_cron_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_async_worker) - simple async worker example
- [Simple Cron Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/asynk/simple_cron_async_worker) - simple async worker example
- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses the Fang's blocking module to synchronize feeds and deliver updates to users.
- [weather_bot_rust](https://github.com/pxp9/weather_bot_rust) - A bot that provides weather info. It uses the Fang's asynk module to process updates from Telegram users and schedule weather info.
### Configuration ### Configuration
#### Blocking feature Use the `AsyncWorkerPool` builder:
Just use `TypeBuilder` for `WorkerPool`. ```rust
let mut pool: AsyncWorkerPool<PgAsyncQueue> = AsyncWorkerPool::builder()
#### Asynk feature .number_of_workers(max_pool_size)
.queue(queue.clone())
Just use `TypeBuilder` for `AsyncWorkerPool`. .build();
```
### Configuring the type of workers ### Configuring the type of workers
@ -322,36 +167,6 @@ pub enum RetentionMode {
Set retention mode with worker pools `TypeBuilder` in both modules. Set retention mode with worker pools `TypeBuilder` in both modules.
### Configuring sleep values
#### Blocking feature
You can use use `SleepParams` to configure sleep values:
```rust
pub struct SleepParams {
pub sleep_period: Duration, // default value is 5 seconds
pub max_sleep_period: Duration, // default value is 15 seconds
pub min_sleep_period: Duration, // default value is 5 seconds
pub sleep_step: Duration, // default value is 5 seconds
}
```
If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds.
Use `set_sleep_params` to set it:
```rust
let sleep_params = SleepParams {
sleep_period: Duration::from_secs(2),
max_sleep_period: Duration::from_secs(6),
min_sleep_period: Duration::from_secs(2),
sleep_step: Duration::from_secs(1),
};
```
Set sleep params with worker pools `TypeBuilder` in both modules.
## Contributing ## Contributing
1. [Fork it!](https://github.com/ayrat555/fang/fork) 1. [Fork it!](https://github.com/ayrat555/fang/fork)
@ -392,17 +207,14 @@ make ignored
make stop make stop
``` ```
## Authors ## Thank Fang's authors
I would like to thank the authors of the fang crate which was the inspiration for this project.
- Ayrat Badykov (@ayrat555) - Ayrat Badykov (@ayrat555)
- Pepe Márquez (@pxp9) - Pepe Márquez (@pxp9)
[ci]: https://crates.io/crates/backie
[s1]: https://img.shields.io/crates/v/fang.svg [docs]: https://docs.rs/backie/
[docs-badge]: https://img.shields.io/badge/docs-website-blue.svg [ga-test]: https://github.com/rafaelcaricio/backie/actions/workflows/rust.yml/badge.svg
[ci]: https://crates.io/crates/fang [ga-style]: https://github.com/rafaelcaricio/backie/actions/workflows/style.yml/badge.svg
[docs]: https://docs.rs/fang/
[ga-test]: https://github.com/ayrat555/fang/actions/workflows/rust.yml/badge.svg
[ga-style]: https://github.com/ayrat555/fang/actions/workflows/style.yml/badge.svg
[signal-hook]: https://crates.io/crates/signal-hook

View file

@ -1,10 +1,10 @@
[package] [package]
name = "simple_async_worker" name = "simple_worker"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
fang = { path = "../../../" } backie = { path = "../../" }
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.0" log = "0.4.0"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

BIN
logo.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

View file

@ -1,5 +1,5 @@
use std::fmt::Display;
use serde_json::Error as SerdeError; use serde_json::Error as SerdeError;
use std::fmt::Display;
use thiserror::Error; use thiserror::Error;
/// Library errors /// Library errors

View file

@ -1,8 +1,8 @@
use crate::errors::AsyncQueueError; use crate::errors::AsyncQueueError;
use crate::runnable::RunnableTask; use crate::runnable::RunnableTask;
use crate::schema::backie_tasks; use crate::schema::backie_tasks;
use crate::task::{NewTask, TaskId, TaskType, TaskHash};
use crate::task::Task; use crate::task::Task;
use crate::task::{NewTask, TaskHash, TaskId, TaskType};
use chrono::DateTime; use chrono::DateTime;
use chrono::Duration; use chrono::Duration;
use chrono::Utc; use chrono::Utc;
@ -124,9 +124,7 @@ impl Task {
task: Task, task: Task,
) -> Result<Task, AsyncQueueError> { ) -> Result<Task, AsyncQueueError> {
Ok(diesel::update(&task) Ok(diesel::update(&task)
.set(( .set((backie_tasks::running_at.eq(Utc::now()),))
backie_tasks::running_at.eq(Utc::now()),
))
.get_result::<Task>(connection) .get_result::<Task>(connection)
.await?) .await?)
} }
@ -135,17 +133,17 @@ impl Task {
connection: &mut AsyncPgConnection, connection: &mut AsyncPgConnection,
id: TaskId, id: TaskId,
) -> Result<Task, AsyncQueueError> { ) -> Result<Task, AsyncQueueError> {
Ok(diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id))) Ok(
.set(( diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id)))
backie_tasks::done_at.eq(Utc::now()), .set((backie_tasks::done_at.eq(Utc::now()),))
)) .get_result::<Task>(connection)
.get_result::<Task>(connection) .await?,
.await?) )
} }
pub(crate) async fn insert( pub(crate) async fn insert(
connection: &mut AsyncPgConnection, connection: &mut AsyncPgConnection,
runnable: &dyn RunnableTask runnable: &dyn RunnableTask,
) -> Result<Task, AsyncQueueError> { ) -> Result<Task, AsyncQueueError> {
let payload = serde_json::to_value(runnable)?; let payload = serde_json::to_value(runnable)?;
match runnable.uniq() { match runnable.uniq() {
@ -161,23 +159,21 @@ impl Task {
.get_result::<Task>(connection) .get_result::<Task>(connection)
.await?) .await?)
} }
Some(hash) => { Some(hash) => match Self::find_by_uniq_hash(connection, hash.clone()).await {
match Self::find_by_uniq_hash(connection, hash.clone()).await { Some(task) => Ok(task),
Some(task) => Ok(task), None => {
None => { let new_task = NewTask::builder()
let new_task = NewTask::builder() .uniq_hash(Some(hash))
.uniq_hash(Some(hash)) .task_type(runnable.task_type())
.task_type(runnable.task_type()) .payload(payload)
.payload(payload) .build();
.build();
Ok(diesel::insert_into(backie_tasks::table) Ok(diesel::insert_into(backie_tasks::table)
.values(new_task) .values(new_task)
.get_result::<Task>(connection) .get_result::<Task>(connection)
.await?) .await?)
}
} }
} },
} }
} }

View file

@ -1,6 +1,6 @@
use crate::errors::AsyncQueueError; use crate::errors::AsyncQueueError;
use crate::runnable::RunnableTask; use crate::runnable::RunnableTask;
use crate::task::{Task, TaskId, TaskType, TaskHash}; use crate::task::{Task, TaskHash, TaskId, TaskType};
use async_trait::async_trait; use async_trait::async_trait;
use diesel::result::Error::QueryBuilderError; use diesel::result::Error::QueryBuilderError;
use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::scoped_futures::ScopedFutureExt;
@ -16,7 +16,10 @@ pub trait Queueable: Send {
/// ///
/// This method returns one task of the `task_type` type. If `task_type` is `None` it will try to /// This method returns one task of the `task_type` type. If `task_type` is `None` it will try to
/// fetch a task of the type `common`. The returned task is marked as running and must be executed. /// fetch a task of the type `common`. The returned task is marked as running and must be executed.
async fn pull_next_task(&mut self, kind: Option<TaskType>) -> Result<Option<Task>, AsyncQueueError>; async fn pull_next_task(
&mut self,
kind: Option<TaskType>,
) -> Result<Option<Task>, AsyncQueueError>;
/// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type
/// created by an AsyncWorkerPool. /// created by an AsyncWorkerPool.
@ -26,7 +29,11 @@ pub trait Queueable: Send {
async fn find_task_by_id(&mut self, id: TaskId) -> Result<Task, AsyncQueueError>; async fn find_task_by_id(&mut self, id: TaskId) -> Result<Task, AsyncQueueError>;
/// Update the state of a task to failed and set an error_message. /// Update the state of a task to failed and set an error_message.
async fn set_task_failed(&mut self, id: TaskId, error_message: &str) -> Result<Task, AsyncQueueError>; async fn set_task_failed(
&mut self,
id: TaskId,
error_message: &str,
) -> Result<Task, AsyncQueueError>;
/// Update the state of a task to done. /// Update the state of a task to done.
async fn set_task_done(&mut self, id: TaskId) -> Result<Task, AsyncQueueError>; async fn set_task_done(&mut self, id: TaskId) -> Result<Task, AsyncQueueError>;
@ -57,18 +64,7 @@ pub trait Queueable: Send {
) -> Result<Task, AsyncQueueError>; ) -> Result<Task, AsyncQueueError>;
} }
/// An async queue that can be used to enqueue tasks. /// An async queue that is used to manipulate tasks, it uses PostgreSQL as storage.
/// It uses a PostgreSQL storage. It must be connected to perform any operation.
/// To connect an `AsyncQueue` to PostgreSQL database call the `connect` method.
/// A Queue can be created with the TypedBuilder.
///
/// ```rust
/// let mut queue = AsyncQueue::builder()
/// .uri("postgres://postgres:postgres@localhost/fang")
/// .max_pool_size(max_pool_size)
/// .build();
/// ```
///
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PgAsyncQueue { pub struct PgAsyncQueue {
pool: Pool<AsyncPgConnection>, pool: Pool<AsyncPgConnection>,
@ -98,7 +94,7 @@ impl Queueable for PgAsyncQueue {
return Ok(None); return Ok(None);
}; };
Task::set_running(conn, pending_task).await.map(|running_task| Some(running_task)) Task::set_running(conn, pending_task).await.map(Some)
} }
.scope_boxed() .scope_boxed()
}) })
@ -157,8 +153,10 @@ impl Queueable for PgAsyncQueue {
let task = Task::find_by_id(conn, id).await?; let task = Task::find_by_id(conn, id).await?;
Task::set_running(conn, task).await?; Task::set_running(conn, task).await?;
Ok(()) Ok(())
}.scope_boxed() }
}).await .scope_boxed()
})
.await
} }
async fn remove_task(&mut self, id: TaskId) -> Result<u64, AsyncQueueError> { async fn remove_task(&mut self, id: TaskId) -> Result<u64, AsyncQueueError> {
@ -228,6 +226,7 @@ impl Queueable for PgAsyncQueue {
#[cfg(test)] #[cfg(test)]
mod async_queue_tests { mod async_queue_tests {
use super::*; use super::*;
use crate::task::TaskState;
use crate::Scheduled; use crate::Scheduled;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::DateTime; use chrono::DateTime;
@ -235,7 +234,6 @@ mod async_queue_tests {
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use diesel_async::AsyncPgConnection; use diesel_async::AsyncPgConnection;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::task::TaskState;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct AsyncTask { struct AsyncTask {
@ -245,7 +243,10 @@ mod async_queue_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncTask { impl RunnableTask for AsyncTask {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
} }
@ -258,7 +259,10 @@ mod async_queue_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncUniqTask { impl RunnableTask for AsyncUniqTask {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
@ -276,7 +280,10 @@ mod async_queue_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncTaskSchedule { impl RunnableTask for AsyncTaskSchedule {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
@ -495,7 +502,10 @@ mod async_queue_tests {
assert_eq!(Some(2), number); assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task); assert_eq!(Some("AsyncTask"), type_task);
let result = test.remove_tasks_type(TaskType::from("nonexistentType")).await.unwrap(); let result = test
.remove_tasks_type(TaskType::from("nonexistentType"))
.await
.unwrap();
assert_eq!(0, result); assert_eq!(0, result);
let result = test.remove_tasks_type(TaskType::default()).await.unwrap(); let result = test.remove_tasks_type(TaskType::default()).await.unwrap();
@ -509,7 +519,10 @@ mod async_queue_tests {
let pool = pool().await; let pool = pool().await;
let mut test = PgAsyncQueue::new(pool); let mut test = PgAsyncQueue::new(pool);
let task = test.create_task(&AsyncUniqTask { number: 1 }).await.unwrap(); let task = test
.create_task(&AsyncUniqTask { number: 1 })
.await
.unwrap();
let metadata = task.payload.as_object().unwrap(); let metadata = task.payload.as_object().unwrap();
let number = metadata["number"].as_u64(); let number = metadata["number"].as_u64();
@ -518,7 +531,10 @@ mod async_queue_tests {
assert_eq!(Some(1), number); assert_eq!(Some(1), number);
assert_eq!(Some("AsyncUniqTask"), type_task); assert_eq!(Some("AsyncUniqTask"), type_task);
let task = test.create_task(&AsyncUniqTask { number: 2 }).await.unwrap(); let task = test
.create_task(&AsyncUniqTask { number: 2 })
.await
.unwrap();
let metadata = task.payload.as_object().unwrap(); let metadata = task.payload.as_object().unwrap();
let number = metadata["number"].as_u64(); let number = metadata["number"].as_u64();
@ -527,7 +543,8 @@ mod async_queue_tests {
assert_eq!(Some(2), number); assert_eq!(Some(2), number);
assert_eq!(Some("AsyncUniqTask"), type_task); assert_eq!(Some("AsyncUniqTask"), type_task);
let result = test.remove_task_by_hash(AsyncUniqTask { number: 0 }.uniq().unwrap()) let result = test
.remove_task_by_hash(AsyncUniqTask { number: 0 }.uniq().unwrap())
.await .await
.unwrap(); .unwrap();
assert!(!result, "Should **not** remove task"); assert!(!result, "Should **not** remove task");

View file

@ -1,9 +1,9 @@
use std::error::Error;
use crate::queue::Queueable; use crate::queue::Queueable;
use crate::task::TaskHash;
use crate::task::TaskType;
use crate::Scheduled; use crate::Scheduled;
use async_trait::async_trait; use async_trait::async_trait;
use crate::task::{TaskType}; use std::error::Error;
use crate::task::TaskHash;
pub const RETRIES_NUMBER: i32 = 20; pub const RETRIES_NUMBER: i32 = 20;

View file

@ -1,15 +1,15 @@
use std::borrow::Cow;
use std::fmt;
use std::fmt::Display;
use crate::schema::backie_tasks; use crate::schema::backie_tasks;
use chrono::DateTime; use chrono::DateTime;
use chrono::Utc; use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use diesel_derive_newtype::DieselNewType;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::borrow::Cow;
use std::fmt;
use std::fmt::Display;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use uuid::Uuid; use uuid::Uuid;
use serde::Serialize;
use diesel_derive_newtype::DieselNewType;
use sha2::{Digest, Sha256};
/// States of a task. /// States of a task.
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
@ -58,7 +58,10 @@ where
pub struct TaskHash(Cow<'static, str>); pub struct TaskHash(Cow<'static, str>);
impl TaskHash { impl TaskHash {
pub fn default_for_task<T>(value: &T) -> Result<Self, serde_json::Error> where T: Serialize { pub fn default_for_task<T>(value: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
let value = serde_json::to_value(value)?; let value = serde_json::to_value(value)?;
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(serde_json::to_string(&value)?.as_bytes()); hasher.update(serde_json::to_string(&value)?.as_bytes());

View file

@ -1,11 +1,11 @@
use std::error::Error;
use crate::errors::BackieError; use crate::errors::BackieError;
use crate::queue::Queueable; use crate::queue::Queueable;
use crate::runnable::RunnableTask; use crate::runnable::RunnableTask;
use crate::task::{Task, TaskType}; use crate::task::{Task, TaskType};
use crate::RetentionMode;
use crate::Scheduled::*; use crate::Scheduled::*;
use crate::{RetentionMode};
use log::error; use log::error;
use std::error::Error;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
/// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue /// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue
@ -41,7 +41,11 @@ impl<Q> AsyncWorker<Q>
where where
Q: Queueable + Clone + Sync + 'static, Q: Queueable + Clone + Sync + 'static,
{ {
async fn run(&mut self, task: Task, runnable: Box<dyn RunnableTask>) -> Result<(), BackieError> { async fn run(
&mut self,
task: Task,
runnable: Box<dyn RunnableTask>,
) -> Result<(), BackieError> {
// TODO: catch panics // TODO: catch panics
let result = runnable.run(&mut self.queue).await; let result = runnable.run(&mut self.queue).await;
match result { match result {
@ -81,7 +85,9 @@ where
} }
Err(error) => { Err(error) => {
log::debug!("Task {} failed and kept in the database", task.id); log::debug!("Task {} failed and kept in the database", task.id);
self.queue.set_task_failed(task.id, &format!("{}", error)).await?; self.queue
.set_task_failed(task.id, &format!("{}", error))
.await?;
} }
}, },
RetentionMode::RemoveAll => { RetentionMode::RemoveAll => {
@ -95,7 +101,9 @@ where
} }
Err(error) => { Err(error) => {
log::debug!("Task {} failed and kept in the database", task.id); log::debug!("Task {} failed and kept in the database", task.id);
self.queue.set_task_failed(task.id, &format!("{}", error)).await?; self.queue
.set_task_failed(task.id, &format!("{}", error))
.await?;
} }
}, },
}; };
@ -118,13 +126,10 @@ where
pub(crate) async fn run_tasks(&mut self) { pub(crate) async fn run_tasks(&mut self) {
loop { loop {
// TODO: check if should stop the worker // TODO: check if should stop the worker
match self match self.queue.pull_next_task(self.task_type.clone()).await {
.queue
.pull_next_task(self.task_type.clone())
.await
{
Ok(Some(task)) => { Ok(Some(task)) => {
let actual_task: Box<dyn RunnableTask> = serde_json::from_value(task.payload.clone()).unwrap(); let actual_task: Box<dyn RunnableTask> =
serde_json::from_value(task.payload.clone()).unwrap();
// check if task is scheduled or not // check if task is scheduled or not
if let Some(CronPattern(_)) = actual_task.cron() { if let Some(CronPattern(_)) = actual_task.cron() {
@ -150,11 +155,7 @@ where
#[cfg(test)] #[cfg(test)]
pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> {
loop { loop {
match self match self.queue.pull_next_task(self.task_type.clone()).await {
.queue
.pull_next_task(self.task_type.clone())
.await
{
Ok(Some(task)) => { Ok(Some(task)) => {
let actual_task: Box<dyn RunnableTask> = let actual_task: Box<dyn RunnableTask> =
serde_json::from_value(task.payload.clone()).unwrap(); serde_json::from_value(task.payload.clone()).unwrap();
@ -185,8 +186,9 @@ where
mod async_worker_tests { mod async_worker_tests {
use super::*; use super::*;
use crate::errors::BackieError; use crate::errors::BackieError;
use crate::queue::Queueable;
use crate::queue::PgAsyncQueue; use crate::queue::PgAsyncQueue;
use crate::queue::Queueable;
use crate::task::TaskState;
use crate::worker::Task; use crate::worker::Task;
use crate::RetentionMode; use crate::RetentionMode;
use crate::Scheduled; use crate::Scheduled;
@ -196,7 +198,6 @@ mod async_worker_tests {
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use diesel_async::AsyncPgConnection; use diesel_async::AsyncPgConnection;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::task::TaskState;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct WorkerAsyncTask { struct WorkerAsyncTask {
@ -206,7 +207,10 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for WorkerAsyncTask { impl RunnableTask for WorkerAsyncTask {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
} }
@ -219,7 +223,10 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for WorkerAsyncTaskSchedule { impl RunnableTask for WorkerAsyncTaskSchedule {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
fn cron(&self) -> Option<Scheduled> { fn cron(&self) -> Option<Scheduled> {
@ -235,7 +242,10 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncFailedTask { impl RunnableTask for AsyncFailedTask {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
let message = format!("number {} is wrong :(", self.number); let message = format!("number {} is wrong :(", self.number);
Err(Box::new(BackieError { Err(Box::new(BackieError {
@ -254,7 +264,10 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncRetryTask { impl RunnableTask for AsyncRetryTask {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
let message = "Failed".to_string(); let message = "Failed".to_string();
Err(Box::new(BackieError { Err(Box::new(BackieError {
@ -273,12 +286,15 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncTaskType1 { impl RunnableTask for AsyncTaskType1 {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
fn task_type(&self) -> TaskType { fn task_type(&self) -> TaskType {
TaskType::from("type1") "type1".into()
} }
} }
@ -288,7 +304,10 @@ mod async_worker_tests {
#[typetag::serde] #[typetag::serde]
#[async_trait] #[async_trait]
impl RunnableTask for AsyncTaskType2 { impl RunnableTask for AsyncTaskType2 {
async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { async fn run(
&self,
_queueable: &mut dyn Queueable,
) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> {
Ok(()) Ok(())
} }
@ -324,33 +343,33 @@ mod async_worker_tests {
// async fn schedule_task_test() { // async fn schedule_task_test() {
// let pool = pool().await; // let pool = pool().await;
// let mut test = PgAsyncQueue::new(pool); // let mut test = PgAsyncQueue::new(pool);
// //
// let actual_task = WorkerAsyncTaskSchedule { number: 1 }; // let actual_task = WorkerAsyncTaskSchedule { number: 1 };
// //
// let task = test.schedule_task(&actual_task).await.unwrap(); // let task = test.schedule_task(&actual_task).await.unwrap();
// //
// let id = task.id; // let id = task.id;
// //
// let mut worker = AsyncWorker::<PgAsyncQueue>::builder() // let mut worker = AsyncWorker::<PgAsyncQueue>::builder()
// .queue(test.clone()) // .queue(test.clone())
// .retention_mode(RetentionMode::KeepAll) // .retention_mode(RetentionMode::KeepAll)
// .build(); // .build();
// //
// worker.run_tasks_until_none().await.unwrap(); // worker.run_tasks_until_none().await.unwrap();
// //
// let task = worker.queue.find_task_by_id(id).await.unwrap(); // let task = worker.queue.find_task_by_id(id).await.unwrap();
// //
// assert_eq!(id, task.id); // assert_eq!(id, task.id);
// assert_eq!(TaskState::Ready, task.state()); // assert_eq!(TaskState::Ready, task.state());
// //
// tokio::time::sleep(core::time::Duration::from_secs(3)).await; // tokio::time::sleep(core::time::Duration::from_secs(3)).await;
// //
// worker.run_tasks_until_none().await.unwrap(); // worker.run_tasks_until_none().await.unwrap();
// //
// let task = test.find_task_by_id(id).await.unwrap(); // let task = test.find_task_by_id(id).await.unwrap();
// assert_eq!(id, task.id); // assert_eq!(id, task.id);
// assert_eq!(TaskState::Done, task.state()); // assert_eq!(TaskState::Done, task.state());
// //
// test.remove_all_tasks().await.unwrap(); // test.remove_all_tasks().await.unwrap();
// } // }

View file

@ -1,7 +1,7 @@
use crate::queue::Queueable; use crate::queue::Queueable;
use crate::task::{TaskType}; use crate::task::TaskType;
use crate::worker::AsyncWorker; use crate::worker::AsyncWorker;
use crate::{RetentionMode}; use crate::RetentionMode;
use async_recursion::async_recursion; use async_recursion::async_recursion;
use log::error; use log::error;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
@ -50,7 +50,7 @@ where
let join_handle = tokio::spawn(async move { let join_handle = tokio::spawn(async move {
let mut worker: AsyncWorker<AQueue> = AsyncWorker::builder() let mut worker: AsyncWorker<AQueue> = AsyncWorker::builder()
.queue(inner_pool.queue.clone()) .queue(inner_pool.queue.clone())
.retention_mode(inner_pool.retention_mode.clone()) .retention_mode(inner_pool.retention_mode)
.task_type(inner_pool.task_type.clone()) .task_type(inner_pool.task_type.clone())
.build(); .build();