From 360140d06478c95f606d9930f05e8d6df94bca40 Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Mon, 29 Aug 2022 16:59:22 +0000 Subject: [PATCH] Blocking refactor (#74) * Adapting code to new Fang tables structure and refactoring to make it more generic (Not finished) * Refactoring of the blocking module * Finishing blocking module, starting to modify tests * Worker tests done, Queue tests left * Maybe all tests done ?? * Makefile clippy * Examples fixed * asynk feature Co-authored-by: Ayrat Badykov --- Cargo.toml | 11 +- Makefile | 17 + .../asynk/simple_async_cron_worker/src/lib.rs | 2 +- .../simple_cron_worker}/.gitignore | 0 .../simple_cron_worker}/Cargo.toml | 5 +- .../simple_cron_worker}/README.md | 0 .../blocking/simple_cron_worker/src/lib.rs | 35 + .../blocking/simple_cron_worker/src/main.rs | 43 + .../blocking/simple_worker/.gitignore | 2 + .../blocking/simple_worker/Cargo.toml | 13 + .../blocking/simple_worker/README.md | 3 + .../blocking/simple_worker/src/lib.rs | 96 ++ .../blocking/simple_worker/src/main.rs | 50 + fang_examples/simple_worker/src/lib.rs | 44 - fang_examples/simple_worker/src/main.rs | 41 - src/{asynk/mod.rs => asynk.rs} | 3 + src/asynk/async_queue.rs | 15 +- src/asynk/async_runnable.rs | 8 +- src/asynk/async_worker.rs | 8 +- src/{blocking/mod.rs => blocking.rs} | 7 +- src/blocking/error.rs | 4 +- src/blocking/executor.rs | 328 ----- src/blocking/queue.rs | 1238 +++++++---------- src/blocking/runnable.rs | 22 + src/blocking/scheduler.rs | 125 -- src/blocking/schema.rs | 13 +- src/blocking/worker.rs | 330 +++++ src/blocking/worker_pool.rs | 405 +----- src/lib.rs | 16 + 29 files changed, 1247 insertions(+), 1637 deletions(-) create mode 100644 Makefile rename fang_examples/{simple_worker => blocking/simple_cron_worker}/.gitignore (100%) rename fang_examples/{simple_worker => blocking/simple_cron_worker}/Cargo.toml (71%) rename fang_examples/{simple_worker => blocking/simple_cron_worker}/README.md (100%) create mode 100644 fang_examples/blocking/simple_cron_worker/src/lib.rs create mode 100644 fang_examples/blocking/simple_cron_worker/src/main.rs create mode 100644 fang_examples/blocking/simple_worker/.gitignore create mode 100644 fang_examples/blocking/simple_worker/Cargo.toml create mode 100644 fang_examples/blocking/simple_worker/README.md create mode 100644 fang_examples/blocking/simple_worker/src/lib.rs create mode 100644 fang_examples/blocking/simple_worker/src/main.rs delete mode 100644 fang_examples/simple_worker/src/lib.rs delete mode 100644 fang_examples/simple_worker/src/main.rs rename src/{asynk/mod.rs => asynk.rs} (69%) rename src/{blocking/mod.rs => blocking.rs} (65%) delete mode 100644 src/blocking/executor.rs create mode 100644 src/blocking/runnable.rs delete mode 100644 src/blocking/scheduler.rs create mode 100644 src/blocking/worker.rs diff --git a/Cargo.toml b/Cargo.toml index ff7b4a9..65e63d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,22 +14,22 @@ rust-version = "1.62" [features] default = ["blocking", "asynk"] blocking = ["diesel", "diesel-derive-enum", "dotenv"] -asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"] +asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "async-recursion"] [dependencies] cron = "0.11" -hex = "0.4" -sha2 = "0.10" chrono = "0.4" +hex = "0.4" log = "0.4" serde = "1" serde_derive = "1.0.141" serde_json = "1" +sha2 = "0.10" thiserror = "1.0" +typed-builder = "0.10" typetag = "0.2" uuid = { version = "0.8", features = ["v4"] } - [dependencies.diesel] version = "1.4" features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"] @@ -63,9 +63,6 @@ optional = true version = "0.1" optional = true -[dependencies.typed-builder] -version = "0.10" -optional = true [dependencies.async-recursion] version = "1" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..df544cf --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +db: + docker run --rm -d --name postgres -p 5432:5432 \ + -e POSTGRES_DB=fang \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=postgres \ + postgres:latest +clippy: + cargo clippy --all-features +diesel: + DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run +stop: + docker kill postgres +tests: + DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture + +ignored: + DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture --ignored diff --git a/fang_examples/asynk/simple_async_cron_worker/src/lib.rs b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs index 4ac1e8b..ff102c9 100644 --- a/fang_examples/asynk/simple_async_cron_worker/src/lib.rs +++ b/fang_examples/asynk/simple_async_cron_worker/src/lib.rs @@ -1,10 +1,10 @@ use fang::async_trait; use fang::asynk::async_queue::AsyncQueueable; use fang::asynk::async_runnable::Error; -use fang::asynk::async_runnable::Scheduled; use fang::serde::{Deserialize, Serialize}; use fang::typetag; use fang::AsyncRunnable; +use fang::Scheduled; #[derive(Serialize, Deserialize)] #[serde(crate = "fang::serde")] diff --git a/fang_examples/simple_worker/.gitignore b/fang_examples/blocking/simple_cron_worker/.gitignore similarity index 100% rename from fang_examples/simple_worker/.gitignore rename to fang_examples/blocking/simple_cron_worker/.gitignore diff --git a/fang_examples/simple_worker/Cargo.toml b/fang_examples/blocking/simple_cron_worker/Cargo.toml similarity index 71% rename from fang_examples/simple_worker/Cargo.toml rename to fang_examples/blocking/simple_cron_worker/Cargo.toml index b833463..eab4aa4 100644 --- a/fang_examples/simple_worker/Cargo.toml +++ b/fang_examples/blocking/simple_cron_worker/Cargo.toml @@ -6,7 +6,8 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -fang = { path = "../../" , features = ["blocking"]} -signal-hook = "0.3.10" +fang = { path = "../../../" , features = ["blocking"]} dotenv = "0.15.0" env_logger = "0.9.0" +log = "0.4.0" +diesel = "1.4.8" diff --git a/fang_examples/simple_worker/README.md b/fang_examples/blocking/simple_cron_worker/README.md similarity index 100% rename from fang_examples/simple_worker/README.md rename to fang_examples/blocking/simple_cron_worker/README.md diff --git a/fang_examples/blocking/simple_cron_worker/src/lib.rs b/fang_examples/blocking/simple_cron_worker/src/lib.rs new file mode 100644 index 0000000..709e3d7 --- /dev/null +++ b/fang_examples/blocking/simple_cron_worker/src/lib.rs @@ -0,0 +1,35 @@ +use fang::runnable::Runnable; +use fang::serde::{Deserialize, Serialize}; +use fang::typetag; +use fang::Error; +use fang::Queueable; +use fang::Scheduled; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyCronTask {} + +#[typetag::serde] +impl Runnable for MyCronTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + log::info!("CRON !!!!!!!!!!!!!!!!!"); + + Ok(()) + } + + fn task_type(&self) -> String { + "cron_test".to_string() + } + + fn cron(&self) -> Option { + // sec min hour day of month month day of week year + // be careful works only with UTC hour. + // https://www.timeanddate.com/worldclock/timezone/utc + let expression = "0/20 * * * Aug-Sep * 2022/1"; + Some(Scheduled::CronPattern(expression.to_string())) + } + + fn uniq(&self) -> bool { + true + } +} diff --git a/fang_examples/blocking/simple_cron_worker/src/main.rs b/fang_examples/blocking/simple_cron_worker/src/main.rs new file mode 100644 index 0000000..a218c63 --- /dev/null +++ b/fang_examples/blocking/simple_cron_worker/src/main.rs @@ -0,0 +1,43 @@ +use diesel::r2d2; +use dotenv::dotenv; +use fang::PgConnection; +use fang::Queue; +use fang::Queueable; +use fang::RetentionMode; +use fang::WorkerPool; +use simple_worker::MyCronTask; +use std::env; +use std::thread::sleep; +use std::time::Duration; + +pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let manager = r2d2::ConnectionManager::::new(database_url); + + r2d2::Pool::builder() + .max_size(pool_size) + .build(manager) + .unwrap() +} + +fn main() { + dotenv().ok(); + + env_logger::init(); + + let queue = Queue::builder().connection_pool(connection_pool(2)).build(); + + let mut worker_pool = WorkerPool::::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .number_of_workers(2_u32) + .task_type("cron_test".to_string()) + .build(); + + worker_pool.queue.schedule_task(&MyCronTask {}).unwrap(); + + worker_pool.start().unwrap(); + + sleep(Duration::from_secs(100)) +} diff --git a/fang_examples/blocking/simple_worker/.gitignore b/fang_examples/blocking/simple_worker/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/fang_examples/blocking/simple_worker/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/fang_examples/blocking/simple_worker/Cargo.toml b/fang_examples/blocking/simple_worker/Cargo.toml new file mode 100644 index 0000000..eab4aa4 --- /dev/null +++ b/fang_examples/blocking/simple_worker/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "simple_worker" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fang = { path = "../../../" , features = ["blocking"]} +dotenv = "0.15.0" +env_logger = "0.9.0" +log = "0.4.0" +diesel = "1.4.8" diff --git a/fang_examples/blocking/simple_worker/README.md b/fang_examples/blocking/simple_worker/README.md new file mode 100644 index 0000000..f1c3f1b --- /dev/null +++ b/fang_examples/blocking/simple_worker/README.md @@ -0,0 +1,3 @@ +## Simple example + +The job described in this example enqueues a new job during its execution saving thread name of the current worker to its metadata. diff --git a/fang_examples/blocking/simple_worker/src/lib.rs b/fang_examples/blocking/simple_worker/src/lib.rs new file mode 100644 index 0000000..a018c5a --- /dev/null +++ b/fang_examples/blocking/simple_worker/src/lib.rs @@ -0,0 +1,96 @@ +use fang::runnable::Runnable; +use fang::serde::{Deserialize, Serialize}; +use fang::typetag; +use fang::Error; +use fang::Queueable; +use std::thread; +use std::time::Duration; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyTask { + pub number: u16, + pub current_thread_name: String, +} + +impl MyTask { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + + Self { + number, + current_thread_name, + } + } +} + +#[typetag::serde] +impl Runnable for MyTask { + fn run(&self, queue: &dyn Queueable) -> Result<(), Error> { + thread::sleep(Duration::from_secs(3)); + + let new_task = MyTask::new(self.number + 1); + + log::info!( + "The number is {}, thread name {}", + self.number, + self.current_thread_name + ); + + queue.insert_task(&new_task).unwrap(); + + Ok(()) + } + + fn task_type(&self) -> String { + "worker_pool_test".to_string() + } +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyFailingTask { + pub number: u16, + pub current_thread_name: String, +} + +impl MyFailingTask { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + Self { + number, + current_thread_name, + } + } +} + +#[typetag::serde] +impl Runnable for MyFailingTask { + fn run(&self, queue: &dyn Queueable) -> Result<(), Error> { + let new_task = MyFailingTask::new(self.number + 1); + + queue.insert_task(&new_task).unwrap(); + + log::info!( + "Failing task number {}, Thread name:{}", + self.number, + self.current_thread_name + ); + + thread::sleep(Duration::from_secs(3)); + + let b = true; + + if b { + panic!("Hello!"); + } else { + Ok(()) + } + } + + fn task_type(&self) -> String { + "worker_pool_test".to_string() + } +} diff --git a/fang_examples/blocking/simple_worker/src/main.rs b/fang_examples/blocking/simple_worker/src/main.rs new file mode 100644 index 0000000..a162047 --- /dev/null +++ b/fang_examples/blocking/simple_worker/src/main.rs @@ -0,0 +1,50 @@ +use diesel::r2d2; +use dotenv::dotenv; +use fang::PgConnection; +use fang::Queue; +use fang::Queueable; +use fang::RetentionMode; +use fang::WorkerPool; +use simple_worker::MyFailingTask; +use simple_worker::MyTask; +use std::env; +use std::thread::sleep; +use std::time::Duration; + +pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let manager = r2d2::ConnectionManager::::new(database_url); + + r2d2::Pool::builder() + .max_size(pool_size) + .build(manager) + .unwrap() +} + +fn main() { + dotenv().ok(); + + env_logger::init(); + + let queue = Queue::builder().connection_pool(connection_pool(3)).build(); + + let mut worker_pool = WorkerPool::::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .number_of_workers(3_u32) + .task_type("worker_pool_test".to_string()) + .build(); + + worker_pool.queue.insert_task(&MyTask::new(1)).unwrap(); + worker_pool.queue.insert_task(&MyTask::new(1000)).unwrap(); + + worker_pool + .queue + .insert_task(&MyFailingTask::new(5000)) + .unwrap(); + + worker_pool.start().unwrap(); + + sleep(Duration::from_secs(100)) +} diff --git a/fang_examples/simple_worker/src/lib.rs b/fang_examples/simple_worker/src/lib.rs deleted file mode 100644 index f79c02d..0000000 --- a/fang_examples/simple_worker/src/lib.rs +++ /dev/null @@ -1,44 +0,0 @@ -use fang::serde::{Deserialize, Serialize}; -use fang::typetag; -use fang::Error; -use fang::PgConnection; -use fang::Queue; -use fang::Runnable; -use std::thread; -use std::time::Duration; - -#[derive(Serialize, Deserialize)] -#[serde(crate = "fang::serde")] -pub struct MyJob { - pub number: u16, - pub current_thread_name: String, -} - -impl MyJob { - pub fn new(number: u16) -> Self { - let handle = thread::current(); - let current_thread_name = handle.name().unwrap().to_string(); - - Self { - number, - current_thread_name, - } - } -} - -#[typetag::serde] -impl Runnable for MyJob { - fn run(&self, connection: &PgConnection) -> Result<(), Error> { - thread::sleep(Duration::from_secs(3)); - - let new_job = MyJob::new(self.number + 1); - - Queue::push_task_query(connection, &new_job).unwrap(); - - Ok(()) - } - - fn task_type(&self) -> String { - "worker_pool_test".to_string() - } -} diff --git a/fang_examples/simple_worker/src/main.rs b/fang_examples/simple_worker/src/main.rs deleted file mode 100644 index ae66e39..0000000 --- a/fang_examples/simple_worker/src/main.rs +++ /dev/null @@ -1,41 +0,0 @@ -use dotenv::dotenv; -use fang::Queue; -use fang::RetentionMode; -use fang::WorkerParams; -use fang::WorkerPool; -use signal_hook::{consts::signal::*, iterator::Signals}; -use simple_worker::MyJob; - -fn main() { - dotenv().ok(); - - env_logger::init(); - - let mut worker_params = WorkerParams::new(); - worker_params.set_retention_mode(RetentionMode::KeepAll); - - let mut worker_pool = WorkerPool::new_with_params(2, worker_params); - worker_pool.start().unwrap(); - - let queue = Queue::new(); - - queue.push_task(&MyJob::new(1)).unwrap(); - queue.push_task(&MyJob::new(1000)).unwrap(); - - let mut signals = Signals::new(&[SIGINT, SIGTERM]).unwrap(); - for sig in signals.forever() { - match sig { - SIGINT => { - println!("Received SIGINT"); - worker_pool.shutdown().unwrap(); - break; - } - SIGTERM => { - println!("Received SIGTERM"); - worker_pool.shutdown().unwrap(); - break; - } - _ => unreachable!("Received unexpected signal: {:?}", sig), - } - } -} diff --git a/src/asynk/mod.rs b/src/asynk.rs similarity index 69% rename from src/asynk/mod.rs rename to src/asynk.rs index 994f702..b63cf51 100644 --- a/src/asynk/mod.rs +++ b/src/asynk.rs @@ -3,5 +3,8 @@ pub mod async_runnable; pub mod async_worker; pub mod async_worker_pool; +pub use async_queue::*; pub use async_runnable::AsyncRunnable; pub use async_runnable::Error as AsyncError; +pub use async_worker::*; +pub use async_worker_pool::*; diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 715b21d..98508b1 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,6 +1,7 @@ -use crate::async_runnable::Scheduled::*; use crate::asynk::async_runnable::AsyncRunnable; use crate::asynk::async_runnable::Error as FangError; +use crate::CronError; +use crate::Scheduled::*; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::bb8::RunError; @@ -76,16 +77,6 @@ pub struct Task { pub updated_at: DateTime, } -#[derive(Debug, Error)] -pub enum CronError { - #[error(transparent)] - LibraryError(#[from] cron::error::Error), - #[error("You have to implement method `cron()` in your AsyncRunnable")] - TaskNotSchedulableError, - #[error("No timestamps match with this cron pattern")] - NoTimestampsError, -} - #[derive(Debug, Error)] pub enum AsyncQueueError { #[error(transparent)] @@ -717,9 +708,9 @@ mod async_queue_tests { use super::AsyncQueueable; use super::FangTaskState; use super::Task; - use crate::async_runnable::Scheduled; use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; + use crate::Scheduled; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index 1a0ddd6..752329d 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -1,7 +1,6 @@ use crate::asynk::async_queue::AsyncQueueable; +use crate::Scheduled; use async_trait::async_trait; -use chrono::DateTime; -use chrono::Utc; const COMMON_TYPE: &str = "common"; @@ -10,11 +9,6 @@ pub struct Error { pub description: String, } -pub enum Scheduled { - CronPattern(String), - ScheduleOnce(DateTime), -} - #[typetag::serde(tag = "type")] #[async_trait] pub trait AsyncRunnable: Send + Sync { diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index 36b4f34..f63057f 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -1,10 +1,10 @@ -use crate::async_runnable::Scheduled::*; use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; use crate::asynk::async_queue::Task; use crate::asynk::async_queue::DEFAULT_TASK_TYPE; use crate::asynk::async_runnable::AsyncRunnable; use crate::asynk::AsyncError as Error; +use crate::Scheduled::*; use crate::{RetentionMode, SleepParams}; use log::error; use typed_builder::TypedBuilder; @@ -242,7 +242,6 @@ impl<'a> AsyncWorkerTest<'a> { #[cfg(test)] mod async_worker_tests { use super::AsyncWorkerTest; - use crate::async_runnable::Scheduled; use crate::asynk::async_queue::AsyncQueueTest; use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; @@ -250,6 +249,7 @@ mod async_worker_tests { use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; use crate::RetentionMode; + use crate::Scheduled; use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::tokio_postgres::NoTls; @@ -283,7 +283,7 @@ mod async_worker_tests { Ok(()) } fn cron(&self) -> Option { - Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(7))) + Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(1))) } } @@ -383,7 +383,7 @@ mod async_worker_tests { assert_eq!(id, task.id); assert_eq!(FangTaskState::New, task.state); - tokio::time::sleep(core::time::Duration::from_secs(10)).await; + tokio::time::sleep(core::time::Duration::from_secs(3)).await; worker.run_tasks_until_none().await.unwrap(); diff --git a/src/blocking/mod.rs b/src/blocking.rs similarity index 65% rename from src/blocking/mod.rs rename to src/blocking.rs index f2aaf7c..59f30a2 100644 --- a/src/blocking/mod.rs +++ b/src/blocking.rs @@ -1,13 +1,12 @@ pub mod error; -pub mod executor; pub mod queue; -pub mod scheduler; +pub mod runnable; pub mod schema; +pub mod worker; pub mod worker_pool; pub use error::FangError; -pub use executor::*; pub use queue::*; -pub use scheduler::*; pub use schema::*; +pub use worker::*; pub use worker_pool::*; diff --git a/src/blocking/error.rs b/src/blocking/error.rs index a1cb125..f7da9bc 100644 --- a/src/blocking/error.rs +++ b/src/blocking/error.rs @@ -1,3 +1,4 @@ +use crate::blocking::queue::QueueError; use std::io::Error as IoError; use std::sync::PoisonError; use thiserror::Error; @@ -6,7 +7,8 @@ use thiserror::Error; pub enum FangError { #[error("The shared state in an executor thread became poisoned")] PoisonedLock, - + #[error(transparent)] + QueueError(#[from] QueueError), #[error("Failed to create executor thread")] ExecutorThreadCreationFailed { #[from] diff --git a/src/blocking/executor.rs b/src/blocking/executor.rs deleted file mode 100644 index 8367132..0000000 --- a/src/blocking/executor.rs +++ /dev/null @@ -1,328 +0,0 @@ -use crate::error::FangError; -use crate::queue::Queue; -use crate::queue::Task; -use crate::worker_pool::{SharedState, WorkerState}; -use crate::{RetentionMode, SleepParams}; -use diesel::pg::PgConnection; -use diesel::r2d2::{ConnectionManager, PooledConnection}; -use log::error; -use std::thread; - -pub struct Executor { - pub pooled_connection: PooledConnection>, - pub task_type: Option, - pub sleep_params: SleepParams, - pub retention_mode: RetentionMode, - shared_state: Option, -} - -#[derive(Debug)] -pub struct Error { - pub description: String, -} - -#[typetag::serde(tag = "type")] -pub trait Runnable { - fn run(&self, connection: &PgConnection) -> Result<(), Error>; - - fn task_type(&self) -> String { - "common".to_string() - } -} - -impl Executor { - pub fn new(pooled_connection: PooledConnection>) -> Self { - Self { - pooled_connection, - sleep_params: SleepParams::default(), - retention_mode: RetentionMode::RemoveFinished, - task_type: None, - shared_state: None, - } - } - - pub fn set_shared_state(&mut self, shared_state: SharedState) { - self.shared_state = Some(shared_state); - } - - pub fn set_task_type(&mut self, task_type: String) { - self.task_type = Some(task_type); - } - - pub fn set_sleep_params(&mut self, sleep_params: SleepParams) { - self.sleep_params = sleep_params; - } - - pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) { - self.retention_mode = retention_mode; - } - - pub fn run(&self, task: Task) { - let result = self.execute_task(task); - self.finalize_task(result) - } - - pub fn run_tasks(&mut self) -> Result<(), FangError> { - loop { - if let Some(ref shared_state) = self.shared_state { - let shared_state = shared_state.read()?; - if let WorkerState::Shutdown = *shared_state { - return Ok(()); - } - } - - match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) { - Ok(Some(task)) => { - self.maybe_reset_sleep_period(); - self.run(task); - } - Ok(None) => { - self.sleep(); - } - - Err(error) => { - error!("Failed to fetch a task {:?}", error); - - self.sleep(); - } - }; - } - } - - pub fn maybe_reset_sleep_period(&mut self) { - self.sleep_params.maybe_reset_sleep_period(); - } - - pub fn sleep(&mut self) { - self.sleep_params.maybe_increase_sleep_period(); - - thread::sleep(self.sleep_params.sleep_period); - } - - fn execute_task(&self, task: Task) -> Result { - let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); - let task_result = actual_task.run(&self.pooled_connection); - - match task_result { - Ok(()) => Ok(task), - Err(error) => Err((task, error.description)), - } - } - - fn finalize_task(&self, result: Result) { - match self.retention_mode { - RetentionMode::KeepAll => { - match result { - Ok(task) => Queue::finish_task_query(&self.pooled_connection, &task).unwrap(), - Err((task, error)) => { - Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap() - } - }; - } - RetentionMode::RemoveAll => { - match result { - Ok(task) => Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(), - Err((task, _error)) => { - Queue::remove_task_query(&self.pooled_connection, task.id).unwrap() - } - }; - } - RetentionMode::RemoveFinished => match result { - Ok(task) => { - Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(); - } - Err((task, error)) => { - Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap(); - } - }, - } - } -} - -#[cfg(test)] -mod executor_tests { - use super::Error; - use super::Executor; - use super::RetentionMode; - use super::Runnable; - use crate::queue::Queue; - use crate::schema::FangTaskState; - use crate::typetag; - use crate::NewTask; - use diesel::connection::Connection; - use diesel::pg::PgConnection; - use diesel::r2d2::{ConnectionManager, PooledConnection}; - use serde::{Deserialize, Serialize}; - - #[derive(Serialize, Deserialize)] - struct ExecutorTaskTest { - pub number: u16, - } - - #[typetag::serde] - impl Runnable for ExecutorTaskTest { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - println!("the number is {}", self.number); - - Ok(()) - } - } - - #[derive(Serialize, Deserialize)] - struct FailedTask { - pub number: u16, - } - - #[typetag::serde] - impl Runnable for FailedTask { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - let message = format!("the number is {}", self.number); - - Err(Error { - description: message, - }) - } - } - - #[derive(Serialize, Deserialize)] - struct TaskType1 {} - - #[typetag::serde] - impl Runnable for TaskType1 { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - Ok(()) - } - - fn task_type(&self) -> String { - "type1".to_string() - } - } - - #[derive(Serialize, Deserialize)] - struct TaskType2 {} - - #[typetag::serde] - impl Runnable for TaskType2 { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - Ok(()) - } - - fn task_type(&self) -> String { - "type2".to_string() - } - } - - pub fn serialize(job: &dyn Runnable) -> serde_json::Value { - serde_json::to_value(job).unwrap() - } - - #[test] - fn executes_and_finishes_task() { - let job = ExecutorTaskTest { number: 10 }; - - let new_task = NewTask { - metadata: serialize(&job), - task_type: "common".to_string(), - }; - - let mut executor = Executor::new(pooled_connection()); - executor.set_retention_mode(RetentionMode::KeepAll); - - executor - .pooled_connection - .test_transaction::<(), Error, _>(|| { - let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap(); - - assert_eq!(FangTaskState::New, task.state); - - executor.run(task.clone()); - - let found_task = - Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap(); - - assert_eq!(FangTaskState::Finished, found_task.state); - - Ok(()) - }); - } - - #[test] - #[ignore] - fn executes_task_only_of_specific_type() { - let task1 = TaskType1 {}; - let task2 = TaskType2 {}; - - let new_task1 = NewTask { - metadata: serialize(&task1), - task_type: "type1".to_string(), - }; - - let new_task2 = NewTask { - metadata: serialize(&task2), - task_type: "type2".to_string(), - }; - - let executor = Executor::new(pooled_connection()); - - let task1 = Queue::insert_query(&executor.pooled_connection, &new_task1).unwrap(); - let task2 = Queue::insert_query(&executor.pooled_connection, &new_task2).unwrap(); - - assert_eq!(FangTaskState::New, task1.state); - assert_eq!(FangTaskState::New, task2.state); - - std::thread::spawn(move || { - let mut executor = Executor::new(pooled_connection()); - executor.set_retention_mode(RetentionMode::KeepAll); - executor.set_task_type("type1".to_string()); - - executor.run_tasks().unwrap(); - }); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - - let found_task1 = - Queue::find_task_by_id_query(&executor.pooled_connection, task1.id).unwrap(); - assert_eq!(FangTaskState::Finished, found_task1.state); - - let found_task2 = - Queue::find_task_by_id_query(&executor.pooled_connection, task2.id).unwrap(); - assert_eq!(FangTaskState::New, found_task2.state); - } - - #[test] - fn saves_error_for_failed_task() { - let task = FailedTask { number: 10 }; - - let new_task = NewTask { - metadata: serialize(&task), - task_type: "common".to_string(), - }; - - let executor = Executor::new(pooled_connection()); - - executor - .pooled_connection - .test_transaction::<(), Error, _>(|| { - let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap(); - - assert_eq!(FangTaskState::New, task.state); - - executor.run(task.clone()); - - let found_task = - Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap(); - - assert_eq!(FangTaskState::Failed, found_task.state); - assert_eq!( - "the number is 10".to_string(), - found_task.error_message.unwrap() - ); - - Ok(()) - }); - } - - fn pooled_connection() -> PooledConnection> { - Queue::connection_pool(5).get().unwrap() - } -} diff --git a/src/blocking/queue.rs b/src/blocking/queue.rs index 868c65d..249d4dd 100644 --- a/src/blocking/queue.rs +++ b/src/blocking/queue.rs @@ -1,183 +1,272 @@ -use crate::executor::Runnable; -use crate::schema::fang_periodic_tasks; +use crate::runnable::Runnable; use crate::schema::fang_tasks; use crate::schema::FangTaskState; +use crate::CronError; +use crate::Scheduled::*; use chrono::DateTime; -use chrono::Duration; use chrono::Utc; +use cron::Schedule; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2; -use diesel::result::Error; -use dotenv::dotenv; -use std::env; -use std::time::Duration as StdDuration; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::PoolError; +use diesel::r2d2::PooledConnection; +use diesel::result::Error as DieselError; +use sha2::Digest; +use sha2::Sha256; +use std::str::FromStr; +use thiserror::Error; +use typed_builder::TypedBuilder; use uuid::Uuid; -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] +#[cfg(test)] +use dotenv::dotenv; +#[cfg(test)] +use std::env; + +pub type PoolConnection = PooledConnection>; + +#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone, TypedBuilder)] #[table_name = "fang_tasks"] pub struct Task { + #[builder(setter(into))] pub id: Uuid, + #[builder(setter(into))] pub metadata: serde_json::Value, + #[builder(setter(into))] pub error_message: Option, + #[builder(setter(into))] pub state: FangTaskState, + #[builder(setter(into))] pub task_type: String, + #[builder(setter(into))] + pub uniq_hash: Option, + #[builder(setter(into))] + pub scheduled_at: DateTime, + #[builder(setter(into))] pub created_at: DateTime, + #[builder(setter(into))] pub updated_at: DateTime, } -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] -#[table_name = "fang_periodic_tasks"] -pub struct PeriodicTask { - pub id: Uuid, - pub metadata: serde_json::Value, - pub period_in_millis: i64, - pub scheduled_at: Option>, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -#[derive(Insertable)] +#[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)] #[table_name = "fang_tasks"] pub struct NewTask { - pub metadata: serde_json::Value, - pub task_type: String, + #[builder(setter(into))] + metadata: serde_json::Value, + #[builder(setter(into))] + task_type: String, + #[builder(setter(into))] + uniq_hash: Option, + #[builder(setter(into))] + scheduled_at: DateTime, } -#[derive(Insertable)] -#[table_name = "fang_periodic_tasks"] -pub struct NewPeriodicTask { - pub metadata: serde_json::Value, - pub period_in_millis: i64, +#[derive(Debug, Error)] +pub enum QueueError { + #[error(transparent)] + DieselError(#[from] DieselError), + #[error(transparent)] + PoolError(#[from] PoolError), + #[error(transparent)] + CronError(#[from] CronError), } +impl From for QueueError { + fn from(error: cron::error::Error) -> Self { + QueueError::CronError(CronError::LibraryError(error)) + } +} + +pub trait Queueable { + fn fetch_and_touch_task(&self, task_type: String) -> Result, QueueError>; + + fn insert_task(&self, params: &dyn Runnable) -> Result; + + fn remove_all_tasks(&self) -> Result; + + fn remove_tasks_of_type(&self, task_type: &str) -> Result; + + fn remove_task(&self, id: Uuid) -> Result; + + fn find_task_by_id(&self, id: Uuid) -> Option; + + fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result; + + fn fail_task(&self, task: &Task, error: String) -> Result; + + fn schedule_task(&self, task: &dyn Runnable) -> Result; +} + +#[derive(Clone, TypedBuilder)] pub struct Queue { - pub connection: PgConnection, + #[builder(setter(into))] + pub connection_pool: r2d2::Pool>, } -impl Default for Queue { - fn default() -> Self { - Self::new() +impl Queueable for Queue { + fn fetch_and_touch_task(&self, task_type: String) -> Result, QueueError> { + let connection = self.get_connection()?; + + Self::fetch_and_touch_query(&connection, task_type) + } + + fn insert_task(&self, params: &dyn Runnable) -> Result { + let connection = self.get_connection()?; + + Self::insert_query(&connection, params, Utc::now()) + } + fn schedule_task(&self, params: &dyn Runnable) -> Result { + let connection = self.get_connection()?; + + Self::schedule_task_query(&connection, params) + } + fn remove_all_tasks(&self) -> Result { + let connection = self.get_connection()?; + + Self::remove_all_tasks_query(&connection) + } + + fn remove_tasks_of_type(&self, task_type: &str) -> Result { + let connection = self.get_connection()?; + + Self::remove_tasks_of_type_query(&connection, task_type) + } + + fn remove_task(&self, id: Uuid) -> Result { + let connection = self.get_connection()?; + + Self::remove_task_query(&connection, id) + } + + fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result { + let connection = self.get_connection()?; + + Self::update_task_state_query(&connection, task, state) + } + + fn fail_task(&self, task: &Task, error: String) -> Result { + let connection = self.get_connection()?; + + Self::fail_task_query(&connection, task, error) + } + + fn find_task_by_id(&self, id: Uuid) -> Option { + let connection = self.get_connection().unwrap(); + + Self::find_task_by_id_query(&connection, id) } } impl Queue { - pub fn new() -> Self { - let connection = Self::pg_connection(None); + pub fn get_connection(&self) -> Result { + let result = self.connection_pool.get(); - Self { connection } - } - - pub fn new_with_url(database_url: String) -> Self { - let connection = Self::pg_connection(Some(database_url)); - - Self { connection } - } - - pub fn new_with_connection(connection: PgConnection) -> Self { - Self { connection } - } - - pub fn push_task(&self, task: &dyn Runnable) -> Result { - Self::push_task_query(&self.connection, task) - } - - pub fn push_task_query(connection: &PgConnection, task: &dyn Runnable) -> Result { - let json_task = serde_json::to_value(task).unwrap(); - - match Self::find_task_by_metadata_query(connection, &json_task) { - Some(task) => Ok(task), - None => { - let new_task = NewTask { - metadata: json_task.clone(), - task_type: task.task_type(), - }; - Self::insert_query(connection, &new_task) - } + if let Err(err) = result { + log::error!("Failed to get a db connection {:?}", err); + return Err(QueueError::PoolError(err)); } + + Ok(result.unwrap()) } - pub fn push_periodic_task( - &self, - task: &dyn Runnable, - period: i64, - ) -> Result { - Self::push_periodic_task_query(&self.connection, task, period) - } - - pub fn push_periodic_task_query( + pub fn schedule_task_query( connection: &PgConnection, - task: &dyn Runnable, - period: i64, - ) -> Result { - let json_task = serde_json::to_value(task).unwrap(); + params: &dyn Runnable, + ) -> Result { + let scheduled_at = match params.cron() { + Some(scheduled) => match scheduled { + CronPattern(cron_pattern) => { + let schedule = Schedule::from_str(&cron_pattern)?; + let mut iterator = schedule.upcoming(Utc); - match Self::find_periodic_task_by_metadata_query(connection, &json_task) { - Some(task) => Ok(task), + iterator + .next() + .ok_or(QueueError::CronError(CronError::NoTimestampsError))? + } + ScheduleOnce(datetime) => datetime, + }, None => { - let new_task = NewPeriodicTask { - metadata: json_task, - period_in_millis: period, - }; + return Err(QueueError::CronError(CronError::TaskNotSchedulableError)); + } + }; - diesel::insert_into(fang_periodic_tasks::table) - .values(new_task) - .get_result::(connection) + Self::insert_query(connection, params, scheduled_at) + } + + fn calculate_hash(json: String) -> String { + let mut hasher = Sha256::new(); + hasher.update(json.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + } + + pub fn insert_query( + connection: &PgConnection, + params: &dyn Runnable, + scheduled_at: DateTime, + ) -> Result { + if !params.uniq() { + let new_task = NewTask::builder() + .scheduled_at(scheduled_at) + .uniq_hash(None) + .task_type(params.task_type()) + .metadata(serde_json::to_value(params).unwrap()) + .build(); + + Ok(diesel::insert_into(fang_tasks::table) + .values(new_task) + .get_result::(connection)?) + } else { + let metadata = serde_json::to_value(params).unwrap(); + + let uniq_hash = Self::calculate_hash(metadata.to_string()); + + match Self::find_task_by_uniq_hash_query(connection, &uniq_hash) { + Some(task) => Ok(task), + None => { + let new_task = NewTask::builder() + .scheduled_at(scheduled_at) + .uniq_hash(Some(uniq_hash)) + .task_type(params.task_type()) + .metadata(serde_json::to_value(params).unwrap()) + .build(); + + Ok(diesel::insert_into(fang_tasks::table) + .values(new_task) + .get_result::(connection)?) + } } } } - pub fn enqueue_task(task: &dyn Runnable) -> Result { - Self::new().push_task(task) - } - - pub fn insert(&self, params: &NewTask) -> Result { - Self::insert_query(&self.connection, params) - } - - pub fn insert_query(connection: &PgConnection, params: &NewTask) -> Result { - diesel::insert_into(fang_tasks::table) - .values(params) - .get_result::(connection) - } - - pub fn fetch_task(&self, task_type: &Option) -> Option { - Self::fetch_task_query(&self.connection, task_type) - } - - pub fn fetch_task_query(connection: &PgConnection, task_type: &Option) -> Option { - match task_type { - None => Self::fetch_any_task_query(connection), - Some(task_type_str) => Self::fetch_task_of_type_query(connection, task_type_str), - } - } - - pub fn fetch_and_touch(&self, task_type: &Option) -> Result, Error> { - Self::fetch_and_touch_query(&self.connection, task_type) + pub fn fetch_task_query(connection: &PgConnection, task_type: String) -> Option { + Self::fetch_task_of_type_query(connection, &task_type) } pub fn fetch_and_touch_query( connection: &PgConnection, - task_type: &Option, - ) -> Result, Error> { - connection.transaction::, Error, _>(|| { + task_type: String, + ) -> Result, QueueError> { + connection.transaction::, QueueError, _>(|| { let found_task = Self::fetch_task_query(connection, task_type); if found_task.is_none() { return Ok(None); } - match Self::start_processing_task_query(connection, &found_task.unwrap()) { + match Self::update_task_state_query( + connection, + &found_task.unwrap(), + FangTaskState::InProgress, + ) { Ok(updated_task) => Ok(Some(updated_task)), Err(err) => Err(err), } }) } - pub fn find_task_by_id(&self, id: Uuid) -> Option { - Self::find_task_by_id_query(&self.connection, id) - } - pub fn find_task_by_id_query(connection: &PgConnection, id: Uuid) -> Option { fang_tasks::table .filter(fang_tasks::id.eq(id)) @@ -185,144 +274,57 @@ impl Queue { .ok() } - pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option { - Self::find_periodic_task_by_id_query(&self.connection, id) - } - - pub fn find_periodic_task_by_id_query( - connection: &PgConnection, - id: Uuid, - ) -> Option { - fang_periodic_tasks::table - .filter(fang_periodic_tasks::id.eq(id)) - .first::(connection) - .ok() - } - - pub fn fetch_periodic_tasks(&self, error_margin: StdDuration) -> Option> { - Self::fetch_periodic_tasks_query(&self.connection, error_margin) - } - - pub fn fetch_periodic_tasks_query( - connection: &PgConnection, - error_margin: StdDuration, - ) -> Option> { - let current_time = Self::current_time(); - - let margin = Duration::from_std(error_margin).unwrap(); - - let low_limit = current_time - margin; - let high_limit = current_time + margin; - - fang_periodic_tasks::table - .filter( - fang_periodic_tasks::scheduled_at - .gt(low_limit) - .and(fang_periodic_tasks::scheduled_at.lt(high_limit)), - ) - .or_filter(fang_periodic_tasks::scheduled_at.is_null()) - .load::(connection) - .ok() - } - - pub fn schedule_next_task_execution(&self, task: &PeriodicTask) -> Result { - let current_time = Self::current_time(); - let scheduled_at = current_time + Duration::milliseconds(task.period_in_millis); - - diesel::update(task) - .set(( - fang_periodic_tasks::scheduled_at.eq(scheduled_at), - fang_periodic_tasks::updated_at.eq(current_time), - )) - .get_result::(&self.connection) - } - - pub fn remove_all_tasks(&self) -> Result { - Self::remove_all_tasks_query(&self.connection) - } - - pub fn remove_all_tasks_query(connection: &PgConnection) -> Result { - diesel::delete(fang_tasks::table).execute(connection) - } - - pub fn remove_tasks_of_type(&self, task_type: &str) -> Result { - Self::remove_tasks_of_type_query(&self.connection, task_type) + pub fn remove_all_tasks_query(connection: &PgConnection) -> Result { + Ok(diesel::delete(fang_tasks::table).execute(connection)?) } pub fn remove_tasks_of_type_query( connection: &PgConnection, task_type: &str, - ) -> Result { + ) -> Result { let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type)); - diesel::delete(query).execute(connection) + Ok(diesel::delete(query).execute(connection)?) } - pub fn remove_all_periodic_tasks(&self) -> Result { - Self::remove_all_periodic_tasks_query(&self.connection) - } - - pub fn remove_all_periodic_tasks_query(connection: &PgConnection) -> Result { - diesel::delete(fang_periodic_tasks::table).execute(connection) - } - - pub fn remove_task(&self, id: Uuid) -> Result { - Self::remove_task_query(&self.connection, id) - } - - pub fn remove_task_query(connection: &PgConnection, id: Uuid) -> Result { + pub fn remove_task_query(connection: &PgConnection, id: Uuid) -> Result { let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); - diesel::delete(query).execute(connection) + Ok(diesel::delete(query).execute(connection)?) } - pub fn finish_task(&self, task: &Task) -> Result { - Self::finish_task_query(&self.connection, task) - } - - pub fn finish_task_query(connection: &PgConnection, task: &Task) -> Result { - diesel::update(task) - .set(( - fang_tasks::state.eq(FangTaskState::Finished), - fang_tasks::updated_at.eq(Self::current_time()), - )) - .get_result::(connection) - } - - pub fn start_processing_task(&self, task: &Task) -> Result { - Self::start_processing_task_query(&self.connection, task) - } - - pub fn start_processing_task_query( + pub fn update_task_state_query( connection: &PgConnection, task: &Task, - ) -> Result { - diesel::update(task) + state: FangTaskState, + ) -> Result { + Ok(diesel::update(task) .set(( - fang_tasks::state.eq(FangTaskState::InProgress), + fang_tasks::state.eq(state), fang_tasks::updated_at.eq(Self::current_time()), )) - .get_result::(connection) - } - - pub fn fail_task(&self, task: &Task, error: String) -> Result { - Self::fail_task_query(&self.connection, task, error) + .get_result::(connection)?) } pub fn fail_task_query( connection: &PgConnection, task: &Task, error: String, - ) -> Result { - diesel::update(task) + ) -> Result { + Ok(diesel::update(task) .set(( fang_tasks::state.eq(FangTaskState::Failed), fang_tasks::error_message.eq(error), fang_tasks::updated_at.eq(Self::current_time()), )) - .get_result::(connection) + .get_result::(connection)?) } + fn current_time() -> DateTime { + Utc::now() + } + + #[cfg(test)] pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { dotenv().ok(); @@ -336,36 +338,12 @@ impl Queue { .unwrap() } - fn current_time() -> DateTime { - Utc::now() - } - - fn pg_connection(database_url: Option) -> PgConnection { - dotenv().ok(); - - let url = match database_url { - Some(string_url) => string_url, - None => env::var("DATABASE_URL").expect("DATABASE_URL must be set"), - }; - - PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)) - } - - fn fetch_any_task_query(connection: &PgConnection) -> Option { - fang_tasks::table - .order(fang_tasks::created_at.asc()) - .limit(1) - .filter(fang_tasks::state.eq(FangTaskState::New)) - .for_update() - .skip_locked() - .get_result::(connection) - .ok() - } - fn fetch_task_of_type_query(connection: &PgConnection, task_type: &str) -> Option { fang_tasks::table .order(fang_tasks::created_at.asc()) + .order(fang_tasks::scheduled_at.asc()) .limit(1) + .filter(fang_tasks::scheduled_at.le(Utc::now())) .filter(fang_tasks::state.eq(FangTaskState::New)) .filter(fang_tasks::task_type.eq(task_type)) .for_update() @@ -374,27 +352,10 @@ impl Queue { .ok() } - fn find_periodic_task_by_metadata_query( - connection: &PgConnection, - metadata: &serde_json::Value, - ) -> Option { - fang_periodic_tasks::table - .filter(fang_periodic_tasks::metadata.eq(metadata)) - .first::(connection) - .ok() - } - - fn find_task_by_metadata_query( - connection: &PgConnection, - metadata: &serde_json::Value, - ) -> Option { + fn find_task_by_uniq_hash_query(connection: &PgConnection, uniq_hash: &str) -> Option { fang_tasks::table - .filter(fang_tasks::metadata.eq(metadata)) - .filter( - fang_tasks::state - .eq(FangTaskState::New) - .or(fang_tasks::state.eq(FangTaskState::InProgress)), - ) + .filter(fang_tasks::uniq_hash.eq(uniq_hash)) + .filter(fang_tasks::state.eq(FangTaskState::New)) .first::(connection) .ok() } @@ -402,422 +363,17 @@ impl Queue { #[cfg(test)] mod queue_tests { - use super::NewTask; - use super::PeriodicTask; use super::Queue; - use super::Task; - use crate::executor::Error as ExecutorError; - use crate::executor::Runnable; - use crate::schema::fang_periodic_tasks; - use crate::schema::fang_tasks; + use super::Queueable; + use crate::runnable::Runnable; + use crate::runnable::COMMON_TYPE; use crate::schema::FangTaskState; use crate::typetag; - use chrono::prelude::*; - use chrono::{DateTime, Duration, Utc}; + use crate::worker::Error as WorkerError; + use chrono::Utc; use diesel::connection::Connection; - use diesel::prelude::*; use diesel::result::Error; use serde::{Deserialize, Serialize}; - use std::time::Duration as StdDuration; - - #[test] - fn insert_inserts_task() { - let queue = Queue::new(); - - let new_task = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - let result = queue - .connection - .test_transaction::(|| queue.insert(&new_task)); - - assert_eq!(result.state, FangTaskState::New); - assert_eq!(result.error_message, None); - } - - #[test] - fn fetch_task_fetches_the_oldest_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let timestamp1 = Utc::now() - Duration::hours(40); - - let task1 = insert_task(serde_json::json!(true), timestamp1, &queue.connection); - - let timestamp2 = Utc::now() - Duration::hours(20); - - insert_task(serde_json::json!(false), timestamp2, &queue.connection); - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1.id); - - Ok(()) - }); - } - - #[test] - fn finish_task_updates_state_field() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_task(&queue.connection); - - let updated_task = queue.finish_task(&task).unwrap(); - - assert_eq!(FangTaskState::Finished, updated_task.state); - - Ok(()) - }); - } - - #[test] - fn fail_task_updates_state_field_and_sets_error_message() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_task(&queue.connection); - let error = "Failed".to_string(); - - let updated_task = queue.fail_task(&task, error.clone()).unwrap(); - - assert_eq!(FangTaskState::Failed, updated_task.state); - assert_eq!(error, updated_task.error_message.unwrap()); - - Ok(()) - }); - } - - #[test] - fn fetch_and_touch_updates_state() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let _task = insert_new_task(&queue.connection); - - let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap(); - - assert_eq!(FangTaskState::InProgress, updated_task.state); - - Ok(()) - }); - } - - #[test] - fn fetch_and_touch_returns_none() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = queue.fetch_and_touch(&None).unwrap(); - - assert_eq!(None, task); - - Ok(()) - }); - } - - #[test] - fn push_task_serializes_and_inserts_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_task(&task).unwrap(); - - let mut m = serde_json::value::Map::new(); - m.insert( - "number".to_string(), - serde_json::value::Value::Number(10.into()), - ); - m.insert( - "type".to_string(), - serde_json::value::Value::String("PepeTask".to_string()), - ); - - assert_eq!(task.metadata, serde_json::value::Value::Object(m)); - - Ok(()) - }); - } - - #[test] - fn push_task_does_not_insert_the_same_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task2 = queue.push_task(&task).unwrap(); - - let task1 = queue.push_task(&task).unwrap(); - - assert_eq!(task1.id, task2.id); - - Ok(()) - }); - } - - #[test] - fn push_periodic_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_periodic_task(&task, 60_i64).unwrap(); - - assert_eq!(task.period_in_millis, 60_i64); - assert!(queue.find_periodic_task_by_id(task.id).is_some()); - - Ok(()) - }); - } - - #[test] - fn push_periodic_task_returns_existing_task() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task1 = queue.push_periodic_task(&task, 60).unwrap(); - - let task2 = queue.push_periodic_task(&task, 60).unwrap(); - - assert_eq!(task1.id, task2.id); - - Ok(()) - }); - } - - #[test] - fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = PepeTask { number: 10 }; - let task = queue.push_periodic_task(&task, 60).unwrap(); - - let schedule_in_future = Utc::now() + Duration::hours(100); - - insert_periodic_task( - serde_json::json!(true), - schedule_in_future, - 100, - &queue.connection, - ); - - let tasks = queue - .fetch_periodic_tasks(StdDuration::from_secs(100)) - .unwrap(); - - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].id, task.id); - - Ok(()) - }); - } - - #[test] - fn schedule_next_task_execution() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_periodic_task( - serde_json::json!(true), - Utc::now(), - 100000, - &queue.connection, - ); - - let updated_task = queue.schedule_next_task_execution(&task).unwrap(); - - let next_schedule = (task.scheduled_at.unwrap() - + Duration::milliseconds(task.period_in_millis)) - .round_subsecs(0); - - assert_eq!( - next_schedule, - updated_task.scheduled_at.unwrap().round_subsecs(0) - ); - - Ok(()) - }); - } - - #[test] - fn remove_all_periodic_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = - insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); - - let result = queue.remove_all_periodic_tasks().unwrap(); - - assert_eq!(1, result); - - assert_eq!(None, queue.find_periodic_task_by_id(task.id)); - - Ok(()) - }); - } - - #[test] - fn remove_all_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_task(serde_json::json!(true), Utc::now(), &queue.connection); - let result = queue.remove_all_tasks().unwrap(); - - assert_eq!(1, result); - - assert_eq!(None, queue.find_task_by_id(task.id)); - - Ok(()) - }); - } - - #[test] - fn fetch_periodic_tasks() { - let queue = Queue::new(); - - queue.connection.test_transaction::<(), Error, _>(|| { - let schedule_in_future = Utc::now() + Duration::hours(100); - - insert_periodic_task( - serde_json::json!(true), - schedule_in_future, - 100, - &queue.connection, - ); - - let task = - insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); - - let tasks = queue - .fetch_periodic_tasks(StdDuration::from_secs(100)) - .unwrap(); - - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].id, task.id); - - Ok(()) - }); - } - - #[test] - fn remove_task() { - let queue = Queue::new(); - - let new_task1 = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - let new_task2 = NewTask { - metadata: serde_json::json!(true), - task_type: "common".to_string(), - }; - - queue.connection.test_transaction::<(), Error, _>(|| { - let task1 = queue.insert(&new_task1).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_some()); - - let task2 = queue.insert(&new_task2).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_some()); - - queue.remove_task(task1.id).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_none()); - assert!(queue.find_task_by_id(task2.id).is_some()); - - queue.remove_task(task2.id).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_none()); - - Ok(()) - }); - } - - #[test] - fn remove_task_of_type() { - let queue = Queue::new(); - - let new_task1 = NewTask { - metadata: serde_json::json!(true), - task_type: "type1".to_string(), - }; - - let new_task2 = NewTask { - metadata: serde_json::json!(true), - task_type: "type2".to_string(), - }; - - queue.connection.test_transaction::<(), Error, _>(|| { - let task1 = queue.insert(&new_task1).unwrap(); - assert!(queue.find_task_by_id(task1.id).is_some()); - - let task2 = queue.insert(&new_task2).unwrap(); - assert!(queue.find_task_by_id(task2.id).is_some()); - - queue.remove_tasks_of_type("type1").unwrap(); - assert!(queue.find_task_by_id(task1.id).is_none()); - assert!(queue.find_task_by_id(task2.id).is_some()); - - Ok(()) - }); - } - - // this test is ignored because it commits data to the db - #[test] - #[ignore] - fn fetch_task_locks_the_record() { - let queue = Queue::new(); - let timestamp1 = Utc::now() - Duration::hours(40); - - let task1 = insert_task( - serde_json::json!(PepeTask { number: 12 }), - timestamp1, - &queue.connection, - ); - - let task1_id = task1.id; - - let timestamp2 = Utc::now() - Duration::hours(20); - - let task2 = insert_task( - serde_json::json!(PepeTask { number: 11 }), - timestamp2, - &queue.connection, - ); - - let thread = std::thread::spawn(move || { - let queue = Queue::new(); - - queue.connection.transaction::<(), Error, _>(|| { - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1.id); - - std::thread::sleep(std::time::Duration::from_millis(5000)); - - Ok(()) - }) - }); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task2.id); - - let _result = thread.join(); - - // returns unlocked record - - let found_task = queue.fetch_task(&None).unwrap(); - - assert_eq!(found_task.id, task1_id); - } #[derive(Serialize, Deserialize)] struct PepeTask { @@ -826,47 +382,301 @@ mod queue_tests { #[typetag::serde] impl Runnable for PepeTask { - fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), WorkerError> { println!("the number is {}", self.number); Ok(()) } + fn uniq(&self) -> bool { + true + } } - fn insert_task( - metadata: serde_json::Value, - timestamp: DateTime, - connection: &PgConnection, - ) -> Task { - diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(metadata), - fang_tasks::created_at.eq(timestamp), - )]) - .get_result::(connection) - .unwrap() + #[derive(Serialize, Deserialize)] + struct AyratTask { + pub number: u16, } - fn insert_periodic_task( - metadata: serde_json::Value, - timestamp: DateTime, - period_in_millis: i64, - connection: &PgConnection, - ) -> PeriodicTask { - diesel::insert_into(fang_periodic_tasks::table) - .values(&vec![( - fang_periodic_tasks::metadata.eq(metadata), - fang_periodic_tasks::scheduled_at.eq(timestamp), - fang_periodic_tasks::period_in_millis.eq(period_in_millis), - )]) - .get_result::(connection) - .unwrap() + #[typetag::serde] + impl Runnable for AyratTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), WorkerError> { + println!("the number is {}", self.number); + + Ok(()) + } + fn uniq(&self) -> bool { + true + } + + fn task_type(&self) -> String { + "weirdo".to_string() + } } - fn insert_new_task(connection: &PgConnection) -> Task { - diesel::insert_into(fang_tasks::table) - .values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)]) - .get_result::(connection) - .unwrap() + #[test] + fn insert_inserts_task() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(task.error_message, None); + assert_eq!(FangTaskState::New, task.state); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fetch_task_fetches_the_oldest_task() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap(); + let _task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap(); + + let found_task = + Queue::fetch_task_query(&queue_pooled_connection, COMMON_TYPE.to_string()).unwrap(); + assert_eq!(found_task.id, task1.id); + Ok(()) + }); + } + + #[test] + fn update_task_state_test() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + + let found_task = Queue::update_task_state_query( + &queue_pooled_connection, + &task, + FangTaskState::Finished, + ) + .unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::Finished); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fail_task_updates_state_field_and_sets_error_message() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + + let error = "Failed".to_string(); + + let found_task = + Queue::fail_task_query(&queue_pooled_connection, &task, error.clone()).unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::Failed); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + assert_eq!(found_task.error_message.unwrap(), error); + + Ok(()) + }); + } + + #[test] + fn fetch_and_touch_updates_state() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + + let found_task = + Queue::fetch_and_touch_query(&queue_pooled_connection, COMMON_TYPE.to_string()) + .unwrap() + .unwrap(); + + let metadata = found_task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + + assert_eq!(found_task.id, task.id); + assert_eq!(found_task.state, FangTaskState::InProgress); + assert_eq!(Some(10), number); + assert_eq!(Some("PepeTask"), type_task); + + Ok(()) + }); + } + + #[test] + fn fetch_and_touch_returns_none() { + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let found_task = + Queue::fetch_and_touch_query(&queue_pooled_connection, COMMON_TYPE.to_string()) + .unwrap(); + + assert_eq!(None, found_task); + + Ok(()) + }); + } + + #[test] + fn insert_task_uniq_test() { + let task = PepeTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task1 = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + let task2 = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap(); + + assert_eq!(task2.id, task1.id); + Ok(()) + }); + } + + #[test] + fn remove_all_tasks_test() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap(); + + let result = Queue::remove_all_tasks_query(&queue_pooled_connection).unwrap(); + + assert_eq!(2, result); + assert_eq!( + None, + Queue::find_task_by_id_query(&queue_pooled_connection, task1.id) + ); + assert_eq!( + None, + Queue::find_task_by_id_query(&queue_pooled_connection, task2.id) + ); + + Ok(()) + }); + } + + #[test] + fn remove_task() { + let task1 = PepeTask { number: 10 }; + let task2 = PepeTask { number: 11 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap(); + + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some()); + + Queue::remove_task_query(&queue_pooled_connection, task1.id).unwrap(); + + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_none()); + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some()); + + Ok(()) + }); + } + + #[test] + fn remove_task_of_type() { + let task1 = PepeTask { number: 10 }; + let task2 = AyratTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let queue_pooled_connection = queue.connection_pool.get().unwrap(); + + queue_pooled_connection.test_transaction::<(), Error, _>(|| { + let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap(); + + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some()); + + Queue::remove_tasks_of_type_query(&queue_pooled_connection, "weirdo").unwrap(); + + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some()); + assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_none()); + + Ok(()) + }); } } diff --git a/src/blocking/runnable.rs b/src/blocking/runnable.rs new file mode 100644 index 0000000..1d787cb --- /dev/null +++ b/src/blocking/runnable.rs @@ -0,0 +1,22 @@ +use crate::queue::Queueable; +use crate::Error; +use crate::Scheduled; + +pub const COMMON_TYPE: &str = "common"; + +#[typetag::serde(tag = "type")] +pub trait Runnable { + fn run(&self, _queueable: &dyn Queueable) -> Result<(), Error>; + + fn task_type(&self) -> String { + COMMON_TYPE.to_string() + } + + fn uniq(&self) -> bool { + false + } + + fn cron(&self) -> Option { + None + } +} diff --git a/src/blocking/scheduler.rs b/src/blocking/scheduler.rs deleted file mode 100644 index 994c7d4..0000000 --- a/src/blocking/scheduler.rs +++ /dev/null @@ -1,125 +0,0 @@ -use crate::executor::Runnable; -use crate::queue::Queue; -use crate::PeriodicTask; -use std::thread; -use std::time::Duration; - -pub struct Scheduler { - pub check_period: Duration, - pub error_margin: Duration, - pub queue: Queue, -} - -impl Drop for Scheduler { - fn drop(&mut self) { - Scheduler::start(self.check_period, self.error_margin) - } -} - -impl Scheduler { - pub fn start(check_period: Duration, error_margin: Duration) { - let queue = Queue::new(); - let builder = thread::Builder::new().name("scheduler".to_string()); - - builder - .spawn(move || { - let scheduler = Self::new(check_period, error_margin, queue); - - scheduler.schedule_loop(); - }) - .unwrap(); - } - - pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self { - Self { - check_period, - queue, - error_margin, - } - } - - pub fn schedule_loop(&self) { - loop { - self.schedule(); - - thread::sleep(self.check_period); - } - } - - pub fn schedule(&self) { - if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin) { - for task in tasks { - self.process_task(task); - } - }; - } - - fn process_task(&self, task: PeriodicTask) { - match task.scheduled_at { - None => { - self.queue.schedule_next_task_execution(&task).unwrap(); - } - Some(_) => { - let actual_task: Box = - serde_json::from_value(task.metadata.clone()).unwrap(); - - self.queue.push_task(&(*actual_task)).unwrap(); - - self.queue.schedule_next_task_execution(&task).unwrap(); - } - } - } -} - -#[cfg(test)] -mod task_scheduler_tests { - use super::Scheduler; - use crate::executor::Error; - use crate::executor::Runnable; - use crate::queue::Queue; - use crate::schema::fang_tasks; - use crate::typetag; - use crate::Task; - use diesel::pg::PgConnection; - use diesel::prelude::*; - use serde::{Deserialize, Serialize}; - use std::thread; - use std::time::Duration; - - #[derive(Serialize, Deserialize)] - struct ScheduledTask {} - - #[typetag::serde] - impl Runnable for ScheduledTask { - fn run(&self, _connection: &PgConnection) -> Result<(), Error> { - Ok(()) - } - - fn task_type(&self) -> String { - "schedule".to_string() - } - } - - #[test] - #[ignore] - fn schedules_tasks() { - let queue = Queue::new(); - - queue.push_periodic_task(&ScheduledTask {}, 10000).unwrap(); - Scheduler::start(Duration::from_secs(1), Duration::from_secs(2)); - - let sleep_duration = Duration::from_secs(15); - thread::sleep(sleep_duration); - - let tasks = get_all_tasks(&queue.connection); - - assert_eq!(1, tasks.len()); - } - - fn get_all_tasks(conn: &PgConnection) -> Vec { - fang_tasks::table - .filter(fang_tasks::task_type.eq("schedule")) - .get_results::(conn) - .unwrap() - } -} diff --git a/src/blocking/schema.rs b/src/blocking/schema.rs index 620a6f4..d9a2d89 100644 --- a/src/blocking/schema.rs +++ b/src/blocking/schema.rs @@ -24,17 +24,8 @@ table! { error_message -> Nullable, state -> FangTaskStateMapping, task_type -> Varchar, - created_at -> Timestamptz, - updated_at -> Timestamptz, - } -} - -table! { - fang_periodic_tasks (id) { - id -> Uuid, - metadata -> Jsonb, - period_in_millis -> Int8, - scheduled_at -> Nullable, + uniq_hash -> Nullable, + scheduled_at -> Timestamptz, created_at -> Timestamptz, updated_at -> Timestamptz, } diff --git a/src/blocking/worker.rs b/src/blocking/worker.rs new file mode 100644 index 0000000..67f7255 --- /dev/null +++ b/src/blocking/worker.rs @@ -0,0 +1,330 @@ +use crate::error::FangError; +use crate::queue::Queueable; +use crate::queue::Task; +use crate::runnable::Runnable; +use crate::runnable::COMMON_TYPE; +use crate::schema::FangTaskState; +use crate::Scheduled::*; +use crate::{RetentionMode, SleepParams}; +use log::error; +use std::thread; +use typed_builder::TypedBuilder; + +#[derive(TypedBuilder)] +pub struct Worker +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ + #[builder(setter(into))] + pub queue: BQueue, + #[builder(default=COMMON_TYPE.to_string(), setter(into))] + pub task_type: String, + #[builder(default, setter(into))] + pub sleep_params: SleepParams, + #[builder(default, setter(into))] + pub retention_mode: RetentionMode, +} + +#[derive(Debug)] +pub struct Error { + pub description: String, +} + +impl Worker +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ + pub fn run(&self, task: Task) { + let result = self.execute_task(task); + self.finalize_task(result) + } + + pub fn run_tasks(&mut self) -> Result<(), FangError> { + loop { + match self.queue.fetch_and_touch_task(self.task_type.clone()) { + Ok(Some(task)) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + self.queue.schedule_task(&*actual_task)?; + } + + self.maybe_reset_sleep_period(); + self.run(task); + } + Ok(None) => { + self.sleep(); + } + + Err(error) => { + error!("Failed to fetch a task {:?}", error); + + self.sleep(); + } + }; + } + } + + #[cfg(test)] + pub fn run_tasks_until_none(&mut self) -> Result<(), FangError> { + loop { + match self.queue.fetch_and_touch_task(self.task_type.clone()) { + Ok(Some(task)) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + self.queue.schedule_task(&*actual_task)?; + } + + self.maybe_reset_sleep_period(); + self.run(task); + } + Ok(None) => { + return Ok(()); + } + Err(error) => { + error!("Failed to fetch a task {:?}", error); + + self.sleep(); + } + }; + } + } + + pub fn maybe_reset_sleep_period(&mut self) { + self.sleep_params.maybe_reset_sleep_period(); + } + + pub fn sleep(&mut self) { + self.sleep_params.maybe_increase_sleep_period(); + + thread::sleep(self.sleep_params.sleep_period); + } + + fn execute_task(&self, task: Task) -> Result { + let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); + let task_result = actual_task.run(&self.queue); + + match task_result { + Ok(()) => Ok(task), + Err(error) => Err((task, error.description)), + } + } + + fn finalize_task(&self, result: Result) { + match self.retention_mode { + RetentionMode::KeepAll => { + match result { + Ok(task) => self + .queue + .update_task_state(&task, FangTaskState::Finished) + .unwrap(), + Err((task, error)) => self.queue.fail_task(&task, error).unwrap(), + }; + } + RetentionMode::RemoveAll => { + match result { + Ok(task) => self.queue.remove_task(task.id).unwrap(), + Err((task, _error)) => self.queue.remove_task(task.id).unwrap(), + }; + } + RetentionMode::RemoveFinished => match result { + Ok(task) => { + self.queue.remove_task(task.id).unwrap(); + } + Err((task, error)) => { + self.queue.fail_task(&task, error).unwrap(); + } + }, + } + } +} + +#[cfg(test)] +mod worker_tests { + use super::Error; + use super::RetentionMode; + use super::Runnable; + use super::Worker; + use crate::queue::Queue; + use crate::queue::Queueable; + use crate::schema::FangTaskState; + use crate::typetag; + use chrono::Utc; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + struct WorkerTaskTest { + pub number: u16, + } + + #[typetag::serde] + impl Runnable for WorkerTaskTest { + fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + println!("the number is {}", self.number); + + Ok(()) + } + + fn task_type(&self) -> String { + "worker_task".to_string() + } + } + + #[derive(Serialize, Deserialize)] + struct FailedTask { + pub number: u16, + } + + #[typetag::serde] + impl Runnable for FailedTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + let message = format!("the number is {}", self.number); + + Err(Error { + description: message, + }) + } + + fn task_type(&self) -> String { + "F_task".to_string() + } + } + + #[derive(Serialize, Deserialize)] + struct TaskType1 {} + + #[typetag::serde] + impl Runnable for TaskType1 { + fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + Ok(()) + } + + fn task_type(&self) -> String { + "type1".to_string() + } + } + + #[derive(Serialize, Deserialize)] + struct TaskType2 {} + + #[typetag::serde] + impl Runnable for TaskType2 { + fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + Ok(()) + } + + fn task_type(&self) -> String { + "type2".to_string() + } + } + + // Worker tests has to commit because the worker operations commits + #[test] + #[ignore] + fn executes_and_finishes_task() { + let task = WorkerTaskTest { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let worker = Worker::::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .task_type(task.task_type()) + .build(); + let pooled_connection = worker.queue.connection_pool.get().unwrap(); + + let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap(); + + assert_eq!(FangTaskState::New, task.state); + + // this operation commits and thats why need to commit this test + worker.run(task.clone()); + + let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap(); + + assert_eq!(FangTaskState::Finished, found_task.state); + + Queue::remove_tasks_of_type_query(&pooled_connection, "worker_task").unwrap(); + } + + #[test] + #[ignore] + fn executes_task_only_of_specific_type() { + let task1 = TaskType1 {}; + let task2 = TaskType2 {}; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut worker = Worker::::builder() + .queue(queue) + .task_type(task1.task_type()) + .retention_mode(RetentionMode::KeepAll) + .build(); + + let pooled_connection = worker.queue.connection_pool.get().unwrap(); + + let task1 = Queue::insert_query(&pooled_connection, &task1, Utc::now()).unwrap(); + let task2 = Queue::insert_query(&pooled_connection, &task2, Utc::now()).unwrap(); + + assert_eq!(FangTaskState::New, task1.state); + assert_eq!(FangTaskState::New, task2.state); + + worker.run_tasks_until_none().unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + + let found_task1 = Queue::find_task_by_id_query(&pooled_connection, task1.id).unwrap(); + assert_eq!(FangTaskState::Finished, found_task1.state); + + let found_task2 = Queue::find_task_by_id_query(&pooled_connection, task2.id).unwrap(); + assert_eq!(FangTaskState::New, found_task2.state); + + Queue::remove_tasks_of_type_query(&pooled_connection, "type1").unwrap(); + Queue::remove_tasks_of_type_query(&pooled_connection, "type2").unwrap(); + } + + #[test] + #[ignore] + fn saves_error_for_failed_task() { + let task = FailedTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let worker = Worker::::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .task_type(task.task_type()) + .build(); + + let pooled_connection = worker.queue.connection_pool.get().unwrap(); + + let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap(); + + assert_eq!(FangTaskState::New, task.state); + + worker.run(task.clone()); + + let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap(); + + assert_eq!(FangTaskState::Failed, found_task.state); + assert_eq!( + "the number is 10".to_string(), + found_task.error_message.unwrap() + ); + + Queue::remove_tasks_of_type_query(&pooled_connection, "F_task").unwrap(); + } +} diff --git a/src/blocking/worker_pool.rs b/src/blocking/worker_pool.rs index 651c787..1bfada8 100644 --- a/src/blocking/worker_pool.rs +++ b/src/blocking/worker_pool.rs @@ -1,36 +1,38 @@ -use crate::diesel::r2d2; -use crate::diesel::PgConnection; use crate::error::FangError; -use crate::executor::Executor; -use crate::queue::Queue; -use crate::{RetentionMode, SleepParams}; +use crate::queue::Queueable; +use crate::worker::Worker; +use crate::RetentionMode; +use crate::SleepParams; use log::error; use log::info; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; use std::thread; +use typed_builder::TypedBuilder; -#[derive(Clone)] -pub struct WorkerPool { +#[derive(Clone, TypedBuilder)] +pub struct WorkerPool +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ + #[builder(setter(into))] pub number_of_workers: u32, - pub worker_params: WorkerParams, - pub connection_pool: r2d2::Pool>, - shared_state: SharedState, - thread_join_handles: Arc>>>, + #[builder(setter(into))] + pub queue: BQueue, + #[builder(setter(into), default)] + pub task_type: String, + #[builder(setter(into), default)] + pub sleep_params: SleepParams, + #[builder(setter(into), default)] + pub retention_mode: RetentionMode, } -pub struct WorkerThread { +#[derive(Clone, TypedBuilder)] +pub struct WorkerThread +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ pub name: String, pub restarts: u64, - pub worker_pool: WorkerPool, - graceful_shutdown: bool, -} - -pub type SharedState = Arc>; - -pub enum WorkerState { - Running, - Shutdown, + pub worker_pool: WorkerPool, } #[derive(Clone)] @@ -40,342 +42,73 @@ pub struct WorkerParams { pub task_type: Option, } -impl Default for WorkerParams { - fn default() -> Self { - Self::new() - } -} - -impl WorkerParams { - pub fn new() -> Self { - Self { - retention_mode: None, - sleep_params: None, - task_type: None, - } - } - - pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) { - self.retention_mode = Some(retention_mode); - } - - pub fn set_sleep_params(&mut self, sleep_params: SleepParams) { - self.sleep_params = Some(sleep_params); - } - - pub fn set_task_type(&mut self, task_type: String) { - self.task_type = Some(task_type); - } -} - -impl WorkerPool { - pub fn new(number_of_workers: u32) -> Self { - let worker_params = WorkerParams::new(); - let connection_pool = Queue::connection_pool(number_of_workers); - - Self { - number_of_workers, - worker_params, - connection_pool, - shared_state: Arc::new(RwLock::new(WorkerState::Running)), - thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity( - number_of_workers as usize, - ))), - } - } - - pub fn new_with_params(number_of_workers: u32, worker_params: WorkerParams) -> Self { - let connection_pool = Queue::connection_pool(number_of_workers); - - Self { - number_of_workers, - worker_params, - connection_pool, - shared_state: Arc::new(RwLock::new(WorkerState::Running)), - thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity( - number_of_workers as usize, - ))), - } - } - +impl WorkerPool +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ pub fn start(&mut self) -> Result<(), FangError> { for idx in 1..self.number_of_workers + 1 { - let worker_type = self - .worker_params - .task_type - .clone() - .unwrap_or_else(|| "".to_string()); - let name = format!("worker_{}{}", worker_type, idx); - WorkerThread::spawn_in_pool(name.clone(), 0, self.clone())?; - } - Ok(()) - } + let name = format!("worker_{}{}", self.task_type, idx); - /// Attempt graceful shutdown of each job thread, blocks until all threads exit. Threads exit - /// when their current job finishes. - pub fn shutdown(&mut self) -> Result<(), FangError> { - *self.shared_state.write()? = WorkerState::Shutdown; + let worker_thread = WorkerThread::builder() + .name(name.clone()) + .restarts(0) + .worker_pool(self.clone()) + .build(); - for (worker_name, thread) in self.thread_join_handles.write()?.drain() { - if let Err(err) = thread.join() { - error!( - "Failed to exit executor thread '{}' cleanly: {:?}", - worker_name, err - ); - } + worker_thread.spawn()?; } Ok(()) } } -impl WorkerThread { - pub fn new(name: String, restarts: u64, worker_pool: WorkerPool) -> Self { - Self { - name, - restarts, - worker_pool, - graceful_shutdown: false, - } - } - - pub fn spawn_in_pool( - name: String, - restarts: u64, - worker_pool: WorkerPool, - ) -> Result<(), FangError> { +impl WorkerThread +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ + fn spawn(self) -> Result<(), FangError> { info!( "starting a worker thread {}, number of restarts {}", - name, restarts + self.name, self.restarts ); - let job = WorkerThread::new(name.clone(), restarts, worker_pool.clone()); - let join_handle = Self::spawn_thread(name.clone(), job)?; - worker_pool - .thread_join_handles - .write()? - .insert(name, join_handle); - Ok(()) - } + let builder = thread::Builder::new().name(self.name.clone()); - fn spawn_thread( - name: String, - mut job: WorkerThread, - ) -> Result, FangError> { - let builder = thread::Builder::new().name(name.clone()); builder .spawn(move || { - match job.worker_pool.connection_pool.get() { - Ok(connection) => { - let mut executor = Executor::new(connection); - executor.set_shared_state(job.worker_pool.shared_state.clone()); + let mut worker: Worker = Worker::builder() + .queue(self.worker_pool.queue.clone()) + .task_type(self.worker_pool.task_type.clone()) + .retention_mode(self.worker_pool.retention_mode.clone()) + .sleep_params(self.worker_pool.sleep_params.clone()) + .build(); - if let Some(ref task_type_str) = job.worker_pool.worker_params.task_type { - executor.set_task_type(task_type_str.to_owned()); - } - - if let Some(ref retention_mode) = - job.worker_pool.worker_params.retention_mode - { - executor.set_retention_mode(retention_mode.to_owned()); - } - - if let Some(ref sleep_params) = job.worker_pool.worker_params.sleep_params { - executor.set_sleep_params(sleep_params.clone()); - } - - // Run executor - match executor.run_tasks() { - Ok(_) => { - job.graceful_shutdown = true; - } - Err(error) => { - error!("Error executing tasks in worker '{}': {:?}", name, error); - } - } - } - Err(error) => { - error!("Failed to get postgres connection: {:?}", error); - } + // Run worker + if let Err(error) = worker.run_tasks() { + error!( + "Error executing tasks in worker '{}': {:?}", + self.name, error + ); } }) - .map_err(FangError::from) + .map_err(FangError::from)?; + + Ok(()) } } -impl Drop for WorkerThread { +impl Drop for WorkerThread +where + BQueue: Queueable + Clone + Sync + Send + 'static, +{ fn drop(&mut self) { - if self.graceful_shutdown { - return; - } + self.restarts += 1; - WorkerThread::spawn_in_pool( - self.name.clone(), - self.restarts + 1, - self.worker_pool.clone(), - ) - .unwrap(); - } -} - -#[cfg(test)] -mod task_pool_tests { - use super::WorkerParams; - use super::WorkerPool; - use crate::executor::Error; - use crate::executor::Runnable; - use crate::queue::Queue; - use crate::schema::{fang_tasks, FangTaskState}; - use crate::typetag; - use crate::RetentionMode; - use crate::Task; - use diesel::pg::PgConnection; - use diesel::prelude::*; - use serde::{Deserialize, Serialize}; - use std::thread; - use std::time::Duration; - - #[derive(Serialize, Deserialize)] - struct MyTask { - pub number: u16, - pub current_thread_name: String, - } - - impl MyTask { - pub fn new(number: u16) -> Self { - let handle = thread::current(); - let current_thread_name = handle.name().unwrap().to_string(); - - Self { - number, - current_thread_name, - } - } - } - - #[typetag::serde] - impl Runnable for MyTask { - fn run(&self, connection: &PgConnection) -> Result<(), Error> { - thread::sleep(Duration::from_secs(3)); - - let new_task = MyTask::new(self.number + 1); - - Queue::push_task_query(connection, &new_task).unwrap(); - - Ok(()) - } - - fn task_type(&self) -> String { - "worker_pool_test".to_string() - } - } - - #[derive(Serialize, Deserialize)] - struct ShutdownTask { - pub number: u16, - pub current_thread_name: String, - } - - impl ShutdownTask { - pub fn new(number: u16) -> Self { - let handle = thread::current(); - let current_thread_name = handle.name().unwrap().to_string(); - - Self { - number, - current_thread_name, - } - } - } - - #[typetag::serde] - impl Runnable for ShutdownTask { - fn run(&self, connection: &PgConnection) -> Result<(), Error> { - thread::sleep(Duration::from_secs(3)); - - let new_task = MyTask::new(self.number + 1); - - Queue::push_task_query(connection, &new_task).unwrap(); - - Ok(()) - } - - fn task_type(&self) -> String { - "shutdown_test".to_string() - } - } - - fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec { - fang_tasks::table - .filter(fang_tasks::task_type.eq(job_type)) - .get_results::(conn) - .unwrap() - } - - // Following tests ignored because they commit data to the db - #[test] - #[ignore] - fn tasks_are_finished_on_shutdown() { - let queue = Queue::new(); - - let mut worker_params = WorkerParams::new(); - worker_params.set_retention_mode(RetentionMode::KeepAll); - let mut task_pool = WorkerPool::new_with_params(2, worker_params); - - queue.push_task(&ShutdownTask::new(100)).unwrap(); - queue.push_task(&ShutdownTask::new(200)).unwrap(); - - task_pool.start().unwrap(); - thread::sleep(Duration::from_secs(1)); - task_pool.shutdown().unwrap(); - thread::sleep(Duration::from_secs(5)); - - let tasks = get_all_tasks(&queue.connection, "shutdown_test"); - let in_progress_tasks = tasks - .iter() - .filter(|task| task.state == FangTaskState::InProgress); - let finished_tasks = tasks - .iter() - .filter(|task| task.state == FangTaskState::Finished); - - // Asserts first two tasks are allowed to finish, the tasks they spawn are not started - // though. No tasks should be in progress after a graceful shutdown. - assert_eq!(in_progress_tasks.count(), 0); - assert_eq!(finished_tasks.count(), 2); - } - - #[test] - #[ignore] - fn tasks_are_split_between_two_threads() { - let queue = Queue::new(); - - let mut worker_params = WorkerParams::new(); - worker_params.set_retention_mode(RetentionMode::KeepAll); - let mut task_pool = WorkerPool::new_with_params(2, worker_params); - - queue.push_task(&MyTask::new(100)).unwrap(); - queue.push_task(&MyTask::new(200)).unwrap(); - - task_pool.start().unwrap(); - - thread::sleep(Duration::from_secs(100)); - - let tasks = get_all_tasks(&queue.connection, "worker_pool_test"); - - assert!(tasks.len() > 40); - - let test_worker1_tasks = tasks.clone().into_iter().filter(|task| { - serde_json::to_string(&task.metadata) - .unwrap() - .contains("worker_1") - }); - - let test_worker2_tasks = tasks.into_iter().filter(|task| { - serde_json::to_string(&task.metadata) - .unwrap() - .contains("worker_2") - }); - - assert!(test_worker1_tasks.count() > 20); - assert!(test_worker2_tasks.count() > 20); + error!( + "Worker {} stopped. Restarting. The number of restarts {}", + self.name, self.restarts, + ); + + self.clone().spawn().unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index f099c68..fe0f7c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,22 @@ #![allow(clippy::extra_unused_lifetimes)] use std::time::Duration; +use thiserror::Error; + +pub enum Scheduled { + CronPattern(String), + ScheduleOnce(DateTime), +} + +#[derive(Debug, Error)] +pub enum CronError { + #[error(transparent)] + LibraryError(#[from] cron::error::Error), + #[error("You have to implement method `cron()` in your AsyncRunnable")] + TaskNotSchedulableError, + #[error("No timestamps match with this cron pattern")] + NoTimestampsError, +} #[derive(Clone, Debug)] pub enum RetentionMode {