Periodic tasks (#5)

* Periodic tasks

* schedule next execution

* add scheduler

* ignore test

* fix clippy

* make start public

* check if the periodic task already exists

* do not insert task if it's already in the queue

* fix tests
This commit is contained in:
Ayrat Badykov 2021-07-18 22:09:30 +03:00 committed by GitHub
parent df1da87e13
commit 45eb336b8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 483 additions and 11 deletions

View file

@ -1,2 +1,4 @@
DROP TABLE fang_tasks; DROP TABLE fang_tasks;
DROP TYPE fang_task_state; DROP TYPE fang_task_state;
-- Roll back the periodic-tasks feature added by the matching up migration.
DROP TABLE fang_periodic_tasks;

View file

@ -15,3 +15,16 @@ CREATE TABLE fang_tasks (
CREATE INDEX fang_tasks_state_index ON fang_tasks(state); CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type); CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at); CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
-- Recurring jobs: `metadata` holds the serialized job that gets re-enqueued
-- into fang_tasks every `period_in_seconds`; `scheduled_at` is the next
-- planned run and stays NULL until the scheduler first picks the row up.
CREATE TABLE fang_periodic_tasks (
    id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
    metadata jsonb NOT NULL,
    period_in_seconds INTEGER NOT NULL,
    scheduled_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- The scheduler polls by scheduled_at; metadata equality is used to detect
-- duplicates when pushing a periodic task.
CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);

View file

@ -10,6 +10,7 @@ mod schema;
pub mod executor; pub mod executor;
pub mod postgres; pub mod postgres;
pub mod scheduler;
pub mod worker_pool; pub mod worker_pool;
pub use executor::*; pub use executor::*;

View file

@ -1,7 +1,10 @@
use crate::executor::Runnable; use crate::executor::Runnable;
use crate::schema::fang_periodic_tasks;
use crate::schema::fang_tasks; use crate::schema::fang_tasks;
use crate::schema::FangTaskState; use crate::schema::FangTaskState;
use chrono::{DateTime, Utc}; use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use diesel::pg::PgConnection; use diesel::pg::PgConnection;
use diesel::prelude::*; use diesel::prelude::*;
use diesel::result::Error; use diesel::result::Error;
@ -21,6 +24,17 @@ pub struct Task {
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
/// A row of the `fang_periodic_tasks` table: a job that should be re-enqueued
/// into `fang_tasks` every `period_in_seconds` seconds.
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
#[table_name = "fang_periodic_tasks"]
pub struct PeriodicTask {
    pub id: Uuid,
    // Serialized `Runnable` job, produced by `serde_json::to_value` in
    // `push_periodic_task` and deserialized back by the scheduler.
    pub metadata: serde_json::Value,
    // Re-enqueue interval in seconds.
    pub period_in_seconds: i32,
    // Next execution time; `None` until the scheduler schedules it the first time.
    pub scheduled_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
#[derive(Insertable)] #[derive(Insertable)]
#[table_name = "fang_tasks"] #[table_name = "fang_tasks"]
pub struct NewTask { pub struct NewTask {
@ -28,6 +42,13 @@ pub struct NewTask {
pub task_type: String, pub task_type: String,
} }
/// Insertable form of [`PeriodicTask`]; the remaining columns use their
/// database defaults (`id`, timestamps) or start as NULL (`scheduled_at`).
#[derive(Insertable)]
#[table_name = "fang_periodic_tasks"]
pub struct NewPeriodicTask {
    pub metadata: serde_json::Value,
    pub period_in_seconds: i32,
}
pub struct Postgres { pub struct Postgres {
pub connection: PgConnection, pub connection: PgConnection,
} }
@ -54,12 +75,38 @@ impl Postgres {
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> { pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
let json_job = serde_json::to_value(job).unwrap(); let json_job = serde_json::to_value(job).unwrap();
let new_task = NewTask { match self.find_task_by_metadata(&json_job) {
metadata: json_job, Some(task) => Ok(task),
task_type: job.task_type(), None => {
}; let new_task = NewTask {
metadata: json_job.clone(),
task_type: job.task_type(),
};
self.insert(&new_task)
}
}
}
self.insert(&new_task) pub fn push_periodic_task(
&self,
job: &dyn Runnable,
period: i32,
) -> Result<PeriodicTask, Error> {
let json_job = serde_json::to_value(job).unwrap();
match self.find_periodic_task_by_metadata(&json_job) {
Some(task) => Ok(task),
None => {
let new_task = NewPeriodicTask {
metadata: json_job,
period_in_seconds: period,
};
diesel::insert_into(fang_periodic_tasks::table)
.values(new_task)
.get_result::<PeriodicTask>(&self.connection)
}
}
} }
pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> { pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> {
@ -101,6 +148,50 @@ impl Postgres {
.ok() .ok()
} }
/// Look up a periodic task by primary key; `None` when it does not exist
/// (or the query fails).
pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option<PeriodicTask> {
    fang_periodic_tasks::table
        .find(id)
        .first::<PeriodicTask>(&self.connection)
        .ok()
}
/// Fetch periodic tasks that are due now — `scheduled_at` within
/// `error_margin_seconds` of the current time — plus tasks that have never
/// been scheduled at all (`scheduled_at IS NULL`).
pub fn fetch_periodic_tasks(&self, error_margin_seconds: i64) -> Option<Vec<PeriodicTask>> {
    let current_time = Self::current_time();

    // Tolerance window (now - margin, now + margin); bounds are exclusive.
    let low_limit = current_time - Duration::seconds(error_margin_seconds);
    let high_limit = current_time + Duration::seconds(error_margin_seconds);

    fang_periodic_tasks::table
        .filter(
            fang_periodic_tasks::scheduled_at
                .gt(low_limit)
                .and(fang_periodic_tasks::scheduled_at.lt(high_limit)),
        )
        // OR in never-scheduled rows so brand-new periodic tasks get their
        // first `scheduled_at` assigned by the scheduler.
        .or_filter(fang_periodic_tasks::scheduled_at.is_null())
        .load::<PeriodicTask>(&self.connection)
        // Query errors collapse to `None`; the scheduler treats that as
        // "nothing to do" rather than failing.
        .ok()
}
/// Advance `task.scheduled_at` to now + `period_in_seconds` and bump
/// `updated_at`, returning the updated row.
pub fn schedule_next_task_execution(&self, task: &PeriodicTask) -> Result<PeriodicTask, Error> {
    let current_time = Self::current_time();
    // `.into()` widens i32 -> i64 for chrono's `Duration::seconds`.
    let scheduled_at = current_time + Duration::seconds(task.period_in_seconds.into());

    diesel::update(task)
        .set((
            fang_periodic_tasks::scheduled_at.eq(scheduled_at),
            fang_periodic_tasks::updated_at.eq(current_time),
        ))
        .get_result::<PeriodicTask>(&self.connection)
}
/// Delete every row in `fang_tasks`; returns the number of deleted rows.
pub fn remove_all_tasks(&self) -> Result<usize, Error> {
    diesel::delete(fang_tasks::table).execute(&self.connection)
}
/// Delete every row in `fang_periodic_tasks`; returns the number of deleted rows.
pub fn remove_all_periodic_tasks(&self) -> Result<usize, Error> {
    diesel::delete(fang_periodic_tasks::table).execute(&self.connection)
}
pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> { pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> {
let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
@ -172,19 +263,41 @@ impl Postgres {
.get_result::<Task>(&self.connection) .get_result::<Task>(&self.connection)
.ok() .ok()
} }
// Find an existing periodic task with byte-identical JSON metadata.
// NOTE(review): `push_periodic_task` uses this for a check-then-insert with
// no uniqueness constraint on `metadata`, so two concurrent pushes of the
// same job can still both insert — confirm whether that race matters.
fn find_periodic_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<PeriodicTask> {
    fang_periodic_tasks::table
        .filter(fang_periodic_tasks::metadata.eq(metadata))
        .first::<PeriodicTask>(&self.connection)
        .ok()
}
// Find an existing task with the same metadata that is still pending (New)
// or currently running (InProgress). Tasks in any other state are ignored,
// so a job can be enqueued again once its previous run has left those states.
fn find_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<Task> {
    fang_tasks::table
        .filter(fang_tasks::metadata.eq(metadata))
        .filter(
            fang_tasks::state
                .eq(FangTaskState::New)
                .or(fang_tasks::state.eq(FangTaskState::InProgress)),
        )
        .first::<Task>(&self.connection)
        .ok()
}
} }
#[cfg(test)] #[cfg(test)]
mod postgres_tests { mod postgres_tests {
use super::NewTask; use super::NewTask;
use super::PeriodicTask;
use super::Postgres; use super::Postgres;
use super::Task; use super::Task;
use crate::executor::Error as ExecutorError; use crate::executor::Error as ExecutorError;
use crate::executor::Runnable; use crate::executor::Runnable;
use crate::schema::fang_periodic_tasks;
use crate::schema::fang_tasks; use crate::schema::fang_tasks;
use crate::schema::FangTaskState; use crate::schema::FangTaskState;
use crate::typetag; use crate::typetag;
use crate::{Deserialize, Serialize}; use crate::{Deserialize, Serialize};
use chrono::prelude::*;
use chrono::{DateTime, Duration, Utc}; use chrono::{DateTime, Duration, Utc};
use diesel::connection::Connection; use diesel::connection::Connection;
use diesel::prelude::*; use diesel::prelude::*;
@ -312,6 +425,174 @@ mod postgres_tests {
}); });
} }
// Pushing the same job twice must return the already-queued task instead of
// inserting a duplicate row.
#[test]
fn push_task_does_not_insert_the_same_task() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let job = Job { number: 10 };

        // NOTE(review): names are swapped relative to insertion order; the
        // equality assertion is symmetric so the test still holds.
        let task2 = postgres.push_task(&job).unwrap();
        let task1 = postgres.push_task(&job).unwrap();

        assert_eq!(task1.id, task2.id);

        Ok(())
    });
}
// A pushed periodic task stores its period and is retrievable by id.
#[test]
fn push_periodic_task() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let job = Job { number: 10 };

        let task = postgres.push_periodic_task(&job, 60).unwrap();

        assert_eq!(task.period_in_seconds, 60);
        assert!(postgres.find_periodic_task_by_id(task.id).is_some());

        Ok(())
    });
}
// Pushing the same periodic job twice returns the existing row rather than
// creating a second one.
#[test]
fn push_periodic_task_returns_existing_job() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let job = Job { number: 10 };

        let task1 = postgres.push_periodic_task(&job, 60).unwrap();
        let task2 = postgres.push_periodic_task(&job, 60).unwrap();

        assert_eq!(task1.id, task2.id);

        Ok(())
    });
}
// A never-scheduled task (scheduled_at IS NULL) is returned by
// fetch_periodic_tasks, while a task scheduled far in the future is not.
#[test]
fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let job = Job { number: 10 };

        // Inserted via push_periodic_task, so scheduled_at starts as NULL.
        let task = postgres.push_periodic_task(&job, 60).unwrap();

        // Control row scheduled well outside the 100s error margin below.
        let schedule_in_future = Utc::now() + Duration::hours(100);

        insert_periodic_job(
            serde_json::json!(true),
            schedule_in_future,
            100,
            &postgres.connection,
        );

        let tasks = postgres.fetch_periodic_tasks(100).unwrap();

        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].id, task.id);

        Ok(())
    });
}
// schedule_next_task_execution must move scheduled_at forward by exactly
// period_in_seconds; sub-second parts are rounded away before comparing
// because the expected value is computed client-side.
#[test]
fn schedule_next_task_execution() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let task = insert_periodic_job(
            serde_json::json!(true),
            Utc::now(),
            100,
            &postgres.connection,
        );

        let updated_task = postgres.schedule_next_task_execution(&task).unwrap();

        let next_schedule = (task.scheduled_at.unwrap()
            + Duration::seconds(task.period_in_seconds.into()))
        .round_subsecs(0);

        assert_eq!(
            next_schedule,
            updated_task.scheduled_at.unwrap().round_subsecs(0)
        );

        Ok(())
    });
}
// remove_all_periodic_tasks reports one deleted row and the task is gone.
#[test]
fn remove_all_periodic_tasks() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let task = insert_periodic_job(
            serde_json::json!(true),
            Utc::now(),
            100,
            &postgres.connection,
        );

        let result = postgres.remove_all_periodic_tasks().unwrap();

        assert_eq!(1, result);
        assert_eq!(None, postgres.find_periodic_task_by_id(task.id));

        Ok(())
    });
}
// remove_all_tasks reports one deleted row and the task is gone.
#[test]
fn remove_all_tasks() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let task = insert_job(serde_json::json!(true), Utc::now(), &postgres.connection);
        let result = postgres.remove_all_tasks().unwrap();

        assert_eq!(1, result);
        assert_eq!(None, postgres.find_task_by_id(task.id));

        Ok(())
    });
}
// fetch_periodic_tasks returns only the task scheduled near "now"; a task
// scheduled 100 hours out falls outside the 100-second error margin.
#[test]
fn fetch_periodic_tasks() {
    let postgres = Postgres::new();

    postgres.connection.test_transaction::<(), Error, _>(|| {
        let schedule_in_future = Utc::now() + Duration::hours(100);

        insert_periodic_job(
            serde_json::json!(true),
            schedule_in_future,
            100,
            &postgres.connection,
        );

        let task = insert_periodic_job(
            serde_json::json!(true),
            Utc::now(),
            100,
            &postgres.connection,
        );

        let tasks = postgres.fetch_periodic_tasks(100).unwrap();

        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].id, task.id);

        Ok(())
    });
}
#[test] #[test]
fn remove_task() { fn remove_task() {
let postgres = Postgres::new(); let postgres = Postgres::new();
@ -351,13 +632,21 @@ mod postgres_tests {
let postgres = Postgres::new(); let postgres = Postgres::new();
let timestamp1 = Utc::now() - Duration::hours(40); let timestamp1 = Utc::now() - Duration::hours(40);
let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection); let task1 = insert_job(
serde_json::json!(Job { number: 12 }),
timestamp1,
&postgres.connection,
);
let task1_id = task1.id; let task1_id = task1.id;
let timestamp2 = Utc::now() - Duration::hours(20); let timestamp2 = Utc::now() - Duration::hours(20);
let task2 = insert_job(serde_json::json!(false), timestamp2, &postgres.connection); let task2 = insert_job(
serde_json::json!(Job { number: 11 }),
timestamp2,
&postgres.connection,
);
let thread = std::thread::spawn(move || { let thread = std::thread::spawn(move || {
let postgres = Postgres::new(); let postgres = Postgres::new();
@ -416,6 +705,22 @@ mod postgres_tests {
.unwrap() .unwrap()
} }
/// Test helper: insert one periodic task row directly, bypassing
/// `push_periodic_task`'s duplicate check, and return the inserted row.
fn insert_periodic_job(
    metadata: serde_json::Value,
    timestamp: DateTime<Utc>,
    period_in_seconds: i32,
    connection: &PgConnection,
) -> PeriodicTask {
    diesel::insert_into(fang_periodic_tasks::table)
        // A tuple of column assignments inserts a single row; the original
        // one-element `vec![...]` was a needless allocation (clippy: useless_vec).
        .values((
            fang_periodic_tasks::metadata.eq(metadata),
            fang_periodic_tasks::scheduled_at.eq(timestamp),
            fang_periodic_tasks::period_in_seconds.eq(period_in_seconds),
        ))
        .get_result::<PeriodicTask>(connection)
        .unwrap()
}
fn insert_new_job(connection: &PgConnection) -> Task { fn insert_new_job(connection: &PgConnection) -> Task {
diesel::insert_into(fang_tasks::table) diesel::insert_into(fang_tasks::table)
.values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)]) .values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)])

133
src/scheduler.rs Normal file
View file

@ -0,0 +1,133 @@
use crate::executor::Runnable;
use crate::postgres::PeriodicTask;
use crate::postgres::Postgres;
use std::thread;
use std::time::Duration;
/// Polls `fang_periodic_tasks` in a loop and enqueues jobs that are due.
pub struct Scheduler {
    // Seconds to sleep between scheduling passes.
    pub check_period: u64,
    // Tolerance window (seconds) passed to `fetch_periodic_tasks`.
    pub error_margin_seconds: u64,
    pub postgres: Postgres,
}
impl Drop for Scheduler {
    // Restart the scheduler thread whenever a `Scheduler` is dropped.
    // NOTE(review): the thread spawned by `start` owns its `Scheduler`, so if
    // that thread dies (e.g. a panic in `schedule_loop`) this Drop runs during
    // unwinding and spawns a replacement — a crude auto-restart. It also means
    // dropping any user-constructed `Scheduler` starts a background thread
    // with a fresh `Postgres` connection; confirm that's intended.
    fn drop(&mut self) {
        Scheduler::start(
            self.check_period,
            self.error_margin_seconds,
            Postgres::new(),
        )
    }
}
impl Scheduler {
    /// Spawn a background thread named "scheduler" that runs the scheduling
    /// loop forever. Panics if the thread cannot be spawned.
    pub fn start(check_period: u64, error_margin_seconds: u64, postgres: Postgres) {
        let builder = thread::Builder::new().name("scheduler".to_string());

        builder
            .spawn(move || {
                Self::new(check_period, error_margin_seconds, postgres).schedule_loop();
            })
            .unwrap();
    }

    /// Build a scheduler without starting it.
    pub fn new(check_period: u64, error_margin_seconds: u64, postgres: Postgres) -> Self {
        Self {
            check_period,
            postgres,
            error_margin_seconds,
        }
    }

    /// Run scheduling passes forever, sleeping `check_period` seconds between
    /// passes. Never returns.
    pub fn schedule_loop(&self) {
        let pause = Duration::from_secs(self.check_period);

        loop {
            self.schedule();

            thread::sleep(pause);
        }
    }

    /// One pass: fetch due (or never-scheduled) periodic tasks and process
    /// each. A failed fetch yields `None` and the pass is silently skipped.
    pub fn schedule(&self) {
        let due_tasks = self
            .postgres
            .fetch_periodic_tasks(self.error_margin_seconds as i64);

        if let Some(tasks) = due_tasks {
            tasks.into_iter().for_each(|task| self.process_task(task));
        }
    }

    // Enqueue the task's job (unless this is its very first sighting, where
    // only a first `scheduled_at` is assigned), then schedule the next run.
    fn process_task(&self, task: PeriodicTask) {
        if task.scheduled_at.is_some() {
            let runnable: Box<dyn Runnable> =
                serde_json::from_value(task.metadata.clone()).unwrap();

            self.postgres.push_task(&*runnable).unwrap();
        }

        self.postgres.schedule_next_task_execution(&task).unwrap();
    }
}
#[cfg(test)]
mod job_scheduler_tests {
    use super::Scheduler;
    use crate::executor::Error;
    use crate::executor::Runnable;
    use crate::postgres::Postgres;
    use crate::postgres::Task;
    use crate::schema::fang_tasks;
    use crate::typetag;
    use crate::{Deserialize, Serialize};
    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use std::thread;
    use std::time::Duration;

    // Minimal job used only to verify the scheduler enqueues work.
    #[derive(Serialize, Deserialize)]
    struct ScheduledJob {}

    #[typetag::serde]
    impl Runnable for ScheduledJob {
        fn run(&self) -> Result<(), Error> {
            Ok(())
        }

        // Dedicated task type so `get_all_tasks` sees only rows from this test.
        fn task_type(&self) -> String {
            "schedule".to_string()
        }
    }

    // Ignored: needs a live database and a real background scheduler thread,
    // and relies on wall-clock sleeping rather than a transaction.
    #[test]
    #[ignore]
    fn schedules_jobs() {
        let postgres = Postgres::new();

        postgres.push_periodic_task(&ScheduledJob {}, 10).unwrap();

        Scheduler::start(1, 2, Postgres::new());

        // 15s gives the 10s-period job time for at least one scheduling pass.
        let sleep_duration = Duration::from_secs(15);

        thread::sleep(sleep_duration);

        let tasks = get_all_tasks(&postgres.connection);

        assert_eq!(1, tasks.len());
    }

    // Fetch every fang_tasks row created with the "schedule" task type.
    fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
        fang_tasks::table
            .filter(fang_tasks::task_type.eq("schedule"))
            .get_results::<Task>(conn)
            .unwrap()
    }
}

View file

@ -28,3 +28,14 @@ table! {
updated_at -> Timestamptz, updated_at -> Timestamptz,
} }
} }
// Diesel schema for the periodic-tasks queue; must stay in sync with the
// `fang_periodic_tasks` table created by the migration.
table! {
    fang_periodic_tasks (id) {
        id -> Uuid,
        metadata -> Jsonb,
        period_in_seconds -> Int4,
        // Nullable: NULL until the scheduler assigns the first run time.
        scheduled_at -> Nullable<Timestamptz>,
        created_at -> Timestamptz,
        updated_at -> Timestamptz,
    }
}

View file

@ -170,7 +170,10 @@ mod job_pool_tests {
} }
fn get_all_tasks(conn: &PgConnection) -> Vec<Task> { fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
fang_tasks::table.get_results::<Task>(conn).unwrap() fang_tasks::table
.filter(fang_tasks::task_type.eq("worker_pool_test"))
.get_results::<Task>(conn)
.unwrap()
} }
#[typetag::serde] #[typetag::serde]
@ -186,6 +189,10 @@ mod job_pool_tests {
Ok(()) Ok(())
} }
fn task_type(&self) -> String {
"worker_pool_test".to_string()
}
} }
// this test is ignored because it commits data to the db // this test is ignored because it commits data to the db
@ -200,8 +207,8 @@ mod job_pool_tests {
worker_params.set_retention_mode(RetentionMode::KeepAll); worker_params.set_retention_mode(RetentionMode::KeepAll);
let job_pool = WorkerPool::new_with_params(2, worker_params); let job_pool = WorkerPool::new_with_params(2, worker_params);
postgres.push_task(&MyJob::new(0)).unwrap(); postgres.push_task(&MyJob::new(100)).unwrap();
postgres.push_task(&MyJob::new(0)).unwrap(); postgres.push_task(&MyJob::new(200)).unwrap();
job_pool.start(); job_pool.start();