Remove Task by uniq hash (#90)

* Change asynk remove_task API (just uuid needed)

* Add headers to traits

* Remove task by uniq hash (asynk module)

* Remove task by uniq hash (blocking module)

* Error if task is not uniq

* Rename to remove_task_by_metadata

* Testing remove by metadata query
This commit is contained in:
Pmarquez 2022-09-14 11:28:31 +00:00 committed by GitHub
parent f4a2c55000
commit fed5b9f363
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 191 additions and 13 deletions

View file

@ -30,6 +30,7 @@ const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql")
const REMOVE_ALL_SCHEDULED_TASK_QUERY: &str = const REMOVE_ALL_SCHEDULED_TASK_QUERY: &str =
include_str!("queries/remove_all_scheduled_tasks.sql"); include_str!("queries/remove_all_scheduled_tasks.sql");
const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql"); const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
const REMOVE_TASK_BY_METADATA_QUERY: &str = include_str!("queries/remove_task_by_metadata.sql");
const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql"); const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql");
@ -96,6 +97,8 @@ pub enum AsyncQueueError {
NotConnectedError, NotConnectedError,
#[error("Can not convert `std::time::Duration` to `chrono::Duration`")] #[error("Can not convert `std::time::Duration` to `chrono::Duration`")]
TimeError, TimeError,
#[error("Can not perform this operation if task is not uniq, please check its definition in impl AsyncRunnable")]
TaskNotUniqError,
} }
impl From<cron::error::Error> for AsyncQueueError { impl From<cron::error::Error> for AsyncQueueError {
@ -117,7 +120,12 @@ pub trait AsyncQueueable: Send {
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>; async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>; async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError>;
async fn remove_task_by_metadata(
&mut self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError>;
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>; async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
@ -259,10 +267,23 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
AsyncQueue::<NoTls>::remove_all_scheduled_tasks_query(transaction).await AsyncQueue::<NoTls>::remove_all_scheduled_tasks_query(transaction).await
} }
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> { async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError> {
let transaction = &mut self.transaction; let transaction = &mut self.transaction;
AsyncQueue::<NoTls>::remove_task_query(transaction, task).await AsyncQueue::<NoTls>::remove_task_query(transaction, id).await
}
async fn remove_task_by_metadata(
&mut self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError> {
if task.uniq() {
let transaction = &mut self.transaction;
AsyncQueue::<NoTls>::remove_task_by_metadata_query(transaction, task).await
} else {
Err(AsyncQueueError::TaskNotUniqError)
}
} }
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> { async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
@ -338,9 +359,26 @@ where
async fn remove_task_query( async fn remove_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task: Task, id: Uuid,
) -> Result<u64, AsyncQueueError> { ) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&id], Some(1)).await
}
async fn remove_task_by_metadata_query(
transaction: &mut Transaction<'_>,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError> {
let metadata = serde_json::to_value(task)?;
let uniq_hash = Self::calculate_hash(metadata.to_string());
Self::execute_query(
transaction,
REMOVE_TASK_BY_METADATA_QUERY,
&[&uniq_hash],
None,
)
.await
} }
async fn remove_tasks_type_query( async fn remove_tasks_type_query(
@ -671,18 +709,37 @@ where
Ok(result) Ok(result)
} }
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> { async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?; self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?; let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?; let mut transaction = connection.transaction().await?;
let result = Self::remove_task_query(&mut transaction, task).await?; let result = Self::remove_task_query(&mut transaction, id).await?;
transaction.commit().await?; transaction.commit().await?;
Ok(result) Ok(result)
} }
async fn remove_task_by_metadata(
&mut self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError> {
if task.uniq() {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_task_by_metadata_query(&mut transaction, task).await?;
transaction.commit().await?;
Ok(result)
} else {
Err(AsyncQueueError::TaskNotUniqError)
}
}
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> { async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?; self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?; let mut connection = self.pool.as_ref().unwrap().get().await?;
@ -758,6 +815,23 @@ mod async_queue_tests {
} }
} }
#[derive(Serialize, Deserialize)]
struct AsyncUniqTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncUniqTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
Ok(())
}
fn uniq(&self) -> bool {
true
}
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct AsyncTaskSchedule { struct AsyncTaskSchedule {
pub number: u16, pub number: u16,
@ -1019,6 +1093,47 @@ mod async_queue_tests {
test.transaction.rollback().await.unwrap(); test.transaction.rollback().await.unwrap();
} }
#[tokio::test]
async fn remove_tasks_by_metadata() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let task = insert_task(&mut test, &AsyncUniqTask { number: 1 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(1), number);
assert_eq!(Some("AsyncUniqTask"), type_task);
let task = insert_task(&mut test, &AsyncUniqTask { number: 2 }).await;
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(2), number);
assert_eq!(Some("AsyncUniqTask"), type_task);
let result = test
.remove_task_by_metadata(&AsyncUniqTask { number: 0 })
.await
.unwrap();
assert_eq!(0, result);
let result = test
.remove_task_by_metadata(&AsyncUniqTask { number: 1 })
.await
.unwrap();
assert_eq!(1, result);
test.transaction.rollback().await.unwrap();
}
async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task { async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task {
test.insert_task(task).await.unwrap() test.insert_task(task).await.unwrap()
} }

View file

@ -68,17 +68,17 @@ where
}, },
RetentionMode::RemoveAll => match result { RetentionMode::RemoveAll => match result {
Ok(task) => { Ok(task) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
Err((task, _error)) => { Err((task, _error)) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
}, },
RetentionMode::RemoveFinished => match result { RetentionMode::RemoveFinished => match result {
Ok(task) => { Ok(task) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
Err((task, error)) => { Err((task, error)) => {
@ -185,17 +185,17 @@ impl<'a> AsyncWorkerTest<'a> {
}, },
RetentionMode::RemoveAll => match result { RetentionMode::RemoveAll => match result {
Ok(task) => { Ok(task) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
Err((task, _error)) => { Err((task, _error)) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
}, },
RetentionMode::RemoveFinished => match result { RetentionMode::RemoveFinished => match result {
Ok(task) => { Ok(task) => {
self.queue.remove_task(task).await?; self.queue.remove_task(task.id).await?;
Ok(()) Ok(())
} }
Err((task, error)) => { Err((task, error)) => {

View file

@ -0,0 +1 @@
DELETE FROM "fang_tasks" WHERE uniq_hash = $1

View file

@ -71,6 +71,8 @@ pub enum QueueError {
PoolError(#[from] PoolError), PoolError(#[from] PoolError),
#[error(transparent)] #[error(transparent)]
CronError(#[from] CronError), CronError(#[from] CronError),
#[error("Can not perform this operation if task is not uniq, please check its definition in impl Runnable")]
TaskNotUniqError,
} }
impl From<cron::error::Error> for QueueError { impl From<cron::error::Error> for QueueError {
@ -92,6 +94,10 @@ pub trait Queueable {
fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>; fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>;
/// To use this function task has to be uniq. uniq() has to return true.
/// If task is not uniq this function will not do anything.
fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result<usize, QueueError>;
fn find_task_by_id(&self, id: Uuid) -> Option<Task>; fn find_task_by_id(&self, id: Uuid) -> Option<Task>;
fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError>; fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError>;
@ -149,6 +155,18 @@ impl Queueable for Queue {
Self::remove_task_query(&mut connection, id) Self::remove_task_query(&mut connection, id)
} }
/// To use this function task has to be uniq. uniq() has to return true.
/// If task is not uniq this function will not do anything.
fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result<usize, QueueError> {
if task.uniq() {
let mut connection = self.get_connection()?;
Self::remove_task_by_metadata_query(&mut connection, task)
} else {
Err(QueueError::TaskNotUniqError)
}
}
fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError> { fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?; let mut connection = self.get_connection()?;
@ -304,6 +322,19 @@ impl Queue {
Ok(diesel::delete(query).execute(connection)?) Ok(diesel::delete(query).execute(connection)?)
} }
pub fn remove_task_by_metadata_query(
connection: &mut PgConnection,
task: &dyn Runnable,
) -> Result<usize, QueueError> {
let metadata = serde_json::to_value(task).unwrap();
let uniq_hash = Self::calculate_hash(metadata.to_string());
let query = fang_tasks::table.filter(fang_tasks::uniq_hash.eq(uniq_hash));
Ok(diesel::delete(query).execute(connection)?)
}
pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result<usize, QueueError> { pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
@ -775,4 +806,35 @@ mod queue_tests {
Ok(()) Ok(())
}); });
} }
#[test]
fn remove_task_by_metadata() {
let m_task1 = PepeTask { number: 10 };
let m_task2 = PepeTask { number: 11 };
let m_task3 = AyratTask { number: 10 };
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &m_task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(conn, &m_task2, Utc::now()).unwrap();
let task3 = Queue::insert_query(conn, &m_task3, Utc::now()).unwrap();
assert!(Queue::find_task_by_id_query(conn, task1.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task3.id).is_some());
Queue::remove_task_by_metadata_query(conn, &m_task1).unwrap();
assert!(Queue::find_task_by_id_query(conn, task1.id).is_none());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task3.id).is_some());
Ok(())
});
}
} }