diff --git a/src/executor.rs b/src/executor.rs index 7a5d82e..c842c6c 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,11 +1,13 @@ use crate::postgres::Postgres; use crate::postgres::Task; -struct Executor {} +pub struct Executor { + pub storage: Postgres, +} #[derive(Debug)] pub struct Error { - description: String, + pub description: String, } #[typetag::serde(tag = "type")] @@ -14,14 +16,24 @@ pub trait Runnable { } impl Executor { - pub fn run(storage: &Postgres, task: &Task) { + pub fn new(storage: Postgres) -> Self { + Self { storage } + } + + pub fn run(&self, task: &Task) { let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); match actual_task.run() { - Ok(()) => storage.finish_task(task).unwrap(), - Err(error) => storage.fail_task(task, error.description).unwrap(), + Ok(()) => self.storage.finish_task(task).unwrap(), + Err(error) => self.storage.fail_task(task, error.description).unwrap(), }; } + + pub fn run_tasks(&self) { + while let Ok(Some(task)) = self.storage.fetch_and_touch() { + self.run(&task) + } + } } #[cfg(test)] @@ -73,53 +85,59 @@ mod executor_tests { fn executes_and_finishes_task() { let job = Job { number: 10 }; - let postgres = Postgres::new(None); - let new_task = NewTask { metadata: serialize(&job), }; - postgres.connection.test_transaction::<(), Error, _>(|| { - let task = postgres.insert(&new_task).unwrap(); + let executor = Executor::new(Postgres::new(None)); - assert_eq!(FangTaskState::New, task.state); + executor + .storage + .connection + .test_transaction::<(), Error, _>(|| { + let task = executor.storage.insert(&new_task).unwrap(); - Executor::run(&postgres, &task); + assert_eq!(FangTaskState::New, task.state); - let found_task = postgres.find_task_by_id(task.id).unwrap(); + executor.run(&task); - assert_eq!(FangTaskState::Finished, found_task.state); + let found_task = executor.storage.find_task_by_id(task.id).unwrap(); - Ok(()) - }); + assert_eq!(FangTaskState::Finished, found_task.state); + + Ok(()) + }); } #[test] fn saves_error_for_failed_task() { let job = FailedJob { number: 10 }; - let postgres = Postgres::new(None); - let new_task = NewTask { metadata: serialize(&job), }; - postgres.connection.test_transaction::<(), Error, _>(|| { - let task = postgres.insert(&new_task).unwrap(); + let executor = Executor::new(Postgres::new(None)); - assert_eq!(FangTaskState::New, task.state); + executor + .storage + .connection + .test_transaction::<(), Error, _>(|| { + let task = executor.storage.insert(&new_task).unwrap(); - Executor::run(&postgres, &task); + assert_eq!(FangTaskState::New, task.state); - let found_task = postgres.find_task_by_id(task.id).unwrap(); + executor.run(&task); - assert_eq!(FangTaskState::Failed, found_task.state); - assert_eq!( - "the number is 10".to_string(), - found_task.error_message.unwrap() - ); + let found_task = executor.storage.find_task_by_id(task.id).unwrap(); - Ok(()) - }); + assert_eq!(FangTaskState::Failed, found_task.state); + assert_eq!( + "the number is 10".to_string(), + found_task.error_message.unwrap() + ); + + Ok(()) + }); } } diff --git a/src/lib.rs b/src/lib.rs index 7bfb0d5..7760514 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ extern crate diesel; pub mod executor; pub mod postgres; +pub mod scheduler; mod schema; #[cfg(test)] diff --git a/src/postgres.rs b/src/postgres.rs index d69fde7..41cc4ad 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -71,6 +71,21 @@ impl Postgres { } } + pub fn fetch_and_touch(&self) -> Result, Error> { + self.connection.transaction::, Error, _>(|| { + let found_task = self.fetch_task(); + + if let None = found_task { + return Ok(None); + } + + match self.start_processing_task(&found_task.unwrap()) { + Ok(updated_task) => Ok(Some(updated_task)), + Err(err) => Err(err), + } + }) + } + pub fn find_task_by_id(&self, id: Uuid) -> Option { match fang_tasks::table .filter(fang_tasks::id.eq(id)) @@ -90,6 +105,15 @@ impl Postgres { .get_result::(&self.connection) } + pub fn start_processing_task(&self, task: &Task) -> Result { + diesel::update(task) + .set(( + fang_tasks::state.eq(FangTaskState::InProgress), + fang_tasks::updated_at.eq(Self::current_time()), + )) + .get_result::(&self.connection) + } + pub fn fail_task(&self, task: &Task, error: String) -> Result { diesel::update(task) .set(( @@ -186,6 +210,34 @@ mod postgres_tests { }); } + #[test] + fn fetch_and_touch_updates_state() { + let postgres = Postgres::new(None); + + postgres.connection.test_transaction::<(), Error, _>(|| { + let _task = insert_new_job(&postgres.connection); + + let updated_task = postgres.fetch_and_touch().unwrap().unwrap(); + + assert_eq!(FangTaskState::InProgress, updated_task.state); + + Ok(()) + }); + } + + #[test] + fn fetch_and_touch_returns_none() { + let postgres = Postgres::new(None); + + postgres.connection.test_transaction::<(), Error, _>(|| { + let task = postgres.fetch_and_touch().unwrap(); + + assert_eq!(None, task); + + Ok(()) + }); + } + // this test is ignored because it commits data to the db #[test] #[ignore] diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..def8a17 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,34 @@ +use crate::executor::Executor; +use crate::postgres::Postgres; +use std::thread; +use std::thread::JoinHandle; + +struct Scheduler { + pub number_of_workers: u16, + pub handles: Option>>, +} + +impl Scheduler { + pub fn new(number_of_workers: u16) -> Self { + Self { + number_of_workers, + handles: None, + } + } + + pub fn start(&mut self) { + let mut handles: Vec> = vec![]; + + for _ in 1..self.number_of_workers { + let handle = thread::spawn(|| { + let postgres = Postgres::new(None); + + Executor::new(postgres).run_tasks() + }); + + handles.push(handle); + } + + self.handles = Some(handles); + } +}