Returning fail task (#35)

* fail task returning

* fix clippy

* state check
This commit is contained in:
Pmarquez 2022-07-20 15:26:11 +00:00 committed by GitHub
parent f56792a558
commit d3805cd562
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 21 deletions

View file

@ -181,16 +181,16 @@ where
pub async fn fail_task(
&mut self,
task: &Task,
task: Task,
error_message: &str,
) -> Result<u64, AsyncQueueError> {
) -> Result<Task, AsyncQueueError> {
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::fail_task_query(&mut transaction, task, error_message).await?;
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
transaction.commit().await?;
Ok(result)
Ok(task)
}
pub async fn remove_all_tasks_query(
@ -215,23 +215,24 @@ where
pub async fn fail_task_query(
transaction: &mut Transaction<'_>,
task: &Task,
task: Task,
error_message: &str,
) -> Result<u64, AsyncQueueError> {
) -> Result<Task, AsyncQueueError> {
let updated_at = Utc::now();
Self::execute_query(
transaction,
FAIL_TASK_QUERY,
&[
&FangTaskState::Failed,
&error_message,
&updated_at,
&task.id,
],
Some(1),
)
.await
let row: Row = transaction
.query_one(
FAIL_TASK_QUERY,
&[
&FangTaskState::Failed,
&error_message,
&updated_at,
&task.id,
],
)
.await?;
let failed_task = Self::row_to_task(row);
Ok(failed_task)
}
pub async fn fetch_and_touch_task_query(
@ -321,7 +322,7 @@ where
Ok(error_message) => Some(error_message),
Err(_) => None,
};
let state: FangTaskState = FangTaskState::New;
let state: FangTaskState = row.get("state");
let task_type: String = row.get("task_type");
let created_at: DateTime<Utc> = row.get("created_at");
let updated_at: DateTime<Utc> = row.get("updated_at");
@ -341,6 +342,7 @@ where
#[cfg(test)]
mod async_queue_tests {
use super::AsyncQueue;
use super::FangTaskState;
use crate::asynk::AsyncRunnable;
use crate::asynk::Error;
use async_trait::async_trait;
@ -381,6 +383,31 @@ mod async_queue_tests {
assert_eq!(Some("AsyncTask"), type_task);
transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn failed_task_query_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
let task =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
let id = task.id;
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncTask"), type_task);
let failed_task =
AsyncQueue::<NoTls>::fail_task_query(&mut transaction, task, "Some error")
.await
.unwrap();
assert_eq!(id, failed_task.id);
assert_eq!(Some("Some error"), failed_task.error_message.as_deref());
assert_eq!(FangTaskState::Failed, failed_task.state);
transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn remove_all_tasks_test() {

View file

@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4
UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at

View file

@ -1 +1 @@
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , metadata , error_message , task_type , created_at , updated_at
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at