Start documenting the project (#96)

* Start documenting the project

* Add AsyncQueue implementation

* Documenting Async runnable

* By default value in uniq function

* Fix errors and warnings async runnable docs

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* format something

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* uniq documentation

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

Co-authored-by: pxp9 <pepe.marquezromero@gmail.com>
Co-authored-by: Pmarquez <48651252+pxp9@users.noreply.github.com>
This commit is contained in:
Ayrat Badykov 2022-10-18 09:30:17 +03:00 committed by GitHub
parent cfc3c46164
commit 9b92a4a34e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 3 deletions

View file

@ -6,7 +6,7 @@
Background task processing library for Rust. It uses Postgres DB as a task queue. Background task processing library for Rust. It uses Postgres DB as a task queue.
## Features ## Key Features
Here are some of the fang's key features: Here are some of the fang's key features:

View file

@ -113,39 +113,59 @@ impl From<cron::error::Error> for AsyncQueueError {
} }
} }
/// This trait defines operations for an asynchronous queue.
/// The trait can be implemented for different storage backends.
/// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future.
#[async_trait] #[async_trait]
pub trait AsyncQueueable: Send { pub trait AsyncQueueable: Send {
/// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to
/// fetch a task of the type `common`. After fetching it should update the state of the task to
/// `FangTaskState::InProgress`.
///
async fn fetch_and_touch_task( async fn fetch_and_touch_task(
&mut self, &mut self,
task_type: Option<String>, task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError>; ) -> Result<Option<Task>, AsyncQueueError>;
/// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type
/// created by an AsyncWorkerPool.
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>; async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
/// The method will remove all tasks from the queue
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>; async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
/// Remove all tasks that are scheduled in the future.
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>; async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;
/// Remove a task by its id.
async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError>; async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError>;
/// Remove a task by its metadata (struct fields values)
async fn remove_task_by_metadata( async fn remove_task_by_metadata(
&mut self, &mut self,
task: &dyn AsyncRunnable, task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError>; ) -> Result<u64, AsyncQueueError>;
/// Removes all tasks that have the specified `task_type`.
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>; async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
/// Retrieve a task from storage by its `id`.
async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>; async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>;
/// Update the state field of the specified task
/// See the `FangTaskState` enum for possible states.
async fn update_task_state( async fn update_task_state(
&mut self, &mut self,
task: Task, task: Task,
state: FangTaskState, state: FangTaskState,
) -> Result<Task, AsyncQueueError>; ) -> Result<Task, AsyncQueueError>;
/// Update the state of a task to `FangTaskState::Failed` and set an error_message.
async fn fail_task(&mut self, task: Task, error_message: &str) async fn fail_task(&mut self, task: Task, error_message: &str)
-> Result<Task, AsyncQueueError>; -> Result<Task, AsyncQueueError>;
/// Schedule a task.
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>; async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
async fn schedule_retry( async fn schedule_retry(
@ -156,6 +176,19 @@ pub trait AsyncQueueable: Send {
) -> Result<Task, AsyncQueueError>; ) -> Result<Task, AsyncQueueError>;
} }
/// An async queue that can be used to enqueue tasks.
/// It uses a PostgreSQL storage. It must be connected to perform any operation.
/// To connect an `AsyncQueue` to PostgreSQL database call the `connect` method.
/// A Queue can be created with the TypedBuilder.
///
/// ```rust
/// let mut queue = AsyncQueue::builder()
/// .uri("postgres://postgres:postgres@localhost/fang")
/// .max_pool_size(max_pool_size)
/// .build();
/// ```
///
#[derive(TypedBuilder, Debug, Clone)] #[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue<Tls> pub struct AsyncQueue<Tls>
where where
@ -344,6 +377,7 @@ where
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send, <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send, <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{ {
/// Check if the connection with db is established
pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> { pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> {
if self.connected { if self.connected {
Ok(()) Ok(())
@ -351,6 +385,8 @@ where
Err(AsyncQueueError::NotConnectedError) Err(AsyncQueueError::NotConnectedError)
} }
} }
/// Connect to the db if not connected
pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> { pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> {
let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?; let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?;
@ -363,6 +399,7 @@ where
self.connected = true; self.connected = true;
Ok(()) Ok(())
} }
async fn remove_all_tasks_query( async fn remove_all_tasks_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> { ) -> Result<u64, AsyncQueueError> {

View file

@ -37,27 +37,55 @@ impl From<SerdeError> for FangError {
} }
} }
/// Implement this trait to run your custom tasks.
#[typetag::serde(tag = "type")] #[typetag::serde(tag = "type")]
#[async_trait] #[async_trait]
pub trait AsyncRunnable: Send + Sync { pub trait AsyncRunnable: Send + Sync {
/// Execute the task. This method should define its logic
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>; async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>;
/// Define the type of the task.
/// The `common` task type is used by default
fn task_type(&self) -> String { fn task_type(&self) -> String {
COMMON_TYPE.to_string() COMMON_TYPE.to_string()
} }
/// If set to true, no new tasks with the same metadata will be inserted
/// By default it is set to false.
fn uniq(&self) -> bool { fn uniq(&self) -> bool {
false false
} }
/// This method defines if a task is periodic or it should be executed once in the future.
///
/// Be careful it works only with the UTC timezone.
///
///
/// Example:
///
///
/**
```rust
fn cron(&self) -> Option<Scheduled> {
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
```
*/
/// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant.
fn cron(&self) -> Option<Scheduled> { fn cron(&self) -> Option<Scheduled> {
None None
} }
/// Define the maximum number of retries the task will be retried.
/// By default the number of retries is 20.
fn max_retries(&self) -> i32 { fn max_retries(&self) -> i32 {
RETRIES_NUMBER RETRIES_NUMBER
} }
/// Define the backoff mode
/// By default, it is exponential, 2^(attempt)
fn backoff(&self, attempt: u32) -> u32 { fn backoff(&self, attempt: u32) -> u32 {
u32::pow(2, attempt) u32::pow(2, attempt)
} }

View file

@ -1,4 +1,4 @@
pub mod error; mod error;
pub mod fang_task_state; pub mod fang_task_state;
pub mod queue; pub mod queue;
pub mod runnable; pub mod runnable;

View file

@ -1,9 +1,18 @@
/// Possible states of the task
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)] #[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
#[DieselTypePath = "crate::schema::sql_types::FangTaskState"] #[DieselTypePath = "crate::schema::sql_types::FangTaskState"]
pub enum FangTaskState { pub enum FangTaskState {
/// The task is ready to be executed
New, New,
/// The task is being executing.
///
/// The task may stay in this state forever
/// if an unexpected error happened
InProgress, InProgress,
/// The task failed
Failed, Failed,
/// The task finished successfully
Finished, Finished,
/// The task is being retried. It means it failed but it's scheduled to be executed again
Retried, Retried,
} }

View file

@ -2,49 +2,80 @@
use std::time::Duration; use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use typed_builder::TypedBuilder;
/// Represents a schedule for scheduled tasks.
///
/// It's used in the [`AsyncRunnable::cron`] and [`Runnable::cron`]
#[derive(Debug, Clone)]
pub enum Scheduled { pub enum Scheduled {
/// A cron pattern for a periodic task
///
/// For example, `Scheduled::CronPattern("0/20 * * * * * *")`
CronPattern(String), CronPattern(String),
/// A datetime for a scheduled task that will be executed once
///
/// For example, `Scheduled::ScheduleOnce(chrono::Utc::now() + std::time::Duration::seconds(7i64))`
ScheduleOnce(DateTime<Utc>), ScheduleOnce(DateTime<Utc>),
} }
/// List of error types that can occur while working with cron schedules.
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum CronError { pub enum CronError {
/// A problem occured during cron schedule parsing.
#[error(transparent)] #[error(transparent)]
LibraryError(#[from] cron::error::Error), LibraryError(#[from] cron::error::Error),
/// [`Scheduled`] enum variant is not provided
#[error("You have to implement method `cron()` in your AsyncRunnable")] #[error("You have to implement method `cron()` in your AsyncRunnable")]
TaskNotSchedulableError, TaskNotSchedulableError,
/// The next execution can not be determined using the current [`Scheduled::CronPattern`]
#[error("No timestamps match with this cron pattern")] #[error("No timestamps match with this cron pattern")]
NoTimestampsError, NoTimestampsError,
} }
/// All possible options for retaining tasks in the db after their execution.
///
/// The default mode is [`RetentionMode::RemoveAll`]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum RetentionMode { pub enum RetentionMode {
/// Keep all tasks
KeepAll, KeepAll,
/// Remove all tasks
RemoveAll, RemoveAll,
/// Remove only successfully finished tasks
RemoveFinished, RemoveFinished,
} }
impl Default for RetentionMode { impl Default for RetentionMode {
fn default() -> Self { fn default() -> Self {
RetentionMode::RemoveAll RetentionMode::RemoveAll
} }
} }
#[derive(Clone, Debug)] /// Configuration parameters for putting workers to sleep
/// while they don't have any tasks to execute
#[derive(Clone, Debug, TypedBuilder)]
pub struct SleepParams { pub struct SleepParams {
/// the current sleep period
pub sleep_period: Duration, pub sleep_period: Duration,
/// the maximum period a worker is allowed to sleep.
/// After this value is reached, `sleep_period` is not increased anymore
pub max_sleep_period: Duration, pub max_sleep_period: Duration,
/// the initial value of the `sleep_period`
pub min_sleep_period: Duration, pub min_sleep_period: Duration,
/// the step that `sleep_period` is increased by on every iteration
pub sleep_step: Duration, pub sleep_step: Duration,
} }
impl SleepParams { impl SleepParams {
/// Reset the `sleep_period` if `sleep_period` > `min_sleep_period`
pub fn maybe_reset_sleep_period(&mut self) { pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period { if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period; self.sleep_period = self.min_sleep_period;
} }
} }
/// Increase the `sleep_period` by the `sleep_step` if the `max_sleep_period` is not reached
pub fn maybe_increase_sleep_period(&mut self) { pub fn maybe_increase_sleep_period(&mut self) {
if self.sleep_period < self.max_sleep_period { if self.sleep_period < self.max_sleep_period {
self.sleep_period += self.sleep_step; self.sleep_period += self.sleep_step;
@ -63,11 +94,14 @@ impl Default for SleepParams {
} }
} }
/// An error that can happen during executing of tasks
#[derive(Debug)] #[derive(Debug)]
pub struct FangError { pub struct FangError {
/// A description of an error
pub description: String, pub description: String,
} }
#[doc(hidden)]
#[cfg(feature = "blocking")] #[cfg(feature = "blocking")]
extern crate diesel; extern crate diesel;
@ -94,6 +128,7 @@ pub use chrono::Utc;
#[cfg(feature = "blocking")] #[cfg(feature = "blocking")]
pub mod blocking; pub mod blocking;
#[cfg(feature = "blocking")] #[cfg(feature = "blocking")]
pub use blocking::*; pub use blocking::*;