run tasks in threads

This commit is contained in:
Ayrat Badykov 2021-06-23 13:48:03 +03:00
parent 3bc32c9c93
commit d293e31ac8
No known key found for this signature in database
GPG key ID: 16AE533AB7A3E8C6
7 changed files with 228 additions and 73 deletions

View file

@ -11,4 +11,5 @@ CREATE TABLE fang_tasks (
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);

View file

@ -42,21 +42,23 @@ impl Executor {
}
pub fn run_tasks(&mut self) {
match self.storage.fetch_and_touch() {
Ok(Some(task)) => {
self.maybe_reset_sleep_period();
self.run(&task);
}
Ok(None) => {
self.sleep();
}
loop {
match self.storage.fetch_and_touch() {
Ok(Some(task)) => {
self.maybe_reset_sleep_period();
self.run(&task);
}
Ok(None) => {
self.sleep();
}
Err(error) => {
error!("Failed to fetch a task {:?}", error);
Err(error) => {
error!("Failed to fetch a task {:?}", error);
self.sleep();
}
};
self.sleep();
}
};
}
}
pub fn maybe_reset_sleep_period(&mut self) {

View file

@ -1,57 +0,0 @@
use crate::executor::Executor;
use crate::postgres::Postgres;
use std::thread;
/// Fixed-size pool of worker threads; each worker repeatedly fetches and
/// runs tasks from the database (see `spawn_in_pool`).
struct JobPool {
    // How many worker threads `start` spawns.
    pub number_of_workers: u16,
    // Prefix used to build each thread's name (`"{name}{index}"`).
    pub name: String,
}
/// Guard value owned by a worker thread; its `Drop` impl respawns the
/// thread, so a worker that exits or panics is restarted automatically.
struct JobThread {
    // The thread name the replacement thread will reuse.
    pub name: String,
}
impl JobPool {
    /// Create a pool that will run `number_of_workers` worker threads,
    /// each named `"{name}{index}"` with indices starting at 1.
    pub fn new(number_of_workers: u16, name: String) -> Self {
        Self {
            number_of_workers,
            name,
        }
    }

    /// Spawn one thread per worker.
    ///
    /// Bug fix: the original loop `1..self.number_of_workers` is half-open,
    /// so it spawned only `number_of_workers - 1` threads (and none at all
    /// for a pool of size 1). The inclusive range spawns exactly
    /// `number_of_workers` threads.
    pub fn start(&self) {
        for idx in 1..=self.number_of_workers {
            let name = format!("{}{}", self.name, idx);
            spawn_in_pool(name)
        }
    }
}
impl JobThread {
pub fn new(name: String) -> Self {
Self { name }
}
}
impl Drop for JobThread {
    /// Runs when a worker thread terminates (including on panic, during
    /// unwinding): respawn a replacement thread under the same name.
    fn drop(&mut self) {
        spawn_in_pool(self.name.clone())
    }
}
/// Spawn a named worker thread that runs tasks forever.
///
/// The spawned closure owns a `JobThread` guard; when the guard is dropped
/// (the thread terminates, normally or by panic), the guard's `Drop` impl
/// re-spawns a replacement thread with the same name.
fn spawn_in_pool(name: String) {
    // Bug fix: the original wrote `let mut builder = …; builder = builder;`
    // — a no-op self-assignment. The binding does not need `mut` at all.
    let builder = thread::Builder::new().name(name.clone());

    builder
        .spawn(move || {
            // when _job is dropped, it will be restarted (see Drop trait impl)
            let _job = JobThread::new(name);

            let postgres = Postgres::new(None);

            Executor::new(postgres).run_tasks()
        })
        .unwrap();
}

View file

@ -5,6 +5,6 @@ extern crate diesel;
extern crate log;
pub mod executor;
pub mod job_pool;
pub mod postgres;
mod schema;
pub mod worker_pool;

View file

@ -1,3 +1,4 @@
use crate::executor::Runnable;
use crate::schema::fang_tasks;
use crate::schema::FangTaskState;
use chrono::{DateTime, Utc};
@ -8,7 +9,7 @@ use dotenv::dotenv;
use std::env;
use uuid::Uuid;
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq)]
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
#[table_name = "fang_tasks"]
pub struct Task {
pub id: Uuid,
@ -52,6 +53,14 @@ impl Postgres {
}
}
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
let json_job = serde_json::to_value(job).unwrap();
let new_task = NewTask { metadata: json_job };
self.insert(&new_task)
}
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
diesel::insert_into(fang_tasks::table)
.values(params)
@ -62,6 +71,7 @@ impl Postgres {
match fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
@ -134,12 +144,15 @@ mod postgres_tests {
use super::NewTask;
use super::Postgres;
use super::Task;
use crate::executor::Error as ExecutorError;
use crate::executor::Runnable;
use crate::schema::fang_tasks;
use crate::schema::FangTaskState;
use chrono::{DateTime, Duration, Utc};
use diesel::connection::Connection;
use diesel::prelude::*;
use diesel::result::Error;
use serde::{Deserialize, Serialize};
#[test]
fn insert_inserts_task() {
@ -238,6 +251,30 @@ mod postgres_tests {
});
}
#[test]
fn push_task_serializes_and_inserts_task() {
let postgres = Postgres::new(None);
postgres.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task = postgres.push_task(&job).unwrap();
let mut m = serde_json::value::Map::new();
m.insert(
"number".to_string(),
serde_json::value::Value::Number(10.into()),
);
m.insert(
"type".to_string(),
serde_json::value::Value::String("Job".to_string()),
);
assert_eq!(task.metadata, serde_json::value::Value::Object(m));
Ok(())
});
}
// this test is ignored because it commits data to the db
#[test]
#[ignore]
@ -282,6 +319,20 @@ mod postgres_tests {
assert_eq!(found_task.id, task1_id);
}
    // Minimal serializable job used only by these tests.
    #[derive(Serialize, Deserialize)]
    struct Job {
        pub number: u16,
    }
#[typetag::serde]
impl Runnable for Job {
fn run(&self) -> Result<(), ExecutorError> {
println!("the number is {}", self.number);
Ok(())
}
}
fn insert_job(
metadata: serde_json::Value,
timestamp: DateTime<Utc>,

View file

@ -1,6 +1,6 @@
use diesel_derive_enum::DbEnum;
#[derive(DbEnum, Debug, Eq, PartialEq)]
#[derive(DbEnum, Debug, Eq, PartialEq, Clone)]
pub enum FangTaskState {
New,
InProgress,

158
src/worker_pool.rs Normal file
View file

@ -0,0 +1,158 @@
use crate::executor::Executor;
use crate::postgres::Postgres;
use std::thread;
/// Fixed-size pool of worker threads; each worker repeatedly fetches and
/// runs tasks from the database (see `WorkerThread::spawn_in_pool`).
struct WorkerPool {
    // How many worker threads `start` spawns.
    pub number_of_workers: u16,
    // Prefix used to build each thread's name (`"{name}{index}"`).
    pub name: String,
}
/// Guard value owned by a worker thread; its `Drop` impl respawns the
/// thread with `restarts + 1`, so a worker that exits or panics is
/// restarted automatically.
struct WorkerThread {
    // The thread name the replacement thread will reuse.
    pub name: String,
    // How many times this named thread has been restarted so far.
    pub restarts: u64,
}
impl WorkerPool {
    /// Create a pool that will run `number_of_workers` worker threads,
    /// each named `"{name}{index}"` with indices starting at 1.
    pub fn new(number_of_workers: u16, name: String) -> Self {
        Self {
            number_of_workers,
            name,
        }
    }

    /// Spawn every worker thread with a restart counter of 0.
    pub fn start(&self) {
        // Inclusive range instead of `1..self.number_of_workers + 1`: same
        // iteration count, clearer intent, and no debug-mode overflow panic
        // when `number_of_workers == u16::MAX`.
        for idx in 1..=self.number_of_workers {
            let name = format!("{}{}", self.name, idx);
            WorkerThread::spawn_in_pool(name, 0)
        }
    }
}
impl WorkerThread {
    /// Build a guard recording the thread's name and how many times this
    /// named worker has been restarted.
    pub fn new(name: String, restarts: u64) -> Self {
        Self { name, restarts }
    }

    /// Spawn a named worker thread that runs tasks forever.
    ///
    /// The spawned closure owns a `WorkerThread` guard; when the guard is
    /// dropped (the thread terminates, normally or by panic), the guard's
    /// `Drop` impl re-spawns a replacement with an incremented restart
    /// counter.
    pub fn spawn_in_pool(name: String, restarts: u64) {
        // Bug fix: the original wrote `let mut builder = …; builder =
        // builder;` — a no-op self-assignment. No `mut` is needed.
        let builder = thread::Builder::new().name(name.clone());

        info!(
            "starting a worker thread {}, number of restarts {}",
            name, restarts
        );

        builder
            .spawn(move || {
                // when _job is dropped, it will be restarted (see Drop trait impl)
                let _job = WorkerThread::new(name, restarts);

                let postgres = Postgres::new(None);

                Executor::new(postgres).run_tasks()
            })
            .unwrap();
    }
}
impl Drop for WorkerThread {
    /// Runs when a worker thread terminates (including on panic, during
    /// unwinding): respawn a replacement thread under the same name, with
    /// the restart counter incremented.
    fn drop(&mut self) {
        WorkerThread::spawn_in_pool(self.name.clone(), self.restarts + 1)
    }
}
#[cfg(test)]
mod job_pool_tests {
    use super::WorkerPool;
    use crate::executor::Error;
    use crate::executor::Runnable;
    use crate::postgres::Postgres;
    use crate::postgres::Task;
    use crate::schema::fang_tasks;
    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use serde::{Deserialize, Serialize};
    use std::thread;
    use std::time::Duration;

    // Job that records which worker thread created it, so the test can
    // verify both workers picked up work.
    #[derive(Serialize, Deserialize)]
    struct MyJob {
        pub number: u16,
        pub current_thread_name: String,
    }

    impl MyJob {
        pub fn new(number: u16) -> Self {
            // NOTE(review): `handle.name()` is `None` for unnamed threads;
            // this unwrap relies on worker threads always being named.
            let handle = thread::current();
            let current_thread_name = handle.name().unwrap().to_string();

            Self {
                number,
                current_thread_name,
            }
        }
    }

    // Fetch every row of fang_tasks (test helper; panics on DB error).
    fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
        fang_tasks::table.get_results::<Task>(conn).unwrap()
    }

    #[typetag::serde]
    impl Runnable for MyJob {
        fn run(&self) -> Result<(), Error> {
            let postgres = Postgres::new(None);

            thread::sleep(Duration::from_secs(3));

            // Each executed job enqueues a successor, producing a steady
            // stream of tasks for the duration of the test.
            let new_job = MyJob::new(self.number + 1);

            postgres.push_task(&new_job).unwrap();

            Ok(())
        }
    }

    // this test is ignored because it commits data to the db
    #[test]
    #[ignore]
    fn tasks_are_split_between_two_threads() {
        env_logger::init();

        let postgres = Postgres::new(None);

        let job_pool = WorkerPool::new(2, "test_worker".to_string());

        postgres.push_task(&MyJob::new(0)).unwrap();
        postgres.push_task(&MyJob::new(0)).unwrap();

        job_pool.start();

        // Let both workers run for a while; each job sleeps ~3s and
        // enqueues a successor, so 100s should accumulate well over 40
        // tasks in total.
        thread::sleep(Duration::from_secs(100));

        let tasks = get_all_tasks(&postgres.connection);

        assert!(tasks.len() > 40);

        // Partition tasks by the thread name embedded in their metadata.
        let test_worker1_jobs: Vec<Task> = tasks
            .clone()
            .into_iter()
            .filter(|job| {
                serde_json::to_string(&job.metadata)
                    .unwrap()
                    .contains("test_worker1")
            })
            .collect();

        let test_worker2_jobs: Vec<Task> = tasks
            .into_iter()
            .filter(|job| {
                serde_json::to_string(&job.metadata)
                    .unwrap()
                    .contains("test_worker2")
            })
            .collect();

        // Both workers must have executed a substantial share of the jobs,
        // proving tasks are actually split between the two threads.
        assert!(test_worker1_jobs.len() > 20);
        assert!(test_worker2_jobs.len() > 20);
    }
}