remove finished tasks (#2)

* execute different types of task in separate workers

* add more tests

* remove finished tasks

* add retention_mode

* make db url optional

* Add worker params

* Add CHANGELOG entry
This commit is contained in:
Ayrat Badykov 2021-07-03 15:18:41 +03:00 committed by GitHub
parent 4b1f537d19
commit 0cb3b7301b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 250 additions and 84 deletions

View file

@ -1,7 +1,7 @@
## Unreleased
- Execute different types of tasks in separate workers - [#1](https://github.com/ayrat555/fang/pull/1)
- Add retention mode for tasks - [#2](https://github.com/ayrat555/fang/pull/2)
## 0.2.0 (2021-06-24)

View file

@ -8,12 +8,50 @@ use std::time::Duration;
pub struct Executor {
pub storage: Postgres,
pub task_type: Option<String>,
pub sleep_params: SleepParams,
pub retention_mode: RetentionMode,
}
#[derive(Clone)]
pub enum RetentionMode {
KeepAll,
RemoveAll,
RemoveFinished,
}
#[derive(Clone)]
pub struct SleepParams {
pub sleep_period: u64,
pub max_sleep_period: u64,
pub min_sleep_period: u64,
pub sleep_step: u64,
}
impl SleepParams {
pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period;
}
}
pub fn maybe_increase_sleep_period(&mut self) {
if self.sleep_period < self.max_sleep_period {
self.sleep_period += self.sleep_step;
}
}
}
impl Default for SleepParams {
fn default() -> Self {
SleepParams {
sleep_period: 5,
max_sleep_period: 15,
min_sleep_period: 5,
sleep_step: 5,
}
}
}
#[derive(Debug)]
pub struct Error {
pub description: String,
@ -35,10 +73,8 @@ impl Executor {
pub fn new(storage: Postgres) -> Self {
Self {
storage,
sleep_period: 5,
max_sleep_period: 15,
min_sleep_period: 5,
sleep_step: 5,
sleep_params: SleepParams::default(),
retention_mode: RetentionMode::RemoveFinished,
task_type: None,
}
}
@ -47,24 +83,18 @@ impl Executor {
self.task_type = Some(task_type);
}
pub fn run(&self, task: &Task) {
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
self.sleep_params = sleep_params;
}
let task_result = panic::catch_unwind(|| actual_task.run());
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
self.retention_mode = retention_mode;
}
match task_result {
Ok(result) => {
match result {
Ok(()) => self.storage.finish_task(task).unwrap(),
Err(error) => self.storage.fail_task(task, error.description).unwrap(),
};
}
pub fn run(&self, task: Task) {
let result = self.execute_task(task);
Err(error) => {
let message = format!("panicked during tak execution {:?}", error);
self.storage.fail_task(task, message).unwrap();
}
}
self.finalize_task(result)
}
pub fn run_tasks(&mut self) {
@ -72,7 +102,7 @@ impl Executor {
match self.storage.fetch_and_touch(&self.task_type.clone()) {
Ok(Some(task)) => {
self.maybe_reset_sleep_period();
self.run(&task);
self.run(task);
}
Ok(None) => {
self.sleep();
@ -88,17 +118,62 @@ impl Executor {
}
pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period;
}
self.sleep_params.maybe_reset_sleep_period();
}
pub fn sleep(&mut self) {
if self.sleep_period < self.max_sleep_period {
self.sleep_period += self.sleep_step;
}
self.sleep_params.maybe_increase_sleep_period();
thread::sleep(Duration::from_secs(self.sleep_period));
thread::sleep(Duration::from_secs(self.sleep_params.sleep_period));
}
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
let task_result = panic::catch_unwind(|| actual_task.run());
match task_result {
Ok(result) => match result {
Ok(()) => Ok(task),
Err(error) => Err((task, error.description)),
},
Err(error) => {
let message = format!("panicked during task execution {:?}", error);
Err((task, message))
}
}
}
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
match self.retention_mode {
RetentionMode::KeepAll => {
match result {
Ok(task) => self.storage.finish_task(&task).unwrap(),
Err((task, error)) => self.storage.fail_task(&task, error).unwrap(),
};
()
}
RetentionMode::RemoveAll => {
match result {
Ok(task) => self.storage.remove_task(task.id).unwrap(),
Err((task, _error)) => self.storage.remove_task(task.id).unwrap(),
};
()
}
RetentionMode::RemoveFinished => match result {
Ok(task) => {
self.storage.remove_task(task.id).unwrap();
()
}
Err((task, error)) => {
self.storage.fail_task(&task, error).unwrap();
()
}
},
}
}
}
@ -106,6 +181,7 @@ impl Executor {
mod executor_tests {
use super::Error;
use super::Executor;
use super::RetentionMode;
use super::Runnable;
use crate::postgres::NewTask;
use crate::postgres::Postgres;
@ -198,7 +274,8 @@ mod executor_tests {
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));
let mut executor = Executor::new(Postgres::new());
executor.set_retention_mode(RetentionMode::KeepAll);
executor
.storage
@ -208,7 +285,7 @@ mod executor_tests {
assert_eq!(FangTaskState::New, task.state);
executor.run(&task);
executor.run(task.clone());
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
@ -234,7 +311,7 @@ mod executor_tests {
task_type: "type2".to_string(),
};
let executor = Executor::new(Postgres::new(None));
let executor = Executor::new(Postgres::new());
let task1 = executor.storage.insert(&new_task1).unwrap();
let task2 = executor.storage.insert(&new_task2).unwrap();
@ -243,8 +320,9 @@ mod executor_tests {
assert_eq!(FangTaskState::New, task2.state);
std::thread::spawn(move || {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
let mut executor = Executor::new(postgres);
executor.set_retention_mode(RetentionMode::KeepAll);
executor.set_task_type("type1".to_string());
executor.run_tasks();
@ -268,7 +346,7 @@ mod executor_tests {
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));
let executor = Executor::new(Postgres::new());
executor
.storage
@ -278,7 +356,7 @@ mod executor_tests {
assert_eq!(FangTaskState::New, task.state);
executor.run(&task);
executor.run(task.clone());
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
@ -301,7 +379,7 @@ mod executor_tests {
task_type: "common".to_string(),
};
let executor = Executor::new(Postgres::new(None));
let executor = Executor::new(Postgres::new());
executor
.storage
@ -311,13 +389,13 @@ mod executor_tests {
assert_eq!(FangTaskState::New, task.state);
executor.run(&task);
executor.run(task.clone());
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
assert_eq!(FangTaskState::Failed, found_task.state);
assert_eq!(
"panicked during tak execution Any".to_string(),
"panicked during task execution Any".to_string(),
found_task.error_message.unwrap()
);

View file

@ -33,8 +33,14 @@ pub struct Postgres {
}
impl Postgres {
pub fn new(database_url: Option<String>) -> Self {
let connection = Self::pg_connection(database_url);
pub fn new() -> Self {
let connection = Self::pg_connection(None);
Self { connection }
}
pub fn new_with_url(database_url: String) -> Self {
let connection = Self::pg_connection(Some(database_url));
Self { connection }
}
@ -51,7 +57,7 @@ impl Postgres {
}
pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> {
Self::new(None).push_task(job)
Self::new().push_task(job)
}
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
@ -83,13 +89,16 @@ impl Postgres {
}
pub fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
match fang_tasks::table
fang_tasks::table
.filter(fang_tasks::id.eq(id))
.first::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
}
.ok()
}
pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> {
let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
diesel::delete(query).execute(&self.connection)
}
pub fn finish_task(&self, task: &Task) -> Result<Task, Error> {
@ -136,21 +145,18 @@ impl Postgres {
}
fn fetch_any_task(&self) -> Option<Task> {
match fang_tasks::table
fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
}
.ok()
}
fn fetch_task_of_type(&self, task_type: &String) -> Option<Task> {
match fang_tasks::table
fang_tasks::table
.order(fang_tasks::created_at.asc())
.limit(1)
.filter(fang_tasks::state.eq(FangTaskState::New))
@ -158,10 +164,7 @@ impl Postgres {
.for_update()
.skip_locked()
.get_result::<Task>(&self.connection)
{
Ok(record) => Some(record),
_ => None,
}
.ok()
}
}
@ -182,7 +185,7 @@ mod postgres_tests {
#[test]
fn insert_inserts_task() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
let new_task = NewTask {
metadata: serde_json::json!(true),
@ -199,7 +202,7 @@ mod postgres_tests {
#[test]
fn fetch_task_fetches_the_oldest_task() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let timestamp1 = Utc::now() - Duration::hours(40);
@ -220,7 +223,7 @@ mod postgres_tests {
#[test]
fn finish_task_updates_state_field() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = insert_new_job(&postgres.connection);
@ -235,7 +238,7 @@ mod postgres_tests {
#[test]
fn fail_task_updates_state_field_and_sets_error_message() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = insert_new_job(&postgres.connection);
@ -252,7 +255,7 @@ mod postgres_tests {
#[test]
fn fetch_and_touch_updates_state() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let _task = insert_new_job(&postgres.connection);
@ -267,7 +270,7 @@ mod postgres_tests {
#[test]
fn fetch_and_touch_returns_none() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let task = postgres.fetch_and_touch(&None).unwrap();
@ -280,7 +283,7 @@ mod postgres_tests {
#[test]
fn push_task_serializes_and_inserts_task() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
@ -302,11 +305,43 @@ mod postgres_tests {
});
}
#[test]
fn remove_task() {
let postgres = Postgres::new();
let new_task1 = NewTask {
metadata: serde_json::json!(true),
task_type: "common".to_string(),
};
let new_task2 = NewTask {
metadata: serde_json::json!(true),
task_type: "common".to_string(),
};
postgres.connection.test_transaction::<(), Error, _>(|| {
let task1 = postgres.insert(&new_task1).unwrap();
assert!(postgres.find_task_by_id(task1.id).is_some());
let task2 = postgres.insert(&new_task2).unwrap();
assert!(postgres.find_task_by_id(task2.id).is_some());
postgres.remove_task(task1.id).unwrap();
assert!(postgres.find_task_by_id(task1.id).is_none());
assert!(postgres.find_task_by_id(task2.id).is_some());
postgres.remove_task(task2.id).unwrap();
assert!(postgres.find_task_by_id(task2.id).is_none());
Ok(())
});
}
// this test is ignored because it commits data to the db
#[test]
#[ignore]
fn fetch_task_locks_the_record() {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
let timestamp1 = Utc::now() - Duration::hours(40);
let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection);
@ -318,7 +353,7 @@ mod postgres_tests {
let task2 = insert_job(serde_json::json!(false), timestamp2, &postgres.connection);
let thread = std::thread::spawn(move || {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
postgres.connection.transaction::<(), Error, _>(|| {
let found_task = postgres.fetch_task(&None).unwrap();

View file

@ -1,48 +1,89 @@
use crate::executor::Executor;
use crate::executor::RetentionMode;
use crate::executor::SleepParams;
use crate::postgres::Postgres;
use std::thread;
pub struct WorkerPool {
pub number_of_workers: u16,
pub task_type: Option<String>,
pub worker_params: WorkerParams,
}
pub struct WorkerThread {
pub name: String,
pub task_type: Option<String>,
pub worker_params: WorkerParams,
pub restarts: u64,
}
#[derive(Clone)]
pub struct WorkerParams {
pub retention_mode: Option<RetentionMode>,
pub sleep_params: Option<SleepParams>,
pub task_type: Option<String>,
}
impl WorkerParams {
pub fn new() -> Self {
Self {
retention_mode: None,
sleep_params: None,
task_type: None,
}
}
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
self.retention_mode = Some(retention_mode);
}
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
self.sleep_params = Some(sleep_params);
}
pub fn set_task_type(&mut self, task_type: String) {
self.task_type = Some(task_type);
}
}
impl WorkerPool {
pub fn new(number_of_workers: u16, task_type: Option<String>) -> Self {
pub fn new(number_of_workers: u16) -> Self {
let worker_params = WorkerParams::new();
Self {
number_of_workers,
task_type,
worker_params,
}
}
pub fn new_with_params(number_of_workers: u16, worker_params: WorkerParams) -> Self {
Self {
number_of_workers,
worker_params,
}
}
pub fn start(&self) {
for idx in 1..self.number_of_workers + 1 {
let name = format!(
"worker_{}{}",
self.task_type.clone().unwrap_or("".to_string()),
idx
);
WorkerThread::spawn_in_pool(self.task_type.clone(), name, 0)
let worker_type = self
.worker_params
.task_type
.clone()
.unwrap_or("".to_string());
let name = format!("worker_{}{}", worker_type, idx);
WorkerThread::spawn_in_pool(self.worker_params.clone(), name, 0)
}
}
}
impl WorkerThread {
pub fn new(task_type: Option<String>, name: String, restarts: u64) -> Self {
pub fn new(worker_params: WorkerParams, name: String, restarts: u64) -> Self {
Self {
name,
task_type,
worker_params,
restarts,
}
}
pub fn spawn_in_pool(task_type: Option<String>, name: String, restarts: u64) {
pub fn spawn_in_pool(worker_params: WorkerParams, name: String, restarts: u64) {
let builder = thread::Builder::new().name(name.clone());
info!(
@ -53,16 +94,24 @@ impl WorkerThread {
builder
.spawn(move || {
// when _job is dropped, it will be restarted (see Drop trait impl)
let _job = WorkerThread::new(task_type.clone(), name, restarts);
let _job = WorkerThread::new(worker_params.clone(), name, restarts);
let postgres = Postgres::new(None);
let postgres = Postgres::new();
let mut executor = Executor::new(postgres);
if let Some(task_type_str) = task_type {
if let Some(task_type_str) = worker_params.task_type {
executor.set_task_type(task_type_str);
}
if let Some(retention_mode) = worker_params.retention_mode {
executor.set_retention_mode(retention_mode);
}
if let Some(sleep_params) = worker_params.sleep_params {
executor.set_sleep_params(sleep_params);
}
executor.run_tasks();
})
.unwrap();
@ -71,7 +120,11 @@ impl WorkerThread {
impl Drop for WorkerThread {
fn drop(&mut self) {
WorkerThread::spawn_in_pool(self.task_type.clone(), self.name.clone(), self.restarts + 1)
WorkerThread::spawn_in_pool(
self.worker_params.clone(),
self.name.clone(),
self.restarts + 1,
)
}
}
@ -114,7 +167,7 @@ mod job_pool_tests {
#[typetag::serde]
impl Runnable for MyJob {
fn run(&self) -> Result<(), Error> {
let postgres = Postgres::new(None);
let postgres = Postgres::new();
thread::sleep(Duration::from_secs(3));
@ -132,8 +185,8 @@ mod job_pool_tests {
fn tasks_are_split_between_two_threads() {
env_logger::init();
let postgres = Postgres::new(None);
let job_pool = WorkerPool::new(2, None);
let postgres = Postgres::new();
let job_pool = WorkerPool::new(2);
postgres.push_task(&MyJob::new(0)).unwrap();
postgres.push_task(&MyJob::new(0)).unwrap();