Asynk change schema (#75)
* add uniq method for the AsyncRunnable trait (#70)
* add hash enum
* remove string
* return bool values
* Task struct modified (#71)
* adapt the asynk module to the new schema
* delete period_in_millis
* add cron and single-schedule support
* use the current timestamp value
* fix a bug and add a test that confirms it was a bug
* fix a call
* update Cargo.toml
* address review comments and fix clippy warnings
* better user API for scheduling with cron
* test cron with an example

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
parent fc935f487e
commit dffee33b9c
26 changed files with 462 additions and 545 deletions
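For orientation before the diff: with this change, `AsyncRunnable` gains two optional methods, `uniq()` and `cron()`, and queues gain a `schedule_task` method. A minimal sketch, closely following the example file added in this commit (the struct name and the cron expression are illustrative, not part of the library):

use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::{Error, Scheduled};
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::AsyncRunnable;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyCronTask {}

#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyCronTask {
    async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        // the actual work goes here
        Ok(())
    }

    // run every 20 seconds (seven-field cron expression, evaluated in UTC)
    fn cron(&self) -> Option<Scheduled> {
        Some(Scheduled::CronPattern("0/20 * * * * * *".to_string()))
    }

    // deduplicate identical pending tasks by a hash of their metadata
    fn uniq(&self) -> bool {
        true
    }
}

A task defined this way is handed to the queue with `queue.schedule_task(&task as &dyn AsyncRunnable)`, as the new example's main.rs below shows.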
@@ -17,6 +17,9 @@ blocking = ["diesel", "diesel-derive-enum", "dotenv"]
 asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"]

 [dependencies]
+cron = "0.11"
+hex = "0.4"
+sha2 = "0.10"
 chrono = "0.4"
 log = "0.4"
 serde = "1"
fang_examples/asynk/simple_async_cron_worker/Cargo.toml (new file, 12 lines)
@@ -0,0 +1,12 @@
+[package]
+name = "simple_async_cron_worker"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+fang = { path = "../../../" , features = ["asynk"]}
+env_logger = "0.9.0"
+log = "0.4.0"
+tokio = { version = "1", features = ["full"] }
fang_examples/asynk/simple_async_cron_worker/src/lib.rs (new file, 33 lines)
@@ -0,0 +1,33 @@
+use fang::async_trait;
+use fang::asynk::async_queue::AsyncQueueable;
+use fang::asynk::async_runnable::Error;
+use fang::asynk::async_runnable::Scheduled;
+use fang::serde::{Deserialize, Serialize};
+use fang::typetag;
+use fang::AsyncRunnable;
+
+#[derive(Serialize, Deserialize)]
+#[serde(crate = "fang::serde")]
+pub struct MyCronTask {}
+
+#[async_trait]
+#[typetag::serde]
+impl AsyncRunnable for MyCronTask {
+    async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
+        log::info!("CRON!!!!!!!!!!!!!!!",);
+
+        Ok(())
+    }
+
+    fn cron(&self) -> Option<Scheduled> {
+        // sec  min  hour  day of month  month  day of week  year
+        // be careful, works only with UTC hour.
+        // https://www.timeanddate.com/worldclock/timezone/utc
+        let expression = "0/20 * * * Aug-Sep * 2022/1";
+        Some(Scheduled::CronPattern(expression.to_string()))
+    }
+
+    fn uniq(&self) -> bool {
+        true
+    }
+}
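A side note on the expression used above: the cron crate that this commit adds as a dependency uses a seven-field pattern (seconds, minutes, hours, day of month, month, day of week, year), evaluated in UTC. A small standalone sketch of how such a pattern can be inspected, assuming the same cron = "0.11" and chrono dependencies that this commit declares:

use chrono::Utc;
use cron::Schedule;
use std::str::FromStr;

fn main() {
    // every 20 seconds during August and September, any year
    let schedule = Schedule::from_str("0/20 * * * Aug-Sep * *").unwrap();

    // print the next three matching timestamps
    for datetime in schedule.upcoming(Utc).take(3) {
        println!("next run at {}", datetime);
    }
}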
fang_examples/asynk/simple_async_cron_worker/src/main.rs (new file, 41 lines)
@@ -0,0 +1,41 @@
+use fang::asynk::async_queue::AsyncQueue;
+use fang::asynk::async_queue::AsyncQueueable;
+use fang::asynk::async_worker_pool::AsyncWorkerPool;
+use fang::AsyncRunnable;
+use fang::NoTls;
+use simple_async_cron_worker::MyCronTask;
+use std::time::Duration;
+
+#[tokio::main]
+async fn main() {
+    env_logger::init();
+
+    log::info!("Starting...");
+    let max_pool_size: u32 = 3;
+    let mut queue = AsyncQueue::builder()
+        .uri("postgres://postgres:postgres@localhost/fang")
+        .max_pool_size(max_pool_size)
+        .build();
+
+    queue.connect(NoTls).await.unwrap();
+    log::info!("Queue connected...");
+
+    let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
+        .number_of_workers(10_u32)
+        .queue(queue.clone())
+        .build();
+
+    log::info!("Pool created ...");
+
+    pool.start().await;
+    log::info!("Workers started ...");
+
+    let task = MyCronTask {};
+
+    queue
+        .schedule_task(&task as &dyn AsyncRunnable)
+        .await
+        .unwrap();
+
+    tokio::time::sleep(Duration::from_secs(100)).await;
+}
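The same `schedule_task` entry point also covers one-off delayed jobs through the other variant of the new `Scheduled` enum. A hedged sketch (the `ReportTask` name is made up for illustration, and it assumes a chrono dependency alongside fang; the enum variants and the trait methods are the ones introduced by this commit):

use chrono::{Duration, Utc};
use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::{Error, Scheduled};
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::AsyncRunnable;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct ReportTask {}

#[async_trait]
#[typetag::serde]
impl AsyncRunnable for ReportTask {
    async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        Ok(())
    }

    // run once, one hour from now, instead of on a recurring cron pattern
    fn cron(&self) -> Option<Scheduled> {
        Some(Scheduled::ScheduleOnce(Utc::now() + Duration::hours(1)))
    }
}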
@@ -6,7 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-fang = { path = "../../" , features = ["asynk"]}
+fang = { path = "../../../" , features = ["asynk"]}
 env_logger = "0.9.0"
 log = "0.4.0"
 tokio = { version = "1", features = ["full"] }
@@ -12,11 +12,10 @@ async fn main() {
     env_logger::init();

     log::info!("Starting...");
-    let max_pool_size: u32 = 2;
+    let max_pool_size: u32 = 3;
     let mut queue = AsyncQueue::builder()
         .uri("postgres://postgres:postgres@localhost/fang")
         .max_pool_size(max_pool_size)
-        .duplicated_tasks(true)
         .build();

     queue.connect(NoTls).await.unwrap();
@@ -40,6 +39,7 @@ async fn main() {
         .insert_task(&task1 as &dyn AsyncRunnable)
         .await
         .unwrap();

     queue
         .insert_task(&task2 as &dyn AsyncRunnable)
         .await
@@ -8,7 +8,6 @@ CREATE TABLE fang_tasks (
     error_message TEXT,
     state fang_task_state DEFAULT 'new' NOT NULL,
     task_type VARCHAR DEFAULT 'common' NOT NULL,
-    periodic BOOLEAN DEFAULT FALSE,
     uniq_hash CHAR(64),
     scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
@@ -1,3 +1,4 @@
+use crate::async_runnable::Scheduled::*;
 use crate::asynk::async_runnable::AsyncRunnable;
 use crate::asynk::async_runnable::Error as FangError;
 use async_trait::async_trait;
@@ -9,10 +10,11 @@ use bb8_postgres::tokio_postgres::Socket;
 use bb8_postgres::tokio_postgres::Transaction;
 use bb8_postgres::PostgresConnectionManager;
 use chrono::DateTime;
-use chrono::Duration;
 use chrono::Utc;
+use cron::Schedule;
 use postgres_types::{FromSql, ToSql};
-use std::time::Duration as StdDuration;
+use sha2::{Digest, Sha256};
+use std::str::FromStr;
 use thiserror::Error;
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
@@ -21,21 +23,15 @@ use uuid::Uuid;
 use bb8_postgres::tokio_postgres::tls::NoTls;

 const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql");
-const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql");
-const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql");
+const INSERT_TASK_UNIQ_QUERY: &str = include_str!("queries/insert_task_uniq.sql");
 const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql");
 const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql");
 const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql");
 const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
 const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
 const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
-const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql");
-const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql");
+const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql");

-#[cfg(test)]
 const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql");
-#[cfg(test)]
-const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql");

 pub const DEFAULT_TASK_TYPE: &str = "common";
@@ -71,25 +67,23 @@ pub struct Task {
     #[builder(setter(into))]
     pub task_type: String,
     #[builder(setter(into))]
+    pub uniq_hash: Option<String>,
+    #[builder(setter(into))]
+    pub scheduled_at: DateTime<Utc>,
+    #[builder(setter(into))]
     pub created_at: DateTime<Utc>,
     #[builder(setter(into))]
     pub updated_at: DateTime<Utc>,
 }

-#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)]
-pub struct PeriodicTask {
-    #[builder(setter(into))]
-    pub id: Uuid,
-    #[builder(setter(into))]
-    pub metadata: serde_json::Value,
-    #[builder(setter(into))]
-    pub period_in_millis: i64,
-    #[builder(setter(into))]
-    pub scheduled_at: Option<DateTime<Utc>>,
-    #[builder(setter(into))]
-    pub created_at: DateTime<Utc>,
-    #[builder(setter(into))]
-    pub updated_at: DateTime<Utc>,
+#[derive(Debug, Error)]
+pub enum CronError {
+    #[error(transparent)]
+    LibraryError(#[from] cron::error::Error),
+    #[error("You have to implement method `cron()` in your AsyncRunnable")]
+    TaskNotSchedulableError,
+    #[error("No timestamps match with this cron pattern")]
+    NoTimestampsError,
 }

 #[derive(Debug, Error)]
@@ -100,6 +94,8 @@ pub enum AsyncQueueError {
     PgError(#[from] bb8_postgres::tokio_postgres::Error),
     #[error(transparent)]
     SerdeError(#[from] serde_json::Error),
+    #[error(transparent)]
+    CronError(#[from] CronError),
     #[error("returned invalid result (expected {expected:?}, found {found:?})")]
     ResultError { expected: u64, found: u64 },
     #[error(
@@ -110,6 +106,12 @@ pub enum AsyncQueueError {
     TimeError,
 }

+impl From<cron::error::Error> for AsyncQueueError {
+    fn from(error: cron::error::Error) -> Self {
+        AsyncQueueError::CronError(CronError::LibraryError(error))
+    }
+}
+
 impl From<AsyncQueueError> for FangError {
     fn from(error: AsyncQueueError) -> Self {
         let message = format!("{:?}", error);
@@ -127,12 +129,15 @@ pub trait AsyncQueueable: Send {
     ) -> Result<Option<Task>, AsyncQueueError>;

     async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;

     async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;

     async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;

     async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;

+    async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>;
+
     async fn update_task_state(
         &mut self,
         task: Task,
@@ -142,22 +147,7 @@ pub trait AsyncQueueable: Send {
     async fn fail_task(&mut self, task: Task, error_message: &str)
         -> Result<Task, AsyncQueueError>;

-    async fn fetch_periodic_tasks(
-        &mut self,
-        error_margin: StdDuration,
-    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError>;
-
-    async fn insert_periodic_task(
-        &mut self,
-        task: &dyn AsyncRunnable,
-        timestamp: DateTime<Utc>,
-        period: i64,
-    ) -> Result<PeriodicTask, AsyncQueueError>;
-
-    async fn schedule_next_task(
-        &mut self,
-        periodic_task: PeriodicTask,
-    ) -> Result<PeriodicTask, AsyncQueueError>;
+    async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
 }

 #[derive(TypedBuilder, Debug, Clone)]
@@ -174,8 +164,6 @@ where
     uri: String,
     #[builder(setter(into))]
     max_pool_size: u32,
-    #[builder(default = false, setter(into))]
-    duplicated_tasks: bool,
     #[builder(default = false, setter(skip))]
     connected: bool,
 }
@@ -185,47 +173,24 @@ where
 pub struct AsyncQueueTest<'a> {
     #[builder(setter(into))]
     pub transaction: Transaction<'a>,
-    #[builder(default = false, setter(into))]
-    pub duplicated_tasks: bool,
-}
-
-#[cfg(test)]
-impl<'a> AsyncQueueTest<'a> {
-    pub async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
-        let row: Row = self
-            .transaction
-            .query_one(FIND_TASK_BY_ID_QUERY, &[&id])
-            .await?;
-
-        let task = AsyncQueue::<NoTls>::row_to_task(row);
-        Ok(task)
-    }
-    pub async fn find_periodic_task_by_id(
-        &mut self,
-        id: Uuid,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
-        let row: Row = self
-            .transaction
-            .query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id])
-            .await?;
-
-        let task = AsyncQueue::<NoTls>::row_to_periodic_task(row);
-        Ok(task)
-    }
 }

 #[cfg(test)]
 #[async_trait]
 impl AsyncQueueable for AsyncQueueTest<'_> {
+    async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
+        let transaction = &mut self.transaction;
+
+        AsyncQueue::<NoTls>::find_task_by_id_query(transaction, id).await
+    }
+
     async fn fetch_and_touch_task(
         &mut self,
         task_type: Option<String>,
     ) -> Result<Option<Task>, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(transaction, task_type).await?;
-
-        Ok(task)
+        AsyncQueue::<NoTls>::fetch_and_touch_task_query(transaction, task_type).await
     }

     async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
@@ -233,84 +198,86 @@ impl AsyncQueueable for AsyncQueueTest<'_> {

         let metadata = serde_json::to_value(task)?;

-        let task: Task = if self.duplicated_tasks {
-            AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, &task.task_type()).await?
+        let task: Task = if !task.uniq() {
+            AsyncQueue::<NoTls>::insert_task_query(
+                transaction,
+                metadata,
+                &task.task_type(),
+                Utc::now(),
+            )
+            .await?
         } else {
             AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
                 transaction,
                 metadata,
                 &task.task_type(),
+                Utc::now(),
             )
             .await?
         };
         Ok(task)
     }

-    async fn schedule_next_task(
-        &mut self,
-        periodic_task: PeriodicTask,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
-        let transaction = &mut self.transaction;
-
-        let periodic_task =
-            AsyncQueue::<NoTls>::schedule_next_task_query(transaction, periodic_task).await?;
-
-        Ok(periodic_task)
-    }
-    async fn insert_periodic_task(
-        &mut self,
-        task: &dyn AsyncRunnable,
-        timestamp: DateTime<Utc>,
-        period: i64,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
+    async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
         let transaction = &mut self.transaction;

         let metadata = serde_json::to_value(task)?;

-        let periodic_task = AsyncQueue::<NoTls>::insert_periodic_task_query(
-            transaction,
-            metadata,
-            timestamp,
-            period,
-        )
-        .await?;
-
-        Ok(periodic_task)
-    }
-
-    async fn fetch_periodic_tasks(
-        &mut self,
-        error_margin: StdDuration,
-    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
-        let transaction = &mut self.transaction;
-
-        let periodic_task =
-            AsyncQueue::<NoTls>::fetch_periodic_tasks_query(transaction, error_margin).await?;
-
-        Ok(periodic_task)
+        let scheduled_at = match task.cron() {
+            Some(scheduled) => match scheduled {
+                CronPattern(cron_pattern) => {
+                    let schedule = Schedule::from_str(&cron_pattern)?;
+                    let mut iterator = schedule.upcoming(Utc);
+
+                    iterator
+                        .next()
+                        .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))?
+                }
+                ScheduleOnce(datetime) => datetime,
+            },
+            None => {
+                return Err(AsyncQueueError::CronError(
+                    CronError::TaskNotSchedulableError,
+                ));
+            }
+        };
+
+        let task: Task = if !task.uniq() {
+            AsyncQueue::<NoTls>::insert_task_query(
+                transaction,
+                metadata,
+                &task.task_type(),
+                scheduled_at,
+            )
+            .await?
+        } else {
+            AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
+                transaction,
+                metadata,
+                &task.task_type(),
+                scheduled_at,
+            )
+            .await?
+        };
+
+        Ok(task)
     }
     async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let result = AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await?;
-
-        Ok(result)
+        AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await
     }

     async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let result = AsyncQueue::<NoTls>::remove_task_query(transaction, task).await?;
-
-        Ok(result)
+        AsyncQueue::<NoTls>::remove_task_query(transaction, task).await
     }

     async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let result = AsyncQueue::<NoTls>::remove_tasks_type_query(transaction, task_type).await?;
-
-        Ok(result)
+        AsyncQueue::<NoTls>::remove_tasks_type_query(transaction, task_type).await
     }

     async fn update_task_state(
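The scheduling logic introduced above boils down to turning the task's `Scheduled` value into a concrete `scheduled_at` timestamp. A standalone sketch of that mapping, using the same cron and chrono crates; the `Scheduled` stand-in and the `next_run` helper are simplified placeholders for illustration, not fang's API:

use chrono::{DateTime, Utc};
use cron::Schedule;
use std::str::FromStr;

// Simplified stand-in for fang's `Scheduled` enum.
enum Scheduled {
    CronPattern(String),
    ScheduleOnce(DateTime<Utc>),
}

// Resolve a `Scheduled` value to the next timestamp at which the task should run.
fn next_run(scheduled: &Scheduled) -> Option<DateTime<Utc>> {
    match scheduled {
        Scheduled::CronPattern(pattern) => {
            let schedule = Schedule::from_str(pattern).ok()?;
            schedule.upcoming(Utc).next()
        }
        Scheduled::ScheduleOnce(datetime) => Some(*datetime),
    }
}

fn main() {
    // hourly, at minute 0 and second 0
    let cron = Scheduled::CronPattern("0 0 * * * * *".to_string());
    println!("next cron run: {:?}", next_run(&cron));
}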
@@ -320,9 +287,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
     ) -> Result<Task, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let task = AsyncQueue::<NoTls>::update_task_state_query(transaction, task, state).await?;
-
-        Ok(task)
+        AsyncQueue::<NoTls>::update_task_state_query(transaction, task, state).await
     }

     async fn fail_task(
@@ -332,9 +297,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
     ) -> Result<Task, AsyncQueueError> {
         let transaction = &mut self.transaction;

-        let task = AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await?;
-
-        Ok(task)
+        AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await
     }
 }
@@ -384,6 +347,16 @@ where
         Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await
     }

+    async fn find_task_by_id_query(
+        transaction: &mut Transaction<'_>,
+        id: Uuid,
+    ) -> Result<Task, AsyncQueueError> {
+        let row: Row = transaction.query_one(FIND_TASK_BY_ID_QUERY, &[&id]).await?;
+
+        let task = Self::row_to_task(row);
+        Ok(task)
+    }
+
     async fn fail_task_query(
         transaction: &mut Transaction<'_>,
         task: Task,
@@ -435,7 +408,7 @@ where
         task_type: &str,
     ) -> Result<Task, AsyncQueueError> {
         let row: Row = transaction
-            .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type])
+            .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type, &Utc::now()])
             .await?;

         let task = Self::row_to_task(row);
@@ -461,72 +434,32 @@ where
         transaction: &mut Transaction<'_>,
         metadata: serde_json::Value,
         task_type: &str,
+        scheduled_at: DateTime<Utc>,
     ) -> Result<Task, AsyncQueueError> {
         let row: Row = transaction
-            .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
+            .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type, &scheduled_at])
             .await?;
         let task = Self::row_to_task(row);
         Ok(task)
     }

-    async fn schedule_next_task_query(
-        transaction: &mut Transaction<'_>,
-        periodic_task: PeriodicTask,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
-        let updated_at = Utc::now();
-        let scheduled_at = updated_at + Duration::milliseconds(periodic_task.period_in_millis);
-
-        let row: Row = transaction
-            .query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at])
-            .await?;
-
-        let periodic_task = Self::row_to_periodic_task(row);
-        Ok(periodic_task)
-    }
-
-    async fn insert_periodic_task_query(
+    async fn insert_task_uniq_query(
         transaction: &mut Transaction<'_>,
         metadata: serde_json::Value,
-        timestamp: DateTime<Utc>,
-        period: i64,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
+        task_type: &str,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<Task, AsyncQueueError> {
+        let uniq_hash = Self::calculate_hash(metadata.to_string());
+
         let row: Row = transaction
             .query_one(
-                INSERT_PERIODIC_TASK_QUERY,
-                &[&metadata, &timestamp, &period],
+                INSERT_TASK_UNIQ_QUERY,
+                &[&metadata, &task_type, &uniq_hash, &scheduled_at],
             )
             .await?;
-        let periodic_task = Self::row_to_periodic_task(row);
-        Ok(periodic_task)
-    }
-
-    async fn fetch_periodic_tasks_query(
-        transaction: &mut Transaction<'_>,
-        error_margin: StdDuration,
-    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
-        let current_time = Utc::now();
-
-        let margin: Duration = match Duration::from_std(error_margin) {
-            Ok(value) => Ok(value),
-            Err(_) => Err(AsyncQueueError::TimeError),
-        }?;
-
-        let low_limit = current_time - margin;
-        let high_limit = current_time + margin;
-        let rows: Vec<Row> = transaction
-            .query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit])
-            .await?;
-
-        let periodic_tasks: Vec<PeriodicTask> = rows
-            .into_iter()
-            .map(|row| Self::row_to_periodic_task(row))
-            .collect();
-
-        if periodic_tasks.is_empty() {
-            Ok(None)
-        } else {
-            Ok(Some(periodic_tasks))
-        }
+
+        let task = Self::row_to_task(row);
+        Ok(task)
     }

     async fn execute_query(
@@ -552,19 +485,31 @@ where
         transaction: &mut Transaction<'_>,
         metadata: serde_json::Value,
         task_type: &str,
+        scheduled_at: DateTime<Utc>,
     ) -> Result<Task, AsyncQueueError> {
-        match Self::find_task_by_metadata_query(transaction, &metadata).await {
+        match Self::find_task_by_uniq_hash_query(transaction, &metadata).await {
             Some(task) => Ok(task),
-            None => Self::insert_task_query(transaction, metadata, task_type).await,
+            None => {
+                Self::insert_task_uniq_query(transaction, metadata, task_type, scheduled_at).await
+            }
         }
     }

-    async fn find_task_by_metadata_query(
+    fn calculate_hash(json: String) -> String {
+        let mut hasher = Sha256::new();
+        hasher.update(json.as_bytes());
+        let result = hasher.finalize();
+        hex::encode(result)
+    }
+
+    async fn find_task_by_uniq_hash_query(
         transaction: &mut Transaction<'_>,
         metadata: &serde_json::Value,
     ) -> Option<Task> {
+        let uniq_hash = Self::calculate_hash(metadata.to_string());
+
         let result = transaction
-            .query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata])
+            .query_one(FIND_TASK_BY_UNIQ_HASH_QUERY, &[&uniq_hash])
             .await;

         match result {
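The uniqueness check above keys on a SHA-256 hex digest of the task's serialized metadata, stored in the new uniq_hash column. A standalone sketch of the same derivation, assuming the sha2, hex, serde and serde_json dependencies this commit pulls in (the toy task type exists only to have something to serialize):

use serde::Serialize;
use sha2::{Digest, Sha256};

#[derive(Serialize)]
struct MyTask {
    number: u16,
}

fn uniq_hash<T: Serialize>(task: &T) -> String {
    // serialize the task to JSON, the same form the queue stores as metadata
    let json = serde_json::to_value(task).unwrap().to_string();

    // hash the JSON bytes and hex-encode the 32-byte digest (64 hex chars, hence CHAR(64))
    let mut hasher = Sha256::new();
    hasher.update(json.as_bytes());
    hex::encode(hasher.finalize())
}

fn main() {
    println!("{}", uniq_hash(&MyTask { number: 1 }));
}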
@@ -573,47 +518,29 @@ where
         }
     }

-    fn row_to_periodic_task(row: Row) -> PeriodicTask {
-        let id: Uuid = row.get("id");
-        let metadata: serde_json::Value = row.get("metadata");
-        let period_in_millis: i64 = row.get("period_in_millis");
-        let scheduled_at: Option<DateTime<Utc>> = match row.try_get("scheduled_at") {
-            Ok(datetime) => Some(datetime),
-            Err(_) => None,
-        };
-        let created_at: DateTime<Utc> = row.get("created_at");
-        let updated_at: DateTime<Utc> = row.get("updated_at");
-
-        PeriodicTask::builder()
-            .id(id)
-            .metadata(metadata)
-            .period_in_millis(period_in_millis)
-            .scheduled_at(scheduled_at)
-            .created_at(created_at)
-            .updated_at(updated_at)
-            .build()
-    }
-
     fn row_to_task(row: Row) -> Task {
         let id: Uuid = row.get("id");
         let metadata: serde_json::Value = row.get("metadata");
-        let error_message: Option<String> = match row.try_get("error_message") {
-            Ok(error_message) => Some(error_message),
-            Err(_) => None,
-        };
+        let error_message: Option<String> = row.try_get("error_message").ok();
+        let uniq_hash: Option<String> = row.try_get("uniq_hash").ok();
         let state: FangTaskState = row.get("state");
         let task_type: String = row.get("task_type");
         let created_at: DateTime<Utc> = row.get("created_at");
         let updated_at: DateTime<Utc> = row.get("updated_at");
+        let scheduled_at: DateTime<Utc> = row.get("scheduled_at");

         Task::builder()
             .id(id)
             .metadata(metadata)
             .error_message(error_message)
             .state(state)
+            .uniq_hash(uniq_hash)
             .task_type(task_type)
             .created_at(created_at)
             .updated_at(updated_at)
+            .scheduled_at(scheduled_at)
             .build()
     }
 }
@@ -626,6 +553,17 @@ where
     <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
     <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
 {
+    async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
+        let mut connection = self.pool.as_ref().unwrap().get().await?;
+        let mut transaction = connection.transaction().await?;
+
+        let task = Self::find_task_by_id_query(&mut transaction, id).await?;
+
+        transaction.commit().await?;
+
+        Ok(task)
+    }
+
     async fn fetch_and_touch_task(
         &mut self,
         task_type: Option<String>,
@@ -648,11 +586,17 @@ where

         let metadata = serde_json::to_value(task)?;

-        let task: Task = if self.duplicated_tasks {
-            Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await?
-        } else {
-            Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type())
+        let task: Task = if !task.uniq() {
+            Self::insert_task_query(&mut transaction, metadata, &task.task_type(), Utc::now())
                 .await?
+        } else {
+            Self::insert_task_if_not_exist_query(
+                &mut transaction,
+                metadata,
+                &task.task_type(),
+                Utc::now(),
+            )
+            .await?
         };

         transaction.commit().await?;
@@ -660,55 +604,44 @@ where
         Ok(task)
     }

-    async fn insert_periodic_task(
-        &mut self,
-        task: &dyn AsyncRunnable,
-        timestamp: DateTime<Utc>,
-        period: i64,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
+    async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
         self.check_if_connection()?;
         let mut connection = self.pool.as_ref().unwrap().get().await?;
         let mut transaction = connection.transaction().await?;

         let metadata = serde_json::to_value(task)?;

-        let periodic_task =
-            Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?;
+        let scheduled_at = match task.cron() {
+            Some(scheduled) => match scheduled {
+                CronPattern(cron_pattern) => {
+                    let schedule = Schedule::from_str(&cron_pattern)?;
+                    let mut iterator = schedule.upcoming(Utc);
+                    iterator
+                        .next()
+                        .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))?
+                }
+                ScheduleOnce(datetime) => datetime,
+            },
+            None => {
+                return Err(AsyncQueueError::CronError(
+                    CronError::TaskNotSchedulableError,
+                ));
+            }
+        };
+
+        let task: Task = if !task.uniq() {
+            Self::insert_task_query(&mut transaction, metadata, &task.task_type(), scheduled_at)
+                .await?
+        } else {
+            Self::insert_task_if_not_exist_query(
+                &mut transaction,
+                metadata,
+                &task.task_type(),
+                scheduled_at,
+            )
+            .await?
+        };
         transaction.commit().await?;
-
-        Ok(periodic_task)
-    }
-
-    async fn schedule_next_task(
-        &mut self,
-        periodic_task: PeriodicTask,
-    ) -> Result<PeriodicTask, AsyncQueueError> {
-        self.check_if_connection()?;
-        let mut connection = self.pool.as_ref().unwrap().get().await?;
-        let mut transaction = connection.transaction().await?;
-
-        let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?;
-
-        transaction.commit().await?;
-
-        Ok(periodic_task)
-    }
-
-    async fn fetch_periodic_tasks(
-        &mut self,
-        error_margin: StdDuration,
-    ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
-        self.check_if_connection()?;
-        let mut connection = self.pool.as_ref().unwrap().get().await?;
-        let mut transaction = connection.transaction().await?;
-
-        let periodic_task =
-            Self::fetch_periodic_tasks_query(&mut transaction, error_margin).await?;
-
-        transaction.commit().await?;
-
-        Ok(periodic_task)
+        Ok(task)
     }

     async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
@@ -784,12 +717,17 @@ mod async_queue_tests {
     use super::AsyncQueueable;
     use super::FangTaskState;
     use super::Task;
+    use crate::async_runnable::Scheduled;
    use crate::asynk::AsyncError as Error;
     use crate::asynk::AsyncRunnable;
     use async_trait::async_trait;
     use bb8_postgres::bb8::Pool;
     use bb8_postgres::tokio_postgres::NoTls;
     use bb8_postgres::PostgresConnectionManager;
+    use chrono::DateTime;
+    use chrono::Duration;
+    use chrono::SubsecRound;
+    use chrono::Utc;
     use serde::{Deserialize, Serialize};

     #[derive(Serialize, Deserialize)]
@@ -805,6 +743,25 @@ mod async_queue_tests {
         }
     }

+    #[derive(Serialize, Deserialize)]
+    struct AsyncTaskSchedule {
+        pub number: u16,
+        pub datetime: String,
+    }
+
+    #[typetag::serde]
+    #[async_trait]
+    impl AsyncRunnable for AsyncTaskSchedule {
+        async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
+            Ok(())
+        }
+
+        fn cron(&self) -> Option<Scheduled> {
+            let datetime = self.datetime.parse::<DateTime<Utc>>().ok()?;
+            Some(Scheduled::ScheduleOnce(datetime))
+        }
+    }
+
     #[tokio::test]
     async fn insert_task_creates_new_task() {
         let pool = pool().await;
@@ -912,6 +869,32 @@ mod async_queue_tests {
         test.transaction.rollback().await.unwrap();
     }

+    #[tokio::test]
+    async fn schedule_task_test() {
+        let pool = pool().await;
+        let mut connection = pool.get().await.unwrap();
+        let transaction = connection.transaction().await.unwrap();
+
+        let mut test = AsyncQueueTest::builder().transaction(transaction).build();
+
+        let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
+
+        let task = &AsyncTaskSchedule {
+            number: 1,
+            datetime: datetime.to_string(),
+        };
+
+        let task = test.schedule_task(task).await.unwrap();
+
+        let metadata = task.metadata.as_object().unwrap();
+        let number = metadata["number"].as_u64();
+        let type_task = metadata["type"].as_str();
+
+        assert_eq!(Some(1), number);
+        assert_eq!(Some("AsyncTaskSchedule"), type_task);
+        assert_eq!(task.scheduled_at, datetime);
+    }
+
     #[tokio::test]
     async fn fetch_and_touch_test() {
         let pool = pool().await;
@@ -1,5 +1,7 @@
 use crate::asynk::async_queue::AsyncQueueable;
 use async_trait::async_trait;
+use chrono::DateTime;
+use chrono::Utc;

 const COMMON_TYPE: &str = "common";

@@ -8,6 +10,11 @@ pub struct Error {
     pub description: String,
 }

+pub enum Scheduled {
+    CronPattern(String),
+    ScheduleOnce(DateTime<Utc>),
+}
+
 #[typetag::serde(tag = "type")]
 #[async_trait]
 pub trait AsyncRunnable: Send + Sync {
@@ -16,4 +23,12 @@ pub trait AsyncRunnable: Send + Sync {
     fn task_type(&self) -> String {
         COMMON_TYPE.to_string()
     }
+
+    fn uniq(&self) -> bool {
+        false
+    }
+
+    fn cron(&self) -> Option<Scheduled> {
+        None
+    }
 }
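With the defaults added above, an existing task type keeps its old behaviour: it is inserted immediately (cron() returns None) and may be enqueued multiple times (uniq() returns false). A minimal sketch of such a task, implementing only run() and relying on the defaults (the struct name is illustrative):

use fang::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::Error;
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use fang::AsyncRunnable;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct PlainTask {
    pub number: u16,
}

#[async_trait]
#[typetag::serde]
impl AsyncRunnable for PlainTask {
    async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        // relies on the default task_type(), uniq() and cron() implementations
        println!("running task {}", self.number);
        Ok(())
    }
}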
@@ -1,260 +0,0 @@ (entire file removed)
-use crate::asynk::async_queue::AsyncQueueable;
-use crate::asynk::async_queue::PeriodicTask;
-use crate::asynk::AsyncError as Error;
-use crate::asynk::AsyncRunnable;
-use async_recursion::async_recursion;
-use log::error;
-use std::time::Duration;
-use tokio::task::JoinHandle;
-use tokio::time::sleep;
-use typed_builder::TypedBuilder;
-
-#[derive(TypedBuilder, Clone)]
-pub struct Scheduler<AQueue>
-where
-    AQueue: AsyncQueueable + Clone + Sync + 'static,
-{
-    #[builder(setter(into))]
-    pub check_period: Duration,
-    #[builder(setter(into))]
-    pub error_margin: Duration,
-    #[builder(setter(into))]
-    pub queue: AQueue,
-    #[builder(default = 0, setter(into))]
-    pub number_of_restarts: u32,
-}
-
-impl<AQueue> Scheduler<AQueue>
-where
-    AQueue: AsyncQueueable + Clone + Sync + 'static,
-{
-    #[async_recursion(?Send)]
-    pub async fn start(&mut self) -> Result<(), Error> {
-        let join_handle: JoinHandle<Result<(), Error>> = self.schedule_loop().await;
-
-        match join_handle.await {
-            Err(err) => {
-                error!(
-                    "Scheduler panicked, restarting {:?}. Number of restarts {}",
-                    err, self.number_of_restarts
-                );
-                self.number_of_restarts += 1;
-                sleep(Duration::from_secs(1)).await;
-                self.start().await
-            }
-            Ok(task_res) => match task_res {
-                Err(err) => {
-                    error!(
-                        "Scheduler failed, restarting {:?}. Number of restarts {}",
-                        err, self.number_of_restarts
-                    );
-                    self.number_of_restarts += 1;
-                    self.start().await
-                }
-                Ok(_) => {
-                    error!(
-                        "Scheduler stopped. restarting. Number of restarts {}",
-                        self.number_of_restarts
-                    );
-                    self.number_of_restarts += 1;
-                    self.start().await
-                }
-            },
-        }
-    }
-
-    pub async fn schedule_loop(&mut self) -> JoinHandle<Result<(), Error>> {
-        let mut scheduler = self.clone();
-        tokio::spawn(async move {
-            let sleep_duration = scheduler.check_period;
-
-            loop {
-                scheduler.schedule().await?;
-
-                sleep(sleep_duration).await;
-            }
-        })
-    }
-
-    pub async fn schedule(&mut self) -> Result<(), Error> {
-        if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin).await? {
-            for task in tasks {
-                self.process_task(task).await?;
-            }
-        };
-        Ok(())
-    }
-
-    async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
-        match task.scheduled_at {
-            None => {
-                self.queue.schedule_next_task(task).await?;
-            }
-            Some(_) => {
-                let actual_task: Box<dyn AsyncRunnable> =
-                    serde_json::from_value(task.metadata.clone()).unwrap();
-
-                self.queue.insert_task(&*actual_task).await?;
-
-                self.queue.schedule_next_task(task).await?;
-            }
-        }
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-#[derive(TypedBuilder)]
-pub struct SchedulerTest<'a> {
-    #[builder(setter(into))]
-    pub check_period: Duration,
-    #[builder(setter(into))]
-    pub error_margin: Duration,
-    #[builder(setter(into))]
-    pub queue: &'a mut dyn AsyncQueueable,
-    #[builder(default = 0, setter(into))]
-    pub number_of_restarts: u32,
-}
-
-#[cfg(test)]
-impl<'a> SchedulerTest<'a> {
-    async fn schedule_test(&mut self) -> Result<(), Error> {
-        let sleep_duration = self.check_period;
-
-        loop {
-            match self.queue.fetch_periodic_tasks(self.error_margin).await? {
-                Some(tasks) => {
-                    for task in tasks {
-                        self.process_task(task).await?;
-                    }
-
-                    return Ok(());
-                }
-                None => {
-                    sleep(sleep_duration).await;
-                }
-            };
-        }
-    }
-
-    async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
-        match task.scheduled_at {
-            None => {
-                self.queue.schedule_next_task(task).await?;
-            }
-            Some(_) => {
-                let actual_task: Box<dyn AsyncRunnable> =
-                    serde_json::from_value(task.metadata.clone()).unwrap();
-
-                self.queue.insert_task(&*actual_task).await?;
-
-                self.queue.schedule_next_task(task).await?;
-            }
-        }
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod async_scheduler_tests {
-    use super::SchedulerTest;
-    use crate::asynk::async_queue::AsyncQueueTest;
-    use crate::asynk::async_queue::AsyncQueueable;
-    use crate::asynk::async_queue::PeriodicTask;
-    use crate::asynk::AsyncError as Error;
-    use crate::asynk::AsyncRunnable;
-    use async_trait::async_trait;
-    use bb8_postgres::bb8::Pool;
-    use bb8_postgres::tokio_postgres::NoTls;
-    use bb8_postgres::PostgresConnectionManager;
-    use chrono::DateTime;
-    use chrono::Duration as OtherDuration;
-    use chrono::Utc;
-    use serde::{Deserialize, Serialize};
-    use std::time::Duration;
-
-    #[derive(Serialize, Deserialize)]
-    struct AsyncScheduledTask {
-        pub number: u16,
-    }
-
-    #[typetag::serde]
-    #[async_trait]
-    impl AsyncRunnable for AsyncScheduledTask {
-        async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
-            Ok(())
-        }
-        fn task_type(&self) -> String {
-            "schedule".to_string()
-        }
-    }
-
-    #[tokio::test]
-    async fn schedules_tasks() {
-        let pool = pool().await;
-        let mut connection = pool.get().await.unwrap();
-        let transaction = connection.transaction().await.unwrap();
-
-        let mut test = AsyncQueueTest::builder().transaction(transaction).build();
-
-        let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
-
-        let _periodic_task = insert_periodic_task(
-            &mut test,
-            &AsyncScheduledTask { number: 1 },
-            schedule_in_future,
-            10000,
-        )
-        .await;
-
-        let check_period: u64 = 1;
-        let error_margin_seconds: u64 = 2;
-
-        let mut scheduler = SchedulerTest::builder()
-            .check_period(Duration::from_secs(check_period))
-            .error_margin(Duration::from_secs(error_margin_seconds))
-            .queue(&mut test as &mut dyn AsyncQueueable)
-            .build();
-        // Scheduler start tricky not loop :)
-        scheduler.schedule_test().await.unwrap();
-
-        let task = scheduler
-            .queue
-            .fetch_and_touch_task(Some("schedule".to_string()))
-            .await
-            .unwrap()
-            .unwrap();
-
-        let metadata = task.metadata.as_object().unwrap();
-        let number = metadata["number"].as_u64();
-        let type_task = metadata["type"].as_str();
-
-        let runnable_task: Box<dyn AsyncRunnable> =
-            serde_json::from_value(task.metadata.clone()).unwrap();
-
-        assert_eq!("schedule", runnable_task.task_type());
-        assert_eq!(Some("AsyncScheduledTask"), type_task);
-        assert_eq!(Some(1), number);
-    }
-
-    async fn insert_periodic_task(
-        test: &mut AsyncQueueTest<'_>,
-        task: &dyn AsyncRunnable,
-        timestamp: DateTime<Utc>,
-        period_in_millis: i64,
-    ) -> PeriodicTask {
-        test.insert_periodic_task(task, timestamp, period_in_millis)
-            .await
-            .unwrap()
-    }
-
-    async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
-        let pg_mgr = PostgresConnectionManager::new_from_stringlike(
-            "postgres://postgres:postgres@localhost/fang",
-            NoTls,
-        )
-        .unwrap();
-
-        Pool::builder().build(pg_mgr).await.unwrap()
-    }
-}
@ -1,3 +1,4 @@
|
||||||
|
use crate::async_runnable::Scheduled::*;
|
||||||
use crate::asynk::async_queue::AsyncQueueable;
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
use crate::asynk::async_queue::FangTaskState;
|
use crate::asynk::async_queue::FangTaskState;
|
||||||
use crate::asynk::async_queue::Task;
|
use crate::asynk::async_queue::Task;
|
||||||
|
@ -27,15 +28,20 @@ impl<AQueue> AsyncWorker<AQueue>
|
||||||
where
|
where
|
||||||
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
||||||
{
|
{
|
||||||
pub async fn run(&mut self, task: Task) -> Result<(), Error> {
|
pub async fn run(
|
||||||
let result = self.execute_task(task).await;
|
&mut self,
|
||||||
|
task: Task,
|
||||||
|
actual_task: Box<dyn AsyncRunnable>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let result = self.execute_task(task, actual_task).await;
|
||||||
self.finalize_task(result).await
|
self.finalize_task(result).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
|
async fn execute_task(
|
||||||
let actual_task: Box<dyn AsyncRunnable> =
|
&mut self,
|
||||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
task: Task,
|
||||||
|
actual_task: Box<dyn AsyncRunnable>,
|
||||||
|
) -> Result<Task, (Task, String)> {
|
||||||
let task_result = actual_task.run(&mut self.queue).await;
|
let task_result = actual_task.run(&mut self.queue).await;
|
||||||
match task_result {
|
match task_result {
|
||||||
Ok(()) => Ok(task),
|
Ok(()) => Ok(task),
|
||||||
|
@ -88,14 +94,24 @@ where
|
||||||
|
|
||||||
pub async fn run_tasks(&mut self) -> Result<(), Error> {
|
pub async fn run_tasks(&mut self) -> Result<(), Error> {
|
||||||
loop {
|
loop {
|
||||||
|
//fetch task
|
||||||
match self
|
match self
|
||||||
.queue
|
.queue
|
||||||
.fetch_and_touch_task(Some(self.task_type.clone()))
|
.fetch_and_touch_task(Some(self.task_type.clone()))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(task)) => {
|
Ok(Some(task)) => {
|
||||||
|
let actual_task: Box<dyn AsyncRunnable> =
|
||||||
|
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
|
||||||
|
// check if task is scheduled or not
|
||||||
|
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||||
|
// program task
|
||||||
|
self.queue.schedule_task(&*actual_task).await?;
|
||||||
|
}
|
||||||
self.sleep_params.maybe_reset_sleep_period();
|
self.sleep_params.maybe_reset_sleep_period();
|
||||||
self.run(task).await?
|
// run scheduled task
|
||||||
|
self.run(task, actual_task).await?;
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
self.sleep().await;
|
self.sleep().await;
|
||||||
|
@@ -126,15 +142,20 @@ pub struct AsyncWorkerTest<'a> {
 
 #[cfg(test)]
 impl<'a> AsyncWorkerTest<'a> {
-    pub async fn run(&mut self, task: Task) -> Result<(), Error> {
-        let result = self.execute_task(task).await;
+    pub async fn run(
+        &mut self,
+        task: Task,
+        actual_task: Box<dyn AsyncRunnable>,
+    ) -> Result<(), Error> {
+        let result = self.execute_task(task, actual_task).await;
         self.finalize_task(result).await
     }
 
-    async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
-        let actual_task: Box<dyn AsyncRunnable> =
-            serde_json::from_value(task.metadata.clone()).unwrap();
-
+    async fn execute_task(
+        &mut self,
+        task: Task,
+        actual_task: Box<dyn AsyncRunnable>,
+    ) -> Result<Task, (Task, String)> {
         let task_result = actual_task.run(self.queue).await;
         match task_result {
             Ok(()) => Ok(task),

@@ -193,8 +214,17 @@ impl<'a> AsyncWorkerTest<'a> {
                 .await
             {
                 Ok(Some(task)) => {
+                    let actual_task: Box<dyn AsyncRunnable> =
+                        serde_json::from_value(task.metadata.clone()).unwrap();
+
+                    // check if task is scheduled or not
+                    if let Some(CronPattern(_)) = actual_task.cron() {
+                        // program task
+                        self.queue.schedule_task(&*actual_task).await?;
+                    }
                     self.sleep_params.maybe_reset_sleep_period();
-                    self.run(task).await?
+                    // run scheduled task
+                    self.run(task, actual_task).await?;
                 }
                 Ok(None) => {
                     return Ok(());
@@ -212,6 +242,7 @@ impl<'a> AsyncWorkerTest<'a> {
 #[cfg(test)]
 mod async_worker_tests {
     use super::AsyncWorkerTest;
+    use crate::async_runnable::Scheduled;
     use crate::asynk::async_queue::AsyncQueueTest;
     use crate::asynk::async_queue::AsyncQueueable;
     use crate::asynk::async_queue::FangTaskState;

@@ -223,6 +254,8 @@ mod async_worker_tests {
     use bb8_postgres::bb8::Pool;
     use bb8_postgres::tokio_postgres::NoTls;
     use bb8_postgres::PostgresConnectionManager;
+    use chrono::Duration;
+    use chrono::Utc;
     use serde::{Deserialize, Serialize};
 
     #[derive(Serialize, Deserialize)]

@@ -237,6 +270,23 @@ mod async_worker_tests {
             Ok(())
         }
     }
 
+    #[derive(Serialize, Deserialize)]
+    struct WorkerAsyncTaskSchedule {
+        pub number: u16,
+    }
+
+    #[typetag::serde]
+    #[async_trait]
+    impl AsyncRunnable for WorkerAsyncTaskSchedule {
+        async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
+            Ok(())
+        }
+
+        fn cron(&self) -> Option<Scheduled> {
+            Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(7)))
+        }
+    }
+
     #[derive(Serialize, Deserialize)]
     struct AsyncFailedTask {
         pub number: u16,
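`WorkerAsyncTaskSchedule` exercises the one-shot variant, while the worker loop above checks for the cron variant. From the two construction sites in this diff, the `Scheduled` type is presumably shaped roughly like this (an inferred sketch, not a copy of the crate's definition):

    use chrono::{DateTime, Utc};

    // Inferred from the Scheduled::CronPattern(..) and Scheduled::ScheduleOnce(..) uses above.
    pub enum Scheduled {
        // Recurring cron expression, stored as a string.
        CronPattern(String),
        // A single run at a fixed point in time.
        ScheduleOnce(DateTime<Utc>),
    }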
@@ -290,8 +340,9 @@ mod async_worker_tests {
         let transaction = connection.transaction().await.unwrap();
 
         let mut test = AsyncQueueTest::builder().transaction(transaction).build();
+        let actual_task = WorkerAsyncTask { number: 1 };
 
-        let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await;
+        let task = insert_task(&mut test, &actual_task).await;
         let id = task.id;
 
         let mut worker = AsyncWorkerTest::builder()
@@ -299,12 +350,48 @@ mod async_worker_tests {
             .retention_mode(RetentionMode::KeepAll)
             .build();
 
-        worker.run(task).await.unwrap();
+        worker.run(task, Box::new(actual_task)).await.unwrap();
         let task_finished = test.find_task_by_id(id).await.unwrap();
         assert_eq!(id, task_finished.id);
         assert_eq!(FangTaskState::Finished, task_finished.state);
         test.transaction.rollback().await.unwrap();
     }
 
+    #[tokio::test]
+    async fn schedule_task_test() {
+        let pool = pool().await;
+        let mut connection = pool.get().await.unwrap();
+        let transaction = connection.transaction().await.unwrap();
+
+        let mut test = AsyncQueueTest::builder().transaction(transaction).build();
+
+        let actual_task = WorkerAsyncTaskSchedule { number: 1 };
+
+        let task = test.schedule_task(&actual_task).await.unwrap();
+
+        let id = task.id;
+
+        let mut worker = AsyncWorkerTest::builder()
+            .queue(&mut test as &mut dyn AsyncQueueable)
+            .retention_mode(RetentionMode::KeepAll)
+            .build();
+
+        worker.run_tasks_until_none().await.unwrap();
+
+        let task = worker.queue.find_task_by_id(id).await.unwrap();
+
+        assert_eq!(id, task.id);
+        assert_eq!(FangTaskState::New, task.state);
+
+        tokio::time::sleep(core::time::Duration::from_secs(10)).await;
+
+        worker.run_tasks_until_none().await.unwrap();
+
+        let task = test.find_task_by_id(id).await.unwrap();
+        assert_eq!(id, task.id);
+        assert_eq!(FangTaskState::Finished, task.state);
+    }
+
     #[tokio::test]
     async fn saves_error_for_failed_task() {
         let pool = pool().await;
@@ -312,8 +399,9 @@ mod async_worker_tests {
         let transaction = connection.transaction().await.unwrap();
 
         let mut test = AsyncQueueTest::builder().transaction(transaction).build();
+        let failed_task = AsyncFailedTask { number: 1 };
 
-        let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await;
+        let task = insert_task(&mut test, &failed_task).await;
         let id = task.id;
 
         let mut worker = AsyncWorkerTest::builder()

@@ -321,7 +409,7 @@ mod async_worker_tests {
             .retention_mode(RetentionMode::KeepAll)
             .build();
 
-        worker.run(task).await.unwrap();
+        worker.run(task, Box::new(failed_task)).await.unwrap();
         let task_finished = test.find_task_by_id(id).await.unwrap();
 
         assert_eq!(id, task_finished.id);

@@ -367,6 +455,7 @@ mod async_worker_tests {
         assert_eq!(FangTaskState::New, task2.state);
         test.transaction.rollback().await.unwrap();
     }
+
     #[tokio::test]
     async fn remove_when_finished() {
         let pool = pool().await;
@@ -1,6 +1,5 @@
 pub mod async_queue;
 pub mod async_runnable;
-pub mod async_scheduler;
 pub mod async_worker;
 pub mod async_worker_pool;
 
@@ -1 +1 @@
-UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
+UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING *

@@ -1 +0,0 @@
-SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL

@@ -1 +1 @@
-SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = $1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
+SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
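The fetch query now also filters on `scheduled_at`, so the caller has to bind the current time as `$2` alongside the task type. A hedged sketch of what that could look like with tokio-postgres (the constant, function name, and chrono-enabled parameter binding are assumptions, not the crate's exact code):

    use bb8_postgres::tokio_postgres::{Error, Row, Transaction};
    use chrono::Utc;

    // Mirrors the updated query: $1 = task type, $2 = "now"; only tasks whose
    // scheduled_at has already passed are eligible.
    const FETCH_TASK_QUERY: &str =
        "SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at \
         ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED";

    async fn fetch_one(transaction: &Transaction<'_>, task_type: &str) -> Result<Option<Row>, Error> {
        transaction
            .query_opt(FETCH_TASK_QUERY, &[&task_type, &Utc::now()])
            .await
    }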
@@ -1 +0,0 @@
-SELECT * FROM fang_periodic_tasks WHERE id = $1

@@ -1 +0,0 @@
-SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1

src/asynk/queries/find_task_by_uniq_hash.sql (new file, 1 line)
@@ -0,0 +1 @@
+SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1
@@ -1 +0,0 @@
-INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_millis") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at

@@ -1 +1 @@
-INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
+INSERT INTO "fang_tasks" ("metadata", "task_type", "scheduled_at") VALUES ($1, $2, $3) RETURNING *

src/asynk/queries/insert_task_uniq.sql (new file, 1 line)
@@ -0,0 +1 @@
+INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) RETURNING *
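`insert_task_uniq.sql` and `find_task_by_uniq_hash.sql` both key on a `uniq_hash` column, which has to be derived deterministically from the task. One plausible way to compute it from the serialized metadata with sha2 and hex (a sketch of the idea only, not necessarily the crate's implementation):

    use sha2::{Digest, Sha256};

    // Hash the typetag-encoded metadata so that identical tasks map to the same
    // uniq_hash and the 'new'-state lookup can detect duplicates.
    fn uniq_hash(metadata: &serde_json::Value) -> String {
        let mut hasher = Sha256::new();
        hasher.update(metadata.to_string().as_bytes());
        hex::encode(hasher.finalize())
    }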
@@ -1 +0,0 @@
-DELETE FROM "fang_periodic_tasks"

@@ -1 +0,0 @@
-UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at

@@ -1 +1 @@
-UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
+UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING *
@@ -60,9 +60,16 @@ pub use typetag;
 #[doc(hidden)]
 pub extern crate serde;
 
+#[doc(hidden)]
+pub extern crate chrono;
+
 #[doc(hidden)]
 pub use serde_derive::{Deserialize, Serialize};
 
+pub use chrono::DateTime;
+pub use chrono::Utc;
+pub use cron::Schedule;
+
 #[cfg(feature = "blocking")]
 pub mod blocking;
 #[cfg(feature = "blocking")]
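With `DateTime`, `Utc`, and `cron::Schedule` re-exported from the crate root, a downstream task can pick its run time without depending on chrono or cron directly. A small usage sketch (the helper is illustrative, and the `Scheduled` import path from an external crate is an assumption):

    use fang::asynk::async_runnable::Scheduled;
    use fang::{DateTime, Utc};

    // Schedule a single run one minute from now using only fang's re-exports.
    fn in_one_minute() -> Scheduled {
        let at: DateTime<Utc> = Utc::now() + fang::chrono::Duration::seconds(60);
        Scheduled::ScheduleOnce(at)
    }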