Returning Insert query (#34)

* insert query return task

* deleting schema
This commit is contained in:
Pmarquez 2022-07-20 14:27:01 +00:00 committed by GitHub
parent 8f6f905fe9
commit f56792a558
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 28 deletions

View file

@ -135,15 +135,15 @@ where
Ok(task)
}
pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<u64, AsyncQueueError> {
pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::insert_task_query(&mut transaction, task).await?;
let task = Self::insert_task_query(&mut transaction, task).await?;
transaction.commit().await?;
Ok(result)
Ok(task)
}
pub async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
@ -284,17 +284,15 @@ where
pub async fn insert_task_query(
transaction: &mut Transaction<'_>,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError> {
) -> Result<Task, AsyncQueueError> {
let metadata = serde_json::to_value(task).unwrap();
let task_type = task.task_type();
Self::execute_query(
transaction,
INSERT_TASK_QUERY,
&[&metadata, &task_type],
Some(1),
)
.await
let row: Row = transaction
.query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
.await?;
let task = Self::row_to_task(row);
Ok(task)
}
pub async fn execute_query(
@ -371,12 +369,16 @@ mod async_queue_tests {
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
let result =
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(1, result);
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
transaction.rollback().await.unwrap();
}
@ -386,17 +388,28 @@ mod async_queue_tests {
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
let result =
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
let result =
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
.await
.unwrap();
assert_eq!(1, result);
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let result = AsyncQueue::<NoTls>::remove_all_tasks_query(&mut transaction)
.await
@ -412,17 +425,27 @@ mod async_queue_tests {
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
let result =
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let result =
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
.await
.unwrap();
assert_eq!(1, result);
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(&mut transaction, &None)
.await
@ -453,17 +476,27 @@ mod async_queue_tests {
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
let result =
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let result =
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
.await
.unwrap();
assert_eq!(1, result);
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncTask"), type_task);
let result = AsyncQueue::<NoTls>::remove_tasks_type_query(&mut transaction, "mytype")
.await

View file

@ -1 +0,0 @@
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2)

View file

@ -1 +0,0 @@
INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at" , "period_in_seconds") VALUES ($1, $2 , $3)

View file

@ -1 +1 @@
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2)
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , metadata , error_message , task_type , created_at , updated_at