From c1fcc878859af4e4395c702a575136f39880ea82 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Tue, 7 Mar 2023 16:41:20 +0100 Subject: [PATCH] Revamp project using newtype and rework tasks table --- .env | 2 +- Cargo.toml | 8 +- README.md | 2 +- examples/simple_async_worker/src/lib.rs | 3 +- .../down.sql | 1 - .../up.sql | 21 - .../down.sql | 2 + .../up.sql | 29 ++ src/errors.rs | 26 +- src/fang_task_state.rs | 18 - src/lib.rs | 55 +-- src/queries.rs | 203 ++++---- src/queue.rs | 453 ++++++++---------- src/runnable.rs | 42 +- src/task.rs | 126 ++++- src/worker.rs | 284 ++++++----- src/worker_pool.rs | 23 +- 17 files changed, 653 insertions(+), 645 deletions(-) delete mode 100644 migrations/2022-08-20-151615_create_fang_tasks/down.sql delete mode 100644 migrations/2022-08-20-151615_create_fang_tasks/up.sql create mode 100644 migrations/2023-03-06-151907_create_backie_tasks/down.sql create mode 100644 migrations/2023-03-06-151907_create_backie_tasks/up.sql delete mode 100644 src/fang_task_state.rs diff --git a/.env b/.env index b725b6b..48b8ac4 100644 --- a/.env +++ b/.env @@ -1 +1 @@ -DATABASE_URL=postgres://postgres:password@localhost/fang +DATABASE_URL=postgres://postgres:password@localhost/backie diff --git a/Cargo.toml b/Cargo.toml index 3b32c5b..8630409 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "frango" +name = "backie" version = "0.1.0" authors = [ "Rafael Caricio ", ] description = "Async background job processing library with Diesel and Tokio" -repository = "https://code.caric.io/rafaelcaricio/frango" +repository = "https://code.caric.io/rafaelcaricio/backie" edition = "2021" license = "MIT" readme = "README.md" @@ -26,10 +26,10 @@ sha2 = "0.10" thiserror = "1.0" typed-builder = "0.12" typetag = "0.2" -uuid = { version = "1.1", features = ["v4"] } +uuid = { version = "1.1", features = ["v4", "serde"] } async-trait = "0.1" async-recursion = "1" diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] } -diesel-derive-enum = { version = "2.0.1", features = ["postgres"] } +diesel-derive-newtype = "2.0.0-rc.0" diesel-async = { version = "0.2", features = ["postgres", "bb8"] } tokio = { version = "1.25", features = ["rt", "time", "macros"] } diff --git a/README.md b/README.md index 7df808c..82fc145 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style] -# Frango +# Backie Background task processing library for Rust. It uses Postgres DB as a task queue. diff --git a/examples/simple_async_worker/src/lib.rs b/examples/simple_async_worker/src/lib.rs index bf793f0..bbac6d5 100644 --- a/examples/simple_async_worker/src/lib.rs +++ b/examples/simple_async_worker/src/lib.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use fang::queue::AsyncQueueable; use fang::runnable::AsyncRunnable; use fang::errors::FrangoError; @@ -30,7 +31,7 @@ impl MyFailingTask { #[async_trait] #[typetag::serde] impl AsyncRunnable for MyTask { - async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Infallible> { // let new_task = MyTask::new(self.number + 1); // queue // .insert_task(&new_task as &dyn AsyncRunnable) diff --git a/migrations/2022-08-20-151615_create_fang_tasks/down.sql b/migrations/2022-08-20-151615_create_fang_tasks/down.sql deleted file mode 100644 index 3cd3345..0000000 --- a/migrations/2022-08-20-151615_create_fang_tasks/down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE fang_tasks; diff --git a/migrations/2022-08-20-151615_create_fang_tasks/up.sql b/migrations/2022-08-20-151615_create_fang_tasks/up.sql deleted file mode 100644 index cd4b354..0000000 --- a/migrations/2022-08-20-151615_create_fang_tasks/up.sql +++ /dev/null @@ -1,21 +0,0 @@ -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - -CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished', 'retried'); - -CREATE TABLE fang_tasks ( - id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), - metadata jsonb NOT NULL, - error_message TEXT, - state fang_task_state DEFAULT 'new' NOT NULL, - task_type VARCHAR DEFAULT 'common' NOT NULL, - uniq_hash CHAR(64), - retries INTEGER DEFAULT 0 NOT NULL, - scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() -); - -CREATE INDEX fang_tasks_state_index ON fang_tasks(state); -CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type); -CREATE INDEX fang_tasks_scheduled_at_index ON fang_tasks(scheduled_at); -CREATE INDEX fang_tasks_uniq_hash ON fang_tasks(uniq_hash); diff --git a/migrations/2023-03-06-151907_create_backie_tasks/down.sql b/migrations/2023-03-06-151907_create_backie_tasks/down.sql new file mode 100644 index 0000000..a44cba5 --- /dev/null +++ b/migrations/2023-03-06-151907_create_backie_tasks/down.sql @@ -0,0 +1,2 @@ +DROP TABLE backie_tasks; +DROP FUNCTION backie_notify_new_tasks; diff --git a/migrations/2023-03-06-151907_create_backie_tasks/up.sql b/migrations/2023-03-06-151907_create_backie_tasks/up.sql new file mode 100644 index 0000000..1baaac5 --- /dev/null +++ b/migrations/2023-03-06-151907_create_backie_tasks/up.sql @@ -0,0 +1,29 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +CREATE TABLE backie_tasks ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + payload jsonb NOT NULL, + error_message TEXT DEFAULT NULL, + task_type VARCHAR DEFAULT 'common' NOT NULL, + uniq_hash CHAR(64) DEFAULT NULL, + retries INTEGER DEFAULT 0 NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + running_at TIMESTAMP WITH TIME ZONE DEFAULT NULL, + done_at TIMESTAMP WITH TIME ZONE DEFAULT NULL +); + +CREATE INDEX backie_tasks_type_index ON backie_tasks(task_type); +CREATE INDEX backie_tasks_created_at_index ON backie_tasks(created_at); +CREATE INDEX backie_tasks_uniq_hash ON backie_tasks(uniq_hash); + +--- create uniqueness index +CREATE UNIQUE INDEX backie_tasks_uniq_hash_index ON backie_tasks(uniq_hash) WHERE uniq_hash IS NOT NULL; + +CREATE FUNCTION backie_notify_new_tasks() returns trigger as $$ +BEGIN + perform pg_notify('backie::tasks', 'created'); + return new; +END; +$$ language plpgsql; + +CREATE TRIGGER backie_notify_workers after insert on backie_tasks for each statement execute procedure backie_notify_new_tasks(); diff --git a/src/errors.rs b/src/errors.rs index 3e2cdac..b9b1fdb 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,23 +1,30 @@ +use std::fmt::Display; use serde_json::Error as SerdeError; use thiserror::Error; -/// An error that can happen during executing of tasks -#[derive(Debug)] -pub struct FrangoError { +/// Library errors +#[derive(Debug, Clone, Error)] +pub struct BackieError { /// A description of an error pub description: String, } -impl From for FrangoError { +impl Display for BackieError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + +impl From for BackieError { fn from(error: AsyncQueueError) -> Self { let message = format!("{error:?}"); - FrangoError { + BackieError { description: message, } } } -impl From for FrangoError { +impl From for BackieError { fn from(error: SerdeError) -> Self { Self::from(AsyncQueueError::SerdeError(error)) } @@ -41,12 +48,15 @@ pub enum CronError { pub enum AsyncQueueError { #[error(transparent)] PgError(#[from] diesel::result::Error), + #[error(transparent)] SerdeError(#[from] serde_json::Error), + #[error(transparent)] CronError(#[from] CronError), - #[error("Can not perform this operation if task is not uniq, please check its definition in impl AsyncRunnable")] - TaskNotUniqError, + + #[error("Task is not in progress, operation not allowed")] + TaskNotRunning, } impl From for AsyncQueueError { diff --git a/src/fang_task_state.rs b/src/fang_task_state.rs deleted file mode 100644 index f851724..0000000 --- a/src/fang_task_state.rs +++ /dev/null @@ -1,18 +0,0 @@ -/// Possible states of the task -#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)] -#[ExistingTypePath = "crate::schema::sql_types::FangTaskState"] -pub enum FangTaskState { - /// The task is ready to be executed - New, - /// The task is being executing. - /// - /// The task may stay in this state forever - /// if an unexpected error happened - InProgress, - /// The task failed - Failed, - /// The task finished successfully - Finished, - /// The task is being retried. It means it failed but it's scheduled to be executed again - Retried, -} diff --git a/src/lib.rs b/src/lib.rs index 2604a2e..d1cb7e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,6 @@ #![doc = include_str!("../README.md")] use chrono::{DateTime, Utc}; -use std::time::Duration; -use typed_builder::TypedBuilder; /// Represents a schedule for scheduled tasks. /// @@ -22,66 +20,25 @@ pub enum Scheduled { /// All possible options for retaining tasks in the db after their execution. /// /// The default mode is [`RetentionMode::RemoveAll`] -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum RetentionMode { /// Keep all tasks KeepAll, - /// Remove all tasks + + /// Remove all finished tasks independently of their final execution state. RemoveAll, + /// Remove only successfully finished tasks - RemoveFinished, + RemoveDone, } impl Default for RetentionMode { fn default() -> Self { - RetentionMode::RemoveAll - } -} - -/// Configuration parameters for putting workers to sleep -/// while they don't have any tasks to execute -#[derive(Clone, Debug, TypedBuilder)] -pub struct SleepParams { - /// the current sleep period - pub sleep_period: Duration, - /// the maximum period a worker is allowed to sleep. - /// After this value is reached, `sleep_period` is not increased anymore - pub max_sleep_period: Duration, - /// the initial value of the `sleep_period` - pub min_sleep_period: Duration, - /// the step that `sleep_period` is increased by on every iteration - pub sleep_step: Duration, -} - -impl SleepParams { - /// Reset the `sleep_period` if `sleep_period` > `min_sleep_period` - pub fn maybe_reset_sleep_period(&mut self) { - if self.sleep_period != self.min_sleep_period { - self.sleep_period = self.min_sleep_period; - } - } - - /// Increase the `sleep_period` by the `sleep_step` if the `max_sleep_period` is not reached - pub fn maybe_increase_sleep_period(&mut self) { - if self.sleep_period < self.max_sleep_period { - self.sleep_period += self.sleep_step; - } - } -} - -impl Default for SleepParams { - fn default() -> Self { - SleepParams { - sleep_period: Duration::from_secs(5), - max_sleep_period: Duration::from_secs(15), - min_sleep_period: Duration::from_secs(5), - sleep_step: Duration::from_secs(5), - } + Self::RemoveAll } } pub mod errors; -pub mod fang_task_state; mod queries; pub mod queue; pub mod runnable; diff --git a/src/queries.rs b/src/queries.rs index fe94ccb..fa211d1 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,105 +1,99 @@ use crate::errors::AsyncQueueError; -use crate::fang_task_state::FangTaskState; -use crate::runnable::AsyncRunnable; -use crate::schema::fang_tasks; -use crate::task::NewTask; -use crate::task::{Task, DEFAULT_TASK_TYPE}; +use crate::runnable::RunnableTask; +use crate::schema::backie_tasks; +use crate::task::{NewTask, TaskId, TaskType, TaskHash}; +use crate::task::Task; use chrono::DateTime; use chrono::Duration; use chrono::Utc; use diesel::prelude::*; use diesel::ExpressionMethods; use diesel_async::{pg::AsyncPgConnection, RunQueryDsl}; -use sha2::{Digest, Sha256}; -use uuid::Uuid; impl Task { - pub async fn remove_all( + pub(crate) async fn remove_all( connection: &mut AsyncPgConnection, ) -> Result { - Ok(diesel::delete(fang_tasks::table) + Ok(diesel::delete(backie_tasks::table) .execute(connection) .await? as u64) } - pub async fn remove_all_scheduled( + pub(crate) async fn remove_all_scheduled( connection: &mut AsyncPgConnection, ) -> Result { - let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now())); + let query = backie_tasks::table.filter(backie_tasks::running_at.is_null()); Ok(diesel::delete(query).execute(connection).await? as u64) } - pub async fn remove( + pub(crate) async fn remove( connection: &mut AsyncPgConnection, - id: Uuid, + id: TaskId, ) -> Result { - let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); + let query = backie_tasks::table.filter(backie_tasks::id.eq(id)); Ok(diesel::delete(query).execute(connection).await? as u64) } - pub async fn remove_by_metadata( + pub(crate) async fn remove_by_hash( connection: &mut AsyncPgConnection, - task: &dyn AsyncRunnable, + task_hash: TaskHash, + ) -> Result { + let query = backie_tasks::table.filter(backie_tasks::uniq_hash.eq(task_hash)); + let qty = diesel::delete(query).execute(connection).await?; + Ok(qty > 0) + } + + pub(crate) async fn remove_by_type( + connection: &mut AsyncPgConnection, + task_type: TaskType, ) -> Result { - let metadata = serde_json::to_value(task)?; - - let uniq_hash = Self::calculate_hash(metadata.to_string()); - - let query = fang_tasks::table.filter(fang_tasks::uniq_hash.eq(uniq_hash)); - + let query = backie_tasks::table.filter(backie_tasks::task_type.eq(task_type)); Ok(diesel::delete(query).execute(connection).await? as u64) } - pub async fn remove_by_type( + pub(crate) async fn find_by_id( connection: &mut AsyncPgConnection, - task_type: &str, - ) -> Result { - let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type)); - Ok(diesel::delete(query).execute(connection).await? as u64) - } - - pub async fn find_by_id( - connection: &mut AsyncPgConnection, - id: Uuid, + id: TaskId, ) -> Result { - let task = fang_tasks::table - .filter(fang_tasks::id.eq(id)) + let task = backie_tasks::table + .filter(backie_tasks::id.eq(id)) .first::(connection) .await?; Ok(task) } - pub async fn fail_with_message( + pub(crate) async fn fail_with_message( connection: &mut AsyncPgConnection, - task: Task, + id: TaskId, error_message: &str, ) -> Result { - Ok(diesel::update(&task) + let query = backie_tasks::table.filter(backie_tasks::id.eq(id)); + Ok(diesel::update(query) .set(( - fang_tasks::state.eq(FangTaskState::Failed), - fang_tasks::error_message.eq(error_message), - fang_tasks::updated_at.eq(Utc::now()), + backie_tasks::error_message.eq(error_message), + backie_tasks::done_at.eq(Utc::now()), )) .get_result::(connection) .await?) } - pub async fn schedule_retry( + pub(crate) async fn schedule_retry( connection: &mut AsyncPgConnection, - task: &Task, + id: TaskId, backoff_seconds: u32, error: &str, ) -> Result { + use crate::schema::backie_tasks::dsl; + let now = Utc::now(); let scheduled_at = now + Duration::seconds(backoff_seconds as i64); - let task = diesel::update(task) + let task = diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id))) .set(( - fang_tasks::state.eq(FangTaskState::Retried), - fang_tasks::error_message.eq(error), - fang_tasks::retries.eq(task.retries + 1), - fang_tasks::scheduled_at.eq(scheduled_at), - fang_tasks::updated_at.eq(now), + backie_tasks::error_message.eq(error), + backie_tasks::retries.eq(dsl::retries + 1), + backie_tasks::created_at.eq(scheduled_at), + backie_tasks::running_at.eq::>>(None), )) .get_result::(connection) .await?; @@ -107,20 +101,17 @@ impl Task { Ok(task) } - pub async fn fetch_by_type( + pub(crate) async fn fetch_next_pending( connection: &mut AsyncPgConnection, - task_type: Option, + task_type: TaskType, ) -> Option { - fang_tasks::table - .order(fang_tasks::created_at.asc()) - .order(fang_tasks::scheduled_at.asc()) + backie_tasks::table + .filter(backie_tasks::created_at.lt(Utc::now())) // skip tasks scheduled for the future + .order(backie_tasks::created_at.asc()) // get the oldest task first + .filter(backie_tasks::running_at.is_null()) // that is not marked as running already + .filter(backie_tasks::done_at.is_null()) // and not marked as done + .filter(backie_tasks::task_type.eq(task_type)) .limit(1) - .filter(fang_tasks::scheduled_at.le(Utc::now())) - .filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried])) - .filter( - fang_tasks::task_type - .eq(task_type.unwrap_or_else(|| DEFAULT_TASK_TYPE.to_string())), - ) .for_update() .skip_locked() .get_result::(connection) @@ -128,76 +119,74 @@ impl Task { .ok() } - pub async fn update_state( + pub(crate) async fn set_running( connection: &mut AsyncPgConnection, task: Task, - state: FangTaskState, ) -> Result { - let updated_at = Utc::now(); Ok(diesel::update(&task) .set(( - fang_tasks::state.eq(state), - fang_tasks::updated_at.eq(updated_at), + backie_tasks::running_at.eq(Utc::now()), )) .get_result::(connection) .await?) } - pub async fn insert( + pub(crate) async fn set_done( connection: &mut AsyncPgConnection, - params: &dyn AsyncRunnable, - scheduled_at: DateTime, + id: TaskId, ) -> Result { - if !params.uniq() { - let new_task = NewTask::builder() - .scheduled_at(scheduled_at) - .uniq_hash(None) - .task_type(params.task_type()) - .metadata(serde_json::to_value(params).unwrap()) - .build(); + Ok(diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id))) + .set(( + backie_tasks::done_at.eq(Utc::now()), + )) + .get_result::(connection) + .await?) + } - Ok(diesel::insert_into(fang_tasks::table) - .values(new_task) - .get_result::(connection) - .await?) - } else { - let metadata = serde_json::to_value(params).unwrap(); + pub(crate) async fn insert( + connection: &mut AsyncPgConnection, + runnable: &dyn RunnableTask + ) -> Result { + let payload = serde_json::to_value(runnable)?; + match runnable.uniq() { + None => { + let new_task = NewTask::builder() + .uniq_hash(None) + .task_type(runnable.task_type()) + .payload(payload) + .build(); - let uniq_hash = Self::calculate_hash(metadata.to_string()); + Ok(diesel::insert_into(backie_tasks::table) + .values(new_task) + .get_result::(connection) + .await?) + } + Some(hash) => { + match Self::find_by_uniq_hash(connection, hash.clone()).await { + Some(task) => Ok(task), + None => { + let new_task = NewTask::builder() + .uniq_hash(Some(hash)) + .task_type(runnable.task_type()) + .payload(payload) + .build(); - match Self::find_by_uniq_hash(connection, &uniq_hash).await { - Some(task) => Ok(task), - None => { - let new_task = NewTask::builder() - .scheduled_at(scheduled_at) - .uniq_hash(Some(uniq_hash)) - .task_type(params.task_type()) - .metadata(serde_json::to_value(params).unwrap()) - .build(); - - Ok(diesel::insert_into(fang_tasks::table) - .values(new_task) - .get_result::(connection) - .await?) + Ok(diesel::insert_into(backie_tasks::table) + .values(new_task) + .get_result::(connection) + .await?) + } } } } } - fn calculate_hash(json: String) -> String { - let mut hasher = Sha256::new(); - hasher.update(json.as_bytes()); - let result = hasher.finalize(); - hex::encode(result) - } - - pub async fn find_by_uniq_hash( + pub(crate) async fn find_by_uniq_hash( connection: &mut AsyncPgConnection, - uniq_hash: &str, + hash: TaskHash, ) -> Option { - fang_tasks::table - .filter(fang_tasks::uniq_hash.eq(uniq_hash)) - .filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried])) + backie_tasks::table + .filter(backie_tasks::uniq_hash.eq(hash)) .first::(connection) .await .ok() diff --git a/src/queue.rs b/src/queue.rs index de9d775..a2ce673 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,37 +1,41 @@ use crate::errors::AsyncQueueError; -use crate::errors::CronError; -use crate::fang_task_state::FangTaskState; -use crate::runnable::AsyncRunnable; -use crate::task::Task; -use crate::Scheduled::*; +use crate::runnable::RunnableTask; +use crate::task::{Task, TaskId, TaskType, TaskHash}; use async_trait::async_trait; -use chrono::Utc; -use cron::Schedule; use diesel::result::Error::QueryBuilderError; use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::AsyncConnection; use diesel_async::{pg::AsyncPgConnection, pooled_connection::bb8::Pool}; -use std::str::FromStr; -use typed_builder::TypedBuilder; -use uuid::Uuid; /// This trait defines operations for an asynchronous queue. /// The trait can be implemented for different storage backends. /// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future. #[async_trait] -pub trait AsyncQueueable: Send { - /// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to - /// fetch a task of the type `common`. After fetching it should update the state of the task to - /// `FangTaskState::InProgress`. +pub trait Queueable: Send { + /// Pull pending tasks from the queue to execute them. /// - async fn fetch_and_touch_task( - &mut self, - task_type: Option, - ) -> Result, AsyncQueueError>; + /// This method returns one task of the `task_type` type. If `task_type` is `None` it will try to + /// fetch a task of the type `common`. The returned task is marked as running and must be executed. + async fn pull_next_task(&mut self, kind: Option) -> Result, AsyncQueueError>; /// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type /// created by an AsyncWorkerPool. - async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result; + async fn create_task(&mut self, task: &dyn RunnableTask) -> Result; + + /// Retrieve a task by its `id`. + async fn find_task_by_id(&mut self, id: TaskId) -> Result; + + /// Update the state of a task to failed and set an error_message. + async fn set_task_failed(&mut self, id: TaskId, error_message: &str) -> Result; + + /// Update the state of a task to done. + async fn set_task_done(&mut self, id: TaskId) -> Result; + + /// Update the state of a task to inform that it's still in progress. + async fn keep_task_alive(&mut self, id: TaskId) -> Result<(), AsyncQueueError>; + + /// Remove a task by its id. + async fn remove_task(&mut self, id: TaskId) -> Result; /// The method will remove all tasks from the queue async fn remove_all_tasks(&mut self) -> Result; @@ -39,39 +43,15 @@ pub trait AsyncQueueable: Send { /// Remove all tasks that are scheduled in the future. async fn remove_all_scheduled_tasks(&mut self) -> Result; - /// Remove a task by its id. - async fn remove_task(&mut self, id: Uuid) -> Result; - /// Remove a task by its metadata (struct fields values) - async fn remove_task_by_metadata( - &mut self, - task: &dyn AsyncRunnable, - ) -> Result; + async fn remove_task_by_hash(&mut self, task_hash: TaskHash) -> Result; /// Removes all tasks that have the specified `task_type`. - async fn remove_tasks_type(&mut self, task_type: &str) -> Result; + async fn remove_tasks_type(&mut self, task_type: TaskType) -> Result; - /// Retrieve a task from storage by its `id`. - async fn find_task_by_id(&mut self, id: Uuid) -> Result; - - /// Update the state field of the specified task - /// See the `FangTaskState` enum for possible states. - async fn update_task_state( + async fn schedule_task_retry( &mut self, - task: Task, - state: FangTaskState, - ) -> Result; - - /// Update the state of a task to `FangTaskState::Failed` and set an error_message. - async fn fail_task(&mut self, task: Task, error_message: &str) - -> Result; - - /// Schedule a task. - async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result; - - async fn schedule_retry( - &mut self, - task: &Task, + id: TaskId, backoff_seconds: u32, error: &str, ) -> Result; @@ -89,25 +69,22 @@ pub trait AsyncQueueable: Send { /// .build(); /// ``` /// -#[derive(TypedBuilder, Debug, Clone)] +#[derive(Debug, Clone)] pub struct PgAsyncQueue { pool: Pool, } -#[async_trait] -impl AsyncQueueable for PgAsyncQueue { - async fn find_task_by_id(&mut self, id: Uuid) -> Result { - let mut connection = self - .pool - .get() - .await - .map_err(|e| QueryBuilderError(e.into()))?; - Task::find_by_id(&mut connection, id).await +impl PgAsyncQueue { + pub fn new(pool: Pool) -> Self { + PgAsyncQueue { pool } } +} - async fn fetch_and_touch_task( +#[async_trait] +impl Queueable for PgAsyncQueue { + async fn pull_next_task( &mut self, - task_type: Option, + task_type: Option, ) -> Result, AsyncQueueError> { let mut connection = self .pool @@ -117,55 +94,81 @@ impl AsyncQueueable for PgAsyncQueue { connection .transaction::, AsyncQueueError, _>(|conn| { async move { - let Some(found_task) = Task::fetch_by_type(conn, task_type).await else { + let Some(pending_task) = Task::fetch_next_pending(conn, task_type.unwrap_or_default()).await else { return Ok(None); }; - match Task::update_state(conn, found_task, FangTaskState::InProgress).await - { - Ok(updated_task) => Ok(Some(updated_task)), - Err(err) => Err(err), - } + Task::set_running(conn, pending_task).await.map(|running_task| Some(running_task)) } .scope_boxed() }) .await } - async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { + async fn create_task(&mut self, runnable: &dyn RunnableTask) -> Result { let mut connection = self .pool .get() .await .map_err(|e| QueryBuilderError(e.into()))?; - Ok(Task::insert(&mut connection, task, Utc::now()).await?) + Ok(Task::insert(&mut connection, runnable).await?) } - async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { + async fn find_task_by_id(&mut self, id: TaskId) -> Result { let mut connection = self .pool .get() .await .map_err(|e| QueryBuilderError(e.into()))?; - let scheduled_at = match task.cron() { - Some(scheduled) => match scheduled { - CronPattern(cron_pattern) => { - let schedule = Schedule::from_str(&cron_pattern)?; - let mut iterator = schedule.upcoming(Utc); - iterator - .next() - .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))? - } - ScheduleOnce(datetime) => datetime, - }, - None => { - return Err(AsyncQueueError::CronError( - CronError::TaskNotSchedulableError, - )); - } - }; + Task::find_by_id(&mut connection, id).await + } - Ok(Task::insert(&mut connection, task, scheduled_at).await?) + async fn set_task_failed( + &mut self, + id: TaskId, + error_message: &str, + ) -> Result { + let mut connection = self + .pool + .get() + .await + .map_err(|e| QueryBuilderError(e.into()))?; + Task::fail_with_message(&mut connection, id, error_message).await + } + + async fn set_task_done(&mut self, id: TaskId) -> Result { + let mut connection = self + .pool + .get() + .await + .map_err(|e| QueryBuilderError(e.into()))?; + Task::set_done(&mut connection, id).await + } + + async fn keep_task_alive(&mut self, id: TaskId) -> Result<(), AsyncQueueError> { + let mut connection = self + .pool + .get() + .await + .map_err(|e| QueryBuilderError(e.into()))?; + connection + .transaction::<(), AsyncQueueError, _>(|conn| { + async move { + let task = Task::find_by_id(conn, id).await?; + Task::set_running(conn, task).await?; + Ok(()) + }.scope_boxed() + }).await + } + + async fn remove_task(&mut self, id: TaskId) -> Result { + let mut connection = self + .pool + .get() + .await + .map_err(|e| QueryBuilderError(e.into()))?; + let result = Task::remove(&mut connection, id).await?; + Ok(result) } async fn remove_all_tasks(&mut self) -> Result { @@ -187,34 +190,16 @@ impl AsyncQueueable for PgAsyncQueue { Ok(result) } - async fn remove_task(&mut self, id: Uuid) -> Result { + async fn remove_task_by_hash(&mut self, task_hash: TaskHash) -> Result { let mut connection = self .pool .get() .await .map_err(|e| QueryBuilderError(e.into()))?; - let result = Task::remove(&mut connection, id).await?; - Ok(result) + Task::remove_by_hash(&mut connection, task_hash).await } - async fn remove_task_by_metadata( - &mut self, - task: &dyn AsyncRunnable, - ) -> Result { - if task.uniq() { - let mut connection = self - .pool - .get() - .await - .map_err(|e| QueryBuilderError(e.into()))?; - let result = Task::remove_by_metadata(&mut connection, task).await?; - Ok(result) - } else { - Err(AsyncQueueError::TaskNotUniqError) - } - } - - async fn remove_tasks_type(&mut self, task_type: &str) -> Result { + async fn remove_tasks_type(&mut self, task_type: TaskType) -> Result { let mut connection = self .pool .get() @@ -224,37 +209,9 @@ impl AsyncQueueable for PgAsyncQueue { Ok(result) } - async fn update_task_state( + async fn schedule_task_retry( &mut self, - task: Task, - state: FangTaskState, - ) -> Result { - let mut connection = self - .pool - .get() - .await - .map_err(|e| QueryBuilderError(e.into()))?; - let task = Task::update_state(&mut connection, task, state).await?; - Ok(task) - } - - async fn fail_task( - &mut self, - task: Task, - error_message: &str, - ) -> Result { - let mut connection = self - .pool - .get() - .await - .map_err(|e| QueryBuilderError(e.into()))?; - let task = Task::fail_with_message(&mut connection, task, error_message).await?; - Ok(task) - } - - async fn schedule_retry( - &mut self, - task: &Task, + id: TaskId, backoff_seconds: u32, error: &str, ) -> Result { @@ -263,7 +220,7 @@ impl AsyncQueueable for PgAsyncQueue { .get() .await .map_err(|e| QueryBuilderError(e.into()))?; - let task = Task::schedule_retry(&mut connection, task, backoff_seconds, error).await?; + let task = Task::schedule_retry(&mut connection, id, backoff_seconds, error).await?; Ok(task) } } @@ -271,16 +228,14 @@ impl AsyncQueueable for PgAsyncQueue { #[cfg(test)] mod async_queue_tests { use super::*; - use crate::errors::FrangoError; use crate::Scheduled; use async_trait::async_trait; - use chrono::prelude::*; use chrono::DateTime; - use chrono::Duration; use chrono::Utc; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; use serde::{Deserialize, Serialize}; + use crate::task::TaskState; #[derive(Serialize, Deserialize)] struct AsyncTask { @@ -289,8 +244,8 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncTask { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } } @@ -302,13 +257,13 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncUniqTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncUniqTask { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } - fn uniq(&self) -> bool { - true + fn uniq(&self) -> Option { + TaskHash::default_for_task(self).ok() } } @@ -320,8 +275,8 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } @@ -334,11 +289,11 @@ mod async_queue_tests { #[tokio::test] async fn insert_task_creates_new_task() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -351,11 +306,11 @@ mod async_queue_tests { #[tokio::test] async fn update_task_state_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); let id = task.id; @@ -363,13 +318,10 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let finished_task = test - .update_task_state(task, FangTaskState::Finished) - .await - .unwrap(); + let finished_task = test.set_task_done(task.id).await.unwrap(); assert_eq!(id, finished_task.id); - assert_eq!(FangTaskState::Finished, finished_task.state); + assert_eq!(TaskState::Done, finished_task.state()); test.remove_all_tasks().await.unwrap(); } @@ -377,11 +329,11 @@ mod async_queue_tests { #[tokio::test] async fn failed_task_query_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); let id = task.id; @@ -389,11 +341,11 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let failed_task = test.fail_task(task, "Some error").await.unwrap(); + let failed_task = test.set_task_failed(task.id, "Some error").await.unwrap(); assert_eq!(id, failed_task.id); assert_eq!(Some("Some error"), failed_task.error_message.as_deref()); - assert_eq!(FangTaskState::Failed, failed_task.state); + assert_eq!(TaskState::Failed, failed_task.state()); test.remove_all_tasks().await.unwrap(); } @@ -401,20 +353,20 @@ mod async_queue_tests { #[tokio::test] async fn remove_all_tasks_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool.into()).build(); + let mut test = PgAsyncQueue::new(pool.into()); - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = insert_task(&mut test, &AsyncTask { number: 2 }).await; + let task = test.create_task(&AsyncTask { number: 2 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -425,92 +377,92 @@ mod async_queue_tests { assert_eq!(2, result); } - #[tokio::test] - async fn schedule_task_test() { - let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); - - let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); - - let task = &AsyncTaskSchedule { - number: 1, - datetime: datetime.to_string(), - }; - - let task = test.schedule_task(task).await.unwrap(); - - let metadata = task.metadata.as_object().unwrap(); - let number = metadata["number"].as_u64(); - let type_task = metadata["type"].as_str(); - - assert_eq!(Some(1), number); - assert_eq!(Some("AsyncTaskSchedule"), type_task); - assert_eq!(task.scheduled_at, datetime); - - test.remove_all_tasks().await.unwrap(); - } + // #[tokio::test] + // async fn schedule_task_test() { + // let pool = pool().await; + // let mut test = PgAsyncQueue::new(pool); + // + // let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + // + // let task = &AsyncTaskSchedule { + // number: 1, + // datetime: datetime.to_string(), + // }; + // + // let task = test.schedule_task(task).await.unwrap(); + // + // let metadata = task.payload.as_object().unwrap(); + // let number = metadata["number"].as_u64(); + // let type_task = metadata["type"].as_str(); + // + // assert_eq!(Some(1), number); + // assert_eq!(Some("AsyncTaskSchedule"), type_task); + // assert_eq!(task.scheduled_at, datetime); + // + // test.remove_all_tasks().await.unwrap(); + // } + // + // #[tokio::test] + // async fn remove_all_scheduled_tasks_test() { + // let pool = pool().await; + // let mut test = PgAsyncQueue::new(pool); + // + // let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + // + // let task1 = &AsyncTaskSchedule { + // number: 1, + // datetime: datetime.to_string(), + // }; + // + // let task2 = &AsyncTaskSchedule { + // number: 2, + // datetime: datetime.to_string(), + // }; + // + // test.schedule_task(task1).await.unwrap(); + // test.schedule_task(task2).await.unwrap(); + // + // let number = test.remove_all_scheduled_tasks().await.unwrap(); + // + // assert_eq!(2, number); + // + // test.remove_all_tasks().await.unwrap(); + // } #[tokio::test] - async fn remove_all_scheduled_tasks_test() { + async fn pull_next_task_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let task1 = &AsyncTaskSchedule { - number: 1, - datetime: datetime.to_string(), - }; - - let task2 = &AsyncTaskSchedule { - number: 2, - datetime: datetime.to_string(), - }; - - test.schedule_task(task1).await.unwrap(); - test.schedule_task(task2).await.unwrap(); - - let number = test.remove_all_scheduled_tasks().await.unwrap(); - - assert_eq!(2, number); - - test.remove_all_tasks().await.unwrap(); - } - - #[tokio::test] - async fn fetch_and_touch_test() { - let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); - - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; - - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = insert_task(&mut test, &AsyncTask { number: 2 }).await; + let task = test.create_task(&AsyncTask { number: 2 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(2), number); assert_eq!(Some("AsyncTask"), type_task); - let task = test.fetch_and_touch_task(None).await.unwrap().unwrap(); + let task = test.pull_next_task(None).await.unwrap().unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = test.fetch_and_touch_task(None).await.unwrap().unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let task = test.pull_next_task(None).await.unwrap().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -523,30 +475,30 @@ mod async_queue_tests { #[tokio::test] async fn remove_tasks_type_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; + let task = test.create_task(&AsyncTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = insert_task(&mut test, &AsyncTask { number: 2 }).await; + let task = test.create_task(&AsyncTask { number: 2 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(2), number); assert_eq!(Some("AsyncTask"), type_task); - let result = test.remove_tasks_type("mytype").await.unwrap(); + let result = test.remove_tasks_type(TaskType::from("nonexistentType")).await.unwrap(); assert_eq!(0, result); - let result = test.remove_tasks_type("common").await.unwrap(); + let result = test.remove_tasks_type(TaskType::default()).await.unwrap(); assert_eq!(2, result); test.remove_all_tasks().await.unwrap(); @@ -555,48 +507,43 @@ mod async_queue_tests { #[tokio::test] async fn remove_tasks_by_metadata() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); - let task = insert_task(&mut test, &AsyncUniqTask { number: 1 }).await; + let task = test.create_task(&AsyncUniqTask { number: 1 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(1), number); assert_eq!(Some("AsyncUniqTask"), type_task); - let task = insert_task(&mut test, &AsyncUniqTask { number: 2 }).await; + let task = test.create_task(&AsyncUniqTask { number: 2 }).await.unwrap(); - let metadata = task.metadata.as_object().unwrap(); + let metadata = task.payload.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); assert_eq!(Some(2), number); assert_eq!(Some("AsyncUniqTask"), type_task); - let result = test - .remove_task_by_metadata(&AsyncUniqTask { number: 0 }) + let result = test.remove_task_by_hash(AsyncUniqTask { number: 0 }.uniq().unwrap()) .await .unwrap(); - assert_eq!(0, result); + assert!(!result, "Should **not** remove task"); let result = test - .remove_task_by_metadata(&AsyncUniqTask { number: 1 }) + .remove_task_by_hash(AsyncUniqTask { number: 1 }.uniq().unwrap()) .await .unwrap(); - assert_eq!(1, result); + assert!(result, "Should remove task"); test.remove_all_tasks().await.unwrap(); } - async fn insert_task(test: &mut PgAsyncQueue, task: &dyn AsyncRunnable) -> Task { - test.insert_task(task).await.unwrap() - } - async fn pool() -> Pool { let manager = AsyncDieselConnectionManager::::new( - "postgres://postgres:password@localhost/fang", + "postgres://postgres:password@localhost/backie", ); Pool::builder() .max_size(1) diff --git a/src/runnable.rs b/src/runnable.rs index 23fedee..9972eaa 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -1,46 +1,46 @@ -use crate::errors::FrangoError; -use crate::queue::AsyncQueueable; +use std::error::Error; +use crate::queue::Queueable; use crate::Scheduled; use async_trait::async_trait; +use crate::task::{TaskType}; +use crate::task::TaskHash; -const COMMON_TYPE: &str = "common"; pub const RETRIES_NUMBER: i32 = 20; -/// Implement this trait to run your custom tasks. + +/// Task that can be executed by the queue. +/// +/// The `RunnableTask` trait is used to define the behaviour of a task. You must implement this +/// trait for all tasks you want to execute. #[typetag::serde(tag = "type")] #[async_trait] -pub trait AsyncRunnable: Send + Sync { +pub trait RunnableTask: Send + Sync { /// Execute the task. This method should define its logic - async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FrangoError>; + async fn run(&self, queue: &mut dyn Queueable) -> Result<(), Box>; /// Define the type of the task. /// The `common` task type is used by default - fn task_type(&self) -> String { - COMMON_TYPE.to_string() + fn task_type(&self) -> TaskType { + TaskType::default() } /// If set to true, no new tasks with the same metadata will be inserted /// By default it is set to false. - fn uniq(&self) -> bool { - false + fn uniq(&self) -> Option { + None } /// This method defines if a task is periodic or it should be executed once in the future. /// /// Be careful it works only with the UTC timezone. /// - /// /// Example: /// - /// - /** - ```rust - fn cron(&self) -> Option { - let expression = "0/20 * * * Aug-Sep * 2022/1"; - Some(Scheduled::CronPattern(expression.to_string())) - } - ``` - */ - + /// ```rust + /// fn cron(&self) -> Option { + /// let expression = "0/20 * * * Aug-Sep * 2022/1"; + /// Some(Scheduled::CronPattern(expression.to_string())) + /// } + ///``` /// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant. fn cron(&self) -> Option { None diff --git a/src/task.rs b/src/task.rs index 8277bdf..eb43586 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,47 +1,135 @@ -use crate::fang_task_state::FangTaskState; -use crate::schema::fang_tasks; +use std::borrow::Cow; +use std::fmt; +use std::fmt::Display; +use crate::schema::backie_tasks; use chrono::DateTime; use chrono::Utc; use diesel::prelude::*; use typed_builder::TypedBuilder; use uuid::Uuid; +use serde::Serialize; +use diesel_derive_newtype::DieselNewType; +use sha2::{Digest, Sha256}; -pub const DEFAULT_TASK_TYPE: &str = "common"; +/// States of a task. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum TaskState { + /// The task is ready to be executed. + Ready, + + /// The task is running. + Running, + + /// The task has failed to execute. + Failed, + + /// The task finished successfully. + Done, +} + +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, DieselNewType, Serialize)] +pub struct TaskId(Uuid); + +impl Display for TaskId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, DieselNewType, Serialize)] +pub struct TaskType(Cow<'static, str>); + +impl Default for TaskType { + fn default() -> Self { + Self(Cow::from("default")) + } +} + +impl From for TaskType +where + S: AsRef + 'static, +{ + fn from(s: S) -> Self { + TaskType(Cow::from(s.as_ref().to_owned())) + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, DieselNewType, Serialize)] +pub struct TaskHash(Cow<'static, str>); + +impl TaskHash { + pub fn default_for_task(value: &T) -> Result where T: Serialize { + let value = serde_json::to_value(value)?; + let mut hasher = Sha256::new(); + hasher.update(serde_json::to_string(&value)?.as_bytes()); + let result = hasher.finalize(); + Ok(TaskHash(Cow::from(hex::encode(result)))) + } +} #[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone, TypedBuilder)] -#[diesel(table_name = fang_tasks)] +#[diesel(table_name = backie_tasks)] pub struct Task { #[builder(setter(into))] - pub id: Uuid, + pub id: TaskId, + #[builder(setter(into))] - pub metadata: serde_json::Value, + pub payload: serde_json::Value, + #[builder(setter(into))] pub error_message: Option, + #[builder(setter(into))] - pub state: FangTaskState, + pub task_type: TaskType, + #[builder(setter(into))] - pub task_type: String, - #[builder(setter(into))] - pub uniq_hash: Option, + pub uniq_hash: Option, + #[builder(setter(into))] pub retries: i32, - #[builder(setter(into))] - pub scheduled_at: DateTime, + #[builder(setter(into))] pub created_at: DateTime, + #[builder(setter(into))] - pub updated_at: DateTime, + pub running_at: Option>, + + #[builder(setter(into))] + pub done_at: Option>, +} + +impl Task { + pub fn state(&self) -> TaskState { + if self.done_at.is_some() { + if self.error_message.is_some() { + TaskState::Failed + } else { + TaskState::Done + } + } else if self.running_at.is_some() { + TaskState::Running + } else { + TaskState::Ready + } + } } #[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)] -#[diesel(table_name = fang_tasks)] +#[diesel(table_name = backie_tasks)] pub struct NewTask { #[builder(setter(into))] - metadata: serde_json::Value, + payload: serde_json::Value, + #[builder(setter(into))] - task_type: String, + task_type: TaskType, + #[builder(setter(into))] - uniq_hash: Option, - #[builder(setter(into))] - scheduled_at: DateTime, + uniq_hash: Option, +} + +pub struct TaskInfo { + id: TaskId, + error_message: Option, + retries: i32, + created_at: DateTime, } diff --git a/src/worker.rs b/src/worker.rs index 8eca0f0..bc25493 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,81 +1,101 @@ -use crate::errors::FrangoError; -use crate::fang_task_state::FangTaskState; -use crate::queue::AsyncQueueable; -use crate::runnable::AsyncRunnable; -use crate::task::Task; -use crate::task::DEFAULT_TASK_TYPE; +use std::error::Error; +use crate::errors::BackieError; +use crate::queue::Queueable; +use crate::runnable::RunnableTask; +use crate::task::{Task, TaskType}; use crate::Scheduled::*; -use crate::{RetentionMode, SleepParams}; +use crate::{RetentionMode}; use log::error; use typed_builder::TypedBuilder; /// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue #[derive(TypedBuilder)] -pub struct AsyncWorker +pub struct AsyncWorker where - AQueue: AsyncQueueable + Clone + Sync + 'static, + Q: Queueable + Clone + Sync + 'static, { #[builder(setter(into))] - pub queue: AQueue, - #[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))] - pub task_type: String, + pub queue: Q, + #[builder(default, setter(into))] - pub sleep_params: SleepParams, + pub task_type: Option, + #[builder(default, setter(into))] pub retention_mode: RetentionMode, } -impl AsyncWorker +// impl AsyncWorkerBuilder +// where +// TypedBuilderFields: Clone, +// Q: Queueable + Clone + Sync + 'static, +// { +// pub fn with_graceful_shutdown(self, signal: F) -> Self +// where +// F: Future, +// { +// self +// } +// } + +impl AsyncWorker where - AQueue: AsyncQueueable + Clone + Sync + 'static, + Q: Queueable + Clone + Sync + 'static, { - async fn run(&mut self, task: Task, runnable: Box) -> Result<(), FrangoError> { + async fn run(&mut self, task: Task, runnable: Box) -> Result<(), BackieError> { + // TODO: catch panics let result = runnable.run(&mut self.queue).await; - match result { - Ok(_) => self.finalize_task(task, &result).await?, - - Err(ref error) => { + Ok(_) => self.finalize_task(task, result).await?, + Err(error) => { if task.retries < runnable.max_retries() { let backoff_seconds = runnable.backoff(task.retries as u32); + log::debug!( + "Task {} failed to run and will be retried in {} seconds", + task.id, + backoff_seconds + ); + let error_message = format!("{}", error); self.queue - .schedule_retry(&task, backoff_seconds, &error.description) + .schedule_task_retry(task.id, backoff_seconds, &error_message) .await?; } else { - self.finalize_task(task, &result).await?; + log::debug!("Task {} failed and reached the maximum retries", task.id); + self.finalize_task(task, Err(error)).await?; } } } - Ok(()) } async fn finalize_task( &mut self, task: Task, - result: &Result<(), FrangoError>, - ) -> Result<(), FrangoError> { + result: Result<(), Box>, + ) -> Result<(), BackieError> { match self.retention_mode { RetentionMode::KeepAll => match result { Ok(_) => { - self.queue - .update_task_state(task, FangTaskState::Finished) - .await?; + self.queue.set_task_done(task.id).await?; + log::debug!("Task {} done and kept in the database", task.id); } Err(error) => { - self.queue.fail_task(task, &error.description).await?; + log::debug!("Task {} failed and kept in the database", task.id); + self.queue.set_task_failed(task.id, &format!("{}", error)).await?; } }, RetentionMode::RemoveAll => { + log::debug!("Task {} finalized and deleted from the database", task.id); self.queue.remove_task(task.id).await?; } - RetentionMode::RemoveFinished => match result { + RetentionMode::RemoveDone => match result { Ok(_) => { + log::debug!("Task {} done and deleted from the database", task.id); self.queue.remove_task(task.id).await?; } Err(error) => { - self.queue.fail_task(task, &error.description).await?; + log::debug!("Task {} failed and kept in the database", task.id); + self.queue.set_task_failed(task.id, &format!("{}", error)).await?; } }, }; @@ -83,64 +103,68 @@ where Ok(()) } - async fn sleep(&mut self) { - self.sleep_params.maybe_increase_sleep_period(); - - tokio::time::sleep(self.sleep_params.sleep_period).await; + async fn wait(&mut self) { + // TODO: add a way to stop the worker + // Listen to postgres pubsub notification + // Listen to watchable future + // All that until a max timeout + // + // select! { + // _ = self.queue.wait_for_task(Some(self.task_type.clone())).fuse() => {}, + // _ = SleepParams::default().sleep().fuse() => {}, + // } } - pub(crate) async fn run_tasks(&mut self) -> Result<(), FrangoError> { + pub(crate) async fn run_tasks(&mut self) { loop { - //fetch task + // TODO: check if should stop the worker match self .queue - .fetch_and_touch_task(Some(self.task_type.clone())) + .pull_next_task(self.task_type.clone()) .await { Ok(Some(task)) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); + let actual_task: Box = serde_json::from_value(task.payload.clone()).unwrap(); // check if task is scheduled or not if let Some(CronPattern(_)) = actual_task.cron() { // program task - self.queue.schedule_task(&*actual_task).await?; + //self.queue.schedule_task(&*actual_task).await?; } - self.sleep_params.maybe_reset_sleep_period(); // run scheduled task - self.run(task, actual_task).await?; + // TODO: what do we do if the task fails? it's an internal error, inform the logs + let _ = self.run(task, actual_task).await; } Ok(None) => { - self.sleep().await; + self.wait().await; } Err(error) => { error!("Failed to fetch a task {:?}", error); - - self.sleep().await; + self.wait().await; } }; } } #[cfg(test)] - pub async fn run_tasks_until_none(&mut self) -> Result<(), FrangoError> { + pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { loop { match self .queue - .fetch_and_touch_task(Some(self.task_type.clone())) + .pull_next_task(self.task_type.clone()) .await { Ok(Some(task)) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); + let actual_task: Box = + serde_json::from_value(task.payload.clone()).unwrap(); // check if task is scheduled or not if let Some(CronPattern(_)) = actual_task.cron() { // program task - self.queue.schedule_task(&*actual_task).await?; + // self.queue.schedule_task(&*actual_task).await?; } - self.sleep_params.maybe_reset_sleep_period(); + self.wait().await; // run scheduled task self.run(task, actual_task).await?; } @@ -150,7 +174,7 @@ where Err(error) => { error!("Failed to fetch a task {:?}", error); - self.sleep().await; + self.wait().await; } }; } @@ -160,8 +184,8 @@ where #[cfg(test)] mod async_worker_tests { use super::*; - use crate::errors::FrangoError; - use crate::queue::AsyncQueueable; + use crate::errors::BackieError; + use crate::queue::Queueable; use crate::queue::PgAsyncQueue; use crate::worker::Task; use crate::RetentionMode; @@ -172,6 +196,7 @@ mod async_worker_tests { use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use diesel_async::AsyncPgConnection; use serde::{Deserialize, Serialize}; + use crate::task::TaskState; #[derive(Serialize, Deserialize)] struct WorkerAsyncTask { @@ -180,8 +205,8 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for WorkerAsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for WorkerAsyncTask { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } } @@ -193,8 +218,8 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for WorkerAsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for WorkerAsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } fn cron(&self) -> Option { @@ -209,13 +234,13 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncFailedTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncFailedTask { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { let message = format!("number {} is wrong :(", self.number); - Err(FrangoError { + Err(Box::new(BackieError { description: message, - }) + })) } fn max_retries(&self) -> i32 { @@ -228,13 +253,13 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncRetryTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncRetryTask { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { let message = "Failed".to_string(); - Err(FrangoError { + Err(Box::new(BackieError { description: message, - }) + })) } fn max_retries(&self) -> i32 { @@ -247,13 +272,13 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncTaskType1 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncTaskType1 { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } - fn task_type(&self) -> String { - "type1".to_string() + fn task_type(&self) -> TaskType { + TaskType::from("type1") } } @@ -262,20 +287,20 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] - impl AsyncRunnable for AsyncTaskType2 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FrangoError> { + impl RunnableTask for AsyncTaskType2 { + async fn run(&self, _queueable: &mut dyn Queueable) -> Result<(), Box<(dyn std::error::Error + Send + 'static)>> { Ok(()) } - fn task_type(&self) -> String { - "type2".to_string() + fn task_type(&self) -> TaskType { + TaskType::from("type2") } } #[tokio::test] async fn execute_and_finishes_task() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); let actual_task = WorkerAsyncTask { number: 1 }; @@ -290,53 +315,53 @@ mod async_worker_tests { worker.run(task, Box::new(actual_task)).await.unwrap(); let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); - assert_eq!(FangTaskState::Finished, task_finished.state); + assert_eq!(TaskState::Done, task_finished.state()); test.remove_all_tasks().await.unwrap(); } - #[tokio::test] - async fn schedule_task_test() { - let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); - - let actual_task = WorkerAsyncTaskSchedule { number: 1 }; - - let task = test.schedule_task(&actual_task).await.unwrap(); - - let id = task.id; - - let mut worker = AsyncWorker::::builder() - .queue(test.clone()) - .retention_mode(RetentionMode::KeepAll) - .build(); - - worker.run_tasks_until_none().await.unwrap(); - - let task = worker.queue.find_task_by_id(id).await.unwrap(); - - assert_eq!(id, task.id); - assert_eq!(FangTaskState::New, task.state); - - tokio::time::sleep(core::time::Duration::from_secs(3)).await; - - worker.run_tasks_until_none().await.unwrap(); - - let task = test.find_task_by_id(id).await.unwrap(); - assert_eq!(id, task.id); - assert_eq!(FangTaskState::Finished, task.state); - - test.remove_all_tasks().await.unwrap(); - } + // #[tokio::test] + // async fn schedule_task_test() { + // let pool = pool().await; + // let mut test = PgAsyncQueue::new(pool); + // + // let actual_task = WorkerAsyncTaskSchedule { number: 1 }; + // + // let task = test.schedule_task(&actual_task).await.unwrap(); + // + // let id = task.id; + // + // let mut worker = AsyncWorker::::builder() + // .queue(test.clone()) + // .retention_mode(RetentionMode::KeepAll) + // .build(); + // + // worker.run_tasks_until_none().await.unwrap(); + // + // let task = worker.queue.find_task_by_id(id).await.unwrap(); + // + // assert_eq!(id, task.id); + // assert_eq!(TaskState::Ready, task.state()); + // + // tokio::time::sleep(core::time::Duration::from_secs(3)).await; + // + // worker.run_tasks_until_none().await.unwrap(); + // + // let task = test.find_task_by_id(id).await.unwrap(); + // assert_eq!(id, task.id); + // assert_eq!(TaskState::Done, task.state()); + // + // test.remove_all_tasks().await.unwrap(); + // } #[tokio::test] async fn retries_task_test() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); let actual_task = AsyncRetryTask {}; - let task = test.insert_task(&actual_task).await.unwrap(); + let task = test.create_task(&actual_task).await.unwrap(); let id = task.id; @@ -350,8 +375,9 @@ mod async_worker_tests { let task = worker.queue.find_task_by_id(id).await.unwrap(); assert_eq!(id, task.id); - assert_eq!(FangTaskState::Retried, task.state); + assert_eq!(TaskState::Ready, task.state()); assert_eq!(1, task.retries); + assert!(task.error_message.is_some()); tokio::time::sleep(core::time::Duration::from_secs(5)).await; worker.run_tasks_until_none().await.unwrap(); @@ -359,7 +385,7 @@ mod async_worker_tests { let task = worker.queue.find_task_by_id(id).await.unwrap(); assert_eq!(id, task.id); - assert_eq!(FangTaskState::Retried, task.state); + assert_eq!(TaskState::Ready, task.state()); assert_eq!(2, task.retries); tokio::time::sleep(core::time::Duration::from_secs(10)).await; @@ -367,7 +393,7 @@ mod async_worker_tests { let task = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task.id); - assert_eq!(FangTaskState::Failed, task.state); + assert_eq!(TaskState::Failed, task.state()); assert_eq!("Failed".to_string(), task.error_message.unwrap()); test.remove_all_tasks().await.unwrap(); @@ -376,7 +402,7 @@ mod async_worker_tests { #[tokio::test] async fn saves_error_for_failed_task() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); let failed_task = AsyncFailedTask { number: 1 }; @@ -392,7 +418,7 @@ mod async_worker_tests { let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); - assert_eq!(FangTaskState::Failed, task_finished.state); + assert_eq!(TaskState::Failed, task_finished.state()); assert_eq!( "number 1 is wrong :(".to_string(), task_finished.error_message.unwrap() @@ -404,7 +430,7 @@ mod async_worker_tests { #[tokio::test] async fn executes_task_only_of_specific_type() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await; let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await; @@ -416,7 +442,7 @@ mod async_worker_tests { let mut worker = AsyncWorker::::builder() .queue(test.clone()) - .task_type("type1".to_string()) + .task_type(TaskType::from("type1")) .retention_mode(RetentionMode::KeepAll) .build(); @@ -428,9 +454,9 @@ mod async_worker_tests { assert_eq!(id1, task1.id); assert_eq!(id12, task12.id); assert_eq!(id2, task2.id); - assert_eq!(FangTaskState::Finished, task1.state); - assert_eq!(FangTaskState::Finished, task12.state); - assert_eq!(FangTaskState::New, task2.state); + assert_eq!(TaskState::Done, task1.state()); + assert_eq!(TaskState::Done, task12.state()); + assert_eq!(TaskState::Ready, task2.state()); test.remove_all_tasks().await.unwrap(); } @@ -438,7 +464,7 @@ mod async_worker_tests { #[tokio::test] async fn remove_when_finished() { let pool = pool().await; - let mut test = PgAsyncQueue::builder().pool(pool).build(); + let mut test = PgAsyncQueue::new(pool); let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await; let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await; @@ -450,18 +476,18 @@ mod async_worker_tests { let mut worker = AsyncWorker::::builder() .queue(test.clone()) - .task_type("type1".to_string()) + .task_type(TaskType::from("type1")) .build(); worker.run_tasks_until_none().await.unwrap(); let task = test - .fetch_and_touch_task(Some("type1".to_string())) + .pull_next_task(Some(TaskType::from("type1"))) .await .unwrap(); assert_eq!(None, task); let task2 = test - .fetch_and_touch_task(Some("type2".to_string())) + .pull_next_task(Some(TaskType::from("type2"))) .await .unwrap() .unwrap(); @@ -470,13 +496,13 @@ mod async_worker_tests { test.remove_all_tasks().await.unwrap(); } - async fn insert_task(test: &mut PgAsyncQueue, task: &dyn AsyncRunnable) -> Task { - test.insert_task(task).await.unwrap() + async fn insert_task(test: &mut PgAsyncQueue, task: &dyn RunnableTask) -> Task { + test.create_task(task).await.unwrap() } async fn pool() -> Pool { let manager = AsyncDieselConnectionManager::::new( - "postgres://postgres:password@localhost/fang", + "postgres://postgres:password@localhost/backie", ); Pool::builder() .max_size(1) diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 6473147..5c705e8 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,7 +1,7 @@ -use crate::queue::AsyncQueueable; -use crate::task::DEFAULT_TASK_TYPE; +use crate::queue::Queueable; +use crate::task::{TaskType}; use crate::worker::AsyncWorker; -use crate::{RetentionMode, SleepParams}; +use crate::{RetentionMode}; use async_recursion::async_recursion; use log::error; use typed_builder::TypedBuilder; @@ -9,28 +9,28 @@ use typed_builder::TypedBuilder; #[derive(TypedBuilder, Clone)] pub struct AsyncWorkerPool where - AQueue: AsyncQueueable + Clone + Sync + 'static, + AQueue: Queueable + Clone + Sync + 'static, { #[builder(setter(into))] /// the AsyncWorkerPool uses a queue to control the tasks that will be executed. pub queue: AQueue, - /// sleep_params controls how much time a worker will sleep while waiting for tasks - #[builder(default, setter(into))] - pub sleep_params: SleepParams, - /// retention_mode controls if tasks should be persisted after execution + + /// retention_mode controls if tasks should be persisted after execution #[builder(default, setter(into))] pub retention_mode: RetentionMode, + /// the number of workers of the AsyncWorkerPool. #[builder(setter(into))] pub number_of_workers: u32, + /// The type of tasks that will be executed by `AsyncWorkerPool`. - #[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))] - pub task_type: String, + #[builder(default=None, setter(into))] + pub task_type: Option, } impl AsyncWorkerPool where - AQueue: AsyncQueueable + Clone + Sync + 'static, + AQueue: Queueable + Clone + Sync + 'static, { /// Starts the configured number of workers /// This is necessary in order to execute tasks. @@ -50,7 +50,6 @@ where let join_handle = tokio::spawn(async move { let mut worker: AsyncWorker = AsyncWorker::builder() .queue(inner_pool.queue.clone()) - .sleep_params(inner_pool.sleep_params.clone()) .retention_mode(inner_pool.retention_mode.clone()) .task_type(inner_pool.task_type.clone()) .build();