fang_tasks queries done (#24)
* fang_tasks queries done * queries with transaction rollback * deleting unused imports * some tests * deleting schema , deleting feature section , updated_at corrected * rollback and commit methods and execute modified * transaction empty message * fix cargo clippy
This commit is contained in:
parent
e2481144dc
commit
a60eb08fa6
7 changed files with 153 additions and 54 deletions
|
@ -22,7 +22,7 @@ typetag = "0.2"
|
|||
log = "0.4"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
thiserror = "1.0"
|
||||
bb8-postgres = {version = "0.8", features = ["with-serde_json-1"]}
|
||||
bb8-postgres = {version = "0.8", features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"]}
|
||||
tokio = { version = "1.20", features = ["full"] }
|
||||
async-trait = "0.1"
|
||||
typed-builder = "0.10"
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::asynk::AsyncRunnable;
|
||||
use crate::Task;
|
||||
use bb8_postgres::bb8::Pool;
|
||||
use bb8_postgres::bb8::RunError;
|
||||
use bb8_postgres::tokio_postgres::tls::MakeTlsConnect;
|
||||
|
@ -7,6 +8,7 @@ use bb8_postgres::tokio_postgres::types::ToSql;
|
|||
use bb8_postgres::tokio_postgres::Socket;
|
||||
use bb8_postgres::tokio_postgres::Transaction;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::Utc;
|
||||
use thiserror::Error;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
|
@ -20,6 +22,8 @@ pub enum AsyncQueueError {
|
|||
ResultError { expected: u64, found: u64 },
|
||||
#[error("Queue doesn't have a connection")]
|
||||
PoolAndTransactionEmpty,
|
||||
#[error("Need to create a transaction to perform this operation")]
|
||||
TransactionEmpty,
|
||||
}
|
||||
|
||||
#[derive(TypedBuilder)]
|
||||
|
@ -37,6 +41,11 @@ where
|
|||
}
|
||||
|
||||
const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql");
|
||||
const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql");
|
||||
const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql");
|
||||
const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql");
|
||||
const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
|
||||
const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
|
||||
|
||||
impl<'a, Tls> AsyncQueue<'a, Tls>
|
||||
where
|
||||
|
@ -52,19 +61,73 @@ where
|
|||
pub fn new_with_transaction(transaction: Transaction<'a>) -> Self {
|
||||
AsyncQueue::builder().transaction(transaction).build()
|
||||
}
|
||||
|
||||
pub async fn rollback(mut self) -> Result<AsyncQueue<'a, Tls>, AsyncQueueError> {
|
||||
let transaction = self.transaction;
|
||||
self.transaction = None;
|
||||
match transaction {
|
||||
Some(tr) => {
|
||||
tr.rollback().await?;
|
||||
Ok(self)
|
||||
}
|
||||
None => Err(AsyncQueueError::TransactionEmpty),
|
||||
}
|
||||
}
|
||||
pub async fn commit(mut self) -> Result<AsyncQueue<'a, Tls>, AsyncQueueError> {
|
||||
let transaction = self.transaction;
|
||||
self.transaction = None;
|
||||
match transaction {
|
||||
Some(tr) => {
|
||||
tr.commit().await?;
|
||||
Ok(self)
|
||||
}
|
||||
None => Err(AsyncQueueError::TransactionEmpty),
|
||||
}
|
||||
}
|
||||
pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<u64, AsyncQueueError> {
|
||||
let json_task = serde_json::to_value(task).unwrap();
|
||||
let metadata = serde_json::to_value(task).unwrap();
|
||||
let task_type = task.task_type();
|
||||
|
||||
self.execute_one(INSERT_TASK_QUERY, &[&json_task, &task_type])
|
||||
self.execute(INSERT_TASK_QUERY, &[&metadata, &task_type], Some(1))
|
||||
.await
|
||||
}
|
||||
pub async fn update_task_state(
|
||||
&mut self,
|
||||
task: &Task,
|
||||
state: &str,
|
||||
) -> Result<u64, AsyncQueueError> {
|
||||
let updated_at = Utc::now();
|
||||
self.execute(
|
||||
UPDATE_TASK_STATE_QUERY,
|
||||
&[&state, &updated_at, &task.id],
|
||||
Some(1),
|
||||
)
|
||||
.await
|
||||
}
|
||||
pub async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||
self.execute(REMOVE_ALL_TASK_QUERY, &[], None).await
|
||||
}
|
||||
pub async fn remove_task(&mut self, task: &Task) -> Result<u64, AsyncQueueError> {
|
||||
self.execute(REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
|
||||
}
|
||||
pub async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
||||
self.execute(REMOVE_TASKS_TYPE_QUERY, &[&task_type], None)
|
||||
.await
|
||||
}
|
||||
pub async fn fail_task(&mut self, task: &Task) -> Result<u64, AsyncQueueError> {
|
||||
let updated_at = Utc::now();
|
||||
self.execute(
|
||||
FAIL_TASK_QUERY,
|
||||
&[&"failed", &task.error_message, &updated_at, &task.id],
|
||||
Some(1),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn execute_one(
|
||||
async fn execute(
|
||||
&mut self,
|
||||
query: &str,
|
||||
params: &[&(dyn ToSql + Sync)],
|
||||
expected_result_count: Option<u64>,
|
||||
) -> Result<u64, AsyncQueueError> {
|
||||
let result = if let Some(pool) = &self.pool {
|
||||
let connection = pool.get().await?;
|
||||
|
@ -75,14 +138,14 @@ where
|
|||
} else {
|
||||
return Err(AsyncQueueError::PoolAndTransactionEmpty);
|
||||
};
|
||||
|
||||
if result != 1 {
|
||||
return Err(AsyncQueueError::ResultError {
|
||||
expected: 1,
|
||||
found: result,
|
||||
});
|
||||
if let Some(expected_result) = expected_result_count {
|
||||
if result != expected_result {
|
||||
return Err(AsyncQueueError::ResultError {
|
||||
expected: expected_result,
|
||||
found: result,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
@ -122,6 +185,38 @@ mod async_queue_tests {
|
|||
let result = queue.insert_task(&Job { number: 1 }).await.unwrap();
|
||||
|
||||
assert_eq!(1, result);
|
||||
queue.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remove_all_tasks_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
let mut queue = AsyncQueue::<NoTls>::new_with_transaction(transaction);
|
||||
|
||||
let result = queue.insert_task(&Job { number: 1 }).await.unwrap();
|
||||
assert_eq!(1, result);
|
||||
let result = queue.insert_task(&Job { number: 2 }).await.unwrap();
|
||||
assert_eq!(1, result);
|
||||
let result = queue.remove_all_tasks().await.unwrap();
|
||||
assert_eq!(2, result);
|
||||
queue.rollback().await.unwrap();
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn remove_tasks_type_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
let mut queue = AsyncQueue::<NoTls>::new_with_transaction(transaction);
|
||||
|
||||
let result = queue.insert_task(&Job { number: 1 }).await.unwrap();
|
||||
assert_eq!(1, result);
|
||||
let result = queue.insert_task(&Job { number: 2 }).await.unwrap();
|
||||
assert_eq!(1, result);
|
||||
let result = queue.remove_tasks_type("common").await.unwrap();
|
||||
assert_eq!(2, result);
|
||||
queue.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
|
||||
|
|
40
src/lib.rs
40
src/lib.rs
|
@ -14,6 +14,46 @@
|
|||
// pub use scheduler::*;
|
||||
// pub use schema::*;
|
||||
// pub use worker_pool::*;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
|
||||
#[table_name = "fang_tasks"]
|
||||
pub struct Task {
|
||||
pub id: Uuid,
|
||||
pub metadata: serde_json::Value,
|
||||
pub error_message: Option<String>,
|
||||
pub state: FangTaskState,
|
||||
pub task_type: String,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
|
||||
#[table_name = "fang_periodic_tasks"]
|
||||
pub struct PeriodicTask {
|
||||
pub id: Uuid,
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
pub scheduled_at: Option<DateTime<Utc>>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "fang_tasks"]
|
||||
pub struct NewTask {
|
||||
pub metadata: serde_json::Value,
|
||||
pub task_type: String,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "fang_periodic_tasks"]
|
||||
pub struct NewPeriodicTask {
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
extern crate diesel;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::error::FangError;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Task;
|
||||
use crate::worker_pool::{SharedState, WorkerState};
|
||||
use crate::Task;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||
use log::error;
|
||||
|
@ -185,10 +185,10 @@ mod executor_tests {
|
|||
use super::Executor;
|
||||
use super::RetentionMode;
|
||||
use super::Runnable;
|
||||
use crate::queue::NewTask;
|
||||
use crate::queue::Queue;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::typetag;
|
||||
use crate::NewTask;
|
||||
use diesel::connection::Connection;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::executor::Runnable;
|
|||
use crate::schema::fang_periodic_tasks;
|
||||
use crate::schema::fang_tasks;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::{NewPeriodicTask, NewTask, PeriodicTask, Task};
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
|
@ -13,43 +14,6 @@ use dotenv::dotenv;
|
|||
use std::env;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
|
||||
#[table_name = "fang_tasks"]
|
||||
pub struct Task {
|
||||
pub id: Uuid,
|
||||
pub metadata: serde_json::Value,
|
||||
pub error_message: Option<String>,
|
||||
pub state: FangTaskState,
|
||||
pub task_type: String,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
|
||||
#[table_name = "fang_periodic_tasks"]
|
||||
pub struct PeriodicTask {
|
||||
pub id: Uuid,
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
pub scheduled_at: Option<DateTime<Utc>>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "fang_tasks"]
|
||||
pub struct NewTask {
|
||||
pub metadata: serde_json::Value,
|
||||
pub task_type: String,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "fang_periodic_tasks"]
|
||||
pub struct NewPeriodicTask {
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
}
|
||||
|
||||
pub struct Queue {
|
||||
pub connection: PgConnection,
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::executor::Runnable;
|
||||
use crate::queue::PeriodicTask;
|
||||
use crate::queue::Queue;
|
||||
use crate::PeriodicTask;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -82,9 +82,9 @@ mod job_scheduler_tests {
|
|||
use crate::executor::Error;
|
||||
use crate::executor::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Task;
|
||||
use crate::schema::fang_tasks;
|
||||
use crate::typetag;
|
||||
use crate::Task;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
|
@ -226,9 +226,9 @@ mod job_pool_tests {
|
|||
use crate::executor::RetentionMode;
|
||||
use crate::executor::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Task;
|
||||
use crate::schema::{fang_tasks, FangTaskState};
|
||||
use crate::typetag;
|
||||
use crate::Task;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
Loading…
Reference in a new issue