diff --git a/Cargo.toml b/Cargo.toml index c9a3918..be7cb7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,26 +14,29 @@ rust-version = "1.62" [features] default = ["blocking", "asynk"] blocking = ["diesel", "diesel-derive-enum", "dotenv"] -asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"] +asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "async-recursion"] [dependencies] +cron = "0.11" chrono = "0.4" +hex = "0.4" log = "0.4" serde = "1" serde_derive = "1.0.141" serde_json = "1" +sha2 = "0.10" thiserror = "1.0" +typed-builder = "0.10" typetag = "0.2" -uuid = { version = "0.8", features = ["v4"] } - +uuid = { version = "1.1", features = ["v4"] } [dependencies.diesel] -version = "1.4" -features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"] +version = "2.0" +features = ["postgres", "serde_json", "chrono", "uuid", "r2d2"] optional = true [dependencies.diesel-derive-enum] -version = "1" +version = "2.0.0-rc.0" features = ["postgres"] optional = true @@ -43,7 +46,7 @@ optional = true [dependencies.bb8-postgres] version = "0.8" -features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"] +features = ["with-serde_json-1" , "with-uuid-1" , "with-chrono-0_4"] optional = true [dependencies.postgres-types] @@ -60,9 +63,6 @@ optional = true version = "0.1" optional = true -[dependencies.typed-builder] -version = "0.10" -optional = true [dependencies.async-recursion] version = "1" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..df544cf --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +db: + docker run --rm -d --name postgres -p 5432:5432 \ + -e POSTGRES_DB=fang \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=postgres \ + postgres:latest +clippy: + cargo clippy --all-features +diesel: + DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run +stop: + docker kill postgres +tests: + DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture + +ignored: + DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture --ignored diff --git a/diesel.toml b/diesel.toml deleted file mode 100644 index 92267c8..0000000 --- a/diesel.toml +++ /dev/null @@ -1,5 +0,0 @@ -# For documentation on how to configure this file, -# see diesel.rs/guides/configuring-diesel-cli - -[print_schema] -file = "src/schema.rs" diff --git a/fang_examples/asynk/simple_async_cron_worker/Cargo.toml b/fang_examples/asynk/simple_async_cron_worker/Cargo.toml new file mode 100644 index 0000000..86b645f --- /dev/null +++ b/fang_examples/asynk/simple_async_cron_worker/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "simple_async_cron_worker" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fang = { path = "../../../" , features = ["asynk"]} +env_logger = "0.9.0" +log = "0.4.0" +tokio = { version = "1", features = ["full"] } diff --git a/fang_examples/asynk/simple_async_cron_worker/src/lib.rs b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs new file mode 100644 index 0000000..ff102c9 --- /dev/null +++ b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs @@ -0,0 +1,33 @@ +use fang::async_trait; +use fang::asynk::async_queue::AsyncQueueable; +use fang::asynk::async_runnable::Error; +use fang::serde::{Deserialize, Serialize}; +use fang::typetag; +use fang::AsyncRunnable; +use fang::Scheduled; 
+ +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyCronTask {} + +#[async_trait] +#[typetag::serde] +impl AsyncRunnable for MyCronTask { + async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> { + log::info!("CRON!!!!!!!!!!!!!!!"); + + Ok(()) + } + + fn cron(&self) -> Option<Scheduled> { + // Field order: sec min hour day-of-month month day-of-week year. + // Be careful: the schedule is evaluated against UTC. + // https://www.timeanddate.com/worldclock/timezone/utc + let expression = "0/20 * * * Aug-Sep * 2022/1"; + Some(Scheduled::CronPattern(expression.to_string())) + } + + fn uniq(&self) -> bool { + true + } +} diff --git a/fang_examples/asynk/simple_async_cron_worker/src/main.rs b/fang_examples/asynk/simple_async_cron_worker/src/main.rs new file mode 100644 index 0000000..b3c0021 --- /dev/null +++ b/fang_examples/asynk/simple_async_cron_worker/src/main.rs @@ -0,0 +1,41 @@ +use fang::asynk::async_queue::AsyncQueue; +use fang::asynk::async_queue::AsyncQueueable; +use fang::asynk::async_worker_pool::AsyncWorkerPool; +use fang::AsyncRunnable; +use fang::NoTls; +use simple_async_cron_worker::MyCronTask; +use std::time::Duration; + +#[tokio::main] +async fn main() { + env_logger::init(); + + log::info!("Starting..."); + let max_pool_size: u32 = 3; + let mut queue = AsyncQueue::builder() + .uri("postgres://postgres:postgres@localhost/fang") + .max_pool_size(max_pool_size) + .build(); + + queue.connect(NoTls).await.unwrap(); + log::info!("Queue connected..."); + + let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder() + .number_of_workers(10_u32) + .queue(queue.clone()) + .build(); + + log::info!("Pool created ..."); + + pool.start().await; + log::info!("Workers started ..."); + + let task = MyCronTask {}; + + queue + .schedule_task(&task as &dyn AsyncRunnable) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(100)).await; +} diff --git a/fang_examples/simple_async_worker/Cargo.toml b/fang_examples/asynk/simple_async_worker/Cargo.toml similarity index 83% rename from fang_examples/simple_async_worker/Cargo.toml rename to fang_examples/asynk/simple_async_worker/Cargo.toml index cf56b40..8248aed 100644 --- a/fang_examples/simple_async_worker/Cargo.toml +++ b/fang_examples/asynk/simple_async_worker/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -fang = { path = "../../" , features = ["asynk"]} +fang = { path = "../../../" , features = ["asynk"]} env_logger = "0.9.0" log = "0.4.0" tokio = { version = "1", features = ["full"] } diff --git a/fang_examples/simple_async_worker/src/lib.rs b/fang_examples/asynk/simple_async_worker/src/lib.rs similarity index 100% rename from fang_examples/simple_async_worker/src/lib.rs rename to fang_examples/asynk/simple_async_worker/src/lib.rs diff --git a/fang_examples/simple_async_worker/src/main.rs b/fang_examples/asynk/simple_async_worker/src/main.rs similarity index 95% rename from fang_examples/simple_async_worker/src/main.rs rename to fang_examples/asynk/simple_async_worker/src/main.rs index ff58e8c..cbe9446 100644 --- a/fang_examples/simple_async_worker/src/main.rs +++ b/fang_examples/asynk/simple_async_worker/src/main.rs @@ -12,11 +12,10 @@ async fn main() { env_logger::init(); log::info!("Starting..."); - let max_pool_size: u32 = 2; + let max_pool_size: u32 = 3; let mut queue = AsyncQueue::builder() .uri("postgres://postgres:postgres@localhost/fang") .max_pool_size(max_pool_size) - .duplicated_tasks(true) .build();
queue.connect(NoTls).await.unwrap(); @@ -40,6 +39,7 @@ async fn main() { .insert_task(&task1 as &dyn AsyncRunnable) .await .unwrap(); + queue .insert_task(&task2 as &dyn AsyncRunnable) .await diff --git a/fang_examples/simple_worker/.gitignore b/fang_examples/blocking/simple_cron_worker/.gitignore similarity index 100% rename from fang_examples/simple_worker/.gitignore rename to fang_examples/blocking/simple_cron_worker/.gitignore diff --git a/fang_examples/blocking/simple_cron_worker/Cargo.toml b/fang_examples/blocking/simple_cron_worker/Cargo.toml new file mode 100644 index 0000000..b29f1c7 --- /dev/null +++ b/fang_examples/blocking/simple_cron_worker/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "simple_cron_worker" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fang = { path = "../../../" , features = ["blocking"]} +dotenv = "0.15.0" +env_logger = "0.9.0" +log = "0.4.0" +diesel = { version = "2", features = ["postgres", "r2d2"] } diff --git a/fang_examples/simple_worker/README.md b/fang_examples/blocking/simple_cron_worker/README.md similarity index 100% rename from fang_examples/simple_worker/README.md rename to fang_examples/blocking/simple_cron_worker/README.md diff --git a/fang_examples/blocking/simple_cron_worker/src/lib.rs b/fang_examples/blocking/simple_cron_worker/src/lib.rs new file mode 100644 index 0000000..ab44341 --- /dev/null +++ b/fang_examples/blocking/simple_cron_worker/src/lib.rs @@ -0,0 +1,35 @@ +use fang::runnable::Runnable; +use fang::serde::{Deserialize, Serialize}; +use fang::typetag; +use fang::FangError; +use fang::Queueable; +use fang::Scheduled; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyCronTask {} + +#[typetag::serde] +impl Runnable for MyCronTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { + log::info!("CRON !!!!!!!!!!!!!!!!!"); + + Ok(()) + } + + fn task_type(&self) -> String { + "cron_test".to_string() + } + + fn cron(&self) -> Option<Scheduled> { + // Field order: sec min hour day-of-month month day-of-week year. + // Be careful: the schedule is evaluated against UTC.
+ // https://www.timeanddate.com/worldclock/timezone/utc + let expression = "0/20 * * * Aug-Sep * 2022/1"; + Some(Scheduled::CronPattern(expression.to_string())) + } + + fn uniq(&self) -> bool { + true + } +} diff --git a/fang_examples/blocking/simple_cron_worker/src/main.rs b/fang_examples/blocking/simple_cron_worker/src/main.rs new file mode 100644 index 0000000..76380f0 --- /dev/null +++ b/fang_examples/blocking/simple_cron_worker/src/main.rs @@ -0,0 +1,43 @@ +use diesel::r2d2; +use dotenv::dotenv; +use fang::PgConnection; +use fang::Queue; +use fang::Queueable; +use fang::RetentionMode; +use fang::WorkerPool; +use simple_cron_worker::MyCronTask; +use std::env; +use std::thread::sleep; +use std::time::Duration; + +pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> { + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url); + + r2d2::Pool::builder() + .max_size(pool_size) + .build(manager) + .unwrap() +} + +fn main() { + dotenv().ok(); + + env_logger::init(); + + let queue = Queue::builder().connection_pool(connection_pool(2)).build(); + + let mut worker_pool = WorkerPool::<Queue>::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .number_of_workers(2_u32) + .task_type("cron_test".to_string()) + .build(); + + worker_pool.queue.schedule_task(&MyCronTask {}).unwrap(); + + worker_pool.start().unwrap(); + + sleep(Duration::from_secs(100)) } diff --git a/fang_examples/blocking/simple_worker/.gitignore b/fang_examples/blocking/simple_worker/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/fang_examples/blocking/simple_worker/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/fang_examples/simple_worker/Cargo.toml b/fang_examples/blocking/simple_worker/Cargo.toml similarity index 58% rename from fang_examples/simple_worker/Cargo.toml rename to fang_examples/blocking/simple_worker/Cargo.toml index b833463..2bd62b0 100644 --- a/fang_examples/simple_worker/Cargo.toml +++ b/fang_examples/blocking/simple_worker/Cargo.toml @@ -1,12 +1,13 @@ [package] name = "simple_worker" version = "0.1.0" -edition = "2018" +edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -fang = { path = "../../" , features = ["blocking"]} -signal-hook = "0.3.10" +fang = { path = "../../../" , features = ["blocking"]} dotenv = "0.15.0" env_logger = "0.9.0" +log = "0.4.0" +diesel = { version = "2", features = ["postgres", "r2d2"] } diff --git a/fang_examples/blocking/simple_worker/README.md b/fang_examples/blocking/simple_worker/README.md new file mode 100644 index 0000000..f1c3f1b --- /dev/null +++ b/fang_examples/blocking/simple_worker/README.md @@ -0,0 +1,3 @@ +## Simple example + +The task described in this example enqueues a new task during its execution, saving the current worker's thread name to its metadata.
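For reference, the seven-field expressions used in these examples come from the `cron` crate added to Cargo.toml above, and `schedule_task` resolves a `Scheduled::CronPattern` to its next upcoming UTC timestamp via the same `Schedule::from_str` / `upcoming(Utc)` calls that appear in src/asynk/async_queue.rs later in this diff. A minimal standalone sketch of that resolution step; the helper name `next_run` is illustrative, not part of the fang API:

use chrono::{DateTime, Utc};
use cron::Schedule;
use std::str::FromStr;

// "0/20 * * * Aug-Sep * 2022/1" = every 20 seconds during
// August-September, from 2022 onwards.
// Field order: sec min hour day-of-month month day-of-week year.
fn next_run(expression: &str) -> Option<DateTime<Utc>> {
    let schedule = Schedule::from_str(expression).ok()?;
    schedule.upcoming(Utc).next()
}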
diff --git a/fang_examples/blocking/simple_worker/src/lib.rs b/fang_examples/blocking/simple_worker/src/lib.rs new file mode 100644 index 0000000..4129591 --- /dev/null +++ b/fang_examples/blocking/simple_worker/src/lib.rs @@ -0,0 +1,96 @@ +use fang::runnable::Runnable; +use fang::serde::{Deserialize, Serialize}; +use fang::typetag; +use fang::FangError; +use fang::Queueable; +use std::thread; +use std::time::Duration; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyTask { + pub number: u16, + pub current_thread_name: String, +} + +impl MyTask { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + + Self { + number, + current_thread_name, + } + } +} + +#[typetag::serde] +impl Runnable for MyTask { + fn run(&self, queue: &dyn Queueable) -> Result<(), FangError> { + let new_task = MyTask::new(self.number + 1); + + log::info!( + "The number is {}, thread name {}", + self.number, + self.current_thread_name + ); + + queue.insert_task(&new_task).unwrap(); + + thread::sleep(Duration::from_secs(2)); + + Ok(()) + } + + fn task_type(&self) -> String { + "worker_pool_test".to_string() + } +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyFailingTask { + pub number: u16, + pub current_thread_name: String, +} + +impl MyFailingTask { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + Self { + number, + current_thread_name, + } + } +} + +#[typetag::serde] +impl Runnable for MyFailingTask { + fn run(&self, queue: &dyn Queueable) -> Result<(), FangError> { + let new_task = MyFailingTask::new(self.number + 1); + + queue.insert_task(&new_task).unwrap(); + + log::info!( + "Failing task number {}, Thread name:{}", + self.number, + self.current_thread_name + ); + + thread::sleep(Duration::from_secs(3)); + + // `b` is always true, so this task always panics; the else arm + // only exists to satisfy the return type. + let b = true; + + if b { + panic!("Hello!"); + } else { + Ok(()) + } + } + + fn task_type(&self) -> String { + "worker_pool_test".to_string() + } +} diff --git a/fang_examples/blocking/simple_worker/src/main.rs b/fang_examples/blocking/simple_worker/src/main.rs new file mode 100644 index 0000000..a162047 --- /dev/null +++ b/fang_examples/blocking/simple_worker/src/main.rs @@ -0,0 +1,50 @@ +use diesel::r2d2; +use dotenv::dotenv; +use fang::PgConnection; +use fang::Queue; +use fang::Queueable; +use fang::RetentionMode; +use fang::WorkerPool; +use simple_worker::MyFailingTask; +use simple_worker::MyTask; +use std::env; +use std::thread::sleep; +use std::time::Duration; + +pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> { + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url); + + r2d2::Pool::builder() + .max_size(pool_size) + .build(manager) + .unwrap() +} + +fn main() { + dotenv().ok(); + + env_logger::init(); + + let queue = Queue::builder().connection_pool(connection_pool(3)).build(); + + let mut worker_pool = WorkerPool::<Queue>::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .number_of_workers(3_u32) + .task_type("worker_pool_test".to_string()) + .build(); + + worker_pool.queue.insert_task(&MyTask::new(1)).unwrap(); + worker_pool.queue.insert_task(&MyTask::new(1000)).unwrap(); + + worker_pool + .queue + .insert_task(&MyFailingTask::new(5000)) + .unwrap(); + + worker_pool.start().unwrap(); + + sleep(Duration::from_secs(100)) } diff --git
a/fang_examples/simple_worker/src/lib.rs b/fang_examples/simple_worker/src/lib.rs deleted file mode 100644 index f79c02d..0000000 --- a/fang_examples/simple_worker/src/lib.rs +++ /dev/null @@ -1,44 +0,0 @@ -use fang::serde::{Deserialize, Serialize}; -use fang::typetag; -use fang::Error; -use fang::PgConnection; -use fang::Queue; -use fang::Runnable; -use std::thread; -use std::time::Duration; - -#[derive(Serialize, Deserialize)] -#[serde(crate = "fang::serde")] -pub struct MyJob { - pub number: u16, - pub current_thread_name: String, -} - -impl MyJob { - pub fn new(number: u16) -> Self { - let handle = thread::current(); - let current_thread_name = handle.name().unwrap().to_string(); - - Self { - number, - current_thread_name, - } - } -} - -#[typetag::serde] -impl Runnable for MyJob { - fn run(&self, connection: &PgConnection) -> Result<(), Error> { - thread::sleep(Duration::from_secs(3)); - - let new_job = MyJob::new(self.number + 1); - - Queue::push_task_query(connection, &new_job).unwrap(); - - Ok(()) - } - - fn task_type(&self) -> String { - "worker_pool_test".to_string() - } -} diff --git a/fang_examples/simple_worker/src/main.rs b/fang_examples/simple_worker/src/main.rs deleted file mode 100644 index ae66e39..0000000 --- a/fang_examples/simple_worker/src/main.rs +++ /dev/null @@ -1,41 +0,0 @@ -use dotenv::dotenv; -use fang::Queue; -use fang::RetentionMode; -use fang::WorkerParams; -use fang::WorkerPool; -use signal_hook::{consts::signal::*, iterator::Signals}; -use simple_worker::MyJob; - -fn main() { - dotenv().ok(); - - env_logger::init(); - - let mut worker_params = WorkerParams::new(); - worker_params.set_retention_mode(RetentionMode::KeepAll); - - let mut worker_pool = WorkerPool::new_with_params(2, worker_params); - worker_pool.start().unwrap(); - - let queue = Queue::new(); - - queue.push_task(&MyJob::new(1)).unwrap(); - queue.push_task(&MyJob::new(1000)).unwrap(); - - let mut signals = Signals::new(&[SIGINT, SIGTERM]).unwrap(); - for sig in signals.forever() { - match sig { - SIGINT => { - println!("Received SIGINT"); - worker_pool.shutdown().unwrap(); - break; - } - SIGTERM => { - println!("Received SIGTERM"); - worker_pool.shutdown().unwrap(); - break; - } - _ => unreachable!("Received unexpected signal: {:?}", sig), - } - } -} diff --git a/migrations/2021-06-05-112912_create_fang_tasks/down.sql b/migrations/2021-06-05-112912_create_fang_tasks/down.sql deleted file mode 100644 index e8becd4..0000000 --- a/migrations/2021-06-05-112912_create_fang_tasks/down.sql +++ /dev/null @@ -1,2 +0,0 @@ -DROP TABLE fang_tasks; -DROP TYPE fang_task_state; diff --git a/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql b/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql deleted file mode 100644 index 3b764b4..0000000 --- a/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE fang_periodic_tasks; diff --git a/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql b/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql deleted file mode 100644 index a35eb4d..0000000 --- a/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE fang_periodic_tasks ( - id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), - metadata jsonb NOT NULL, - period_in_millis BIGINT NOT NULL, - scheduled_at TIMESTAMP WITH TIME ZONE, - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() -); - -CREATE INDEX 
fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at); -CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata); diff --git a/migrations/2022-08-20-151615_create_fang_tasks/down.sql b/migrations/2022-08-20-151615_create_fang_tasks/down.sql new file mode 100644 index 0000000..3cd3345 --- /dev/null +++ b/migrations/2022-08-20-151615_create_fang_tasks/down.sql @@ -0,0 +1 @@ +DROP TABLE fang_tasks; diff --git a/migrations/2021-06-05-112912_create_fang_tasks/up.sql b/migrations/2022-08-20-151615_create_fang_tasks/up.sql similarity index 61% rename from migrations/2021-06-05-112912_create_fang_tasks/up.sql rename to migrations/2022-08-20-151615_create_fang_tasks/up.sql index 19112eb..ebebb6e 100644 --- a/migrations/2021-06-05-112912_create_fang_tasks/up.sql +++ b/migrations/2022-08-20-151615_create_fang_tasks/up.sql @@ -6,13 +6,15 @@ CREATE TABLE fang_tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), metadata jsonb NOT NULL, error_message TEXT, - state fang_task_state default 'new' NOT NULL, - task_type VARCHAR default 'common' NOT NULL, + state fang_task_state DEFAULT 'new' NOT NULL, + task_type VARCHAR DEFAULT 'common' NOT NULL, + uniq_hash CHAR(64), + scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() ); CREATE INDEX fang_tasks_state_index ON fang_tasks(state); CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type); -CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at); -CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata); +CREATE INDEX fang_tasks_scheduled_at_index ON fang_tasks(scheduled_at); +CREATE INDEX fang_tasks_uniq_hash ON fang_tasks(uniq_hash); diff --git a/src/asynk.rs b/src/asynk.rs index 77cc6d2..a75dd03 100644 --- a/src/asynk.rs +++ b/src/asynk.rs @@ -1,8 +1,9 @@ pub mod async_queue; pub mod async_runnable; -pub mod async_scheduler; pub mod async_worker; pub mod async_worker_pool; +pub use async_queue::*; pub use async_runnable::AsyncRunnable; -pub use async_runnable::Error as AsyncError; +pub use async_worker::*; +pub use async_worker_pool::*; diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index f51b2d8..c837379 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,5 +1,6 @@ use crate::asynk::async_runnable::AsyncRunnable; -use crate::asynk::async_runnable::Error as FangError; +use crate::CronError; +use crate::Scheduled::*; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::bb8::RunError; @@ -9,10 +10,11 @@ use bb8_postgres::tokio_postgres::Socket; use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; use chrono::DateTime; -use chrono::Duration; use chrono::Utc; +use cron::Schedule; use postgres_types::{FromSql, ToSql}; -use std::time::Duration as StdDuration; +use sha2::{Digest, Sha256}; +use std::str::FromStr; use thiserror::Error; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -21,21 +23,17 @@ use uuid::Uuid; use bb8_postgres::tokio_postgres::tls::NoTls; const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql"); -const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql"); -const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql"); +const INSERT_TASK_UNIQ_QUERY: &str = include_str!("queries/insert_task_uniq.sql"); const UPDATE_TASK_STATE_QUERY: &str = 
include_str!("queries/update_task_state.sql"); const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql"); const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql"); +const REMOVE_ALL_SCHEDULED_TASK_QUERY: &str = + include_str!("queries/remove_all_scheduled_tasks.sql"); const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); -const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql"); -const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql"); - -#[cfg(test)] +const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql"); const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql"); -#[cfg(test)] -const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql"); pub const DEFAULT_TASK_TYPE: &str = "common"; @@ -71,21 +69,9 @@ pub struct Task { #[builder(setter(into))] pub task_type: String, #[builder(setter(into))] - pub created_at: DateTime<Utc>, + pub uniq_hash: Option<String>, #[builder(setter(into))] - pub updated_at: DateTime<Utc>, -} - -#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] -pub struct PeriodicTask { - #[builder(setter(into))] - pub id: Uuid, - #[builder(setter(into))] - pub metadata: serde_json::Value, - #[builder(setter(into))] - pub period_in_millis: i64, - #[builder(setter(into))] - pub scheduled_at: Option<DateTime<Utc>>, + pub scheduled_at: DateTime<Utc>, #[builder(setter(into))] pub created_at: DateTime<Utc>, #[builder(setter(into))] pub updated_at: DateTime<Utc>, } @@ -100,6 +86,8 @@ pub enum AsyncQueueError { PgError(#[from] bb8_postgres::tokio_postgres::Error), #[error(transparent)] SerdeError(#[from] serde_json::Error), + #[error(transparent)] + CronError(#[from] CronError), #[error("returned invalid result (expected {expected:?}, found {found:?})")] ResultError { expected: u64, found: u64 }, #[error( "AsyncQueue is not connected :( , call connect() method first" )] NotConnectedError, #[error("Can not convert `std::time::Duration` to `chrono::Duration`")] TimeError, } -impl From<AsyncQueueError> for FangError { - fn from(error: AsyncQueueError) -> Self { - let message = format!("{:?}", error); - FangError { - description: message, - } +impl From<cron::error::Error> for AsyncQueueError { + fn from(error: cron::error::Error) -> Self { + AsyncQueueError::CronError(CronError::LibraryError(error)) } } @@ -127,12 +112,17 @@ pub trait AsyncQueueable: Send { ) -> Result<Option<Task>, AsyncQueueError>; async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>; + async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>; + + async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>; + async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>; async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>; + async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>; + async fn update_task_state( &mut self, task: Task, state: FangTaskState, ) -> Result<Task, AsyncQueueError>; async fn fail_task(&mut self, task: Task, error_message: &str) -> Result<Task, AsyncQueueError>; - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError>; - - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime<Utc>, - period: i64, - ) -> Result<PeriodicTask, AsyncQueueError>; - - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result<PeriodicTask, AsyncQueueError>; + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>; } #[derive(TypedBuilder, Debug, Clone)] pub struct AsyncQueue<Tls> where Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static, <Tls as MakeTlsConnect<Socket>>::Stream: Send, <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send, <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send, { #[builder(default=None, setter(skip))] pool: Option<Pool<PostgresConnectionManager<Tls>>>, #[builder(setter(into))] uri: String, #[builder(setter(into))]
max_pool_size: u32, - #[builder(default = false, setter(into))] - duplicated_tasks: bool, #[builder(default = false, setter(skip))] connected: bool, } @@ -185,47 +158,24 @@ where pub struct AsyncQueueTest<'a> { #[builder(setter(into))] pub transaction: Transaction<'a>, - #[builder(default = false, setter(into))] - pub duplicated_tasks: bool, -} - -#[cfg(test)] -impl<'a> AsyncQueueTest<'a> { - pub async fn find_task_by_id(&mut self, id: Uuid) -> Result { - let row: Row = self - .transaction - .query_one(FIND_TASK_BY_ID_QUERY, &[&id]) - .await?; - - let task = AsyncQueue::::row_to_task(row); - Ok(task) - } - pub async fn find_periodic_task_by_id( - &mut self, - id: Uuid, - ) -> Result { - let row: Row = self - .transaction - .query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id]) - .await?; - - let task = AsyncQueue::::row_to_periodic_task(row); - Ok(task) - } } #[cfg(test)] #[async_trait] impl AsyncQueueable for AsyncQueueTest<'_> { + async fn find_task_by_id(&mut self, id: Uuid) -> Result { + let transaction = &mut self.transaction; + + AsyncQueue::::find_task_by_id_query(transaction, id).await + } + async fn fetch_and_touch_task( &mut self, task_type: Option, ) -> Result, AsyncQueueError> { let transaction = &mut self.transaction; - let task = AsyncQueue::::fetch_and_touch_task_query(transaction, task_type).await?; - - Ok(task) + AsyncQueue::::fetch_and_touch_task_query(transaction, task_type).await } async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { @@ -233,84 +183,92 @@ impl AsyncQueueable for AsyncQueueTest<'_> { let metadata = serde_json::to_value(task)?; - let task: Task = if self.duplicated_tasks { - AsyncQueue::::insert_task_query(transaction, metadata, &task.task_type()).await? + let task: Task = if !task.uniq() { + AsyncQueue::::insert_task_query( + transaction, + metadata, + &task.task_type(), + Utc::now(), + ) + .await? } else { AsyncQueue::::insert_task_if_not_exist_query( transaction, metadata, &task.task_type(), + Utc::now(), ) .await? }; Ok(task) } - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result { - let transaction = &mut self.transaction; - - let periodic_task = - AsyncQueue::::schedule_next_task_query(transaction, periodic_task).await?; - - Ok(periodic_task) - } - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period: i64, - ) -> Result { + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { let transaction = &mut self.transaction; let metadata = serde_json::to_value(task)?; - let periodic_task = AsyncQueue::::insert_periodic_task_query( - transaction, - metadata, - timestamp, - period, - ) - .await?; + let scheduled_at = match task.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); - Ok(periodic_task) - } + iterator + .next() + .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))? + } + ScheduleOnce(datetime) => datetime, + }, + None => { + return Err(AsyncQueueError::CronError( + CronError::TaskNotSchedulableError, + )); + } + }; - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - let transaction = &mut self.transaction; + let task: Task = if !task.uniq() { + AsyncQueue::::insert_task_query( + transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? 
+ } else { + AsyncQueue::::insert_task_if_not_exist_query( + transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? + }; - let periodic_task = - AsyncQueue::::fetch_periodic_tasks_query(transaction, error_margin).await?; - - Ok(periodic_task) + Ok(task) } async fn remove_all_tasks(&mut self) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_all_tasks_query(transaction).await?; + AsyncQueue::::remove_all_tasks_query(transaction).await + } - Ok(result) + async fn remove_all_scheduled_tasks(&mut self) -> Result { + let transaction = &mut self.transaction; + + AsyncQueue::::remove_all_scheduled_tasks_query(transaction).await } async fn remove_task(&mut self, task: Task) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_task_query(transaction, task).await?; - - Ok(result) + AsyncQueue::::remove_task_query(transaction, task).await } async fn remove_tasks_type(&mut self, task_type: &str) -> Result { let transaction = &mut self.transaction; - let result = AsyncQueue::::remove_tasks_type_query(transaction, task_type).await?; - - Ok(result) + AsyncQueue::::remove_tasks_type_query(transaction, task_type).await } async fn update_task_state( @@ -320,9 +278,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> { ) -> Result { let transaction = &mut self.transaction; - let task = AsyncQueue::::update_task_state_query(transaction, task, state).await?; - - Ok(task) + AsyncQueue::::update_task_state_query(transaction, task, state).await } async fn fail_task( @@ -332,9 +288,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> { ) -> Result { let transaction = &mut self.transaction; - let task = AsyncQueue::::fail_task_query(transaction, task, error_message).await?; - - Ok(task) + AsyncQueue::::fail_task_query(transaction, task, error_message).await } } @@ -370,6 +324,18 @@ where Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await } + async fn remove_all_scheduled_tasks_query( + transaction: &mut Transaction<'_>, + ) -> Result { + Self::execute_query( + transaction, + REMOVE_ALL_SCHEDULED_TASK_QUERY, + &[&Utc::now()], + None, + ) + .await + } + async fn remove_task_query( transaction: &mut Transaction<'_>, task: Task, @@ -384,6 +350,16 @@ where Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await } + async fn find_task_by_id_query( + transaction: &mut Transaction<'_>, + id: Uuid, + ) -> Result { + let row: Row = transaction.query_one(FIND_TASK_BY_ID_QUERY, &[&id]).await?; + + let task = Self::row_to_task(row); + Ok(task) + } + async fn fail_task_query( transaction: &mut Transaction<'_>, task: Task, @@ -435,7 +411,7 @@ where task_type: &str, ) -> Result { let row: Row = transaction - .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type]) + .query_one(FETCH_TASK_TYPE_QUERY, &[&task_type, &Utc::now()]) .await?; let task = Self::row_to_task(row); @@ -461,72 +437,32 @@ where transaction: &mut Transaction<'_>, metadata: serde_json::Value, task_type: &str, + scheduled_at: DateTime, ) -> Result { let row: Row = transaction - .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type]) + .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type, &scheduled_at]) .await?; let task = Self::row_to_task(row); Ok(task) } - async fn schedule_next_task_query( - transaction: &mut Transaction<'_>, - periodic_task: PeriodicTask, - ) -> Result { - let updated_at = Utc::now(); - let scheduled_at = updated_at + Duration::milliseconds(periodic_task.period_in_millis); - - let row: Row = transaction - 
.query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at]) - .await?; - - let periodic_task = Self::row_to_periodic_task(row); - Ok(periodic_task) - } - - async fn insert_periodic_task_query( + async fn insert_task_uniq_query( transaction: &mut Transaction<'_>, metadata: serde_json::Value, - timestamp: DateTime, - period: i64, - ) -> Result { + task_type: &str, + scheduled_at: DateTime, + ) -> Result { + let uniq_hash = Self::calculate_hash(metadata.to_string()); + let row: Row = transaction .query_one( - INSERT_PERIODIC_TASK_QUERY, - &[&metadata, ×tamp, &period], + INSERT_TASK_UNIQ_QUERY, + &[&metadata, &task_type, &uniq_hash, &scheduled_at], ) .await?; - let periodic_task = Self::row_to_periodic_task(row); - Ok(periodic_task) - } - async fn fetch_periodic_tasks_query( - transaction: &mut Transaction<'_>, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - let current_time = Utc::now(); - - let margin: Duration = match Duration::from_std(error_margin) { - Ok(value) => Ok(value), - Err(_) => Err(AsyncQueueError::TimeError), - }?; - - let low_limit = current_time - margin; - let high_limit = current_time + margin; - let rows: Vec = transaction - .query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit]) - .await?; - - let periodic_tasks: Vec = rows - .into_iter() - .map(|row| Self::row_to_periodic_task(row)) - .collect(); - - if periodic_tasks.is_empty() { - Ok(None) - } else { - Ok(Some(periodic_tasks)) - } + let task = Self::row_to_task(row); + Ok(task) } async fn execute_query( @@ -552,19 +488,31 @@ where transaction: &mut Transaction<'_>, metadata: serde_json::Value, task_type: &str, + scheduled_at: DateTime, ) -> Result { - match Self::find_task_by_metadata_query(transaction, &metadata).await { + match Self::find_task_by_uniq_hash_query(transaction, &metadata).await { Some(task) => Ok(task), - None => Self::insert_task_query(transaction, metadata, task_type).await, + None => { + Self::insert_task_uniq_query(transaction, metadata, task_type, scheduled_at).await + } } } - async fn find_task_by_metadata_query( + fn calculate_hash(json: String) -> String { + let mut hasher = Sha256::new(); + hasher.update(json.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + } + + async fn find_task_by_uniq_hash_query( transaction: &mut Transaction<'_>, metadata: &serde_json::Value, ) -> Option { + let uniq_hash = Self::calculate_hash(metadata.to_string()); + let result = transaction - .query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata]) + .query_one(FIND_TASK_BY_UNIQ_HASH_QUERY, &[&uniq_hash]) .await; match result { @@ -573,47 +521,29 @@ where } } - fn row_to_periodic_task(row: Row) -> PeriodicTask { - let id: Uuid = row.get("id"); - let metadata: serde_json::Value = row.get("metadata"); - let period_in_millis: i64 = row.get("period_in_millis"); - let scheduled_at: Option> = match row.try_get("scheduled_at") { - Ok(datetime) => Some(datetime), - Err(_) => None, - }; - let created_at: DateTime = row.get("created_at"); - let updated_at: DateTime = row.get("updated_at"); - - PeriodicTask::builder() - .id(id) - .metadata(metadata) - .period_in_millis(period_in_millis) - .scheduled_at(scheduled_at) - .created_at(created_at) - .updated_at(updated_at) - .build() - } - fn row_to_task(row: Row) -> Task { let id: Uuid = row.get("id"); let metadata: serde_json::Value = row.get("metadata"); - let error_message: Option = match row.try_get("error_message") { - Ok(error_message) => Some(error_message), - Err(_) => None, - }; + + let error_message: Option = 
row.try_get("error_message").ok(); + + let uniq_hash: Option = row.try_get("uniq_hash").ok(); let state: FangTaskState = row.get("state"); let task_type: String = row.get("task_type"); let created_at: DateTime = row.get("created_at"); let updated_at: DateTime = row.get("updated_at"); + let scheduled_at: DateTime = row.get("scheduled_at"); Task::builder() .id(id) .metadata(metadata) .error_message(error_message) .state(state) + .uniq_hash(uniq_hash) .task_type(task_type) .created_at(created_at) .updated_at(updated_at) + .scheduled_at(scheduled_at) .build() } } @@ -626,6 +556,17 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { + async fn find_task_by_id(&mut self, id: Uuid) -> Result { + let mut connection = self.pool.as_ref().unwrap().get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::find_task_by_id_query(&mut transaction, id).await?; + + transaction.commit().await?; + + Ok(task) + } + async fn fetch_and_touch_task( &mut self, task_type: Option, @@ -648,11 +589,17 @@ where let metadata = serde_json::to_value(task)?; - let task: Task = if self.duplicated_tasks { - Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await? - } else { - Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type()) + let task: Task = if !task.uniq() { + Self::insert_task_query(&mut transaction, metadata, &task.task_type(), Utc::now()) .await? + } else { + Self::insert_task_if_not_exist_query( + &mut transaction, + metadata, + &task.task_type(), + Utc::now(), + ) + .await? }; transaction.commit().await?; @@ -660,55 +607,44 @@ where Ok(task) } - async fn insert_periodic_task( - &mut self, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period: i64, - ) -> Result { + async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result { self.check_if_connection()?; let mut connection = self.pool.as_ref().unwrap().get().await?; let mut transaction = connection.transaction().await?; - let metadata = serde_json::to_value(task)?; - let periodic_task = - Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?; + let scheduled_at = match task.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); + iterator + .next() + .ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))? + } + ScheduleOnce(datetime) => datetime, + }, + None => { + return Err(AsyncQueueError::CronError( + CronError::TaskNotSchedulableError, + )); + } + }; + let task: Task = if !task.uniq() { + Self::insert_task_query(&mut transaction, metadata, &task.task_type(), scheduled_at) + .await? + } else { + Self::insert_task_if_not_exist_query( + &mut transaction, + metadata, + &task.task_type(), + scheduled_at, + ) + .await? 
+ }; transaction.commit().await?; - - Ok(periodic_task) - } - - async fn schedule_next_task( - &mut self, - periodic_task: PeriodicTask, - ) -> Result { - self.check_if_connection()?; - let mut connection = self.pool.as_ref().unwrap().get().await?; - let mut transaction = connection.transaction().await?; - - let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?; - - transaction.commit().await?; - - Ok(periodic_task) - } - - async fn fetch_periodic_tasks( - &mut self, - error_margin: StdDuration, - ) -> Result>, AsyncQueueError> { - self.check_if_connection()?; - let mut connection = self.pool.as_ref().unwrap().get().await?; - let mut transaction = connection.transaction().await?; - - let periodic_task = - Self::fetch_periodic_tasks_query(&mut transaction, error_margin).await?; - - transaction.commit().await?; - - Ok(periodic_task) + Ok(task) } async fn remove_all_tasks(&mut self) -> Result { @@ -723,6 +659,18 @@ where Ok(result) } + async fn remove_all_scheduled_tasks(&mut self) -> Result { + self.check_if_connection()?; + let mut connection = self.pool.as_ref().unwrap().get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_all_scheduled_tasks_query(&mut transaction).await?; + + transaction.commit().await?; + + Ok(result) + } + async fn remove_task(&mut self, task: Task) -> Result { self.check_if_connection()?; let mut connection = self.pool.as_ref().unwrap().get().await?; @@ -784,12 +732,17 @@ mod async_queue_tests { use super::AsyncQueueable; use super::FangTaskState; use super::Task; - use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; + use crate::FangError; + use crate::Scheduled; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; use bb8_postgres::PostgresConnectionManager; + use chrono::DateTime; + use chrono::Duration; + use chrono::SubsecRound; + use chrono::Utc; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -800,11 +753,30 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } + #[derive(Serialize, Deserialize)] + struct AsyncTaskSchedule { + pub number: u16, + pub datetime: String, + } + + #[typetag::serde] + #[async_trait] + impl AsyncRunnable for AsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + Ok(()) + } + + fn cron(&self) -> Option { + let datetime = self.datetime.parse::>().ok()?; + Some(Scheduled::ScheduleOnce(datetime)) + } + } + #[tokio::test] async fn insert_task_creates_new_task() { let pool = pool().await; @@ -912,6 +884,60 @@ mod async_queue_tests { test.transaction.rollback().await.unwrap(); } + #[tokio::test] + async fn schedule_task_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + + let task = &AsyncTaskSchedule { + number: 1, + datetime: datetime.to_string(), + }; + + let task = test.schedule_task(task).await.unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + 
assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTaskSchedule"), type_task); + assert_eq!(task.scheduled_at, datetime); + } + + #[tokio::test] + async fn remove_all_scheduled_tasks_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + + let task1 = &AsyncTaskSchedule { + number: 1, + datetime: datetime.to_string(), + }; + + let task2 = &AsyncTaskSchedule { + number: 2, + datetime: datetime.to_string(), + }; + + test.schedule_task(task1).await.unwrap(); + test.schedule_task(task2).await.unwrap(); + + let number = test.remove_all_scheduled_tasks().await.unwrap(); + + assert_eq!(2, number); + } + #[tokio::test] async fn fetch_and_touch_test() { let pool = pool().await; diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index e14f698..49b0122 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -1,19 +1,55 @@ +use crate::async_queue::AsyncQueueError; use crate::asynk::async_queue::AsyncQueueable; +use crate::FangError; +use crate::Scheduled; use async_trait::async_trait; +use bb8_postgres::bb8::RunError; +use bb8_postgres::tokio_postgres::Error as TokioPostgresError; +use serde_json::Error as SerdeError; const COMMON_TYPE: &str = "common"; -#[derive(Debug)] -pub struct Error { - pub description: String, +impl From<AsyncQueueError> for FangError { + fn from(error: AsyncQueueError) -> Self { + let message = format!("{:?}", error); + FangError { + description: message, + } + } +} + +impl From<TokioPostgresError> for FangError { + fn from(error: TokioPostgresError) -> Self { + Self::from(AsyncQueueError::PgError(error)) + } +} + +impl From<RunError<TokioPostgresError>> for FangError { + fn from(error: RunError<TokioPostgresError>) -> Self { + Self::from(AsyncQueueError::PoolError(error)) + } +} + +impl From<SerdeError> for FangError { + fn from(error: SerdeError) -> Self { + Self::from(AsyncQueueError::SerdeError(error)) + } } #[typetag::serde(tag = "type")] #[async_trait] pub trait AsyncRunnable: Send + Sync { - async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>; + async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>; fn task_type(&self) -> String { COMMON_TYPE.to_string() } + + fn uniq(&self) -> bool { + false + } + + fn cron(&self) -> Option<Scheduled> { + None + } } diff --git a/src/asynk/async_scheduler.rs b/src/asynk/async_scheduler.rs deleted file mode 100644 index cb4961c..0000000 --- a/src/asynk/async_scheduler.rs +++ /dev/null @@ -1,260 +0,0 @@ -use crate::asynk::async_queue::AsyncQueueable; -use crate::asynk::async_queue::PeriodicTask; -use crate::asynk::AsyncError as Error; -use crate::asynk::AsyncRunnable; -use async_recursion::async_recursion; -use log::error; -use std::time::Duration; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use typed_builder::TypedBuilder; - -#[derive(TypedBuilder, Clone)] -pub struct Scheduler<AQueue> -where - AQueue: AsyncQueueable + Clone + Sync + 'static, -{ - #[builder(setter(into))] - pub check_period: Duration, - #[builder(setter(into))] - pub error_margin: Duration, - #[builder(setter(into))] - pub queue: AQueue, - #[builder(default = 0, setter(into))] - pub number_of_restarts: u32, -} - -impl<AQueue> Scheduler<AQueue> -where - AQueue: AsyncQueueable + Clone + Sync + 'static, -{ - #[async_recursion(?Send)] - pub async fn start(&mut self) -> Result<(), Error> { - let join_handle: JoinHandle<Result<(), Error>> = self.schedule_loop().await; - - match
join_handle.await { - Err(err) => { - error!( - "Scheduler panicked, restarting {:?}. Number of restarts {}", - err, self.number_of_restarts - ); - self.number_of_restarts += 1; - sleep(Duration::from_secs(1)).await; - self.start().await - } - Ok(task_res) => match task_res { - Err(err) => { - error!( - "Scheduler failed, restarting {:?}. Number of restarts {}", - err, self.number_of_restarts - ); - self.number_of_restarts += 1; - self.start().await - } - Ok(_) => { - error!( - "Scheduler stopped. restarting. Number of restarts {}", - self.number_of_restarts - ); - self.number_of_restarts += 1; - self.start().await - } - }, - } - } - - pub async fn schedule_loop(&mut self) -> JoinHandle> { - let mut scheduler = self.clone(); - tokio::spawn(async move { - let sleep_duration = scheduler.check_period; - - loop { - scheduler.schedule().await?; - - sleep(sleep_duration).await; - } - }) - } - - pub async fn schedule(&mut self) -> Result<(), Error> { - if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin).await? { - for task in tasks { - self.process_task(task).await?; - } - }; - Ok(()) - } - - async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { - match task.scheduled_at { - None => { - self.queue.schedule_next_task(task).await?; - } - Some(_) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - self.queue.insert_task(&*actual_task).await?; - - self.queue.schedule_next_task(task).await?; - } - } - Ok(()) - } -} - -#[cfg(test)] -#[derive(TypedBuilder)] -pub struct SchedulerTest<'a> { - #[builder(setter(into))] - pub check_period: Duration, - #[builder(setter(into))] - pub error_margin: Duration, - #[builder(setter(into))] - pub queue: &'a mut dyn AsyncQueueable, - #[builder(default = 0, setter(into))] - pub number_of_restarts: u32, -} - -#[cfg(test)] -impl<'a> SchedulerTest<'a> { - async fn schedule_test(&mut self) -> Result<(), Error> { - let sleep_duration = self.check_period; - - loop { - match self.queue.fetch_periodic_tasks(self.error_margin).await? 
{ - Some(tasks) => { - for task in tasks { - self.process_task(task).await?; - } - - return Ok(()); - } - None => { - sleep(sleep_duration).await; - } - }; - } - } - - async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { - match task.scheduled_at { - None => { - self.queue.schedule_next_task(task).await?; - } - Some(_) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - self.queue.insert_task(&*actual_task).await?; - - self.queue.schedule_next_task(task).await?; - } - } - Ok(()) - } -} - -#[cfg(test)] -mod async_scheduler_tests { - use super::SchedulerTest; - use crate::asynk::async_queue::AsyncQueueTest; - use crate::asynk::async_queue::AsyncQueueable; - use crate::asynk::async_queue::PeriodicTask; - use crate::asynk::AsyncError as Error; - use crate::asynk::AsyncRunnable; - use async_trait::async_trait; - use bb8_postgres::bb8::Pool; - use bb8_postgres::tokio_postgres::NoTls; - use bb8_postgres::PostgresConnectionManager; - use chrono::DateTime; - use chrono::Duration as OtherDuration; - use chrono::Utc; - use serde::{Deserialize, Serialize}; - use std::time::Duration; - - #[derive(Serialize, Deserialize)] - struct AsyncScheduledTask { - pub number: u16, - } - - #[typetag::serde] - #[async_trait] - impl AsyncRunnable for AsyncScheduledTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { - Ok(()) - } - fn task_type(&self) -> String { - "schedule".to_string() - } - } - - #[tokio::test] - async fn schedules_tasks() { - let pool = pool().await; - let mut connection = pool.get().await.unwrap(); - let transaction = connection.transaction().await.unwrap(); - - let mut test = AsyncQueueTest::builder().transaction(transaction).build(); - - let schedule_in_future = Utc::now() + OtherDuration::seconds(5); - - let _periodic_task = insert_periodic_task( - &mut test, - &AsyncScheduledTask { number: 1 }, - schedule_in_future, - 10000, - ) - .await; - - let check_period: u64 = 1; - let error_margin_seconds: u64 = 2; - - let mut scheduler = SchedulerTest::builder() - .check_period(Duration::from_secs(check_period)) - .error_margin(Duration::from_secs(error_margin_seconds)) - .queue(&mut test as &mut dyn AsyncQueueable) - .build(); - // Scheduler start tricky not loop :) - scheduler.schedule_test().await.unwrap(); - - let task = scheduler - .queue - .fetch_and_touch_task(Some("schedule".to_string())) - .await - .unwrap() - .unwrap(); - - let metadata = task.metadata.as_object().unwrap(); - let number = metadata["number"].as_u64(); - let type_task = metadata["type"].as_str(); - - let runnable_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - assert_eq!("schedule", runnable_task.task_type()); - assert_eq!(Some("AsyncScheduledTask"), type_task); - assert_eq!(Some(1), number); - } - - async fn insert_periodic_task( - test: &mut AsyncQueueTest<'_>, - task: &dyn AsyncRunnable, - timestamp: DateTime, - period_in_millis: i64, - ) -> PeriodicTask { - test.insert_periodic_task(task, timestamp, period_in_millis) - .await - .unwrap() - } - - async fn pool() -> Pool> { - let pg_mgr = PostgresConnectionManager::new_from_stringlike( - "postgres://postgres:postgres@localhost/fang", - NoTls, - ) - .unwrap(); - - Pool::builder().build(pg_mgr).await.unwrap() - } -} diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index 39a4605..6dbd7e8 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -3,7 +3,8 @@ use crate::asynk::async_queue::FangTaskState; use 
crate::asynk::async_queue::Task; use crate::asynk::async_queue::DEFAULT_TASK_TYPE; use crate::asynk::async_runnable::AsyncRunnable; -use crate::asynk::AsyncError as Error; +use crate::FangError; +use crate::Scheduled::*; use crate::{RetentionMode, SleepParams}; use log::error; use typed_builder::TypedBuilder; @@ -27,15 +28,20 @@ impl<AQueue> AsyncWorker<AQueue> where AQueue: AsyncQueueable + Clone + Sync + 'static, { - pub async fn run(&mut self, task: Task) -> Result<(), Error> { - let result = self.execute_task(task).await; + pub async fn run( + &mut self, + task: Task, + actual_task: Box<dyn AsyncRunnable>, + ) -> Result<(), FangError> { + let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } - async fn execute_task(&mut self, task: Task) -> Result<Task, Error> { - let actual_task: Box<dyn AsyncRunnable> = - serde_json::from_value(task.metadata.clone()).unwrap(); - + async fn execute_task( + &mut self, + task: Task, + actual_task: Box<dyn AsyncRunnable>, + ) -> Result<Task, FangError> { let task_result = actual_task.run(&mut self.queue).await; match task_result { Ok(()) => Ok(task), @@ -43,7 +49,10 @@ where } } - async fn finalize_task(&mut self, result: Result<Task, Error>) -> Result<(), Error> { + async fn finalize_task( + &mut self, + result: Result<Task, FangError>, + ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { Ok(task) => { @@ -86,16 +95,26 @@ where tokio::time::sleep(self.sleep_params.sleep_period).await; } - pub async fn run_tasks(&mut self) -> Result<(), FangError> { + pub async fn run_tasks(&mut self) -> Result<(), FangError> { loop { + // fetch the next task of this worker's type match self .queue .fetch_and_touch_task(Some(self.task_type.clone())) .await { Ok(Some(task)) => { + let actual_task: Box<dyn AsyncRunnable> = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // if the task recurs on a CRON pattern, schedule its next run + if let Some(CronPattern(_)) = actual_task.cron() { + self.queue.schedule_task(&*actual_task).await?; + } self.sleep_params.maybe_reset_sleep_period(); - self.run(task).await?
+ // run scheduled task + self.run(task, actual_task).await?; } Ok(None) => { self.sleep().await; @@ -126,15 +145,20 @@ pub struct AsyncWorkerTest<'a> { #[cfg(test)] impl<'a> AsyncWorkerTest<'a> { - pub async fn run(&mut self, task: Task) -> Result<(), Error> { - let result = self.execute_task(task).await; + pub async fn run( + &mut self, + task: Task, + actual_task: Box, + ) -> Result<(), FangError> { + let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } - async fn execute_task(&mut self, task: Task) -> Result { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - + async fn execute_task( + &mut self, + task: Task, + actual_task: Box, + ) -> Result { let task_result = actual_task.run(self.queue).await; match task_result { Ok(()) => Ok(task), @@ -142,7 +166,10 @@ impl<'a> AsyncWorkerTest<'a> { } } - async fn finalize_task(&mut self, result: Result) -> Result<(), Error> { + async fn finalize_task( + &mut self, + result: Result, + ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { Ok(task) => { @@ -185,7 +212,7 @@ impl<'a> AsyncWorkerTest<'a> { tokio::time::sleep(self.sleep_params.sleep_period).await; } - pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> { + pub async fn run_tasks_until_none(&mut self) -> Result<(), FangError> { loop { match self .queue @@ -193,8 +220,17 @@ impl<'a> AsyncWorkerTest<'a> { .await { Ok(Some(task)) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + self.queue.schedule_task(&*actual_task).await?; + } self.sleep_params.maybe_reset_sleep_period(); - self.run(task).await? 
+ // run scheduled task + self.run(task, actual_task).await?; } Ok(None) => { return Ok(()); @@ -216,13 +252,16 @@ mod async_worker_tests { use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; use crate::asynk::async_worker::Task; - use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; + use crate::FangError; use crate::RetentionMode; + use crate::Scheduled; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; use bb8_postgres::PostgresConnectionManager; + use chrono::Duration; + use chrono::Utc; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -233,10 +272,27 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } + + #[derive(Serialize, Deserialize)] + struct WorkerAsyncTaskSchedule { + pub number: u16, + } + + #[typetag::serde] + #[async_trait] + impl AsyncRunnable for WorkerAsyncTaskSchedule { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + Ok(()) + } + fn cron(&self) -> Option { + Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(1))) + } + } + #[derive(Serialize, Deserialize)] struct AsyncFailedTask { pub number: u16, @@ -245,10 +301,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncFailedTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { let message = format!("number {} is wrong :(", self.number); - Err(Error { + Err(FangError { description: message, }) } @@ -260,7 +316,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType1 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -275,7 +331,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType2 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -290,8 +346,9 @@ mod async_worker_tests { let transaction = connection.transaction().await.unwrap(); let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + let actual_task = WorkerAsyncTask { number: 1 }; - let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await; + let task = insert_task(&mut test, &actual_task).await; let id = task.id; let mut worker = AsyncWorkerTest::builder() @@ -299,12 +356,48 @@ mod async_worker_tests { .retention_mode(RetentionMode::KeepAll) .build(); - worker.run(task).await.unwrap(); + worker.run(task, Box::new(actual_task)).await.unwrap(); let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); assert_eq!(FangTaskState::Finished, task_finished.state); test.transaction.rollback().await.unwrap(); } + + #[tokio::test] + async fn schedule_task_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let actual_task = 
WorkerAsyncTaskSchedule { number: 1 }; + + let task = test.schedule_task(&actual_task).await.unwrap(); + + let id = task.id; + + let mut worker = AsyncWorkerTest::builder() + .queue(&mut test as &mut dyn AsyncQueueable) + .retention_mode(RetentionMode::KeepAll) + .build(); + + worker.run_tasks_until_none().await.unwrap(); + + let task = worker.queue.find_task_by_id(id).await.unwrap(); + + assert_eq!(id, task.id); + assert_eq!(FangTaskState::New, task.state); + + tokio::time::sleep(core::time::Duration::from_secs(3)).await; + + worker.run_tasks_until_none().await.unwrap(); + + let task = test.find_task_by_id(id).await.unwrap(); + assert_eq!(id, task.id); + assert_eq!(FangTaskState::Finished, task.state); + } + #[tokio::test] async fn saves_error_for_failed_task() { let pool = pool().await; @@ -312,8 +405,9 @@ mod async_worker_tests { let transaction = connection.transaction().await.unwrap(); let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + let failed_task = AsyncFailedTask { number: 1 }; - let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await; + let task = insert_task(&mut test, &failed_task).await; let id = task.id; let mut worker = AsyncWorkerTest::builder() @@ -321,7 +415,7 @@ mod async_worker_tests { .retention_mode(RetentionMode::KeepAll) .build(); - worker.run(task).await.unwrap(); + worker.run(task, Box::new(failed_task)).await.unwrap(); let task_finished = test.find_task_by_id(id).await.unwrap(); assert_eq!(id, task_finished.id); @@ -367,6 +461,7 @@ mod async_worker_tests { assert_eq!(FangTaskState::New, task2.state); test.transaction.rollback().await.unwrap(); } + #[tokio::test] async fn remove_when_finished() { let pool = pool().await; diff --git a/src/asynk/async_worker_pool.rs b/src/asynk/async_worker_pool.rs index 57796c4..6a2e000 100644 --- a/src/asynk/async_worker_pool.rs +++ b/src/asynk/async_worker_pool.rs @@ -1,7 +1,7 @@ use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::DEFAULT_TASK_TYPE; use crate::asynk::async_worker::AsyncWorker; -use crate::asynk::AsyncError as Error; +use crate::FangError; use crate::{RetentionMode, SleepParams}; use async_recursion::async_recursion; use log::error; @@ -61,7 +61,7 @@ where sleep_params: SleepParams, retention_mode: RetentionMode, task_type: String, - ) -> JoinHandle> { + ) -> JoinHandle> { tokio::spawn(async move { Self::run_worker(queue, sleep_params, retention_mode, task_type).await }) @@ -71,7 +71,7 @@ where sleep_params: SleepParams, retention_mode: RetentionMode, task_type: String, - ) -> Result<(), Error> { + ) -> Result<(), FangError> { let mut worker: AsyncWorker = AsyncWorker::builder() .queue(queue) .sleep_params(sleep_params) diff --git a/src/asynk/queries/fail_task.sql b/src/asynk/queries/fail_task.sql index 416d91f..1719286 100644 --- a/src/asynk/queries/fail_task.sql +++ b/src/asynk/queries/fail_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING * diff --git a/src/asynk/queries/fetch_periodic_tasks.sql b/src/asynk/queries/fetch_periodic_tasks.sql deleted file mode 100644 index 5d7529e..0000000 --- a/src/asynk/queries/fetch_periodic_tasks.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL diff --git 
a/src/asynk/queries/fetch_task_type.sql b/src/asynk/queries/fetch_task_type.sql index 360a5fa..91e2f85 100644 --- a/src/asynk/queries/fetch_task_type.sql +++ b/src/asynk/queries/fetch_task_type.sql @@ -1 +1 @@ -SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = $1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED +SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/src/asynk/queries/find_periodic_task_by_id.sql b/src/asynk/queries/find_periodic_task_by_id.sql deleted file mode 100644 index 67c0316..0000000 --- a/src/asynk/queries/find_periodic_task_by_id.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_periodic_tasks WHERE id = $1 diff --git a/src/asynk/queries/find_task_by_metadata.sql b/src/asynk/queries/find_task_by_metadata.sql deleted file mode 100644 index 0e38bfa..0000000 --- a/src/asynk/queries/find_task_by_metadata.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1 diff --git a/src/asynk/queries/find_task_by_uniq_hash.sql b/src/asynk/queries/find_task_by_uniq_hash.sql new file mode 100644 index 0000000..3694a58 --- /dev/null +++ b/src/asynk/queries/find_task_by_uniq_hash.sql @@ -0,0 +1 @@ +SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1 diff --git a/src/asynk/queries/insert_periodic_task.sql b/src/asynk/queries/insert_periodic_task.sql deleted file mode 100644 index 1ba98e8..0000000 --- a/src/asynk/queries/insert_periodic_task.sql +++ /dev/null @@ -1 +0,0 @@ -INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_millis") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at diff --git a/src/asynk/queries/insert_task.sql b/src/asynk/queries/insert_task.sql index b6ec160..514d921 100644 --- a/src/asynk/queries/insert_task.sql +++ b/src/asynk/queries/insert_task.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +INSERT INTO "fang_tasks" ("metadata", "task_type", "scheduled_at") VALUES ($1, $2, $3) RETURNING * diff --git a/src/asynk/queries/insert_task_uniq.sql b/src/asynk/queries/insert_task_uniq.sql new file mode 100644 index 0000000..0817383 --- /dev/null +++ b/src/asynk/queries/insert_task_uniq.sql @@ -0,0 +1 @@ +INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) RETURNING * diff --git a/src/asynk/queries/remove_all_periodic_tasks.sql b/src/asynk/queries/remove_all_periodic_tasks.sql deleted file mode 100644 index 75b5afe..0000000 --- a/src/asynk/queries/remove_all_periodic_tasks.sql +++ /dev/null @@ -1 +0,0 @@ -DELETE FROM "fang_periodic_tasks" diff --git a/src/asynk/queries/remove_all_scheduled_tasks.sql b/src/asynk/queries/remove_all_scheduled_tasks.sql new file mode 100644 index 0000000..61a5b6b --- /dev/null +++ b/src/asynk/queries/remove_all_scheduled_tasks.sql @@ -0,0 +1 @@ +DELETE FROM "fang_tasks" WHERE scheduled_at > $1 diff --git a/src/asynk/queries/schedule_next_task.sql b/src/asynk/queries/schedule_next_task.sql deleted file mode 100644 index 00fd46e..0000000 --- a/src/asynk/queries/schedule_next_task.sql +++ /dev/null @@ -1 +0,0 @@ -UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at diff --git 
a/src/asynk/queries/update_task_state.sql b/src/asynk/queries/update_task_state.sql index afca5c0..e2e2d94 100644 --- a/src/asynk/queries/update_task_state.sql +++ b/src/asynk/queries/update_task_state.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at +UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING * diff --git a/src/blocking.rs b/src/blocking.rs index f2aaf7c..7f4f296 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -1,13 +1,12 @@ pub mod error; -pub mod executor; pub mod queue; -pub mod scheduler; +pub mod runnable; pub mod schema; +pub mod worker; pub mod worker_pool; -pub use error::FangError; -pub use executor::*; pub use queue::*; -pub use scheduler::*; +pub use runnable::Runnable; pub use schema::*; +pub use worker::*; pub use worker_pool::*; diff --git a/src/blocking/error.rs b/src/blocking/error.rs index a1cb125..a5205fb 100644 --- a/src/blocking/error.rs +++ b/src/blocking/error.rs @@ -1,21 +1,31 @@ +use crate::blocking::queue::QueueError; +use crate::FangError; +use diesel::r2d2::PoolError; +use diesel::result::Error as DieselError; use std::io::Error as IoError; -use std::sync::PoisonError; -use thiserror::Error; -#[derive(Error, Debug)] -pub enum FangError { - #[error("The shared state in an executor thread became poisoned")] - PoisonedLock, - - #[error("Failed to create executor thread")] - ExecutorThreadCreationFailed { - #[from] - source: IoError, - }, -} - -impl<T> From<PoisonError<T>> for FangError { - fn from(_: PoisonError<T>) -> Self { - Self::PoisonedLock +impl From<IoError> for FangError { + fn from(error: IoError) -> Self { + let description = format!("{:?}", error); + FangError { description } + } +} + +impl From<QueueError> for FangError { + fn from(error: QueueError) -> Self { + let description = format!("{:?}", error); + FangError { description } + } +} + +impl From<DieselError> for FangError { + fn from(error: DieselError) -> Self { + Self::from(QueueError::DieselError(error)) + } +} + +impl From<PoolError> for FangError { + fn from(error: PoolError) -> Self { + Self::from(QueueError::PoolError(error)) } } diff --git a/src/blocking/executor.rs b/src/blocking/executor.rs deleted file mode 100644 index 8367132..0000000 --- a/src/blocking/executor.rs +++ /dev/null @@ -1,328 +0,0 @@ -use crate::error::FangError; -use crate::queue::Queue; -use crate::queue::Task; -use crate::worker_pool::{SharedState, WorkerState}; -use crate::{RetentionMode, SleepParams}; -use diesel::pg::PgConnection; -use diesel::r2d2::{ConnectionManager, PooledConnection}; -use log::error; -use std::thread; - -pub struct Executor { - pub pooled_connection: PooledConnection<ConnectionManager<PgConnection>>, - pub task_type: Option<String>, - pub sleep_params: SleepParams, - pub retention_mode: RetentionMode, - shared_state: Option<SharedState>, -} - -#[derive(Debug)] -pub struct Error { - pub description: String, -} - -#[typetag::serde(tag = "type")] -pub trait Runnable { - fn run(&self, connection: &PgConnection) -> Result<(), Error>; - - fn task_type(&self) -> String { - "common".to_string() - } -} - -impl Executor { - pub fn new(pooled_connection: PooledConnection<ConnectionManager<PgConnection>>) -> Self { - Self { - pooled_connection, - sleep_params: SleepParams::default(), - retention_mode: RetentionMode::RemoveFinished, - task_type: None, - shared_state: None, - } - } - - pub fn set_shared_state(&mut self, shared_state: SharedState) { - self.shared_state = Some(shared_state); - } - - pub fn set_task_type(&mut self, task_type: String) { - self.task_type = Some(task_type); - } -
- pub fn set_sleep_params(&mut self, sleep_params: SleepParams) { - self.sleep_params = sleep_params; - } - - pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) { - self.retention_mode = retention_mode; - } - - pub fn run(&self, task: Task) { - let result = self.execute_task(task); - self.finalize_task(result) - } - - pub fn run_tasks(&mut self) -> Result<(), FangError> { - loop { - if let Some(ref shared_state) = self.shared_state { - let shared_state = shared_state.read()?; - if let WorkerState::Shutdown = *shared_state { - return Ok(()); - } - } - - match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) { - Ok(Some(task)) => { - self.maybe_reset_sleep_period(); - self.run(task); - } - Ok(None) => { - self.sleep(); - } - - Err(error) => { - error!("Failed to fetch a task {:?}", error); - - self.sleep(); - } - }; - } - } - - pub fn maybe_reset_sleep_period(&mut self) { - self.sleep_params.maybe_reset_sleep_period(); - } - - pub fn sleep(&mut self) { - self.sleep_params.maybe_increase_sleep_period(); - - thread::sleep(self.sleep_params.sleep_period); - } - - fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> { - let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap(); - let task_result = actual_task.run(&self.pooled_connection); - - match task_result { - Ok(()) => Ok(task), - Err(error) => Err((task, error.description)), - } - } - - fn finalize_task(&self, result: Result<Task, (Task, String)>) { - match self.retention_mode { - RetentionMode::KeepAll => { - match result { - Ok(task) => Queue::finish_task_query(&self.pooled_connection, &task).unwrap(), - Err((task, error)) => { - Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap() - } - }; - } - RetentionMode::RemoveAll => { - match result { - Ok(task) => Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(), - Err((task, _error)) => { - Queue::remove_task_query(&self.pooled_connection, task.id).unwrap() - } - }; - } - RetentionMode::RemoveFinished => match result { - Ok(task) => { - Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(); - } - Err((task, error)) => { - Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap(); - } - }, - } - } -} - -#[cfg(test)] -mod executor_tests { - use super::Error; - use super::Executor; - use super::RetentionMode; - use super::Runnable; - use crate::queue::Queue; - use crate::schema::FangTaskState; - use crate::typetag; - use crate::NewTask; - use diesel::connection::Connection; - use diesel::pg::PgConnection; - use diesel::r2d2::{ConnectionManager, PooledConnection}; - use serde::{Deserialize, Serialize}; - - #[derive(Serialize, Deserialize)] - struct ExecutorTaskTest { - pub number: u16, - } - - #[typetag::serde] - impl Runnable for ExecutorTaskTest { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - println!("the number is {}", self.number); - - Ok(()) - } - } - - #[derive(Serialize, Deserialize)] - struct FailedTask { - pub number: u16, - } - - #[typetag::serde] - impl Runnable for FailedTask { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - let message = format!("the number is {}", self.number); - - Err(Error { - description: message, - }) - } - } - - #[derive(Serialize, Deserialize)] - struct TaskType1 {} - - #[typetag::serde] - impl Runnable for TaskType1 { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - Ok(()) - } - - fn task_type(&self) -> String { - "type1".to_string() - } - } - - #[derive(Serialize, Deserialize)] - struct TaskType2
{} - - #[typetag::serde] - impl Runnable for TaskType2 { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - Ok(()) - } - - fn task_type(&self) -> String { - "type2".to_string() - } - } - - pub fn serialize(job: &dyn Runnable) -> serde_json::Value { - serde_json::to_value(job).unwrap() - } - - #[test] - fn executes_and_finishes_task() { - let job = ExecutorTaskTest { number: 10 }; - - let new_task = NewTask { - metadata: serialize(&job), - task_type: "common".to_string(), - }; - - let mut executor = Executor::new(pooled_connection()); - executor.set_retention_mode(RetentionMode::KeepAll); - - executor - .pooled_connection - .test_transaction::<(), Error, _>(|| { - let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap(); - - assert_eq!(FangTaskState::New, task.state); - - executor.run(task.clone()); - - let found_task = - Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap(); - - assert_eq!(FangTaskState::Finished, found_task.state); - - Ok(()) - }); - } - - #[test] - #[ignore] - fn executes_task_only_of_specific_type() { - let task1 = TaskType1 {}; - let task2 = TaskType2 {}; - - let new_task1 = NewTask { - metadata: serialize(&task1), - task_type: "type1".to_string(), - }; - - let new_task2 = NewTask { - metadata: serialize(&task2), - task_type: "type2".to_string(), - }; - - let executor = Executor::new(pooled_connection()); - - let task1 = Queue::insert_query(&executor.pooled_connection, &new_task1).unwrap(); - let task2 = Queue::insert_query(&executor.pooled_connection, &new_task2).unwrap(); - - assert_eq!(FangTaskState::New, task1.state); - assert_eq!(FangTaskState::New, task2.state); - - std::thread::spawn(move || { - let mut executor = Executor::new(pooled_connection()); - executor.set_retention_mode(RetentionMode::KeepAll); - executor.set_task_type("type1".to_string()); - - executor.run_tasks().unwrap(); - }); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - - let found_task1 = - Queue::find_task_by_id_query(&executor.pooled_connection, task1.id).unwrap(); - assert_eq!(FangTaskState::Finished, found_task1.state); - - let found_task2 = - Queue::find_task_by_id_query(&executor.pooled_connection, task2.id).unwrap(); - assert_eq!(FangTaskState::New, found_task2.state); - } - - #[test] - fn saves_error_for_failed_task() { - let task = FailedTask { number: 10 }; - - let new_task = NewTask { - metadata: serialize(&task), - task_type: "common".to_string(), - }; - - let executor = Executor::new(pooled_connection()); - - executor - .pooled_connection - .test_transaction::<(), Error, _>(|| { - let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap(); - - assert_eq!(FangTaskState::New, task.state); - - executor.run(task.clone()); - - let found_task = - Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap(); - - assert_eq!(FangTaskState::Failed, found_task.state); - assert_eq!( - "the number is 10".to_string(), - found_task.error_message.unwrap() - ); - - Ok(()) - }); - } - - fn pooled_connection() -> PooledConnection> { - Queue::connection_pool(5).get().unwrap() - } -} diff --git a/src/blocking/queue.rs b/src/blocking/queue.rs index 868c65d..909bae0 100644 --- a/src/blocking/queue.rs +++ b/src/blocking/queue.rs @@ -1,328 +1,347 @@ -use crate::executor::Runnable; -use crate::schema::fang_periodic_tasks; +use crate::runnable::Runnable; use crate::schema::fang_tasks; use crate::schema::FangTaskState; +use crate::CronError; +use crate::Scheduled::*; use chrono::DateTime; -use 
chrono::Duration; use chrono::Utc; +use cron::Schedule; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2; -use diesel::result::Error; -use dotenv::dotenv; -use std::env; -use std::time::Duration as StdDuration; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::PoolError; +use diesel::r2d2::PooledConnection; +use diesel::result::Error as DieselError; +use sha2::Digest; +use sha2::Sha256; +use std::str::FromStr; +use thiserror::Error; +use typed_builder::TypedBuilder; use uuid::Uuid; -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] -#[table_name = "fang_tasks"] +#[cfg(test)] +use dotenv::dotenv; +#[cfg(test)] +use std::env; + +pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>; + +#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone, TypedBuilder)] +#[diesel(table_name = fang_tasks)] pub struct Task { + #[builder(setter(into))] pub id: Uuid, + #[builder(setter(into))] pub metadata: serde_json::Value, + #[builder(setter(into))] pub error_message: Option<String>, + #[builder(setter(into))] pub state: FangTaskState, + #[builder(setter(into))] pub task_type: String, + #[builder(setter(into))] + pub uniq_hash: Option<String>, + #[builder(setter(into))] + pub scheduled_at: DateTime<Utc>, + #[builder(setter(into))] pub created_at: DateTime<Utc>, + #[builder(setter(into))] pub updated_at: DateTime<Utc>, } -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] -#[table_name = "fang_periodic_tasks"] -pub struct PeriodicTask { - pub id: Uuid, - pub metadata: serde_json::Value, - pub period_in_millis: i64, - pub scheduled_at: Option<DateTime<Utc>>, - pub created_at: DateTime<Utc>, - pub updated_at: DateTime<Utc>, -} - -#[derive(Insertable)] -#[table_name = "fang_tasks"] +#[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)] +#[diesel(table_name = fang_tasks)] pub struct NewTask { - pub metadata: serde_json::Value, - pub task_type: String, + #[builder(setter(into))] + metadata: serde_json::Value, + #[builder(setter(into))] + task_type: String, + #[builder(setter(into))] + uniq_hash: Option<String>, + #[builder(setter(into))] + scheduled_at: DateTime<Utc>, } -#[derive(Insertable)] -#[table_name = "fang_periodic_tasks"] -pub struct NewPeriodicTask { - pub metadata: serde_json::Value, - pub period_in_millis: i64, +#[derive(Debug, Error)] +pub enum QueueError { + #[error(transparent)] + DieselError(#[from] DieselError), + #[error(transparent)] + PoolError(#[from] PoolError), + #[error(transparent)] + CronError(#[from] CronError), } +impl From<cron::error::Error> for QueueError { + fn from(error: cron::error::Error) -> Self { + QueueError::CronError(CronError::LibraryError(error)) + } +} + +pub trait Queueable { + fn fetch_and_touch_task(&self, task_type: String) -> Result<Option<Task>, QueueError>; + + fn insert_task(&self, params: &dyn Runnable) -> Result<Task, QueueError>; + + fn remove_all_tasks(&self) -> Result<usize, QueueError>; + + fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError>; + + fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError>; + + fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>; + + fn find_task_by_id(&self, id: Uuid) -> Option<Task>; + + fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError>; + + fn fail_task(&self, task: &Task, error: String) -> Result<Task, QueueError>; + + fn schedule_task(&self, task: &dyn Runnable) -> Result<Task, QueueError>; +} + +#[derive(Clone, TypedBuilder)] pub struct Queue { - pub connection: PgConnection, + #[builder(setter(into))] + pub connection_pool: r2d2::Pool<ConnectionManager<PgConnection>>, } -impl Default for Queue { - fn default() -> Self { - Self::new() +impl Queueable for Queue { + fn fetch_and_touch_task(&self, task_type: String) -> Result<Option<Task>, QueueError> { + let mut
connection = self.get_connection()?; + + Self::fetch_and_touch_query(&mut connection, task_type) + } + + fn insert_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> { + let mut connection = self.get_connection()?; + + Self::insert_query(&mut connection, params, Utc::now()) + } + fn schedule_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> { + let mut connection = self.get_connection()?; + + Self::schedule_task_query(&mut connection, params) + } + + fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError> { + let mut connection = self.get_connection()?; + + Self::remove_all_scheduled_tasks_query(&mut connection) + } + + fn remove_all_tasks(&self) -> Result<usize, QueueError> { + let mut connection = self.get_connection()?; + + Self::remove_all_tasks_query(&mut connection) + } + + fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError> { + let mut connection = self.get_connection()?; + + Self::remove_tasks_of_type_query(&mut connection, task_type) + } + + fn remove_task(&self, id: Uuid) -> Result<usize, QueueError> { + let mut connection = self.get_connection()?; + + Self::remove_task_query(&mut connection, id) + } + + fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError> { + let mut connection = self.get_connection()?; + + Self::update_task_state_query(&mut connection, task, state) + } + + fn fail_task(&self, task: &Task, error: String) -> Result<Task, QueueError> { + let mut connection = self.get_connection()?; + + Self::fail_task_query(&mut connection, task, error) + } + + fn find_task_by_id(&self, id: Uuid) -> Option<Task> { + let mut connection = self.get_connection().unwrap(); + + Self::find_task_by_id_query(&mut connection, id) } } impl Queue { - pub fn new() -> Self { - let connection = Self::pg_connection(None); + pub fn get_connection(&self) -> Result<PoolConnection, QueueError> { + let result = self.connection_pool.get(); - Self { connection } + if let Err(err) = result { + log::error!("Failed to get a db connection {:?}", err); + return Err(QueueError::PoolError(err)); + } + + Ok(result.unwrap()) } - pub fn new_with_url(database_url: String) -> Self { - let connection = Self::pg_connection(Some(database_url)); + pub fn schedule_task_query( + connection: &mut PgConnection, + params: &dyn Runnable, + ) -> Result<Task, QueueError> { + let scheduled_at = match params.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); - Self { connection } - } - - pub fn new_with_connection(connection: PgConnection) -> Self { - Self { connection } - } - - pub fn push_task(&self, task: &dyn Runnable) -> Result<Task, Error> { - Self::push_task_query(&self.connection, task) - } - - pub fn push_task_query(connection: &PgConnection, task: &dyn Runnable) -> Result<Task, Error> { - let json_task = serde_json::to_value(task).unwrap(); - - match Self::find_task_by_metadata_query(connection, &json_task) { - Some(task) => Ok(task), + iterator + .next() + .ok_or(QueueError::CronError(CronError::NoTimestampsError))?
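+ // only the next cron occurrence is materialized here; the worker re-schedules the task again after each run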
+ } + ScheduleOnce(datetime) => datetime, }, None => { - let new_task = NewTask { - metadata: json_task.clone(), - task_type: task.task_type(), - }; - Self::insert_query(connection, &new_task) + return Err(QueueError::CronError(CronError::TaskNotSchedulableError)); + } + }; + + Self::insert_query(connection, params, scheduled_at) + } + + fn calculate_hash(json: String) -> String { + let mut hasher = Sha256::new(); + hasher.update(json.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + } + + pub fn insert_query( + connection: &mut PgConnection, + params: &dyn Runnable, + scheduled_at: DateTime<Utc>, + ) -> Result<Task, QueueError> { + if !params.uniq() { + let new_task = NewTask::builder() + .scheduled_at(scheduled_at) + .uniq_hash(None) + .task_type(params.task_type()) + .metadata(serde_json::to_value(params).unwrap()) + .build(); + + Ok(diesel::insert_into(fang_tasks::table) + .values(new_task) + .get_result::<Task>(connection)?) + } else { + let metadata = serde_json::to_value(params).unwrap(); + + let uniq_hash = Self::calculate_hash(metadata.to_string()); + + match Self::find_task_by_uniq_hash_query(connection, &uniq_hash) { + Some(task) => Ok(task), + None => { + let new_task = NewTask::builder() + .scheduled_at(scheduled_at) + .uniq_hash(Some(uniq_hash)) + .task_type(params.task_type()) + .metadata(serde_json::to_value(params).unwrap()) + .build(); + + Ok(diesel::insert_into(fang_tasks::table) + .values(new_task) + .get_result::<Task>(connection)?) + } } } } - pub fn push_periodic_task( - &self, - task: &dyn Runnable, - period: i64, - ) -> Result<PeriodicTask, Error> { - Self::push_periodic_task_query(&self.connection, task, period) - } - - pub fn push_periodic_task_query( - connection: &PgConnection, - task: &dyn Runnable, - period: i64, - ) -> Result<PeriodicTask, Error> { - let json_task = serde_json::to_value(task).unwrap(); - - match Self::find_periodic_task_by_metadata_query(connection, &json_task) { - Some(task) => Ok(task), - None => { - let new_task = NewPeriodicTask { - metadata: json_task, - period_in_millis: period, - }; - - diesel::insert_into(fang_periodic_tasks::table) - .values(new_task) - .get_result::<PeriodicTask>(connection) - } - } - } - - pub fn enqueue_task(task: &dyn Runnable) -> Result<Task, Error> { - Self::new().push_task(task) - } - - pub fn insert(&self, params: &NewTask) -> Result<Task, Error> { - Self::insert_query(&self.connection, params) - } - - pub fn insert_query(connection: &PgConnection, params: &NewTask) -> Result<Task, Error> { - diesel::insert_into(fang_tasks::table) - .values(params) - .get_result::<Task>(connection) - } - - pub fn fetch_task(&self, task_type: &Option<String>) -> Option<Task> { - Self::fetch_task_query(&self.connection, task_type) - } - - pub fn fetch_task_query(connection: &PgConnection, task_type: &Option<String>) -> Option<Task> { - match task_type { - None => Self::fetch_any_task_query(connection), - Some(task_type_str) => Self::fetch_task_of_type_query(connection, task_type_str), - } - } - - pub fn fetch_and_touch(&self, task_type: &Option<String>) -> Result<Option<Task>, Error> { - Self::fetch_and_touch_query(&self.connection, task_type) + pub fn fetch_task_query(connection: &mut PgConnection, task_type: String) -> Option<Task> { + Self::fetch_task_of_type_query(connection, &task_type) } pub fn fetch_and_touch_query( - connection: &PgConnection, - task_type: &Option<String>, - ) -> Result<Option<Task>, Error> { - connection.transaction::<Option<Task>, Error, _>(|| { - let found_task = Self::fetch_task_query(connection, task_type); + connection: &mut PgConnection, + task_type: String, + ) -> Result<Option<Task>, QueueError> { + connection.transaction::<Option<Task>, QueueError, _>(|conn| { + let found_task = Self::fetch_task_query(conn,
task_type); if found_task.is_none() { return Ok(None); } - match Self::start_processing_task_query(connection, &found_task.unwrap()) { + match Self::update_task_state_query( + conn, + &found_task.unwrap(), + FangTaskState::InProgress, + ) { Ok(updated_task) => Ok(Some(updated_task)), Err(err) => Err(err), } }) } - pub fn find_task_by_id(&self, id: Uuid) -> Option<Task> { - Self::find_task_by_id_query(&self.connection, id) - } - - pub fn find_task_by_id_query(connection: &PgConnection, id: Uuid) -> Option<Task> { + pub fn find_task_by_id_query(connection: &mut PgConnection, id: Uuid) -> Option<Task> { fang_tasks::table .filter(fang_tasks::id.eq(id)) .first::<Task>(connection) .ok() } - pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option<PeriodicTask> { - Self::find_periodic_task_by_id_query(&self.connection, id) + pub fn remove_all_tasks_query(connection: &mut PgConnection) -> Result<usize, QueueError> { + Ok(diesel::delete(fang_tasks::table).execute(connection)?) } - pub fn find_periodic_task_by_id_query( - connection: &PgConnection, - id: Uuid, - ) -> Option<PeriodicTask> { - fang_periodic_tasks::table - .filter(fang_periodic_tasks::id.eq(id)) - .first::<PeriodicTask>(connection) - .ok() - } + pub fn remove_all_scheduled_tasks_query( + connection: &mut PgConnection, + ) -> Result<usize, QueueError> { + let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now())); - pub fn fetch_periodic_tasks(&self, error_margin: StdDuration) -> Option<Vec<PeriodicTask>> { - Self::fetch_periodic_tasks_query(&self.connection, error_margin) - } - - pub fn fetch_periodic_tasks_query( - connection: &PgConnection, - error_margin: StdDuration, - ) -> Option<Vec<PeriodicTask>> { - let current_time = Self::current_time(); - - let margin = Duration::from_std(error_margin).unwrap(); - - let low_limit = current_time - margin; - let high_limit = current_time + margin; - - fang_periodic_tasks::table - .filter( - fang_periodic_tasks::scheduled_at - .gt(low_limit) - .and(fang_periodic_tasks::scheduled_at.lt(high_limit)), - ) - .or_filter(fang_periodic_tasks::scheduled_at.is_null()) - .load::<PeriodicTask>(connection) - .ok() - } - - pub fn schedule_next_task_execution(&self, task: &PeriodicTask) -> Result<PeriodicTask, Error> { - let current_time = Self::current_time(); - let scheduled_at = current_time + Duration::milliseconds(task.period_in_millis); - - diesel::update(task) - .set(( - fang_periodic_tasks::scheduled_at.eq(scheduled_at), - fang_periodic_tasks::updated_at.eq(current_time), - )) - .get_result::<PeriodicTask>(&self.connection) - } - - pub fn remove_all_tasks(&self) -> Result<usize, Error> { - Self::remove_all_tasks_query(&self.connection) - } - - pub fn remove_all_tasks_query(connection: &PgConnection) -> Result<usize, Error> { - diesel::delete(fang_tasks::table).execute(connection) - } - - pub fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, Error> { - Self::remove_tasks_of_type_query(&self.connection, task_type) + Ok(diesel::delete(query).execute(connection)?) } pub fn remove_tasks_of_type_query( - connection: &PgConnection, + connection: &mut PgConnection, task_type: &str, - ) -> Result<usize, Error> { + ) -> Result<usize, QueueError> { let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type)); - diesel::delete(query).execute(connection) + Ok(diesel::delete(query).execute(connection)?)
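+ // note: this removes tasks of the given type in every state, not only 'new'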
} - pub fn remove_all_periodic_tasks(&self) -> Result<usize, Error> { - Self::remove_all_periodic_tasks_query(&self.connection) - } - - pub fn remove_all_periodic_tasks_query(connection: &PgConnection) -> Result<usize, Error> { - diesel::delete(fang_periodic_tasks::table).execute(connection) - } - - pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> { - Self::remove_task_query(&self.connection, id) - } - - pub fn remove_task_query(connection: &PgConnection, id: Uuid) -> Result<usize, Error> { + pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result<usize, QueueError> { let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); - diesel::delete(query).execute(connection) + Ok(diesel::delete(query).execute(connection)?) } - pub fn finish_task(&self, task: &Task) -> Result<Task, Error> { - Self::finish_task_query(&self.connection, task) - } - - pub fn finish_task_query(connection: &PgConnection, task: &Task) -> Result<Task, Error> { - diesel::update(task) - .set(( - fang_tasks::state.eq(FangTaskState::Finished), - fang_tasks::updated_at.eq(Self::current_time()), - )) - .get_result::<Task>(connection) - } - - pub fn start_processing_task(&self, task: &Task) -> Result<Task, Error> { - Self::start_processing_task_query(&self.connection, task) - } - - pub fn start_processing_task_query( - connection: &PgConnection, + pub fn update_task_state_query( + connection: &mut PgConnection, task: &Task, - ) -> Result<Task, Error> { - diesel::update(task) + state: FangTaskState, + ) -> Result<Task, QueueError> { + Ok(diesel::update(task) .set(( - fang_tasks::state.eq(FangTaskState::InProgress), + fang_tasks::state.eq(state), fang_tasks::updated_at.eq(Self::current_time()), )) - .get_result::<Task>(connection) - } - - pub fn fail_task(&self, task: &Task, error: String) -> Result<Task, Error> { - Self::fail_task_query(&self.connection, task, error) + .get_result::<Task>(connection)?) } pub fn fail_task_query( - connection: &PgConnection, + connection: &mut PgConnection, task: &Task, error: String, - ) -> Result<Task, Error> { - diesel::update(task) + ) -> Result<Task, QueueError> { + Ok(diesel::update(task) .set(( fang_tasks::state.eq(FangTaskState::Failed), fang_tasks::error_message.eq(error), fang_tasks::updated_at.eq(Self::current_time()), )) - .get_result::<Task>(connection) + .get_result::<Task>(connection)?)
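+ // the failed row is kept with its error text; what happens next is up to the retention mode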
} + fn current_time() -> DateTime<Utc> { + Utc::now() + } + + #[cfg(test)] pub fn connection_pool(pool_size: u32) -> r2d2::Pool<ConnectionManager<PgConnection>> { dotenv().ok(); @@ -336,36 +355,12 @@ impl Queue { .unwrap() } - fn current_time() -> DateTime<Utc> { - Utc::now() - } - - fn pg_connection(database_url: Option<String>) -> PgConnection { - dotenv().ok(); - - let url = match database_url { - Some(string_url) => string_url, - None => env::var("DATABASE_URL").expect("DATABASE_URL must be set"), - }; - - PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)) - } - - fn fetch_any_task_query(connection: &PgConnection) -> Option<Task> { - fang_tasks::table - .order(fang_tasks::created_at.asc()) - .limit(1) - .filter(fang_tasks::state.eq(FangTaskState::New)) - .for_update() - .skip_locked() - .get_result::<Task>(connection) - .ok() - } - - fn fetch_task_of_type_query(connection: &PgConnection, task_type: &str) -> Option<Task> { + fn fetch_task_of_type_query(connection: &mut PgConnection, task_type: &str) -> Option<Task> { fang_tasks::table .order(fang_tasks::created_at.asc()) + .order(fang_tasks::scheduled_at.asc()) .limit(1) + .filter(fang_tasks::scheduled_at.le(Utc::now())) .filter(fang_tasks::state.eq(FangTaskState::New)) .filter(fang_tasks::task_type.eq(task_type)) .for_update() @@ -374,27 +369,13 @@ impl Queue { .ok() } - fn find_periodic_task_by_metadata_query( - connection: &PgConnection, - metadata: &serde_json::Value, - ) -> Option<PeriodicTask> { - fang_periodic_tasks::table - .filter(fang_periodic_tasks::metadata.eq(metadata)) - .first::<PeriodicTask>(connection) - .ok() - } - - fn find_task_by_metadata_query( - connection: &PgConnection, - metadata: &serde_json::Value, + fn find_task_by_uniq_hash_query( + connection: &mut PgConnection, + uniq_hash: &str, ) -> Option<Task> { fang_tasks::table - .filter(fang_tasks::metadata.eq(metadata)) - .filter( - fang_tasks::state - .eq(FangTaskState::New) - .or(fang_tasks::state.eq(FangTaskState::InProgress)), - ) + .filter(fang_tasks::uniq_hash.eq(uniq_hash)) + .filter(fang_tasks::state.eq(FangTaskState::New)) .first::<Task>(connection) .ok() } @@ -402,422 +383,21 @@ impl Queue { #[cfg(test)] mod queue_tests { - use super::NewTask; - use super::PeriodicTask; use super::Queue; - use super::Task; - use crate::executor::Error as ExecutorError; - use crate::executor::Runnable; - use crate::schema::fang_periodic_tasks; - use crate::schema::fang_tasks; + use super::Queueable; + use crate::chrono::SubsecRound; + use crate::runnable::Runnable; + use crate::runnable::COMMON_TYPE; use crate::schema::FangTaskState; use crate::typetag; - use chrono::prelude::*; - use chrono::{DateTime, Duration, Utc}; + use crate::FangError; + use crate::Scheduled; + use chrono::DateTime; + use chrono::Duration; + use chrono::Utc; use diesel::connection::Connection; - use diesel::prelude::*; use diesel::result::Error; use serde::{Deserialize, Serialize}; - use std::time::Duration as StdDuration; - - #[test] - fn insert_inserts_task() { - let queue = Queue::new(); - - let new_task = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - let result = queue - .connection - .test_transaction::<Task, Error, _>(|| queue.insert(&new_task)); - - assert_eq!(result.state, FangTaskState::New); - assert_eq!(result.error_message, None); - } - - #[test] - fn fetch_task_fetches_the_oldest_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let timestamp1 = Utc::now() - Duration::hours(40); - - let task1 = insert_task(serde_json::json!(true), timestamp1, &queue.connection); - - let timestamp2 =
Utc::now() - Duration::hours(20); - - insert_task(serde_json::json!(false), timestamp2, &queue.connection); - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1.id); - - Ok(()) - }); - } - - #[test] - fn finish_task_updates_state_field() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_task(&queue.connection); - - let updated_task = queue.finish_task(&task).unwrap(); - - assert_eq!(FangTaskState::Finished, updated_task.state); - - Ok(()) - }); - } - - #[test] - fn fail_task_updates_state_field_and_sets_error_message() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_task(&queue.connection); - let error = "Failed".to_string(); - - let updated_task = queue.fail_task(&task, error.clone()).unwrap(); - - assert_eq!(FangTaskState::Failed, updated_task.state); - assert_eq!(error, updated_task.error_message.unwrap()); - - Ok(()) - }); - } - - #[test] - fn fetch_and_touch_updates_state() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let _task = insert_new_task(&queue.connection); - - let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap(); - - assert_eq!(FangTaskState::InProgress, updated_task.state); - - Ok(()) - }); - } - - #[test] - fn fetch_and_touch_returns_none() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = queue.fetch_and_touch(&None).unwrap(); - - assert_eq!(None, task); - - Ok(()) - }); - } - - #[test] - fn push_task_serializes_and_inserts_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_task(&task).unwrap(); - - let mut m = serde_json::value::Map::new(); - m.insert( - "number".to_string(), - serde_json::value::Value::Number(10.into()), - ); - m.insert( - "type".to_string(), - serde_json::value::Value::String("PepeTask".to_string()), - ); - - assert_eq!(task.metadata, serde_json::value::Value::Object(m)); - - Ok(()) - }); - } - - #[test] - fn push_task_does_not_insert_the_same_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task2 = queue.push_task(&task).unwrap(); - - let task1 = queue.push_task(&task).unwrap(); - - assert_eq!(task1.id, task2.id); - - Ok(()) - }); - } - - #[test] - fn push_periodic_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_periodic_task(&task, 60_i64).unwrap(); - - assert_eq!(task.period_in_millis, 60_i64); - assert!(queue.find_periodic_task_by_id(task.id).is_some()); - - Ok(()) - }); - } - - #[test] - fn push_periodic_task_returns_existing_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task1 = queue.push_periodic_task(&task, 60).unwrap(); - - let task2 = queue.push_periodic_task(&task, 60).unwrap(); - - assert_eq!(task1.id, task2.id); - - Ok(()) - }); - } - - #[test] - fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_periodic_task(&task, 60).unwrap(); - - let schedule_in_future = Utc::now() + Duration::hours(100); - - 
insert_periodic_task( - serde_json::json!(true), - schedule_in_future, - 100, - &queue.connection, - ); - - let tasks = queue - .fetch_periodic_tasks(StdDuration::from_secs(100)) - .unwrap(); - - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].id, task.id); - - Ok(()) - }); - } - - #[test] - fn schedule_next_task_execution() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_periodic_task( - serde_json::json!(true), - Utc::now(), - 100000, - &queue.connection, - ); - - let updated_task = queue.schedule_next_task_execution(&task).unwrap(); - - let next_schedule = (task.scheduled_at.unwrap() - + Duration::milliseconds(task.period_in_millis)) - .round_subsecs(0); - - assert_eq!( - next_schedule, - updated_task.scheduled_at.unwrap().round_subsecs(0) - ); - - Ok(()) - }); - } - - #[test] - fn remove_all_periodic_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = - insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); - - let result = queue.remove_all_periodic_tasks().unwrap(); - - assert_eq!(1, result); - - assert_eq!(None, queue.find_periodic_task_by_id(task.id)); - - Ok(()) - }); - } - - #[test] - fn remove_all_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_task(serde_json::json!(true), Utc::now(), &queue.connection); - let result = queue.remove_all_tasks().unwrap(); - - assert_eq!(1, result); - - assert_eq!(None, queue.find_task_by_id(task.id)); - - Ok(()) - }); - } - - #[test] - fn fetch_periodic_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let schedule_in_future = Utc::now() + Duration::hours(100); - - insert_periodic_task( - serde_json::json!(true), - schedule_in_future, - 100, - &queue.connection, - ); - - let task = - insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); - - let tasks = queue - .fetch_periodic_tasks(StdDuration::from_secs(100)) - .unwrap(); - - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].id, task.id); - - Ok(()) - }); - } - - #[test] - fn remove_task() { - let queue = Queue::new(); - - let new_task1 = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - let new_task2 = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - queue.connection.test_transaction::<(), Error, _>(|| { - let task1 = queue.insert(&new_task1).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_some()); - - let task2 = queue.insert(&new_task2).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_some()); - - queue.remove_task(task1.id).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_none()); - assert!(queue.find_task_by_id(task2.id).is_some()); - - queue.remove_task(task2.id).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_none()); - - Ok(()) - }); - } - - #[test] - fn remove_task_of_type() { - let queue = Queue::new(); - - let new_task1 = NewTask { - metadata: serde_json::json!(true), - task_type: "type1".to_string(), - }; - - let new_task2 = NewTask { - metadata: serde_json::json!(true), - task_type: "type2".to_string(), - }; - - queue.connection.test_transaction::<(), Error, _>(|| { - let task1 = queue.insert(&new_task1).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_some()); - - let task2 = queue.insert(&new_task2).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_some()); - - 
queue.remove_tasks_of_type("type1").unwrap(); - assert!(queue.find_task_by_id(task1.id).is_none()); - assert!(queue.find_task_by_id(task2.id).is_some()); - - Ok(()) - }); - } - - // this test is ignored because it commits data to the db - #[test] - #[ignore] - fn fetch_task_locks_the_record() { - let queue = Queue::new(); - let timestamp1 = Utc::now() - Duration::hours(40); - - let task1 = insert_task( - serde_json::json!(PepeTask { number: 12 }), - timestamp1, - &queue.connection, - ); - - let task1_id = task1.id; - - let timestamp2 = Utc::now() - Duration::hours(20); - - let task2 = insert_task( - serde_json::json!(PepeTask { number: 11 }), - timestamp2, - &queue.connection, - ); - - let thread = std::thread::spawn(move || { - let queue = Queue::new(); - - queue.connection.transaction::<(), Error, _>(|| { - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1.id); - - std::thread::sleep(std::time::Duration::from_millis(5000)); - - Ok(()) - }) - }); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task2.id); - - let _result = thread.join(); - - // returns unlocked record - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1_id); - } #[derive(Serialize, Deserialize)] struct PepeTask { @@ -826,47 +406,373 @@ mod queue_tests { #[typetag::serde] impl Runnable for PepeTask { - fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { println!("the number is {}", self.number); Ok(()) } + fn uniq(&self) -> bool { + true + } } - fn insert_task( - metadata: serde_json::Value, - timestamp: DateTime, - connection: &PgConnection, - ) -> Task { - diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(metadata), - fang_tasks::created_at.eq(timestamp), - )]) - .get_result::(connection) - .unwrap() + #[derive(Serialize, Deserialize)] + struct AyratTask { + pub number: u16, } - fn insert_periodic_task( - metadata: serde_json::Value, - timestamp: DateTime, - period_in_millis: i64, - connection: &PgConnection, - ) -> PeriodicTask { - diesel::insert_into(fang_periodic_tasks::table) - .values(&vec![( - fang_periodic_tasks::metadata.eq(metadata), - fang_periodic_tasks::scheduled_at.eq(timestamp), - fang_periodic_tasks::period_in_millis.eq(period_in_millis), - )]) - .get_result::(connection) - .unwrap() + #[typetag::serde] + impl Runnable for AyratTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { + println!("the number is {}", self.number); + + Ok(()) + } + fn uniq(&self) -> bool { + true + } + + fn task_type(&self) -> String { + "weirdo".to_string() + } } - fn insert_new_task(connection: &PgConnection) -> Task { - diesel::insert_into(fang_tasks::table) - .values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)]) - .get_result::(connection) - .unwrap() + #[derive(Serialize, Deserialize)] + struct ScheduledPepeTask { + pub number: u16, + pub datetime: String, + } + + #[typetag::serde] + impl Runnable for ScheduledPepeTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { + println!("the number is {}", self.number); + + Ok(()) + } + fn uniq(&self) -> bool { + true + } + + fn task_type(&self) -> String { + "scheduled".to_string() + } + + fn cron(&self) -> Option { + let datetime = self.datetime.parse::>().ok()?; + Some(Scheduled::ScheduleOnce(datetime)) + } + } + + #[test] + fn 
insert_task_test() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task = Queue::insert_query(conn, &task, Utc::now()).unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(task.error_message, None); + assert_eq!(FangTaskState::New, task.state); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fetch_task_fetches_the_oldest_task() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap(); + let _task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap(); + + let found_task = Queue::fetch_task_query(conn, COMMON_TYPE.to_string()).unwrap(); + assert_eq!(found_task.id, task1.id); + Ok(()) + }); + } + + #[test] + fn update_task_state_test() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task = Queue::insert_query(conn, &task, Utc::now()).unwrap(); + + let found_task = + Queue::update_task_state_query(conn, &task, FangTaskState::Finished).unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::Finished); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fail_task_updates_state_field_and_sets_error_message() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task = Queue::insert_query(conn, &task, Utc::now()).unwrap(); + + let error = "Failed".to_string(); + + let found_task = Queue::fail_task_query(conn, &task, error.clone()).unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::Failed); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + assert_eq!(found_task.error_message.unwrap(), error); + + Ok(()) + }); + } + + #[test] + fn fetch_and_touch_updates_state() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task = Queue::insert_query(conn, 
&task, Utc::now()).unwrap(); + + let found_task = Queue::fetch_and_touch_query(conn, COMMON_TYPE.to_string()) + .unwrap() + .unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::InProgress); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fetch_and_touch_returns_none() { + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let found_task = Queue::fetch_and_touch_query(conn, COMMON_TYPE.to_string()).unwrap(); + + assert_eq!(None, found_task); + + Ok(()) + }); + } + + #[test] + fn insert_task_uniq_test() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task1 = Queue::insert_query(conn, &task, Utc::now()).unwrap(); + let task2 = Queue::insert_query(conn, &task, Utc::now()).unwrap(); + + assert_eq!(task2.id, task1.id); + Ok(()) + }); + } + + #[test] + fn schedule_task_test() { + let pool = Queue::connection_pool(5); + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + + let task = &ScheduledPepeTask { + number: 10, + datetime: datetime.to_string(), + }; + + let task = Queue::schedule_task_query(conn, task).unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(Some(10), number); + assert_eq!(Some("ScheduledPepeTask"), type_task); + assert_eq!(task.scheduled_at, datetime); + + Ok(()) + }); + } + + #[test] + fn remove_all_scheduled_tasks_test() { + let pool = Queue::connection_pool(5); + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0); + + let task1 = &ScheduledPepeTask { + number: 10, + datetime: datetime.to_string(), + }; + + let task2 = &ScheduledPepeTask { + number: 11, + datetime: datetime.to_string(), + }; + + Queue::schedule_task_query(conn, task1).unwrap(); + Queue::schedule_task_query(conn, task2).unwrap(); + + let number = Queue::remove_all_scheduled_tasks_query(conn).unwrap(); + + assert_eq!(2, number); + + Ok(()) + }); + } + + #[test] + fn remove_all_tasks_test() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap(); + + 
let result = Queue::remove_all_tasks_query(conn).unwrap(); + + assert_eq!(2, result); + assert_eq!(None, Queue::find_task_by_id_query(conn, task1.id)); + assert_eq!(None, Queue::find_task_by_id_query(conn, task2.id)); + + Ok(()) + }); + } + + #[test] + fn remove_task() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap(); + + assert!(Queue::find_task_by_id_query(conn, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(conn, task2.id).is_some()); + + Queue::remove_task_query(conn, task1.id).unwrap(); + + assert!(Queue::find_task_by_id_query(conn, task1.id).is_none()); + assert!(Queue::find_task_by_id_query(conn, task2.id).is_some()); + + Ok(()) + }); + } + + #[test] + fn remove_task_of_type() { + let task1 = PepeTask { number: 10 }; + let task2 = AyratTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|conn| { + let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap(); + + assert!(Queue::find_task_by_id_query(conn, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(conn, task2.id).is_some()); + + Queue::remove_tasks_of_type_query(conn, "weirdo").unwrap(); + + assert!(Queue::find_task_by_id_query(conn, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(conn, task2.id).is_none()); + + Ok(()) + }); } } diff --git a/src/blocking/runnable.rs b/src/blocking/runnable.rs new file mode 100644 index 0000000..0ddad54 --- /dev/null +++ b/src/blocking/runnable.rs @@ -0,0 +1,22 @@ +use crate::queue::Queueable; +use crate::FangError; +use crate::Scheduled; + +pub const COMMON_TYPE: &str = "common"; + +#[typetag::serde(tag = "type")] +pub trait Runnable { + fn run(&self, _queueable: &dyn Queueable) -> Result<(), FangError>; + + fn task_type(&self) -> String { + COMMON_TYPE.to_string() + } + + fn uniq(&self) -> bool { + false + } + + fn cron(&self) -> Option<Scheduled> { + None + } +} diff --git a/src/blocking/scheduler.rs b/src/blocking/scheduler.rs deleted file mode 100644 index 994c7d4..0000000 --- a/src/blocking/scheduler.rs +++ /dev/null @@ -1,125 +0,0 @@ -use crate::executor::Runnable; -use crate::queue::Queue; -use crate::PeriodicTask; -use std::thread; -use std::time::Duration; - -pub struct Scheduler { - pub check_period: Duration, - pub error_margin: Duration, - pub queue: Queue, -} - -impl Drop for Scheduler { - fn drop(&mut self) { - Scheduler::start(self.check_period, self.error_margin) - } -} - -impl Scheduler { - pub fn start(check_period: Duration, error_margin: Duration) { - let queue = Queue::new(); - let builder = thread::Builder::new().name("scheduler".to_string()); - - builder - .spawn(move || { - let scheduler = Self::new(check_period, error_margin, queue); - - scheduler.schedule_loop(); - }) - .unwrap(); - } - - pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self { - Self { - check_period, - queue, - error_margin, - } - } - - pub fn
+    fn cron(&self) -> Option<Scheduled> {
+        None
+    }
+}
diff --git a/src/blocking/scheduler.rs b/src/blocking/scheduler.rs
deleted file mode 100644
index 994c7d4..0000000
--- a/src/blocking/scheduler.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-use crate::executor::Runnable;
-use crate::queue::Queue;
-use crate::PeriodicTask;
-use std::thread;
-use std::time::Duration;
-
-pub struct Scheduler {
-    pub check_period: Duration,
-    pub error_margin: Duration,
-    pub queue: Queue,
-}
-
-impl Drop for Scheduler {
-    fn drop(&mut self) {
-        Scheduler::start(self.check_period, self.error_margin)
-    }
-}
-
-impl Scheduler {
-    pub fn start(check_period: Duration, error_margin: Duration) {
-        let queue = Queue::new();
-        let builder = thread::Builder::new().name("scheduler".to_string());
-
-        builder
-            .spawn(move || {
-                let scheduler = Self::new(check_period, error_margin, queue);
-
-                scheduler.schedule_loop();
-            })
-            .unwrap();
-    }
-
-    pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self {
-        Self {
-            check_period,
-            queue,
-            error_margin,
-        }
-    }
-
-    pub fn schedule_loop(&self) {
-        loop {
-            self.schedule();
-
-            thread::sleep(self.check_period);
-        }
-    }
-
-    pub fn schedule(&self) {
-        if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin) {
-            for task in tasks {
-                self.process_task(task);
-            }
-        };
-    }
-
-    fn process_task(&self, task: PeriodicTask) {
-        match task.scheduled_at {
-            None => {
-                self.queue.schedule_next_task_execution(&task).unwrap();
-            }
-            Some(_) => {
-                let actual_task: Box<dyn Runnable> =
-                    serde_json::from_value(task.metadata.clone()).unwrap();
-
-                self.queue.push_task(&(*actual_task)).unwrap();
-
-                self.queue.schedule_next_task_execution(&task).unwrap();
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod task_scheduler_tests {
-    use super::Scheduler;
-    use crate::executor::Error;
-    use crate::executor::Runnable;
-    use crate::queue::Queue;
-    use crate::schema::fang_tasks;
-    use crate::typetag;
-    use crate::Task;
-    use diesel::pg::PgConnection;
-    use diesel::prelude::*;
-    use serde::{Deserialize, Serialize};
-    use std::thread;
-    use std::time::Duration;
-
-    #[derive(Serialize, Deserialize)]
-    struct ScheduledTask {}
-
-    #[typetag::serde]
-    impl Runnable for ScheduledTask {
-        fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
-            Ok(())
-        }
-
-        fn task_type(&self) -> String {
-            "schedule".to_string()
-        }
-    }
-
-    #[test]
-    #[ignore]
-    fn schedules_tasks() {
-        let queue = Queue::new();
-
-        queue.push_periodic_task(&ScheduledTask {}, 10000).unwrap();
-        Scheduler::start(Duration::from_secs(1), Duration::from_secs(2));
-
-        let sleep_duration = Duration::from_secs(15);
-        thread::sleep(sleep_duration);
-
-        let tasks = get_all_tasks(&queue.connection);
-
-        assert_eq!(1, tasks.len());
-    }
-
-    fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
-        fang_tasks::table
-            .filter(fang_tasks::task_type.eq("schedule"))
-            .get_results::<Task>(conn)
-            .unwrap()
-    }
-}
diff --git a/src/blocking/schema.rs b/src/blocking/schema.rs
index 620a6f4..28a0fd2 100644
--- a/src/blocking/schema.rs
+++ b/src/blocking/schema.rs
@@ -1,6 +1,11 @@
-use diesel_derive_enum::DbEnum;
+pub mod sql_types {
+    #[derive(diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "fang_task_state"))]
+    pub struct FangTaskState;
+}
 
-#[derive(DbEnum, Debug, Eq, PartialEq, Clone)]
+#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
+#[DieselTypePath = "crate::blocking::schema::sql_types::FangTaskState"]
 pub enum FangTaskState {
     New,
     InProgress,
@@ -9,32 +14,23 @@ pub enum FangTaskState {
 }
 
 table! {
-    use super::FangTaskStateMapping;
+    use super::sql_types::FangTaskState;
     use diesel::sql_types::Jsonb;
     use diesel::sql_types::Nullable;
     use diesel::sql_types::Text;
     use diesel::sql_types::Timestamptz;
    use diesel::sql_types::Uuid;
     use diesel::sql_types::Varchar;
-
+    use diesel::pg::sql_types::Bpchar;
 
     fang_tasks (id) {
         id -> Uuid,
         metadata -> Jsonb,
         error_message -> Nullable<Text>,
-        state -> FangTaskStateMapping,
+        state -> FangTaskState,
         task_type -> Varchar,
-        created_at -> Timestamptz,
-        updated_at -> Timestamptz,
-    }
-}
-
-table! {
-    fang_periodic_tasks (id) {
-        id -> Uuid,
-        metadata -> Jsonb,
-        period_in_millis -> Int8,
-        scheduled_at -> Nullable<Timestamptz>,
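+        // New columns: uniq_hash is presumably a fixed-width hex digest of the
+        // task metadata (hence Bpchar), NULL for non-uniq tasks, while
+        // scheduled_at absorbs the job of the removed fang_periodic_tasks table.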
+        uniq_hash -> Nullable<Bpchar>,
+        scheduled_at -> Timestamptz,
         created_at -> Timestamptz,
         updated_at -> Timestamptz,
     }
diff --git a/src/blocking/worker.rs b/src/blocking/worker.rs
new file mode 100644
index 0000000..1fc451a
--- /dev/null
+++ b/src/blocking/worker.rs
@@ -0,0 +1,325 @@
+use crate::queue::Queueable;
+use crate::queue::Task;
+use crate::runnable::Runnable;
+use crate::runnable::COMMON_TYPE;
+use crate::schema::FangTaskState;
+use crate::FangError;
+use crate::Scheduled::*;
+use crate::{RetentionMode, SleepParams};
+use log::error;
+use std::thread;
+use typed_builder::TypedBuilder;
+
+#[derive(TypedBuilder)]
+pub struct Worker<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
+    #[builder(setter(into))]
+    pub queue: BQueue,
+    #[builder(default=COMMON_TYPE.to_string(), setter(into))]
+    pub task_type: String,
+    #[builder(default, setter(into))]
+    pub sleep_params: SleepParams,
+    #[builder(default, setter(into))]
+    pub retention_mode: RetentionMode,
+}
+
+impl<BQueue> Worker<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
+    pub fn run(&self, task: Task) {
+        let result = self.execute_task(task);
+        self.finalize_task(result)
+    }
+
+    pub fn run_tasks(&mut self) -> Result<(), FangError> {
+        loop {
+            match self.queue.fetch_and_touch_task(self.task_type.clone()) {
+                Ok(Some(task)) => {
+                    let actual_task: Box<dyn Runnable> =
+                        serde_json::from_value(task.metadata.clone()).unwrap();
+
+                    // check whether the task is a cron task
+                    if let Some(CronPattern(_)) = actual_task.cron() {
+                        // schedule the next run
+                        self.queue.schedule_task(&*actual_task)?;
+                    }
+
+                    self.maybe_reset_sleep_period();
+                    self.run(task);
+                }
+                Ok(None) => {
+                    self.sleep();
+                }
+
+                Err(error) => {
+                    error!("Failed to fetch a task {:?}", error);
+
+                    self.sleep();
+                }
+            };
+        }
+    }
+
+    #[cfg(test)]
+    pub fn run_tasks_until_none(&mut self) -> Result<(), FangError> {
+        loop {
+            match self.queue.fetch_and_touch_task(self.task_type.clone()) {
+                Ok(Some(task)) => {
+                    let actual_task: Box<dyn Runnable> =
+                        serde_json::from_value(task.metadata.clone()).unwrap();
+
+                    // check whether the task is a cron task
+                    if let Some(CronPattern(_)) = actual_task.cron() {
+                        // schedule the next run
+                        self.queue.schedule_task(&*actual_task)?;
+                    }
+
+                    self.maybe_reset_sleep_period();
+                    self.run(task);
+                }
+                Ok(None) => {
+                    return Ok(());
+                }
+                Err(error) => {
+                    error!("Failed to fetch a task {:?}", error);
+
+                    self.sleep();
+                }
+            };
+        }
+    }
+
+    pub fn maybe_reset_sleep_period(&mut self) {
+        self.sleep_params.maybe_reset_sleep_period();
+    }
+
+    pub fn sleep(&mut self) {
+        self.sleep_params.maybe_increase_sleep_period();
+
+        thread::sleep(self.sleep_params.sleep_period);
+    }
+
+    fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
+        let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
+        let task_result = actual_task.run(&self.queue);
+
+        match task_result {
+            Ok(()) => Ok(task),
+            Err(error) => Err((task, error.description)),
+        }
+    }
+
+    fn finalize_task(&self, result: Result<Task, (Task, String)>) {
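+        // RetentionMode decides what happens to the row after execution:
+        // KeepAll marks it Finished or Failed but keeps it, RemoveAll deletes
+        // it whatever the outcome, and RemoveFinished deletes successes but
+        // keeps failures together with their error message.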
+        match self.retention_mode {
+            RetentionMode::KeepAll => {
+                match result {
+                    Ok(task) => self
+                        .queue
+                        .update_task_state(&task, FangTaskState::Finished)
+                        .unwrap(),
+                    Err((task, error)) => self.queue.fail_task(&task, error).unwrap(),
+                };
+            }
+            RetentionMode::RemoveAll => {
+                match result {
+                    Ok(task) => self.queue.remove_task(task.id).unwrap(),
+                    Err((task, _error)) => self.queue.remove_task(task.id).unwrap(),
+                };
+            }
+            RetentionMode::RemoveFinished => match result {
+                Ok(task) => {
+                    self.queue.remove_task(task.id).unwrap();
+                }
+                Err((task, error)) => {
+                    self.queue.fail_task(&task, error).unwrap();
+                }
+            },
+        }
+    }
+}
+
+#[cfg(test)]
+mod worker_tests {
+    use super::RetentionMode;
+    use super::Runnable;
+    use super::Worker;
+    use crate::queue::Queue;
+    use crate::queue::Queueable;
+    use crate::schema::FangTaskState;
+    use crate::typetag;
+    use crate::FangError;
+    use chrono::Utc;
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Serialize, Deserialize)]
+    struct WorkerTaskTest {
+        pub number: u16,
+    }
+
+    #[typetag::serde]
+    impl Runnable for WorkerTaskTest {
+        fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
+            println!("the number is {}", self.number);
+
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "worker_task".to_string()
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct FailedTask {
+        pub number: u16,
+    }
+
+    #[typetag::serde]
+    impl Runnable for FailedTask {
+        fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
+            let message = format!("the number is {}", self.number);
+
+            Err(FangError {
+                description: message,
+            })
+        }
+
+        fn task_type(&self) -> String {
+            "F_task".to_string()
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct TaskType1 {}
+
+    #[typetag::serde]
+    impl Runnable for TaskType1 {
+        fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "type1".to_string()
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct TaskType2 {}
+
+    #[typetag::serde]
+    impl Runnable for TaskType2 {
+        fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "type2".to_string()
+        }
+    }
+
+    // Worker tests have to commit because the worker operations commit
+    #[test]
+    #[ignore]
+    fn executes_and_finishes_task() {
+        let task = WorkerTaskTest { number: 10 };
+
+        let pool = Queue::connection_pool(5);
+
+        let queue = Queue::builder().connection_pool(pool).build();
+
+        let worker = Worker::<Queue>::builder()
+            .queue(queue)
+            .retention_mode(RetentionMode::KeepAll)
+            .task_type(task.task_type())
+            .build();
+        let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
+
+        let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
+
+        assert_eq!(FangTaskState::New, task.state);
+
+        // this operation commits, which is why the whole test has to commit
+        worker.run(task.clone());
+
+        let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
+
+        assert_eq!(FangTaskState::Finished, found_task.state);
+
+        Queue::remove_tasks_of_type_query(&mut pooled_connection, "worker_task").unwrap();
+    }
+
+    #[test]
+    #[ignore]
+    fn executes_task_only_of_specific_type() {
+        let task1 = TaskType1 {};
+        let task2 = TaskType2 {};
+
+        let pool = Queue::connection_pool(5);
+
+        let queue = Queue::builder().connection_pool(pool).build();
+
+        let mut worker = Worker::<Queue>::builder()
+            .queue(queue)
+            .task_type(task1.task_type())
+            .retention_mode(RetentionMode::KeepAll)
+            .build();
+
+        let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
+
+        let task1 = Queue::insert_query(&mut pooled_connection, &task1, Utc::now()).unwrap();
+        let task2 = Queue::insert_query(&mut pooled_connection, &task2, Utc::now()).unwrap();
+
+        assert_eq!(FangTaskState::New, task1.state);
+        assert_eq!(FangTaskState::New, task2.state);
+
+        worker.run_tasks_until_none().unwrap();
+
+        std::thread::sleep(std::time::Duration::from_millis(1000));
+
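+        // the pause above presumably gives the committed state changes a
+        // moment to become visible before the assertions below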
+        let found_task1 =
+            Queue::find_task_by_id_query(&mut pooled_connection, task1.id).unwrap();
+        assert_eq!(FangTaskState::Finished, found_task1.state);
+
+        let found_task2 = Queue::find_task_by_id_query(&mut pooled_connection, task2.id).unwrap();
+        assert_eq!(FangTaskState::New, found_task2.state);
+
+        Queue::remove_tasks_of_type_query(&mut pooled_connection, "type1").unwrap();
+        Queue::remove_tasks_of_type_query(&mut pooled_connection, "type2").unwrap();
+    }
+
+    #[test]
+    #[ignore]
+    fn saves_error_for_failed_task() {
+        let task = FailedTask { number: 10 };
+
+        let pool = Queue::connection_pool(5);
+
+        let queue = Queue::builder().connection_pool(pool).build();
+
+        let worker = Worker::<Queue>::builder()
+            .queue(queue)
+            .retention_mode(RetentionMode::KeepAll)
+            .task_type(task.task_type())
+            .build();
+
+        let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
+
+        let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
+
+        assert_eq!(FangTaskState::New, task.state);
+
+        worker.run(task.clone());
+
+        let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
+
+        assert_eq!(FangTaskState::Failed, found_task.state);
+        assert_eq!(
+            "the number is 10".to_string(),
+            found_task.error_message.unwrap()
+        );
+
+        Queue::remove_tasks_of_type_query(&mut pooled_connection, "F_task").unwrap();
+    }
+}
diff --git a/src/blocking/worker_pool.rs b/src/blocking/worker_pool.rs
index 651c787..d445535 100644
--- a/src/blocking/worker_pool.rs
+++ b/src/blocking/worker_pool.rs
@@ -1,36 +1,38 @@
-use crate::diesel::r2d2;
-use crate::diesel::PgConnection;
-use crate::error::FangError;
-use crate::executor::Executor;
-use crate::queue::Queue;
-use crate::{RetentionMode, SleepParams};
+use crate::queue::Queueable;
+use crate::worker::Worker;
+use crate::FangError;
+use crate::RetentionMode;
+use crate::SleepParams;
 use log::error;
 use log::info;
-use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
 use std::thread;
+use typed_builder::TypedBuilder;
 
-#[derive(Clone)]
-pub struct WorkerPool {
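+// The pool no longer owns database connections or shutdown state shared
+// across threads: it is cheap-to-clone configuration (queue handle, task
+// type, sleep/retention params) from which every worker thread builds its
+// own Worker.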
+#[derive(Clone, TypedBuilder)]
+pub struct WorkerPool<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
+    #[builder(setter(into))]
     pub number_of_workers: u32,
-    pub worker_params: WorkerParams,
-    pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
-    shared_state: SharedState,
-    thread_join_handles: Arc<RwLock<HashMap<String, thread::JoinHandle<()>>>>,
+    #[builder(setter(into))]
+    pub queue: BQueue,
+    #[builder(setter(into), default)]
+    pub task_type: String,
+    #[builder(setter(into), default)]
+    pub sleep_params: SleepParams,
+    #[builder(setter(into), default)]
+    pub retention_mode: RetentionMode,
 }
 
-pub struct WorkerThread {
+#[derive(Clone, TypedBuilder)]
+pub struct WorkerThread<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
     pub name: String,
     pub restarts: u64,
-    pub worker_pool: WorkerPool,
-    graceful_shutdown: bool,
-}
-
-pub type SharedState = Arc<RwLock<WorkerState>>;
-
-pub enum WorkerState {
-    Running,
-    Shutdown,
+    pub worker_pool: WorkerPool<BQueue>,
 }
 
 #[derive(Clone)]
@@ -40,342 +42,73 @@ pub struct WorkerParams {
     pub retention_mode: Option<RetentionMode>,
     pub sleep_params: Option<SleepParams>,
     pub task_type: Option<String>,
 }
 
-impl Default for WorkerParams {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl WorkerParams {
-    pub fn new() -> Self {
-        Self {
-            retention_mode: None,
-            sleep_params: None,
-            task_type: None,
-        }
-    }
-
-    pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
-        self.retention_mode = Some(retention_mode);
-    }
-
-    pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
-        self.sleep_params = Some(sleep_params);
-    }
-
-    pub fn set_task_type(&mut self, task_type: String) {
-        self.task_type = Some(task_type);
-    }
-}
-
-impl WorkerPool {
-    pub fn new(number_of_workers: u32) -> Self {
-        let worker_params = WorkerParams::new();
-        let connection_pool = Queue::connection_pool(number_of_workers);
-
-        Self {
-            number_of_workers,
-            worker_params,
-            connection_pool,
-            shared_state: Arc::new(RwLock::new(WorkerState::Running)),
-            thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
-                number_of_workers as usize,
-            ))),
-        }
-    }
-
-    pub fn new_with_params(number_of_workers: u32, worker_params: WorkerParams) -> Self {
-        let connection_pool = Queue::connection_pool(number_of_workers);
-
-        Self {
-            number_of_workers,
-            worker_params,
-            connection_pool,
-            shared_state: Arc::new(RwLock::new(WorkerState::Running)),
-            thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
-                number_of_workers as usize,
-            ))),
-        }
-    }
-
+impl<BQueue> WorkerPool<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
     pub fn start(&mut self) -> Result<(), FangError> {
         for idx in 1..self.number_of_workers + 1 {
-            let worker_type = self
-                .worker_params
-                .task_type
-                .clone()
-                .unwrap_or_else(|| "".to_string());
-            let name = format!("worker_{}{}", worker_type, idx);
-            WorkerThread::spawn_in_pool(name.clone(), 0, self.clone())?;
-        }
-        Ok(())
-    }
+            let name = format!("worker_{}{}", self.task_type, idx);
 
-    /// Attempt graceful shutdown of each job thread, blocks until all threads exit. Threads exit
-    /// when their current job finishes.
-    pub fn shutdown(&mut self) -> Result<(), FangError> {
-        *self.shared_state.write()? = WorkerState::Shutdown;
+            let worker_thread = WorkerThread::builder()
+                .name(name.clone())
+                .restarts(0)
+                .worker_pool(self.clone())
+                .build();
 
-        for (worker_name, thread) in self.thread_join_handles.write()?.drain() {
-            if let Err(err) = thread.join() {
-                error!(
-                    "Failed to exit executor thread '{}' cleanly: {:?}",
-                    worker_name, err
-                );
-            }
+            worker_thread.spawn()?;
         }
 
         Ok(())
     }
 }
 
-impl WorkerThread {
-    pub fn new(name: String, restarts: u64, worker_pool: WorkerPool) -> Self {
-        Self {
-            name,
-            restarts,
-            worker_pool,
-            graceful_shutdown: false,
-        }
-    }
-
-    pub fn spawn_in_pool(
-        name: String,
-        restarts: u64,
-        worker_pool: WorkerPool,
-    ) -> Result<(), FangError> {
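+// WorkerThread::spawn consumes self and moves it into a thread named after
+// the worker ("worker_{task_type}{idx}"); the join-handle bookkeeping is
+// gone, and supervision happens through the Drop impl below instead.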
+impl<BQueue> WorkerThread<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
+    fn spawn(self) -> Result<(), FangError> {
         info!(
             "starting a worker thread {}, number of restarts {}",
-            name, restarts
+            self.name, self.restarts
         );
-        let job = WorkerThread::new(name.clone(), restarts, worker_pool.clone());
-        let join_handle = Self::spawn_thread(name.clone(), job)?;
-        worker_pool
-            .thread_join_handles
-            .write()?
-            .insert(name, join_handle);
-        Ok(())
-    }
+        let builder = thread::Builder::new().name(self.name.clone());
 
-    fn spawn_thread(
-        name: String,
-        mut job: WorkerThread,
-    ) -> Result<thread::JoinHandle<()>, FangError> {
-        let builder = thread::Builder::new().name(name.clone());
         builder
             .spawn(move || {
-                match job.worker_pool.connection_pool.get() {
-                    Ok(connection) => {
-                        let mut executor = Executor::new(connection);
-                        executor.set_shared_state(job.worker_pool.shared_state.clone());
+                let mut worker: Worker<BQueue> = Worker::builder()
+                    .queue(self.worker_pool.queue.clone())
+                    .task_type(self.worker_pool.task_type.clone())
+                    .retention_mode(self.worker_pool.retention_mode.clone())
+                    .sleep_params(self.worker_pool.sleep_params.clone())
+                    .build();
 
-                        if let Some(ref task_type_str) = job.worker_pool.worker_params.task_type {
-                            executor.set_task_type(task_type_str.to_owned());
-                        }
-
-                        if let Some(ref retention_mode) =
-                            job.worker_pool.worker_params.retention_mode
-                        {
-                            executor.set_retention_mode(retention_mode.to_owned());
-                        }
-
-                        if let Some(ref sleep_params) = job.worker_pool.worker_params.sleep_params {
-                            executor.set_sleep_params(sleep_params.clone());
-                        }
-
-                        // Run executor
-                        match executor.run_tasks() {
-                            Ok(_) => {
-                                job.graceful_shutdown = true;
-                            }
-                            Err(error) => {
-                                error!("Error executing tasks in worker '{}': {:?}", name, error);
-                            }
-                        }
-                    }
-                    Err(error) => {
-                        error!("Failed to get postgres connection: {:?}", error);
-                    }
+                // Run worker
+                if let Err(error) = worker.run_tasks() {
+                    error!(
+                        "Error executing tasks in worker '{}': {:?}",
+                        self.name, error
+                    );
                 }
             })
-            .map_err(FangError::from)
+            .map_err(FangError::from)?;
+
+        Ok(())
     }
 }
 
-impl Drop for WorkerThread {
+impl<BQueue> Drop for WorkerThread<BQueue>
+where
+    BQueue: Queueable + Clone + Sync + Send + 'static,
+{
     fn drop(&mut self) {
-        if self.graceful_shutdown {
-            return;
-        }
+        self.restarts += 1;
 
-        WorkerThread::spawn_in_pool(
-            self.name.clone(),
-            self.restarts + 1,
-            self.worker_pool.clone(),
-        )
-        .unwrap();
-    }
-}
-
-#[cfg(test)]
-mod task_pool_tests {
-    use super::WorkerParams;
-    use super::WorkerPool;
-    use crate::executor::Error;
-    use crate::executor::Runnable;
-    use crate::queue::Queue;
-    use crate::schema::{fang_tasks, FangTaskState};
-    use crate::typetag;
-    use crate::RetentionMode;
-    use crate::Task;
-    use diesel::pg::PgConnection;
-    use diesel::prelude::*;
-    use serde::{Deserialize, Serialize};
-    use std::thread;
-    use std::time::Duration;
-
-    #[derive(Serialize, Deserialize)]
-    struct MyTask {
-        pub number: u16,
-        pub current_thread_name: String,
-    }
-
-    impl MyTask {
-        pub fn new(number: u16) -> Self {
-            let handle = thread::current();
-            let current_thread_name = handle.name().unwrap().to_string();
-
-            Self {
-                number,
-                current_thread_name,
-            }
-        }
-    }
-
-    #[typetag::serde]
-    impl Runnable for MyTask {
-        fn run(&self, connection: &PgConnection) -> Result<(), Error> {
-            thread::sleep(Duration::from_secs(3));
-
-            let new_task = MyTask::new(self.number + 1);
-
-            Queue::push_task_query(connection, &new_task).unwrap();
-
-            Ok(())
-        }
-
-        fn task_type(&self) -> String {
-            "worker_pool_test".to_string()
-        }
-    }
-
-    #[derive(Serialize, Deserialize)]
-    struct ShutdownTask {
-        pub number: u16,
-        pub current_thread_name: String,
-    }
-
-    impl ShutdownTask {
-        pub fn new(number: u16) -> Self {
-            let handle = thread::current();
-            let current_thread_name = handle.name().unwrap().to_string();
-
-            Self {
-                number,
-                current_thread_name,
-            }
-        }
-    }
-
-    #[typetag::serde]
-    impl Runnable for ShutdownTask {
-        fn run(&self, connection: &PgConnection) -> Result<(), Error> {
-            thread::sleep(Duration::from_secs(3));
-
-            let new_task = MyTask::new(self.number + 1);
-
-            Queue::push_task_query(connection, &new_task).unwrap();
-
-            Ok(())
-        }
-
-        fn task_type(&self) -> String {
-            "shutdown_test".to_string()
-        }
-    }
-
-    fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec<Task> {
-        fang_tasks::table
-            .filter(fang_tasks::task_type.eq(job_type))
-            .get_results::<Task>(conn)
-            .unwrap()
-    }
-
-    // Following tests ignored because they commit data to the db
-    #[test]
-    #[ignore]
-    fn tasks_are_finished_on_shutdown() {
-        let queue = Queue::new();
-
-        let mut worker_params = WorkerParams::new();
-        worker_params.set_retention_mode(RetentionMode::KeepAll);
-        let mut task_pool = WorkerPool::new_with_params(2, worker_params);
-
-        queue.push_task(&ShutdownTask::new(100)).unwrap();
-        queue.push_task(&ShutdownTask::new(200)).unwrap();
-
-        task_pool.start().unwrap();
-        thread::sleep(Duration::from_secs(1));
-        task_pool.shutdown().unwrap();
-        thread::sleep(Duration::from_secs(5));
-
-        let tasks = get_all_tasks(&queue.connection, "shutdown_test");
-        let in_progress_tasks = tasks
-            .iter()
-            .filter(|task| task.state == FangTaskState::InProgress);
-        let finished_tasks = tasks
-            .iter()
-            .filter(|task| task.state == FangTaskState::Finished);
-
-        // Asserts first two tasks are allowed to finish, the tasks they spawn are not started
-        // though. No tasks should be in progress after a graceful shutdown.
-        assert_eq!(in_progress_tasks.count(), 0);
-        assert_eq!(finished_tasks.count(), 2);
-    }
-
-    #[test]
-    #[ignore]
-    fn tasks_are_split_between_two_threads() {
-        let queue = Queue::new();
-
-        let mut worker_params = WorkerParams::new();
-        worker_params.set_retention_mode(RetentionMode::KeepAll);
-        let mut task_pool = WorkerPool::new_with_params(2, worker_params);
-
-        queue.push_task(&MyTask::new(100)).unwrap();
-        queue.push_task(&MyTask::new(200)).unwrap();
-
-        task_pool.start().unwrap();
-
-        thread::sleep(Duration::from_secs(100));
-
-        let tasks = get_all_tasks(&queue.connection, "worker_pool_test");
-
-        assert!(tasks.len() > 40);
-
-        let test_worker1_tasks = tasks.clone().into_iter().filter(|task| {
-            serde_json::to_string(&task.metadata)
-                .unwrap()
-                .contains("worker_1")
-        });
-
-        let test_worker2_tasks = tasks.into_iter().filter(|task| {
-            serde_json::to_string(&task.metadata)
-                .unwrap()
-                .contains("worker_2")
-        });
-
-        assert!(test_worker1_tasks.count() > 20);
-        assert!(test_worker2_tasks.count() > 20);
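+        // Drop doubles as the supervision path: when a worker thread dies
+        // (for example because a task panicked), its WorkerThread is dropped,
+        // so we log the event and spawn a replacement thread.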
The number of restarts {}", + self.name, self.restarts, + ); + + self.clone().spawn().unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index eb16c3a..47d3f27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,22 @@ #![allow(clippy::extra_unused_lifetimes)] use std::time::Duration; +use thiserror::Error; + +pub enum Scheduled { + CronPattern(String), + ScheduleOnce(DateTime), +} + +#[derive(Debug, Error)] +pub enum CronError { + #[error(transparent)] + LibraryError(#[from] cron::error::Error), + #[error("You have to implement method `cron()` in your AsyncRunnable")] + TaskNotSchedulableError, + #[error("No timestamps match with this cron pattern")] + NoTimestampsError, +} #[derive(Clone, Debug)] pub enum RetentionMode { @@ -46,6 +62,11 @@ impl Default for SleepParams { } } +#[derive(Debug)] +pub struct FangError { + pub description: String, +} + #[macro_use] #[cfg(feature = "blocking")] extern crate diesel; @@ -60,9 +81,16 @@ pub use typetag; #[doc(hidden)] pub extern crate serde; +#[doc(hidden)] +pub extern crate chrono; + #[doc(hidden)] pub use serde_derive::{Deserialize, Serialize}; +pub use chrono::DateTime; +pub use chrono::Utc; +pub use cron::Schedule; + #[cfg(feature = "blocking")] pub mod blocking; #[cfg(feature = "blocking")]