Async Improvements AsyncRunnable param (#51)

* AsyncRunnable param in insert_task and insert_periodic_task

* fix example
This commit is contained in:
Pmarquez 2022-07-31 15:15:00 +00:00 committed by GitHub
parent 133d142761
commit 7e66196373
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 56 deletions

View file

@ -26,8 +26,10 @@ impl AsyncRunnable for MyTask {
tokio::time::sleep(Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(3)).await;
let new_task = MyTask::new(self.number + 1); let new_task = MyTask::new(self.number + 1);
let metadata = serde_json::to_value(&new_task as &dyn AsyncRunnable).unwrap(); queue
queue.insert_task(metadata, "common").await.unwrap(); .insert_task(&new_task as &dyn AsyncRunnable)
.await
.unwrap();
Ok(()) Ok(())
} }

View file

@ -31,11 +31,14 @@ async fn main() {
let task1 = MyTask::new(0); let task1 = MyTask::new(0);
let task2 = MyTask::new(20_000); let task2 = MyTask::new(20_000);
let metadata1 = serde_json::to_value(&task1 as &dyn AsyncRunnable).unwrap(); queue
let metadata2 = serde_json::to_value(&task2 as &dyn AsyncRunnable).unwrap(); .insert_task(&task1 as &dyn AsyncRunnable)
.await
queue.insert_task(metadata1, "common").await.unwrap(); .unwrap();
queue.insert_task(metadata2, "common").await.unwrap(); queue
.insert_task(&task2 as &dyn AsyncRunnable)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(100)).await; tokio::time::sleep(Duration::from_secs(100)).await;
} }

View file

@ -1,4 +1,5 @@
use crate::asynk::async_runnable::Error as FangError; use crate::asynk::async_runnable::Error as FangError;
use crate::AsyncRunnable;
use async_trait::async_trait; use async_trait::async_trait;
use bb8_postgres::bb8::Pool; use bb8_postgres::bb8::Pool;
use bb8_postgres::bb8::RunError; use bb8_postgres::bb8::RunError;
@ -95,6 +96,8 @@ pub enum AsyncQueueError {
PoolError(#[from] RunError<bb8_postgres::tokio_postgres::Error>), PoolError(#[from] RunError<bb8_postgres::tokio_postgres::Error>),
#[error(transparent)] #[error(transparent)]
PgError(#[from] bb8_postgres::tokio_postgres::Error), PgError(#[from] bb8_postgres::tokio_postgres::Error),
#[error(transparent)]
SerdeError(#[from] serde_json::Error),
#[error("returned invalid result (expected {expected:?}, found {found:?})")] #[error("returned invalid result (expected {expected:?}, found {found:?})")]
ResultError { expected: u64, found: u64 }, ResultError { expected: u64, found: u64 },
} }
@ -115,11 +118,7 @@ pub trait AsyncQueueable: Send {
task_type: Option<String>, task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError>; ) -> Result<Option<Task>, AsyncQueueError>;
async fn insert_task( async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
&mut self,
task: serde_json::Value,
task_type: &str,
) -> Result<Task, AsyncQueueError>;
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>; async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>; async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;
@ -142,7 +141,7 @@ pub trait AsyncQueueable: Send {
async fn insert_periodic_task( async fn insert_periodic_task(
&mut self, &mut self,
metadata: serde_json::Value, task: &dyn AsyncRunnable,
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
period: i32, period: i32,
) -> Result<PeriodicTask, AsyncQueueError>; ) -> Result<PeriodicTask, AsyncQueueError>;
@ -214,18 +213,20 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
Ok(task) Ok(task)
} }
async fn insert_task( async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
&mut self,
metadata: serde_json::Value,
task_type: &str,
) -> Result<Task, AsyncQueueError> {
let transaction = &mut self.transaction; let transaction = &mut self.transaction;
let metadata = serde_json::to_value(task)?;
let task: Task = if self.duplicated_tasks { let task: Task = if self.duplicated_tasks {
AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, task_type).await? AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, &task.task_type()).await?
} else { } else {
AsyncQueue::<NoTls>::insert_task_if_not_exist_query(transaction, metadata, task_type) AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
.await? transaction,
metadata,
&task.task_type(),
)
.await?
}; };
Ok(task) Ok(task)
} }
@ -243,12 +244,14 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
} }
async fn insert_periodic_task( async fn insert_periodic_task(
&mut self, &mut self,
metadata: serde_json::Value, task: &dyn AsyncRunnable,
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
period: i32, period: i32,
) -> Result<PeriodicTask, AsyncQueueError> { ) -> Result<PeriodicTask, AsyncQueueError> {
let transaction = &mut self.transaction; let transaction = &mut self.transaction;
let metadata = serde_json::to_value(task)?;
let periodic_task = AsyncQueue::<NoTls>::insert_periodic_task_query( let periodic_task = AsyncQueue::<NoTls>::insert_periodic_task_query(
transaction, transaction,
metadata, metadata,
@ -341,27 +344,27 @@ where
}) })
} }
pub async fn remove_all_tasks_query( async fn remove_all_tasks_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> { ) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await
} }
pub async fn remove_task_query( async fn remove_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task: Task, task: Task,
) -> Result<u64, AsyncQueueError> { ) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
} }
pub async fn remove_tasks_type_query( async fn remove_tasks_type_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task_type: &str, task_type: &str,
) -> Result<u64, AsyncQueueError> { ) -> Result<u64, AsyncQueueError> {
Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await
} }
pub async fn fail_task_query( async fn fail_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task: Task, task: Task,
error_message: &str, error_message: &str,
@ -383,7 +386,7 @@ where
Ok(failed_task) Ok(failed_task)
} }
pub async fn fetch_and_touch_task_query( async fn fetch_and_touch_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task_type: Option<String>, task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> { ) -> Result<Option<Task>, AsyncQueueError> {
@ -407,7 +410,7 @@ where
Ok(result_task) Ok(result_task)
} }
pub async fn get_task_type_query( async fn get_task_type_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task_type: &str, task_type: &str,
) -> Result<Task, AsyncQueueError> { ) -> Result<Task, AsyncQueueError> {
@ -420,7 +423,7 @@ where
Ok(task) Ok(task)
} }
pub async fn update_task_state_query( async fn update_task_state_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task: Task, task: Task,
state: FangTaskState, state: FangTaskState,
@ -434,7 +437,7 @@ where
Ok(task) Ok(task)
} }
pub async fn insert_task_query( async fn insert_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
metadata: serde_json::Value, metadata: serde_json::Value,
task_type: &str, task_type: &str,
@ -445,7 +448,7 @@ where
let task = Self::row_to_task(row); let task = Self::row_to_task(row);
Ok(task) Ok(task)
} }
pub async fn schedule_next_task_query( async fn schedule_next_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
periodic_task: PeriodicTask, periodic_task: PeriodicTask,
) -> Result<PeriodicTask, AsyncQueueError> { ) -> Result<PeriodicTask, AsyncQueueError> {
@ -459,7 +462,7 @@ where
let periodic_task = Self::row_to_periodic_task(row); let periodic_task = Self::row_to_periodic_task(row);
Ok(periodic_task) Ok(periodic_task)
} }
pub async fn insert_periodic_task_query( async fn insert_periodic_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
metadata: serde_json::Value, metadata: serde_json::Value,
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
@ -475,7 +478,7 @@ where
Ok(periodic_task) Ok(periodic_task)
} }
pub async fn fetch_periodic_tasks_query( async fn fetch_periodic_tasks_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
error_margin_seconds: i64, error_margin_seconds: i64,
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> { ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
@ -498,7 +501,7 @@ where
Ok(Some(periodic_tasks)) Ok(Some(periodic_tasks))
} }
} }
pub async fn execute_query( async fn execute_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
query: &str, query: &str,
params: &[&(dyn ToSql + Sync)], params: &[&(dyn ToSql + Sync)],
@ -517,7 +520,7 @@ where
Ok(result) Ok(result)
} }
pub async fn insert_task_if_not_exist_query( async fn insert_task_if_not_exist_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
metadata: serde_json::Value, metadata: serde_json::Value,
task_type: &str, task_type: &str,
@ -527,7 +530,7 @@ where
None => Self::insert_task_query(transaction, metadata, task_type).await, None => Self::insert_task_query(transaction, metadata, task_type).await,
} }
} }
pub async fn find_task_by_metadata_query( async fn find_task_by_metadata_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
metadata: &serde_json::Value, metadata: &serde_json::Value,
) -> Option<Task> { ) -> Option<Task> {
@ -606,18 +609,17 @@ where
Ok(task) Ok(task)
} }
async fn insert_task( async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
&mut self,
metadata: serde_json::Value,
task_type: &str,
) -> Result<Task, AsyncQueueError> {
let mut connection = self.pool.get().await?; let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?; let mut transaction = connection.transaction().await?;
let metadata = serde_json::to_value(task)?;
let task: Task = if self.duplicated_tasks { let task: Task = if self.duplicated_tasks {
Self::insert_task_query(&mut transaction, metadata, task_type).await? Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await?
} else { } else {
Self::insert_task_if_not_exist_query(&mut transaction, metadata, task_type).await? Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type())
.await?
}; };
transaction.commit().await?; transaction.commit().await?;
@ -627,13 +629,15 @@ where
async fn insert_periodic_task( async fn insert_periodic_task(
&mut self, &mut self,
metadata: serde_json::Value, task: &dyn AsyncRunnable,
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
period: i32, period: i32,
) -> Result<PeriodicTask, AsyncQueueError> { ) -> Result<PeriodicTask, AsyncQueueError> {
let mut connection = self.pool.get().await?; let mut connection = self.pool.get().await?;
let mut transaction = connection.transaction().await?; let mut transaction = connection.transaction().await?;
let metadata = serde_json::to_value(task)?;
let periodic_task = let periodic_task =
Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?; Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?;
@ -949,8 +953,7 @@ mod async_queue_tests {
} }
async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task { async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task {
let metadata = serde_json::to_value(task).unwrap(); test.insert_task(task).await.unwrap()
test.insert_task(metadata, &task.task_type()).await.unwrap()
} }
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> { async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {

View file

@ -76,14 +76,10 @@ impl<'a> Scheduler<'a> {
self.queue.schedule_next_task(task).await?; self.queue.schedule_next_task(task).await?;
} }
Some(_) => { Some(_) => {
let metadata = task.metadata.clone();
let actual_task: Box<dyn AsyncRunnable> = let actual_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap(); serde_json::from_value(task.metadata.clone()).unwrap();
self.queue self.queue.insert_task(&*actual_task).await?;
.insert_task(metadata, &(*actual_task).task_type())
.await?;
self.queue.schedule_next_task(task).await?; self.queue.schedule_next_task(task).await?;
} }
@ -203,9 +199,7 @@ mod async_scheduler_tests {
timestamp: DateTime<Utc>, timestamp: DateTime<Utc>,
period_in_seconds: i32, period_in_seconds: i32,
) -> PeriodicTask { ) -> PeriodicTask {
let metadata = serde_json::to_value(task).unwrap(); test.insert_periodic_task(task, timestamp, period_in_seconds)
test.insert_periodic_task(metadata, timestamp, period_in_seconds)
.await .await
.unwrap() .unwrap()
} }

View file

@ -326,8 +326,7 @@ mod async_worker_tests {
test.transaction.rollback().await.unwrap(); test.transaction.rollback().await.unwrap();
} }
async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task { async fn insert_task(test: &mut AsyncQueueTest<'_>, task: &dyn AsyncRunnable) -> Task {
let metadata = serde_json::to_value(task).unwrap(); test.insert_task(task).await.unwrap()
test.insert_task(metadata, &task.task_type()).await.unwrap()
} }
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> { async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
let pg_mgr = PostgresConnectionManager::new_from_stringlike( let pg_mgr = PostgresConnectionManager::new_from_stringlike(