execute different types of tasks in separate workers (#1)

* execute different types of tasks in separate workers

* add more tests

* pass reference

* add CHANGELOG
Ayrat Badykov 2021-07-03 07:23:05 +03:00 committed by GitHub
parent c601984d37
commit 4b1f537d19
6 changed files with 172 additions and 36 deletions

CHANGELOG.md (new file, +8 lines)

@@ -0,0 +1,8 @@
## Unreleased
- Execute different types of tasks in separate workers - [#1](https://github.com/ayrat555/fang/pull/1)
## 0.2.0 (2021-06-24)
- The first release on crates.io

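Taken together, the changes below let each job declare a task type and let workers be restricted to a single type. Here is a minimal usage sketch of how that might look from the caller's side, assuming the `Runnable::task_type` hook and the `WorkerPool::new(number_of_workers, task_type)` signature introduced in this commit; the `EmailJob` struct, the "email" type name, and the import paths are illustrative, not part of the crate:

```rust
// Illustrative sketch only: EmailJob, the "email" type name, and the import
// paths are assumptions; the Runnable / WorkerPool APIs are the ones added here.
use fang::{Error, Postgres, Runnable, WorkerPool};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct EmailJob {
    address: String,
}

#[typetag::serde]
impl Runnable for EmailJob {
    fn run(&self) -> Result<(), Error> {
        println!("emailing {}", self.address);
        Ok(())
    }

    // Without this override the job keeps the default "common" task type.
    fn task_type(&self) -> String {
        "email".to_string()
    }
}

fn main() {
    // push_task now stores the job's task_type next to its serialized metadata.
    let postgres = Postgres::new(None);
    postgres
        .push_task(&EmailJob {
            address: "user@example.com".to_string(),
        })
        .unwrap();

    // Two workers that only pick up "email" tasks...
    WorkerPool::new(2, Some("email".to_string())).start();
    // ...and two untyped workers that pick up tasks of any type.
    WorkerPool::new(2, None).start();

    // Keep the main thread alive so the worker threads keep polling.
    std::thread::park();
}
```

Jobs that don't override `task_type()` fall back to "common", and a worker pool created with `None` fetches tasks regardless of type.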
migrations/…/up.sql (CREATE TABLE fang_tasks)

@@ -7,9 +7,11 @@ CREATE TABLE fang_tasks (
metadata jsonb NOT NULL,
error_message TEXT,
state fang_task_state default 'new' NOT NULL,
task_type VARCHAR default 'common' NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);

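The task_type column defaults to 'common', matching the default Runnable::task_type() implementation added in src/executor.rs, and the new fang_tasks_type_index backs the per-type fetch query introduced in src/postgres.rs.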
src/executor.rs

@@ -7,6 +7,7 @@ use std::time::Duration;
pub struct Executor {
pub storage: Postgres,
pub task_type: Option<String>,
pub sleep_period: u64,
pub max_sleep_period: u64,
pub min_sleep_period: u64,
@@ -24,6 +25,10 @@ where
Self: RefUnwindSafe,
{
fn run(&self) -> Result<(), Error>;
fn task_type(&self) -> String {
"common".to_string()
}
}
impl Executor {
@@ -34,9 +39,14 @@ impl Executor {
max_sleep_period: 15,
min_sleep_period: 5,
sleep_step: 5,
task_type: None,
}
}
pub fn set_task_type(&mut self, task_type: String) {
self.task_type = Some(task_type);
}
pub fn run(&self, task: &Task) {
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
@@ -59,7 +69,7 @@ impl Executor {
pub fn run_tasks(&mut self) {
loop {
match self.storage.fetch_and_touch() {
match self.storage.fetch_and_touch(&self.task_type.clone()) {
Ok(Some(task)) => {
self.maybe_reset_sleep_period();
self.run(&task);
@@ -147,6 +157,34 @@ mod executor_tests {
}
}
#[derive(Serialize, Deserialize)]
struct JobType1 {}
#[typetag::serde]
impl Runnable for JobType1 {
fn run(&self) -> Result<(), Error> {
Ok(())
}
fn task_type(&self) -> String {
"type1".to_string()
}
}
#[derive(Serialize, Deserialize)]
struct JobType2 {}
#[typetag::serde]
impl Runnable for JobType2 {
fn run(&self) -> Result<(), Error> {
Ok(())
}
fn task_type(&self) -> String {
"type2".to_string()
}
}
pub fn serialize(job: &dyn Runnable) -> serde_json::Value {
serde_json::to_value(job).unwrap()
}
@@ -157,6 +195,7 @@ mod executor_tests {
let new_task = NewTask {
metadata: serialize(&job),
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));
@@ -179,12 +218,54 @@ mod executor_tests {
});
}
#[test]
#[ignore]
fn executes_task_only_of_specific_type() {
let job1 = JobType1 {};
let job2 = JobType2 {};
let new_task1 = NewTask {
metadata: serialize(&job1),
task_type: "type1".to_string(),
};
let new_task2 = NewTask {
metadata: serialize(&job2),
task_type: "type2".to_string(),
};
let executor = Executor::new(Postgres::new(None));
let task1 = executor.storage.insert(&new_task1).unwrap();
let task2 = executor.storage.insert(&new_task2).unwrap();
assert_eq!(FangTaskState::New, task1.state);
assert_eq!(FangTaskState::New, task2.state);
std::thread::spawn(move || {
let postgres = Postgres::new(None);
let mut executor = Executor::new(postgres);
executor.set_task_type("type1".to_string());
executor.run_tasks();
});
std::thread::sleep(std::time::Duration::from_millis(1000));
let found_task1 = executor.storage.find_task_by_id(task1.id).unwrap();
assert_eq!(FangTaskState::Finished, found_task1.state);
let found_task2 = executor.storage.find_task_by_id(task2.id).unwrap();
assert_eq!(FangTaskState::New, found_task2.state);
}
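Note that this test is marked #[ignore]: unlike the neighboring tests it spawns a real worker thread and commits rows instead of wrapping everything in a test transaction, so it is meant to be run explicitly (e.g. `cargo test -- --ignored`) against a live database.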
#[test]
fn saves_error_for_failed_task() {
let job = FailedJob { number: 10 };
let new_task = NewTask {
metadata: serialize(&job),
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));
@@ -217,6 +298,7 @@ mod executor_tests {
let new_task = NewTask {
metadata: serialize(&job),
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));

src/postgres.rs

@@ -16,6 +16,7 @@ pub struct Task {
pub metadata: serde_json::Value,
pub error_message: Option<String>,
pub state: FangTaskState,
pub task_type: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -24,6 +25,7 @@ pub struct Task {
#[table_name = "fang_tasks"]
pub struct NewTask {
pub metadata: serde_json::Value,
pub task_type: String,
}
pub struct Postgres {
@@ -40,7 +42,10 @@ impl Postgres {
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
let json_job = serde_json::to_value(job).unwrap();
let new_task = NewTask { metadata: json_job };
let new_task = NewTask {
metadata: json_job,
task_type: job.task_type(),
};
self.insert(&new_task)
}
@@ -55,23 +60,16 @@ impl Postgres {
.get_result::<Task>(&self.connection)
}
pub fn fetch_task(&self) -> Option<Task> {
match fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
pub fn fetch_task(&self, task_type: &Option<String>) -> Option<Task> {
match task_type {
None => self.fetch_any_task(),
Some(task_type_str) => self.fetch_task_of_type(task_type_str),
}
}
pub fn fetch_and_touch(&self) -> Result<Option<Task>, Error> {
pub fn fetch_and_touch(&self, task_type: &Option<String>) -> Result<Option<Task>, Error> {
self.connection.transaction::<Option<Task>, Error, _>(|| {
let found_task = self.fetch_task();
let found_task = self.fetch_task(task_type);
if found_task.is_none() {
return Ok(None);
@@ -136,6 +134,35 @@ impl Postgres {
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url))
}
fn fetch_any_task(&self) -> Option<Task> {
match fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
}
}
fn fetch_task_of_type(&self, task_type: &String) -> Option<Task> {
match fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
.filter(fang_tasks::task_type.eq(task_type))
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
}
}
}
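For completeness, a small sketch of the two fetch modes this file now exposes; the function name demo_fetch_modes and the import path are made up, and "type1" matches the test jobs in src/executor.rs:

```rust
use fang::Postgres; // path assumed; adjust to the crate's re-exports

fn demo_fetch_modes() {
    let postgres = Postgres::new(None);

    // `None` falls through to fetch_any_task(): the oldest New task of any type.
    let any_task = postgres.fetch_and_touch(&None).unwrap();

    // `Some(..)` goes to fetch_task_of_type(): the oldest New task whose
    // task_type column equals "type1".
    let typed_task = postgres.fetch_and_touch(&Some("type1".to_string())).unwrap();

    // Both variants select the row FOR UPDATE SKIP LOCKED and move it to
    // InProgress inside a single transaction, so concurrent workers never
    // pick up the same task twice.
    println!("any: {:?} typed: {:?}", any_task.map(|t| t.id), typed_task.map(|t| t.id));
}
```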
#[cfg(test)]
@@ -159,6 +186,7 @@ mod postgres_tests {
let new_task = NewTask {
metadata: serde_json::json!(true),
task_type: "common".to_string(),
};
let result = postgres
@@ -182,7 +210,7 @@ mod postgres_tests {
insert_job(serde_json::json!(false), timestamp2, &postgres.connection);
let found_task = postgres.fetch_task().unwrap();
let found_task = postgres.fetch_task(&None).unwrap();
assert_eq!(found_task.id, task1.id);
@@ -229,7 +257,7 @@ mod postgres_tests {
postgres.connection.test_transaction::<(), Error, _>(|| {
let _task = insert_new_job(&postgres.connection);
let updated_task = postgres.fetch_and_touch().unwrap().unwrap();
let updated_task = postgres.fetch_and_touch(&None).unwrap().unwrap();
assert_eq!(FangTaskState::InProgress, updated_task.state);
@@ -242,7 +270,7 @@ mod postgres_tests {
let postgres = Postgres::new(None);
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = postgres.fetch_and_touch().unwrap();
let task = postgres.fetch_and_touch(&None).unwrap();
assert_eq!(None, task);
@@ -293,7 +321,7 @@ mod postgres_tests {
let postgres = Postgres::new(None);
postgres.connection.transaction::<(), Error, _>(|| {
let found_task = postgres.fetch_task().unwrap();
let found_task = postgres.fetch_task(&None).unwrap();
assert_eq!(found_task.id, task1.id);
@@ -305,7 +333,7 @@ mod postgres_tests {
std::thread::sleep(std::time::Duration::from_millis(1000));
let found_task = postgres.fetch_task().unwrap();
let found_task = postgres.fetch_task(&None).unwrap();
assert_eq!(found_task.id, task2.id);
@@ -313,7 +341,7 @@ mod postgres_tests {
// returns unlocked record
let found_task = postgres.fetch_task().unwrap();
let found_task = postgres.fetch_task(&None).unwrap();
assert_eq!(found_task.id, task1_id);
}

src/schema.rs

@@ -15,6 +15,7 @@ table! {
use diesel::sql_types::Text;
use diesel::sql_types::Timestamptz;
use diesel::sql_types::Uuid;
use diesel::sql_types::Varchar;
fang_tasks (id) {
@@ -22,6 +23,7 @@ table! {
metadata -> Jsonb,
error_message -> Nullable<Text>,
state -> FangTaskStateMapping,
task_type -> Varchar,
created_at -> Timestamptz,
updated_at -> Timestamptz,
}

src/worker_pool.rs

@@ -4,37 +4,45 @@ use std::thread;
pub struct WorkerPool {
pub number_of_workers: u16,
pub name: String,
pub task_type: Option<String>,
}
pub struct WorkerThread {
pub name: String,
pub task_type: Option<String>,
pub restarts: u64,
}
impl WorkerPool {
pub fn new(number_of_workers: u16, name: String) -> Self {
pub fn new(number_of_workers: u16, task_type: Option<String>) -> Self {
Self {
number_of_workers,
name,
task_type,
}
}
pub fn start(&self) {
for idx in 1..self.number_of_workers + 1 {
let name = format!("{}{}", self.name, idx);
WorkerThread::spawn_in_pool(name, 0)
let name = format!(
"worker_{}{}",
self.task_type.clone().unwrap_or("".to_string()),
idx
);
WorkerThread::spawn_in_pool(self.task_type.clone(), name, 0)
}
}
}
impl WorkerThread {
pub fn new(name: String, restarts: u64) -> Self {
Self { name, restarts }
pub fn new(task_type: Option<String>, name: String, restarts: u64) -> Self {
Self {
name,
task_type,
restarts,
}
}
pub fn spawn_in_pool(name: String, restarts: u64) {
pub fn spawn_in_pool(task_type: Option<String>, name: String, restarts: u64) {
let builder = thread::Builder::new().name(name.clone());
info!(
@@ -45,11 +53,17 @@ impl WorkerThread {
builder
.spawn(move || {
// when _job is dropped, it will be restarted (see Drop trait impl)
let _job = WorkerThread::new(name, restarts);
let _job = WorkerThread::new(task_type.clone(), name, restarts);
let postgres = Postgres::new(None);
Executor::new(postgres).run_tasks()
let mut executor = Executor::new(postgres);
if let Some(task_type_str) = task_type {
executor.set_task_type(task_type_str);
}
executor.run_tasks();
})
.unwrap();
}
@@ -57,7 +71,7 @@ impl WorkerThread {
impl Drop for WorkerThread {
fn drop(&mut self) {
WorkerThread::spawn_in_pool(self.name.clone(), self.restarts + 1)
WorkerThread::spawn_in_pool(self.task_type.clone(), self.name.clone(), self.restarts + 1)
}
}
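Because spawn_in_pool now receives the task_type, the Drop-based restart above re-creates a crashed worker with the same task type (and an incremented restart counter), so typed workers keep their specialization across panics.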
@@ -119,7 +133,7 @@ mod job_pool_tests {
env_logger::init();
let postgres = Postgres::new(None);
let job_pool = WorkerPool::new(2, "test_worker".to_string());
let job_pool = WorkerPool::new(2, None);
postgres.push_task(&MyJob::new(0)).unwrap();
postgres.push_task(&MyJob::new(0)).unwrap();
@@ -138,7 +152,7 @@ mod job_pool_tests {
.filter(|job| {
serde_json::to_string(&job.metadata)
.unwrap()
.contains("test_worker1")
.contains("worker_1")
})
.collect();
@@ -147,7 +161,7 @@ mod job_pool_tests {
.filter(|job| {
serde_json::to_string(&job.metadata)
.unwrap()
.contains("test_worker2")
.contains("worker_2")
})
.collect();