From d3805cd562b6bc0d9435a21acbd058d88a1a28f3 Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Wed, 20 Jul 2022 15:26:11 +0000 Subject: [PATCH] Returning fail task (#35) * fail task returning * fix clippy * state check --- src/asynk/async_queue.rs | 65 ++++++++++++++++++++++--------- src/asynk/queries/fail_task.sql | 2 +- src/asynk/queries/insert_task.sql | 2 +- 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index f6d6542..e404272 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -181,16 +181,16 @@ where pub async fn fail_task( &mut self, - task: &Task, + task: Task, error_message: &str, - ) -> Result { + ) -> Result { let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; - let result = Self::fail_task_query(&mut transaction, task, error_message).await?; + let task = Self::fail_task_query(&mut transaction, task, error_message).await?; transaction.commit().await?; - Ok(result) + Ok(task) } pub async fn remove_all_tasks_query( @@ -215,23 +215,24 @@ where pub async fn fail_task_query( transaction: &mut Transaction<'_>, - task: &Task, + task: Task, error_message: &str, - ) -> Result { + ) -> Result { let updated_at = Utc::now(); - Self::execute_query( - transaction, - FAIL_TASK_QUERY, - &[ - &FangTaskState::Failed, - &error_message, - &updated_at, - &task.id, - ], - Some(1), - ) - .await + let row: Row = transaction + .query_one( + FAIL_TASK_QUERY, + &[ + &FangTaskState::Failed, + &error_message, + &updated_at, + &task.id, + ], + ) + .await?; + let failed_task = Self::row_to_task(row); + Ok(failed_task) } pub async fn fetch_and_touch_task_query( @@ -321,7 +322,7 @@ where Ok(error_message) => Some(error_message), Err(_) => None, }; - let state: FangTaskState = FangTaskState::New; + let state: FangTaskState = row.get("state"); let task_type: String = row.get("task_type"); let created_at: DateTime = row.get("created_at"); let updated_at: DateTime = row.get("updated_at"); @@ -341,6 +342,7 @@ where #[cfg(test)] mod async_queue_tests { use super::AsyncQueue; + use super::FangTaskState; use crate::asynk::AsyncRunnable; use crate::asynk::Error; use async_trait::async_trait; @@ -381,6 +383,31 @@ mod async_queue_tests { assert_eq!(Some("AsyncTask"), type_task); transaction.rollback().await.unwrap(); } + #[tokio::test] + async fn failed_task_query_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let mut transaction = connection.transaction().await.unwrap(); + + let task = + AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) + .await + .unwrap(); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + let id = task.id; + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + let failed_task = + AsyncQueue::::fail_task_query(&mut transaction, task, "Some error") + .await + .unwrap(); + assert_eq!(id, failed_task.id); + assert_eq!(Some("Some error"), failed_task.error_message.as_deref()); + assert_eq!(FangTaskState::Failed, failed_task.state); + transaction.rollback().await.unwrap(); + } #[tokio::test] async fn remove_all_tasks_test() { diff --git a/src/asynk/queries/fail_task.sql b/src/asynk/queries/fail_task.sql index 91f7f46..416d91f 100644 --- a/src/asynk/queries/fail_task.sql +++ b/src/asynk/queries/fail_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 +UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at diff --git a/src/asynk/queries/insert_task.sql b/src/asynk/queries/insert_task.sql index c76a4b8..b6ec160 100644 --- a/src/asynk/queries/insert_task.sql +++ b/src/asynk/queries/insert_task.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , metadata , error_message , task_type , created_at , updated_at +INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at