diff --git a/README.md b/README.md index 763f9bd..5dced93 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,6 @@ Background job processing library for Rust. -Currently, it uses Postgres to store state. But in the future, more backends will be supported. - Note that the README follows the master branch, to see instructions for the latest published version, check [crates.io](https://crates.io/crates/fang). ## Installation @@ -17,7 +15,7 @@ Note that the README follows the master branch, to see instructions for the late ```toml [dependencies] -fang = "0.3.1" +fang = "0.3.2" ``` 2. Create `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql). @@ -183,11 +181,35 @@ worker_params.set_sleep_params(sleep_params); WorkerPool::new_with_params(10, worker_params).start(); ``` -## Potential/future features +## Periodic Tasks + +Fang can add tasks to `fang_tasks` periodically. To use this feature first run [the migration with `fang_periodic_tasks` table](https://github.com/ayrat555/fang/tree/master/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql). + +Usage example: + +```rust +use fang::Scheduler; +use fang::Postgres; + +let postgres = Postgres::new(); + +postgres + .push_periodic_task(&SyncJob::default(), 120) + .unwrap(); + +postgres + .push_periodic_task(&DeliverJob::default(), 60) + .unwrap(); + +Scheduler::start(10, 5); +``` + +In the example above, `push_periodic_task` is used to save the specified task to the `fang_periodic_tasks` table which will be enqueued (saved to `fang_tasks` table) every specied number of seconds. + +`Scheduler::start(10, 5)` starts scheduler. It accepts two parameters: +- Db check period in seconds +- Acceptable error limit in seconds - |current_time - scheduled_time| < error - * Retries - * Scheduled tasks - * Extendable/new backends ## Contributing diff --git a/migrations/2021-06-05-112912_create_fang_tasks/down.sql b/migrations/2021-06-05-112912_create_fang_tasks/down.sql index 3658a90..e8becd4 100644 --- a/migrations/2021-06-05-112912_create_fang_tasks/down.sql +++ b/migrations/2021-06-05-112912_create_fang_tasks/down.sql @@ -1,4 +1,2 @@ DROP TABLE fang_tasks; DROP TYPE fang_task_state; - -DROP TABLE fang_periodic_tasks; diff --git a/migrations/2021-06-05-112912_create_fang_tasks/up.sql b/migrations/2021-06-05-112912_create_fang_tasks/up.sql index a60140d..19112eb 100644 --- a/migrations/2021-06-05-112912_create_fang_tasks/up.sql +++ b/migrations/2021-06-05-112912_create_fang_tasks/up.sql @@ -16,15 +16,3 @@ CREATE INDEX fang_tasks_state_index ON fang_tasks(state); CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type); CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at); CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata); - -CREATE TABLE fang_periodic_tasks ( - id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), - metadata jsonb NOT NULL, - period_in_seconds INTEGER NOT NULL, - scheduled_at TIMESTAMP WITH TIME ZONE, - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() -); - -CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at); -CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata); diff --git a/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql b/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql new file mode 100644 index 0000000..3b764b4 --- /dev/null +++ b/migrations/2021-07-24-050243_create_fang_periodic_tasks/down.sql @@ -0,0 +1 @@ +DROP TABLE fang_periodic_tasks; diff --git a/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql b/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql new file mode 100644 index 0000000..ba7f461 --- /dev/null +++ b/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql @@ -0,0 +1,11 @@ +CREATE TABLE fang_periodic_tasks ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + metadata jsonb NOT NULL, + period_in_seconds INTEGER NOT NULL, + scheduled_at TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at); +CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata); diff --git a/src/lib.rs b/src/lib.rs index fe20481..9366a04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub mod worker_pool; pub use executor::*; pub use postgres::*; +pub use scheduler::*; pub use worker_pool::*; #[doc(hidden)] diff --git a/src/scheduler.rs b/src/scheduler.rs index abcfaea..13712a8 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -12,16 +12,13 @@ pub struct Scheduler { impl Drop for Scheduler { fn drop(&mut self) { - Scheduler::start( - self.check_period, - self.error_margin_seconds, - Postgres::new(), - ) + Scheduler::start(self.check_period, self.error_margin_seconds) } } impl Scheduler { - pub fn start(check_period: u64, error_margin_seconds: u64, postgres: Postgres) { + pub fn start(check_period: u64, error_margin_seconds: u64) { + let postgres = Postgres::new(); let builder = thread::Builder::new().name("scheduler".to_string()); builder @@ -114,7 +111,7 @@ mod job_scheduler_tests { let postgres = Postgres::new(); postgres.push_periodic_task(&ScheduledJob {}, 10).unwrap(); - Scheduler::start(1, 2, Postgres::new()); + Scheduler::start(1, 2); let sleep_duration = Duration::from_secs(15); thread::sleep(sleep_duration);