From 8f6f905fe93c8e3c35b3ff0722c789568cf8cd9d Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Wed, 20 Jul 2022 13:01:33 +0300 Subject: [PATCH] Use transaction in all queries (#33) * Use transaction in all queries * fix clippy --- src/asynk/async_queue.rs | 202 +++++++++++++++++++++++++-------------- 1 file changed, 128 insertions(+), 74 deletions(-) diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 5543c67..e5f70f7 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -135,45 +135,97 @@ where Ok(task) } - pub async fn get_row( - &mut self, - query: &str, - params: &[&(dyn ToSql + Sync)], - ) -> Result { - let connection = self.pool.get().await?; - - let row = connection.query_one(query, params).await?; - - Ok(row) - } - pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; - Self::insert_task_query(&mut transaction, task).await + let result = Self::insert_task_query(&mut transaction, task).await?; + + transaction.commit().await?; + + Ok(result) } pub async fn remove_all_tasks(&mut self) -> Result { - self.execute(REMOVE_ALL_TASK_QUERY, &[], None).await + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_all_tasks_query(&mut transaction).await?; + + transaction.commit().await?; + + Ok(result) } pub async fn remove_task(&mut self, task: &Task) -> Result { - self.execute(REMOVE_TASK_QUERY, &[&task.id], Some(1)).await + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_task_query(&mut transaction, task).await?; + + transaction.commit().await?; + + Ok(result) } pub async fn remove_tasks_type(&mut self, task_type: &str) -> Result { - self.execute(REMOVE_TASKS_TYPE_QUERY, &[&task_type], None) - .await + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?; + + transaction.commit().await?; + + Ok(result) } - pub async fn fail_task(&mut self, task: &Task) -> Result { + pub async fn fail_task( + &mut self, + task: &Task, + error_message: &str, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::fail_task_query(&mut transaction, task, error_message).await?; + transaction.commit().await?; + + Ok(result) + } + + pub async fn remove_all_tasks_query( + transaction: &mut Transaction<'_>, + ) -> Result { + Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await + } + + pub async fn remove_task_query( + transaction: &mut Transaction<'_>, + task: &Task, + ) -> Result { + Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await + } + + pub async fn remove_tasks_type_query( + transaction: &mut Transaction<'_>, + task_type: &str, + ) -> Result { + Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await + } + + pub async fn fail_task_query( + transaction: &mut Transaction<'_>, + task: &Task, + error_message: &str, + ) -> Result { let updated_at = Utc::now(); - self.execute( + + Self::execute_query( + transaction, FAIL_TASK_QUERY, &[ &FangTaskState::Failed, - &task.error_message, + &error_message, &updated_at, &task.id, ], @@ -182,43 +234,25 @@ where .await } - async fn execute( - &mut self, - query: &str, - params: &[&(dyn ToSql + Sync)], - expected_result_count: Option, - ) -> Result { - let connection = self.pool.get().await?; - - let result = connection.execute(query, params).await?; - if let Some(expected_result) = expected_result_count { - if result != expected_result { - return Err(AsyncQueueError::ResultError { - expected: expected_result, - found: result, - }); - } - } - Ok(result) - } - pub async fn fetch_and_touch_task_query( transaction: &mut Transaction<'_>, task_type: &Option, ) -> Result { - let mut task = match task_type { - None => Self::get_task_type(transaction, DEFAULT_TASK_TYPE).await?, - Some(task_type_str) => Self::get_task_type(transaction, task_type_str).await?, + let task_type = match task_type { + Some(passed_task_type) => passed_task_type, + None => DEFAULT_TASK_TYPE, }; - Self::update_task_state(transaction, &task, FangTaskState::InProgress).await?; + let mut task = Self::get_task_type_query(transaction, task_type).await?; + + Self::update_task_state_query(transaction, &task, FangTaskState::InProgress).await?; task.state = FangTaskState::InProgress; Ok(task) } - pub async fn get_task_type( + pub async fn get_task_type_query( transaction: &mut Transaction<'_>, task_type: &str, ) -> Result { @@ -231,7 +265,7 @@ where Ok(task) } - pub async fn update_task_state( + pub async fn update_task_state_query( transaction: &mut Transaction<'_>, task: &Task, state: FangTaskState, @@ -346,24 +380,31 @@ mod async_queue_tests { transaction.rollback().await.unwrap(); } - // #[tokio::test] - // async fn remove_all_tasks_test() { - // let pool = pool().await; - // let mut connection = pool.get().await.unwrap(); - // let transaction = connection.transaction().await.unwrap(); - // let mut queue = AsyncQueue::::new_with_transaction(transaction); + #[tokio::test] + async fn remove_all_tasks_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let mut transaction = connection.transaction().await.unwrap(); - // let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); - // assert_eq!(1, result); + let result = + AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) + .await + .unwrap(); + assert_eq!(1, result); - // let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap(); - // assert_eq!(1, result); + let result = + AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) + .await + .unwrap(); + assert_eq!(1, result); - // let result = queue.remove_all_tasks().await.unwrap(); - // assert_eq!(2, result); + let result = AsyncQueue::::remove_all_tasks_query(&mut transaction) + .await + .unwrap(); + assert_eq!(2, result); - // queue.rollback().await.unwrap(); - // } + transaction.rollback().await.unwrap(); + } #[tokio::test] async fn fetch_and_touch_test() { @@ -405,24 +446,37 @@ mod async_queue_tests { transaction.rollback().await.unwrap(); } - // #[tokio::test] - // async fn remove_tasks_type_test() { - // let pool = pool().await; - // let mut connection = pool.get().await.unwrap(); - // let transaction = connection.transaction().await.unwrap(); - // let mut queue = AsyncQueue::::new_with_transaction(transaction); - // let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); - // assert_eq!(1, result); + #[tokio::test] + async fn remove_tasks_type_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let mut transaction = connection.transaction().await.unwrap(); - // let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap(); - // assert_eq!(1, result); + let result = + AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) + .await + .unwrap(); + assert_eq!(1, result); - // let result = queue.remove_tasks_type("common").await.unwrap(); - // assert_eq!(2, result); + let result = + AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) + .await + .unwrap(); + assert_eq!(1, result); - // queue.rollback().await.unwrap(); - // } + let result = AsyncQueue::::remove_tasks_type_query(&mut transaction, "mytype") + .await + .unwrap(); + assert_eq!(0, result); + + let result = AsyncQueue::::remove_tasks_type_query(&mut transaction, "common") + .await + .unwrap(); + assert_eq!(2, result); + + transaction.rollback().await.unwrap(); + } async fn pool() -> Pool> { let pg_mgr = PostgresConnectionManager::new_from_stringlike(