add job pool

This commit is contained in:
Ayrat Badykov 2021-06-23 08:32:35 +03:00
parent 3de7038e62
commit 3bc32c9c93
No known key found for this signature in database
GPG key ID: 16AE533AB7A3E8C6
5 changed files with 104 additions and 60 deletions

View file

@ -16,4 +16,6 @@ uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4" chrono = "0.4"
serde_json = "1.0" serde_json = "1.0"
typetag = "0.1" typetag = "0.1"
log = "0.4.0"
env_logger = "0.8.4"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View file

@ -1,8 +1,14 @@
use crate::postgres::Postgres; use crate::postgres::Postgres;
use crate::postgres::Task; use crate::postgres::Task;
use std::thread;
use std::time::Duration;
pub struct Executor { pub struct Executor {
pub storage: Postgres, pub storage: Postgres,
pub sleep_period: u64,
pub max_sleep_period: u64,
pub min_sleep_period: u64,
pub sleep_step: u64,
} }
#[derive(Debug)] #[derive(Debug)]
@ -17,7 +23,13 @@ pub trait Runnable {
impl Executor { impl Executor {
pub fn new(storage: Postgres) -> Self { pub fn new(storage: Postgres) -> Self {
Self { storage } Self {
storage,
sleep_period: 5,
max_sleep_period: 15,
min_sleep_period: 5,
sleep_step: 5,
}
} }
pub fn run(&self, task: &Task) { pub fn run(&self, task: &Task) {
@ -29,11 +41,37 @@ impl Executor {
}; };
} }
pub fn run_tasks(&self) { pub fn run_tasks(&mut self) {
while let Ok(Some(task)) = self.storage.fetch_and_touch() { match self.storage.fetch_and_touch() {
self.run(&task) Ok(Some(task)) => {
self.maybe_reset_sleep_period();
self.run(&task);
}
Ok(None) => {
self.sleep();
}
Err(error) => {
error!("Failed to fetch a task {:?}", error);
self.sleep();
}
};
}
pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period;
} }
} }
pub fn sleep(&mut self) {
if self.sleep_period < self.max_sleep_period {
self.sleep_period = self.sleep_period + self.sleep_step;
}
thread::sleep(Duration::from_secs(self.sleep_period));
}
} }
#[cfg(test)] #[cfg(test)]

57
src/job_pool.rs Normal file
View file

@ -0,0 +1,57 @@
use crate::executor::Executor;
use crate::postgres::Postgres;
use std::thread;
struct JobPool {
pub number_of_workers: u16,
pub name: String,
}
struct JobThread {
pub name: String,
}
impl JobPool {
pub fn new(number_of_workers: u16, name: String) -> Self {
Self {
number_of_workers,
name,
}
}
pub fn start(&self) {
for idx in 1..self.number_of_workers {
let name = format!("{}{}", self.name, idx);
spawn_in_pool(name)
}
}
}
impl JobThread {
pub fn new(name: String) -> Self {
Self { name }
}
}
impl Drop for JobThread {
fn drop(&mut self) {
spawn_in_pool(self.name.clone())
}
}
fn spawn_in_pool(name: String) {
let mut builder = thread::Builder::new().name(name.clone());
builder = builder;
builder
.spawn(move || {
// when _job is dropped, it will be restarted (see Drop trait impl)
let _job = JobThread::new(name);
let postgres = Postgres::new(None);
Executor::new(postgres).run_tasks()
})
.unwrap();
}

View file

@ -1,29 +1,10 @@
#[macro_use] #[macro_use]
extern crate diesel; extern crate diesel;
// pub trait JobQueue { #[macro_use]
// type Job; extern crate log;
// type Error;
// fn push(job: Self::Job) -> Result<(), Self::Error> {
// }
// fn pop(job: Self::Job) -> Result<(), Self::Error>;
// }
// pub trait Storage {
// fn save() ->
// }
pub mod executor; pub mod executor;
pub mod job_pool;
pub mod postgres; pub mod postgres;
pub mod scheduler;
mod schema; mod schema;
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}

View file

@ -1,34 +0,0 @@
use crate::executor::Executor;
use crate::postgres::Postgres;
use std::thread;
use std::thread::JoinHandle;
struct Scheduler {
pub number_of_workers: u16,
pub handles: Option<Vec<JoinHandle<()>>>,
}
impl Scheduler {
pub fn new(number_of_workers: u16) -> Self {
Self {
number_of_workers,
handles: None,
}
}
pub fn start(&mut self) {
let mut handles: Vec<JoinHandle<()>> = vec![];
for _ in 1..self.number_of_workers {
let handle = thread::spawn(|| {
let postgres = Postgres::new(None);
Executor::new(postgres).run_tasks()
});
handles.push(handle);
}
self.handles = Some(handles);
}
}