From d293e31ac814003b272b060d26ebd1f939ad2ef7 Mon Sep 17 00:00:00 2001
From: Ayrat Badykov
Date: Wed, 23 Jun 2021 13:48:03 +0300
Subject: [PATCH] run tasks in threads

---
 .../up.sql         |   1 +
 src/executor.rs    |  28 ++--
 src/job_pool.rs    |  57 -------
 src/lib.rs         |   2 +-
 src/postgres.rs    |  53 +++++-
 src/schema.rs      |   2 +-
 src/worker_pool.rs | 158 ++++++++++++++++++
 7 files changed, 228 insertions(+), 73 deletions(-)
 delete mode 100644 src/job_pool.rs
 create mode 100644 src/worker_pool.rs

diff --git a/migrations/2021-06-05-112912_create_fang_tasks/up.sql b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
index d83f79a..7969f0e 100644
--- a/migrations/2021-06-05-112912_create_fang_tasks/up.sql
+++ b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
@@ -11,4 +11,5 @@ CREATE TABLE fang_tasks (
     updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
 );
 
+CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
 CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
diff --git a/src/executor.rs b/src/executor.rs
index f73d6d0..550df62 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -42,21 +42,23 @@ impl Executor {
     }
 
     pub fn run_tasks(&mut self) {
-        match self.storage.fetch_and_touch() {
-            Ok(Some(task)) => {
-                self.maybe_reset_sleep_period();
-                self.run(&task);
-            }
-            Ok(None) => {
-                self.sleep();
-            }
+        loop {
+            match self.storage.fetch_and_touch() {
+                Ok(Some(task)) => {
+                    self.maybe_reset_sleep_period();
+                    self.run(&task);
+                }
+                Ok(None) => {
+                    self.sleep();
+                }
 
-            Err(error) => {
-                error!("Failed to fetch a task {:?}", error);
+                Err(error) => {
+                    error!("Failed to fetch a task {:?}", error);
 
-                self.sleep();
-            }
-        };
+                    self.sleep();
+                }
+            };
+        }
     }
 
     pub fn maybe_reset_sleep_period(&mut self) {
diff --git a/src/job_pool.rs b/src/job_pool.rs
deleted file mode 100644
index b9611ae..0000000
--- a/src/job_pool.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use crate::executor::Executor;
-use crate::postgres::Postgres;
-use std::thread;
-
-struct JobPool {
-    pub number_of_workers: u16,
-    pub name: String,
-}
-
-struct JobThread {
-    pub name: String,
-}
-
-impl JobPool {
-    pub fn new(number_of_workers: u16, name: String) -> Self {
-        Self {
-            number_of_workers,
-            name,
-        }
-    }
-
-    pub fn start(&self) {
-        for idx in 1..self.number_of_workers {
-            let name = format!("{}{}", self.name, idx);
-
-            spawn_in_pool(name)
-        }
-    }
-}
-
-impl JobThread {
-    pub fn new(name: String) -> Self {
-        Self { name }
-    }
-}
-
-impl Drop for JobThread {
-    fn drop(&mut self) {
-        spawn_in_pool(self.name.clone())
-    }
-}
-
-fn spawn_in_pool(name: String) {
-    let mut builder = thread::Builder::new().name(name.clone());
-    builder = builder;
-
-    builder
-        .spawn(move || {
-            // when _job is dropped, it will be restarted (see Drop trait impl)
-            let _job = JobThread::new(name);
-
-            let postgres = Postgres::new(None);
-
-            Executor::new(postgres).run_tasks()
-        })
-        .unwrap();
-}
diff --git a/src/lib.rs b/src/lib.rs
index ed06f2a..56bcbce 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,6 +5,6 @@ extern crate diesel;
 extern crate log;
 
 pub mod executor;
-pub mod job_pool;
 pub mod postgres;
 mod schema;
+pub mod worker_pool;
diff --git a/src/postgres.rs b/src/postgres.rs
index 41cc4ad..af62601 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -1,3 +1,4 @@
+use crate::executor::Runnable;
 use crate::schema::fang_tasks;
 use crate::schema::FangTaskState;
 use chrono::{DateTime, Utc};
@@ -8,7 +9,7 @@ use dotenv::dotenv;
 use std::env;
 use uuid::Uuid;
 
-#[derive(Queryable, Identifiable, Debug, Eq, PartialEq)]
+#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
 #[table_name = "fang_tasks"]
 pub struct Task {
     pub id: Uuid,
@@ -52,6 +53,14 @@ impl Postgres {
         }
     }
 
+    pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
+        let json_job = serde_json::to_value(job).unwrap();
+
+        let new_task = NewTask { metadata: json_job };
+
+        self.insert(&new_task)
+    }
+
     pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
         diesel::insert_into(fang_tasks::table)
             .values(params)
@@ -62,6 +71,7 @@ impl Postgres {
         match fang_tasks::table
             .order(fang_tasks::created_at.asc())
            .limit(1)
+            .filter(fang_tasks::state.eq(FangTaskState::New))
             .for_update()
             .skip_locked()
             .get_result::<Task>(&self.connection)
@@ -134,12 +144,15 @@ mod postgres_tests {
     use super::NewTask;
     use super::Postgres;
     use super::Task;
+    use crate::executor::Error as ExecutorError;
+    use crate::executor::Runnable;
     use crate::schema::fang_tasks;
     use crate::schema::FangTaskState;
     use chrono::{DateTime, Duration, Utc};
     use diesel::connection::Connection;
     use diesel::prelude::*;
     use diesel::result::Error;
+    use serde::{Deserialize, Serialize};
 
     #[test]
     fn insert_inserts_task() {
@@ -238,6 +251,30 @@ mod postgres_tests {
         });
     }
 
+    #[test]
+    fn push_task_serializes_and_inserts_task() {
+        let postgres = Postgres::new(None);
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let job = Job { number: 10 };
+            let task = postgres.push_task(&job).unwrap();
+
+            let mut m = serde_json::value::Map::new();
+            m.insert(
+                "number".to_string(),
+                serde_json::value::Value::Number(10.into()),
+            );
+            m.insert(
+                "type".to_string(),
+                serde_json::value::Value::String("Job".to_string()),
+            );
+
+            assert_eq!(task.metadata, serde_json::value::Value::Object(m));
+
+            Ok(())
+        });
+    }
+
     // this test is ignored because it commits data to the db
     #[test]
     #[ignore]
@@ -282,6 +319,20 @@ mod postgres_tests {
         assert_eq!(found_task.id, task1_id);
     }
 
+    #[derive(Serialize, Deserialize)]
+    struct Job {
+        pub number: u16,
+    }
+
+    #[typetag::serde]
+    impl Runnable for Job {
+        fn run(&self) -> Result<(), ExecutorError> {
+            println!("the number is {}", self.number);
+
+            Ok(())
+        }
+    }
+
     fn insert_job(
         metadata: serde_json::Value,
         timestamp: DateTime<Utc>,
diff --git a/src/schema.rs b/src/schema.rs
index 1dbd236..7b5d638 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,6 +1,6 @@
 use diesel_derive_enum::DbEnum;
 
-#[derive(DbEnum, Debug, Eq, PartialEq)]
+#[derive(DbEnum, Debug, Eq, PartialEq, Clone)]
 pub enum FangTaskState {
     New,
     InProgress,
diff --git a/src/worker_pool.rs b/src/worker_pool.rs
new file mode 100644
index 0000000..55b0b99
--- /dev/null
+++ b/src/worker_pool.rs
@@ -0,0 +1,158 @@
+use crate::executor::Executor;
+use crate::postgres::Postgres;
+use std::thread;
+
+struct WorkerPool {
+    pub number_of_workers: u16,
+    pub name: String,
+}
+
+struct WorkerThread {
+    pub name: String,
+    pub restarts: u64,
+}
+
+impl WorkerPool {
+    pub fn new(number_of_workers: u16, name: String) -> Self {
+        Self {
+            number_of_workers,
+            name,
+        }
+    }
+
+    pub fn start(&self) {
+        for idx in 1..self.number_of_workers + 1 {
+            let name = format!("{}{}", self.name, idx);
+
+            WorkerThread::spawn_in_pool(name, 0)
+        }
+    }
+}
+
+impl WorkerThread {
+    pub fn new(name: String, restarts: u64) -> Self {
+        Self { name, restarts }
+    }
+
+    pub fn spawn_in_pool(name: String, restarts: u64) {
+        let mut builder = thread::Builder::new().name(name.clone());
+        builder = builder;
+
+        info!(
+            "starting a worker thread {}, number of restarts {}",
+            name, restarts
+        );
+
+        builder
+            .spawn(move || {
+                // when _job is dropped, it will be restarted (see Drop trait impl)
+                let _job = WorkerThread::new(name, restarts);
+
+                let postgres = Postgres::new(None);
+
+                Executor::new(postgres).run_tasks()
+            })
+            .unwrap();
+    }
+}
+
+impl Drop for WorkerThread {
+    fn drop(&mut self) {
+        WorkerThread::spawn_in_pool(self.name.clone(), self.restarts + 1)
+    }
+}
+
+#[cfg(test)]
+mod job_pool_tests {
+    use super::WorkerPool;
+    use crate::executor::Error;
+    use crate::executor::Runnable;
+    use crate::postgres::Postgres;
+    use crate::postgres::Task;
+    use crate::schema::fang_tasks;
+    use diesel::pg::PgConnection;
+    use diesel::prelude::*;
+    use serde::{Deserialize, Serialize};
+    use std::thread;
+    use std::time::Duration;
+
+    #[derive(Serialize, Deserialize)]
+    struct MyJob {
+        pub number: u16,
+        pub current_thread_name: String,
+    }
+
+    impl MyJob {
+        pub fn new(number: u16) -> Self {
+            let handle = thread::current();
+            let current_thread_name = handle.name().unwrap().to_string();
+
+            Self {
+                number,
+                current_thread_name,
+            }
+        }
+    }
+
+    fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
+        fang_tasks::table.get_results::<Task>(conn).unwrap()
+    }
+
+    #[typetag::serde]
+    impl Runnable for MyJob {
+        fn run(&self) -> Result<(), Error> {
+            let postgres = Postgres::new(None);
+
+            thread::sleep(Duration::from_secs(3));
+
+            let new_job = MyJob::new(self.number + 1);
+
+            postgres.push_task(&new_job).unwrap();
+
+            Ok(())
+        }
+    }
+
+    // this test is ignored because it commits data to the db
+    #[test]
+    #[ignore]
+    fn tasks_are_split_between_two_threads() {
+        env_logger::init();
+
+        let postgres = Postgres::new(None);
+        let job_pool = WorkerPool::new(2, "test_worker".to_string());
+
+        postgres.push_task(&MyJob::new(0)).unwrap();
+        postgres.push_task(&MyJob::new(0)).unwrap();
+
+        job_pool.start();
+
+        thread::sleep(Duration::from_secs(100));
+
+        let tasks = get_all_tasks(&postgres.connection);
+
+        assert!(tasks.len() > 40);
+
+        let test_worker1_jobs: Vec<Task> = tasks
+            .clone()
+            .into_iter()
+            .filter(|job| {
+                serde_json::to_string(&job.metadata)
+                    .unwrap()
+                    .contains("test_worker1")
+            })
+            .collect();
+
+        let test_worker2_jobs: Vec<Task> = tasks
+            .into_iter()
+            .filter(|job| {
+                serde_json::to_string(&job.metadata)
+                    .unwrap()
+                    .contains("test_worker2")
+            })
+            .collect();
+
+        assert!(test_worker1_jobs.len() > 20);
+        assert!(test_worker2_jobs.len() > 20);
+    }
+}
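
Note (not part of the patch): a minimal usage sketch of the API this commit introduces, enqueueing a job with push_task and starting a worker pool. It assumes the crate is consumed as `fang`, that `WorkerPool` is exported publicly (in this patch it is still a private struct), and that DATABASE_URL points at a database with the fang_tasks table; the job type `CountJob` and the pool name "my_worker" are made up for illustration.

    use fang::executor::{Error, Runnable};
    use fang::postgres::Postgres;
    use fang::worker_pool::WorkerPool;
    use serde::{Deserialize, Serialize};

    // Hypothetical job type; any serde-serializable struct that implements
    // Runnable through typetag can be enqueued.
    #[derive(Serialize, Deserialize)]
    struct CountJob {
        number: u16,
    }

    #[typetag::serde]
    impl Runnable for CountJob {
        fn run(&self) -> Result<(), Error> {
            println!("processing number {}", self.number);

            Ok(())
        }
    }

    fn main() {
        // push_task serializes the job into the fang_tasks metadata column.
        let postgres = Postgres::new(None);
        postgres.push_task(&CountJob { number: 10 }).unwrap();

        // Two threads named my_worker1 and my_worker2 each run
        // Executor::run_tasks in an endless loop; if one panics, the Drop
        // impl on WorkerThread spawns a replacement.
        WorkerPool::new(2, "my_worker".to_string()).start();

        // start() returns immediately, so keep the main thread alive.
        std::thread::park();
    }

The restart-on-drop design means a panicking job takes down only its own worker thread, and the replacement thread continues draining the queue.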