From 58ae681a4e672f5e2b0a524bb250b2deac88ce83 Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Tue, 19 Jul 2022 11:39:26 +0000 Subject: [PATCH] Renaming task to job (#29) --- src/executor.rs | 30 +++++++------- src/queue.rs | 98 +++++++++++++++++++++++----------------------- src/scheduler.rs | 10 ++--- src/worker_pool.rs | 52 ++++++++++++------------ 4 files changed, 95 insertions(+), 95 deletions(-) diff --git a/src/executor.rs b/src/executor.rs index 1255b7b..5d3409e 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -195,12 +195,12 @@ mod executor_tests { use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] - struct ExecutorJobTest { + struct ExecutorTaskTest { pub number: u16, } #[typetag::serde] - impl Runnable for ExecutorJobTest { + impl Runnable for ExecutorTaskTest { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { println!("the number is {}", self.number); @@ -209,12 +209,12 @@ mod executor_tests { } #[derive(Serialize, Deserialize)] - struct FailedJob { + struct FailedTask { pub number: u16, } #[typetag::serde] - impl Runnable for FailedJob { + impl Runnable for FailedTask { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { let message = format!("the number is {}", self.number); @@ -225,10 +225,10 @@ mod executor_tests { } #[derive(Serialize, Deserialize)] - struct JobType1 {} + struct TaskType1 {} #[typetag::serde] - impl Runnable for JobType1 { + impl Runnable for TaskType1 { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { Ok(()) } @@ -239,10 +239,10 @@ mod executor_tests { } #[derive(Serialize, Deserialize)] - struct JobType2 {} + struct TaskType2 {} #[typetag::serde] - impl Runnable for JobType2 { + impl Runnable for TaskType2 { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { Ok(()) } @@ -258,7 +258,7 @@ mod executor_tests { #[test] fn executes_and_finishes_task() { - let job = ExecutorJobTest { number: 10 }; + let job = ExecutorTaskTest { number: 10 }; let new_task = NewTask { metadata: serialize(&job), @@ -289,16 +289,16 @@ mod executor_tests { #[test] #[ignore] fn executes_task_only_of_specific_type() { - let job1 = JobType1 {}; - let job2 = JobType2 {}; + let task1 = TaskType1 {}; + let task2 = TaskType2 {}; let new_task1 = NewTask { - metadata: serialize(&job1), + metadata: serialize(&task1), task_type: "type1".to_string(), }; let new_task2 = NewTask { - metadata: serialize(&job2), + metadata: serialize(&task2), task_type: "type2".to_string(), }; @@ -331,10 +331,10 @@ mod executor_tests { #[test] fn saves_error_for_failed_task() { - let job = FailedJob { number: 10 }; + let task = FailedTask { number: 10 }; let new_task = NewTask { - metadata: serialize(&job), + metadata: serialize(&task), task_type: "common".to_string(), }; diff --git a/src/queue.rs b/src/queue.rs index c5afd77..8302078 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -77,19 +77,19 @@ impl Queue { Self { connection } } - pub fn push_task(&self, job: &dyn Runnable) -> Result { - Self::push_task_query(&self.connection, job) + pub fn push_task(&self, task: &dyn Runnable) -> Result { + Self::push_task_query(&self.connection, task) } - pub fn push_task_query(connection: &PgConnection, job: &dyn Runnable) -> Result { - let json_job = serde_json::to_value(job).unwrap(); + pub fn push_task_query(connection: &PgConnection, task: &dyn Runnable) -> Result { + let json_task = serde_json::to_value(task).unwrap(); - match Self::find_task_by_metadata_query(connection, &json_job) { + match Self::find_task_by_metadata_query(connection, &json_task) { Some(task) => Ok(task), None => { let new_task = NewTask { - metadata: json_job.clone(), - task_type: job.task_type(), + metadata: json_task.clone(), + task_type: task.task_type(), }; Self::insert_query(connection, &new_task) } @@ -98,24 +98,24 @@ impl Queue { pub fn push_periodic_task( &self, - job: &dyn Runnable, + task: &dyn Runnable, period: i32, ) -> Result { - Self::push_periodic_task_query(&self.connection, job, period) + Self::push_periodic_task_query(&self.connection, task, period) } pub fn push_periodic_task_query( connection: &PgConnection, - job: &dyn Runnable, + task: &dyn Runnable, period: i32, ) -> Result { - let json_job = serde_json::to_value(job).unwrap(); + let json_task = serde_json::to_value(task).unwrap(); - match Self::find_periodic_task_by_metadata_query(connection, &json_job) { + match Self::find_periodic_task_by_metadata_query(connection, &json_task) { Some(task) => Ok(task), None => { let new_task = NewPeriodicTask { - metadata: json_job, + metadata: json_task, period_in_seconds: period, }; @@ -126,8 +126,8 @@ impl Queue { } } - pub fn enqueue_task(job: &dyn Runnable) -> Result { - Self::new().push_task(job) + pub fn enqueue_task(task: &dyn Runnable) -> Result { + Self::new().push_task(task) } pub fn insert(&self, params: &NewTask) -> Result { @@ -440,11 +440,11 @@ mod queue_tests { queue.connection.test_transaction::<(), Error, _>(|| { let timestamp1 = Utc::now() - Duration::hours(40); - let task1 = insert_job(serde_json::json!(true), timestamp1, &queue.connection); + let task1 = insert_task(serde_json::json!(true), timestamp1, &queue.connection); let timestamp2 = Utc::now() - Duration::hours(20); - insert_job(serde_json::json!(false), timestamp2, &queue.connection); + insert_task(serde_json::json!(false), timestamp2, &queue.connection); let found_task = queue.fetch_task(&None).unwrap(); @@ -459,7 +459,7 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_job(&queue.connection); + let task = insert_new_task(&queue.connection); let updated_task = queue.finish_task(&task).unwrap(); @@ -474,7 +474,7 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_new_job(&queue.connection); + let task = insert_new_task(&queue.connection); let error = "Failed".to_string(); let updated_task = queue.fail_task(&task, error.clone()).unwrap(); @@ -491,7 +491,7 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let _task = insert_new_job(&queue.connection); + let _task = insert_new_task(&queue.connection); let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap(); @@ -519,8 +519,8 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let job = Job { number: 10 }; - let task = queue.push_task(&job).unwrap(); + let task = PepeTask { number: 10 }; + let task = queue.push_task(&task).unwrap(); let mut m = serde_json::value::Map::new(); m.insert( @@ -529,7 +529,7 @@ mod queue_tests { ); m.insert( "type".to_string(), - serde_json::value::Value::String("Job".to_string()), + serde_json::value::Value::String("PepeTask".to_string()), ); assert_eq!(task.metadata, serde_json::value::Value::Object(m)); @@ -543,10 +543,10 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let job = Job { number: 10 }; - let task2 = queue.push_task(&job).unwrap(); + let task = PepeTask { number: 10 }; + let task2 = queue.push_task(&task).unwrap(); - let task1 = queue.push_task(&job).unwrap(); + let task1 = queue.push_task(&task).unwrap(); assert_eq!(task1.id, task2.id); @@ -559,8 +559,8 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let job = Job { number: 10 }; - let task = queue.push_periodic_task(&job, 60).unwrap(); + let task = PepeTask { number: 10 }; + let task = queue.push_periodic_task(&task, 60).unwrap(); assert_eq!(task.period_in_seconds, 60); assert!(queue.find_periodic_task_by_id(task.id).is_some()); @@ -570,14 +570,14 @@ mod queue_tests { } #[test] - fn push_periodic_task_returns_existing_job() { + fn push_periodic_task_returns_existing_task() { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let job = Job { number: 10 }; - let task1 = queue.push_periodic_task(&job, 60).unwrap(); + let task = PepeTask { number: 10 }; + let task1 = queue.push_periodic_task(&task, 60).unwrap(); - let task2 = queue.push_periodic_task(&job, 60).unwrap(); + let task2 = queue.push_periodic_task(&task, 60).unwrap(); assert_eq!(task1.id, task2.id); @@ -590,12 +590,12 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let job = Job { number: 10 }; - let task = queue.push_periodic_task(&job, 60).unwrap(); + let task = PepeTask { number: 10 }; + let task = queue.push_periodic_task(&task, 60).unwrap(); let schedule_in_future = Utc::now() + Duration::hours(100); - insert_periodic_job( + insert_periodic_task( serde_json::json!(true), schedule_in_future, 100, @@ -617,7 +617,7 @@ mod queue_tests { queue.connection.test_transaction::<(), Error, _>(|| { let task = - insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection); + insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); let updated_task = queue.schedule_next_task_execution(&task).unwrap(); @@ -640,7 +640,7 @@ mod queue_tests { queue.connection.test_transaction::<(), Error, _>(|| { let task = - insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection); + insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); let result = queue.remove_all_periodic_tasks().unwrap(); @@ -657,7 +657,7 @@ mod queue_tests { let queue = Queue::new(); queue.connection.test_transaction::<(), Error, _>(|| { - let task = insert_job(serde_json::json!(true), Utc::now(), &queue.connection); + let task = insert_task(serde_json::json!(true), Utc::now(), &queue.connection); let result = queue.remove_all_tasks().unwrap(); assert_eq!(1, result); @@ -675,7 +675,7 @@ mod queue_tests { queue.connection.test_transaction::<(), Error, _>(|| { let schedule_in_future = Utc::now() + Duration::hours(100); - insert_periodic_job( + insert_periodic_task( serde_json::json!(true), schedule_in_future, 100, @@ -683,7 +683,7 @@ mod queue_tests { ); let task = - insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection); + insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection); let tasks = queue.fetch_periodic_tasks(100).unwrap(); @@ -762,8 +762,8 @@ mod queue_tests { let queue = Queue::new(); let timestamp1 = Utc::now() - Duration::hours(40); - let task1 = insert_job( - serde_json::json!(Job { number: 12 }), + let task1 = insert_task( + serde_json::json!(PepeTask { number: 12 }), timestamp1, &queue.connection, ); @@ -772,8 +772,8 @@ mod queue_tests { let timestamp2 = Utc::now() - Duration::hours(20); - let task2 = insert_job( - serde_json::json!(Job { number: 11 }), + let task2 = insert_task( + serde_json::json!(PepeTask { number: 11 }), timestamp2, &queue.connection, ); @@ -808,12 +808,12 @@ mod queue_tests { } #[derive(Serialize, Deserialize)] - struct Job { + struct PepeTask { pub number: u16, } #[typetag::serde] - impl Runnable for Job { + impl Runnable for PepeTask { fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> { println!("the number is {}", self.number); @@ -821,7 +821,7 @@ mod queue_tests { } } - fn insert_job( + fn insert_task( metadata: serde_json::Value, timestamp: DateTime, connection: &PgConnection, @@ -835,7 +835,7 @@ mod queue_tests { .unwrap() } - fn insert_periodic_job( + fn insert_periodic_task( metadata: serde_json::Value, timestamp: DateTime, period_in_seconds: i32, @@ -851,7 +851,7 @@ mod queue_tests { .unwrap() } - fn insert_new_job(connection: &PgConnection) -> Task { + fn insert_new_task(connection: &PgConnection) -> Task { diesel::insert_into(fang_tasks::table) .values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)]) .get_result::(connection) diff --git a/src/scheduler.rs b/src/scheduler.rs index 70224ae..9f91de5 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -77,7 +77,7 @@ impl Scheduler { } #[cfg(test)] -mod job_scheduler_tests { +mod task_scheduler_tests { use super::Scheduler; use crate::executor::Error; use crate::executor::Runnable; @@ -92,10 +92,10 @@ mod job_scheduler_tests { use std::time::Duration; #[derive(Serialize, Deserialize)] - struct ScheduledJob {} + struct ScheduledTask {} #[typetag::serde] - impl Runnable for ScheduledJob { + impl Runnable for ScheduledTask { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { Ok(()) } @@ -107,10 +107,10 @@ mod job_scheduler_tests { #[test] #[ignore] - fn schedules_jobs() { + fn schedules_tasks() { let queue = Queue::new(); - queue.push_periodic_task(&ScheduledJob {}, 10).unwrap(); + queue.push_periodic_task(&ScheduledTask {}, 10).unwrap(); Scheduler::start(1, 2); let sleep_duration = Duration::from_secs(15); diff --git a/src/worker_pool.rs b/src/worker_pool.rs index bb4d394..ea1a488 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -219,7 +219,7 @@ impl Drop for WorkerThread { } #[cfg(test)] -mod job_pool_tests { +mod task_pool_tests { use super::WorkerParams; use super::WorkerPool; use crate::executor::Error; @@ -236,12 +236,12 @@ mod job_pool_tests { use std::time::Duration; #[derive(Serialize, Deserialize)] - struct MyJob { + struct MyTask { pub number: u16, pub current_thread_name: String, } - impl MyJob { + impl MyTask { pub fn new(number: u16) -> Self { let handle = thread::current(); let current_thread_name = handle.name().unwrap().to_string(); @@ -254,13 +254,13 @@ mod job_pool_tests { } #[typetag::serde] - impl Runnable for MyJob { + impl Runnable for MyTask { fn run(&self, connection: &PgConnection) -> Result<(), Error> { thread::sleep(Duration::from_secs(3)); - let new_job = MyJob::new(self.number + 1); + let new_task = MyTask::new(self.number + 1); - Queue::push_task_query(connection, &new_job).unwrap(); + Queue::push_task_query(connection, &new_task).unwrap(); Ok(()) } @@ -271,12 +271,12 @@ mod job_pool_tests { } #[derive(Serialize, Deserialize)] - struct ShutdownJob { + struct ShutdownTask { pub number: u16, pub current_thread_name: String, } - impl ShutdownJob { + impl ShutdownTask { pub fn new(number: u16) -> Self { let handle = thread::current(); let current_thread_name = handle.name().unwrap().to_string(); @@ -289,13 +289,13 @@ mod job_pool_tests { } #[typetag::serde] - impl Runnable for ShutdownJob { + impl Runnable for ShutdownTask { fn run(&self, connection: &PgConnection) -> Result<(), Error> { thread::sleep(Duration::from_secs(3)); - let new_job = MyJob::new(self.number + 1); + let new_task = MyTask::new(self.number + 1); - Queue::push_task_query(connection, &new_job).unwrap(); + Queue::push_task_query(connection, &new_task).unwrap(); Ok(()) } @@ -320,14 +320,14 @@ mod job_pool_tests { let mut worker_params = WorkerParams::new(); worker_params.set_retention_mode(RetentionMode::KeepAll); - let mut job_pool = WorkerPool::new_with_params(2, worker_params); + let mut task_pool = WorkerPool::new_with_params(2, worker_params); - queue.push_task(&ShutdownJob::new(100)).unwrap(); - queue.push_task(&ShutdownJob::new(200)).unwrap(); + queue.push_task(&ShutdownTask::new(100)).unwrap(); + queue.push_task(&ShutdownTask::new(200)).unwrap(); - job_pool.start().unwrap(); + task_pool.start().unwrap(); thread::sleep(Duration::from_secs(1)); - job_pool.shutdown().unwrap(); + task_pool.shutdown().unwrap(); thread::sleep(Duration::from_secs(5)); let tasks = get_all_tasks(&queue.connection, "shutdown_test"); @@ -351,12 +351,12 @@ mod job_pool_tests { let mut worker_params = WorkerParams::new(); worker_params.set_retention_mode(RetentionMode::KeepAll); - let mut job_pool = WorkerPool::new_with_params(2, worker_params); + let mut task_pool = WorkerPool::new_with_params(2, worker_params); - queue.push_task(&MyJob::new(100)).unwrap(); - queue.push_task(&MyJob::new(200)).unwrap(); + queue.push_task(&MyTask::new(100)).unwrap(); + queue.push_task(&MyTask::new(200)).unwrap(); - job_pool.start().unwrap(); + task_pool.start().unwrap(); thread::sleep(Duration::from_secs(100)); @@ -364,19 +364,19 @@ mod job_pool_tests { assert!(tasks.len() > 40); - let test_worker1_jobs = tasks.clone().into_iter().filter(|job| { - serde_json::to_string(&job.metadata) + let test_worker1_tasks = tasks.clone().into_iter().filter(|task| { + serde_json::to_string(&task.metadata) .unwrap() .contains("worker_1") }); - let test_worker2_jobs = tasks.into_iter().filter(|job| { - serde_json::to_string(&job.metadata) + let test_worker2_tasks = tasks.into_iter().filter(|task| { + serde_json::to_string(&task.metadata) .unwrap() .contains("worker_2") }); - assert!(test_worker1_jobs.count() > 20); - assert!(test_worker2_jobs.count() > 20); + assert!(test_worker1_tasks.count() > 20); + assert!(test_worker2_tasks.count() > 20); } }