Merge pull request #69 from ayrat555/ayrat555/change-schema
Change schema to improve scheduler
This commit is contained in:
commit
2b0eb0627d
55 changed files with 1948 additions and 2242 deletions
20
Cargo.toml
20
Cargo.toml
|
@ -14,26 +14,29 @@ rust-version = "1.62"
|
|||
[features]
|
||||
default = ["blocking", "asynk"]
|
||||
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
|
||||
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"]
|
||||
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "async-recursion"]
|
||||
|
||||
[dependencies]
|
||||
cron = "0.11"
|
||||
chrono = "0.4"
|
||||
hex = "0.4"
|
||||
log = "0.4"
|
||||
serde = "1"
|
||||
serde_derive = "1.0.141"
|
||||
serde_json = "1"
|
||||
sha2 = "0.10"
|
||||
thiserror = "1.0"
|
||||
typed-builder = "0.10"
|
||||
typetag = "0.2"
|
||||
uuid = { version = "0.8", features = ["v4"] }
|
||||
|
||||
uuid = { version = "1.1", features = ["v4"] }
|
||||
|
||||
[dependencies.diesel]
|
||||
version = "1.4"
|
||||
features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"]
|
||||
version = "2.0"
|
||||
features = ["postgres", "serde_json", "chrono", "uuid", "r2d2"]
|
||||
optional = true
|
||||
|
||||
[dependencies.diesel-derive-enum]
|
||||
version = "1"
|
||||
version = "2.0.0-rc.0"
|
||||
features = ["postgres"]
|
||||
optional = true
|
||||
|
||||
|
@ -43,7 +46,7 @@ optional = true
|
|||
|
||||
[dependencies.bb8-postgres]
|
||||
version = "0.8"
|
||||
features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"]
|
||||
features = ["with-serde_json-1" , "with-uuid-1" , "with-chrono-0_4"]
|
||||
optional = true
|
||||
|
||||
[dependencies.postgres-types]
|
||||
|
@ -60,9 +63,6 @@ optional = true
|
|||
version = "0.1"
|
||||
optional = true
|
||||
|
||||
[dependencies.typed-builder]
|
||||
version = "0.10"
|
||||
optional = true
|
||||
|
||||
[dependencies.async-recursion]
|
||||
version = "1"
|
||||
|
|
17
Makefile
Normal file
17
Makefile
Normal file
|
@ -0,0 +1,17 @@
|
|||
db:
|
||||
docker run --rm -d --name postgres -p 5432:5432 \
|
||||
-e POSTGRES_DB=fang \
|
||||
-e POSTGRES_USER=postgres \
|
||||
-e POSTGRES_PASSWORD=postgres \
|
||||
postgres:latest
|
||||
clippy:
|
||||
cargo clippy --all-features
|
||||
diesel:
|
||||
DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run
|
||||
stop:
|
||||
docker kill postgres
|
||||
tests:
|
||||
DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture
|
||||
|
||||
ignored:
|
||||
DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture --ignored
|
|
@ -1,5 +0,0 @@
|
|||
# For documentation on how to configure this file,
|
||||
# see diesel.rs/guides/configuring-diesel-cli
|
||||
|
||||
[print_schema]
|
||||
file = "src/schema.rs"
|
12
fang_examples/asynk/simple_async_cron_worker/Cargo.toml
Normal file
12
fang_examples/asynk/simple_async_cron_worker/Cargo.toml
Normal file
|
@ -0,0 +1,12 @@
|
|||
[package]
|
||||
name = "simple_async_cron_worker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
fang = { path = "../../../" , features = ["asynk"]}
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.0"
|
||||
tokio = { version = "1", features = ["full"] }
|
33
fang_examples/asynk/simple_async_cron_worker/src/lib.rs
Normal file
33
fang_examples/asynk/simple_async_cron_worker/src/lib.rs
Normal file
|
@ -0,0 +1,33 @@
|
|||
use fang::async_trait;
|
||||
use fang::asynk::async_queue::AsyncQueueable;
|
||||
use fang::asynk::async_runnable::Error;
|
||||
use fang::serde::{Deserialize, Serialize};
|
||||
use fang::typetag;
|
||||
use fang::AsyncRunnable;
|
||||
use fang::Scheduled;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(crate = "fang::serde")]
|
||||
pub struct MyCronTask {}
|
||||
|
||||
#[async_trait]
|
||||
#[typetag::serde]
|
||||
impl AsyncRunnable for MyCronTask {
|
||||
async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
log::info!("CRON!!!!!!!!!!!!!!!",);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
// sec min hour day of month month day of week year
|
||||
// be careful works only with UTC hour.
|
||||
// https://www.timeanddate.com/worldclock/timezone/utc
|
||||
let expression = "0/20 * * * Aug-Sep * 2022/1";
|
||||
Some(Scheduled::CronPattern(expression.to_string()))
|
||||
}
|
||||
|
||||
fn uniq(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
41
fang_examples/asynk/simple_async_cron_worker/src/main.rs
Normal file
41
fang_examples/asynk/simple_async_cron_worker/src/main.rs
Normal file
|
@ -0,0 +1,41 @@
|
|||
use fang::asynk::async_queue::AsyncQueue;
|
||||
use fang::asynk::async_queue::AsyncQueueable;
|
||||
use fang::asynk::async_worker_pool::AsyncWorkerPool;
|
||||
use fang::AsyncRunnable;
|
||||
use fang::NoTls;
|
||||
use simple_async_cron_worker::MyCronTask;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::init();
|
||||
|
||||
log::info!("Starting...");
|
||||
let max_pool_size: u32 = 3;
|
||||
let mut queue = AsyncQueue::builder()
|
||||
.uri("postgres://postgres:postgres@localhost/fang")
|
||||
.max_pool_size(max_pool_size)
|
||||
.build();
|
||||
|
||||
queue.connect(NoTls).await.unwrap();
|
||||
log::info!("Queue connected...");
|
||||
|
||||
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
|
||||
.number_of_workers(10_u32)
|
||||
.queue(queue.clone())
|
||||
.build();
|
||||
|
||||
log::info!("Pool created ...");
|
||||
|
||||
pool.start().await;
|
||||
log::info!("Workers started ...");
|
||||
|
||||
let task = MyCronTask {};
|
||||
|
||||
queue
|
||||
.schedule_task(&task as &dyn AsyncRunnable)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(100)).await;
|
||||
}
|
|
@ -6,7 +6,7 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
fang = { path = "../../" , features = ["asynk"]}
|
||||
fang = { path = "../../../" , features = ["asynk"]}
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.0"
|
||||
tokio = { version = "1", features = ["full"] }
|
|
@ -12,11 +12,10 @@ async fn main() {
|
|||
env_logger::init();
|
||||
|
||||
log::info!("Starting...");
|
||||
let max_pool_size: u32 = 2;
|
||||
let max_pool_size: u32 = 3;
|
||||
let mut queue = AsyncQueue::builder()
|
||||
.uri("postgres://postgres:postgres@localhost/fang")
|
||||
.max_pool_size(max_pool_size)
|
||||
.duplicated_tasks(true)
|
||||
.build();
|
||||
|
||||
queue.connect(NoTls).await.unwrap();
|
||||
|
@ -40,6 +39,7 @@ async fn main() {
|
|||
.insert_task(&task1 as &dyn AsyncRunnable)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
queue
|
||||
.insert_task(&task2 as &dyn AsyncRunnable)
|
||||
.await
|
13
fang_examples/blocking/simple_cron_worker/Cargo.toml
Normal file
13
fang_examples/blocking/simple_cron_worker/Cargo.toml
Normal file
|
@ -0,0 +1,13 @@
|
|||
[package]
|
||||
name = "simple_cron_worker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
fang = { path = "../../../" , features = ["blocking"]}
|
||||
dotenv = "0.15.0"
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.0"
|
||||
diesel = { version = "2", features = ["postgres", "r2d2"] }
|
35
fang_examples/blocking/simple_cron_worker/src/lib.rs
Normal file
35
fang_examples/blocking/simple_cron_worker/src/lib.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
use fang::runnable::Runnable;
|
||||
use fang::serde::{Deserialize, Serialize};
|
||||
use fang::typetag;
|
||||
use fang::FangError;
|
||||
use fang::Queueable;
|
||||
use fang::Scheduled;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(crate = "fang::serde")]
|
||||
pub struct MyCronTask {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for MyCronTask {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
log::info!("CRON !!!!!!!!!!!!!!!!!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"cron_test".to_string()
|
||||
}
|
||||
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
// sec min hour day of month month day of week year
|
||||
// be careful works only with UTC hour.
|
||||
// https://www.timeanddate.com/worldclock/timezone/utc
|
||||
let expression = "0/20 * * * Aug-Sep * 2022/1";
|
||||
Some(Scheduled::CronPattern(expression.to_string()))
|
||||
}
|
||||
|
||||
fn uniq(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
43
fang_examples/blocking/simple_cron_worker/src/main.rs
Normal file
43
fang_examples/blocking/simple_cron_worker/src/main.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
use diesel::r2d2;
|
||||
use dotenv::dotenv;
|
||||
use fang::PgConnection;
|
||||
use fang::Queue;
|
||||
use fang::Queueable;
|
||||
use fang::RetentionMode;
|
||||
use fang::WorkerPool;
|
||||
use simple_cron_worker::MyCronTask;
|
||||
use std::env;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
|
||||
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
|
||||
let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||
|
||||
r2d2::Pool::builder()
|
||||
.max_size(pool_size)
|
||||
.build(manager)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn main() {
|
||||
dotenv().ok();
|
||||
|
||||
env_logger::init();
|
||||
|
||||
let queue = Queue::builder().connection_pool(connection_pool(2)).build();
|
||||
|
||||
let mut worker_pool = WorkerPool::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.number_of_workers(2_u32)
|
||||
.task_type("cron_test".to_string())
|
||||
.build();
|
||||
|
||||
worker_pool.queue.schedule_task(&MyCronTask {}).unwrap();
|
||||
|
||||
worker_pool.start().unwrap();
|
||||
|
||||
sleep(Duration::from_secs(100))
|
||||
}
|
2
fang_examples/blocking/simple_worker/.gitignore
vendored
Normal file
2
fang_examples/blocking/simple_worker/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
Cargo.lock
|
|
@ -1,12 +1,13 @@
|
|||
[package]
|
||||
name = "simple_worker"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
fang = { path = "../../" , features = ["blocking"]}
|
||||
signal-hook = "0.3.10"
|
||||
fang = { path = "../../../" , features = ["blocking"]}
|
||||
dotenv = "0.15.0"
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.0"
|
||||
diesel = { version = "2", features = ["postgres", "r2d2"] }
|
3
fang_examples/blocking/simple_worker/README.md
Normal file
3
fang_examples/blocking/simple_worker/README.md
Normal file
|
@ -0,0 +1,3 @@
|
|||
## Simple example
|
||||
|
||||
The job described in this example enqueues a new job during its execution saving thread name of the current worker to its metadata.
|
96
fang_examples/blocking/simple_worker/src/lib.rs
Normal file
96
fang_examples/blocking/simple_worker/src/lib.rs
Normal file
|
@ -0,0 +1,96 @@
|
|||
use fang::runnable::Runnable;
|
||||
use fang::serde::{Deserialize, Serialize};
|
||||
use fang::typetag;
|
||||
use fang::FangError;
|
||||
use fang::Queueable;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(crate = "fang::serde")]
|
||||
pub struct MyTask {
|
||||
pub number: u16,
|
||||
pub current_thread_name: String,
|
||||
}
|
||||
|
||||
impl MyTask {
|
||||
pub fn new(number: u16) -> Self {
|
||||
let handle = thread::current();
|
||||
let current_thread_name = handle.name().unwrap().to_string();
|
||||
|
||||
Self {
|
||||
number,
|
||||
current_thread_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for MyTask {
|
||||
fn run(&self, queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
let new_task = MyTask::new(self.number + 1);
|
||||
|
||||
log::info!(
|
||||
"The number is {}, thread name {}",
|
||||
self.number,
|
||||
self.current_thread_name
|
||||
);
|
||||
|
||||
queue.insert_task(&new_task).unwrap();
|
||||
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"worker_pool_test".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(crate = "fang::serde")]
|
||||
pub struct MyFailingTask {
|
||||
pub number: u16,
|
||||
pub current_thread_name: String,
|
||||
}
|
||||
|
||||
impl MyFailingTask {
|
||||
pub fn new(number: u16) -> Self {
|
||||
let handle = thread::current();
|
||||
let current_thread_name = handle.name().unwrap().to_string();
|
||||
Self {
|
||||
number,
|
||||
current_thread_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for MyFailingTask {
|
||||
fn run(&self, queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
let new_task = MyFailingTask::new(self.number + 1);
|
||||
|
||||
queue.insert_task(&new_task).unwrap();
|
||||
|
||||
log::info!(
|
||||
"Failing task number {}, Thread name:{}",
|
||||
self.number,
|
||||
self.current_thread_name
|
||||
);
|
||||
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let b = true;
|
||||
|
||||
if b {
|
||||
panic!("Hello!");
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"worker_pool_test".to_string()
|
||||
}
|
||||
}
|
50
fang_examples/blocking/simple_worker/src/main.rs
Normal file
50
fang_examples/blocking/simple_worker/src/main.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
use diesel::r2d2;
|
||||
use dotenv::dotenv;
|
||||
use fang::PgConnection;
|
||||
use fang::Queue;
|
||||
use fang::Queueable;
|
||||
use fang::RetentionMode;
|
||||
use fang::WorkerPool;
|
||||
use simple_worker::MyFailingTask;
|
||||
use simple_worker::MyTask;
|
||||
use std::env;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
|
||||
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
|
||||
let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||
|
||||
r2d2::Pool::builder()
|
||||
.max_size(pool_size)
|
||||
.build(manager)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn main() {
|
||||
dotenv().ok();
|
||||
|
||||
env_logger::init();
|
||||
|
||||
let queue = Queue::builder().connection_pool(connection_pool(3)).build();
|
||||
|
||||
let mut worker_pool = WorkerPool::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.number_of_workers(3_u32)
|
||||
.task_type("worker_pool_test".to_string())
|
||||
.build();
|
||||
|
||||
worker_pool.queue.insert_task(&MyTask::new(1)).unwrap();
|
||||
worker_pool.queue.insert_task(&MyTask::new(1000)).unwrap();
|
||||
|
||||
worker_pool
|
||||
.queue
|
||||
.insert_task(&MyFailingTask::new(5000))
|
||||
.unwrap();
|
||||
|
||||
worker_pool.start().unwrap();
|
||||
|
||||
sleep(Duration::from_secs(100))
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
use fang::serde::{Deserialize, Serialize};
|
||||
use fang::typetag;
|
||||
use fang::Error;
|
||||
use fang::PgConnection;
|
||||
use fang::Queue;
|
||||
use fang::Runnable;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(crate = "fang::serde")]
|
||||
pub struct MyJob {
|
||||
pub number: u16,
|
||||
pub current_thread_name: String,
|
||||
}
|
||||
|
||||
impl MyJob {
|
||||
pub fn new(number: u16) -> Self {
|
||||
let handle = thread::current();
|
||||
let current_thread_name = handle.name().unwrap().to_string();
|
||||
|
||||
Self {
|
||||
number,
|
||||
current_thread_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for MyJob {
|
||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let new_job = MyJob::new(self.number + 1);
|
||||
|
||||
Queue::push_task_query(connection, &new_job).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"worker_pool_test".to_string()
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
use dotenv::dotenv;
|
||||
use fang::Queue;
|
||||
use fang::RetentionMode;
|
||||
use fang::WorkerParams;
|
||||
use fang::WorkerPool;
|
||||
use signal_hook::{consts::signal::*, iterator::Signals};
|
||||
use simple_worker::MyJob;
|
||||
|
||||
fn main() {
|
||||
dotenv().ok();
|
||||
|
||||
env_logger::init();
|
||||
|
||||
let mut worker_params = WorkerParams::new();
|
||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||
|
||||
let mut worker_pool = WorkerPool::new_with_params(2, worker_params);
|
||||
worker_pool.start().unwrap();
|
||||
|
||||
let queue = Queue::new();
|
||||
|
||||
queue.push_task(&MyJob::new(1)).unwrap();
|
||||
queue.push_task(&MyJob::new(1000)).unwrap();
|
||||
|
||||
let mut signals = Signals::new(&[SIGINT, SIGTERM]).unwrap();
|
||||
for sig in signals.forever() {
|
||||
match sig {
|
||||
SIGINT => {
|
||||
println!("Received SIGINT");
|
||||
worker_pool.shutdown().unwrap();
|
||||
break;
|
||||
}
|
||||
SIGTERM => {
|
||||
println!("Received SIGTERM");
|
||||
worker_pool.shutdown().unwrap();
|
||||
break;
|
||||
}
|
||||
_ => unreachable!("Received unexpected signal: {:?}", sig),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,2 +0,0 @@
|
|||
DROP TABLE fang_tasks;
|
||||
DROP TYPE fang_task_state;
|
|
@ -1 +0,0 @@
|
|||
DROP TABLE fang_periodic_tasks;
|
|
@ -1,11 +0,0 @@
|
|||
CREATE TABLE fang_periodic_tasks (
|
||||
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
||||
metadata jsonb NOT NULL,
|
||||
period_in_millis BIGINT NOT NULL,
|
||||
scheduled_at TIMESTAMP WITH TIME ZONE,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
|
||||
CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
|
1
migrations/2022-08-20-151615_create_fang_tasks/down.sql
Normal file
1
migrations/2022-08-20-151615_create_fang_tasks/down.sql
Normal file
|
@ -0,0 +1 @@
|
|||
DROP TABLE fang_tasks;
|
|
@ -6,13 +6,15 @@ CREATE TABLE fang_tasks (
|
|||
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
||||
metadata jsonb NOT NULL,
|
||||
error_message TEXT,
|
||||
state fang_task_state default 'new' NOT NULL,
|
||||
task_type VARCHAR default 'common' NOT NULL,
|
||||
state fang_task_state DEFAULT 'new' NOT NULL,
|
||||
task_type VARCHAR DEFAULT 'common' NOT NULL,
|
||||
uniq_hash CHAR(64),
|
||||
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
|
||||
CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
|
||||
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
|
||||
CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
|
||||
CREATE INDEX fang_tasks_scheduled_at_index ON fang_tasks(scheduled_at);
|
||||
CREATE INDEX fang_tasks_uniq_hash ON fang_tasks(uniq_hash);
|
|
@ -1,8 +1,9 @@
|
|||
pub mod async_queue;
|
||||
pub mod async_runnable;
|
||||
pub mod async_scheduler;
|
||||
pub mod async_worker;
|
||||
pub mod async_worker_pool;
|
||||
|
||||
pub use async_queue::*;
|
||||
pub use async_runnable::AsyncRunnable;
|
||||
pub use async_runnable::Error as AsyncError;
|
||||
pub use async_worker::*;
|
||||
pub use async_worker_pool::*;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::asynk::async_runnable::AsyncRunnable;
|
||||
use crate::asynk::async_runnable::Error as FangError;
|
||||
use crate::CronError;
|
||||
use crate::Scheduled::*;
|
||||
use async_trait::async_trait;
|
||||
use bb8_postgres::bb8::Pool;
|
||||
use bb8_postgres::bb8::RunError;
|
||||
|
@ -9,10 +10,11 @@ use bb8_postgres::tokio_postgres::Socket;
|
|||
use bb8_postgres::tokio_postgres::Transaction;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use cron::Schedule;
|
||||
use postgres_types::{FromSql, ToSql};
|
||||
use std::time::Duration as StdDuration;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::str::FromStr;
|
||||
use thiserror::Error;
|
||||
use typed_builder::TypedBuilder;
|
||||
use uuid::Uuid;
|
||||
|
@ -21,21 +23,17 @@ use uuid::Uuid;
|
|||
use bb8_postgres::tokio_postgres::tls::NoTls;
|
||||
|
||||
const INSERT_TASK_QUERY: &str = include_str!("queries/insert_task.sql");
|
||||
const INSERT_PERIODIC_TASK_QUERY: &str = include_str!("queries/insert_periodic_task.sql");
|
||||
const SCHEDULE_NEXT_TASK_QUERY: &str = include_str!("queries/schedule_next_task.sql");
|
||||
const INSERT_TASK_UNIQ_QUERY: &str = include_str!("queries/insert_task_uniq.sql");
|
||||
const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql");
|
||||
const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql");
|
||||
const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql");
|
||||
const REMOVE_ALL_SCHEDULED_TASK_QUERY: &str =
|
||||
include_str!("queries/remove_all_scheduled_tasks.sql");
|
||||
const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
|
||||
const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
|
||||
const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
|
||||
const FETCH_PERIODIC_TASKS_QUERY: &str = include_str!("queries/fetch_periodic_tasks.sql");
|
||||
const FIND_TASK_BY_METADATA_QUERY: &str = include_str!("queries/find_task_by_metadata.sql");
|
||||
|
||||
#[cfg(test)]
|
||||
const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql");
|
||||
const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql");
|
||||
#[cfg(test)]
|
||||
const FIND_PERIODIC_TASK_BY_ID_QUERY: &str = include_str!("queries/find_periodic_task_by_id.sql");
|
||||
|
||||
pub const DEFAULT_TASK_TYPE: &str = "common";
|
||||
|
||||
|
@ -71,21 +69,9 @@ pub struct Task {
|
|||
#[builder(setter(into))]
|
||||
pub task_type: String,
|
||||
#[builder(setter(into))]
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub uniq_hash: Option<String>,
|
||||
#[builder(setter(into))]
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)]
|
||||
pub struct PeriodicTask {
|
||||
#[builder(setter(into))]
|
||||
pub id: Uuid,
|
||||
#[builder(setter(into))]
|
||||
pub metadata: serde_json::Value,
|
||||
#[builder(setter(into))]
|
||||
pub period_in_millis: i64,
|
||||
#[builder(setter(into))]
|
||||
pub scheduled_at: Option<DateTime<Utc>>,
|
||||
pub scheduled_at: DateTime<Utc>,
|
||||
#[builder(setter(into))]
|
||||
pub created_at: DateTime<Utc>,
|
||||
#[builder(setter(into))]
|
||||
|
@ -100,6 +86,8 @@ pub enum AsyncQueueError {
|
|||
PgError(#[from] bb8_postgres::tokio_postgres::Error),
|
||||
#[error(transparent)]
|
||||
SerdeError(#[from] serde_json::Error),
|
||||
#[error(transparent)]
|
||||
CronError(#[from] CronError),
|
||||
#[error("returned invalid result (expected {expected:?}, found {found:?})")]
|
||||
ResultError { expected: u64, found: u64 },
|
||||
#[error(
|
||||
|
@ -110,12 +98,9 @@ pub enum AsyncQueueError {
|
|||
TimeError,
|
||||
}
|
||||
|
||||
impl From<AsyncQueueError> for FangError {
|
||||
fn from(error: AsyncQueueError) -> Self {
|
||||
let message = format!("{:?}", error);
|
||||
FangError {
|
||||
description: message,
|
||||
}
|
||||
impl From<cron::error::Error> for AsyncQueueError {
|
||||
fn from(error: cron::error::Error) -> Self {
|
||||
AsyncQueueError::CronError(CronError::LibraryError(error))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,12 +112,17 @@ pub trait AsyncQueueable: Send {
|
|||
) -> Result<Option<Task>, AsyncQueueError>;
|
||||
|
||||
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
|
||||
|
||||
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
|
||||
|
||||
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;
|
||||
|
||||
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;
|
||||
|
||||
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
|
||||
|
||||
async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>;
|
||||
|
||||
async fn update_task_state(
|
||||
&mut self,
|
||||
task: Task,
|
||||
|
@ -142,22 +132,7 @@ pub trait AsyncQueueable: Send {
|
|||
async fn fail_task(&mut self, task: Task, error_message: &str)
|
||||
-> Result<Task, AsyncQueueError>;
|
||||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError>;
|
||||
|
||||
async fn insert_periodic_task(
|
||||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError>;
|
||||
|
||||
async fn schedule_next_task(
|
||||
&mut self,
|
||||
periodic_task: PeriodicTask,
|
||||
) -> Result<PeriodicTask, AsyncQueueError>;
|
||||
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
|
||||
}
|
||||
|
||||
#[derive(TypedBuilder, Debug, Clone)]
|
||||
|
@ -174,8 +149,6 @@ where
|
|||
uri: String,
|
||||
#[builder(setter(into))]
|
||||
max_pool_size: u32,
|
||||
#[builder(default = false, setter(into))]
|
||||
duplicated_tasks: bool,
|
||||
#[builder(default = false, setter(skip))]
|
||||
connected: bool,
|
||||
}
|
||||
|
@ -185,47 +158,24 @@ where
|
|||
pub struct AsyncQueueTest<'a> {
|
||||
#[builder(setter(into))]
|
||||
pub transaction: Transaction<'a>,
|
||||
#[builder(default = false, setter(into))]
|
||||
pub duplicated_tasks: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> AsyncQueueTest<'a> {
|
||||
pub async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
|
||||
let row: Row = self
|
||||
.transaction
|
||||
.query_one(FIND_TASK_BY_ID_QUERY, &[&id])
|
||||
.await?;
|
||||
|
||||
let task = AsyncQueue::<NoTls>::row_to_task(row);
|
||||
Ok(task)
|
||||
}
|
||||
pub async fn find_periodic_task_by_id(
|
||||
&mut self,
|
||||
id: Uuid,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let row: Row = self
|
||||
.transaction
|
||||
.query_one(FIND_PERIODIC_TASK_BY_ID_QUERY, &[&id])
|
||||
.await?;
|
||||
|
||||
let task = AsyncQueue::<NoTls>::row_to_periodic_task(row);
|
||||
Ok(task)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[async_trait]
|
||||
impl AsyncQueueable for AsyncQueueTest<'_> {
|
||||
async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
AsyncQueue::<NoTls>::find_task_by_id_query(transaction, id).await
|
||||
}
|
||||
|
||||
async fn fetch_and_touch_task(
|
||||
&mut self,
|
||||
task_type: Option<String>,
|
||||
) -> Result<Option<Task>, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let task = AsyncQueue::<NoTls>::fetch_and_touch_task_query(transaction, task_type).await?;
|
||||
|
||||
Ok(task)
|
||||
AsyncQueue::<NoTls>::fetch_and_touch_task_query(transaction, task_type).await
|
||||
}
|
||||
|
||||
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
||||
|
@ -233,84 +183,92 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
|
||||
let metadata = serde_json::to_value(task)?;
|
||||
|
||||
let task: Task = if self.duplicated_tasks {
|
||||
AsyncQueue::<NoTls>::insert_task_query(transaction, metadata, &task.task_type()).await?
|
||||
let task: Task = if !task.uniq() {
|
||||
AsyncQueue::<NoTls>::insert_task_query(
|
||||
transaction,
|
||||
metadata,
|
||||
&task.task_type(),
|
||||
Utc::now(),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
|
||||
transaction,
|
||||
metadata,
|
||||
&task.task_type(),
|
||||
Utc::now(),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn schedule_next_task(
|
||||
&mut self,
|
||||
periodic_task: PeriodicTask,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let periodic_task =
|
||||
AsyncQueue::<NoTls>::schedule_next_task_query(transaction, periodic_task).await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
}
|
||||
async fn insert_periodic_task(
|
||||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let metadata = serde_json::to_value(task)?;
|
||||
|
||||
let periodic_task = AsyncQueue::<NoTls>::insert_periodic_task_query(
|
||||
let scheduled_at = match task.cron() {
|
||||
Some(scheduled) => match scheduled {
|
||||
CronPattern(cron_pattern) => {
|
||||
let schedule = Schedule::from_str(&cron_pattern)?;
|
||||
let mut iterator = schedule.upcoming(Utc);
|
||||
|
||||
iterator
|
||||
.next()
|
||||
.ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))?
|
||||
}
|
||||
ScheduleOnce(datetime) => datetime,
|
||||
},
|
||||
None => {
|
||||
return Err(AsyncQueueError::CronError(
|
||||
CronError::TaskNotSchedulableError,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let task: Task = if !task.uniq() {
|
||||
AsyncQueue::<NoTls>::insert_task_query(
|
||||
transaction,
|
||||
metadata,
|
||||
timestamp,
|
||||
period,
|
||||
&task.task_type(),
|
||||
scheduled_at,
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
} else {
|
||||
AsyncQueue::<NoTls>::insert_task_if_not_exist_query(
|
||||
transaction,
|
||||
metadata,
|
||||
&task.task_type(),
|
||||
scheduled_at,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
Ok(periodic_task)
|
||||
}
|
||||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let periodic_task =
|
||||
AsyncQueue::<NoTls>::fetch_periodic_tasks_query(transaction, error_margin).await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
Ok(task)
|
||||
}
|
||||
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let result = AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await?;
|
||||
AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
AsyncQueue::<NoTls>::remove_all_scheduled_tasks_query(transaction).await
|
||||
}
|
||||
|
||||
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let result = AsyncQueue::<NoTls>::remove_task_query(transaction, task).await?;
|
||||
|
||||
Ok(result)
|
||||
AsyncQueue::<NoTls>::remove_task_query(transaction, task).await
|
||||
}
|
||||
|
||||
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let result = AsyncQueue::<NoTls>::remove_tasks_type_query(transaction, task_type).await?;
|
||||
|
||||
Ok(result)
|
||||
AsyncQueue::<NoTls>::remove_tasks_type_query(transaction, task_type).await
|
||||
}
|
||||
|
||||
async fn update_task_state(
|
||||
|
@ -320,9 +278,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
) -> Result<Task, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let task = AsyncQueue::<NoTls>::update_task_state_query(transaction, task, state).await?;
|
||||
|
||||
Ok(task)
|
||||
AsyncQueue::<NoTls>::update_task_state_query(transaction, task, state).await
|
||||
}
|
||||
|
||||
async fn fail_task(
|
||||
|
@ -332,9 +288,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
) -> Result<Task, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let task = AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await?;
|
||||
|
||||
Ok(task)
|
||||
AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,6 +324,18 @@ where
|
|||
Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await
|
||||
}
|
||||
|
||||
async fn remove_all_scheduled_tasks_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
) -> Result<u64, AsyncQueueError> {
|
||||
Self::execute_query(
|
||||
transaction,
|
||||
REMOVE_ALL_SCHEDULED_TASK_QUERY,
|
||||
&[&Utc::now()],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn remove_task_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
task: Task,
|
||||
|
@ -384,6 +350,16 @@ where
|
|||
Self::execute_query(transaction, REMOVE_TASKS_TYPE_QUERY, &[&task_type], None).await
|
||||
}
|
||||
|
||||
async fn find_task_by_id_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
id: Uuid,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let row: Row = transaction.query_one(FIND_TASK_BY_ID_QUERY, &[&id]).await?;
|
||||
|
||||
let task = Self::row_to_task(row);
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn fail_task_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
task: Task,
|
||||
|
@ -435,7 +411,7 @@ where
|
|||
task_type: &str,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let row: Row = transaction
|
||||
.query_one(FETCH_TASK_TYPE_QUERY, &[&task_type])
|
||||
.query_one(FETCH_TASK_TYPE_QUERY, &[&task_type, &Utc::now()])
|
||||
.await?;
|
||||
|
||||
let task = Self::row_to_task(row);
|
||||
|
@ -461,72 +437,32 @@ where
|
|||
transaction: &mut Transaction<'_>,
|
||||
metadata: serde_json::Value,
|
||||
task_type: &str,
|
||||
scheduled_at: DateTime<Utc>,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let row: Row = transaction
|
||||
.query_one(INSERT_TASK_QUERY, &[&metadata, &task_type])
|
||||
.query_one(INSERT_TASK_QUERY, &[&metadata, &task_type, &scheduled_at])
|
||||
.await?;
|
||||
let task = Self::row_to_task(row);
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn schedule_next_task_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
periodic_task: PeriodicTask,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let updated_at = Utc::now();
|
||||
let scheduled_at = updated_at + Duration::milliseconds(periodic_task.period_in_millis);
|
||||
|
||||
let row: Row = transaction
|
||||
.query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at])
|
||||
.await?;
|
||||
|
||||
let periodic_task = Self::row_to_periodic_task(row);
|
||||
Ok(periodic_task)
|
||||
}
|
||||
|
||||
async fn insert_periodic_task_query(
|
||||
async fn insert_task_uniq_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
metadata: serde_json::Value,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
task_type: &str,
|
||||
scheduled_at: DateTime<Utc>,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let uniq_hash = Self::calculate_hash(metadata.to_string());
|
||||
|
||||
let row: Row = transaction
|
||||
.query_one(
|
||||
INSERT_PERIODIC_TASK_QUERY,
|
||||
&[&metadata, ×tamp, &period],
|
||||
INSERT_TASK_UNIQ_QUERY,
|
||||
&[&metadata, &task_type, &uniq_hash, &scheduled_at],
|
||||
)
|
||||
.await?;
|
||||
let periodic_task = Self::row_to_periodic_task(row);
|
||||
Ok(periodic_task)
|
||||
}
|
||||
|
||||
async fn fetch_periodic_tasks_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
let current_time = Utc::now();
|
||||
|
||||
let margin: Duration = match Duration::from_std(error_margin) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(_) => Err(AsyncQueueError::TimeError),
|
||||
}?;
|
||||
|
||||
let low_limit = current_time - margin;
|
||||
let high_limit = current_time + margin;
|
||||
let rows: Vec<Row> = transaction
|
||||
.query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit])
|
||||
.await?;
|
||||
|
||||
let periodic_tasks: Vec<PeriodicTask> = rows
|
||||
.into_iter()
|
||||
.map(|row| Self::row_to_periodic_task(row))
|
||||
.collect();
|
||||
|
||||
if periodic_tasks.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(periodic_tasks))
|
||||
}
|
||||
let task = Self::row_to_task(row);
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
|
@ -552,19 +488,31 @@ where
|
|||
transaction: &mut Transaction<'_>,
|
||||
metadata: serde_json::Value,
|
||||
task_type: &str,
|
||||
scheduled_at: DateTime<Utc>,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
match Self::find_task_by_metadata_query(transaction, &metadata).await {
|
||||
match Self::find_task_by_uniq_hash_query(transaction, &metadata).await {
|
||||
Some(task) => Ok(task),
|
||||
None => Self::insert_task_query(transaction, metadata, task_type).await,
|
||||
None => {
|
||||
Self::insert_task_uniq_query(transaction, metadata, task_type, scheduled_at).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_task_by_metadata_query(
|
||||
fn calculate_hash(json: String) -> String {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(json.as_bytes());
|
||||
let result = hasher.finalize();
|
||||
hex::encode(result)
|
||||
}
|
||||
|
||||
async fn find_task_by_uniq_hash_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
metadata: &serde_json::Value,
|
||||
) -> Option<Task> {
|
||||
let uniq_hash = Self::calculate_hash(metadata.to_string());
|
||||
|
||||
let result = transaction
|
||||
.query_one(FIND_TASK_BY_METADATA_QUERY, &[metadata])
|
||||
.query_one(FIND_TASK_BY_UNIQ_HASH_QUERY, &[&uniq_hash])
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
@ -573,47 +521,29 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn row_to_periodic_task(row: Row) -> PeriodicTask {
|
||||
let id: Uuid = row.get("id");
|
||||
let metadata: serde_json::Value = row.get("metadata");
|
||||
let period_in_millis: i64 = row.get("period_in_millis");
|
||||
let scheduled_at: Option<DateTime<Utc>> = match row.try_get("scheduled_at") {
|
||||
Ok(datetime) => Some(datetime),
|
||||
Err(_) => None,
|
||||
};
|
||||
let created_at: DateTime<Utc> = row.get("created_at");
|
||||
let updated_at: DateTime<Utc> = row.get("updated_at");
|
||||
|
||||
PeriodicTask::builder()
|
||||
.id(id)
|
||||
.metadata(metadata)
|
||||
.period_in_millis(period_in_millis)
|
||||
.scheduled_at(scheduled_at)
|
||||
.created_at(created_at)
|
||||
.updated_at(updated_at)
|
||||
.build()
|
||||
}
|
||||
|
||||
fn row_to_task(row: Row) -> Task {
|
||||
let id: Uuid = row.get("id");
|
||||
let metadata: serde_json::Value = row.get("metadata");
|
||||
let error_message: Option<String> = match row.try_get("error_message") {
|
||||
Ok(error_message) => Some(error_message),
|
||||
Err(_) => None,
|
||||
};
|
||||
|
||||
let error_message: Option<String> = row.try_get("error_message").ok();
|
||||
|
||||
let uniq_hash: Option<String> = row.try_get("uniq_hash").ok();
|
||||
let state: FangTaskState = row.get("state");
|
||||
let task_type: String = row.get("task_type");
|
||||
let created_at: DateTime<Utc> = row.get("created_at");
|
||||
let updated_at: DateTime<Utc> = row.get("updated_at");
|
||||
let scheduled_at: DateTime<Utc> = row.get("scheduled_at");
|
||||
|
||||
Task::builder()
|
||||
.id(id)
|
||||
.metadata(metadata)
|
||||
.error_message(error_message)
|
||||
.state(state)
|
||||
.uniq_hash(uniq_hash)
|
||||
.task_type(task_type)
|
||||
.created_at(created_at)
|
||||
.updated_at(updated_at)
|
||||
.scheduled_at(scheduled_at)
|
||||
.build()
|
||||
}
|
||||
}
|
||||
|
@ -626,6 +556,17 @@ where
|
|||
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||
{
|
||||
async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError> {
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let task = Self::find_task_by_id_query(&mut transaction, id).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn fetch_and_touch_task(
|
||||
&mut self,
|
||||
task_type: Option<String>,
|
||||
|
@ -648,10 +589,16 @@ where
|
|||
|
||||
let metadata = serde_json::to_value(task)?;
|
||||
|
||||
let task: Task = if self.duplicated_tasks {
|
||||
Self::insert_task_query(&mut transaction, metadata, &task.task_type()).await?
|
||||
let task: Task = if !task.uniq() {
|
||||
Self::insert_task_query(&mut transaction, metadata, &task.task_type(), Utc::now())
|
||||
.await?
|
||||
} else {
|
||||
Self::insert_task_if_not_exist_query(&mut transaction, metadata, &task.task_type())
|
||||
Self::insert_task_if_not_exist_query(
|
||||
&mut transaction,
|
||||
metadata,
|
||||
&task.task_type(),
|
||||
Utc::now(),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
|
@ -660,55 +607,44 @@ where
|
|||
Ok(task)
|
||||
}
|
||||
|
||||
async fn insert_periodic_task(
|
||||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let metadata = serde_json::to_value(task)?;
|
||||
|
||||
let periodic_task =
|
||||
Self::insert_periodic_task_query(&mut transaction, metadata, timestamp, period).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
let scheduled_at = match task.cron() {
|
||||
Some(scheduled) => match scheduled {
|
||||
CronPattern(cron_pattern) => {
|
||||
let schedule = Schedule::from_str(&cron_pattern)?;
|
||||
let mut iterator = schedule.upcoming(Utc);
|
||||
iterator
|
||||
.next()
|
||||
.ok_or(AsyncQueueError::CronError(CronError::NoTimestampsError))?
|
||||
}
|
||||
|
||||
async fn schedule_next_task(
|
||||
&mut self,
|
||||
periodic_task: PeriodicTask,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
ScheduleOnce(datetime) => datetime,
|
||||
},
|
||||
None => {
|
||||
return Err(AsyncQueueError::CronError(
|
||||
CronError::TaskNotSchedulableError,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let periodic_task =
|
||||
Self::fetch_periodic_tasks_query(&mut transaction, error_margin).await?;
|
||||
|
||||
let task: Task = if !task.uniq() {
|
||||
Self::insert_task_query(&mut transaction, metadata, &task.task_type(), scheduled_at)
|
||||
.await?
|
||||
} else {
|
||||
Self::insert_task_if_not_exist_query(
|
||||
&mut transaction,
|
||||
metadata,
|
||||
&task.task_type(),
|
||||
scheduled_at,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||
|
@ -723,6 +659,18 @@ where
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let result = Self::remove_all_scheduled_tasks_query(&mut transaction).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
|
@ -784,12 +732,17 @@ mod async_queue_tests {
|
|||
use super::AsyncQueueable;
|
||||
use super::FangTaskState;
|
||||
use super::Task;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::asynk::AsyncRunnable;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled;
|
||||
use async_trait::async_trait;
|
||||
use bb8_postgres::bb8::Pool;
|
||||
use bb8_postgres::tokio_postgres::NoTls;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::SubsecRound;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
@ -800,11 +753,30 @@ mod async_queue_tests {
|
|||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncTask {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AsyncTaskSchedule {
|
||||
pub number: u16,
|
||||
pub datetime: String,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncTaskSchedule {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
let datetime = self.datetime.parse::<DateTime<Utc>>().ok()?;
|
||||
Some(Scheduled::ScheduleOnce(datetime))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_task_creates_new_task() {
|
||||
let pool = pool().await;
|
||||
|
@ -912,6 +884,60 @@ mod async_queue_tests {
|
|||
test.transaction.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schedule_task_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
|
||||
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
|
||||
|
||||
let task = &AsyncTaskSchedule {
|
||||
number: 1,
|
||||
datetime: datetime.to_string(),
|
||||
};
|
||||
|
||||
let task = test.schedule_task(task).await.unwrap();
|
||||
|
||||
let metadata = task.metadata.as_object().unwrap();
|
||||
let number = metadata["number"].as_u64();
|
||||
let type_task = metadata["type"].as_str();
|
||||
|
||||
assert_eq!(Some(1), number);
|
||||
assert_eq!(Some("AsyncTaskSchedule"), type_task);
|
||||
assert_eq!(task.scheduled_at, datetime);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remove_all_scheduled_tasks_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
|
||||
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
|
||||
|
||||
let task1 = &AsyncTaskSchedule {
|
||||
number: 1,
|
||||
datetime: datetime.to_string(),
|
||||
};
|
||||
|
||||
let task2 = &AsyncTaskSchedule {
|
||||
number: 2,
|
||||
datetime: datetime.to_string(),
|
||||
};
|
||||
|
||||
test.schedule_task(task1).await.unwrap();
|
||||
test.schedule_task(task2).await.unwrap();
|
||||
|
||||
let number = test.remove_all_scheduled_tasks().await.unwrap();
|
||||
|
||||
assert_eq!(2, number);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_and_touch_test() {
|
||||
let pool = pool().await;
|
||||
|
|
|
@ -1,19 +1,55 @@
|
|||
use crate::async_queue::AsyncQueueError;
|
||||
use crate::asynk::async_queue::AsyncQueueable;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled;
|
||||
use async_trait::async_trait;
|
||||
use bb8_postgres::bb8::RunError;
|
||||
use bb8_postgres::tokio_postgres::Error as TokioPostgresError;
|
||||
use serde_json::Error as SerdeError;
|
||||
|
||||
const COMMON_TYPE: &str = "common";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
pub description: String,
|
||||
impl From<AsyncQueueError> for FangError {
|
||||
fn from(error: AsyncQueueError) -> Self {
|
||||
let message = format!("{:?}", error);
|
||||
FangError {
|
||||
description: message,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TokioPostgresError> for FangError {
|
||||
fn from(error: TokioPostgresError) -> Self {
|
||||
Self::from(AsyncQueueError::PgError(error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RunError<TokioPostgresError>> for FangError {
|
||||
fn from(error: RunError<TokioPostgresError>) -> Self {
|
||||
Self::from(AsyncQueueError::PoolError(error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SerdeError> for FangError {
|
||||
fn from(error: SerdeError) -> Self {
|
||||
Self::from(AsyncQueueError::SerdeError(error))
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde(tag = "type")]
|
||||
#[async_trait]
|
||||
pub trait AsyncRunnable: Send + Sync {
|
||||
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>;
|
||||
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>;
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
COMMON_TYPE.to_string()
|
||||
}
|
||||
|
||||
fn uniq(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,260 +0,0 @@
|
|||
use crate::asynk::async_queue::AsyncQueueable;
|
||||
use crate::asynk::async_queue::PeriodicTask;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::asynk::AsyncRunnable;
|
||||
use async_recursion::async_recursion;
|
||||
use log::error;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::sleep;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
#[derive(TypedBuilder, Clone)]
|
||||
pub struct Scheduler<AQueue>
|
||||
where
|
||||
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
||||
{
|
||||
#[builder(setter(into))]
|
||||
pub check_period: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub error_margin: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub queue: AQueue,
|
||||
#[builder(default = 0, setter(into))]
|
||||
pub number_of_restarts: u32,
|
||||
}
|
||||
|
||||
impl<AQueue> Scheduler<AQueue>
|
||||
where
|
||||
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
||||
{
|
||||
#[async_recursion(?Send)]
|
||||
pub async fn start(&mut self) -> Result<(), Error> {
|
||||
let join_handle: JoinHandle<Result<(), Error>> = self.schedule_loop().await;
|
||||
|
||||
match join_handle.await {
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Scheduler panicked, restarting {:?}. Number of restarts {}",
|
||||
err, self.number_of_restarts
|
||||
);
|
||||
self.number_of_restarts += 1;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
self.start().await
|
||||
}
|
||||
Ok(task_res) => match task_res {
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Scheduler failed, restarting {:?}. Number of restarts {}",
|
||||
err, self.number_of_restarts
|
||||
);
|
||||
self.number_of_restarts += 1;
|
||||
self.start().await
|
||||
}
|
||||
Ok(_) => {
|
||||
error!(
|
||||
"Scheduler stopped. restarting. Number of restarts {}",
|
||||
self.number_of_restarts
|
||||
);
|
||||
self.number_of_restarts += 1;
|
||||
self.start().await
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn schedule_loop(&mut self) -> JoinHandle<Result<(), Error>> {
|
||||
let mut scheduler = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let sleep_duration = scheduler.check_period;
|
||||
|
||||
loop {
|
||||
scheduler.schedule().await?;
|
||||
|
||||
sleep(sleep_duration).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn schedule(&mut self) -> Result<(), Error> {
|
||||
if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin).await? {
|
||||
for task in tasks {
|
||||
self.process_task(task).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
|
||||
match task.scheduled_at {
|
||||
None => {
|
||||
self.queue.schedule_next_task(task).await?;
|
||||
}
|
||||
Some(_) => {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
self.queue.insert_task(&*actual_task).await?;
|
||||
|
||||
self.queue.schedule_next_task(task).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(TypedBuilder)]
|
||||
pub struct SchedulerTest<'a> {
|
||||
#[builder(setter(into))]
|
||||
pub check_period: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub error_margin: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub queue: &'a mut dyn AsyncQueueable,
|
||||
#[builder(default = 0, setter(into))]
|
||||
pub number_of_restarts: u32,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> SchedulerTest<'a> {
|
||||
async fn schedule_test(&mut self) -> Result<(), Error> {
|
||||
let sleep_duration = self.check_period;
|
||||
|
||||
loop {
|
||||
match self.queue.fetch_periodic_tasks(self.error_margin).await? {
|
||||
Some(tasks) => {
|
||||
for task in tasks {
|
||||
self.process_task(task).await?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
None => {
|
||||
sleep(sleep_duration).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
|
||||
match task.scheduled_at {
|
||||
None => {
|
||||
self.queue.schedule_next_task(task).await?;
|
||||
}
|
||||
Some(_) => {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
self.queue.insert_task(&*actual_task).await?;
|
||||
|
||||
self.queue.schedule_next_task(task).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod async_scheduler_tests {
|
||||
use super::SchedulerTest;
|
||||
use crate::asynk::async_queue::AsyncQueueTest;
|
||||
use crate::asynk::async_queue::AsyncQueueable;
|
||||
use crate::asynk::async_queue::PeriodicTask;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::asynk::AsyncRunnable;
|
||||
use async_trait::async_trait;
|
||||
use bb8_postgres::bb8::Pool;
|
||||
use bb8_postgres::tokio_postgres::NoTls;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration as OtherDuration;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AsyncScheduledTask {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncScheduledTask {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn task_type(&self) -> String {
|
||||
"schedule".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schedules_tasks() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
|
||||
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
|
||||
|
||||
let _periodic_task = insert_periodic_task(
|
||||
&mut test,
|
||||
&AsyncScheduledTask { number: 1 },
|
||||
schedule_in_future,
|
||||
10000,
|
||||
)
|
||||
.await;
|
||||
|
||||
let check_period: u64 = 1;
|
||||
let error_margin_seconds: u64 = 2;
|
||||
|
||||
let mut scheduler = SchedulerTest::builder()
|
||||
.check_period(Duration::from_secs(check_period))
|
||||
.error_margin(Duration::from_secs(error_margin_seconds))
|
||||
.queue(&mut test as &mut dyn AsyncQueueable)
|
||||
.build();
|
||||
// Scheduler start tricky not loop :)
|
||||
scheduler.schedule_test().await.unwrap();
|
||||
|
||||
let task = scheduler
|
||||
.queue
|
||||
.fetch_and_touch_task(Some("schedule".to_string()))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let metadata = task.metadata.as_object().unwrap();
|
||||
let number = metadata["number"].as_u64();
|
||||
let type_task = metadata["type"].as_str();
|
||||
|
||||
let runnable_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
assert_eq!("schedule", runnable_task.task_type());
|
||||
assert_eq!(Some("AsyncScheduledTask"), type_task);
|
||||
assert_eq!(Some(1), number);
|
||||
}
|
||||
|
||||
async fn insert_periodic_task(
|
||||
test: &mut AsyncQueueTest<'_>,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period_in_millis: i64,
|
||||
) -> PeriodicTask {
|
||||
test.insert_periodic_task(task, timestamp, period_in_millis)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn pool() -> Pool<PostgresConnectionManager<NoTls>> {
|
||||
let pg_mgr = PostgresConnectionManager::new_from_stringlike(
|
||||
"postgres://postgres:postgres@localhost/fang",
|
||||
NoTls,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Pool::builder().build(pg_mgr).await.unwrap()
|
||||
}
|
||||
}
|
|
@ -3,7 +3,8 @@ use crate::asynk::async_queue::FangTaskState;
|
|||
use crate::asynk::async_queue::Task;
|
||||
use crate::asynk::async_queue::DEFAULT_TASK_TYPE;
|
||||
use crate::asynk::async_runnable::AsyncRunnable;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled::*;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use log::error;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
@ -27,15 +28,20 @@ impl<AQueue> AsyncWorker<AQueue>
|
|||
where
|
||||
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
||||
{
|
||||
pub async fn run(&mut self, task: Task) -> Result<(), Error> {
|
||||
let result = self.execute_task(task).await;
|
||||
pub async fn run(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<(), FangError> {
|
||||
let result = self.execute_task(task, actual_task).await;
|
||||
self.finalize_task(result).await
|
||||
}
|
||||
|
||||
async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
async fn execute_task(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<Task, (Task, String)> {
|
||||
let task_result = actual_task.run(&mut self.queue).await;
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
|
@ -43,7 +49,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
async fn finalize_task(&mut self, result: Result<Task, (Task, String)>) -> Result<(), Error> {
|
||||
async fn finalize_task(
|
||||
&mut self,
|
||||
result: Result<Task, (Task, String)>,
|
||||
) -> Result<(), FangError> {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => match result {
|
||||
Ok(task) => {
|
||||
|
@ -86,16 +95,26 @@ where
|
|||
tokio::time::sleep(self.sleep_params.sleep_period).await;
|
||||
}
|
||||
|
||||
pub async fn run_tasks(&mut self) -> Result<(), Error> {
|
||||
pub async fn run_tasks(&mut self) -> Result<(), FangError> {
|
||||
loop {
|
||||
//fetch task
|
||||
match self
|
||||
.queue
|
||||
.fetch_and_touch_task(Some(self.task_type.clone()))
|
||||
.await
|
||||
{
|
||||
Ok(Some(task)) => {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
// check if task is scheduled or not
|
||||
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||
// program task
|
||||
self.queue.schedule_task(&*actual_task).await?;
|
||||
}
|
||||
self.sleep_params.maybe_reset_sleep_period();
|
||||
self.run(task).await?
|
||||
// run scheduled task
|
||||
self.run(task, actual_task).await?;
|
||||
}
|
||||
Ok(None) => {
|
||||
self.sleep().await;
|
||||
|
@ -126,15 +145,20 @@ pub struct AsyncWorkerTest<'a> {
|
|||
|
||||
#[cfg(test)]
|
||||
impl<'a> AsyncWorkerTest<'a> {
|
||||
pub async fn run(&mut self, task: Task) -> Result<(), Error> {
|
||||
let result = self.execute_task(task).await;
|
||||
pub async fn run(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<(), FangError> {
|
||||
let result = self.execute_task(task, actual_task).await;
|
||||
self.finalize_task(result).await
|
||||
}
|
||||
|
||||
async fn execute_task(&mut self, task: Task) -> Result<Task, (Task, String)> {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
async fn execute_task(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<Task, (Task, String)> {
|
||||
let task_result = actual_task.run(self.queue).await;
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
|
@ -142,7 +166,10 @@ impl<'a> AsyncWorkerTest<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn finalize_task(&mut self, result: Result<Task, (Task, String)>) -> Result<(), Error> {
|
||||
async fn finalize_task(
|
||||
&mut self,
|
||||
result: Result<Task, (Task, String)>,
|
||||
) -> Result<(), FangError> {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => match result {
|
||||
Ok(task) => {
|
||||
|
@ -185,7 +212,7 @@ impl<'a> AsyncWorkerTest<'a> {
|
|||
tokio::time::sleep(self.sleep_params.sleep_period).await;
|
||||
}
|
||||
|
||||
pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> {
|
||||
pub async fn run_tasks_until_none(&mut self) -> Result<(), FangError> {
|
||||
loop {
|
||||
match self
|
||||
.queue
|
||||
|
@ -193,8 +220,17 @@ impl<'a> AsyncWorkerTest<'a> {
|
|||
.await
|
||||
{
|
||||
Ok(Some(task)) => {
|
||||
let actual_task: Box<dyn AsyncRunnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
// check if task is scheduled or not
|
||||
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||
// program task
|
||||
self.queue.schedule_task(&*actual_task).await?;
|
||||
}
|
||||
self.sleep_params.maybe_reset_sleep_period();
|
||||
self.run(task).await?
|
||||
// run scheduled task
|
||||
self.run(task, actual_task).await?;
|
||||
}
|
||||
Ok(None) => {
|
||||
return Ok(());
|
||||
|
@ -216,13 +252,16 @@ mod async_worker_tests {
|
|||
use crate::asynk::async_queue::AsyncQueueable;
|
||||
use crate::asynk::async_queue::FangTaskState;
|
||||
use crate::asynk::async_worker::Task;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::asynk::AsyncRunnable;
|
||||
use crate::FangError;
|
||||
use crate::RetentionMode;
|
||||
use crate::Scheduled;
|
||||
use async_trait::async_trait;
|
||||
use bb8_postgres::bb8::Pool;
|
||||
use bb8_postgres::tokio_postgres::NoTls;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
@ -233,10 +272,27 @@ mod async_worker_tests {
|
|||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for WorkerAsyncTask {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct WorkerAsyncTaskSchedule {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for WorkerAsyncTaskSchedule {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(1)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AsyncFailedTask {
|
||||
pub number: u16,
|
||||
|
@ -245,10 +301,10 @@ mod async_worker_tests {
|
|||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncFailedTask {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
let message = format!("number {} is wrong :(", self.number);
|
||||
|
||||
Err(Error {
|
||||
Err(FangError {
|
||||
description: message,
|
||||
})
|
||||
}
|
||||
|
@ -260,7 +316,7 @@ mod async_worker_tests {
|
|||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncTaskType1 {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -275,7 +331,7 @@ mod async_worker_tests {
|
|||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncTaskType2 {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -290,8 +346,9 @@ mod async_worker_tests {
|
|||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
let actual_task = WorkerAsyncTask { number: 1 };
|
||||
|
||||
let task = insert_task(&mut test, &WorkerAsyncTask { number: 1 }).await;
|
||||
let task = insert_task(&mut test, &actual_task).await;
|
||||
let id = task.id;
|
||||
|
||||
let mut worker = AsyncWorkerTest::builder()
|
||||
|
@ -299,12 +356,48 @@ mod async_worker_tests {
|
|||
.retention_mode(RetentionMode::KeepAll)
|
||||
.build();
|
||||
|
||||
worker.run(task).await.unwrap();
|
||||
worker.run(task, Box::new(actual_task)).await.unwrap();
|
||||
let task_finished = test.find_task_by_id(id).await.unwrap();
|
||||
assert_eq!(id, task_finished.id);
|
||||
assert_eq!(FangTaskState::Finished, task_finished.state);
|
||||
test.transaction.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn schedule_task_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
|
||||
let actual_task = WorkerAsyncTaskSchedule { number: 1 };
|
||||
|
||||
let task = test.schedule_task(&actual_task).await.unwrap();
|
||||
|
||||
let id = task.id;
|
||||
|
||||
let mut worker = AsyncWorkerTest::builder()
|
||||
.queue(&mut test as &mut dyn AsyncQueueable)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.build();
|
||||
|
||||
worker.run_tasks_until_none().await.unwrap();
|
||||
|
||||
let task = worker.queue.find_task_by_id(id).await.unwrap();
|
||||
|
||||
assert_eq!(id, task.id);
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
|
||||
|
||||
worker.run_tasks_until_none().await.unwrap();
|
||||
|
||||
let task = test.find_task_by_id(id).await.unwrap();
|
||||
assert_eq!(id, task.id);
|
||||
assert_eq!(FangTaskState::Finished, task.state);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn saves_error_for_failed_task() {
|
||||
let pool = pool().await;
|
||||
|
@ -312,8 +405,9 @@ mod async_worker_tests {
|
|||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
let failed_task = AsyncFailedTask { number: 1 };
|
||||
|
||||
let task = insert_task(&mut test, &AsyncFailedTask { number: 1 }).await;
|
||||
let task = insert_task(&mut test, &failed_task).await;
|
||||
let id = task.id;
|
||||
|
||||
let mut worker = AsyncWorkerTest::builder()
|
||||
|
@ -321,7 +415,7 @@ mod async_worker_tests {
|
|||
.retention_mode(RetentionMode::KeepAll)
|
||||
.build();
|
||||
|
||||
worker.run(task).await.unwrap();
|
||||
worker.run(task, Box::new(failed_task)).await.unwrap();
|
||||
let task_finished = test.find_task_by_id(id).await.unwrap();
|
||||
|
||||
assert_eq!(id, task_finished.id);
|
||||
|
@ -367,6 +461,7 @@ mod async_worker_tests {
|
|||
assert_eq!(FangTaskState::New, task2.state);
|
||||
test.transaction.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remove_when_finished() {
|
||||
let pool = pool().await;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::asynk::async_queue::AsyncQueueable;
|
||||
use crate::asynk::async_queue::DEFAULT_TASK_TYPE;
|
||||
use crate::asynk::async_worker::AsyncWorker;
|
||||
use crate::asynk::AsyncError as Error;
|
||||
use crate::FangError;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use async_recursion::async_recursion;
|
||||
use log::error;
|
||||
|
@ -61,7 +61,7 @@ where
|
|||
sleep_params: SleepParams,
|
||||
retention_mode: RetentionMode,
|
||||
task_type: String,
|
||||
) -> JoinHandle<Result<(), Error>> {
|
||||
) -> JoinHandle<Result<(), FangError>> {
|
||||
tokio::spawn(async move {
|
||||
Self::run_worker(queue, sleep_params, retention_mode, task_type).await
|
||||
})
|
||||
|
@ -71,7 +71,7 @@ where
|
|||
sleep_params: SleepParams,
|
||||
retention_mode: RetentionMode,
|
||||
task_type: String,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<(), FangError> {
|
||||
let mut worker: AsyncWorker<AQueue> = AsyncWorker::builder()
|
||||
.queue(queue)
|
||||
.sleep_params(sleep_params)
|
||||
|
|
|
@ -1 +1 @@
|
|||
UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
|
||||
UPDATE "fang_tasks" SET "state" = $1 , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING *
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
SELECT * FROM fang_periodic_tasks WHERE scheduled_at BETWEEN $1 AND $2 OR scheduled_at IS NULL
|
|
@ -1 +1 @@
|
|||
SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = $1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
|
||||
SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
SELECT * FROM fang_periodic_tasks WHERE id = $1
|
|
@ -1 +0,0 @@
|
|||
SELECT * FROM fang_tasks WHERE metadata = $1 LIMIT 1
|
1
src/asynk/queries/find_task_by_uniq_hash.sql
Normal file
1
src/asynk/queries/find_task_by_uniq_hash.sql
Normal file
|
@ -0,0 +1 @@
|
|||
SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1
|
|
@ -1 +0,0 @@
|
|||
INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_millis") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at
|
|
@ -1 +1 @@
|
|||
INSERT INTO "fang_tasks" ("metadata", "task_type") VALUES ($1, $2) RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
|
||||
INSERT INTO "fang_tasks" ("metadata", "task_type", "scheduled_at") VALUES ($1, $2, $3) RETURNING *
|
||||
|
|
1
src/asynk/queries/insert_task_uniq.sql
Normal file
1
src/asynk/queries/insert_task_uniq.sql
Normal file
|
@ -0,0 +1 @@
|
|||
INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) RETURNING *
|
|
@ -1 +0,0 @@
|
|||
DELETE FROM "fang_periodic_tasks"
|
1
src/asynk/queries/remove_all_scheduled_tasks.sql
Normal file
1
src/asynk/queries/remove_all_scheduled_tasks.sql
Normal file
|
@ -0,0 +1 @@
|
|||
DELETE FROM "fang_tasks" WHERE scheduled_at > $1
|
|
@ -1 +0,0 @@
|
|||
UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at
|
|
@ -1 +1 @@
|
|||
UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at
|
||||
UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING *
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
pub mod error;
|
||||
pub mod executor;
|
||||
pub mod queue;
|
||||
pub mod scheduler;
|
||||
pub mod runnable;
|
||||
pub mod schema;
|
||||
pub mod worker;
|
||||
pub mod worker_pool;
|
||||
|
||||
pub use error::FangError;
|
||||
pub use executor::*;
|
||||
pub use queue::*;
|
||||
pub use scheduler::*;
|
||||
pub use runnable::Runnable;
|
||||
pub use schema::*;
|
||||
pub use worker::*;
|
||||
pub use worker_pool::*;
|
||||
|
|
|
@ -1,21 +1,31 @@
|
|||
use crate::blocking::queue::QueueError;
|
||||
use crate::FangError;
|
||||
use diesel::r2d2::PoolError;
|
||||
use diesel::result::Error as DieselError;
|
||||
use std::io::Error as IoError;
|
||||
use std::sync::PoisonError;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum FangError {
|
||||
#[error("The shared state in an executor thread became poisoned")]
|
||||
PoisonedLock,
|
||||
|
||||
#[error("Failed to create executor thread")]
|
||||
ExecutorThreadCreationFailed {
|
||||
#[from]
|
||||
source: IoError,
|
||||
},
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<T>> for FangError {
|
||||
fn from(_: PoisonError<T>) -> Self {
|
||||
Self::PoisonedLock
|
||||
impl From<IoError> for FangError {
|
||||
fn from(error: IoError) -> Self {
|
||||
let description = format!("{:?}", error);
|
||||
FangError { description }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<QueueError> for FangError {
|
||||
fn from(error: QueueError) -> Self {
|
||||
let description = format!("{:?}", error);
|
||||
FangError { description }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DieselError> for FangError {
|
||||
fn from(error: DieselError) -> Self {
|
||||
Self::from(QueueError::DieselError(error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PoolError> for FangError {
|
||||
fn from(error: PoolError) -> Self {
|
||||
Self::from(QueueError::PoolError(error))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,328 +0,0 @@
|
|||
use crate::error::FangError;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Task;
|
||||
use crate::worker_pool::{SharedState, WorkerState};
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||
use log::error;
|
||||
use std::thread;
|
||||
|
||||
pub struct Executor {
|
||||
pub pooled_connection: PooledConnection<ConnectionManager<PgConnection>>,
|
||||
pub task_type: Option<String>,
|
||||
pub sleep_params: SleepParams,
|
||||
pub retention_mode: RetentionMode,
|
||||
shared_state: Option<SharedState>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Error {
|
||||
pub description: String,
|
||||
}
|
||||
|
||||
#[typetag::serde(tag = "type")]
|
||||
pub trait Runnable {
|
||||
fn run(&self, connection: &PgConnection) -> Result<(), Error>;
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"common".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
pub fn new(pooled_connection: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
|
||||
Self {
|
||||
pooled_connection,
|
||||
sleep_params: SleepParams::default(),
|
||||
retention_mode: RetentionMode::RemoveFinished,
|
||||
task_type: None,
|
||||
shared_state: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_shared_state(&mut self, shared_state: SharedState) {
|
||||
self.shared_state = Some(shared_state);
|
||||
}
|
||||
|
||||
pub fn set_task_type(&mut self, task_type: String) {
|
||||
self.task_type = Some(task_type);
|
||||
}
|
||||
|
||||
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
|
||||
self.sleep_params = sleep_params;
|
||||
}
|
||||
|
||||
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
|
||||
self.retention_mode = retention_mode;
|
||||
}
|
||||
|
||||
pub fn run(&self, task: Task) {
|
||||
let result = self.execute_task(task);
|
||||
self.finalize_task(result)
|
||||
}
|
||||
|
||||
pub fn run_tasks(&mut self) -> Result<(), FangError> {
|
||||
loop {
|
||||
if let Some(ref shared_state) = self.shared_state {
|
||||
let shared_state = shared_state.read()?;
|
||||
if let WorkerState::Shutdown = *shared_state {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) {
|
||||
Ok(Some(task)) => {
|
||||
self.maybe_reset_sleep_period();
|
||||
self.run(task);
|
||||
}
|
||||
Ok(None) => {
|
||||
self.sleep();
|
||||
}
|
||||
|
||||
Err(error) => {
|
||||
error!("Failed to fetch a task {:?}", error);
|
||||
|
||||
self.sleep();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_reset_sleep_period(&mut self) {
|
||||
self.sleep_params.maybe_reset_sleep_period();
|
||||
}
|
||||
|
||||
pub fn sleep(&mut self) {
|
||||
self.sleep_params.maybe_increase_sleep_period();
|
||||
|
||||
thread::sleep(self.sleep_params.sleep_period);
|
||||
}
|
||||
|
||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
let task_result = actual_task.run(&self.pooled_connection);
|
||||
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
Err(error) => Err((task, error.description)),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => {
|
||||
match result {
|
||||
Ok(task) => Queue::finish_task_query(&self.pooled_connection, &task).unwrap(),
|
||||
Err((task, error)) => {
|
||||
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap()
|
||||
}
|
||||
};
|
||||
}
|
||||
RetentionMode::RemoveAll => {
|
||||
match result {
|
||||
Ok(task) => Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(),
|
||||
Err((task, _error)) => {
|
||||
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap()
|
||||
}
|
||||
};
|
||||
}
|
||||
RetentionMode::RemoveFinished => match result {
|
||||
Ok(task) => {
|
||||
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap();
|
||||
}
|
||||
Err((task, error)) => {
|
||||
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod executor_tests {
|
||||
use super::Error;
|
||||
use super::Executor;
|
||||
use super::RetentionMode;
|
||||
use super::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::typetag;
|
||||
use crate::NewTask;
|
||||
use diesel::connection::Connection;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct ExecutorTaskTest {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for ExecutorTaskTest {
|
||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||
println!("the number is {}", self.number);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FailedTask {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for FailedTask {
|
||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||
let message = format!("the number is {}", self.number);
|
||||
|
||||
Err(Error {
|
||||
description: message,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct TaskType1 {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for TaskType1 {
|
||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"type1".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct TaskType2 {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for TaskType2 {
|
||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"type2".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize(job: &dyn Runnable) -> serde_json::Value {
|
||||
serde_json::to_value(job).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn executes_and_finishes_task() {
|
||||
let job = ExecutorTaskTest { number: 10 };
|
||||
|
||||
let new_task = NewTask {
|
||||
metadata: serialize(&job),
|
||||
task_type: "common".to_string(),
|
||||
};
|
||||
|
||||
let mut executor = Executor::new(pooled_connection());
|
||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
||||
|
||||
executor
|
||||
.pooled_connection
|
||||
.test_transaction::<(), Error, _>(|| {
|
||||
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
executor.run(task.clone());
|
||||
|
||||
let found_task =
|
||||
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Finished, found_task.state);
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn executes_task_only_of_specific_type() {
|
||||
let task1 = TaskType1 {};
|
||||
let task2 = TaskType2 {};
|
||||
|
||||
let new_task1 = NewTask {
|
||||
metadata: serialize(&task1),
|
||||
task_type: "type1".to_string(),
|
||||
};
|
||||
|
||||
let new_task2 = NewTask {
|
||||
metadata: serialize(&task2),
|
||||
task_type: "type2".to_string(),
|
||||
};
|
||||
|
||||
let executor = Executor::new(pooled_connection());
|
||||
|
||||
let task1 = Queue::insert_query(&executor.pooled_connection, &new_task1).unwrap();
|
||||
let task2 = Queue::insert_query(&executor.pooled_connection, &new_task2).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task1.state);
|
||||
assert_eq!(FangTaskState::New, task2.state);
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut executor = Executor::new(pooled_connection());
|
||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
||||
executor.set_task_type("type1".to_string());
|
||||
|
||||
executor.run_tasks().unwrap();
|
||||
});
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
|
||||
let found_task1 =
|
||||
Queue::find_task_by_id_query(&executor.pooled_connection, task1.id).unwrap();
|
||||
assert_eq!(FangTaskState::Finished, found_task1.state);
|
||||
|
||||
let found_task2 =
|
||||
Queue::find_task_by_id_query(&executor.pooled_connection, task2.id).unwrap();
|
||||
assert_eq!(FangTaskState::New, found_task2.state);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn saves_error_for_failed_task() {
|
||||
let task = FailedTask { number: 10 };
|
||||
|
||||
let new_task = NewTask {
|
||||
metadata: serialize(&task),
|
||||
task_type: "common".to_string(),
|
||||
};
|
||||
|
||||
let executor = Executor::new(pooled_connection());
|
||||
|
||||
executor
|
||||
.pooled_connection
|
||||
.test_transaction::<(), Error, _>(|| {
|
||||
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
executor.run(task.clone());
|
||||
|
||||
let found_task =
|
||||
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
||||
assert_eq!(
|
||||
"the number is 10".to_string(),
|
||||
found_task.error_message.unwrap()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
fn pooled_connection() -> PooledConnection<ConnectionManager<PgConnection>> {
|
||||
Queue::connection_pool(5).get().unwrap()
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load diff
22
src/blocking/runnable.rs
Normal file
22
src/blocking/runnable.rs
Normal file
|
@ -0,0 +1,22 @@
|
|||
use crate::queue::Queueable;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled;
|
||||
|
||||
pub const COMMON_TYPE: &str = "common";
|
||||
|
||||
#[typetag::serde(tag = "type")]
|
||||
pub trait Runnable {
|
||||
fn run(&self, _queueable: &dyn Queueable) -> Result<(), FangError>;
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
COMMON_TYPE.to_string()
|
||||
}
|
||||
|
||||
fn uniq(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn cron(&self) -> Option<Scheduled> {
|
||||
None
|
||||
}
|
||||
}
|
|
@ -1,125 +0,0 @@
|
|||
use crate::executor::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::PeriodicTask;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct Scheduler {
|
||||
pub check_period: Duration,
|
||||
pub error_margin: Duration,
|
||||
pub queue: Queue,
|
||||
}
|
||||
|
||||
impl Drop for Scheduler {
|
||||
fn drop(&mut self) {
|
||||
Scheduler::start(self.check_period, self.error_margin)
|
||||
}
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn start(check_period: Duration, error_margin: Duration) {
|
||||
let queue = Queue::new();
|
||||
let builder = thread::Builder::new().name("scheduler".to_string());
|
||||
|
||||
builder
|
||||
.spawn(move || {
|
||||
let scheduler = Self::new(check_period, error_margin, queue);
|
||||
|
||||
scheduler.schedule_loop();
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self {
|
||||
Self {
|
||||
check_period,
|
||||
queue,
|
||||
error_margin,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schedule_loop(&self) {
|
||||
loop {
|
||||
self.schedule();
|
||||
|
||||
thread::sleep(self.check_period);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schedule(&self) {
|
||||
if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin) {
|
||||
for task in tasks {
|
||||
self.process_task(task);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn process_task(&self, task: PeriodicTask) {
|
||||
match task.scheduled_at {
|
||||
None => {
|
||||
self.queue.schedule_next_task_execution(&task).unwrap();
|
||||
}
|
||||
Some(_) => {
|
||||
let actual_task: Box<dyn Runnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
self.queue.push_task(&(*actual_task)).unwrap();
|
||||
|
||||
self.queue.schedule_next_task_execution(&task).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod task_scheduler_tests {
|
||||
use super::Scheduler;
|
||||
use crate::executor::Error;
|
||||
use crate::executor::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::schema::fang_tasks;
|
||||
use crate::typetag;
|
||||
use crate::Task;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct ScheduledTask {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for ScheduledTask {
|
||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"schedule".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn schedules_tasks() {
|
||||
let queue = Queue::new();
|
||||
|
||||
queue.push_periodic_task(&ScheduledTask {}, 10000).unwrap();
|
||||
Scheduler::start(Duration::from_secs(1), Duration::from_secs(2));
|
||||
|
||||
let sleep_duration = Duration::from_secs(15);
|
||||
thread::sleep(sleep_duration);
|
||||
|
||||
let tasks = get_all_tasks(&queue.connection);
|
||||
|
||||
assert_eq!(1, tasks.len());
|
||||
}
|
||||
|
||||
fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
|
||||
fang_tasks::table
|
||||
.filter(fang_tasks::task_type.eq("schedule"))
|
||||
.get_results::<Task>(conn)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
|
@ -1,6 +1,11 @@
|
|||
use diesel_derive_enum::DbEnum;
|
||||
pub mod sql_types {
|
||||
#[derive(diesel::sql_types::SqlType)]
|
||||
#[diesel(postgres_type(name = "fang_task_state"))]
|
||||
pub struct FangTaskState;
|
||||
}
|
||||
|
||||
#[derive(DbEnum, Debug, Eq, PartialEq, Clone)]
|
||||
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
|
||||
#[DieselTypePath = "crate::blocking::schema::sql_types::FangTaskState"]
|
||||
pub enum FangTaskState {
|
||||
New,
|
||||
InProgress,
|
||||
|
@ -9,32 +14,23 @@ pub enum FangTaskState {
|
|||
}
|
||||
|
||||
table! {
|
||||
use super::FangTaskStateMapping;
|
||||
use super::sql_types::FangTaskState;
|
||||
use diesel::sql_types::Jsonb;
|
||||
use diesel::sql_types::Nullable;
|
||||
use diesel::sql_types::Text;
|
||||
use diesel::sql_types::Timestamptz;
|
||||
use diesel::sql_types::Uuid;
|
||||
use diesel::sql_types::Varchar;
|
||||
|
||||
use diesel::pg::sql_types::Bpchar;
|
||||
|
||||
fang_tasks (id) {
|
||||
id -> Uuid,
|
||||
metadata -> Jsonb,
|
||||
error_message -> Nullable<Text>,
|
||||
state -> FangTaskStateMapping,
|
||||
state -> FangTaskState,
|
||||
task_type -> Varchar,
|
||||
created_at -> Timestamptz,
|
||||
updated_at -> Timestamptz,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
fang_periodic_tasks (id) {
|
||||
id -> Uuid,
|
||||
metadata -> Jsonb,
|
||||
period_in_millis -> Int8,
|
||||
scheduled_at -> Nullable<Timestamptz>,
|
||||
uniq_hash -> Nullable<Bpchar>,
|
||||
scheduled_at -> Timestamptz,
|
||||
created_at -> Timestamptz,
|
||||
updated_at -> Timestamptz,
|
||||
}
|
||||
|
|
325
src/blocking/worker.rs
Normal file
325
src/blocking/worker.rs
Normal file
|
@ -0,0 +1,325 @@
|
|||
use crate::queue::Queueable;
|
||||
use crate::queue::Task;
|
||||
use crate::runnable::Runnable;
|
||||
use crate::runnable::COMMON_TYPE;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled::*;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use log::error;
|
||||
use std::thread;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
#[derive(TypedBuilder)]
|
||||
pub struct Worker<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
#[builder(setter(into))]
|
||||
pub queue: BQueue,
|
||||
#[builder(default=COMMON_TYPE.to_string(), setter(into))]
|
||||
pub task_type: String,
|
||||
#[builder(default, setter(into))]
|
||||
pub sleep_params: SleepParams,
|
||||
#[builder(default, setter(into))]
|
||||
pub retention_mode: RetentionMode,
|
||||
}
|
||||
|
||||
impl<BQueue> Worker<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub fn run(&self, task: Task) {
|
||||
let result = self.execute_task(task);
|
||||
self.finalize_task(result)
|
||||
}
|
||||
|
||||
pub fn run_tasks(&mut self) -> Result<(), FangError> {
|
||||
loop {
|
||||
match self.queue.fetch_and_touch_task(self.task_type.clone()) {
|
||||
Ok(Some(task)) => {
|
||||
let actual_task: Box<dyn Runnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
// check if task is scheduled or not
|
||||
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||
// program task
|
||||
self.queue.schedule_task(&*actual_task)?;
|
||||
}
|
||||
|
||||
self.maybe_reset_sleep_period();
|
||||
self.run(task);
|
||||
}
|
||||
Ok(None) => {
|
||||
self.sleep();
|
||||
}
|
||||
|
||||
Err(error) => {
|
||||
error!("Failed to fetch a task {:?}", error);
|
||||
|
||||
self.sleep();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn run_tasks_until_none(&mut self) -> Result<(), FangError> {
|
||||
loop {
|
||||
match self.queue.fetch_and_touch_task(self.task_type.clone()) {
|
||||
Ok(Some(task)) => {
|
||||
let actual_task: Box<dyn Runnable> =
|
||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
|
||||
// check if task is scheduled or not
|
||||
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||
// program task
|
||||
self.queue.schedule_task(&*actual_task)?;
|
||||
}
|
||||
|
||||
self.maybe_reset_sleep_period();
|
||||
self.run(task);
|
||||
}
|
||||
Ok(None) => {
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
error!("Failed to fetch a task {:?}", error);
|
||||
|
||||
self.sleep();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_reset_sleep_period(&mut self) {
|
||||
self.sleep_params.maybe_reset_sleep_period();
|
||||
}
|
||||
|
||||
pub fn sleep(&mut self) {
|
||||
self.sleep_params.maybe_increase_sleep_period();
|
||||
|
||||
thread::sleep(self.sleep_params.sleep_period);
|
||||
}
|
||||
|
||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
let task_result = actual_task.run(&self.queue);
|
||||
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
Err(error) => Err((task, error.description)),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => {
|
||||
match result {
|
||||
Ok(task) => self
|
||||
.queue
|
||||
.update_task_state(&task, FangTaskState::Finished)
|
||||
.unwrap(),
|
||||
Err((task, error)) => self.queue.fail_task(&task, error).unwrap(),
|
||||
};
|
||||
}
|
||||
RetentionMode::RemoveAll => {
|
||||
match result {
|
||||
Ok(task) => self.queue.remove_task(task.id).unwrap(),
|
||||
Err((task, _error)) => self.queue.remove_task(task.id).unwrap(),
|
||||
};
|
||||
}
|
||||
RetentionMode::RemoveFinished => match result {
|
||||
Ok(task) => {
|
||||
self.queue.remove_task(task.id).unwrap();
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(&task, error).unwrap();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod worker_tests {
|
||||
use super::RetentionMode;
|
||||
use super::Runnable;
|
||||
use super::Worker;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Queueable;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::typetag;
|
||||
use crate::FangError;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct WorkerTaskTest {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for WorkerTaskTest {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
println!("the number is {}", self.number);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"worker_task".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FailedTask {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for FailedTask {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
let message = format!("the number is {}", self.number);
|
||||
|
||||
Err(FangError {
|
||||
description: message,
|
||||
})
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"F_task".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct TaskType1 {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for TaskType1 {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"type1".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct TaskType2 {}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for TaskType2 {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"type2".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
// Worker tests has to commit because the worker operations commits
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn executes_and_finishes_task() {
|
||||
let task = WorkerTaskTest { number: 10 };
|
||||
|
||||
let pool = Queue::connection_pool(5);
|
||||
|
||||
let queue = Queue::builder().connection_pool(pool).build();
|
||||
|
||||
let worker = Worker::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.task_type(task.task_type())
|
||||
.build();
|
||||
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||
|
||||
let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
// this operation commits and thats why need to commit this test
|
||||
worker.run(task.clone());
|
||||
|
||||
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Finished, found_task.state);
|
||||
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "worker_task").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn executes_task_only_of_specific_type() {
|
||||
let task1 = TaskType1 {};
|
||||
let task2 = TaskType2 {};
|
||||
|
||||
let pool = Queue::connection_pool(5);
|
||||
|
||||
let queue = Queue::builder().connection_pool(pool).build();
|
||||
|
||||
let mut worker = Worker::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.task_type(task1.task_type())
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.build();
|
||||
|
||||
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||
|
||||
let task1 = Queue::insert_query(&mut pooled_connection, &task1, Utc::now()).unwrap();
|
||||
let task2 = Queue::insert_query(&mut pooled_connection, &task2, Utc::now()).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task1.state);
|
||||
assert_eq!(FangTaskState::New, task2.state);
|
||||
|
||||
worker.run_tasks_until_none().unwrap();
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
|
||||
let found_task1 = Queue::find_task_by_id_query(&mut pooled_connection, task1.id).unwrap();
|
||||
assert_eq!(FangTaskState::Finished, found_task1.state);
|
||||
|
||||
let found_task2 = Queue::find_task_by_id_query(&mut pooled_connection, task2.id).unwrap();
|
||||
assert_eq!(FangTaskState::New, found_task2.state);
|
||||
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "type1").unwrap();
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "type2").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn saves_error_for_failed_task() {
|
||||
let task = FailedTask { number: 10 };
|
||||
|
||||
let pool = Queue::connection_pool(5);
|
||||
|
||||
let queue = Queue::builder().connection_pool(pool).build();
|
||||
|
||||
let worker = Worker::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.task_type(task.task_type())
|
||||
.build();
|
||||
|
||||
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||
|
||||
let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
worker.run(task.clone());
|
||||
|
||||
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
||||
assert_eq!(
|
||||
"the number is 10".to_string(),
|
||||
found_task.error_message.unwrap()
|
||||
);
|
||||
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "F_task").unwrap();
|
||||
}
|
||||
}
|
|
@ -1,36 +1,38 @@
|
|||
use crate::diesel::r2d2;
|
||||
use crate::diesel::PgConnection;
|
||||
use crate::error::FangError;
|
||||
use crate::executor::Executor;
|
||||
use crate::queue::Queue;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use crate::queue::Queueable;
|
||||
use crate::worker::Worker;
|
||||
use crate::FangError;
|
||||
use crate::RetentionMode;
|
||||
use crate::SleepParams;
|
||||
use log::error;
|
||||
use log::info;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WorkerPool {
|
||||
#[derive(Clone, TypedBuilder)]
|
||||
pub struct WorkerPool<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
#[builder(setter(into))]
|
||||
pub number_of_workers: u32,
|
||||
pub worker_params: WorkerParams,
|
||||
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
||||
shared_state: SharedState,
|
||||
thread_join_handles: Arc<RwLock<HashMap<String, thread::JoinHandle<()>>>>,
|
||||
#[builder(setter(into))]
|
||||
pub queue: BQueue,
|
||||
#[builder(setter(into), default)]
|
||||
pub task_type: String,
|
||||
#[builder(setter(into), default)]
|
||||
pub sleep_params: SleepParams,
|
||||
#[builder(setter(into), default)]
|
||||
pub retention_mode: RetentionMode,
|
||||
}
|
||||
|
||||
pub struct WorkerThread {
|
||||
#[derive(Clone, TypedBuilder)]
|
||||
pub struct WorkerThread<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub name: String,
|
||||
pub restarts: u64,
|
||||
pub worker_pool: WorkerPool,
|
||||
graceful_shutdown: bool,
|
||||
}
|
||||
|
||||
pub type SharedState = Arc<RwLock<WorkerState>>;
|
||||
|
||||
pub enum WorkerState {
|
||||
Running,
|
||||
Shutdown,
|
||||
pub worker_pool: WorkerPool<BQueue>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -40,342 +42,73 @@ pub struct WorkerParams {
|
|||
pub task_type: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for WorkerParams {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerParams {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
retention_mode: None,
|
||||
sleep_params: None,
|
||||
task_type: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
|
||||
self.retention_mode = Some(retention_mode);
|
||||
}
|
||||
|
||||
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
|
||||
self.sleep_params = Some(sleep_params);
|
||||
}
|
||||
|
||||
pub fn set_task_type(&mut self, task_type: String) {
|
||||
self.task_type = Some(task_type);
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerPool {
|
||||
pub fn new(number_of_workers: u32) -> Self {
|
||||
let worker_params = WorkerParams::new();
|
||||
let connection_pool = Queue::connection_pool(number_of_workers);
|
||||
|
||||
Self {
|
||||
number_of_workers,
|
||||
worker_params,
|
||||
connection_pool,
|
||||
shared_state: Arc::new(RwLock::new(WorkerState::Running)),
|
||||
thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
|
||||
number_of_workers as usize,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_params(number_of_workers: u32, worker_params: WorkerParams) -> Self {
|
||||
let connection_pool = Queue::connection_pool(number_of_workers);
|
||||
|
||||
Self {
|
||||
number_of_workers,
|
||||
worker_params,
|
||||
connection_pool,
|
||||
shared_state: Arc::new(RwLock::new(WorkerState::Running)),
|
||||
thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
|
||||
number_of_workers as usize,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
impl<BQueue> WorkerPool<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub fn start(&mut self) -> Result<(), FangError> {
|
||||
for idx in 1..self.number_of_workers + 1 {
|
||||
let worker_type = self
|
||||
.worker_params
|
||||
.task_type
|
||||
.clone()
|
||||
.unwrap_or_else(|| "".to_string());
|
||||
let name = format!("worker_{}{}", worker_type, idx);
|
||||
WorkerThread::spawn_in_pool(name.clone(), 0, self.clone())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
let name = format!("worker_{}{}", self.task_type, idx);
|
||||
|
||||
/// Attempt graceful shutdown of each job thread, blocks until all threads exit. Threads exit
|
||||
/// when their current job finishes.
|
||||
pub fn shutdown(&mut self) -> Result<(), FangError> {
|
||||
*self.shared_state.write()? = WorkerState::Shutdown;
|
||||
let worker_thread = WorkerThread::builder()
|
||||
.name(name.clone())
|
||||
.restarts(0)
|
||||
.worker_pool(self.clone())
|
||||
.build();
|
||||
|
||||
for (worker_name, thread) in self.thread_join_handles.write()?.drain() {
|
||||
if let Err(err) = thread.join() {
|
||||
error!(
|
||||
"Failed to exit executor thread '{}' cleanly: {:?}",
|
||||
worker_name, err
|
||||
);
|
||||
}
|
||||
worker_thread.spawn()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerThread {
|
||||
pub fn new(name: String, restarts: u64, worker_pool: WorkerPool) -> Self {
|
||||
Self {
|
||||
name,
|
||||
restarts,
|
||||
worker_pool,
|
||||
graceful_shutdown: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_in_pool(
|
||||
name: String,
|
||||
restarts: u64,
|
||||
worker_pool: WorkerPool,
|
||||
) -> Result<(), FangError> {
|
||||
impl<BQueue> WorkerThread<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
fn spawn(self) -> Result<(), FangError> {
|
||||
info!(
|
||||
"starting a worker thread {}, number of restarts {}",
|
||||
name, restarts
|
||||
self.name, self.restarts
|
||||
);
|
||||
|
||||
let job = WorkerThread::new(name.clone(), restarts, worker_pool.clone());
|
||||
let join_handle = Self::spawn_thread(name.clone(), job)?;
|
||||
worker_pool
|
||||
.thread_join_handles
|
||||
.write()?
|
||||
.insert(name, join_handle);
|
||||
Ok(())
|
||||
}
|
||||
let builder = thread::Builder::new().name(self.name.clone());
|
||||
|
||||
fn spawn_thread(
|
||||
name: String,
|
||||
mut job: WorkerThread,
|
||||
) -> Result<thread::JoinHandle<()>, FangError> {
|
||||
let builder = thread::Builder::new().name(name.clone());
|
||||
builder
|
||||
.spawn(move || {
|
||||
match job.worker_pool.connection_pool.get() {
|
||||
Ok(connection) => {
|
||||
let mut executor = Executor::new(connection);
|
||||
executor.set_shared_state(job.worker_pool.shared_state.clone());
|
||||
let mut worker: Worker<BQueue> = Worker::builder()
|
||||
.queue(self.worker_pool.queue.clone())
|
||||
.task_type(self.worker_pool.task_type.clone())
|
||||
.retention_mode(self.worker_pool.retention_mode.clone())
|
||||
.sleep_params(self.worker_pool.sleep_params.clone())
|
||||
.build();
|
||||
|
||||
if let Some(ref task_type_str) = job.worker_pool.worker_params.task_type {
|
||||
executor.set_task_type(task_type_str.to_owned());
|
||||
}
|
||||
|
||||
if let Some(ref retention_mode) =
|
||||
job.worker_pool.worker_params.retention_mode
|
||||
{
|
||||
executor.set_retention_mode(retention_mode.to_owned());
|
||||
}
|
||||
|
||||
if let Some(ref sleep_params) = job.worker_pool.worker_params.sleep_params {
|
||||
executor.set_sleep_params(sleep_params.clone());
|
||||
}
|
||||
|
||||
// Run executor
|
||||
match executor.run_tasks() {
|
||||
Ok(_) => {
|
||||
job.graceful_shutdown = true;
|
||||
}
|
||||
Err(error) => {
|
||||
error!("Error executing tasks in worker '{}': {:?}", name, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
error!("Failed to get postgres connection: {:?}", error);
|
||||
}
|
||||
// Run worker
|
||||
if let Err(error) = worker.run_tasks() {
|
||||
error!(
|
||||
"Error executing tasks in worker '{}': {:?}",
|
||||
self.name, error
|
||||
);
|
||||
}
|
||||
})
|
||||
.map_err(FangError::from)
|
||||
.map_err(FangError::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkerThread {
|
||||
impl<BQueue> Drop for WorkerThread<BQueue>
|
||||
where
|
||||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if self.graceful_shutdown {
|
||||
return;
|
||||
}
|
||||
self.restarts += 1;
|
||||
|
||||
WorkerThread::spawn_in_pool(
|
||||
self.name.clone(),
|
||||
self.restarts + 1,
|
||||
self.worker_pool.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod task_pool_tests {
|
||||
use super::WorkerParams;
|
||||
use super::WorkerPool;
|
||||
use crate::executor::Error;
|
||||
use crate::executor::Runnable;
|
||||
use crate::queue::Queue;
|
||||
use crate::schema::{fang_tasks, FangTaskState};
|
||||
use crate::typetag;
|
||||
use crate::RetentionMode;
|
||||
use crate::Task;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct MyTask {
|
||||
pub number: u16,
|
||||
pub current_thread_name: String,
|
||||
}
|
||||
|
||||
impl MyTask {
|
||||
pub fn new(number: u16) -> Self {
|
||||
let handle = thread::current();
|
||||
let current_thread_name = handle.name().unwrap().to_string();
|
||||
|
||||
Self {
|
||||
number,
|
||||
current_thread_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for MyTask {
|
||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let new_task = MyTask::new(self.number + 1);
|
||||
|
||||
Queue::push_task_query(connection, &new_task).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"worker_pool_test".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct ShutdownTask {
|
||||
pub number: u16,
|
||||
pub current_thread_name: String,
|
||||
}
|
||||
|
||||
impl ShutdownTask {
|
||||
pub fn new(number: u16) -> Self {
|
||||
let handle = thread::current();
|
||||
let current_thread_name = handle.name().unwrap().to_string();
|
||||
|
||||
Self {
|
||||
number,
|
||||
current_thread_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for ShutdownTask {
|
||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let new_task = MyTask::new(self.number + 1);
|
||||
|
||||
Queue::push_task_query(connection, &new_task).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"shutdown_test".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec<Task> {
|
||||
fang_tasks::table
|
||||
.filter(fang_tasks::task_type.eq(job_type))
|
||||
.get_results::<Task>(conn)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// Following tests ignored because they commit data to the db
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn tasks_are_finished_on_shutdown() {
|
||||
let queue = Queue::new();
|
||||
|
||||
let mut worker_params = WorkerParams::new();
|
||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
||||
|
||||
queue.push_task(&ShutdownTask::new(100)).unwrap();
|
||||
queue.push_task(&ShutdownTask::new(200)).unwrap();
|
||||
|
||||
task_pool.start().unwrap();
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
task_pool.shutdown().unwrap();
|
||||
thread::sleep(Duration::from_secs(5));
|
||||
|
||||
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
|
||||
let in_progress_tasks = tasks
|
||||
.iter()
|
||||
.filter(|task| task.state == FangTaskState::InProgress);
|
||||
let finished_tasks = tasks
|
||||
.iter()
|
||||
.filter(|task| task.state == FangTaskState::Finished);
|
||||
|
||||
// Asserts first two tasks are allowed to finish, the tasks they spawn are not started
|
||||
// though. No tasks should be in progress after a graceful shutdown.
|
||||
assert_eq!(in_progress_tasks.count(), 0);
|
||||
assert_eq!(finished_tasks.count(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn tasks_are_split_between_two_threads() {
|
||||
let queue = Queue::new();
|
||||
|
||||
let mut worker_params = WorkerParams::new();
|
||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
||||
|
||||
queue.push_task(&MyTask::new(100)).unwrap();
|
||||
queue.push_task(&MyTask::new(200)).unwrap();
|
||||
|
||||
task_pool.start().unwrap();
|
||||
|
||||
thread::sleep(Duration::from_secs(100));
|
||||
|
||||
let tasks = get_all_tasks(&queue.connection, "worker_pool_test");
|
||||
|
||||
assert!(tasks.len() > 40);
|
||||
|
||||
let test_worker1_tasks = tasks.clone().into_iter().filter(|task| {
|
||||
serde_json::to_string(&task.metadata)
|
||||
.unwrap()
|
||||
.contains("worker_1")
|
||||
});
|
||||
|
||||
let test_worker2_tasks = tasks.into_iter().filter(|task| {
|
||||
serde_json::to_string(&task.metadata)
|
||||
.unwrap()
|
||||
.contains("worker_2")
|
||||
});
|
||||
|
||||
assert!(test_worker1_tasks.count() > 20);
|
||||
assert!(test_worker2_tasks.count() > 20);
|
||||
error!(
|
||||
"Worker {} stopped. Restarting. The number of restarts {}",
|
||||
self.name, self.restarts,
|
||||
);
|
||||
|
||||
self.clone().spawn().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
28
src/lib.rs
28
src/lib.rs
|
@ -1,6 +1,22 @@
|
|||
#![allow(clippy::extra_unused_lifetimes)]
|
||||
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
|
||||
pub enum Scheduled {
|
||||
CronPattern(String),
|
||||
ScheduleOnce(DateTime<Utc>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CronError {
|
||||
#[error(transparent)]
|
||||
LibraryError(#[from] cron::error::Error),
|
||||
#[error("You have to implement method `cron()` in your AsyncRunnable")]
|
||||
TaskNotSchedulableError,
|
||||
#[error("No timestamps match with this cron pattern")]
|
||||
NoTimestampsError,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum RetentionMode {
|
||||
|
@ -46,6 +62,11 @@ impl Default for SleepParams {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FangError {
|
||||
pub description: String,
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
#[cfg(feature = "blocking")]
|
||||
extern crate diesel;
|
||||
|
@ -60,9 +81,16 @@ pub use typetag;
|
|||
#[doc(hidden)]
|
||||
pub extern crate serde;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub extern crate chrono;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use serde_derive::{Deserialize, Serialize};
|
||||
|
||||
pub use chrono::DateTime;
|
||||
pub use chrono::Utc;
|
||||
pub use cron::Schedule;
|
||||
|
||||
#[cfg(feature = "blocking")]
|
||||
pub mod blocking;
|
||||
#[cfg(feature = "blocking")]
|
||||
|
|
Loading…
Reference in a new issue