Use transaction in all queries (#33)

* Use transaction in all queries

* fix clippy
This commit is contained in:
Ayrat Badykov 2022-07-20 13:01:33 +03:00 committed by GitHub
parent 3602097fb6
commit 8f6f905fe9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -135,45 +135,97 @@ where
Ok(task)
}
pub async fn get_row(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<Row, AsyncQueueError> {
let connection = self.pool.get().await?;
let row = connection.query_one(query, params).await?;
Ok(row)
}
pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<u64, AsyncQueueError> {
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
Self::insert_task_query(&mut transaction, task).await
let result = Self::insert_task_query(&mut transaction, task).await?;
transaction.commit().await?;
Ok(result)
}
pub async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
self.execute(REMOVE_ALL_TASK_QUERY, &[], None).await
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_all_tasks_query(&mut transaction).await?;
transaction.commit().await?;
Ok(result)
}
pub async fn remove_task(&mut self, task: &Task) -> Result<u64, AsyncQueueError> {
self.execute(REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_task_query(&mut transaction, task).await?;
transaction.commit().await?;
Ok(result)
}
pub async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
self.execute(REMOVE_TASKS_TYPE_QUERY, &[&task_type], None)
.await
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
transaction.commit().await?;
Ok(result)
}
pub async fn fail_task(&mut self, task: &Task) -> Result<u64, AsyncQueueError> {
pub async fn fail_task(
&mut self,
task: &Task,
error_message: &str,
) -> Result<u64, AsyncQueueError> {
let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::fail_task_query(&mut transaction, task, error_message).await?;
transaction.commit().await?;
Ok(result)
}
pub async fn remove_all_tasks_query(
transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await
}
pub async fn remove_task_query(
transaction: &mut Transaction<'_>,
task: &Task,
) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
}
pub async fn remove_tasks_type_query(
transaction: &mut Transaction<'_>,
task_type: &str,
) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await
}
pub async fn fail_task_query(
transaction: &mut Transaction<'_>,
task: &Task,
error_message: &str,
) -> Result<u64, AsyncQueueError> {
let updated_at = Utc::now();
self.execute(
Self::execute_query(
transaction,
FAIL_TASK_QUERY,
&[
&FangTaskState::Failed,
&task.error_message,
&error_message,
&updated_at,
&task.id,
],
@ -182,43 +234,25 @@ where
.await
}
async fn execute(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
expected_result_count: Option<u64>,
) -> Result<u64, AsyncQueueError> {
let connection = self.pool.get().await?;
let result = connection.execute(query, params).await?;
if let Some(expected_result) = expected_result_count {
if result != expected_result {
return Err(AsyncQueueError::ResultError {
expected: expected_result,
found: result,
});
}
}
Ok(result)
}
pub async fn fetch_and_touch_task_query(
transaction: &mut Transaction<'_>,
task_type: &Option<String>,
) -> Result<Task, AsyncQueueError> {
let mut task = match task_type {
None => Self::get_task_type(transaction, DEFAULT_TASK_TYPE).await?,
Some(task_type_str) => Self::get_task_type(transaction, task_type_str).await?,
let task_type = match task_type {
Some(passed_task_type) => passed_task_type,
None => DEFAULT_TASK_TYPE,
};
Self::update_task_state(transaction, &task, FangTaskState::InProgress).await?;
let mut task = Self::get_task_type_query(transaction, task_type).await?;
Self::update_task_state_query(transaction, &task, FangTaskState::InProgress).await?;
task.state = FangTaskState::InProgress;
Ok(task)
}
pub async fn get_task_type(
pub async fn get_task_type_query(
transaction: &mut Transaction<'_>,
task_type: &str,
) -> Result<Task, AsyncQueueError> {
@ -231,7 +265,7 @@ where
Ok(task)
}
pub async fn update_task_state(
pub async fn update_task_state_query(
transaction: &mut Transaction<'_>,
task: &Task,
state: FangTaskState,
@ -346,24 +380,31 @@ mod async_queue_tests {
transaction.rollback().await.unwrap();
}
// #[tokio::test]
// async fn remove_all_tasks_test() {
// let pool = pool().await;
// let mut connection = pool.get().await.unwrap();
// let transaction = connection.transaction().await.unwrap();
// let mut queue = AsyncQueue::<NoTls>::new_with_transaction(transaction);
#[tokio::test]
async fn remove_all_tasks_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
// let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap();
// assert_eq!(1, result);
let result =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
// let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap();
// assert_eq!(1, result);
let result =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
.await
.unwrap();
assert_eq!(1, result);
// let result = queue.remove_all_tasks().await.unwrap();
// assert_eq!(2, result);
let result = AsyncQueue::<NoTls>::remove_all_tasks_query(&mut transaction)
.await
.unwrap();
assert_eq!(2, result);
// queue.rollback().await.unwrap();
// }
transaction.rollback().await.unwrap();
}
#[tokio::test]
async fn fetch_and_touch_test() {
@ -405,24 +446,37 @@ mod async_queue_tests {
transaction.rollback().await.unwrap();
}
// #[tokio::test]
// async fn remove_tasks_type_test() {
// let pool = pool().await;
// let mut connection = pool.get().await.unwrap();
// let transaction = connection.transaction().await.unwrap();
// let mut queue = AsyncQueue::<NoTls>::new_with_transaction(transaction);
// let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap();
// assert_eq!(1, result);
#[tokio::test]
async fn remove_tasks_type_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let mut transaction = connection.transaction().await.unwrap();
// let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap();
// assert_eq!(1, result);
let result =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
// let result = queue.remove_tasks_type("common").await.unwrap();
// assert_eq!(2, result);
let result =
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
.await
.unwrap();
assert_eq!(1, result);
// queue.rollback().await.unwrap();
// }
let result = AsyncQueue::<NoTls>::remove_tasks_type_query(&mut transaction, "mytype")
.await
.unwrap();
assert_eq!(0, result);
let result = AsyncQueue::<NoTls>::remove_tasks_type_query(&mut transaction, "common")
.await
.unwrap();
assert_eq!(2, result);
transaction.rollback().await.unwrap();
}
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
let pg_mgr = PostgresConnectionManager::new_from_stringlike(