pass PgConnection into run function (#8)
* pass PgConnection into run function Changes: - rename Postgres into Queue - pass PgConnection into run function * remove all tasks of specific type * add pgconnection * pass connection to every queue function * add connection pool * reuse connection in test * update readme
This commit is contained in:
parent
cb02a03fea
commit
824e32f17b
7 changed files with 402 additions and 280 deletions
|
@ -11,7 +11,7 @@ readme = "README.md"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
diesel = { version = "1.4.6", features = ["postgres", "serde_json", "chrono", "uuidv07"] }
|
diesel = { version = "1.4.6", features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"] }
|
||||||
diesel-derive-enum = { version = "1", features = ["postgres"] }
|
diesel-derive-enum = { version = "1", features = ["postgres"] }
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
|
|
|
@ -6,6 +6,9 @@
|
||||||
|
|
||||||
Background job processing library for Rust. It uses Postgres DB as a task queue.
|
Background job processing library for Rust. It uses Postgres DB as a task queue.
|
||||||
|
|
||||||
|
Note that the README follows the master branch, to see instructions for the latest published version, check [crates.io](https://crates.io/crates/fang).
|
||||||
|
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
1. Add this to your Cargo.toml
|
1. Add this to your Cargo.toml
|
||||||
|
|
149
src/executor.rs
149
src/executor.rs
|
@ -1,12 +1,12 @@
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use crate::postgres::Task;
|
use crate::queue::Task;
|
||||||
use std::panic;
|
use diesel::pg::PgConnection;
|
||||||
use std::panic::RefUnwindSafe;
|
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct Executor {
|
pub struct Executor {
|
||||||
pub storage: Postgres,
|
pub pooled_connection: PooledConnection<ConnectionManager<PgConnection>>,
|
||||||
pub task_type: Option<String>,
|
pub task_type: Option<String>,
|
||||||
pub sleep_params: SleepParams,
|
pub sleep_params: SleepParams,
|
||||||
pub retention_mode: RetentionMode,
|
pub retention_mode: RetentionMode,
|
||||||
|
@ -58,11 +58,8 @@ pub struct Error {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[typetag::serde(tag = "type")]
|
#[typetag::serde(tag = "type")]
|
||||||
pub trait Runnable
|
pub trait Runnable {
|
||||||
where
|
fn run(&self, connection: &PgConnection) -> Result<(), Error>;
|
||||||
Self: RefUnwindSafe,
|
|
||||||
{
|
|
||||||
fn run(&self) -> Result<(), Error>;
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
fn task_type(&self) -> String {
|
||||||
"common".to_string()
|
"common".to_string()
|
||||||
|
@ -70,9 +67,9 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Executor {
|
impl Executor {
|
||||||
pub fn new(storage: Postgres) -> Self {
|
pub fn new(pooled_connection: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
storage,
|
pooled_connection,
|
||||||
sleep_params: SleepParams::default(),
|
sleep_params: SleepParams::default(),
|
||||||
retention_mode: RetentionMode::RemoveFinished,
|
retention_mode: RetentionMode::RemoveFinished,
|
||||||
task_type: None,
|
task_type: None,
|
||||||
|
@ -99,7 +96,7 @@ impl Executor {
|
||||||
|
|
||||||
pub fn run_tasks(&mut self) {
|
pub fn run_tasks(&mut self) {
|
||||||
loop {
|
loop {
|
||||||
match self.storage.fetch_and_touch(&self.task_type.clone()) {
|
match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) {
|
||||||
Ok(Some(task)) => {
|
Ok(Some(task)) => {
|
||||||
self.maybe_reset_sleep_period();
|
self.maybe_reset_sleep_period();
|
||||||
self.run(task);
|
self.run(task);
|
||||||
|
@ -129,19 +126,11 @@ impl Executor {
|
||||||
|
|
||||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||||
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
let task_result = panic::catch_unwind(|| actual_task.run());
|
let task_result = actual_task.run(&self.pooled_connection);
|
||||||
|
|
||||||
match task_result {
|
match task_result {
|
||||||
Ok(result) => match result {
|
Ok(()) => Ok(task),
|
||||||
Ok(()) => Ok(task),
|
Err(error) => Err((task, error.description)),
|
||||||
Err(error) => Err((task, error.description)),
|
|
||||||
},
|
|
||||||
|
|
||||||
Err(error) => {
|
|
||||||
let message = format!("panicked during task execution {:?}", error);
|
|
||||||
|
|
||||||
Err((task, message))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,22 +138,26 @@ impl Executor {
|
||||||
match self.retention_mode {
|
match self.retention_mode {
|
||||||
RetentionMode::KeepAll => {
|
RetentionMode::KeepAll => {
|
||||||
match result {
|
match result {
|
||||||
Ok(task) => self.storage.finish_task(&task).unwrap(),
|
Ok(task) => Queue::finish_task_query(&self.pooled_connection, &task).unwrap(),
|
||||||
Err((task, error)) => self.storage.fail_task(&task, error).unwrap(),
|
Err((task, error)) => {
|
||||||
|
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
RetentionMode::RemoveAll => {
|
RetentionMode::RemoveAll => {
|
||||||
match result {
|
match result {
|
||||||
Ok(task) => self.storage.remove_task(task.id).unwrap(),
|
Ok(task) => Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(),
|
||||||
Err((task, _error)) => self.storage.remove_task(task.id).unwrap(),
|
Err((task, _error)) => {
|
||||||
|
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
RetentionMode::RemoveFinished => match result {
|
RetentionMode::RemoveFinished => match result {
|
||||||
Ok(task) => {
|
Ok(task) => {
|
||||||
self.storage.remove_task(task.id).unwrap();
|
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap();
|
||||||
}
|
}
|
||||||
Err((task, error)) => {
|
Err((task, error)) => {
|
||||||
self.storage.fail_task(&task, error).unwrap();
|
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -177,12 +170,14 @@ mod executor_tests {
|
||||||
use super::Executor;
|
use super::Executor;
|
||||||
use super::RetentionMode;
|
use super::RetentionMode;
|
||||||
use super::Runnable;
|
use super::Runnable;
|
||||||
use crate::postgres::NewTask;
|
use crate::queue::NewTask;
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use crate::schema::FangTaskState;
|
use crate::schema::FangTaskState;
|
||||||
use crate::typetag;
|
use crate::typetag;
|
||||||
use crate::{Deserialize, Serialize};
|
use crate::{Deserialize, Serialize};
|
||||||
use diesel::connection::Connection;
|
use diesel::connection::Connection;
|
||||||
|
use diesel::pg::PgConnection;
|
||||||
|
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct ExecutorJobTest {
|
struct ExecutorJobTest {
|
||||||
|
@ -191,7 +186,7 @@ mod executor_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for ExecutorJobTest {
|
impl Runnable for ExecutorJobTest {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
println!("the number is {}", self.number);
|
println!("the number is {}", self.number);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -205,7 +200,7 @@ mod executor_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for FailedJob {
|
impl Runnable for FailedJob {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
let message = format!("the number is {}", self.number);
|
let message = format!("the number is {}", self.number);
|
||||||
|
|
||||||
Err(Error {
|
Err(Error {
|
||||||
|
@ -214,26 +209,12 @@ mod executor_tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct PanicJob {}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for PanicJob {
|
|
||||||
fn run(&self) -> Result<(), Error> {
|
|
||||||
if true {
|
|
||||||
panic!("panic!");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct JobType1 {}
|
struct JobType1 {}
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for JobType1 {
|
impl Runnable for JobType1 {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,7 +228,7 @@ mod executor_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for JobType2 {
|
impl Runnable for JobType2 {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,20 +250,20 @@ mod executor_tests {
|
||||||
task_type: "common".to_string(),
|
task_type: "common".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut executor = Executor::new(Postgres::new());
|
let mut executor = Executor::new(pooled_connection());
|
||||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
executor.set_retention_mode(RetentionMode::KeepAll);
|
||||||
|
|
||||||
executor
|
executor
|
||||||
.storage
|
.pooled_connection
|
||||||
.connection
|
|
||||||
.test_transaction::<(), Error, _>(|| {
|
.test_transaction::<(), Error, _>(|| {
|
||||||
let task = executor.storage.insert(&new_task).unwrap();
|
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
assert_eq!(FangTaskState::New, task.state);
|
||||||
|
|
||||||
executor.run(task.clone());
|
executor.run(task.clone());
|
||||||
|
|
||||||
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
|
let found_task =
|
||||||
|
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Finished, found_task.state);
|
assert_eq!(FangTaskState::Finished, found_task.state);
|
||||||
|
|
||||||
|
@ -306,17 +287,16 @@ mod executor_tests {
|
||||||
task_type: "type2".to_string(),
|
task_type: "type2".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let executor = Executor::new(Postgres::new());
|
let executor = Executor::new(pooled_connection());
|
||||||
|
|
||||||
let task1 = executor.storage.insert(&new_task1).unwrap();
|
let task1 = Queue::insert_query(&executor.pooled_connection, &new_task1).unwrap();
|
||||||
let task2 = executor.storage.insert(&new_task2).unwrap();
|
let task2 = Queue::insert_query(&executor.pooled_connection, &new_task2).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task1.state);
|
assert_eq!(FangTaskState::New, task1.state);
|
||||||
assert_eq!(FangTaskState::New, task2.state);
|
assert_eq!(FangTaskState::New, task2.state);
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let postgres = Postgres::new();
|
let mut executor = Executor::new(pooled_connection());
|
||||||
let mut executor = Executor::new(postgres);
|
|
||||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
executor.set_retention_mode(RetentionMode::KeepAll);
|
||||||
executor.set_task_type("type1".to_string());
|
executor.set_task_type("type1".to_string());
|
||||||
|
|
||||||
|
@ -325,10 +305,12 @@ mod executor_tests {
|
||||||
|
|
||||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||||
|
|
||||||
let found_task1 = executor.storage.find_task_by_id(task1.id).unwrap();
|
let found_task1 =
|
||||||
|
Queue::find_task_by_id_query(&executor.pooled_connection, task1.id).unwrap();
|
||||||
assert_eq!(FangTaskState::Finished, found_task1.state);
|
assert_eq!(FangTaskState::Finished, found_task1.state);
|
||||||
|
|
||||||
let found_task2 = executor.storage.find_task_by_id(task2.id).unwrap();
|
let found_task2 =
|
||||||
|
Queue::find_task_by_id_query(&executor.pooled_connection, task2.id).unwrap();
|
||||||
assert_eq!(FangTaskState::New, found_task2.state);
|
assert_eq!(FangTaskState::New, found_task2.state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,19 +323,19 @@ mod executor_tests {
|
||||||
task_type: "common".to_string(),
|
task_type: "common".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let executor = Executor::new(Postgres::new());
|
let executor = Executor::new(pooled_connection());
|
||||||
|
|
||||||
executor
|
executor
|
||||||
.storage
|
.pooled_connection
|
||||||
.connection
|
|
||||||
.test_transaction::<(), Error, _>(|| {
|
.test_transaction::<(), Error, _>(|| {
|
||||||
let task = executor.storage.insert(&new_task).unwrap();
|
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
assert_eq!(FangTaskState::New, task.state);
|
||||||
|
|
||||||
executor.run(task.clone());
|
executor.run(task.clone());
|
||||||
|
|
||||||
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
|
let found_task =
|
||||||
|
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
assert_eq!(FangTaskState::Failed, found_task.state);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -365,36 +347,7 @@ mod executor_tests {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
fn pooled_connection() -> PooledConnection<ConnectionManager<PgConnection>> {
|
||||||
fn recovers_from_panics() {
|
Queue::connection_pool(5).get().unwrap()
|
||||||
let job = PanicJob {};
|
|
||||||
|
|
||||||
let new_task = NewTask {
|
|
||||||
metadata: serialize(&job),
|
|
||||||
task_type: "common".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let executor = Executor::new(Postgres::new());
|
|
||||||
|
|
||||||
executor
|
|
||||||
.storage
|
|
||||||
.connection
|
|
||||||
.test_transaction::<(), Error, _>(|| {
|
|
||||||
let task = executor.storage.insert(&new_task).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
|
||||||
|
|
||||||
executor.run(task.clone());
|
|
||||||
|
|
||||||
let found_task = executor.storage.find_task_by_id(task.id).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
|
||||||
assert!(found_task
|
|
||||||
.error_message
|
|
||||||
.unwrap()
|
|
||||||
.contains("panicked during task execution Any"));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
#![allow(clippy::nonstandard_macro_braces)]
|
#![allow(clippy::nonstandard_macro_braces)]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate diesel;
|
pub extern crate diesel;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
@ -9,15 +9,17 @@ extern crate log;
|
||||||
mod schema;
|
mod schema;
|
||||||
|
|
||||||
pub mod executor;
|
pub mod executor;
|
||||||
pub mod postgres;
|
pub mod queue;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod worker_pool;
|
pub mod worker_pool;
|
||||||
|
|
||||||
pub use executor::*;
|
pub use executor::*;
|
||||||
pub use postgres::*;
|
pub use queue::*;
|
||||||
pub use scheduler::*;
|
pub use scheduler::*;
|
||||||
pub use worker_pool::*;
|
pub use worker_pool::*;
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub use diesel::pg::PgConnection;
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use serde::{Deserialize, Serialize};
|
pub use serde::{Deserialize, Serialize};
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
|
|
@ -7,6 +7,7 @@ use chrono::Duration;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use diesel::pg::PgConnection;
|
use diesel::pg::PgConnection;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
use diesel::r2d2;
|
||||||
use diesel::result::Error;
|
use diesel::result::Error;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use std::env;
|
use std::env;
|
||||||
|
@ -49,17 +50,17 @@ pub struct NewPeriodicTask {
|
||||||
pub period_in_seconds: i32,
|
pub period_in_seconds: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Postgres {
|
pub struct Queue {
|
||||||
pub connection: PgConnection,
|
pub connection: PgConnection,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Postgres {
|
impl Default for Queue {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::new()
|
Self::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Postgres {
|
impl Queue {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let connection = Self::pg_connection(None);
|
let connection = Self::pg_connection(None);
|
||||||
|
|
||||||
|
@ -72,17 +73,25 @@ impl Postgres {
|
||||||
Self { connection }
|
Self { connection }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_with_connection(connection: PgConnection) -> Self {
|
||||||
|
Self { connection }
|
||||||
|
}
|
||||||
|
|
||||||
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
|
pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
|
||||||
|
Self::push_task_query(&self.connection, job)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_task_query(connection: &PgConnection, job: &dyn Runnable) -> Result<Task, Error> {
|
||||||
let json_job = serde_json::to_value(job).unwrap();
|
let json_job = serde_json::to_value(job).unwrap();
|
||||||
|
|
||||||
match self.find_task_by_metadata(&json_job) {
|
match Self::find_task_by_metadata_query(connection, &json_job) {
|
||||||
Some(task) => Ok(task),
|
Some(task) => Ok(task),
|
||||||
None => {
|
None => {
|
||||||
let new_task = NewTask {
|
let new_task = NewTask {
|
||||||
metadata: json_job.clone(),
|
metadata: json_job.clone(),
|
||||||
task_type: job.task_type(),
|
task_type: job.task_type(),
|
||||||
};
|
};
|
||||||
self.insert(&new_task)
|
Self::insert_query(connection, &new_task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,10 +100,18 @@ impl Postgres {
|
||||||
&self,
|
&self,
|
||||||
job: &dyn Runnable,
|
job: &dyn Runnable,
|
||||||
period: i32,
|
period: i32,
|
||||||
|
) -> Result<PeriodicTask, Error> {
|
||||||
|
Self::push_periodic_task_query(&self.connection, job, period)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_periodic_task_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
job: &dyn Runnable,
|
||||||
|
period: i32,
|
||||||
) -> Result<PeriodicTask, Error> {
|
) -> Result<PeriodicTask, Error> {
|
||||||
let json_job = serde_json::to_value(job).unwrap();
|
let json_job = serde_json::to_value(job).unwrap();
|
||||||
|
|
||||||
match self.find_periodic_task_by_metadata(&json_job) {
|
match Self::find_periodic_task_by_metadata_query(connection, &json_job) {
|
||||||
Some(task) => Ok(task),
|
Some(task) => Ok(task),
|
||||||
None => {
|
None => {
|
||||||
let new_task = NewPeriodicTask {
|
let new_task = NewPeriodicTask {
|
||||||
|
@ -104,7 +121,7 @@ impl Postgres {
|
||||||
|
|
||||||
diesel::insert_into(fang_periodic_tasks::table)
|
diesel::insert_into(fang_periodic_tasks::table)
|
||||||
.values(new_task)
|
.values(new_task)
|
||||||
.get_result::<PeriodicTask>(&self.connection)
|
.get_result::<PeriodicTask>(connection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,27 +131,42 @@ impl Postgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
|
pub fn insert(&self, params: &NewTask) -> Result<Task, Error> {
|
||||||
|
Self::insert_query(&self.connection, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_query(connection: &PgConnection, params: &NewTask) -> Result<Task, Error> {
|
||||||
diesel::insert_into(fang_tasks::table)
|
diesel::insert_into(fang_tasks::table)
|
||||||
.values(params)
|
.values(params)
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_task(&self, task_type: &Option<String>) -> Option<Task> {
|
pub fn fetch_task(&self, task_type: &Option<String>) -> Option<Task> {
|
||||||
|
Self::fetch_task_query(&self.connection, task_type)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fetch_task_query(connection: &PgConnection, task_type: &Option<String>) -> Option<Task> {
|
||||||
match task_type {
|
match task_type {
|
||||||
None => self.fetch_any_task(),
|
None => Self::fetch_any_task_query(connection),
|
||||||
Some(task_type_str) => self.fetch_task_of_type(task_type_str),
|
Some(task_type_str) => Self::fetch_task_of_type_query(connection, task_type_str),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_and_touch(&self, task_type: &Option<String>) -> Result<Option<Task>, Error> {
|
pub fn fetch_and_touch(&self, task_type: &Option<String>) -> Result<Option<Task>, Error> {
|
||||||
self.connection.transaction::<Option<Task>, Error, _>(|| {
|
Self::fetch_and_touch_query(&self.connection, task_type)
|
||||||
let found_task = self.fetch_task(task_type);
|
}
|
||||||
|
|
||||||
|
pub fn fetch_and_touch_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
task_type: &Option<String>,
|
||||||
|
) -> Result<Option<Task>, Error> {
|
||||||
|
connection.transaction::<Option<Task>, Error, _>(|| {
|
||||||
|
let found_task = Self::fetch_task_query(connection, task_type);
|
||||||
|
|
||||||
if found_task.is_none() {
|
if found_task.is_none() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.start_processing_task(&found_task.unwrap()) {
|
match Self::start_processing_task_query(connection, &found_task.unwrap()) {
|
||||||
Ok(updated_task) => Ok(Some(updated_task)),
|
Ok(updated_task) => Ok(Some(updated_task)),
|
||||||
Err(err) => Err(err),
|
Err(err) => Err(err),
|
||||||
}
|
}
|
||||||
|
@ -142,20 +174,38 @@ impl Postgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
|
pub fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
|
||||||
|
Self::find_task_by_id_query(&self.connection, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_task_by_id_query(connection: &PgConnection, id: Uuid) -> Option<Task> {
|
||||||
fang_tasks::table
|
fang_tasks::table
|
||||||
.filter(fang_tasks::id.eq(id))
|
.filter(fang_tasks::id.eq(id))
|
||||||
.first::<Task>(&self.connection)
|
.first::<Task>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option<PeriodicTask> {
|
pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option<PeriodicTask> {
|
||||||
|
Self::find_periodic_task_by_id_query(&self.connection, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_periodic_task_by_id_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
id: Uuid,
|
||||||
|
) -> Option<PeriodicTask> {
|
||||||
fang_periodic_tasks::table
|
fang_periodic_tasks::table
|
||||||
.filter(fang_periodic_tasks::id.eq(id))
|
.filter(fang_periodic_tasks::id.eq(id))
|
||||||
.first::<PeriodicTask>(&self.connection)
|
.first::<PeriodicTask>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_periodic_tasks(&self, error_margin_seconds: i64) -> Option<Vec<PeriodicTask>> {
|
pub fn fetch_periodic_tasks(&self, error_margin_seconds: i64) -> Option<Vec<PeriodicTask>> {
|
||||||
|
Self::fetch_periodic_tasks_query(&self.connection, error_margin_seconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fetch_periodic_tasks_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
error_margin_seconds: i64,
|
||||||
|
) -> Option<Vec<PeriodicTask>> {
|
||||||
let current_time = Self::current_time();
|
let current_time = Self::current_time();
|
||||||
|
|
||||||
let low_limit = current_time - Duration::seconds(error_margin_seconds);
|
let low_limit = current_time - Duration::seconds(error_margin_seconds);
|
||||||
|
@ -168,7 +218,7 @@ impl Postgres {
|
||||||
.and(fang_periodic_tasks::scheduled_at.lt(high_limit)),
|
.and(fang_periodic_tasks::scheduled_at.lt(high_limit)),
|
||||||
)
|
)
|
||||||
.or_filter(fang_periodic_tasks::scheduled_at.is_null())
|
.or_filter(fang_periodic_tasks::scheduled_at.is_null())
|
||||||
.load::<PeriodicTask>(&self.connection)
|
.load::<PeriodicTask>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,45 +235,102 @@ impl Postgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_all_tasks(&self) -> Result<usize, Error> {
|
pub fn remove_all_tasks(&self) -> Result<usize, Error> {
|
||||||
diesel::delete(fang_tasks::table).execute(&self.connection)
|
Self::remove_all_tasks_query(&self.connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_all_tasks_query(connection: &PgConnection) -> Result<usize, Error> {
|
||||||
|
diesel::delete(fang_tasks::table).execute(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, Error> {
|
||||||
|
Self::remove_tasks_of_type_query(&self.connection, task_type)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_tasks_of_type_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
task_type: &str,
|
||||||
|
) -> Result<usize, Error> {
|
||||||
|
let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type));
|
||||||
|
|
||||||
|
diesel::delete(query).execute(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_all_periodic_tasks(&self) -> Result<usize, Error> {
|
pub fn remove_all_periodic_tasks(&self) -> Result<usize, Error> {
|
||||||
diesel::delete(fang_periodic_tasks::table).execute(&self.connection)
|
Self::remove_all_periodic_tasks_query(&self.connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_all_periodic_tasks_query(connection: &PgConnection) -> Result<usize, Error> {
|
||||||
|
diesel::delete(fang_periodic_tasks::table).execute(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> {
|
pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> {
|
||||||
|
Self::remove_task_query(&self.connection, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_task_query(connection: &PgConnection, id: Uuid) -> Result<usize, Error> {
|
||||||
let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
|
let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
|
||||||
|
|
||||||
diesel::delete(query).execute(&self.connection)
|
diesel::delete(query).execute(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish_task(&self, task: &Task) -> Result<Task, Error> {
|
pub fn finish_task(&self, task: &Task) -> Result<Task, Error> {
|
||||||
|
Self::finish_task_query(&self.connection, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finish_task_query(connection: &PgConnection, task: &Task) -> Result<Task, Error> {
|
||||||
diesel::update(task)
|
diesel::update(task)
|
||||||
.set((
|
.set((
|
||||||
fang_tasks::state.eq(FangTaskState::Finished),
|
fang_tasks::state.eq(FangTaskState::Finished),
|
||||||
fang_tasks::updated_at.eq(Self::current_time()),
|
fang_tasks::updated_at.eq(Self::current_time()),
|
||||||
))
|
))
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_processing_task(&self, task: &Task) -> Result<Task, Error> {
|
pub fn start_processing_task(&self, task: &Task) -> Result<Task, Error> {
|
||||||
|
Self::start_processing_task_query(&self.connection, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_processing_task_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
task: &Task,
|
||||||
|
) -> Result<Task, Error> {
|
||||||
diesel::update(task)
|
diesel::update(task)
|
||||||
.set((
|
.set((
|
||||||
fang_tasks::state.eq(FangTaskState::InProgress),
|
fang_tasks::state.eq(FangTaskState::InProgress),
|
||||||
fang_tasks::updated_at.eq(Self::current_time()),
|
fang_tasks::updated_at.eq(Self::current_time()),
|
||||||
))
|
))
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fail_task(&self, task: &Task, error: String) -> Result<Task, Error> {
|
pub fn fail_task(&self, task: &Task, error: String) -> Result<Task, Error> {
|
||||||
|
Self::fail_task_query(&self.connection, task, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fail_task_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
task: &Task,
|
||||||
|
error: String,
|
||||||
|
) -> Result<Task, Error> {
|
||||||
diesel::update(task)
|
diesel::update(task)
|
||||||
.set((
|
.set((
|
||||||
fang_tasks::state.eq(FangTaskState::Failed),
|
fang_tasks::state.eq(FangTaskState::Failed),
|
||||||
fang_tasks::error_message.eq(error),
|
fang_tasks::error_message.eq(error),
|
||||||
fang_tasks::updated_at.eq(Self::current_time()),
|
fang_tasks::updated_at.eq(Self::current_time()),
|
||||||
))
|
))
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||||
|
|
||||||
|
let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||||
|
|
||||||
|
r2d2::Pool::builder()
|
||||||
|
.max_size(pool_size)
|
||||||
|
.build(manager)
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_time() -> DateTime<Utc> {
|
fn current_time() -> DateTime<Utc> {
|
||||||
|
@ -241,18 +348,18 @@ impl Postgres {
|
||||||
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url))
|
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_any_task(&self) -> Option<Task> {
|
fn fetch_any_task_query(connection: &PgConnection) -> Option<Task> {
|
||||||
fang_tasks::table
|
fang_tasks::table
|
||||||
.order(fang_tasks::created_at.asc())
|
.order(fang_tasks::created_at.asc())
|
||||||
.limit(1)
|
.limit(1)
|
||||||
.filter(fang_tasks::state.eq(FangTaskState::New))
|
.filter(fang_tasks::state.eq(FangTaskState::New))
|
||||||
.for_update()
|
.for_update()
|
||||||
.skip_locked()
|
.skip_locked()
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_task_of_type(&self, task_type: &str) -> Option<Task> {
|
fn fetch_task_of_type_query(connection: &PgConnection, task_type: &str) -> Option<Task> {
|
||||||
fang_tasks::table
|
fang_tasks::table
|
||||||
.order(fang_tasks::created_at.asc())
|
.order(fang_tasks::created_at.asc())
|
||||||
.limit(1)
|
.limit(1)
|
||||||
|
@ -260,18 +367,24 @@ impl Postgres {
|
||||||
.filter(fang_tasks::task_type.eq(task_type))
|
.filter(fang_tasks::task_type.eq(task_type))
|
||||||
.for_update()
|
.for_update()
|
||||||
.skip_locked()
|
.skip_locked()
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_periodic_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<PeriodicTask> {
|
fn find_periodic_task_by_metadata_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
metadata: &serde_json::Value,
|
||||||
|
) -> Option<PeriodicTask> {
|
||||||
fang_periodic_tasks::table
|
fang_periodic_tasks::table
|
||||||
.filter(fang_periodic_tasks::metadata.eq(metadata))
|
.filter(fang_periodic_tasks::metadata.eq(metadata))
|
||||||
.first::<PeriodicTask>(&self.connection)
|
.first::<PeriodicTask>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<Task> {
|
fn find_task_by_metadata_query(
|
||||||
|
connection: &PgConnection,
|
||||||
|
metadata: &serde_json::Value,
|
||||||
|
) -> Option<Task> {
|
||||||
fang_tasks::table
|
fang_tasks::table
|
||||||
.filter(fang_tasks::metadata.eq(metadata))
|
.filter(fang_tasks::metadata.eq(metadata))
|
||||||
.filter(
|
.filter(
|
||||||
|
@ -279,16 +392,16 @@ impl Postgres {
|
||||||
.eq(FangTaskState::New)
|
.eq(FangTaskState::New)
|
||||||
.or(fang_tasks::state.eq(FangTaskState::InProgress)),
|
.or(fang_tasks::state.eq(FangTaskState::InProgress)),
|
||||||
)
|
)
|
||||||
.first::<Task>(&self.connection)
|
.first::<Task>(connection)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod postgres_tests {
|
mod queue_tests {
|
||||||
use super::NewTask;
|
use super::NewTask;
|
||||||
use super::PeriodicTask;
|
use super::PeriodicTask;
|
||||||
use super::Postgres;
|
use super::Queue;
|
||||||
use super::Task;
|
use super::Task;
|
||||||
use crate::executor::Error as ExecutorError;
|
use crate::executor::Error as ExecutorError;
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
|
@ -305,16 +418,16 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn insert_inserts_task() {
|
fn insert_inserts_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
let new_task = NewTask {
|
let new_task = NewTask {
|
||||||
metadata: serde_json::json!(true),
|
metadata: serde_json::json!(true),
|
||||||
task_type: "common".to_string(),
|
task_type: "common".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = postgres
|
let result = queue
|
||||||
.connection
|
.connection
|
||||||
.test_transaction::<Task, Error, _>(|| postgres.insert(&new_task));
|
.test_transaction::<Task, Error, _>(|| queue.insert(&new_task));
|
||||||
|
|
||||||
assert_eq!(result.state, FangTaskState::New);
|
assert_eq!(result.state, FangTaskState::New);
|
||||||
assert_eq!(result.error_message, None);
|
assert_eq!(result.error_message, None);
|
||||||
|
@ -322,18 +435,18 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fetch_task_fetches_the_oldest_task() {
|
fn fetch_task_fetches_the_oldest_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let timestamp1 = Utc::now() - Duration::hours(40);
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection);
|
let task1 = insert_job(serde_json::json!(true), timestamp1, &queue.connection);
|
||||||
|
|
||||||
let timestamp2 = Utc::now() - Duration::hours(20);
|
let timestamp2 = Utc::now() - Duration::hours(20);
|
||||||
|
|
||||||
insert_job(serde_json::json!(false), timestamp2, &postgres.connection);
|
insert_job(serde_json::json!(false), timestamp2, &queue.connection);
|
||||||
|
|
||||||
let found_task = postgres.fetch_task(&None).unwrap();
|
let found_task = queue.fetch_task(&None).unwrap();
|
||||||
|
|
||||||
assert_eq!(found_task.id, task1.id);
|
assert_eq!(found_task.id, task1.id);
|
||||||
|
|
||||||
|
@ -343,12 +456,12 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn finish_task_updates_state_field() {
|
fn finish_task_updates_state_field() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_new_job(&postgres.connection);
|
let task = insert_new_job(&queue.connection);
|
||||||
|
|
||||||
let updated_task = postgres.finish_task(&task).unwrap();
|
let updated_task = queue.finish_task(&task).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Finished, updated_task.state);
|
assert_eq!(FangTaskState::Finished, updated_task.state);
|
||||||
|
|
||||||
|
@ -358,13 +471,13 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fail_task_updates_state_field_and_sets_error_message() {
|
fn fail_task_updates_state_field_and_sets_error_message() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_new_job(&postgres.connection);
|
let task = insert_new_job(&queue.connection);
|
||||||
let error = "Failed".to_string();
|
let error = "Failed".to_string();
|
||||||
|
|
||||||
let updated_task = postgres.fail_task(&task, error.clone()).unwrap();
|
let updated_task = queue.fail_task(&task, error.clone()).unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Failed, updated_task.state);
|
assert_eq!(FangTaskState::Failed, updated_task.state);
|
||||||
assert_eq!(error, updated_task.error_message.unwrap());
|
assert_eq!(error, updated_task.error_message.unwrap());
|
||||||
|
@ -375,12 +488,12 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fetch_and_touch_updates_state() {
|
fn fetch_and_touch_updates_state() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let _task = insert_new_job(&postgres.connection);
|
let _task = insert_new_job(&queue.connection);
|
||||||
|
|
||||||
let updated_task = postgres.fetch_and_touch(&None).unwrap().unwrap();
|
let updated_task = queue.fetch_and_touch(&None).unwrap().unwrap();
|
||||||
|
|
||||||
assert_eq!(FangTaskState::InProgress, updated_task.state);
|
assert_eq!(FangTaskState::InProgress, updated_task.state);
|
||||||
|
|
||||||
|
@ -390,10 +503,10 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fetch_and_touch_returns_none() {
|
fn fetch_and_touch_returns_none() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = postgres.fetch_and_touch(&None).unwrap();
|
let task = queue.fetch_and_touch(&None).unwrap();
|
||||||
|
|
||||||
assert_eq!(None, task);
|
assert_eq!(None, task);
|
||||||
|
|
||||||
|
@ -403,11 +516,11 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_task_serializes_and_inserts_task() {
|
fn push_task_serializes_and_inserts_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let job = Job { number: 10 };
|
||||||
let task = postgres.push_task(&job).unwrap();
|
let task = queue.push_task(&job).unwrap();
|
||||||
|
|
||||||
let mut m = serde_json::value::Map::new();
|
let mut m = serde_json::value::Map::new();
|
||||||
m.insert(
|
m.insert(
|
||||||
|
@ -427,13 +540,13 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_task_does_not_insert_the_same_task() {
|
fn push_task_does_not_insert_the_same_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let job = Job { number: 10 };
|
||||||
let task2 = postgres.push_task(&job).unwrap();
|
let task2 = queue.push_task(&job).unwrap();
|
||||||
|
|
||||||
let task1 = postgres.push_task(&job).unwrap();
|
let task1 = queue.push_task(&job).unwrap();
|
||||||
|
|
||||||
assert_eq!(task1.id, task2.id);
|
assert_eq!(task1.id, task2.id);
|
||||||
|
|
||||||
|
@ -443,14 +556,14 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_periodic_task() {
|
fn push_periodic_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let job = Job { number: 10 };
|
||||||
let task = postgres.push_periodic_task(&job, 60).unwrap();
|
let task = queue.push_periodic_task(&job, 60).unwrap();
|
||||||
|
|
||||||
assert_eq!(task.period_in_seconds, 60);
|
assert_eq!(task.period_in_seconds, 60);
|
||||||
assert!(postgres.find_periodic_task_by_id(task.id).is_some());
|
assert!(queue.find_periodic_task_by_id(task.id).is_some());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -458,13 +571,13 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_periodic_task_returns_existing_job() {
|
fn push_periodic_task_returns_existing_job() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let job = Job { number: 10 };
|
||||||
let task1 = postgres.push_periodic_task(&job, 60).unwrap();
|
let task1 = queue.push_periodic_task(&job, 60).unwrap();
|
||||||
|
|
||||||
let task2 = postgres.push_periodic_task(&job, 60).unwrap();
|
let task2 = queue.push_periodic_task(&job, 60).unwrap();
|
||||||
|
|
||||||
assert_eq!(task1.id, task2.id);
|
assert_eq!(task1.id, task2.id);
|
||||||
|
|
||||||
|
@ -474,11 +587,11 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() {
|
fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let job = Job { number: 10 };
|
let job = Job { number: 10 };
|
||||||
let task = postgres.push_periodic_task(&job, 60).unwrap();
|
let task = queue.push_periodic_task(&job, 60).unwrap();
|
||||||
|
|
||||||
let schedule_in_future = Utc::now() + Duration::hours(100);
|
let schedule_in_future = Utc::now() + Duration::hours(100);
|
||||||
|
|
||||||
|
@ -486,10 +599,10 @@ mod postgres_tests {
|
||||||
serde_json::json!(true),
|
serde_json::json!(true),
|
||||||
schedule_in_future,
|
schedule_in_future,
|
||||||
100,
|
100,
|
||||||
&postgres.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
|
||||||
let tasks = postgres.fetch_periodic_tasks(100).unwrap();
|
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
||||||
|
|
||||||
assert_eq!(tasks.len(), 1);
|
assert_eq!(tasks.len(), 1);
|
||||||
assert_eq!(tasks[0].id, task.id);
|
assert_eq!(tasks[0].id, task.id);
|
||||||
|
@ -500,17 +613,13 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn schedule_next_task_execution() {
|
fn schedule_next_task_execution() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_periodic_job(
|
let task =
|
||||||
serde_json::json!(true),
|
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
Utc::now(),
|
|
||||||
100,
|
|
||||||
&postgres.connection,
|
|
||||||
);
|
|
||||||
|
|
||||||
let updated_task = postgres.schedule_next_task_execution(&task).unwrap();
|
let updated_task = queue.schedule_next_task_execution(&task).unwrap();
|
||||||
|
|
||||||
let next_schedule = (task.scheduled_at.unwrap()
|
let next_schedule = (task.scheduled_at.unwrap()
|
||||||
+ Duration::seconds(task.period_in_seconds.into()))
|
+ Duration::seconds(task.period_in_seconds.into()))
|
||||||
|
@ -527,21 +636,17 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn remove_all_periodic_tasks() {
|
fn remove_all_periodic_tasks() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_periodic_job(
|
let task =
|
||||||
serde_json::json!(true),
|
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
Utc::now(),
|
|
||||||
100,
|
|
||||||
&postgres.connection,
|
|
||||||
);
|
|
||||||
|
|
||||||
let result = postgres.remove_all_periodic_tasks().unwrap();
|
let result = queue.remove_all_periodic_tasks().unwrap();
|
||||||
|
|
||||||
assert_eq!(1, result);
|
assert_eq!(1, result);
|
||||||
|
|
||||||
assert_eq!(None, postgres.find_periodic_task_by_id(task.id));
|
assert_eq!(None, queue.find_periodic_task_by_id(task.id));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -549,15 +654,15 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn remove_all_tasks() {
|
fn remove_all_tasks() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task = insert_job(serde_json::json!(true), Utc::now(), &postgres.connection);
|
let task = insert_job(serde_json::json!(true), Utc::now(), &queue.connection);
|
||||||
let result = postgres.remove_all_tasks().unwrap();
|
let result = queue.remove_all_tasks().unwrap();
|
||||||
|
|
||||||
assert_eq!(1, result);
|
assert_eq!(1, result);
|
||||||
|
|
||||||
assert_eq!(None, postgres.find_task_by_id(task.id));
|
assert_eq!(None, queue.find_task_by_id(task.id));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -565,26 +670,22 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fetch_periodic_tasks() {
|
fn fetch_periodic_tasks() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let schedule_in_future = Utc::now() + Duration::hours(100);
|
let schedule_in_future = Utc::now() + Duration::hours(100);
|
||||||
|
|
||||||
insert_periodic_job(
|
insert_periodic_job(
|
||||||
serde_json::json!(true),
|
serde_json::json!(true),
|
||||||
schedule_in_future,
|
schedule_in_future,
|
||||||
100,
|
100,
|
||||||
&postgres.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
|
||||||
let task = insert_periodic_job(
|
let task =
|
||||||
serde_json::json!(true),
|
insert_periodic_job(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||||
Utc::now(),
|
|
||||||
100,
|
|
||||||
&postgres.connection,
|
|
||||||
);
|
|
||||||
|
|
||||||
let tasks = postgres.fetch_periodic_tasks(100).unwrap();
|
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
||||||
|
|
||||||
assert_eq!(tasks.len(), 1);
|
assert_eq!(tasks.len(), 1);
|
||||||
assert_eq!(tasks[0].id, task.id);
|
assert_eq!(tasks[0].id, task.id);
|
||||||
|
@ -595,7 +696,7 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn remove_task() {
|
fn remove_task() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
let new_task1 = NewTask {
|
let new_task1 = NewTask {
|
||||||
metadata: serde_json::json!(true),
|
metadata: serde_json::json!(true),
|
||||||
|
@ -607,19 +708,48 @@ mod postgres_tests {
|
||||||
task_type: "common".to_string(),
|
task_type: "common".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
postgres.connection.test_transaction::<(), Error, _>(|| {
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
let task1 = postgres.insert(&new_task1).unwrap();
|
let task1 = queue.insert(&new_task1).unwrap();
|
||||||
assert!(postgres.find_task_by_id(task1.id).is_some());
|
assert!(queue.find_task_by_id(task1.id).is_some());
|
||||||
|
|
||||||
let task2 = postgres.insert(&new_task2).unwrap();
|
let task2 = queue.insert(&new_task2).unwrap();
|
||||||
assert!(postgres.find_task_by_id(task2.id).is_some());
|
assert!(queue.find_task_by_id(task2.id).is_some());
|
||||||
|
|
||||||
postgres.remove_task(task1.id).unwrap();
|
queue.remove_task(task1.id).unwrap();
|
||||||
assert!(postgres.find_task_by_id(task1.id).is_none());
|
assert!(queue.find_task_by_id(task1.id).is_none());
|
||||||
assert!(postgres.find_task_by_id(task2.id).is_some());
|
assert!(queue.find_task_by_id(task2.id).is_some());
|
||||||
|
|
||||||
postgres.remove_task(task2.id).unwrap();
|
queue.remove_task(task2.id).unwrap();
|
||||||
assert!(postgres.find_task_by_id(task2.id).is_none());
|
assert!(queue.find_task_by_id(task2.id).is_none());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_task_of_type() {
|
||||||
|
let queue = Queue::new();
|
||||||
|
|
||||||
|
let new_task1 = NewTask {
|
||||||
|
metadata: serde_json::json!(true),
|
||||||
|
task_type: "type1".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_task2 = NewTask {
|
||||||
|
metadata: serde_json::json!(true),
|
||||||
|
task_type: "type2".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||||
|
let task1 = queue.insert(&new_task1).unwrap();
|
||||||
|
assert!(queue.find_task_by_id(task1.id).is_some());
|
||||||
|
|
||||||
|
let task2 = queue.insert(&new_task2).unwrap();
|
||||||
|
assert!(queue.find_task_by_id(task2.id).is_some());
|
||||||
|
|
||||||
|
queue.remove_tasks_of_type("type1").unwrap();
|
||||||
|
assert!(queue.find_task_by_id(task1.id).is_none());
|
||||||
|
assert!(queue.find_task_by_id(task2.id).is_some());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -629,13 +759,13 @@ mod postgres_tests {
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn fetch_task_locks_the_record() {
|
fn fetch_task_locks_the_record() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
let timestamp1 = Utc::now() - Duration::hours(40);
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
let task1 = insert_job(
|
let task1 = insert_job(
|
||||||
serde_json::json!(Job { number: 12 }),
|
serde_json::json!(Job { number: 12 }),
|
||||||
timestamp1,
|
timestamp1,
|
||||||
&postgres.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
|
||||||
let task1_id = task1.id;
|
let task1_id = task1.id;
|
||||||
|
@ -645,14 +775,14 @@ mod postgres_tests {
|
||||||
let task2 = insert_job(
|
let task2 = insert_job(
|
||||||
serde_json::json!(Job { number: 11 }),
|
serde_json::json!(Job { number: 11 }),
|
||||||
timestamp2,
|
timestamp2,
|
||||||
&postgres.connection,
|
&queue.connection,
|
||||||
);
|
);
|
||||||
|
|
||||||
let thread = std::thread::spawn(move || {
|
let thread = std::thread::spawn(move || {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.connection.transaction::<(), Error, _>(|| {
|
queue.connection.transaction::<(), Error, _>(|| {
|
||||||
let found_task = postgres.fetch_task(&None).unwrap();
|
let found_task = queue.fetch_task(&None).unwrap();
|
||||||
|
|
||||||
assert_eq!(found_task.id, task1.id);
|
assert_eq!(found_task.id, task1.id);
|
||||||
|
|
||||||
|
@ -664,7 +794,7 @@ mod postgres_tests {
|
||||||
|
|
||||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||||
|
|
||||||
let found_task = postgres.fetch_task(&None).unwrap();
|
let found_task = queue.fetch_task(&None).unwrap();
|
||||||
|
|
||||||
assert_eq!(found_task.id, task2.id);
|
assert_eq!(found_task.id, task2.id);
|
||||||
|
|
||||||
|
@ -672,7 +802,7 @@ mod postgres_tests {
|
||||||
|
|
||||||
// returns unlocked record
|
// returns unlocked record
|
||||||
|
|
||||||
let found_task = postgres.fetch_task(&None).unwrap();
|
let found_task = queue.fetch_task(&None).unwrap();
|
||||||
|
|
||||||
assert_eq!(found_task.id, task1_id);
|
assert_eq!(found_task.id, task1_id);
|
||||||
}
|
}
|
||||||
|
@ -684,7 +814,7 @@ mod postgres_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for Job {
|
impl Runnable for Job {
|
||||||
fn run(&self) -> Result<(), ExecutorError> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), ExecutorError> {
|
||||||
println!("the number is {}", self.number);
|
println!("the number is {}", self.number);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
|
@ -1,13 +1,13 @@
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
use crate::postgres::PeriodicTask;
|
use crate::queue::PeriodicTask;
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct Scheduler {
|
pub struct Scheduler {
|
||||||
pub check_period: u64,
|
pub check_period: u64,
|
||||||
pub error_margin_seconds: u64,
|
pub error_margin_seconds: u64,
|
||||||
pub postgres: Postgres,
|
pub queue: Queue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Scheduler {
|
impl Drop for Scheduler {
|
||||||
|
@ -18,22 +18,22 @@ impl Drop for Scheduler {
|
||||||
|
|
||||||
impl Scheduler {
|
impl Scheduler {
|
||||||
pub fn start(check_period: u64, error_margin_seconds: u64) {
|
pub fn start(check_period: u64, error_margin_seconds: u64) {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
let builder = thread::Builder::new().name("scheduler".to_string());
|
let builder = thread::Builder::new().name("scheduler".to_string());
|
||||||
|
|
||||||
builder
|
builder
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let scheduler = Self::new(check_period, error_margin_seconds, postgres);
|
let scheduler = Self::new(check_period, error_margin_seconds, queue);
|
||||||
|
|
||||||
scheduler.schedule_loop();
|
scheduler.schedule_loop();
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(check_period: u64, error_margin_seconds: u64, postgres: Postgres) -> Self {
|
pub fn new(check_period: u64, error_margin_seconds: u64, queue: Queue) -> Self {
|
||||||
Self {
|
Self {
|
||||||
check_period,
|
check_period,
|
||||||
postgres,
|
queue,
|
||||||
error_margin_seconds,
|
error_margin_seconds,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ impl Scheduler {
|
||||||
|
|
||||||
pub fn schedule(&self) {
|
pub fn schedule(&self) {
|
||||||
if let Some(tasks) = self
|
if let Some(tasks) = self
|
||||||
.postgres
|
.queue
|
||||||
.fetch_periodic_tasks(self.error_margin_seconds as i64)
|
.fetch_periodic_tasks(self.error_margin_seconds as i64)
|
||||||
{
|
{
|
||||||
for task in tasks {
|
for task in tasks {
|
||||||
|
@ -62,15 +62,15 @@ impl Scheduler {
|
||||||
fn process_task(&self, task: PeriodicTask) {
|
fn process_task(&self, task: PeriodicTask) {
|
||||||
match task.scheduled_at {
|
match task.scheduled_at {
|
||||||
None => {
|
None => {
|
||||||
self.postgres.schedule_next_task_execution(&task).unwrap();
|
self.queue.schedule_next_task_execution(&task).unwrap();
|
||||||
}
|
}
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
let actual_task: Box<dyn Runnable> =
|
let actual_task: Box<dyn Runnable> =
|
||||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
|
||||||
self.postgres.push_task(&(*actual_task)).unwrap();
|
self.queue.push_task(&(*actual_task)).unwrap();
|
||||||
|
|
||||||
self.postgres.schedule_next_task_execution(&task).unwrap();
|
self.queue.schedule_next_task_execution(&task).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,8 +81,8 @@ mod job_scheduler_tests {
|
||||||
use super::Scheduler;
|
use super::Scheduler;
|
||||||
use crate::executor::Error;
|
use crate::executor::Error;
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use crate::postgres::Task;
|
use crate::queue::Task;
|
||||||
use crate::schema::fang_tasks;
|
use crate::schema::fang_tasks;
|
||||||
use crate::typetag;
|
use crate::typetag;
|
||||||
use crate::{Deserialize, Serialize};
|
use crate::{Deserialize, Serialize};
|
||||||
|
@ -96,7 +96,7 @@ mod job_scheduler_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for ScheduledJob {
|
impl Runnable for ScheduledJob {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,15 +108,15 @@ mod job_scheduler_tests {
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn schedules_jobs() {
|
fn schedules_jobs() {
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
postgres.push_periodic_task(&ScheduledJob {}, 10).unwrap();
|
queue.push_periodic_task(&ScheduledJob {}, 10).unwrap();
|
||||||
Scheduler::start(1, 2);
|
Scheduler::start(1, 2);
|
||||||
|
|
||||||
let sleep_duration = Duration::from_secs(15);
|
let sleep_duration = Duration::from_secs(15);
|
||||||
thread::sleep(sleep_duration);
|
thread::sleep(sleep_duration);
|
||||||
|
|
||||||
let tasks = get_all_tasks(&postgres.connection);
|
let tasks = get_all_tasks(&queue.connection);
|
||||||
|
|
||||||
assert_eq!(1, tasks.len());
|
assert_eq!(1, tasks.len());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,22 @@
|
||||||
|
use crate::diesel::r2d2;
|
||||||
|
use crate::diesel::PgConnection;
|
||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::executor::RetentionMode;
|
use crate::executor::RetentionMode;
|
||||||
use crate::executor::SleepParams;
|
use crate::executor::SleepParams;
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
pub struct WorkerPool {
|
pub struct WorkerPool {
|
||||||
pub number_of_workers: u16,
|
pub number_of_workers: u32,
|
||||||
pub worker_params: WorkerParams,
|
pub worker_params: WorkerParams,
|
||||||
|
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WorkerThread {
|
pub struct WorkerThread {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub worker_params: WorkerParams,
|
pub worker_params: WorkerParams,
|
||||||
pub restarts: u64,
|
pub restarts: u64,
|
||||||
|
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -51,19 +55,24 @@ impl WorkerParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerPool {
|
impl WorkerPool {
|
||||||
pub fn new(number_of_workers: u16) -> Self {
|
pub fn new(number_of_workers: u32) -> Self {
|
||||||
let worker_params = WorkerParams::new();
|
let worker_params = WorkerParams::new();
|
||||||
|
let connection_pool = Queue::connection_pool(number_of_workers);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
number_of_workers,
|
number_of_workers,
|
||||||
worker_params,
|
worker_params,
|
||||||
|
connection_pool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_params(number_of_workers: u16, worker_params: WorkerParams) -> Self {
|
pub fn new_with_params(number_of_workers: u32, worker_params: WorkerParams) -> Self {
|
||||||
|
let connection_pool = Queue::connection_pool(number_of_workers);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
number_of_workers,
|
number_of_workers,
|
||||||
worker_params,
|
worker_params,
|
||||||
|
connection_pool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,21 +84,37 @@ impl WorkerPool {
|
||||||
.clone()
|
.clone()
|
||||||
.unwrap_or_else(|| "".to_string());
|
.unwrap_or_else(|| "".to_string());
|
||||||
let name = format!("worker_{}{}", worker_type, idx);
|
let name = format!("worker_{}{}", worker_type, idx);
|
||||||
WorkerThread::spawn_in_pool(self.worker_params.clone(), name, 0)
|
WorkerThread::spawn_in_pool(
|
||||||
|
self.worker_params.clone(),
|
||||||
|
name,
|
||||||
|
0,
|
||||||
|
self.connection_pool.clone(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerThread {
|
impl WorkerThread {
|
||||||
pub fn new(worker_params: WorkerParams, name: String, restarts: u64) -> Self {
|
pub fn new(
|
||||||
|
worker_params: WorkerParams,
|
||||||
|
name: String,
|
||||||
|
restarts: u64,
|
||||||
|
connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name,
|
name,
|
||||||
worker_params,
|
worker_params,
|
||||||
restarts,
|
restarts,
|
||||||
|
connection_pool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_in_pool(worker_params: WorkerParams, name: String, restarts: u64) {
|
pub fn spawn_in_pool(
|
||||||
|
worker_params: WorkerParams,
|
||||||
|
name: String,
|
||||||
|
restarts: u64,
|
||||||
|
connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
||||||
|
) {
|
||||||
let builder = thread::Builder::new().name(name.clone());
|
let builder = thread::Builder::new().name(name.clone());
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
@ -100,25 +125,35 @@ impl WorkerThread {
|
||||||
builder
|
builder
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
// when _job is dropped, it will be restarted (see Drop trait impl)
|
// when _job is dropped, it will be restarted (see Drop trait impl)
|
||||||
let _job = WorkerThread::new(worker_params.clone(), name, restarts);
|
let _job = WorkerThread::new(
|
||||||
|
worker_params.clone(),
|
||||||
|
name,
|
||||||
|
restarts,
|
||||||
|
connection_pool.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
let postgres = Postgres::new();
|
match connection_pool.get() {
|
||||||
|
Ok(connection) => {
|
||||||
|
let mut executor = Executor::new(connection);
|
||||||
|
|
||||||
let mut executor = Executor::new(postgres);
|
if let Some(task_type_str) = worker_params.task_type {
|
||||||
|
executor.set_task_type(task_type_str);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(task_type_str) = worker_params.task_type {
|
if let Some(retention_mode) = worker_params.retention_mode {
|
||||||
executor.set_task_type(task_type_str);
|
executor.set_retention_mode(retention_mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(sleep_params) = worker_params.sleep_params {
|
||||||
|
executor.set_sleep_params(sleep_params);
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.run_tasks();
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
error!("Failed to get postgres connection: {:?}", error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(retention_mode) = worker_params.retention_mode {
|
|
||||||
executor.set_retention_mode(retention_mode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(sleep_params) = worker_params.sleep_params {
|
|
||||||
executor.set_sleep_params(sleep_params);
|
|
||||||
}
|
|
||||||
|
|
||||||
executor.run_tasks();
|
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
@ -130,6 +165,7 @@ impl Drop for WorkerThread {
|
||||||
self.worker_params.clone(),
|
self.worker_params.clone(),
|
||||||
self.name.clone(),
|
self.name.clone(),
|
||||||
self.restarts + 1,
|
self.restarts + 1,
|
||||||
|
self.connection_pool.clone(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -141,8 +177,8 @@ mod job_pool_tests {
|
||||||
use crate::executor::Error;
|
use crate::executor::Error;
|
||||||
use crate::executor::RetentionMode;
|
use crate::executor::RetentionMode;
|
||||||
use crate::executor::Runnable;
|
use crate::executor::Runnable;
|
||||||
use crate::postgres::Postgres;
|
use crate::queue::Queue;
|
||||||
use crate::postgres::Task;
|
use crate::queue::Task;
|
||||||
use crate::schema::fang_tasks;
|
use crate::schema::fang_tasks;
|
||||||
use crate::typetag;
|
use crate::typetag;
|
||||||
use crate::{Deserialize, Serialize};
|
use crate::{Deserialize, Serialize};
|
||||||
|
@ -178,14 +214,12 @@ mod job_pool_tests {
|
||||||
|
|
||||||
#[typetag::serde]
|
#[typetag::serde]
|
||||||
impl Runnable for MyJob {
|
impl Runnable for MyJob {
|
||||||
fn run(&self) -> Result<(), Error> {
|
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||||
let postgres = Postgres::new();
|
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(3));
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
let new_job = MyJob::new(self.number + 1);
|
let new_job = MyJob::new(self.number + 1);
|
||||||
|
|
||||||
postgres.push_task(&new_job).unwrap();
|
Queue::push_task_query(connection, &new_job).unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -201,20 +235,20 @@ mod job_pool_tests {
|
||||||
fn tasks_are_split_between_two_threads() {
|
fn tasks_are_split_between_two_threads() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let postgres = Postgres::new();
|
let queue = Queue::new();
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
let mut worker_params = WorkerParams::new();
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||||
let job_pool = WorkerPool::new_with_params(2, worker_params);
|
let job_pool = WorkerPool::new_with_params(2, worker_params);
|
||||||
|
|
||||||
postgres.push_task(&MyJob::new(100)).unwrap();
|
queue.push_task(&MyJob::new(100)).unwrap();
|
||||||
postgres.push_task(&MyJob::new(200)).unwrap();
|
queue.push_task(&MyJob::new(200)).unwrap();
|
||||||
|
|
||||||
job_pool.start();
|
job_pool.start();
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(100));
|
thread::sleep(Duration::from_secs(100));
|
||||||
|
|
||||||
let tasks = get_all_tasks(&postgres.connection);
|
let tasks = get_all_tasks(&queue.connection);
|
||||||
|
|
||||||
assert!(tasks.len() > 40);
|
assert!(tasks.len() > 40);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue