diff --git a/README.md b/README.md index 49f13a2..a12f7ed 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Background task processing library for Rust. It uses Postgres DB as a task queue. -## Features +## Key Features Here are some of the fang's key features: diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 641655c..877bff1 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -113,39 +113,59 @@ impl From for AsyncQueueError { } } +/// This trait defines operations for an asynchronous queue. +/// The trait can be implemented for different storage backends. +/// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future. + #[async_trait] pub trait AsyncQueueable: Send { + /// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to + /// fetch a task of the type `common`. After fetching it should update the state of the task to + /// `FangTaskState::InProgress`. + /// async fn fetch_and_touch_task( &mut self, task_type: Option, ) -> Result, AsyncQueueError>; + /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type + /// created by an AsyncWorkerPool. async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result; + /// The method will remove all tasks from the queue async fn remove_all_tasks(&mut self) -> Result; + /// Remove all tasks that are scheduled in the future. async fn remove_all_scheduled_tasks(&mut self) -> Result; + /// Remove a task by its id. async fn remove_task(&mut self, id: Uuid) -> Result; + /// Remove a task by its metadata (struct fields values) async fn remove_task_by_metadata( &mut self, task: &dyn AsyncRunnable, ) -> Result; + /// Removes all tasks that have the specified `task_type`. async fn remove_tasks_type(&mut self, task_type: &str) -> Result; + /// Retrieve a task from storage by its `id`. async fn find_task_by_id(&mut self, id: Uuid) -> Result; + /// Update the state field of the specified task + /// See the `FangTaskState` enum for possible states. async fn update_task_state( &mut self, task: Task, state: FangTaskState, ) -> Result; + /// Update the state of a task to `FangTaskState::Failed` and set an error_message. async fn fail_task(&mut self, task: Task, error_message: &str) -> Result; + /// Schedule a task. async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result; async fn schedule_retry( @@ -156,6 +176,19 @@ pub trait AsyncQueueable: Send { ) -> Result; } +/// An async queue that can be used to enqueue tasks. +/// It uses a PostgreSQL storage. It must be connected to perform any operation. +/// To connect an `AsyncQueue` to PostgreSQL database call the `connect` method. +/// A Queue can be created with the TypedBuilder. +/// +/// ```rust +/// let mut queue = AsyncQueue::builder() +/// .uri("postgres://postgres:postgres@localhost/fang") +/// .max_pool_size(max_pool_size) +/// .build(); +/// ``` +/// + #[derive(TypedBuilder, Debug, Clone)] pub struct AsyncQueue where @@ -344,6 +377,7 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { + /// Check if the connection with db is established pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> { if self.connected { Ok(()) @@ -351,6 +385,8 @@ where Err(AsyncQueueError::NotConnectedError) } } + + /// Connect to the db if not connected pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> { let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?; @@ -363,6 +399,7 @@ where self.connected = true; Ok(()) } + async fn remove_all_tasks_query( transaction: &mut Transaction<'_>, ) -> Result { diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index 05efa99..7a7bac3 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -37,27 +37,55 @@ impl From for FangError { } } +/// Implement this trait to run your custom tasks. #[typetag::serde(tag = "type")] #[async_trait] pub trait AsyncRunnable: Send + Sync { + /// Execute the task. This method should define its logic async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>; + /// Define the type of the task. + /// The `common` task type is used by default fn task_type(&self) -> String { COMMON_TYPE.to_string() } + /// If set to true, no new tasks with the same metadata will be inserted + /// By default it is set to false. fn uniq(&self) -> bool { false } + /// This method defines if a task is periodic or it should be executed once in the future. + /// + /// Be careful it works only with the UTC timezone. + /// + /// + /// Example: + /// + /// + /** + ```rust + fn cron(&self) -> Option { + let expression = "0/20 * * * Aug-Sep * 2022/1"; + Some(Scheduled::CronPattern(expression.to_string())) + } + ``` + */ + + /// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant. fn cron(&self) -> Option { None } + /// Define the maximum number of retries the task will be retried. + /// By default the number of retries is 20. fn max_retries(&self) -> i32 { RETRIES_NUMBER } + /// Define the backoff mode + /// By default, it is exponential, 2^(attempt) fn backoff(&self, attempt: u32) -> u32 { u32::pow(2, attempt) } diff --git a/src/blocking.rs b/src/blocking.rs index 1f4f214..69f3b07 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -1,4 +1,4 @@ -pub mod error; +mod error; pub mod fang_task_state; pub mod queue; pub mod runnable; diff --git a/src/blocking/fang_task_state.rs b/src/blocking/fang_task_state.rs index 14c983f..3dc52f8 100644 --- a/src/blocking/fang_task_state.rs +++ b/src/blocking/fang_task_state.rs @@ -1,9 +1,18 @@ +/// Possible states of the task #[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)] #[DieselTypePath = "crate::schema::sql_types::FangTaskState"] pub enum FangTaskState { + /// The task is ready to be executed New, + /// The task is being executing. + /// + /// The task may stay in this state forever + /// if an unexpected error happened InProgress, + /// The task failed Failed, + /// The task finished successfully Finished, + /// The task is being retried. It means it failed but it's scheduled to be executed again Retried, } diff --git a/src/lib.rs b/src/lib.rs index aac3f44..afbe235 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,49 +2,80 @@ use std::time::Duration; use thiserror::Error; +use typed_builder::TypedBuilder; +/// Represents a schedule for scheduled tasks. +/// +/// It's used in the [`AsyncRunnable::cron`] and [`Runnable::cron`] +#[derive(Debug, Clone)] pub enum Scheduled { + /// A cron pattern for a periodic task + /// + /// For example, `Scheduled::CronPattern("0/20 * * * * * *")` CronPattern(String), + /// A datetime for a scheduled task that will be executed once + /// + /// For example, `Scheduled::ScheduleOnce(chrono::Utc::now() + std::time::Duration::seconds(7i64))` ScheduleOnce(DateTime), } +/// List of error types that can occur while working with cron schedules. #[derive(Debug, Error)] pub enum CronError { + /// A problem occured during cron schedule parsing. #[error(transparent)] LibraryError(#[from] cron::error::Error), + /// [`Scheduled`] enum variant is not provided #[error("You have to implement method `cron()` in your AsyncRunnable")] TaskNotSchedulableError, + /// The next execution can not be determined using the current [`Scheduled::CronPattern`] #[error("No timestamps match with this cron pattern")] NoTimestampsError, } +/// All possible options for retaining tasks in the db after their execution. +/// +/// The default mode is [`RetentionMode::RemoveAll`] #[derive(Clone, Debug)] pub enum RetentionMode { + /// Keep all tasks KeepAll, + /// Remove all tasks RemoveAll, + /// Remove only successfully finished tasks RemoveFinished, } + impl Default for RetentionMode { fn default() -> Self { RetentionMode::RemoveAll } } -#[derive(Clone, Debug)] +/// Configuration parameters for putting workers to sleep +/// while they don't have any tasks to execute +#[derive(Clone, Debug, TypedBuilder)] pub struct SleepParams { + /// the current sleep period pub sleep_period: Duration, + /// the maximum period a worker is allowed to sleep. + /// After this value is reached, `sleep_period` is not increased anymore pub max_sleep_period: Duration, + /// the initial value of the `sleep_period` pub min_sleep_period: Duration, + /// the step that `sleep_period` is increased by on every iteration pub sleep_step: Duration, } impl SleepParams { + /// Reset the `sleep_period` if `sleep_period` > `min_sleep_period` pub fn maybe_reset_sleep_period(&mut self) { if self.sleep_period != self.min_sleep_period { self.sleep_period = self.min_sleep_period; } } + /// Increase the `sleep_period` by the `sleep_step` if the `max_sleep_period` is not reached pub fn maybe_increase_sleep_period(&mut self) { if self.sleep_period < self.max_sleep_period { self.sleep_period += self.sleep_step; @@ -63,11 +94,14 @@ impl Default for SleepParams { } } +/// An error that can happen during executing of tasks #[derive(Debug)] pub struct FangError { + /// A description of an error pub description: String, } +#[doc(hidden)] #[cfg(feature = "blocking")] extern crate diesel; @@ -94,6 +128,7 @@ pub use chrono::Utc; #[cfg(feature = "blocking")] pub mod blocking; + #[cfg(feature = "blocking")] pub use blocking::*;