From 4b1f537d192950b959aaea08e6b4aac3d1d15f17 Mon Sep 17 00:00:00 2001
From: Ayrat Badykov
Date: Sat, 3 Jul 2021 07:23:05 +0300
Subject: [PATCH] execute different types of tasks in separate workers (#1)

* execute different types of task in separate workers

* add more tests

* pass reference

* add CHANGELOG
---
 CHANGELOG.md       |  8 ++
 .../up.sql         |  2 +
 src/executor.rs    | 84 ++++++++++++++++++-
 src/postgres.rs    | 68 ++++++++++-----
 src/schema.rs      |  2 +
 src/worker_pool.rs | 44 ++++++----
 6 files changed, 172 insertions(+), 36 deletions(-)
 create mode 100644 CHANGELOG.md

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..723e79e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,8 @@
+## Unreleased
+
+- Execute different types of tasks in separate workers - [#1](https://github.com/ayrat555/fang/pull/1)
+
+
+## 0.2.0 (2021-06-24)
+
+- The first release on crates.io
diff --git a/migrations/2021-06-05-112912_create_fang_tasks/up.sql b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
index 7969f0e..075415a 100644
--- a/migrations/2021-06-05-112912_create_fang_tasks/up.sql
+++ b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
@@ -7,9 +7,11 @@ CREATE TABLE fang_tasks (
     metadata jsonb NOT NULL,
     error_message TEXT,
     state fang_task_state default 'new' NOT NULL,
+    task_type VARCHAR default 'common' NOT NULL,
     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
     updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
 );
 
 CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
+CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
 CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
diff --git a/src/executor.rs b/src/executor.rs
index c3d77b0..29267af 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 
 pub struct Executor {
     pub storage: Postgres,
+    pub task_type: Option<String>,
     pub sleep_period: u64,
     pub max_sleep_period: u64,
     pub min_sleep_period: u64,
@@ -24,6 +25,10 @@ where
     Self: RefUnwindSafe,
 {
     fn run(&self) -> Result<(), Error>;
+
+    fn task_type(&self) -> String {
+        "common".to_string()
+    }
 }
 
 impl Executor {
@@ -34,9 +39,14 @@ impl Executor {
             max_sleep_period: 15,
             min_sleep_period: 5,
             sleep_step: 5,
+            task_type: None,
         }
     }
 
+    pub fn set_task_type(&mut self, task_type: String) {
+        self.task_type = Some(task_type);
+    }
+
     pub fn run(&self, task: &Task) {
         let actual_task: Box<dyn Runnable> =
             serde_json::from_value(task.metadata.clone()).unwrap();
@@ -59,7 +69,7 @@ impl Executor {
 
     pub fn run_tasks(&mut self) {
         loop {
-            match self.storage.fetch_and_touch() {
+            match self.storage.fetch_and_touch(&self.task_type.clone()) {
                 Ok(Some(task)) => {
                     self.maybe_reset_sleep_period();
                     self.run(&task);
@@ -147,6 +157,34 @@ mod executor_tests {
         }
     }
 
+    #[derive(Serialize, Deserialize)]
+    struct JobType1 {}
+
+    #[typetag::serde]
+    impl Runnable for JobType1 {
+        fn run(&self) -> Result<(), Error> {
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "type1".to_string()
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct JobType2 {}
+
+    #[typetag::serde]
+    impl Runnable for JobType2 {
+        fn run(&self) -> Result<(), Error> {
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "type2".to_string()
+        }
+    }
+
     pub fn serialize(job: &dyn Runnable) -> serde_json::Value {
         serde_json::to_value(job).unwrap()
     }
@@ -157,6 +195,7 @@ mod executor_tests {
 
         let new_task = NewTask {
             metadata: serialize(&job),
+            task_type: "common".to_string(),
         };
 
         let executor = Executor::new(Postgres::new(None));
@@ -179,12 +218,54 @@ mod executor_tests {
         });
     }
 
+    #[test]
+    #[ignore]
+    fn executes_task_only_of_specific_type() {
+        let job1 = JobType1 {};
+        let job2 = JobType2 {};
+
+        let new_task1 = NewTask {
+            metadata: serialize(&job1),
+            task_type: "type1".to_string(),
+        };
+
+        let new_task2 = NewTask {
+            metadata: serialize(&job2),
+            task_type: "type2".to_string(),
+        };
+
+        let executor = Executor::new(Postgres::new(None));
+
+        let task1 = executor.storage.insert(&new_task1).unwrap();
+        let task2 = executor.storage.insert(&new_task2).unwrap();
+
+        assert_eq!(FangTaskState::New, task1.state);
+        assert_eq!(FangTaskState::New, task2.state);
+
+        std::thread::spawn(move || {
+            let postgres = Postgres::new(None);
+            let mut executor = Executor::new(postgres);
+            executor.set_task_type("type1".to_string());
+
+            executor.run_tasks();
+        });
+
+        std::thread::sleep(std::time::Duration::from_millis(1000));
+
+        let found_task1 = executor.storage.find_task_by_id(task1.id).unwrap();
+        assert_eq!(FangTaskState::Finished, found_task1.state);
+
+        let found_task2 = executor.storage.find_task_by_id(task2.id).unwrap();
+        assert_eq!(FangTaskState::New, found_task2.state);
+    }
+
     #[test]
     fn saves_error_for_failed_task() {
         let job = FailedJob { number: 10 };
 
         let new_task = NewTask {
             metadata: serialize(&job),
+            task_type: "common".to_string(),
         };
 
         let executor = Executor::new(Postgres::new(None));
@@ -217,6 +298,7 @@ mod executor_tests {
 
         let new_task = NewTask {
             metadata: serialize(&job),
+            task_type: "common".to_string(),
         };
 
         let executor = Executor::new(Postgres::new(None));
diff --git a/src/postgres.rs b/src/postgres.rs
index 5b06e49..b2196a6 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -16,6 +16,7 @@ pub struct Task {
     pub metadata: serde_json::Value,
     pub error_message: Option<String>,
     pub state: FangTaskState,
+    pub task_type: String,
     pub created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
 }
@@ -24,6 +25,7 @@ pub struct Task {
 #[table_name = "fang_tasks"]
 pub struct NewTask {
     pub metadata: serde_json::Value,
+    pub task_type: String,
 }
 
 pub struct Postgres {
@@ -40,7 +42,10 @@ impl Postgres {
     pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
         let json_job = serde_json::to_value(job).unwrap();
 
-        let new_task = NewTask { metadata: json_job };
+        let new_task = NewTask {
+            metadata: json_job,
+            task_type: job.task_type(),
+        };
 
         self.insert(&new_task)
     }
@@ -55,23 +60,16 @@ impl Postgres {
             .get_result::<Task>(&self.connection)
     }
 
-    pub fn fetch_task(&self) -> Option<Task> {
-        match fang_tasks::table
-            .order(fang_tasks::created_at.asc())
-            .limit(1)
-            .filter(fang_tasks::state.eq(FangTaskState::New))
-            .for_update()
-            .skip_locked()
-            .get_result::<Task>(&self.connection)
-        {
-            Ok(record) => Some(record),
-            _ => None,
+    pub fn fetch_task(&self, task_type: &Option<String>) -> Option<Task> {
+        match task_type {
+            None => self.fetch_any_task(),
+            Some(task_type_str) => self.fetch_task_of_type(task_type_str),
         }
     }
 
-    pub fn fetch_and_touch(&self) -> Result<Option<Task>, Error> {
+    pub fn fetch_and_touch(&self, task_type: &Option<String>) -> Result<Option<Task>, Error> {
         self.connection.transaction::<Option<Task>, Error, _>(|| {
-            let found_task = self.fetch_task();
+            let found_task = self.fetch_task(task_type);
 
             if found_task.is_none() {
                 return Ok(None);
@@ -136,6 +134,35 @@ impl Postgres {
 
         PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url))
     }
+
+    fn fetch_any_task(&self) -> Option<Task> {
+        match fang_tasks::table
+            .order(fang_tasks::created_at.asc())
+            .limit(1)
+            .filter(fang_tasks::state.eq(FangTaskState::New))
+            .for_update()
+            .skip_locked()
+            .get_result::<Task>(&self.connection)
+        {
+            Ok(record) => Some(record),
+            _ => None,
+        }
+    }
+
+    fn fetch_task_of_type(&self, task_type: &String) -> Option<Task> {
+        match fang_tasks::table
+            .order(fang_tasks::created_at.asc())
+            .limit(1)
+            .filter(fang_tasks::state.eq(FangTaskState::New))
+            .filter(fang_tasks::task_type.eq(task_type))
+            .for_update()
+            .skip_locked()
+            .get_result::<Task>(&self.connection)
+        {
+            Ok(record) => Some(record),
+            _ => None,
+        }
+    }
 }
 
 #[cfg(test)]
@@ -159,6 +186,7 @@ mod postgres_tests {
 
         let new_task = NewTask {
             metadata: serde_json::json!(true),
+            task_type: "common".to_string(),
         };
 
         let result = postgres
@@ -182,7 +210,7 @@ mod postgres_tests {
 
             insert_job(serde_json::json!(false), timestamp2, &postgres.connection);
 
-            let found_task = postgres.fetch_task().unwrap();
+            let found_task = postgres.fetch_task(&None).unwrap();
 
             assert_eq!(found_task.id, task1.id);
 
@@ -229,7 +257,7 @@ mod postgres_tests {
         postgres.connection.test_transaction::<(), Error, _>(|| {
             let _task = insert_new_job(&postgres.connection);
 
-            let updated_task = postgres.fetch_and_touch().unwrap().unwrap();
+            let updated_task = postgres.fetch_and_touch(&None).unwrap().unwrap();
 
             assert_eq!(FangTaskState::InProgress, updated_task.state);
 
@@ -242,7 +270,7 @@ mod postgres_tests {
         let postgres = Postgres::new(None);
 
         postgres.connection.test_transaction::<(), Error, _>(|| {
-            let task = postgres.fetch_and_touch().unwrap();
+            let task = postgres.fetch_and_touch(&None).unwrap();
 
             assert_eq!(None, task);
 
@@ -293,7 +321,7 @@ mod postgres_tests {
             let postgres = Postgres::new(None);
 
             postgres.connection.transaction::<(), Error, _>(|| {
-                let found_task = postgres.fetch_task().unwrap();
+                let found_task = postgres.fetch_task(&None).unwrap();
 
                 assert_eq!(found_task.id, task1.id);
 
@@ -305,7 +333,7 @@ mod postgres_tests {
 
         std::thread::sleep(std::time::Duration::from_millis(1000));
 
-        let found_task = postgres.fetch_task().unwrap();
+        let found_task = postgres.fetch_task(&None).unwrap();
 
         assert_eq!(found_task.id, task2.id);
 
@@ -313,7 +341,7 @@ mod postgres_tests {
 
         // returns unlocked record
 
-        let found_task = postgres.fetch_task().unwrap();
+        let found_task = postgres.fetch_task(&None).unwrap();
 
         assert_eq!(found_task.id, task1_id);
     }
diff --git a/src/schema.rs b/src/schema.rs
index 7b5d638..5a20286 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -15,6 +15,7 @@ table! {
     use diesel::sql_types::Text;
     use diesel::sql_types::Timestamptz;
     use diesel::sql_types::Uuid;
+    use diesel::sql_types::Varchar;
 
     fang_tasks (id) {
         id -> Uuid,
@@ -22,6 +23,7 @@ table! {
         metadata -> Jsonb,
         error_message -> Nullable<Text>,
         state -> FangTaskStateMapping,
+        task_type -> Varchar,
         created_at -> Timestamptz,
         updated_at -> Timestamptz,
     }
diff --git a/src/worker_pool.rs b/src/worker_pool.rs
index 25b9c04..8a68adb 100644
--- a/src/worker_pool.rs
+++ b/src/worker_pool.rs
@@ -4,37 +4,45 @@ use std::thread;
 
 pub struct WorkerPool {
     pub number_of_workers: u16,
-    pub name: String,
+    pub task_type: Option<String>,
 }
 
 pub struct WorkerThread {
     pub name: String,
+    pub task_type: Option<String>,
     pub restarts: u64,
 }
 
 impl WorkerPool {
-    pub fn new(number_of_workers: u16, name: String) -> Self {
+    pub fn new(number_of_workers: u16, task_type: Option<String>) -> Self {
         Self {
             number_of_workers,
-            name,
+            task_type,
         }
     }
 
     pub fn start(&self) {
         for idx in 1..self.number_of_workers + 1 {
-            let name = format!("{}{}", self.name, idx);
-
-            WorkerThread::spawn_in_pool(name, 0)
+            let name = format!(
+                "worker_{}{}",
+                self.task_type.clone().unwrap_or("".to_string()),
+                idx
+            );
+            WorkerThread::spawn_in_pool(self.task_type.clone(), name, 0)
         }
     }
 }
 
 impl WorkerThread {
-    pub fn new(name: String, restarts: u64) -> Self {
-        Self { name, restarts }
+    pub fn new(task_type: Option<String>, name: String, restarts: u64) -> Self {
+        Self {
+            name,
+            task_type,
+            restarts,
+        }
     }
 
-    pub fn spawn_in_pool(name: String, restarts: u64) {
+    pub fn spawn_in_pool(task_type: Option<String>, name: String, restarts: u64) {
         let builder = thread::Builder::new().name(name.clone());
 
         info!(
@@ -45,11 +53,17 @@ impl WorkerThread {
         builder
             .spawn(move || {
                 // when _job is dropped, it will be restarted (see Drop trait impl)
-                let _job = WorkerThread::new(name, restarts);
+                let _job = WorkerThread::new(task_type.clone(), name, restarts);
 
                 let postgres = Postgres::new(None);
 
-                Executor::new(postgres).run_tasks()
+                let mut executor = Executor::new(postgres);
+
+                if let Some(task_type_str) = task_type {
+                    executor.set_task_type(task_type_str);
+                }
+
+                executor.run_tasks();
             })
             .unwrap();
     }
@@ -57,7 +71,7 @@ impl WorkerThread {
 
 impl Drop for WorkerThread {
     fn drop(&mut self) {
-        WorkerThread::spawn_in_pool(self.name.clone(), self.restarts + 1)
+        WorkerThread::spawn_in_pool(self.task_type.clone(), self.name.clone(), self.restarts + 1)
     }
 }
 
@@ -119,7 +133,7 @@ mod job_pool_tests {
         env_logger::init();
 
         let postgres = Postgres::new(None);
-        let job_pool = WorkerPool::new(2, "test_worker".to_string());
+        let job_pool = WorkerPool::new(2, None);
 
         postgres.push_task(&MyJob::new(0)).unwrap();
         postgres.push_task(&MyJob::new(0)).unwrap();
@@ -138,7 +152,7 @@ mod job_pool_tests {
             .filter(|job| {
                 serde_json::to_string(&job.metadata)
                     .unwrap()
-                    .contains("test_worker1")
+                    .contains("worker_1")
             })
             .collect();
 
@@ -147,7 +161,7 @@ mod job_pool_tests {
             .filter(|job| {
                 serde_json::to_string(&job.metadata)
                     .unwrap()
-                    .contains("test_worker2")
+                    .contains("worker_2")
             })
             .collect();
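
Usage sketch: the example below shows how the task-type routing introduced in
this patch is intended to be used, relying only on the API visible in the diff
(Runnable::task_type, WorkerPool::new(number_of_workers, task_type),
Postgres::push_task). The SendEmailJob struct, its field, and the exact
`use fang::...` import paths are illustrative assumptions, not part of the
change.

    use fang::executor::{Error, Runnable};
    use fang::postgres::Postgres;
    use fang::worker_pool::WorkerPool;
    use serde::{Deserialize, Serialize};

    // A job that should only be processed by the "email" workers.
    #[derive(Serialize, Deserialize)]
    struct SendEmailJob {
        user_id: i32,
    }

    #[typetag::serde]
    impl Runnable for SendEmailJob {
        fn run(&self) -> Result<(), Error> {
            // the actual work goes here
            Ok(())
        }

        // Overriding task_type routes this job to workers started with
        // Some("email"); jobs that keep the default implementation stay in
        // the "common" type.
        fn task_type(&self) -> String {
            "email".to_string()
        }
    }

    fn main() {
        // Two workers that only fetch "email" tasks ...
        WorkerPool::new(2, Some("email".to_string())).start();
        // ... and two workers that fetch tasks of any type (task_type = None).
        WorkerPool::new(2, None).start();

        // Enqueue a job; its task_type() decides which workers pick it up.
        Postgres::new(None)
            .push_task(&SendEmailJob { user_id: 1 })
            .unwrap();

        // Keep the main thread alive so the spawned workers keep processing.
        std::thread::park();
    }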