Async Worker (#44)
* stating with async worker * async sleep solve and update state test * uncomenting line * retention mode and sleep params in lib.rs * fixing code style * trying to solve * dont like changes * Add AsyncQueueable trait (#45) * add trait * update runnable trait * fix test Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
This commit is contained in:
parent
d585bde870
commit
8d0a23e2f9
9 changed files with 383 additions and 178 deletions
|
@ -12,6 +12,7 @@ rust-version = "1.62"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
default = ["blocking", "asynk"]
|
||||||
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
|
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
|
||||||
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder"]
|
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder"]
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::asynk::AsyncRunnable;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::bb8::Pool;
|
use bb8_postgres::bb8::Pool;
|
||||||
use bb8_postgres::bb8::RunError;
|
use bb8_postgres::bb8::RunError;
|
||||||
use bb8_postgres::tokio_postgres::row::Row;
|
use bb8_postgres::tokio_postgres::row::Row;
|
||||||
|
@ -90,6 +90,7 @@ pub struct NewPeriodicTask {
|
||||||
#[builder(setter(into))]
|
#[builder(setter(into))]
|
||||||
pub period_in_seconds: i32,
|
pub period_in_seconds: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum AsyncQueueError {
|
pub enum AsyncQueueError {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
|
@ -100,6 +101,36 @@ pub enum AsyncQueueError {
|
||||||
ResultError { expected: u64, found: u64 },
|
ResultError { expected: u64, found: u64 },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait AsyncQueueable {
|
||||||
|
async fn fetch_and_touch_task(
|
||||||
|
&mut self,
|
||||||
|
task_type: &Option<String>,
|
||||||
|
) -> Result<Option<Task>, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn insert_task(
|
||||||
|
&mut self,
|
||||||
|
task: serde_json::Value,
|
||||||
|
task_type: &str,
|
||||||
|
) -> Result<Task, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn update_task_state(
|
||||||
|
&mut self,
|
||||||
|
task: Task,
|
||||||
|
state: FangTaskState,
|
||||||
|
) -> Result<Task, AsyncQueueError>;
|
||||||
|
|
||||||
|
async fn fail_task(&mut self, task: Task, error_message: &str)
|
||||||
|
-> Result<Task, AsyncQueueError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub struct AsyncQueue<Tls>
|
pub struct AsyncQueue<Tls>
|
||||||
where
|
where
|
||||||
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
|
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
|
||||||
|
@ -117,80 +148,11 @@ where
|
||||||
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
{
|
{
|
||||||
pub fn new(pool: Pool<PostgresConnectionManager<Tls>>) -> Self {
|
pub async fn connect(uri: impl ToString, tls: Tls) -> Result<Self, AsyncQueueError> {
|
||||||
AsyncQueue { pool }
|
let manager = PostgresConnectionManager::new_from_stringlike(uri, tls)?;
|
||||||
}
|
let pool = Pool::builder().build(manager).await?;
|
||||||
|
|
||||||
pub async fn fetch_and_touch_task(
|
Ok(Self { pool })
|
||||||
&mut self,
|
|
||||||
task_type: &Option<String>,
|
|
||||||
) -> Result<Task, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?;
|
|
||||||
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(task)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let task = Self::insert_task_query(&mut transaction, task).await?;
|
|
||||||
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(task)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let result = Self::remove_all_tasks_query(&mut transaction).await?;
|
|
||||||
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove_task(&mut self, task: &Task) -> Result<u64, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let result = Self::remove_task_query(&mut transaction, task).await?;
|
|
||||||
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
|
|
||||||
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fail_task(
|
|
||||||
&mut self,
|
|
||||||
task: Task,
|
|
||||||
error_message: &str,
|
|
||||||
) -> Result<Task, AsyncQueueError> {
|
|
||||||
let mut connection = self.pool.get().await?;
|
|
||||||
let mut transaction = connection.transaction().await?;
|
|
||||||
|
|
||||||
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
|
|
||||||
transaction.commit().await?;
|
|
||||||
|
|
||||||
Ok(task)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_all_tasks_query(
|
pub async fn remove_all_tasks_query(
|
||||||
|
@ -201,7 +163,7 @@ where
|
||||||
|
|
||||||
pub async fn remove_task_query(
|
pub async fn remove_task_query(
|
||||||
transaction: &mut Transaction<'_>,
|
transaction: &mut Transaction<'_>,
|
||||||
task: &Task,
|
task: Task,
|
||||||
) -> Result<u64, AsyncQueueError> {
|
) -> Result<u64, AsyncQueueError> {
|
||||||
Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
|
Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await
|
||||||
}
|
}
|
||||||
|
@ -238,19 +200,25 @@ where
|
||||||
pub async fn fetch_and_touch_task_query(
|
pub async fn fetch_and_touch_task_query(
|
||||||
transaction: &mut Transaction<'_>,
|
transaction: &mut Transaction<'_>,
|
||||||
task_type: &Option<String>,
|
task_type: &Option<String>,
|
||||||
) -> Result<Task, AsyncQueueError> {
|
) -> Result<Option<Task>, AsyncQueueError> {
|
||||||
let task_type = match task_type {
|
let task_type = match task_type {
|
||||||
Some(passed_task_type) => passed_task_type,
|
Some(passed_task_type) => passed_task_type,
|
||||||
None => DEFAULT_TASK_TYPE,
|
None => DEFAULT_TASK_TYPE,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut task = Self::get_task_type_query(transaction, task_type).await?;
|
let task = match Self::get_task_type_query(transaction, task_type).await {
|
||||||
|
Ok(some_task) => Some(some_task),
|
||||||
Self::update_task_state_query(transaction, &task, FangTaskState::InProgress).await?;
|
Err(_) => None,
|
||||||
|
};
|
||||||
task.state = FangTaskState::InProgress;
|
let result_task = if let Some(some_task) = task {
|
||||||
|
Some(
|
||||||
Ok(task)
|
Self::update_task_state_query(transaction, some_task, FangTaskState::InProgress)
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
Ok(result_task)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_task_type_query(
|
pub async fn get_task_type_query(
|
||||||
|
@ -268,27 +236,23 @@ where
|
||||||
|
|
||||||
pub async fn update_task_state_query(
|
pub async fn update_task_state_query(
|
||||||
transaction: &mut Transaction<'_>,
|
transaction: &mut Transaction<'_>,
|
||||||
task: &Task,
|
task: Task,
|
||||||
state: FangTaskState,
|
state: FangTaskState,
|
||||||
) -> Result<u64, AsyncQueueError> {
|
) -> Result<Task, AsyncQueueError> {
|
||||||
let updated_at = Utc::now();
|
let updated_at = Utc::now();
|
||||||
|
|
||||||
Self::execute_query(
|
let row: Row = transaction
|
||||||
transaction,
|
.query_one(UPDATE_TASK_STATE_QUERY, &[&state, &updated_at, &task.id])
|
||||||
UPDATE_TASK_STATE_QUERY,
|
.await?;
|
||||||
&[&state, &updated_at, &task.id],
|
let task = Self::row_to_task(row);
|
||||||
Some(1),
|
Ok(task)
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert_task_query(
|
pub async fn insert_task_query(
|
||||||
transaction: &mut Transaction<'_>,
|
transaction: &mut Transaction<'_>,
|
||||||
task: &dyn AsyncRunnable,
|
metadata: serde_json::Value,
|
||||||
|
task_type: &str,
|
||||||
) -> Result<Task, AsyncQueueError> {
|
) -> Result<Task, AsyncQueueError> {
|
||||||
let metadata = serde_json::to_value(task).unwrap();
|
|
||||||
let task_type = task.task_type();
|
|
||||||
|
|
||||||
let row: Row = transaction
|
let row: Row = transaction
|
||||||
.query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
|
.query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -339,16 +303,117 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<Tls> AsyncQueueable for AsyncQueue<Tls>
|
||||||
|
where
|
||||||
|
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
|
{
|
||||||
|
async fn fetch_and_touch_task(
|
||||||
|
&mut self,
|
||||||
|
task_type: &Option<String>,
|
||||||
|
) -> Result<Option<Task>, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?;
|
||||||
|
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn insert_task(
|
||||||
|
&mut self,
|
||||||
|
metadata: serde_json::Value,
|
||||||
|
task_type: &str,
|
||||||
|
) -> Result<Task, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let task = Self::insert_task_query(&mut transaction, metadata, task_type).await?;
|
||||||
|
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let result = Self::remove_all_tasks_query(&mut transaction).await?;
|
||||||
|
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let result = Self::remove_task_query(&mut transaction, task).await?;
|
||||||
|
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
|
||||||
|
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_task_state(
|
||||||
|
&mut self,
|
||||||
|
task: Task,
|
||||||
|
state: FangTaskState,
|
||||||
|
) -> Result<Task, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let task = Self::update_task_state_query(&mut transaction, task, state).await?;
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fail_task(
|
||||||
|
&mut self,
|
||||||
|
task: Task,
|
||||||
|
error_message: &str,
|
||||||
|
) -> Result<Task, AsyncQueueError> {
|
||||||
|
let mut connection = self.pool.get().await?;
|
||||||
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
|
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
|
||||||
|
transaction.commit().await?;
|
||||||
|
|
||||||
|
Ok(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod async_queue_tests {
|
mod async_queue_tests {
|
||||||
use super::AsyncQueue;
|
use super::AsyncQueue;
|
||||||
|
use super::AsyncQueueable;
|
||||||
use super::FangTaskState;
|
use super::FangTaskState;
|
||||||
|
use super::Task;
|
||||||
use crate::asynk::AsyncRunnable;
|
use crate::asynk::AsyncRunnable;
|
||||||
use crate::asynk::Error;
|
use crate::asynk::Error;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::bb8::Pool;
|
use bb8_postgres::bb8::Pool;
|
||||||
use bb8_postgres::tokio_postgres::Client;
|
|
||||||
use bb8_postgres::tokio_postgres::NoTls;
|
use bb8_postgres::tokio_postgres::NoTls;
|
||||||
|
use bb8_postgres::tokio_postgres::Transaction;
|
||||||
use bb8_postgres::PostgresConnectionManager;
|
use bb8_postgres::PostgresConnectionManager;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
@ -358,9 +423,9 @@ mod async_queue_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
#[async_trait]
|
#[async_trait(?Send)]
|
||||||
impl AsyncRunnable for AsyncTask {
|
impl AsyncRunnable for AsyncTask {
|
||||||
async fn run(&self, _connection: &Client) -> Result<(), Error> {
|
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -371,10 +436,14 @@ mod async_queue_tests {
|
||||||
let mut connection = pool.get().await.unwrap();
|
let mut connection = pool.get().await.unwrap();
|
||||||
let mut transaction = connection.transaction().await.unwrap();
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
|
let task = AsyncTask { number: 1 };
|
||||||
|
let metadata = serde_json::to_value(&task as &dyn AsyncRunnable).unwrap();
|
||||||
|
|
||||||
let task =
|
let task =
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
|
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, metadata, &task.task_type())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -383,29 +452,62 @@ mod async_queue_tests {
|
||||||
assert_eq!(Some("AsyncTask"), type_task);
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
transaction.rollback().await.unwrap();
|
transaction.rollback().await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn update_task_state_test() {
|
||||||
|
let pool = pool().await;
|
||||||
|
let mut connection = pool.get().await.unwrap();
|
||||||
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
|
let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await;
|
||||||
|
|
||||||
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
|
let number = metadata["number"].as_u64();
|
||||||
|
let type_task = metadata["type"].as_str();
|
||||||
|
let id = task.id;
|
||||||
|
|
||||||
|
assert_eq!(Some(1), number);
|
||||||
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
|
|
||||||
|
let finished_task = AsyncQueue::<NoTls>::update_task_state_query(
|
||||||
|
&mut transaction,
|
||||||
|
task,
|
||||||
|
FangTaskState::Finished,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(id, finished_task.id);
|
||||||
|
assert_eq!(FangTaskState::Finished, finished_task.state);
|
||||||
|
|
||||||
|
transaction.rollback().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn failed_task_query_test() {
|
async fn failed_task_query_test() {
|
||||||
let pool = pool().await;
|
let pool = pool().await;
|
||||||
let mut connection = pool.get().await.unwrap();
|
let mut connection = pool.get().await.unwrap();
|
||||||
let mut transaction = connection.transaction().await.unwrap();
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
let id = task.id;
|
let id = task.id;
|
||||||
|
|
||||||
assert_eq!(Some(1), number);
|
assert_eq!(Some(1), number);
|
||||||
assert_eq!(Some("AsyncTask"), type_task);
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
|
|
||||||
let failed_task =
|
let failed_task =
|
||||||
AsyncQueue::<NoTls>::fail_task_query(&mut transaction, task, "Some error")
|
AsyncQueue::<NoTls>::fail_task_query(&mut transaction, task, "Some error")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
assert_eq!(id, failed_task.id);
|
assert_eq!(id, failed_task.id);
|
||||||
assert_eq!(Some("Some error"), failed_task.error_message.as_deref());
|
assert_eq!(Some("Some error"), failed_task.error_message.as_deref());
|
||||||
assert_eq!(FangTaskState::Failed, failed_task.state);
|
assert_eq!(FangTaskState::Failed, failed_task.state);
|
||||||
|
|
||||||
transaction.rollback().await.unwrap();
|
transaction.rollback().await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,10 +517,7 @@ mod async_queue_tests {
|
||||||
let mut connection = pool.get().await.unwrap();
|
let mut connection = pool.get().await.unwrap();
|
||||||
let mut transaction = connection.transaction().await.unwrap();
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
|
@ -427,10 +526,8 @@ mod async_queue_tests {
|
||||||
assert_eq!(Some(1), number);
|
assert_eq!(Some(1), number);
|
||||||
assert_eq!(Some("AsyncTask"), type_task);
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -452,10 +549,8 @@ mod async_queue_tests {
|
||||||
let mut connection = pool.get().await.unwrap();
|
let mut connection = pool.get().await.unwrap();
|
||||||
let mut transaction = connection.transaction().await.unwrap();
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -463,10 +558,8 @@ mod async_queue_tests {
|
||||||
assert_eq!(Some(1), number);
|
assert_eq!(Some(1), number);
|
||||||
assert_eq!(Some("AsyncTask"), type_task);
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -476,7 +569,9 @@ mod async_queue_tests {
|
||||||
|
|
||||||
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(&mut transaction, &None)
|
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(&mut transaction, &None)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -486,6 +581,7 @@ mod async_queue_tests {
|
||||||
|
|
||||||
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(&mut transaction, &None)
|
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(&mut transaction, &None)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
|
@ -503,10 +599,8 @@ mod async_queue_tests {
|
||||||
let mut connection = pool.get().await.unwrap();
|
let mut connection = pool.get().await.unwrap();
|
||||||
let mut transaction = connection.transaction().await.unwrap();
|
let mut transaction = connection.transaction().await.unwrap();
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 1 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -514,10 +608,8 @@ mod async_queue_tests {
|
||||||
assert_eq!(Some(1), number);
|
assert_eq!(Some(1), number);
|
||||||
assert_eq!(Some("AsyncTask"), type_task);
|
assert_eq!(Some("AsyncTask"), type_task);
|
||||||
|
|
||||||
let task =
|
let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await;
|
||||||
AsyncQueue::<NoTls>::insert_task_query(&mut transaction, &AsyncTask { number: 2 })
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let metadata = task.metadata.as_object().unwrap();
|
let metadata = task.metadata.as_object().unwrap();
|
||||||
let number = metadata["number"].as_u64();
|
let number = metadata["number"].as_u64();
|
||||||
let type_task = metadata["type"].as_str();
|
let type_task = metadata["type"].as_str();
|
||||||
|
@ -547,4 +639,12 @@ mod async_queue_tests {
|
||||||
|
|
||||||
Pool::builder().build(pg_mgr).await.unwrap()
|
Pool::builder().build(pg_mgr).await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn insert_task(transaction: &mut Transaction<'_>, task: &dyn AsyncRunnable) -> Task {
|
||||||
|
let metadata = serde_json::to_value(task).unwrap();
|
||||||
|
|
||||||
|
AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, &task.task_type())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::tokio_postgres::Client;
|
|
||||||
|
|
||||||
const COMMON_TYPE: &str = "common";
|
const COMMON_TYPE: &str = "common";
|
||||||
|
|
||||||
|
@ -9,9 +9,9 @@ pub struct Error {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde(tag = "type")]
|
#[typetag::serde(tag = "type")]
|
||||||
#[async_trait]
|
#[async_trait(?Send)]
|
||||||
pub trait AsyncRunnable {
|
pub trait AsyncRunnable {
|
||||||
async fn run(&self, client: &Client) -> Result<(), Error>;
|
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>;
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
fn task_type(&self) -> String {
|
||||||
COMMON_TYPE.to_string()
|
COMMON_TYPE.to_string()
|
||||||
|
|
109
src/asynk/async_worker.rs
Normal file
109
src/asynk/async_worker.rs
Normal file
|
@ -0,0 +1,109 @@
|
||||||
|
use crate::asynk::async_queue::AsyncQueue;
|
||||||
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
|
use crate::asynk::async_queue::FangTaskState;
|
||||||
|
use crate::asynk::async_queue::Task;
|
||||||
|
use crate::asynk::async_runnable::AsyncRunnable;
|
||||||
|
use crate::asynk::Error;
|
||||||
|
use crate::{RetentionMode, SleepParams};
|
||||||
|
use bb8_postgres::tokio_postgres::tls::MakeTlsConnect;
|
||||||
|
use bb8_postgres::tokio_postgres::tls::TlsConnect;
|
||||||
|
use bb8_postgres::tokio_postgres::Socket;
|
||||||
|
use log::error;
|
||||||
|
use std::time::Duration;
|
||||||
|
use typed_builder::TypedBuilder;
|
||||||
|
|
||||||
|
#[derive(TypedBuilder, Debug)]
|
||||||
|
pub struct AsyncWorker<Tls>
|
||||||
|
where
|
||||||
|
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
|
{
|
||||||
|
#[builder(setter(into))]
|
||||||
|
pub queue: AsyncQueue<Tls>,
|
||||||
|
#[builder(setter(into))]
|
||||||
|
pub task_type: Option<String>,
|
||||||
|
#[builder(setter(into))]
|
||||||
|
pub sleep_params: SleepParams,
|
||||||
|
#[builder(setter(into))]
|
||||||
|
pub retention_mode: RetentionMode,
|
||||||
|
}
|
||||||
|
impl<Tls> AsyncWorker<Tls>
|
||||||
|
where
|
||||||
|
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
|
||||||
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
|
{
|
||||||
|
pub async fn run(&mut self, task: Task) {
|
||||||
|
let result = self.execute_task(task).await;
|
||||||
|
self.finalize_task(result).await
|
||||||
|
}
|
||||||
|
async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
|
||||||
|
let actual_task: Box<dyn AsyncRunnable> =
|
||||||
|
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
|
||||||
|
let task_result = actual_task.run(&mut self.queue).await;
|
||||||
|
match task_result {
|
||||||
|
Ok(()) => Ok(task),
|
||||||
|
Err(error) => Err((task, error.description)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn finalize_task(&mut self, result: Result<Task, (Task, String)>) {
|
||||||
|
match self.retention_mode {
|
||||||
|
RetentionMode::KeepAll => {
|
||||||
|
match result {
|
||||||
|
Ok(task) => self
|
||||||
|
.queue
|
||||||
|
.update_task_state(task, FangTaskState::Finished)
|
||||||
|
.await
|
||||||
|
.unwrap(),
|
||||||
|
Err((task, error)) => self.queue.fail_task(task, &error).await.unwrap(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
RetentionMode::RemoveAll => {
|
||||||
|
match result {
|
||||||
|
Ok(task) => self.queue.remove_task(task).await.unwrap(),
|
||||||
|
Err((task, _error)) => self.queue.remove_task(task).await.unwrap(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
RetentionMode::RemoveFinished => match result {
|
||||||
|
Ok(task) => {
|
||||||
|
self.queue.remove_task(task).await.unwrap();
|
||||||
|
}
|
||||||
|
Err((task, error)) => {
|
||||||
|
self.queue.fail_task(task, &error).await.unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn sleep(&mut self) {
|
||||||
|
self.sleep_params.maybe_increase_sleep_period();
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await;
|
||||||
|
}
|
||||||
|
pub async fn run_tasks(&mut self) -> Result<(), Error> {
|
||||||
|
loop {
|
||||||
|
match self
|
||||||
|
.queue
|
||||||
|
.fetch_and_touch_task(&self.task_type.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Some(task)) => {
|
||||||
|
self.sleep_params.maybe_reset_sleep_period();
|
||||||
|
self.run(task).await;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
self.sleep().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(error) => {
|
||||||
|
error!("Failed to fetch a task {:?}", error);
|
||||||
|
|
||||||
|
self.sleep().await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
pub mod async_queue;
|
pub mod async_queue;
|
||||||
pub mod async_runnable;
|
pub mod async_runnable;
|
||||||
|
pub mod async_worker;
|
||||||
pub use async_runnable::AsyncRunnable;
|
pub use async_runnable::AsyncRunnable;
|
||||||
pub use async_runnable::Error;
|
pub use async_runnable::Error;
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3
|
UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use crate::error::FangError;
|
use crate::error::FangError;
|
||||||
use crate::queue::Queue;
|
use crate::queue::Queue;
|
||||||
|
use crate::queue::Task;
|
||||||
use crate::worker_pool::{SharedState, WorkerState};
|
use crate::worker_pool::{SharedState, WorkerState};
|
||||||
use crate::Task;
|
use crate::{RetentionMode, SleepParams};
|
||||||
use diesel::pg::PgConnection;
|
use diesel::pg::PgConnection;
|
||||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||||
use log::error;
|
use log::error;
|
||||||
|
@ -15,47 +16,6 @@ pub struct Executor {
|
||||||
pub retention_mode: RetentionMode,
|
pub retention_mode: RetentionMode,
|
||||||
shared_state: Option<SharedState>,
|
shared_state: Option<SharedState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum RetentionMode {
|
|
||||||
KeepAll,
|
|
||||||
RemoveAll,
|
|
||||||
RemoveFinished,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct SleepParams {
|
|
||||||
pub sleep_period: u64,
|
|
||||||
pub max_sleep_period: u64,
|
|
||||||
pub min_sleep_period: u64,
|
|
||||||
pub sleep_step: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SleepParams {
|
|
||||||
pub fn maybe_reset_sleep_period(&mut self) {
|
|
||||||
if self.sleep_period != self.min_sleep_period {
|
|
||||||
self.sleep_period = self.min_sleep_period;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn maybe_increase_sleep_period(&mut self) {
|
|
||||||
if self.sleep_period < self.max_sleep_period {
|
|
||||||
self.sleep_period += self.sleep_step;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for SleepParams {
|
|
||||||
fn default() -> Self {
|
|
||||||
SleepParams {
|
|
||||||
sleep_period: 5,
|
|
||||||
max_sleep_period: 15,
|
|
||||||
min_sleep_period: 5,
|
|
||||||
sleep_step: 5,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Error {
|
pub struct Error {
|
||||||
pub description: String,
|
pub description: String,
|
||||||
|
|
|
@ -2,9 +2,8 @@ use crate::diesel::r2d2;
|
||||||
use crate::diesel::PgConnection;
|
use crate::diesel::PgConnection;
|
||||||
use crate::error::FangError;
|
use crate::error::FangError;
|
||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::executor::RetentionMode;
|
|
||||||
use crate::executor::SleepParams;
|
|
||||||
use crate::queue::Queue;
|
use crate::queue::Queue;
|
||||||
|
use crate::{RetentionMode, SleepParams};
|
||||||
use log::error;
|
use log::error;
|
||||||
use log::info;
|
use log::info;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@ -223,11 +222,11 @@ mod task_pool_tests {
|
||||||
use super::WorkerParams;
|
use super::WorkerParams;
|
||||||
use super::WorkerPool;
|
use super::WorkerPool;
|
||||||
use crate::executor::Error;
|
use crate::executor::Error;
|
||||||
use crate::executor::RetentionMode;
|
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
use crate::queue::Queue;
|
use crate::queue::Queue;
|
||||||
use crate::schema::{fang_tasks, FangTaskState};
|
use crate::schema::{fang_tasks, FangTaskState};
|
||||||
use crate::typetag;
|
use crate::typetag;
|
||||||
|
use crate::RetentionMode;
|
||||||
use crate::Task;
|
use crate::Task;
|
||||||
use diesel::pg::PgConnection;
|
use diesel::pg::PgConnection;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
36
src/lib.rs
36
src/lib.rs
|
@ -1,4 +1,40 @@
|
||||||
#![allow(clippy::extra_unused_lifetimes)]
|
#![allow(clippy::extra_unused_lifetimes)]
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub enum RetentionMode {
|
||||||
|
KeepAll,
|
||||||
|
RemoveAll,
|
||||||
|
RemoveFinished,
|
||||||
|
}
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct SleepParams {
|
||||||
|
pub sleep_period: u64,
|
||||||
|
pub max_sleep_period: u64,
|
||||||
|
pub min_sleep_period: u64,
|
||||||
|
pub sleep_step: u64,
|
||||||
|
}
|
||||||
|
impl SleepParams {
|
||||||
|
pub fn maybe_reset_sleep_period(&mut self) {
|
||||||
|
if self.sleep_period != self.min_sleep_period {
|
||||||
|
self.sleep_period = self.min_sleep_period;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn maybe_increase_sleep_period(&mut self) {
|
||||||
|
if self.sleep_period < self.max_sleep_period {
|
||||||
|
self.sleep_period += self.sleep_step;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Default for SleepParams {
|
||||||
|
fn default() -> Self {
|
||||||
|
SleepParams {
|
||||||
|
sleep_period: 5,
|
||||||
|
max_sleep_period: 15,
|
||||||
|
min_sleep_period: 5,
|
||||||
|
sleep_step: 5,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
#[cfg(feature = "blocking")]
|
#[cfg(feature = "blocking")]
|
||||||
|
|
Loading…
Reference in a new issue