diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index 3f149c7..b66af7d 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -9,6 +9,7 @@ use crate::{RetentionMode, SleepParams}; use log::error; use typed_builder::TypedBuilder; +/// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue #[derive(TypedBuilder)] pub struct AsyncWorker where @@ -28,11 +29,7 @@ impl AsyncWorker where AQueue: AsyncQueueable + Clone + Sync + 'static, { - pub async fn run( - &mut self, - task: Task, - runnable: Box, - ) -> Result<(), FangError> { + async fn run(&mut self, task: Task, runnable: Box) -> Result<(), FangError> { let result = runnable.run(&mut self.queue).await; match result { @@ -86,13 +83,13 @@ where Ok(()) } - pub async fn sleep(&mut self) { + async fn sleep(&mut self) { self.sleep_params.maybe_increase_sleep_period(); tokio::time::sleep(self.sleep_params.sleep_period).await; } - pub async fn run_tasks(&mut self) -> Result<(), FangError> { + pub(crate) async fn run_tasks(&mut self) -> Result<(), FangError> { loop { //fetch task match self diff --git a/src/asynk/async_worker_pool.rs b/src/asynk/async_worker_pool.rs index 6a2e000..784440e 100644 --- a/src/asynk/async_worker_pool.rs +++ b/src/asynk/async_worker_pool.rs @@ -14,13 +14,18 @@ where AQueue: AsyncQueueable + Clone + Sync + 'static, { #[builder(setter(into))] + /// the AsyncWorkerPool uses a queue to control the tasks that will be executed. pub queue: AQueue, + /// sleep_params controls how much time a worker will sleep while waiting for tasks #[builder(default, setter(into))] pub sleep_params: SleepParams, + /// retention_mode controls if tasks should be persisted after execution #[builder(default, setter(into))] pub retention_mode: RetentionMode, + /// the number of workers of the AsyncWorkerPool. #[builder(setter(into))] pub number_of_workers: u32, + /// The type of tasks that will be executed by `AsyncWorkerPool`. #[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))] pub task_type: String, } @@ -29,6 +34,8 @@ impl AsyncWorkerPool where AQueue: AsyncQueueable + Clone + Sync + 'static, { + /// Starts the configured number of workers + /// This is necessary in order to execute tasks. pub async fn start(&mut self) { for idx in 0..self.number_of_workers { let pool = self.clone(); @@ -37,7 +44,7 @@ where } #[async_recursion] - pub async fn supervise_task(pool: AsyncWorkerPool, restarts: u64, worker_number: u32) { + async fn supervise_task(pool: AsyncWorkerPool, restarts: u64, worker_number: u32) { let restarts = restarts + 1; let join_handle = Self::spawn_worker( pool.queue.clone(), @@ -56,7 +63,7 @@ where } } - pub async fn spawn_worker( + async fn spawn_worker( queue: AQueue, sleep_params: SleepParams, retention_mode: RetentionMode, @@ -66,7 +73,7 @@ where Self::run_worker(queue, sleep_params, retention_mode, task_type).await }) } - pub async fn run_worker( + async fn run_worker( queue: AQueue, sleep_params: SleepParams, retention_mode: RetentionMode, diff --git a/src/blocking/queue.rs b/src/blocking/queue.rs index 8fa570c..75e0acc 100644 --- a/src/blocking/queue.rs +++ b/src/blocking/queue.rs @@ -84,29 +84,46 @@ impl From for QueueError { } } +/// This trait defines operations for a synchronous queue. +/// The trait can be implemented for different storage backends. +/// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future. pub trait Queueable { + /// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to + /// fetch a task of the type `common`. After fetching it should update the state of the task to + /// `FangTaskState::InProgress`. fn fetch_and_touch_task(&self, task_type: String) -> Result, QueueError>; + /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type + /// created by an `WorkerPool`. fn insert_task(&self, params: &dyn Runnable) -> Result; + /// The method will remove all tasks from the queue fn remove_all_tasks(&self) -> Result; + /// Remove all tasks that are scheduled in the future. fn remove_all_scheduled_tasks(&self) -> Result; + /// Removes all tasks that have the specified `task_type`. fn remove_tasks_of_type(&self, task_type: &str) -> Result; + /// Remove a task by its id. fn remove_task(&self, id: Uuid) -> Result; /// To use this function task has to be uniq. uniq() has to return true. /// If task is not uniq this function will not do anything. + /// Remove a task by its metadata (struct fields values) fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result; fn find_task_by_id(&self, id: Uuid) -> Option; + /// Update the state field of the specified task + /// See the `FangTaskState` enum for possible states. fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result; + /// Update the state of a task to `FangTaskState::Failed` and set an error_message. fn fail_task(&self, task: &Task, error: &str) -> Result; + /// Schedule a task. fn schedule_task(&self, task: &dyn Runnable) -> Result; fn schedule_retry( @@ -117,6 +134,27 @@ pub trait Queueable { ) -> Result; } +/// An async queue that can be used to enqueue tasks. +/// It uses a PostgreSQL storage. It must be connected to perform any operation. +/// To connect a `Queue` to the PostgreSQL database call the `get_connection` method. +/// A Queue can be created with the TypedBuilder. +/// +/// ```rust +/// // Set DATABASE_URL enviroment variable if you would like to try this function. +/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { +/// let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); +/// +/// let manager = r2d2::ConnectionManager::::new(database_url); +/// +/// r2d2::Pool::builder() +/// .max_size(pool_size) +/// .build(manager) +/// .unwrap() +/// } +/// +/// let queue = Queue::builder().connection_pool(connection_pool(3)).build(); +/// ``` +/// #[derive(Clone, TypedBuilder)] pub struct Queue { #[builder(setter(into))] @@ -208,6 +246,7 @@ impl Queueable for Queue { } impl Queue { + /// Connect to the db if not connected pub fn get_connection(&self) -> Result { let result = self.connection_pool.get(); diff --git a/src/blocking/runnable.rs b/src/blocking/runnable.rs index 6a84150..01a6f69 100644 --- a/src/blocking/runnable.rs +++ b/src/blocking/runnable.rs @@ -5,26 +5,48 @@ use crate::Scheduled; pub const COMMON_TYPE: &str = "common"; pub const RETRIES_NUMBER: i32 = 20; +/// Implement this trait to run your custom tasks. #[typetag::serde(tag = "type")] pub trait Runnable { + /// Execute the task. This method should define its logic fn run(&self, _queueable: &dyn Queueable) -> Result<(), FangError>; + /// Define the type of the task. + /// The `common` task type is used by default fn task_type(&self) -> String { COMMON_TYPE.to_string() } + /// If set to true, no new tasks with the same metadata will be inserted + /// By default it is set to false. fn uniq(&self) -> bool { false } + /// This method defines if a task is periodic or it should be executed once in the future. + /// + /// Be careful it works only with the UTC timezone. + /** + ```rust + fn cron(&self) -> Option { + let expression = "0/20 * * * Aug-Sep * 2022/1"; + Some(Scheduled::CronPattern(expression.to_string())) + } + ``` + */ + /// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant. fn cron(&self) -> Option { None } + /// Define the maximum number of retries the task will be retried. + /// By default the number of retries is 20. fn max_retries(&self) -> i32 { RETRIES_NUMBER } + /// Define the backoff mode + /// By default, it is exponential, 2^(attempt) fn backoff(&self, attempt: u32) -> u32 { u32::pow(2, attempt) } diff --git a/src/blocking/worker.rs b/src/blocking/worker.rs index d24de14..a114f7e 100644 --- a/src/blocking/worker.rs +++ b/src/blocking/worker.rs @@ -13,6 +13,8 @@ use log::error; use std::thread; use typed_builder::TypedBuilder; +/// A executioner of tasks, it executes tasks only of one given task_type, it sleeps when they are +/// not tasks to be executed. #[derive(TypedBuilder)] pub struct Worker where @@ -52,7 +54,7 @@ where } } - pub fn run_tasks(&mut self) -> Result<(), FangError> { + pub(crate) fn run_tasks(&mut self) -> Result<(), FangError> { loop { match self.queue.fetch_and_touch_task(self.task_type.clone()) { Ok(Some(task)) => { @@ -114,7 +116,7 @@ where self.sleep_params.maybe_reset_sleep_period(); } - pub fn sleep(&mut self) { + fn sleep(&mut self) { self.sleep_params.maybe_increase_sleep_period(); thread::sleep(self.sleep_params.sleep_period); diff --git a/src/blocking/worker_pool.rs b/src/blocking/worker_pool.rs index d445535..b519028 100644 --- a/src/blocking/worker_pool.rs +++ b/src/blocking/worker_pool.rs @@ -13,16 +13,22 @@ pub struct WorkerPool where BQueue: Queueable + Clone + Sync + Send + 'static, { - #[builder(setter(into))] - pub number_of_workers: u32, + /// the AsyncWorkerPool uses a queue to control the tasks that will be executed. #[builder(setter(into))] pub queue: BQueue, - #[builder(setter(into), default)] - pub task_type: String, + /// sleep_params controls how much time a worker will sleep while waiting for tasks + /// execute. #[builder(setter(into), default)] pub sleep_params: SleepParams, + /// retention_mode controls if tasks should be persisted after execution #[builder(setter(into), default)] pub retention_mode: RetentionMode, + /// the number of workers of the AsyncWorkerPool. + #[builder(setter(into))] + pub number_of_workers: u32, + /// The type of tasks that will be executed by `AsyncWorkerPool`. + #[builder(setter(into), default)] + pub task_type: String, } #[derive(Clone, TypedBuilder)] @@ -46,6 +52,8 @@ impl WorkerPool where BQueue: Queueable + Clone + Sync + Send + 'static, { + /// Starts the configured number of workers + /// This is necessary in order to execute tasks. pub fn start(&mut self) -> Result<(), FangError> { for idx in 1..self.number_of_workers + 1 { let name = format!("worker_{}{}", self.task_type, idx);