From f56792a5580aa58216348ac8afbe72fd7fbea8e8 Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Wed, 20 Jul 2022 14:27:01 +0000 Subject: [PATCH] Returning Insert query (#34) * insert query return task * deleting schema --- src/asynk/async_queue.rs | 83 ++++++++++++++++------- src/asynk/queries/insert_job.sql | 1 - src/asynk/queries/insert_periodic_job.sql | 1 - src/asynk/queries/insert_task.sql | 2 +- 4 files changed, 59 insertions(+), 28 deletions(-) delete mode 100644 src/asynk/queries/insert_job.sql delete mode 100644 src/asynk/queries/insert_periodic_job.sql diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index e5f70f7..f6d6542 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -135,15 +135,15 @@ where Ok(task) } - pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { + pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; - let result = Self::insert_task_query(&mut transaction, task).await?; + let task = Self::insert_task_query(&mut transaction, task).await?; transaction.commit().await?; - Ok(result) + Ok(task) } pub async fn remove_all_tasks(&mut self) -> Result { @@ -284,17 +284,15 @@ where pub async fn insert_task_query( transaction: &mut Transaction<'_>, task: &dyn AsyncRunnable, - ) -> Result { + ) -> Result { let metadata = serde_json::to_value(task).unwrap(); let task_type = task.task_type(); - Self::execute_query( - transaction, - INSERT_TASK_QUERY, - &[&metadata, &task_type], - Some(1), - ) - .await + let row: Row = transaction + .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type]) + .await?; + let task = Self::row_to_task(row); + Ok(task) } pub async fn execute_query( @@ -371,12 +369,16 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let result = + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) .await .unwrap(); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); - assert_eq!(1, result); + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); transaction.rollback().await.unwrap(); } @@ -386,17 +388,28 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let result = + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) .await .unwrap(); - assert_eq!(1, result); - let result = + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) .await .unwrap(); - assert_eq!(1, result); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(2), number); + assert_eq!(Some("AsyncTask"), type_task); let result = AsyncQueue::::remove_all_tasks_query(&mut transaction) .await @@ -412,17 +425,27 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let result = + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) .await .unwrap(); - assert_eq!(1, result); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); - let result = + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) .await .unwrap(); - assert_eq!(1, result); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(2), number); + assert_eq!(Some("AsyncTask"), type_task); let task = AsyncQueue::::fetch_and_touch_task_query(&mut transaction, &None) .await @@ -453,17 +476,27 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let result = + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) .await .unwrap(); - assert_eq!(1, result); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); - let result = + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + + let task = AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) .await .unwrap(); - assert_eq!(1, result); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(2), number); + assert_eq!(Some("AsyncTask"), type_task); let result = AsyncQueue::::remove_tasks_type_query(&mut transaction, "mytype") .await diff --git a/src/asynk/queries/insert_job.sql b/src/asynk/queries/insert_job.sql deleted file mode 100644 index 2bef71d..0000000 --- a/src/asynk/queries/insert_job.sql +++ /dev/null @@ -1 +0,0 @@ -INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) diff --git a/src/asynk/queries/insert_periodic_job.sql b/src/asynk/queries/insert_periodic_job.sql deleted file mode 100644 index 881eab1..0000000 --- a/src/asynk/queries/insert_periodic_job.sql +++ /dev/null @@ -1 +0,0 @@ -INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at" , "period_in_seconds") VALUES ($1, $2 , $3) diff --git a/src/asynk/queries/insert_task.sql b/src/asynk/queries/insert_task.sql index 2bef71d..c76a4b8 100644 --- a/src/asynk/queries/insert_task.sql +++ b/src/asynk/queries/insert_task.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) +INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , metadata , error_message , task_type , created_at , updated_at