diff --git a/Cargo.toml b/Cargo.toml index 83e9a06..d188d03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,6 @@ uuid = { version = "0.8", features = ["v4"] } chrono = "0.4" serde_json = "1.0" typetag = "0.1" +log = "0.4.0" +env_logger = "0.8.4" serde = { version = "1.0", features = ["derive"] } diff --git a/src/executor.rs b/src/executor.rs index c842c6c..f73d6d0 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,8 +1,14 @@ use crate::postgres::Postgres; use crate::postgres::Task; +use std::thread; +use std::time::Duration; pub struct Executor { pub storage: Postgres, + pub sleep_period: u64, + pub max_sleep_period: u64, + pub min_sleep_period: u64, + pub sleep_step: u64, } #[derive(Debug)] @@ -17,7 +23,13 @@ pub trait Runnable { impl Executor { pub fn new(storage: Postgres) -> Self { - Self { storage } + Self { + storage, + sleep_period: 5, + max_sleep_period: 15, + min_sleep_period: 5, + sleep_step: 5, + } } pub fn run(&self, task: &Task) { @@ -29,11 +41,37 @@ impl Executor { }; } - pub fn run_tasks(&self) { - while let Ok(Some(task)) = self.storage.fetch_and_touch() { - self.run(&task) + pub fn run_tasks(&mut self) { + match self.storage.fetch_and_touch() { + Ok(Some(task)) => { + self.maybe_reset_sleep_period(); + self.run(&task); + } + Ok(None) => { + self.sleep(); + } + + Err(error) => { + error!("Failed to fetch a task {:?}", error); + + self.sleep(); + } + }; + } + + pub fn maybe_reset_sleep_period(&mut self) { + if self.sleep_period != self.min_sleep_period { + self.sleep_period = self.min_sleep_period; } } + + pub fn sleep(&mut self) { + if self.sleep_period < self.max_sleep_period { + self.sleep_period = self.sleep_period + self.sleep_step; + } + + thread::sleep(Duration::from_secs(self.sleep_period)); + } } #[cfg(test)] diff --git a/src/job_pool.rs b/src/job_pool.rs new file mode 100644 index 0000000..b9611ae --- /dev/null +++ b/src/job_pool.rs @@ -0,0 +1,57 @@ +use crate::executor::Executor; +use crate::postgres::Postgres; +use std::thread; + +struct JobPool { + pub number_of_workers: u16, + pub name: String, +} + +struct JobThread { + pub name: String, +} + +impl JobPool { + pub fn new(number_of_workers: u16, name: String) -> Self { + Self { + number_of_workers, + name, + } + } + + pub fn start(&self) { + for idx in 1..self.number_of_workers { + let name = format!("{}{}", self.name, idx); + + spawn_in_pool(name) + } + } +} + +impl JobThread { + pub fn new(name: String) -> Self { + Self { name } + } +} + +impl Drop for JobThread { + fn drop(&mut self) { + spawn_in_pool(self.name.clone()) + } +} + +fn spawn_in_pool(name: String) { + let mut builder = thread::Builder::new().name(name.clone()); + builder = builder; + + builder + .spawn(move || { + // when _job is dropped, it will be restarted (see Drop trait impl) + let _job = JobThread::new(name); + + let postgres = Postgres::new(None); + + Executor::new(postgres).run_tasks() + }) + .unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index 7760514..ed06f2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,29 +1,10 @@ #[macro_use] extern crate diesel; -// pub trait JobQueue { -// type Job; -// type Error; - -// fn push(job: Self::Job) -> Result<(), Self::Error> { - -// } -// fn pop(job: Self::Job) -> Result<(), Self::Error>; -// } - -// pub trait Storage { -// fn save() -> -// } +#[macro_use] +extern crate log; pub mod executor; +pub mod job_pool; pub mod postgres; -pub mod scheduler; mod schema; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/src/scheduler.rs b/src/scheduler.rs deleted file mode 100644 index def8a17..0000000 --- a/src/scheduler.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::executor::Executor; -use crate::postgres::Postgres; -use std::thread; -use std::thread::JoinHandle; - -struct Scheduler { - pub number_of_workers: u16, - pub handles: Option>>, -} - -impl Scheduler { - pub fn new(number_of_workers: u16) -> Self { - Self { - number_of_workers, - handles: None, - } - } - - pub fn start(&mut self) { - let mut handles: Vec> = vec![]; - - for _ in 1..self.number_of_workers { - let handle = thread::spawn(|| { - let postgres = Postgres::new(None); - - Executor::new(postgres).run_tasks() - }); - - handles.push(handle); - } - - self.handles = Some(handles); - } -}