add scheduler

This commit is contained in:
Ayrat Badykov 2021-06-20 11:58:03 +03:00
parent e52c81ddde
commit 3de7038e62
No known key found for this signature in database
GPG key ID: 16AE533AB7A3E8C6
4 changed files with 134 additions and 29 deletions

View file

@ -1,11 +1,13 @@
use crate::postgres::Postgres;
use crate::postgres::Task;
struct Executor {}
pub struct Executor {
pub storage: Postgres,
}
#[derive(Debug)]
pub struct Error {
description: String,
pub description: String,
}
#[typetag::serde(tag = "type")]
@ -14,14 +16,24 @@ pub trait Runnable {
}
impl Executor {
pub fn run(storage: &Postgres, task: &Task) {
pub fn new(storage: Postgres) -> Self {
Self { storage }
}
pub fn run(&self, task: &Task) {
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
match actual_task.run() {
Ok(()) => storage.finish_task(task).unwrap(),
Err(error) => storage.fail_task(task, error.description).unwrap(),
Ok(()) => self.storage.finish_task(task).unwrap(),
Err(error) => self.storage.fail_task(task, error.description).unwrap(),
};
}
pub fn run_tasks(&self) {
while let Ok(Some(task)) = self.storage.fetch_and_touch() {
self.run(&task)
}
}
}
#[cfg(test)]
@ -73,53 +85,59 @@ mod executor_tests {
fn executes_and_finishes_task() {
let job = Job { number: 10 };
let postgres = Postgres::new(None);
let new_task = NewTask {
metadata: serialize(&job),
};
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = postgres.insert(&new_task).unwrap();
let executor = Executor::new(Postgres::new(None));
assert_eq!(FangTaskState::New, task.state);
executor
.storage
.connection
.test_transaction::<(), Error, _>(|| {
let task = executor.storage.insert(&new_task).unwrap();
Executor::run(&postgres, &task);
assert_eq!(FangTaskState::New, task.state);
let found_task = postgres.find_task_by_id(task.id).unwrap();
executor.run(&task);
assert_eq!(FangTaskState::Finished, found_task.state);
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
Ok(())
});
assert_eq!(FangTaskState::Finished, found_task.state);
Ok(())
});
}
#[test]
fn saves_error_for_failed_task() {
let job = FailedJob { number: 10 };
let postgres = Postgres::new(None);
let new_task = NewTask {
metadata: serialize(&job),
};
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = postgres.insert(&new_task).unwrap();
let executor = Executor::new(Postgres::new(None));
assert_eq!(FangTaskState::New, task.state);
executor
.storage
.connection
.test_transaction::<(), Error, _>(|| {
let task = executor.storage.insert(&new_task).unwrap();
Executor::run(&postgres, &task);
assert_eq!(FangTaskState::New, task.state);
let found_task = postgres.find_task_by_id(task.id).unwrap();
executor.run(&task);
assert_eq!(FangTaskState::Failed, found_task.state);
assert_eq!(
"the number is 10".to_string(),
found_task.error_message.unwrap()
);
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
Ok(())
});
assert_eq!(FangTaskState::Failed, found_task.state);
assert_eq!(
"the number is 10".to_string(),
found_task.error_message.unwrap()
);
Ok(())
});
}
}

View file

@ -17,6 +17,7 @@ extern crate diesel;
pub mod executor;
pub mod postgres;
pub mod scheduler;
mod schema;
#[cfg(test)]

View file

@ -71,6 +71,21 @@ impl Postgres {
}
}
pub fn fetch_and_touch(&self) -> Result<Option<Task>, Error> {
self.connection.transaction::<Option<Task>, Error, _>(|| {
let found_task = self.fetch_task();
if let None = found_task {
return Ok(None);
}
match self.start_processing_task(&found_task.unwrap()) {
Ok(updated_task) => Ok(Some(updated_task)),
Err(err) => Err(err),
}
})
}
pub fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
match fang_tasks::table
.filter(fang_tasks::id.eq(id))
@ -90,6 +105,15 @@ impl Postgres {
.get_result::<Task>(&self.connection)
}
pub fn start_processing_task(&self, task: &Task) -> Result<Task, Error> {
diesel::update(task)
.set((
fang_tasks::state.eq(FangTaskState::InProgress),
fang_tasks::updated_at.eq(Self::current_time()),
))
.get_result::<Task>(&self.connection)
}
pub fn fail_task(&self, task: &Task, error: String) -> Result<Task, Error> {
diesel::update(task)
.set((
@ -186,6 +210,34 @@ mod postgres_tests {
});
}
#[test]
fn fetch_and_touch_updates_state() {
let postgres = Postgres::new(None);
postgres.connection.test_transaction::<(), Error, _>(|| {
let _task = insert_new_job(&postgres.connection);
let updated_task = postgres.fetch_and_touch().unwrap().unwrap();
assert_eq!(FangTaskState::InProgress, updated_task.state);
Ok(())
});
}
#[test]
fn fetch_and_touch_returns_none() {
let postgres = Postgres::new(None);
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = postgres.fetch_and_touch().unwrap();
assert_eq!(None, task);
Ok(())
});
}
// this test is ignored because it commits data to the db
#[test]
#[ignore]

34
src/scheduler.rs Normal file
View file

@ -0,0 +1,34 @@
use crate::executor::Executor;
use crate::postgres::Postgres;
use std::thread;
use std::thread::JoinHandle;
struct Scheduler {
pub number_of_workers: u16,
pub handles: Option<Vec<JoinHandle<()>>>,
}
impl Scheduler {
pub fn new(number_of_workers: u16) -> Self {
Self {
number_of_workers,
handles: None,
}
}
pub fn start(&mut self) {
let mut handles: Vec<JoinHandle<()>> = vec![];
for _ in 1..self.number_of_workers {
let handle = thread::spawn(|| {
let postgres = Postgres::new(None);
Executor::new(postgres).run_tasks()
});
handles.push(handle);
}
self.handles = Some(handles);
}
}