diff --git a/.gitignore b/.gitignore index 96ef6c0..626b0a1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target Cargo.lock +src/schema.rs diff --git a/Cargo.toml b/Cargo.toml index fb739d7..d8d67aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ rust-version = "1.62" [features] default = ["blocking", "asynk"] blocking = ["diesel", "diesel-derive-enum", "dotenv"] -asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder"] +asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"] [dependencies] chrono = "0.4" @@ -45,6 +45,7 @@ version = "0.8" features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"] optional = true + [dependencies.postgres-types] version = "0.X.X" features = ["derive"] @@ -62,3 +63,8 @@ optional = true [dependencies.typed-builder] version = "0.10" optional = true + + +[dependencies.async-recursion] +version = "1" +optional = true \ No newline at end of file diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 5b90ca9..2fb3e9f 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -10,6 +10,7 @@ use bb8_postgres::tokio_postgres::Socket; use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; use chrono::DateTime; +use chrono::Duration; use chrono::Utc; use postgres_types::{FromSql, ToSql}; use thiserror::Error; @@ -17,14 +18,21 @@ use typed_builder::TypedBuilder; use uuid::Uuid; const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql"); +const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql"); +const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql"); const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql"); const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql"); const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql"); const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); +const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql"); +const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql"); + #[cfg(test)] -const GET_TASK_BY_ID_QUERY: &str = include_str!("queries/get_task_by_id.sql"); +const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql"); +#[cfg(test)] +const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql"); pub const DEFAULT_TASK_TYPE: &str = "common"; @@ -40,11 +48,13 @@ pub enum FangTaskState { #[postgres(name = "finished")] Finished, } + impl Default for FangTaskState { fn default() -> Self { FangTaskState::New } } + #[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] pub struct Task { #[builder(setter(into))] @@ -79,22 +89,6 @@ pub struct PeriodicTask { pub updated_at: DateTime, } -#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] -pub struct NewTask { - #[builder(setter(into))] - pub metadata: serde_json::Value, - #[builder(setter(into))] - pub task_type: String, -} - -#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] -pub struct NewPeriodicTask { - #[builder(setter(into))] - pub metadata: serde_json::Value, - #[builder(setter(into))] - pub period_in_seconds: i32, -} - #[derive(Debug, Error)] pub enum AsyncQueueError { #[error(transparent)] @@ -104,6 +98,7 @@ pub enum AsyncQueueError { #[error("returned invalid result (expected {expected:?}, found {found:?})")] ResultError { expected: u64, found: u64 }, } + impl From for FangError { fn from(error: AsyncQueueError) -> Self { let message = format!("{:?}", error); @@ -112,11 +107,12 @@ impl From for FangError { } } } + #[async_trait] pub trait AsyncQueueable { async fn fetch_and_touch_task( &mut self, - task_type: &Option, + task_type: Option, ) -> Result, AsyncQueueError>; async fn insert_task( @@ -124,7 +120,6 @@ pub trait AsyncQueueable { task: serde_json::Value, task_type: &str, ) -> Result; - async fn remove_all_tasks(&mut self) -> Result; async fn remove_task(&mut self, task: Task) -> Result; @@ -139,9 +134,25 @@ pub trait AsyncQueueable { async fn fail_task(&mut self, task: Task, error_message: &str) -> Result; + + async fn fetch_periodic_tasks( + &mut self, + error_margin_seconds: i64, + ) -> Result>, AsyncQueueError>; + + async fn insert_periodic_task( + &mut self, + metadata: serde_json::Value, + timestamp: DateTime, + period: i32, + ) -> Result; + async fn schedule_next_task( + &mut self, + periodic_task: PeriodicTask, + ) -> Result; } -#[derive(Debug, Clone)] +#[derive(TypedBuilder, Debug, Clone)] pub struct AsyncQueue where Tls: MakeTlsConnect + Clone + Send + Sync + 'static, @@ -149,25 +160,44 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { + #[builder(setter(into))] pool: Pool>, + #[builder(default = false, setter(into))] + duplicated_tasks: bool, } #[cfg(test)] +#[derive(TypedBuilder)] pub struct AsyncQueueTest<'a> { + #[builder(setter(into))] pub transaction: Transaction<'a>, + #[builder(default = false, setter(into))] + pub duplicated_tasks: bool, } #[cfg(test)] impl<'a> AsyncQueueTest<'a> { - pub async fn get_task_by_id(&mut self, id: Uuid) -> Result { + pub async fn find_task_by_id(&mut self, id: Uuid) -> Result { let row: Row = self .transaction - .query_one(GET_TASK_BY_ID_QUERY, &[&id]) + .query_one(FIND_TASK_BY_ID_QUERY, &[&id]) .await?; let task = AsyncQueue::::row_to_task(row); Ok(task) } + pub async fn find_periodic_task_by_id( + &mut self, + id: Uuid, + ) -> Result { + let row: Row = self + .transaction + .query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id]) + .await?; + + let task = AsyncQueue::::row_to_periodic_task(row); + Ok(task) + } } #[cfg(test)] @@ -175,7 +205,7 @@ impl<'a> AsyncQueueTest<'a> { impl AsyncQueueable for AsyncQueueTest<'_> { async fn fetch_and_touch_task( &mut self, - task_type: &Option, + task_type: Option, ) -> Result, AsyncQueueError> { let transaction = &mut self.transaction; @@ -191,11 +221,57 @@ impl AsyncQueueable for AsyncQueueTest<'_> { ) -> Result { let transaction = &mut self.transaction; - let task = AsyncQueue::::insert_task_query(transaction, metadata, task_type).await?; - + let task: Task = if self.duplicated_tasks { + AsyncQueue::::insert_task_query(transaction, metadata, task_type).await? + } else { + AsyncQueue::::insert_task_if_not_exist_query(transaction, metadata, task_type) + .await? + }; Ok(task) } + async fn schedule_next_task( + &mut self, + periodic_task: PeriodicTask, + ) -> Result { + let transaction = &mut self.transaction; + + let periodic_task = + AsyncQueue::::schedule_next_task_query(transaction, periodic_task).await?; + + Ok(periodic_task) + } + async fn insert_periodic_task( + &mut self, + metadata: serde_json::Value, + timestamp: DateTime, + period: i32, + ) -> Result { + let transaction = &mut self.transaction; + + let periodic_task = AsyncQueue::::insert_periodic_task_query( + transaction, + metadata, + timestamp, + period, + ) + .await?; + + Ok(periodic_task) + } + + async fn fetch_periodic_tasks( + &mut self, + error_margin_seconds: i64, + ) -> Result>, AsyncQueueError> { + let transaction = &mut self.transaction; + + let periodic_task = + AsyncQueue::::fetch_periodic_tasks_query(transaction, error_margin_seconds) + .await?; + + Ok(periodic_task) + } async fn remove_all_tasks(&mut self) -> Result { let transaction = &mut self.transaction; @@ -251,11 +327,18 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { - pub async fn connect(uri: impl ToString, tls: Tls) -> Result { + pub async fn connect( + uri: impl ToString, + tls: Tls, + duplicated_tasks: bool, + ) -> Result { let manager = PostgresConnectionManager::new_from_stringlike(uri, tls)?; let pool = Pool::builder().build(manager).await?; - Ok(Self { pool }) + Ok(Self { + pool, + duplicated_tasks, + }) } pub async fn remove_all_tasks_query( @@ -302,14 +385,14 @@ where pub async fn fetch_and_touch_task_query( transaction: &mut Transaction<'_>, - task_type: &Option, + task_type: Option, ) -> Result, AsyncQueueError> { let task_type = match task_type { Some(passed_task_type) => passed_task_type, - None => DEFAULT_TASK_TYPE, + None => DEFAULT_TASK_TYPE.to_string(), }; - let task = match Self::get_task_type_query(transaction, task_type).await { + let task = match Self::get_task_type_query(transaction, &task_type).await { Ok(some_task) => Some(some_task), Err(_) => None, }; @@ -362,7 +445,59 @@ where let task = Self::row_to_task(row); Ok(task) } + pub async fn schedule_next_task_query( + transaction: &mut Transaction<'_>, + periodic_task: PeriodicTask, + ) -> Result { + let updated_at = Utc::now(); + let scheduled_at = updated_at + Duration::seconds(periodic_task.period_in_seconds.into()); + let row: Row = transaction + .query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at]) + .await?; + + let periodic_task = Self::row_to_periodic_task(row); + Ok(periodic_task) + } + pub async fn insert_periodic_task_query( + transaction: &mut Transaction<'_>, + metadata: serde_json::Value, + timestamp: DateTime, + period: i32, + ) -> Result { + let row: Row = transaction + .query_one( + INSERT_PERIODIC_TASK_QUERY, + &[&metadata, ×tamp, &period], + ) + .await?; + let periodic_task = Self::row_to_periodic_task(row); + Ok(periodic_task) + } + + pub async fn fetch_periodic_tasks_query( + transaction: &mut Transaction<'_>, + error_margin_seconds: i64, + ) -> Result>, AsyncQueueError> { + let current_time = Utc::now(); + + let low_limit = current_time - Duration::seconds(error_margin_seconds); + let high_limit = current_time + Duration::seconds(error_margin_seconds); + let rows: Vec = transaction + .query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit]) + .await?; + + let periodic_tasks: Vec = rows + .into_iter() + .map(|row| Self::row_to_periodic_task(row)) + .collect(); + + if periodic_tasks.is_empty() { + Ok(None) + } else { + Ok(Some(periodic_tasks)) + } + } pub async fn execute_query( transaction: &mut Transaction<'_>, query: &str, @@ -382,6 +517,49 @@ where Ok(result) } + pub async fn insert_task_if_not_exist_query( + transaction: &mut Transaction<'_>, + metadata: serde_json::Value, + task_type: &str, + ) -> Result { + match Self::find_task_by_metadata_query(transaction, &metadata).await { + Some(task) => Ok(task), + None => Self::insert_task_query(transaction, metadata, task_type).await, + } + } + pub async fn find_task_by_metadata_query( + transaction: &mut Transaction<'_>, + metadata: &serde_json::Value, + ) -> Option { + let result = transaction + .query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata]) + .await; + + match result { + Ok(row) => Some(Self::row_to_task(row)), + Err(_) => None, + } + } + fn row_to_periodic_task(row: Row) -> PeriodicTask { + let id: Uuid = row.get("id"); + let metadata: serde_json::Value = row.get("metadata"); + let period_in_seconds: i32 = row.get("period_in_seconds"); + let scheduled_at: Option> = match row.try_get("scheduled_at") { + Ok(datetime) => Some(datetime), + Err(_) => None, + }; + let created_at: DateTime = row.get("created_at"); + let updated_at: DateTime = row.get("updated_at"); + + PeriodicTask::builder() + .id(id) + .metadata(metadata) + .period_in_seconds(period_in_seconds) + .scheduled_at(scheduled_at) + .created_at(created_at) + .updated_at(updated_at) + .build() + } fn row_to_task(row: Row) -> Task { let id: Uuid = row.get("id"); let metadata: serde_json::Value = row.get("metadata"); @@ -416,7 +594,7 @@ where { async fn fetch_and_touch_task( &mut self, - task_type: &Option, + task_type: Option, ) -> Result, AsyncQueueError> { let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; @@ -436,13 +614,63 @@ where let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; - let task = Self::insert_task_query(&mut transaction, metadata, task_type).await?; + let task: Task = if self.duplicated_tasks { + Self::insert_task_query(&mut transaction, metadata, task_type).await? + } else { + Self::insert_task_if_not_exist_query(&mut transaction, metadata, task_type).await? + }; transaction.commit().await?; Ok(task) } + async fn insert_periodic_task( + &mut self, + metadata: serde_json::Value, + timestamp: DateTime, + period: i32, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let periodic_task = + Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?; + + transaction.commit().await?; + + Ok(periodic_task) + } + + async fn schedule_next_task( + &mut self, + periodic_task: PeriodicTask, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?; + + transaction.commit().await?; + + Ok(periodic_task) + } + + async fn fetch_periodic_tasks( + &mut self, + error_margin_seconds: i64, + ) -> Result>, AsyncQueueError> { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let periodic_task = + Self::fetch_periodic_tasks_query(&mut transaction, error_margin_seconds).await?; + + transaction.commit().await?; + + Ok(periodic_task) + } + async fn remove_all_tasks(&mut self) -> Result { let mut connection = self.pool.get().await?; let mut transaction = connection.transaction().await?; @@ -538,7 +766,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; @@ -557,7 +785,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; @@ -586,7 +814,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; @@ -613,7 +841,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; @@ -645,7 +873,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; @@ -665,7 +893,7 @@ mod async_queue_tests { assert_eq!(Some(2), number); assert_eq!(Some("AsyncTask"), type_task); - let task = test.fetch_and_touch_task(&None).await.unwrap().unwrap(); + let task = test.fetch_and_touch_task(None).await.unwrap().unwrap(); let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); @@ -674,7 +902,7 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = test.fetch_and_touch_task(&None).await.unwrap().unwrap(); + let task = test.fetch_and_touch_task(None).await.unwrap().unwrap(); let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -691,7 +919,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncTask { number: 1 }).await; diff --git a/src/asynk/async_scheduler.rs b/src/asynk/async_scheduler.rs new file mode 100644 index 0000000..e369f27 --- /dev/null +++ b/src/asynk/async_scheduler.rs @@ -0,0 +1,222 @@ +use crate::asynk::async_queue::AsyncQueueable; +use crate::asynk::async_queue::PeriodicTask; +use crate::asynk::AsyncRunnable; +use crate::asynk::Error; +use async_recursion::async_recursion; +use log::error; +use std::time::Duration; +use tokio::time::sleep; +use typed_builder::TypedBuilder; + +#[derive(TypedBuilder)] +pub struct Scheduler<'a> { + #[builder(setter(into))] + pub check_period: u64, + #[builder(setter(into))] + pub error_margin_seconds: u64, + #[builder(setter(into))] + pub queue: &'a mut dyn AsyncQueueable, + #[builder(default = 0, setter(into))] + pub number_of_restarts: u32, +} + +impl<'a> Scheduler<'a> { + #[async_recursion(?Send)] + pub async fn start(&mut self) -> Result<(), Error> { + let task_res = self.schedule_loop().await; + + sleep(Duration::from_secs(1)).await; + + match task_res { + Err(err) => { + error!( + "Scheduler failed, restarting {:?}. Number of restarts {}", + err, self.number_of_restarts + ); + self.number_of_restarts += 1; + self.start().await + } + Ok(_) => { + error!( + "Scheduler stopped. restarting. Number of restarts {}", + self.number_of_restarts + ); + self.number_of_restarts += 1; + self.start().await + } + } + } + + pub async fn schedule_loop(&mut self) -> Result<(), Error> { + let sleep_duration = Duration::from_secs(self.check_period); + + loop { + self.schedule().await?; + + sleep(sleep_duration).await; + } + } + + pub async fn schedule(&mut self) -> Result<(), Error> { + if let Some(tasks) = self + .queue + .fetch_periodic_tasks(self.error_margin_seconds as i64) + .await? + { + for task in tasks { + self.process_task(task).await?; + } + }; + Ok(()) + } + + async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { + match task.scheduled_at { + None => { + self.queue.schedule_next_task(task).await?; + } + Some(_) => { + let metadata = task.metadata.clone(); + + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + self.queue + .insert_task(metadata, &(*actual_task).task_type()) + .await?; + + self.queue.schedule_next_task(task).await?; + } + } + Ok(()) + } + + #[cfg(test)] + async fn schedule_test(&mut self) -> Result<(), Error> { + let sleep_duration = Duration::from_secs(self.check_period); + + loop { + match self + .queue + .fetch_periodic_tasks(self.error_margin_seconds as i64) + .await? + { + Some(tasks) => { + for task in tasks { + self.process_task(task).await?; + } + + return Ok(()); + } + None => { + sleep(sleep_duration).await; + } + }; + } + } +} + +#[cfg(test)] +mod async_scheduler_tests { + use super::Scheduler; + use crate::asynk::async_queue::AsyncQueueTest; + use crate::asynk::async_queue::AsyncQueueable; + use crate::asynk::async_queue::PeriodicTask; + use crate::asynk::AsyncRunnable; + use crate::asynk::Error; + use async_trait::async_trait; + use bb8_postgres::bb8::Pool; + use bb8_postgres::tokio_postgres::NoTls; + use bb8_postgres::PostgresConnectionManager; + use chrono::DateTime; + use chrono::Duration as OtherDuration; + use chrono::Utc; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + struct AsyncScheduledTask { + pub number: u16, + } + + #[typetag::serde] + #[async_trait(?Send)] + impl AsyncRunnable for AsyncScheduledTask { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + Ok(()) + } + fn task_type(&self) -> String { + "schedule".to_string() + } + } + + #[tokio::test] + async fn schedules_tasks() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let schedule_in_future = Utc::now() + OtherDuration::seconds(5); + + let _periodic_task = insert_periodic_task( + &mut test, + &AsyncScheduledTask { number: 1 }, + schedule_in_future, + 10, + ) + .await; + + let check_period: u64 = 1; + let error_margin_seconds: u64 = 2; + + let mut scheduler = Scheduler::builder() + .check_period(check_period) + .error_margin_seconds(error_margin_seconds) + .queue(&mut test as &mut dyn AsyncQueueable) + .build(); + // Scheduler start tricky not loop :) + scheduler.schedule_test().await.unwrap(); + + let task = scheduler + .queue + .fetch_and_touch_task(Some("schedule".to_string())) + .await + .unwrap() + .unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + let runnable_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + assert_eq!("schedule", runnable_task.task_type()); + assert_eq!(Some("AsyncScheduledTask"), type_task); + assert_eq!(Some(1), number); + } + + async fn insert_periodic_task( + test: &mut AsyncQueueTest<'_>, + task: &dyn AsyncRunnable, + timestamp: DateTime, + period_in_seconds: i32, + ) -> PeriodicTask { + let metadata = serde_json::to_value(task).unwrap(); + + test.insert_periodic_task(metadata, timestamp, period_in_seconds) + .await + .unwrap() + } + + async fn pool() -> Pool> { + let pg_mgr = PostgresConnectionManager::new_from_stringlike( + "postgres://postgres:postgres@localhost/fang", + NoTls, + ) + .unwrap(); + + Pool::builder().build(pg_mgr).await.unwrap() + } +} diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index d151b55..71ec481 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -13,18 +13,20 @@ use typed_builder::TypedBuilder; pub struct AsyncWorker<'a> { #[builder(setter(into))] pub queue: &'a mut dyn AsyncQueueable, - #[builder(default=DEFAULT_TASK_TYPE.to_string() , setter(into))] + #[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))] pub task_type: String, #[builder(default, setter(into))] pub sleep_params: SleepParams, #[builder(default, setter(into))] pub retention_mode: RetentionMode, } + impl<'a> AsyncWorker<'a> { pub async fn run(&mut self, task: Task) -> Result<(), Error> { let result = self.execute_task(task).await; self.finalize_task(result).await } + async fn execute_task(&mut self, task: Task) -> Result { let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); @@ -35,6 +37,7 @@ impl<'a> AsyncWorker<'a> { Err(error) => Err((task, error.description)), } } + async fn finalize_task(&mut self, result: Result) -> Result<(), Error> { match self.retention_mode { RetentionMode::KeepAll => match result { @@ -71,16 +74,18 @@ impl<'a> AsyncWorker<'a> { }, } } + pub async fn sleep(&mut self) { self.sleep_params.maybe_increase_sleep_period(); tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await; } + pub async fn run_tasks(&mut self) -> Result<(), Error> { loop { match self .queue - .fetch_and_touch_task(&Some(self.task_type.clone())) + .fetch_and_touch_task(Some(self.task_type.clone())) .await { Ok(Some(task)) => { @@ -99,12 +104,13 @@ impl<'a> AsyncWorker<'a> { }; } } + #[cfg(test)] pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> { loop { match self .queue - .fetch_and_touch_task(&Some(self.task_type.clone())) + .fetch_and_touch_task(Some(self.task_type.clone())) .await { Ok(Some(task)) => { @@ -204,7 +210,7 @@ mod async_worker_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await; let id = task.id; @@ -215,7 +221,7 @@ mod async_worker_tests { .build(); worker.run(task).await.unwrap(); - let task_finished = test.get_task_by_id(id).await.unwrap(); + let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); assert_eq!(FangTaskState::Finished, task_finished.state); test.transaction.rollback().await.unwrap(); @@ -226,7 +232,7 @@ mod async_worker_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await; let id = task.id; @@ -237,7 +243,7 @@ mod async_worker_tests { .build(); worker.run(task).await.unwrap(); - let task_finished = test.get_task_by_id(id).await.unwrap(); + let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); assert_eq!(FangTaskState::Failed, task_finished.state); @@ -253,7 +259,7 @@ mod async_worker_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await; let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await; @@ -270,9 +276,9 @@ mod async_worker_tests { .build(); worker.run_tasks_until_none().await.unwrap(); - let task1 = test.get_task_by_id(id1).await.unwrap(); - let task12 = test.get_task_by_id(id12).await.unwrap(); - let task2 = test.get_task_by_id(id2).await.unwrap(); + let task1 = test.find_task_by_id(id1).await.unwrap(); + let task12 = test.find_task_by_id(id12).await.unwrap(); + let task2 = test.find_task_by_id(id2).await.unwrap(); assert_eq!(id1, task1.id); assert_eq!(id12, task12.id); @@ -288,7 +294,7 @@ mod async_worker_tests { let mut connection = pool.get().await.unwrap(); let transaction = connection.transaction().await.unwrap(); - let mut test = AsyncQueueTest { transaction }; + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); let task1 = insert_task(&mut test, &AsyncTaskType1 {}).await; let task12 = insert_task(&mut test, &AsyncTaskType1 {}).await; @@ -305,13 +311,13 @@ mod async_worker_tests { worker.run_tasks_until_none().await.unwrap(); let task = test - .fetch_and_touch_task(&Some("type1".to_string())) + .fetch_and_touch_task(Some("type1".to_string())) .await .unwrap(); assert_eq!(None, task); let task2 = test - .fetch_and_touch_task(&Some("type2".to_string())) + .fetch_and_touch_task(Some("type2".to_string())) .await .unwrap() .unwrap(); diff --git a/src/asynk/mod.rs b/src/asynk/mod.rs index bf6e0cf..45de86c 100644 --- a/src/asynk/mod.rs +++ b/src/asynk/mod.rs @@ -1,5 +1,6 @@ pub mod async_queue; pub mod async_runnable; +pub mod async_scheduler; pub mod async_worker; pub use async_runnable::AsyncRunnable; pub use async_runnable::Error; diff --git a/src/asynk/queries/fetch_periodic_tasks.sql b/src/asynk/queries/fetch_periodic_tasks.sql new file mode 100644 index 0000000..5d7529e --- /dev/null +++ b/src/asynk/queries/fetch_periodic_tasks.sql @@ -0,0 +1 @@ +SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL diff --git a/src/asynk/queries/find_periodic_task_by_id.sql b/src/asynk/queries/find_periodic_task_by_id.sql new file mode 100644 index 0000000..67c0316 --- /dev/null +++ b/src/asynk/queries/find_periodic_task_by_id.sql @@ -0,0 +1 @@ +SELECT * FROM fang_periodic_tasks WHERE id = $1 diff --git a/src/asynk/queries/get_task_by_id.sql b/src/asynk/queries/find_task_by_id.sql similarity index 100% rename from src/asynk/queries/get_task_by_id.sql rename to src/asynk/queries/find_task_by_id.sql diff --git a/src/asynk/queries/find_task_by_metadata.sql b/src/asynk/queries/find_task_by_metadata.sql new file mode 100644 index 0000000..0e38bfa --- /dev/null +++ b/src/asynk/queries/find_task_by_metadata.sql @@ -0,0 +1 @@ +SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1 diff --git a/src/asynk/queries/insert_periodic_task.sql b/src/asynk/queries/insert_periodic_task.sql index d9b52b3..6f2eaaf 100644 --- a/src/asynk/queries/insert_periodic_task.sql +++ b/src/asynk/queries/insert_periodic_task.sql @@ -1 +1 @@ -INSERT INTO "fang_periodic_tasks" ("metadata", "period_in_seconds") VALUES ($1, $2) +INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_seconds") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at diff --git a/src/asynk/queries/schedule_next_task.sql b/src/asynk/queries/schedule_next_task.sql index 3e0495c..95b8138 100644 --- a/src/asynk/queries/schedule_next_task.sql +++ b/src/asynk/queries/schedule_next_task.sql @@ -1 +1 @@ -UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 +UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at diff --git a/src/blocking/executor.rs b/src/blocking/executor.rs index 353bedc..1e7d17b 100644 --- a/src/blocking/executor.rs +++ b/src/blocking/executor.rs @@ -16,6 +16,7 @@ pub struct Executor { pub retention_mode: RetentionMode, shared_state: Option, } + #[derive(Debug)] pub struct Error { pub description: String,