Renaming task to job (#29)
This commit is contained in:
parent
3bb45022d9
commit
58ae681a4e
4 changed files with 95 additions and 95 deletions
|
@ -195,12 +195,12 @@ mod executor_tests {
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct ExecutorJobTest {
|
struct ExecutorTaskTest {
|
||||||
pub number: u16,
|
pub number: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for ExecutorJobTest {
|
impl Runnable for ExecutorTaskTest {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
println!("the number is {}", self.number);
|
println!("the number is {}", self.number);
|
||||||
|
|
||||||
|
@ -209,12 +209,12 @@ mod executor_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct FailedJob {
|
struct FailedTask {
|
||||||
pub number: u16,
|
pub number: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for FailedJob {
|
impl Runnable for FailedTask {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
let message = format!("the number is {}", self.number);
|
let message = format!("the number is {}", self.number);
|
||||||
|
|
||||||
|
@ -225,10 +225,10 @@ mod executor_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct JobType1 {}
|
struct TaskType1 {}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for JobType1 {
|
impl Runnable for TaskType1 {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -239,10 +239,10 @@ mod executor_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct JobType2 {}
|
struct TaskType2 {}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for JobType2 {
|
impl Runnable for TaskType2 {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -258,7 +258,7 @@ mod executor_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn executes_and_finishes_task() {
|
fn executes_and_finishes_task() {
|
||||||
let job = ExecutorJobTest { number: 10 };
|
let job = ExecutorTaskTest { number: 10 };
|
||||||
|
|
||||||
let new_task = NewTask {
|
let new_task = NewTask {
|
||||||
metadata: serialize(&job),
|
metadata: serialize(&job),
|
||||||
|
@ -289,16 +289,16 @@ mod executor_tests {
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn executes_task_only_of_specific_type() {
|
fn executes_task_only_of_specific_type() {
|
||||||
let job1 = JobType1 {};
|
let task1 = TaskType1 {};
|
||||||
let job2 = JobType2 {};
|
let task2 = TaskType2 {};
|
||||||
|
|
||||||
let new_task1 = NewTask {
|
let new_task1 = NewTask {
|
||||||
metadata: serialize(&job1),
|
metadata: serialize(&task1),
|
||||||
task_type: "type1".to_string(),
|
task_type: "type1".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_task2 = NewTask {
|
let new_task2 = NewTask {
|
||||||
metadata: serialize(&job2),
|
metadata: serialize(&task2),
|
||||||
task_type: "type2".to_string(),
|
task_type: "type2".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -331,10 +331,10 @@ mod executor_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn saves_error_for_failed_task() {
|
fn saves_error_for_failed_task() {
|
||||||
let job = FailedJob { number: 10 };
|
let task = FailedTask { number: 10 };
|
||||||
|
|
||||||
let new_task = NewTask {
|
let new_task = NewTask {
|
||||||
metadata: serialize(&job),
|
metadata: serialize(&task),
|
||||||
task_type: "common".to_string(),
|
task_type: "common".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
98
src/queue.rs
98
src/queue.rs
|
@ -77,19 +77,19 @@ impl Queue {
|
||||||
Self { connection }
|
Self { connection }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
|
pub fn push_task(&self, task: &dyn Runnable) -> Result<Task, Error> {
|
||||||
Self::push_task_query(&self.connection, job)
|
Self::push_task_query(&self.connection, task)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_task_query(connection: &PgConnection, job: &dyn Runnable) -> Result<Task, Error> {
|
pub fn push_task_query(connection: &PgConnection, task: &dyn Runnable) -> Result<Task, Error> {
|
||||||
let json_job = serde_json::to_value(job).unwrap();
|
let json_task = serde_json::to_value(task).unwrap();
|
||||||
|
|
||||||
match Self::find_task_by_metadata_query(connection, &json_job) {
|
match Self::find_task_by_metadata_query(connection, &json_task) {
|
||||||
Some(task) => Ok(task),
|
Some(task) => Ok(task),
|
||||||
None => {
|
None => {
|
||||||
let new_task = NewTask {
|
let new_task = NewTask {
|
||||||
metadata: json_job.clone(),
|
metadata: json_task.clone(),
|
||||||
task_type: job.task_type(),
|
task_type: task.task_type(),
|
||||||
};
|
};
|
||||||
Self::insert_query(connection, &new_task)
|
Self::insert_query(connection, &new_task)
|
||||||
}
|
}
|
||||||
|
@ -98,24 +98,24 @@ impl Queue {
|
||||||
|
|
||||||
pub fn push_periodic_task(
|
pub fn push_periodic_task(
|
||||||
&self,
|
&self,
|
||||||
job: &dyn Runnable,
|
task: &dyn Runnable,
|
||||||
period: i32,
|
period: i32,
|
||||||
) -> Result<PeriodicTask, Error> {
|
) -> Result<PeriodicTask, Error> {
|
||||||
Self::push_periodic_task_query(&self.connection, job, period)
|
Self::push_periodic_task_query(&self.connection, task, period)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_periodic_task_query(
|
pub fn push_periodic_task_query(
|
||||||
connection: &PgConnection,
|
connection: &PgConnection,
|
||||||
job: &dyn Runnable,
|
task: &dyn Runnable,
|
||||||
period: i32,
|
period: i32,
|
||||||
) -> Result<PeriodicTask, Error> {
|
) -> Result<PeriodicTask, Error> {
|
||||||
let json_job = serde_json::to_value(job).unwrap();
|
let json_task = serde_json::to_value(task).unwrap();
|
||||||
|
|
||||||
match Self::find_periodic_task_by_metadata_query(connection, &json_job) {
|
match Self::find_periodic_task_by_metadata_query(connection, &json_task) {
|
||||||
Some(task) => Ok(task),
|
Some(task) => Ok(task),
|
||||||
None => {
|
None => {
|
||||||
let new_task = NewPeriodicTask {
|
let new_task = NewPeriodicTask {
|
||||||
metadata: json_job,
|
metadata: json_task,
|
||||||
period_in_seconds: period,
|
period_in_seconds: period,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -126,8 +126,8 @@ impl Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> {
|
pub fn enqueue_task(task: &dyn Runnable) -> Result<Task, Error> {
|
||||||
Self::new().push_task(job)
|
Self::new().push_task(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
|
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
|
||||||
|
@ -440,11 +440,11 @@ mod queue_tests {
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let timestamp1 = Utc::now() - Duration::hours(40);
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
let task1 = insert_job(serde_json::json!(true), timestamp1, &queue.connection);
|
let task1 = insert_task(serde_json::json!(true), timestamp1, &queue.connection);
|
||||||
|
|
||||||
let timestamp2 = Utc::now() - Duration::hours(20);
|
let timestamp2 = Utc::now() - Duration::hours(20);
|
||||||
|
|
||||||
insert_job(serde_json::json!(false), timestamp2, &queue.connection);
|
insert_task(serde_json::json!(false), timestamp2, &queue.connection);
|
||||||
|
|
||||||
let found_task = queue.fetch_task(&None).unwrap();
|
let found_task = queue.fetch_task(&None).unwrap();
|
||||||
|
|
||||||
|
@ -459,7 +459,7 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_new_job(&queue.connection);
|
let task = insert_new_task(&queue.connection);
|
||||||
|
|
||||||
let updated_task = queue.finish_task(&task).unwrap();
|
let updated_task = queue.finish_task(&task).unwrap();
|
||||||
|
|
||||||
|
@ -474,7 +474,7 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_new_job(&queue.connection);
|
let task = insert_new_task(&queue.connection);
|
||||||
let error = "Failed".to_string();
|
let error = "Failed".to_string();
|
||||||
|
|
||||||
let updated_task = queue.fail_task(&task, error.clone()).unwrap();
|
let updated_task = queue.fail_task(&task, error.clone()).unwrap();
|
||||||
|
@ -491,7 +491,7 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let _task = insert_new_job(&queue.connection);
|
let _task = insert_new_task(&queue.connection);
|
||||||
|
|
||||||
let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap();
|
let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap();
|
||||||
|
|
||||||
|
@ -519,8 +519,8 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let task = PepeTask { number: 10 };
|
||||||
let task = queue.push_task(&job).unwrap();
|
let task = queue.push_task(&task).unwrap();
|
||||||
|
|
||||||
let mut m = serde_json::value::Map::new();
|
let mut m = serde_json::value::Map::new();
|
||||||
m.insert(
|
m.insert(
|
||||||
|
@ -529,7 +529,7 @@ mod queue_tests {
|
||||||
);
|
);
|
||||||
m.insert(
|
m.insert(
|
||||||
"type".to_string(),
|
"type".to_string(),
|
||||||
serde_json::value::Value::String("Job".to_string()),
|
serde_json::value::Value::String("PepeTask".to_string()),
|
||||||
);
|
);
|
||||||
|
|
||||||
assert_eq!(task.metadata, serde_json::value::Value::Object(m));
|
assert_eq!(task.metadata, serde_json::value::Value::Object(m));
|
||||||
|
@ -543,10 +543,10 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let task = PepeTask { number: 10 };
|
||||||
let task2 = queue.push_task(&job).unwrap();
|
let task2 = queue.push_task(&task).unwrap();
|
||||||
|
|
||||||
let task1 = queue.push_task(&job).unwrap();
|
let task1 = queue.push_task(&task).unwrap();
|
||||||
|
|
||||||
assert_eq!(task1.id, task2.id);
|
assert_eq!(task1.id, task2.id);
|
||||||
|
|
||||||
|
@ -559,8 +559,8 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let task = PepeTask { number: 10 };
|
||||||
let task = queue.push_periodic_task(&job, 60).unwrap();
|
let task = queue.push_periodic_task(&task, 60).unwrap();
|
||||||
|
|
||||||
assert_eq!(task.period_in_seconds, 60);
|
assert_eq!(task.period_in_seconds, 60);
|
||||||
assert!(queue.find_periodic_task_by_id(task.id).is_some());
|
assert!(queue.find_periodic_task_by_id(task.id).is_some());
|
||||||
|
@ -570,14 +570,14 @@ mod queue_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_periodic_task_returns_existing_job() {
|
fn push_periodic_task_returns_existing_task() {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let task = PepeTask { number: 10 };
|
||||||
let task1 = queue.push_periodic_task(&job, 60).unwrap();
|
let task1 = queue.push_periodic_task(&task, 60).unwrap();
|
||||||
|
|
||||||
let task2 = queue.push_periodic_task(&job, 60).unwrap();
|
let task2 = queue.push_periodic_task(&task, 60).unwrap();
|
||||||
|
|
||||||
assert_eq!(task1.id, task2.id);
|
assert_eq!(task1.id, task2.id);
|
||||||
|
|
||||||
|
@ -590,12 +590,12 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let task = PepeTask { number: 10 };
|
||||||
let task = queue.push_periodic_task(&job, 60).unwrap();
|
let task = queue.push_periodic_task(&task, 60).unwrap();
|
||||||
|
|
||||||
let schedule_in_future = Utc::now() + Duration::hours(100);
|
let schedule_in_future = Utc::now() + Duration::hours(100);
|
||||||
|
|
||||||
insert_periodic_job(
|
insert_periodic_task(
|
||||||
serde_json::json!(true),
|
serde_json::json!(true),
|
||||||
schedule_in_future,
|
schedule_in_future,
|
||||||
100,
|
100,
|
||||||
|
@ -617,7 +617,7 @@ mod queue_tests {
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task =
|
let task =
|
||||||
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
|
|
||||||
let updated_task = queue.schedule_next_task_execution(&task).unwrap();
|
let updated_task = queue.schedule_next_task_execution(&task).unwrap();
|
||||||
|
|
||||||
|
@ -640,7 +640,7 @@ mod queue_tests {
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task =
|
let task =
|
||||||
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
|
|
||||||
let result = queue.remove_all_periodic_tasks().unwrap();
|
let result = queue.remove_all_periodic_tasks().unwrap();
|
||||||
|
|
||||||
|
@ -657,7 +657,7 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_job(serde_json::json!(true), Utc::now(), &queue.connection);
|
let task = insert_task(serde_json::json!(true), Utc::now(), &queue.connection);
|
||||||
let result = queue.remove_all_tasks().unwrap();
|
let result = queue.remove_all_tasks().unwrap();
|
||||||
|
|
||||||
assert_eq!(1, result);
|
assert_eq!(1, result);
|
||||||
|
@ -675,7 +675,7 @@ mod queue_tests {
|
||||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let schedule_in_future = Utc::now() + Duration::hours(100);
|
let schedule_in_future = Utc::now() + Duration::hours(100);
|
||||||
|
|
||||||
insert_periodic_job(
|
insert_periodic_task(
|
||||||
serde_json::json!(true),
|
serde_json::json!(true),
|
||||||
schedule_in_future,
|
schedule_in_future,
|
||||||
100,
|
100,
|
||||||
|
@ -683,7 +683,7 @@ mod queue_tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let task =
|
let task =
|
||||||
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
|
|
||||||
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
||||||
|
|
||||||
|
@ -762,8 +762,8 @@ mod queue_tests {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
let timestamp1 = Utc::now() - Duration::hours(40);
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
let task1 = insert_job(
|
let task1 = insert_task(
|
||||||
serde_json::json!(Job { number: 12 }),
|
serde_json::json!(PepeTask { number: 12 }),
|
||||||
timestamp1,
|
timestamp1,
|
||||||
&queue.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
@ -772,8 +772,8 @@ mod queue_tests {
|
||||||
|
|
||||||
let timestamp2 = Utc::now() - Duration::hours(20);
|
let timestamp2 = Utc::now() - Duration::hours(20);
|
||||||
|
|
||||||
let task2 = insert_job(
|
let task2 = insert_task(
|
||||||
serde_json::json!(Job { number: 11 }),
|
serde_json::json!(PepeTask { number: 11 }),
|
||||||
timestamp2,
|
timestamp2,
|
||||||
&queue.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
@ -808,12 +808,12 @@ mod queue_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct Job {
|
struct PepeTask {
|
||||||
pub number: u16,
|
pub number: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for Job {
|
impl Runnable for PepeTask {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> {
|
||||||
println!("the number is {}", self.number);
|
println!("the number is {}", self.number);
|
||||||
|
|
||||||
|
@ -821,7 +821,7 @@ mod queue_tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_job(
|
fn insert_task(
|
||||||
metadata: serde_json::Value,
|
metadata: serde_json::Value,
|
||||||
timestamp: DateTime<Utc>,
|
timestamp: DateTime<Utc>,
|
||||||
connection: &PgConnection,
|
connection: &PgConnection,
|
||||||
|
@ -835,7 +835,7 @@ mod queue_tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_periodic_job(
|
fn insert_periodic_task(
|
||||||
metadata: serde_json::Value,
|
metadata: serde_json::Value,
|
||||||
timestamp: DateTime<Utc>,
|
timestamp: DateTime<Utc>,
|
||||||
period_in_seconds: i32,
|
period_in_seconds: i32,
|
||||||
|
@ -851,7 +851,7 @@ mod queue_tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_new_job(connection: &PgConnection) -> Task {
|
fn insert_new_task(connection: &PgConnection) -> Task {
|
||||||
diesel::insert_into(fang_tasks::table)
|
diesel::insert_into(fang_tasks::table)
|
||||||
.values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)])
|
.values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)])
|
||||||
.get_result::<Task>(connection)
|
.get_result::<Task>(connection)
|
||||||
|
|
|
@ -77,7 +77,7 @@ impl Scheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod job_scheduler_tests {
|
mod task_scheduler_tests {
|
||||||
use super::Scheduler;
|
use super::Scheduler;
|
||||||
use crate::executor::Error;
|
use crate::executor::Error;
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
|
@ -92,10 +92,10 @@ mod job_scheduler_tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct ScheduledJob {}
|
struct ScheduledTask {}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for ScheduledJob {
|
impl Runnable for ScheduledTask {
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -107,10 +107,10 @@ mod job_scheduler_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn schedules_jobs() {
|
fn schedules_tasks() {
|
||||||
let queue = Queue::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
queue.push_periodic_task(&ScheduledJob {}, 10).unwrap();
|
queue.push_periodic_task(&ScheduledTask {}, 10).unwrap();
|
||||||
Scheduler::start(1, 2);
|
Scheduler::start(1, 2);
|
||||||
|
|
||||||
let sleep_duration = Duration::from_secs(15);
|
let sleep_duration = Duration::from_secs(15);
|
||||||
|
|
|
@ -219,7 +219,7 @@ impl Drop for WorkerThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod job_pool_tests {
|
mod task_pool_tests {
|
||||||
use super::WorkerParams;
|
use super::WorkerParams;
|
||||||
use super::WorkerPool;
|
use super::WorkerPool;
|
||||||
use crate::executor::Error;
|
use crate::executor::Error;
|
||||||
|
@ -236,12 +236,12 @@ mod job_pool_tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct MyJob {
|
struct MyTask {
|
||||||
pub number: u16,
|
pub number: u16,
|
||||||
pub current_thread_name: String,
|
pub current_thread_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MyJob {
|
impl MyTask {
|
||||||
pub fn new(number: u16) -> Self {
|
pub fn new(number: u16) -> Self {
|
||||||
let handle = thread::current();
|
let handle = thread::current();
|
||||||
let current_thread_name = handle.name().unwrap().to_string();
|
let current_thread_name = handle.name().unwrap().to_string();
|
||||||
|
@ -254,13 +254,13 @@ mod job_pool_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for MyJob {
|
impl Runnable for MyTask {
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||||
thread::sleep(Duration::from_secs(3));
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
let new_job = MyJob::new(self.number + 1);
|
let new_task = MyTask::new(self.number + 1);
|
||||||
|
|
||||||
Queue::push_task_query(connection, &new_job).unwrap();
|
Queue::push_task_query(connection, &new_task).unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -271,12 +271,12 @@ mod job_pool_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct ShutdownJob {
|
struct ShutdownTask {
|
||||||
pub number: u16,
|
pub number: u16,
|
||||||
pub current_thread_name: String,
|
pub current_thread_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShutdownJob {
|
impl ShutdownTask {
|
||||||
pub fn new(number: u16) -> Self {
|
pub fn new(number: u16) -> Self {
|
||||||
let handle = thread::current();
|
let handle = thread::current();
|
||||||
let current_thread_name = handle.name().unwrap().to_string();
|
let current_thread_name = handle.name().unwrap().to_string();
|
||||||
|
@ -289,13 +289,13 @@ mod job_pool_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for ShutdownJob {
|
impl Runnable for ShutdownTask {
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||||
thread::sleep(Duration::from_secs(3));
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
let new_job = MyJob::new(self.number + 1);
|
let new_task = MyTask::new(self.number + 1);
|
||||||
|
|
||||||
Queue::push_task_query(connection, &new_job).unwrap();
|
Queue::push_task_query(connection, &new_task).unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -320,14 +320,14 @@ mod job_pool_tests {
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
let mut worker_params = WorkerParams::new();
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||||
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
|
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
||||||
|
|
||||||
queue.push_task(&ShutdownJob::new(100)).unwrap();
|
queue.push_task(&ShutdownTask::new(100)).unwrap();
|
||||||
queue.push_task(&ShutdownJob::new(200)).unwrap();
|
queue.push_task(&ShutdownTask::new(200)).unwrap();
|
||||||
|
|
||||||
job_pool.start().unwrap();
|
task_pool.start().unwrap();
|
||||||
thread::sleep(Duration::from_secs(1));
|
thread::sleep(Duration::from_secs(1));
|
||||||
job_pool.shutdown().unwrap();
|
task_pool.shutdown().unwrap();
|
||||||
thread::sleep(Duration::from_secs(5));
|
thread::sleep(Duration::from_secs(5));
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
|
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
|
||||||
|
@ -351,12 +351,12 @@ mod job_pool_tests {
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
let mut worker_params = WorkerParams::new();
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||||
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
|
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
||||||
|
|
||||||
queue.push_task(&MyJob::new(100)).unwrap();
|
queue.push_task(&MyTask::new(100)).unwrap();
|
||||||
queue.push_task(&MyJob::new(200)).unwrap();
|
queue.push_task(&MyTask::new(200)).unwrap();
|
||||||
|
|
||||||
job_pool.start().unwrap();
|
task_pool.start().unwrap();
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(100));
|
thread::sleep(Duration::from_secs(100));
|
||||||
|
|
||||||
|
@ -364,19 +364,19 @@ mod job_pool_tests {
|
||||||
|
|
||||||
assert!(tasks.len() > 40);
|
assert!(tasks.len() > 40);
|
||||||
|
|
||||||
let test_worker1_jobs = tasks.clone().into_iter().filter(|job| {
|
let test_worker1_tasks = tasks.clone().into_iter().filter(|task| {
|
||||||
serde_json::to_string(&job.metadata)
|
serde_json::to_string(&task.metadata)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.contains("worker_1")
|
.contains("worker_1")
|
||||||
});
|
});
|
||||||
|
|
||||||
let test_worker2_jobs = tasks.into_iter().filter(|job| {
|
let test_worker2_tasks = tasks.into_iter().filter(|task| {
|
||||||
serde_json::to_string(&job.metadata)
|
serde_json::to_string(&task.metadata)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.contains("worker_2")
|
.contains("worker_2")
|
||||||
});
|
});
|
||||||
|
|
||||||
assert!(test_worker1_jobs.count() > 20);
|
assert!(test_worker1_tasks.count() > 20);
|
||||||
assert!(test_worker2_jobs.count() > 20);
|
assert!(test_worker2_tasks.count() > 20);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue