diff --git a/Cargo.toml b/Cargo.toml
index c9a3918..ff7b4a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,9 @@ blocking = ["diesel", "diesel-derive-enum", "dotenv"]
 asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"]
 
 [dependencies]
+cron = "0.11"
+hex = "0.4"
+sha2 = "0.10"
 chrono = "0.4"
 log = "0.4"
 serde = "1"
diff --git a/fang_examples/asynk/simple_async_cron_worker/Cargo.toml b/fang_examples/asynk/simple_async_cron_worker/Cargo.toml
new file mode 100644
index 0000000..86b645f
--- /dev/null
+++ b/fang_examples/asynk/simple_async_cron_worker/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "simple_async_cron_worker"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+fang = { path = "../../../" , features = ["asynk"]}
+env_logger = "0.9.0"
+log = "0.4.0"
+tokio = { version = "1", features = ["full"] }
diff --git a/fang_examples/asynk/simple_async_cron_worker/src/lib.rs b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs
new file mode 100644
index 0000000..4ac1e8b
--- /dev/null
+++ b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs
@@ -0,0 +1,33 @@
+use fang::async_trait;
+use fang::asynk::async_queue::AsyncQueueable;
+use fang::asynk::async_runnable::Error;
+use fang::asynk::async_runnable::Scheduled;
+use fang::serde::{Deserialize, Serialize};
+use fang::typetag;
+use fang::AsyncRunnable;
+
+#[derive(Serialize, Deserialize)]
+#[serde(crate = "fang::serde")]
+pub struct MyCronTask {}
+
+#[async_trait]
+#[typetag::serde]
+impl AsyncRunnable for MyCronTask {
+    async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
+        log::info!("CRON!!!!!!!!!!!!!!!",);
+
+        Ok(())
+    }
+
+    fn cron(&self) -> Option<Scheduled> {
+        // sec   min   hour   day of month   month   day of week   year
+        // be careful: this works only with UTC hours.
+        // https://www.timeanddate.com/worldclock/timezone/utc
+        let expression = "0/20 * * * Aug-Sep * 2022/1";
+        Some(Scheduled::CronPattern(expression.to_string()))
+    }
+
+    fn uniq(&self) -> bool {
+        true
+    }
+}
diff --git a/fang_examples/asynk/simple_async_cron_worker/src/main.rs b/fang_examples/asynk/simple_async_cron_worker/src/main.rs
new file mode 100644
index 0000000..b3c0021
--- /dev/null
+++ b/fang_examples/asynk/simple_async_cron_worker/src/main.rs
@@ -0,0 +1,41 @@
+use fang::asynk::async_queue::AsyncQueue;
+use fang::asynk::async_queue::AsyncQueueable;
+use fang::asynk::async_worker_pool::AsyncWorkerPool;
+use fang::AsyncRunnable;
+use fang::NoTls;
+use simple_async_cron_worker::MyCronTask;
+use std::time::Duration;
+
+#[tokio::main]
+async fn main() {
+    env_logger::init();
+
+    log::info!("Starting...");
+    let max_pool_size: u32 = 3;
+    let mut queue = AsyncQueue::builder()
+        .uri("postgres://postgres:postgres@localhost/fang")
+        .max_pool_size(max_pool_size)
+        .build();
+
+    queue.connect(NoTls).await.unwrap();
+    log::info!("Queue connected...");
+
+    let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
+        .number_of_workers(10_u32)
+        .queue(queue.clone())
+        .build();
+
+    log::info!("Pool created ...");
+
+    pool.start().await;
+    log::info!("Workers started ...");
+
+    let task = MyCronTask {};
+
+    queue
+        .schedule_task(&task as &dyn AsyncRunnable)
+        .await
+        .unwrap();
+
+    tokio::time::sleep(Duration::from_secs(100)).await;
+}
diff --git a/fang_examples/simple_async_worker/Cargo.toml b/fang_examples/asynk/simple_async_worker/Cargo.toml
similarity index 83%
rename from fang_examples/simple_async_worker/Cargo.toml
rename to fang_examples/asynk/simple_async_worker/Cargo.toml
index cf56b40..8248aed 100644
--- a/fang_examples/simple_async_worker/Cargo.toml
+++ b/fang_examples/asynk/simple_async_worker/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-fang = { path = "../../" , features = ["asynk"]}
+fang = { path = "../../../" , features = ["asynk"]}
 env_logger = "0.9.0"
 log = "0.4.0"
 tokio = { version = "1", features = ["full"] }
diff --git a/fang_examples/simple_async_worker/src/lib.rs b/fang_examples/asynk/simple_async_worker/src/lib.rs
similarity index 100%
rename from fang_examples/simple_async_worker/src/lib.rs
rename to fang_examples/asynk/simple_async_worker/src/lib.rs
diff --git a/fang_examples/simple_async_worker/src/main.rs b/fang_examples/asynk/simple_async_worker/src/main.rs
similarity index 95%
rename from fang_examples/simple_async_worker/src/main.rs
rename to fang_examples/asynk/simple_async_worker/src/main.rs
index ff58e8c..cbe9446 100644
--- a/fang_examples/simple_async_worker/src/main.rs
+++ b/fang_examples/asynk/simple_async_worker/src/main.rs
@@ -12,11 +12,10 @@ async fn main() {
     env_logger::init();
 
     log::info!("Starting...");
-    let max_pool_size: u32 = 2;
+    let max_pool_size: u32 = 3;
     let mut queue = AsyncQueue::builder()
         .uri("postgres://postgres:postgres@localhost/fang")
         .max_pool_size(max_pool_size)
-        .duplicated_tasks(true)
         .build();
 
     queue.connect(NoTls).await.unwrap();
@@ -40,6 +39,7 @@ async fn main() {
         .insert_task(&task1 as &dyn AsyncRunnable)
         .await
         .unwrap();
+
     queue
         .insert_task(&task2 as &dyn AsyncRunnable)
         .await
diff --git a/migrations/2022-08-20-151615_create_fang_tasks/up.sql b/migrations/2022-08-20-151615_create_fang_tasks/up.sql
index d8f9c83..ebebb6e 100644
--- a/migrations/2022-08-20-151615_create_fang_tasks/up.sql
+++ 
b/migrations/2022-08-20-151615_create_fang_tasks/up.sql @@ -8,7 +8,6 @@ CREATE TABLE fang_tasks ( error_message TEXT, state fang_task_state DEFAULT 'new' NOT NULL, task_type VARCHAR DEFAULT 'common' NOT NULL, - periodic BOOLEAN DEFAULT FALSE, uniq_hash CHAR(64), scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index f51b2d8..715b21d 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,3 +1,4 @@ +use crate::async_runnable::Scheduled::*; use crate::asynk::async_runnable::AsyncRunnable; use crate::asynk::async_runnable::Error as FangError; use async_trait::async_trait; @@ -9,10 +10,11 @@ use bb8_postgres::tokio_postgres::Socket; use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; use chrono::DateTime; -use chrono::Duration; use chrono::Utc; +use cron::Schedule; use postgres_types::{FromSql, ToSql}; -use std::time::Duration as StdDuration; +use sha2::{Digest, Sha256}; +use std::str::FromStr; use thiserror::Error; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -21,21 +23,15 @@ use uuid::Uuid; use bb8_postgres::tokio_postgres::tls::NoTls; const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql"); -const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql"); -const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql"); +const INSERT_TASK_UNIQ_QUERY: &str = include_str!("queries/insert_task_uniq.sql"); const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql"); const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql"); const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql"); const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); -const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql"); -const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql"); - -#[cfg(test)] +const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql"); const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql"); -#[cfg(test)] -const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql"); pub const DEFAULT_TASK_TYPE: &str = "common"; @@ -71,25 +67,23 @@ pub struct Task { #[builder(setter(into))] pub task_type: String, #[builder(setter(into))] + pub uniq_hash: Option, + #[builder(setter(into))] + pub scheduled_at: DateTime, + #[builder(setter(into))] pub created_at: DateTime, #[builder(setter(into))] pub updated_at: DateTime, } -#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] -pub struct PeriodicTask { - #[builder(setter(into))] - pub id: Uuid, - #[builder(setter(into))] - pub metadata: serde_json::Value, - #[builder(setter(into))] - pub period_in_millis: i64, - #[builder(setter(into))] - pub scheduled_at: Option>, - #[builder(setter(into))] - pub created_at: DateTime, - #[builder(setter(into))] - pub updated_at: DateTime, +#[derive(Debug, Error)] +pub enum CronError { + #[error(transparent)] + LibraryError(#[from] cron::error::Error), + #[error("You have to implement method `cron()` in your AsyncRunnable")] + TaskNotSchedulableError, + #[error("No timestamps 
match with this cron pattern")] + NoTimestampsError, } #[derive(Debug, Error)] @@ -100,6 +94,8 @@ pub enum AsyncQueueError { PgError(#[from] bb8_postgres::tokio_postgres::Error), #[error(transparent)] SerdeError(#[from] serde_json::Error), + #[error(transparent)] + CronError(#[from] CronError), #[error("returned invalid result (expected {expected:?}, found {found:?})")] ResultError { expected: u64, found: u64 }, #[error( @@ -110,6 +106,12 @@ pub enum AsyncQueueError { TimeError, } +impl From for AsyncQueueError { + fn from(error: cron::error::Error) -> Self { + AsyncQueueError::CronError(CronError::LibraryError(error)) + } +} + impl From for FangError { fn from(error: AsyncQueueError) -> Self { let message = format!("{:?}", error); @@ -127,12 +129,15 @@ pub trait AsyncQueueable: Send { ) -> Result, AsyncQueueError>; async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result; + async fn remove_all_tasks(&mut self) -> Result; async fn remove_task(&mut self, task: Task) -> Result; async fn remove_tasks_type(&mut self, task_type: &str) -> Result; + async fn find_task_by_id(&mut self, id: Uuid) -> Result; + async fn update_task_state( &mut self, task: Task, @@ -142,22 +147,7 @@ pub trait AsyncQueueable: Send { async fn fail_task(&mut self, task: Task, error_message: &str) -> Result; - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError>; - - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period: i64, - ) -> Result; - - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result; + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result; } #[derive(TypedBuilder, Debug, Clone)] @@ -174,8 +164,6 @@ where uri: String, #[builder(setter(into))] max_pool_size: u32, - #[builder(default = false, setter(into))] - duplicated_tasks: bool, #[builder(default = false, setter(skip))] connected: bool, } @@ -185,47 +173,24 @@ where pub struct AsyncQueueTest<'a> { #[builder(setter(into))] pub transaction: Transaction<'a>, - #[builder(default = false, setter(into))] - pub duplicated_tasks: bool, -} - -#[cfg(test)] -impl<'a> AsyncQueueTest<'a> { - pub async fn find_task_by_id(&mut self, id: Uuid) -> Result { - let row: Row = self - .transaction - .query_one(FIND_TASK_BY_ID_QUERY, &[&id]) - .await?; - - let task = AsyncQueue::::row_to_task(row); - Ok(task) - } - pub async fn find_periodic_task_by_id( - &mut self, - id: Uuid, - ) -> Result { - let row: Row = self - .transaction - .query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id]) - .await?; - - let task = AsyncQueue::::row_to_periodic_task(row); - Ok(task) - } } #[cfg(test)] #[async_trait] impl AsyncQueueable for AsyncQueueTest<'_> { + async fn find_task_by_id(&mut self, id: Uuid) -> Result { + let transaction = &mut self.transaction; + + AsyncQueue::::find_task_by_id_query(transaction, id).await + } + async fn fetch_and_touch_task( &mut self, task_type: Option, ) -> Result, AsyncQueueError> { let transaction = &mut self.transaction; - let task = AsyncQueue::::fetch_and_touch_task_query(transaction, task_type).await?; - - Ok(task) + AsyncQueue::::fetch_and_touch_task_query(transaction, task_type).await } async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { @@ -233,84 +198,86 @@ impl AsyncQueueable for AsyncQueueTest<'_> { let metadata = serde_json::to_value(task)?; - let task: Task = if self.duplicated_tasks { - AsyncQueue::::insert_task_query(transaction, metadata, &task.task_type()).await? 
+ let task: Task = if !task.uniq() { + AsyncQueue::::insert_task_query( + transaction, + metadata, + &task.task_type(), + Utc::now(), + ) + .await? } else { AsyncQueue::::insert_task_if_not_exist_query( transaction, metadata, &task.task_type(), + Utc::now(), ) .await? }; Ok(task) } - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result { - let transaction = &mut self.transaction; - - let periodic_task = - AsyncQueue::::schedule_next_task_query(transaction, periodic_task).await?; - - Ok(periodic_task) - } - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period: i64, - ) -> Result { + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { let transaction = &mut self.transaction; let metadata = serde_json::to_value(task)?; - let periodic_task = AsyncQueue::::insert_periodic_task_query( - transaction, - metadata, - timestamp, - period, - ) - .await?; + let scheduled_at = match task.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); - Ok(periodic_task) - } + iterator + .next() + .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))? + } + ScheduleOnce(datetime) => datetime, + }, + None => { + return Err(AsyncQueueError::CronError( + CronError::TaskNotSchedulableError, + )); + } + }; - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - let transaction = &mut self.transaction; + let task: Task = if !task.uniq() { + AsyncQueue::::insert_task_query( + transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? + } else { + AsyncQueue::::insert_task_if_not_exist_query( + transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? 
+ }; - let periodic_task = - AsyncQueue::::fetch_periodic_tasks_query(transaction, error_margin).await?; - - Ok(periodic_task) + Ok(task) } async fn remove_all_tasks(&mut self) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_all_tasks_query(transaction).await?; - - Ok(result) + AsyncQueue::::remove_all_tasks_query(transaction).await } async fn remove_task(&mut self, task: Task) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_task_query(transaction, task).await?; - - Ok(result) + AsyncQueue::::remove_task_query(transaction, task).await } async fn remove_tasks_type(&mut self, task_type: &str) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_tasks_type_query(transaction, task_type).await?; - - Ok(result) + AsyncQueue::::remove_tasks_type_query(transaction, task_type).await } async fn update_task_state( @@ -320,9 +287,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> { ) -> Result { let transaction = &mut self.transaction; - let task = AsyncQueue::::update_task_state_query(transaction, task, state).await?; - - Ok(task) + AsyncQueue::::update_task_state_query(transaction, task, state).await } async fn fail_task( @@ -332,9 +297,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> { ) -> Result { let transaction = &mut self.transaction; - let task = AsyncQueue::::fail_task_query(transaction, task, error_message).await?; - - Ok(task) + AsyncQueue::::fail_task_query(transaction, task, error_message).await } } @@ -384,6 +347,16 @@ where Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await } + async fn find_task_by_id_query( + transaction: &mut Transaction<'_>, + id: Uuid, + ) -> Result { + let row: Row = transaction.query_one(FIND_TASK_BY_ID_QUERY, &[&id]).await?; + + let task = Self::row_to_task(row); + Ok(task) + } + async fn fail_task_query( transaction: &mut Transaction<'_>, task: Task, @@ -435,7 +408,7 @@ where task_type: &str, ) -> Result { let row: Row = transaction - .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type]) + .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type, &Utc::now()]) .await?; let task = Self::row_to_task(row); @@ -461,72 +434,32 @@ where transaction: &mut Transaction<'_>, metadata: serde_json::Value, task_type: &str, + scheduled_at: DateTime, ) -> Result { let row: Row = transaction - .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type]) + .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type, &scheduled_at]) .await?; let task = Self::row_to_task(row); Ok(task) } - async fn schedule_next_task_query( - transaction: &mut Transaction<'_>, - periodic_task: PeriodicTask, - ) -> Result { - let updated_at = Utc::now(); - let scheduled_at = updated_at + Duration::milliseconds(periodic_task.period_in_millis); - - let row: Row = transaction - .query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at]) - .await?; - - let periodic_task = Self::row_to_periodic_task(row); - Ok(periodic_task) - } - - async fn insert_periodic_task_query( + async fn insert_task_uniq_query( transaction: &mut Transaction<'_>, metadata: serde_json::Value, - timestamp: DateTime, - period: i64, - ) -> Result { + task_type: &str, + scheduled_at: DateTime, + ) -> Result { + let uniq_hash = Self::calculate_hash(metadata.to_string()); + let row: Row = transaction .query_one( - INSERT_PERIODIC_TASK_QUERY, - &[&metadata, ×tamp, &period], + INSERT_TASK_UNIQ_QUERY, + &[&metadata, &task_type, &uniq_hash, &scheduled_at], ) .await?; - let periodic_task = 
Self::row_to_periodic_task(row); - Ok(periodic_task) - } - async fn fetch_periodic_tasks_query( - transaction: &mut Transaction<'_>, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - let current_time = Utc::now(); - - let margin: Duration = match Duration::from_std(error_margin) { - Ok(value) => Ok(value), - Err(_) => Err(AsyncQueueError::TimeError), - }?; - - let low_limit = current_time - margin; - let high_limit = current_time + margin; - let rows: Vec = transaction - .query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit]) - .await?; - - let periodic_tasks: Vec = rows - .into_iter() - .map(|row| Self::row_to_periodic_task(row)) - .collect(); - - if periodic_tasks.is_empty() { - Ok(None) - } else { - Ok(Some(periodic_tasks)) - } + let task = Self::row_to_task(row); + Ok(task) } async fn execute_query( @@ -552,19 +485,31 @@ where transaction: &mut Transaction<'_>, metadata: serde_json::Value, task_type: &str, + scheduled_at: DateTime, ) -> Result { - match Self::find_task_by_metadata_query(transaction, &metadata).await { + match Self::find_task_by_uniq_hash_query(transaction, &metadata).await { Some(task) => Ok(task), - None => Self::insert_task_query(transaction, metadata, task_type).await, + None => { + Self::insert_task_uniq_query(transaction, metadata, task_type, scheduled_at).await + } } } - async fn find_task_by_metadata_query( + fn calculate_hash(json: String) -> String { + let mut hasher = Sha256::new(); + hasher.update(json.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + } + + async fn find_task_by_uniq_hash_query( transaction: &mut Transaction<'_>, metadata: &serde_json::Value, ) -> Option { + let uniq_hash = Self::calculate_hash(metadata.to_string()); + let result = transaction - .query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata]) + .query_one(FIND_TASK_BY_UNIQ_HASH_QUERY, &[&uniq_hash]) .await; match result { @@ -573,47 +518,29 @@ where } } - fn row_to_periodic_task(row: Row) -> PeriodicTask { - let id: Uuid = row.get("id"); - let metadata: serde_json::Value = row.get("metadata"); - let period_in_millis: i64 = row.get("period_in_millis"); - let scheduled_at: Option> = match row.try_get("scheduled_at") { - Ok(datetime) => Some(datetime), - Err(_) => None, - }; - let created_at: DateTime = row.get("created_at"); - let updated_at: DateTime = row.get("updated_at"); - - PeriodicTask::builder() - .id(id) - .metadata(metadata) - .period_in_millis(period_in_millis) - .scheduled_at(scheduled_at) - .created_at(created_at) - .updated_at(updated_at) - .build() - } - fn row_to_task(row: Row) -> Task { let id: Uuid = row.get("id"); let metadata: serde_json::Value = row.get("metadata"); - let error_message: Option = match row.try_get("error_message") { - Ok(error_message) => Some(error_message), - Err(_) => None, - }; + + let error_message: Option = row.try_get("error_message").ok(); + + let uniq_hash: Option = row.try_get("uniq_hash").ok(); let state: FangTaskState = row.get("state"); let task_type: String = row.get("task_type"); let created_at: DateTime = row.get("created_at"); let updated_at: DateTime = row.get("updated_at"); + let scheduled_at: DateTime = row.get("scheduled_at"); Task::builder() .id(id) .metadata(metadata) .error_message(error_message) .state(state) + .uniq_hash(uniq_hash) .task_type(task_type) .created_at(created_at) .updated_at(updated_at) + .scheduled_at(scheduled_at) .build() } } @@ -626,6 +553,17 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { + async fn find_task_by_id(&mut self, id: Uuid) 
-> Result { + let mut connection = self.pool.as_ref().unwrap().get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::find_task_by_id_query(&mut transaction, id).await?; + + transaction.commit().await?; + + Ok(task) + } + async fn fetch_and_touch_task( &mut self, task_type: Option, @@ -648,11 +586,17 @@ where let metadata = serde_json::to_value(task)?; - let task: Task = if self.duplicated_tasks { - Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await? - } else { - Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type()) + let task: Task = if !task.uniq() { + Self::insert_task_query(&mut transaction, metadata, &task.task_type(), Utc::now()) .await? + } else { + Self::insert_task_if_not_exist_query( + &mut transaction, + metadata, + &task.task_type(), + Utc::now(), + ) + .await? }; transaction.commit().await?; @@ -660,55 +604,44 @@ where Ok(task) } - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period: i64, - ) -> Result { + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { self.check_if_connection()?; let mut connection = self.pool.as_ref().unwrap().get().await?; let mut transaction = connection.transaction().await?; - let metadata = serde_json::to_value(task)?; - let periodic_task = - Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?; + let scheduled_at = match task.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); + iterator + .next() + .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))? + } + ScheduleOnce(datetime) => datetime, + }, + None => { + return Err(AsyncQueueError::CronError( + CronError::TaskNotSchedulableError, + )); + } + }; + let task: Task = if !task.uniq() { + Self::insert_task_query(&mut transaction, metadata, &task.task_type(), scheduled_at) + .await? + } else { + Self::insert_task_if_not_exist_query( + &mut transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? 
+ }; transaction.commit().await?; - - Ok(periodic_task) - } - - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result { - self.check_if_connection()?; - let mut connection = self.pool.as_ref().unwrap().get().await?; - let mut transaction = connection.transaction().await?; - - let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?; - - transaction.commit().await?; - - Ok(periodic_task) - } - - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - self.check_if_connection()?; - let mut connection = self.pool.as_ref().unwrap().get().await?; - let mut transaction = connection.transaction().await?; - - let periodic_task = - Self::fetch_periodic_tasks_query(&mut transaction, error_margin).await?; - - transaction.commit().await?; - - Ok(periodic_task) + Ok(task) } async fn remove_all_tasks(&mut self) -> Result { @@ -784,12 +717,17 @@ mod async_queue_tests { use super::AsyncQueueable; use super::FangTaskState; use super::Task; + use crate::async_runnable::Scheduled; use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; use bb8_postgres::PostgresConnectionManager; + use chrono::DateTime; + use chrono::Duration; + use chrono::SubsecRound; + use chrono::Utc; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -805,6 +743,25 @@ mod async_queue_tests { } } + #[derive(Serialize, Deserialize)] + struct AsyncTaskSchedule { + pub number: u16, + pub datetime: String, + } + + #[typetag::serde] + #[async_trait] + impl AsyncRunnable for AsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + Ok(()) + } + + fn cron(&self) -> Option { + let datetime = self.datetime.parse::>().ok()?; + Some(Scheduled::ScheduleOnce(datetime)) + } + } + #[tokio::test] async fn insert_task_creates_new_task() { let pool = pool().await; @@ -912,6 +869,32 @@ mod async_queue_tests { test.transaction.rollback().await.unwrap(); } + #[tokio::test] + async fn schedule_task_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + + let task = &AsyncTaskSchedule { + number: 1, + datetime: datetime.to_string(), + }; + + let task = test.schedule_task(task).await.unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTaskSchedule"), type_task); + assert_eq!(task.scheduled_at, datetime); + } + #[tokio::test] async fn fetch_and_touch_test() { let pool = pool().await; diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index e14f698..1a0ddd6 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -1,5 +1,7 @@ use crate::asynk::async_queue::AsyncQueueable; use async_trait::async_trait; +use chrono::DateTime; +use chrono::Utc; const COMMON_TYPE: &str = "common"; @@ -8,6 +10,11 @@ pub struct Error { pub description: String, } +pub enum Scheduled { + CronPattern(String), + ScheduleOnce(DateTime), +} + #[typetag::serde(tag = "type")] #[async_trait] pub trait AsyncRunnable: Send + Sync { @@ -16,4 +23,12 @@ pub 
trait AsyncRunnable: Send + Sync { fn task_type(&self) -> String { COMMON_TYPE.to_string() } + + fn uniq(&self) -> bool { + false + } + + fn cron(&self) -> Option { + None + } } diff --git a/src/asynk/async_scheduler.rs b/src/asynk/async_scheduler.rs deleted file mode 100644 index cb4961c..0000000 --- a/src/asynk/async_scheduler.rs +++ /dev/null @@ -1,260 +0,0 @@ -use crate::asynk::async_queue::AsyncQueueable; -use crate::asynk::async_queue::PeriodicTask; -use crate::asynk::AsyncError as Error; -use crate::asynk::AsyncRunnable; -use async_recursion::async_recursion; -use log::error; -use std::time::Duration; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use typed_builder::TypedBuilder; - -#[derive(TypedBuilder, Clone)] -pub struct Scheduler -where - AQueue: AsyncQueueable + Clone + Sync + 'static, -{ - #[builder(setter(into))] - pub check_period: Duration, - #[builder(setter(into))] - pub error_margin: Duration, - #[builder(setter(into))] - pub queue: AQueue, - #[builder(default = 0, setter(into))] - pub number_of_restarts: u32, -} - -impl Scheduler -where - AQueue: AsyncQueueable + Clone + Sync + 'static, -{ - #[async_recursion(?Send)] - pub async fn start(&mut self) -> Result<(), Error> { - let join_handle: JoinHandle> = self.schedule_loop().await; - - match join_handle.await { - Err(err) => { - error!( - "Scheduler panicked, restarting {:?}. Number of restarts {}", - err, self.number_of_restarts - ); - self.number_of_restarts += 1; - sleep(Duration::from_secs(1)).await; - self.start().await - } - Ok(task_res) => match task_res { - Err(err) => { - error!( - "Scheduler failed, restarting {:?}. Number of restarts {}", - err, self.number_of_restarts - ); - self.number_of_restarts += 1; - self.start().await - } - Ok(_) => { - error!( - "Scheduler stopped. restarting. Number of restarts {}", - self.number_of_restarts - ); - self.number_of_restarts += 1; - self.start().await - } - }, - } - } - - pub async fn schedule_loop(&mut self) -> JoinHandle> { - let mut scheduler = self.clone(); - tokio::spawn(async move { - let sleep_duration = scheduler.check_period; - - loop { - scheduler.schedule().await?; - - sleep(sleep_duration).await; - } - }) - } - - pub async fn schedule(&mut self) -> Result<(), Error> { - if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin).await? { - for task in tasks { - self.process_task(task).await?; - } - }; - Ok(()) - } - - async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { - match task.scheduled_at { - None => { - self.queue.schedule_next_task(task).await?; - } - Some(_) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - self.queue.insert_task(&*actual_task).await?; - - self.queue.schedule_next_task(task).await?; - } - } - Ok(()) - } -} - -#[cfg(test)] -#[derive(TypedBuilder)] -pub struct SchedulerTest<'a> { - #[builder(setter(into))] - pub check_period: Duration, - #[builder(setter(into))] - pub error_margin: Duration, - #[builder(setter(into))] - pub queue: &'a mut dyn AsyncQueueable, - #[builder(default = 0, setter(into))] - pub number_of_restarts: u32, -} - -#[cfg(test)] -impl<'a> SchedulerTest<'a> { - async fn schedule_test(&mut self) -> Result<(), Error> { - let sleep_duration = self.check_period; - - loop { - match self.queue.fetch_periodic_tasks(self.error_margin).await? 
{ - Some(tasks) => { - for task in tasks { - self.process_task(task).await?; - } - - return Ok(()); - } - None => { - sleep(sleep_duration).await; - } - }; - } - } - - async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { - match task.scheduled_at { - None => { - self.queue.schedule_next_task(task).await?; - } - Some(_) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - self.queue.insert_task(&*actual_task).await?; - - self.queue.schedule_next_task(task).await?; - } - } - Ok(()) - } -} - -#[cfg(test)] -mod async_scheduler_tests { - use super::SchedulerTest; - use crate::asynk::async_queue::AsyncQueueTest; - use crate::asynk::async_queue::AsyncQueueable; - use crate::asynk::async_queue::PeriodicTask; - use crate::asynk::AsyncError as Error; - use crate::asynk::AsyncRunnable; - use async_trait::async_trait; - use bb8_postgres::bb8::Pool; - use bb8_postgres::tokio_postgres::NoTls; - use bb8_postgres::PostgresConnectionManager; - use chrono::DateTime; - use chrono::Duration as OtherDuration; - use chrono::Utc; - use serde::{Deserialize, Serialize}; - use std::time::Duration; - - #[derive(Serialize, Deserialize)] - struct AsyncScheduledTask { - pub number: u16, - } - - #[typetag::serde] - #[async_trait] - impl AsyncRunnable for AsyncScheduledTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { - Ok(()) - } - fn task_type(&self) -> String { - "schedule".to_string() - } - } - - #[tokio::test] - async fn schedules_tasks() { - let pool = pool().await; - let mut connection = pool.get().await.unwrap(); - let transaction = connection.transaction().await.unwrap(); - - let mut test = AsyncQueueTest::builder().transaction(transaction).build(); - - let schedule_in_future = Utc::now() + OtherDuration::seconds(5); - - let _periodic_task = insert_periodic_task( - &mut test, - &AsyncScheduledTask { number: 1 }, - schedule_in_future, - 10000, - ) - .await; - - let check_period: u64 = 1; - let error_margin_seconds: u64 = 2; - - let mut scheduler = SchedulerTest::builder() - .check_period(Duration::from_secs(check_period)) - .error_margin(Duration::from_secs(error_margin_seconds)) - .queue(&mut test as &mut dyn AsyncQueueable) - .build(); - // Scheduler start tricky not loop :) - scheduler.schedule_test().await.unwrap(); - - let task = scheduler - .queue - .fetch_and_touch_task(Some("schedule".to_string())) - .await - .unwrap() - .unwrap(); - - let metadata = task.metadata.as_object().unwrap(); - let number = metadata["number"].as_u64(); - let type_task = metadata["type"].as_str(); - - let runnable_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - assert_eq!("schedule", runnable_task.task_type()); - assert_eq!(Some("AsyncScheduledTask"), type_task); - assert_eq!(Some(1), number); - } - - async fn insert_periodic_task( - test: &mut AsyncQueueTest<'_>, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period_in_millis: i64, - ) -> PeriodicTask { - test.insert_periodic_task(task, timestamp, period_in_millis) - .await - .unwrap() - } - - async fn pool() -> Pool> { - let pg_mgr = PostgresConnectionManager::new_from_stringlike( - "postgres://postgres:postgres@localhost/fang", - NoTls, - ) - .unwrap(); - - Pool::builder().build(pg_mgr).await.unwrap() - } -} diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index 39a4605..36b4f34 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -1,3 +1,4 @@ +use crate::async_runnable::Scheduled::*; use 
crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; use crate::asynk::async_queue::Task; @@ -27,15 +28,20 @@ impl AsyncWorker where AQueue: AsyncQueueable + Clone + Sync + 'static, { - pub async fn run(&mut self, task: Task) -> Result<(), Error> { - let result = self.execute_task(task).await; + pub async fn run( + &mut self, + task: Task, + actual_task: Box, + ) -> Result<(), Error> { + let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } - async fn execute_task(&mut self, task: Task) -> Result { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - + async fn execute_task( + &mut self, + task: Task, + actual_task: Box, + ) -> Result { let task_result = actual_task.run(&mut self.queue).await; match task_result { Ok(()) => Ok(task), @@ -88,14 +94,24 @@ where pub async fn run_tasks(&mut self) -> Result<(), Error> { loop { + //fetch task match self .queue .fetch_and_touch_task(Some(self.task_type.clone())) .await { Ok(Some(task)) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + self.queue.schedule_task(&*actual_task).await?; + } self.sleep_params.maybe_reset_sleep_period(); - self.run(task).await? + // run scheduled task + self.run(task, actual_task).await?; } Ok(None) => { self.sleep().await; @@ -126,15 +142,20 @@ pub struct AsyncWorkerTest<'a> { #[cfg(test)] impl<'a> AsyncWorkerTest<'a> { - pub async fn run(&mut self, task: Task) -> Result<(), Error> { - let result = self.execute_task(task).await; + pub async fn run( + &mut self, + task: Task, + actual_task: Box, + ) -> Result<(), Error> { + let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } - async fn execute_task(&mut self, task: Task) -> Result { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - + async fn execute_task( + &mut self, + task: Task, + actual_task: Box, + ) -> Result { let task_result = actual_task.run(self.queue).await; match task_result { Ok(()) => Ok(task), @@ -193,8 +214,17 @@ impl<'a> AsyncWorkerTest<'a> { .await { Ok(Some(task)) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + self.queue.schedule_task(&*actual_task).await?; + } self.sleep_params.maybe_reset_sleep_period(); - self.run(task).await? 
+ // run scheduled task + self.run(task, actual_task).await?; } Ok(None) => { return Ok(()); @@ -212,6 +242,7 @@ impl<'a> AsyncWorkerTest<'a> { #[cfg(test)] mod async_worker_tests { use super::AsyncWorkerTest; + use crate::async_runnable::Scheduled; use crate::asynk::async_queue::AsyncQueueTest; use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; @@ -223,6 +254,8 @@ mod async_worker_tests { use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; use bb8_postgres::PostgresConnectionManager; + use chrono::Duration; + use chrono::Utc; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -237,6 +270,23 @@ mod async_worker_tests { Ok(()) } } + + #[derive(Serialize, Deserialize)] + struct WorkerAsyncTaskSchedule { + pub number: u16, + } + + #[typetag::serde] + #[async_trait] + impl AsyncRunnable for WorkerAsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + Ok(()) + } + fn cron(&self) -> Option { + Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(7))) + } + } + #[derive(Serialize, Deserialize)] struct AsyncFailedTask { pub number: u16, @@ -290,8 +340,9 @@ mod async_worker_tests { let transaction = connection.transaction().await.unwrap(); let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + let actual_task = WorkerAsyncTask { number: 1 }; - let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await; + let task = insert_task(&mut test, &actual_task).await; let id = task.id; let mut worker = AsyncWorkerTest::builder() @@ -299,12 +350,48 @@ mod async_worker_tests { .retention_mode(RetentionMode::KeepAll) .build(); - worker.run(task).await.unwrap(); + worker.run(task, Box::new(actual_task)).await.unwrap(); let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); assert_eq!(FangTaskState::Finished, task_finished.state); test.transaction.rollback().await.unwrap(); } + + #[tokio::test] + async fn schedule_task_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let actual_task = WorkerAsyncTaskSchedule { number: 1 }; + + let task = test.schedule_task(&actual_task).await.unwrap(); + + let id = task.id; + + let mut worker = AsyncWorkerTest::builder() + .queue(&mut test as &mut dyn AsyncQueueable) + .retention_mode(RetentionMode::KeepAll) + .build(); + + worker.run_tasks_until_none().await.unwrap(); + + let task = worker.queue.find_task_by_id(id).await.unwrap(); + + assert_eq!(id, task.id); + assert_eq!(FangTaskState::New, task.state); + + tokio::time::sleep(core::time::Duration::from_secs(10)).await; + + worker.run_tasks_until_none().await.unwrap(); + + let task = test.find_task_by_id(id).await.unwrap(); + assert_eq!(id, task.id); + assert_eq!(FangTaskState::Finished, task.state); + } + #[tokio::test] async fn saves_error_for_failed_task() { let pool = pool().await; @@ -312,8 +399,9 @@ mod async_worker_tests { let transaction = connection.transaction().await.unwrap(); let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + let failed_task = AsyncFailedTask { number: 1 }; - let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await; + let task = insert_task(&mut test, &failed_task).await; let id = task.id; let mut worker = AsyncWorkerTest::builder() @@ -321,7 +409,7 @@ mod 
async_worker_tests { .retention_mode(RetentionMode::KeepAll) .build(); - worker.run(task).await.unwrap(); + worker.run(task, Box::new(failed_task)).await.unwrap(); let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); @@ -367,6 +455,7 @@ mod async_worker_tests { assert_eq!(FangTaskState::New, task2.state); test.transaction.rollback().await.unwrap(); } + #[tokio::test] async fn remove_when_finished() { let pool = pool().await; diff --git a/src/asynk/mod.rs b/src/asynk/mod.rs index 77cc6d2..994f702 100644 --- a/src/asynk/mod.rs +++ b/src/asynk/mod.rs @@ -1,6 +1,5 @@ pub mod async_queue; pub mod async_runnable; -pub mod async_scheduler; pub mod async_worker; pub mod async_worker_pool; diff --git a/src/asynk/queries/fail_task.sql b/src/asynk/queries/fail_task.sql index 416d91f..1719286 100644 --- a/src/asynk/queries/fail_task.sql +++ b/src/asynk/queries/fail_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING * diff --git a/src/asynk/queries/fetch_periodic_tasks.sql b/src/asynk/queries/fetch_periodic_tasks.sql deleted file mode 100644 index 5d7529e..0000000 --- a/src/asynk/queries/fetch_periodic_tasks.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL diff --git a/src/asynk/queries/fetch_task_type.sql b/src/asynk/queries/fetch_task_type.sql index 360a5fa..91e2f85 100644 --- a/src/asynk/queries/fetch_task_type.sql +++ b/src/asynk/queries/fetch_task_type.sql @@ -1 +1 @@ -SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = $1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED +SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/src/asynk/queries/find_periodic_task_by_id.sql b/src/asynk/queries/find_periodic_task_by_id.sql deleted file mode 100644 index 67c0316..0000000 --- a/src/asynk/queries/find_periodic_task_by_id.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_periodic_tasks WHERE id = $1 diff --git a/src/asynk/queries/find_task_by_metadata.sql b/src/asynk/queries/find_task_by_metadata.sql deleted file mode 100644 index 0e38bfa..0000000 --- a/src/asynk/queries/find_task_by_metadata.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1 diff --git a/src/asynk/queries/find_task_by_uniq_hash.sql b/src/asynk/queries/find_task_by_uniq_hash.sql new file mode 100644 index 0000000..3694a58 --- /dev/null +++ b/src/asynk/queries/find_task_by_uniq_hash.sql @@ -0,0 +1 @@ +SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1 diff --git a/src/asynk/queries/insert_periodic_task.sql b/src/asynk/queries/insert_periodic_task.sql deleted file mode 100644 index 1ba98e8..0000000 --- a/src/asynk/queries/insert_periodic_task.sql +++ /dev/null @@ -1 +0,0 @@ -INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_millis") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at diff --git a/src/asynk/queries/insert_task.sql b/src/asynk/queries/insert_task.sql index b6ec160..514d921 100644 --- a/src/asynk/queries/insert_task.sql +++ b/src/asynk/queries/insert_task.sql @@ -1 +1 @@ -INSERT INTO 
"fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +INSERT INTO "fang_tasks" ("metadata", "task_type", "scheduled_at") VALUES ($1, $2, $3) RETURNING * diff --git a/src/asynk/queries/insert_task_uniq.sql b/src/asynk/queries/insert_task_uniq.sql new file mode 100644 index 0000000..0817383 --- /dev/null +++ b/src/asynk/queries/insert_task_uniq.sql @@ -0,0 +1 @@ +INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) RETURNING * diff --git a/src/asynk/queries/remove_all_periodic_tasks.sql b/src/asynk/queries/remove_all_periodic_tasks.sql deleted file mode 100644 index 75b5afe..0000000 --- a/src/asynk/queries/remove_all_periodic_tasks.sql +++ /dev/null @@ -1 +0,0 @@ -DELETE FROM "fang_periodic_tasks" diff --git a/src/asynk/queries/schedule_next_task.sql b/src/asynk/queries/schedule_next_task.sql deleted file mode 100644 index 00fd46e..0000000 --- a/src/asynk/queries/schedule_next_task.sql +++ /dev/null @@ -1 +0,0 @@ -UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at diff --git a/src/asynk/queries/update_task_state.sql b/src/asynk/queries/update_task_state.sql index afca5c0..e2e2d94 100644 --- a/src/asynk/queries/update_task_state.sql +++ b/src/asynk/queries/update_task_state.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING * diff --git a/src/lib.rs b/src/lib.rs index eb16c3a..f099c68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,9 +60,16 @@ pub use typetag; #[doc(hidden)] pub extern crate serde; +#[doc(hidden)] +pub extern crate chrono; + #[doc(hidden)] pub use serde_derive::{Deserialize, Serialize}; +pub use chrono::DateTime; +pub use chrono::Utc; +pub use cron::Schedule; + #[cfg(feature = "blocking")] pub mod blocking; #[cfg(feature = "blocking")]