Renaming task to job (#29) (#31)

This commit is contained in:
Pmarquez 2022-07-19 13:47:55 +00:00 committed by GitHub
parent a60eb08fa6
commit 9c478e66a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 95 deletions

View file

@ -195,12 +195,12 @@ mod executor_tests {
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct ExecutorJobTest {
struct ExecutorTaskTest {
pub number: u16,
}
#[typetag::serde]
impl Runnable for ExecutorJobTest {
impl Runnable for ExecutorTaskTest {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
println!("the number is {}", self.number);
@ -209,12 +209,12 @@ mod executor_tests {
}
#[derive(Serialize, Deserialize)]
struct FailedJob {
struct FailedTask {
pub number: u16,
}
#[typetag::serde]
impl Runnable for FailedJob {
impl Runnable for FailedTask {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
let message = format!("the number is {}", self.number);
@ -225,10 +225,10 @@ mod executor_tests {
}
#[derive(Serialize, Deserialize)]
struct JobType1 {}
struct TaskType1 {}
#[typetag::serde]
impl Runnable for JobType1 {
impl Runnable for TaskType1 {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
Ok(())
}
@ -239,10 +239,10 @@ mod executor_tests {
}
#[derive(Serialize, Deserialize)]
struct JobType2 {}
struct TaskType2 {}
#[typetag::serde]
impl Runnable for JobType2 {
impl Runnable for TaskType2 {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
Ok(())
}
@ -258,7 +258,7 @@ mod executor_tests {
#[test]
fn executes_and_finishes_task() {
let job = ExecutorJobTest { number: 10 };
let job = ExecutorTaskTest { number: 10 };
let new_task = NewTask {
metadata: serialize(&job),
@ -289,16 +289,16 @@ mod executor_tests {
#[test]
#[ignore]
fn executes_task_only_of_specific_type() {
let job1 = JobType1 {};
let job2 = JobType2 {};
let task1 = TaskType1 {};
let task2 = TaskType2 {};
let new_task1 = NewTask {
metadata: serialize(&job1),
metadata: serialize(&task1),
task_type: "type1".to_string(),
};
let new_task2 = NewTask {
metadata: serialize(&job2),
metadata: serialize(&task2),
task_type: "type2".to_string(),
};
@ -331,10 +331,10 @@ mod executor_tests {
#[test]
fn saves_error_for_failed_task() {
let job = FailedJob { number: 10 };
let task = FailedTask { number: 10 };
let new_task = NewTask {
metadata: serialize(&job),
metadata: serialize(&task),
task_type: "common".to_string(),
};

View file

@ -41,19 +41,19 @@ impl Queue {
Self { connection }
}
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
Self::push_task_query(&self.connection, job)
pub fn push_task(&self, task: &dyn Runnable) -> Result<Task, Error> {
Self::push_task_query(&self.connection, task)
}
pub fn push_task_query(connection: &PgConnection, job: &dyn Runnable) -> Result<Task, Error> {
let json_job = serde_json::to_value(job).unwrap();
pub fn push_task_query(connection: &PgConnection, task: &dyn Runnable) -> Result<Task, Error> {
let json_task = serde_json::to_value(task).unwrap();
match Self::find_task_by_metadata_query(connection, &json_job) {
match Self::find_task_by_metadata_query(connection, &json_task) {
Some(task) => Ok(task),
None => {
let new_task = NewTask {
metadata: json_job.clone(),
task_type: job.task_type(),
metadata: json_task.clone(),
task_type: task.task_type(),
};
Self::insert_query(connection, &new_task)
}
@ -62,24 +62,24 @@ impl Queue {
pub fn push_periodic_task(
&self,
job: &dyn Runnable,
task: &dyn Runnable,
period: i32,
) -> Result<PeriodicTask, Error> {
Self::push_periodic_task_query(&self.connection, job, period)
Self::push_periodic_task_query(&self.connection, task, period)
}
pub fn push_periodic_task_query(
connection: &PgConnection,
job: &dyn Runnable,
task: &dyn Runnable,
period: i32,
) -> Result<PeriodicTask, Error> {
let json_job = serde_json::to_value(job).unwrap();
let json_task = serde_json::to_value(task).unwrap();
match Self::find_periodic_task_by_metadata_query(connection, &json_job) {
match Self::find_periodic_task_by_metadata_query(connection, &json_task) {
Some(task) => Ok(task),
None => {
let new_task = NewPeriodicTask {
metadata: json_job,
metadata: json_task,
period_in_seconds: period,
};
@ -90,8 +90,8 @@ impl Queue {
}
}
pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> {
Self::new().push_task(job)
pub fn enqueue_task(task: &dyn Runnable) -> Result<Task, Error> {
Self::new().push_task(task)
}
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
@ -404,11 +404,11 @@ mod queue_tests {
queue.connection.test_transaction::<(), Error, _>(|| {
let timestamp1 = Utc::now() - Duration::hours(40);
let task1 = insert_job(serde_json::json!(true), timestamp1, &queue.connection);
let task1 = insert_task(serde_json::json!(true), timestamp1, &queue.connection);
let timestamp2 = Utc::now() - Duration::hours(20);
insert_job(serde_json::json!(false), timestamp2, &queue.connection);
insert_task(serde_json::json!(false), timestamp2, &queue.connection);
let found_task = queue.fetch_task(&None).unwrap();
@ -423,7 +423,7 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let task = insert_new_job(&queue.connection);
let task = insert_new_task(&queue.connection);
let updated_task = queue.finish_task(&task).unwrap();
@ -438,7 +438,7 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let task = insert_new_job(&queue.connection);
let task = insert_new_task(&queue.connection);
let error = "Failed".to_string();
let updated_task = queue.fail_task(&task, error.clone()).unwrap();
@ -455,7 +455,7 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let _task = insert_new_job(&queue.connection);
let _task = insert_new_task(&queue.connection);
let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap();
@ -483,8 +483,8 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task = queue.push_task(&job).unwrap();
let task = PepeTask { number: 10 };
let task = queue.push_task(&task).unwrap();
let mut m = serde_json::value::Map::new();
m.insert(
@ -493,7 +493,7 @@ mod queue_tests {
);
m.insert(
"type".to_string(),
serde_json::value::Value::String("Job".to_string()),
serde_json::value::Value::String("PepeTask".to_string()),
);
assert_eq!(task.metadata, serde_json::value::Value::Object(m));
@ -507,10 +507,10 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task2 = queue.push_task(&job).unwrap();
let task = PepeTask { number: 10 };
let task2 = queue.push_task(&task).unwrap();
let task1 = queue.push_task(&job).unwrap();
let task1 = queue.push_task(&task).unwrap();
assert_eq!(task1.id, task2.id);
@ -523,8 +523,8 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task = queue.push_periodic_task(&job, 60).unwrap();
let task = PepeTask { number: 10 };
let task = queue.push_periodic_task(&task, 60).unwrap();
assert_eq!(task.period_in_seconds, 60);
assert!(queue.find_periodic_task_by_id(task.id).is_some());
@ -534,14 +534,14 @@ mod queue_tests {
}
#[test]
fn push_periodic_task_returns_existing_job() {
fn push_periodic_task_returns_existing_task() {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task1 = queue.push_periodic_task(&job, 60).unwrap();
let task = PepeTask { number: 10 };
let task1 = queue.push_periodic_task(&task, 60).unwrap();
let task2 = queue.push_periodic_task(&job, 60).unwrap();
let task2 = queue.push_periodic_task(&task, 60).unwrap();
assert_eq!(task1.id, task2.id);
@ -554,12 +554,12 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let job = Job { number: 10 };
let task = queue.push_periodic_task(&job, 60).unwrap();
let task = PepeTask { number: 10 };
let task = queue.push_periodic_task(&task, 60).unwrap();
let schedule_in_future = Utc::now() + Duration::hours(100);
insert_periodic_job(
insert_periodic_task(
serde_json::json!(true),
schedule_in_future,
100,
@ -581,7 +581,7 @@ mod queue_tests {
queue.connection.test_transaction::<(), Error, _>(|| {
let task =
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
let updated_task = queue.schedule_next_task_execution(&task).unwrap();
@ -604,7 +604,7 @@ mod queue_tests {
queue.connection.test_transaction::<(), Error, _>(|| {
let task =
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
let result = queue.remove_all_periodic_tasks().unwrap();
@ -621,7 +621,7 @@ mod queue_tests {
let queue = Queue::new();
queue.connection.test_transaction::<(), Error, _>(|| {
let task = insert_job(serde_json::json!(true), Utc::now(), &queue.connection);
let task = insert_task(serde_json::json!(true), Utc::now(), &queue.connection);
let result = queue.remove_all_tasks().unwrap();
assert_eq!(1, result);
@ -639,7 +639,7 @@ mod queue_tests {
queue.connection.test_transaction::<(), Error, _>(|| {
let schedule_in_future = Utc::now() + Duration::hours(100);
insert_periodic_job(
insert_periodic_task(
serde_json::json!(true),
schedule_in_future,
100,
@ -647,7 +647,7 @@ mod queue_tests {
);
let task =
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
let tasks = queue.fetch_periodic_tasks(100).unwrap();
@ -726,8 +726,8 @@ mod queue_tests {
let queue = Queue::new();
let timestamp1 = Utc::now() - Duration::hours(40);
let task1 = insert_job(
serde_json::json!(Job { number: 12 }),
let task1 = insert_task(
serde_json::json!(PepeTask { number: 12 }),
timestamp1,
&queue.connection,
);
@ -736,8 +736,8 @@ mod queue_tests {
let timestamp2 = Utc::now() - Duration::hours(20);
let task2 = insert_job(
serde_json::json!(Job { number: 11 }),
let task2 = insert_task(
serde_json::json!(PepeTask { number: 11 }),
timestamp2,
&queue.connection,
);
@ -772,12 +772,12 @@ mod queue_tests {
}
#[derive(Serialize, Deserialize)]
struct Job {
struct PepeTask {
pub number: u16,
}
#[typetag::serde]
impl Runnable for Job {
impl Runnable for PepeTask {
fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> {
println!("the number is {}", self.number);
@ -785,7 +785,7 @@ mod queue_tests {
}
}
fn insert_job(
fn insert_task(
metadata: serde_json::Value,
timestamp: DateTime<Utc>,
connection: &PgConnection,
@ -799,7 +799,7 @@ mod queue_tests {
.unwrap()
}
fn insert_periodic_job(
fn insert_periodic_task(
metadata: serde_json::Value,
timestamp: DateTime<Utc>,
period_in_seconds: i32,
@ -815,7 +815,7 @@ mod queue_tests {
.unwrap()
}
fn insert_new_job(connection: &PgConnection) -> Task {
fn insert_new_task(connection: &PgConnection) -> Task {
diesel::insert_into(fang_tasks::table)
.values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)])
.get_result::<Task>(connection)

View file

@ -77,7 +77,7 @@ impl Scheduler {
}
#[cfg(test)]
mod job_scheduler_tests {
mod task_scheduler_tests {
use super::Scheduler;
use crate::executor::Error;
use crate::executor::Runnable;
@ -92,10 +92,10 @@ mod job_scheduler_tests {
use std::time::Duration;
#[derive(Serialize, Deserialize)]
struct ScheduledJob {}
struct ScheduledTask {}
#[typetag::serde]
impl Runnable for ScheduledJob {
impl Runnable for ScheduledTask {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
Ok(())
}
@ -107,10 +107,10 @@ mod job_scheduler_tests {
#[test]
#[ignore]
fn schedules_jobs() {
fn schedules_tasks() {
let queue = Queue::new();
queue.push_periodic_task(&ScheduledJob {}, 10).unwrap();
queue.push_periodic_task(&ScheduledTask {}, 10).unwrap();
Scheduler::start(1, 2);
let sleep_duration = Duration::from_secs(15);

View file

@ -219,7 +219,7 @@ impl Drop for WorkerThread {
}
#[cfg(test)]
mod job_pool_tests {
mod task_pool_tests {
use super::WorkerParams;
use super::WorkerPool;
use crate::executor::Error;
@ -236,12 +236,12 @@ mod job_pool_tests {
use std::time::Duration;
#[derive(Serialize, Deserialize)]
struct MyJob {
struct MyTask {
pub number: u16,
pub current_thread_name: String,
}
impl MyJob {
impl MyTask {
pub fn new(number: u16) -> Self {
let handle = thread::current();
let current_thread_name = handle.name().unwrap().to_string();
@ -254,13 +254,13 @@ mod job_pool_tests {
}
#[typetag::serde]
impl Runnable for MyJob {
impl Runnable for MyTask {
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
thread::sleep(Duration::from_secs(3));
let new_job = MyJob::new(self.number + 1);
let new_task = MyTask::new(self.number + 1);
Queue::push_task_query(connection, &new_job).unwrap();
Queue::push_task_query(connection, &new_task).unwrap();
Ok(())
}
@ -271,12 +271,12 @@ mod job_pool_tests {
}
#[derive(Serialize, Deserialize)]
struct ShutdownJob {
struct ShutdownTask {
pub number: u16,
pub current_thread_name: String,
}
impl ShutdownJob {
impl ShutdownTask {
pub fn new(number: u16) -> Self {
let handle = thread::current();
let current_thread_name = handle.name().unwrap().to_string();
@ -289,13 +289,13 @@ mod job_pool_tests {
}
#[typetag::serde]
impl Runnable for ShutdownJob {
impl Runnable for ShutdownTask {
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
thread::sleep(Duration::from_secs(3));
let new_job = MyJob::new(self.number + 1);
let new_task = MyTask::new(self.number + 1);
Queue::push_task_query(connection, &new_job).unwrap();
Queue::push_task_query(connection, &new_task).unwrap();
Ok(())
}
@ -320,14 +320,14 @@ mod job_pool_tests {
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::KeepAll);
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
queue.push_task(&ShutdownJob::new(100)).unwrap();
queue.push_task(&ShutdownJob::new(200)).unwrap();
queue.push_task(&ShutdownTask::new(100)).unwrap();
queue.push_task(&ShutdownTask::new(200)).unwrap();
job_pool.start().unwrap();
task_pool.start().unwrap();
thread::sleep(Duration::from_secs(1));
job_pool.shutdown().unwrap();
task_pool.shutdown().unwrap();
thread::sleep(Duration::from_secs(5));
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
@ -351,12 +351,12 @@ mod job_pool_tests {
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::KeepAll);
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
queue.push_task(&MyJob::new(100)).unwrap();
queue.push_task(&MyJob::new(200)).unwrap();
queue.push_task(&MyTask::new(100)).unwrap();
queue.push_task(&MyTask::new(200)).unwrap();
job_pool.start().unwrap();
task_pool.start().unwrap();
thread::sleep(Duration::from_secs(100));
@ -364,19 +364,19 @@ mod job_pool_tests {
assert!(tasks.len() > 40);
let test_worker1_jobs = tasks.clone().into_iter().filter(|job| {
serde_json::to_string(&job.metadata)
let test_worker1_tasks = tasks.clone().into_iter().filter(|task| {
serde_json::to_string(&task.metadata)
.unwrap()
.contains("worker_1")
});
let test_worker2_jobs = tasks.into_iter().filter(|job| {
serde_json::to_string(&job.metadata)
let test_worker2_tasks = tasks.into_iter().filter(|task| {
serde_json::to_string(&task.metadata)
.unwrap()
.contains("worker_2")
});
assert!(test_worker1_jobs.count() > 20);
assert!(test_worker2_jobs.count() > 20);
assert!(test_worker1_tasks.count() > 20);
assert!(test_worker2_tasks.count() > 20);
}
}