Diesel and Uuid updated, it was not hard xd (#78)

This commit is contained in:
Pmarquez 2022-09-01 15:54:04 +00:00 committed by GitHub
parent 553f5f7de1
commit f2ca0c046d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 148 deletions

View file

@ -28,15 +28,15 @@ sha2 = "0.10"
thiserror = "1.0"
typed-builder = "0.10"
typetag = "0.2"
uuid = { version = "0.8", features = ["v4"] }
uuid = { version = "1.1", features = ["v4"] }
[dependencies.diesel]
version = "1.4"
features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"]
version = "2.0"
features = ["postgres", "serde_json", "chrono", "uuid", "r2d2"]
optional = true
[dependencies.diesel-derive-enum]
version = "1"
version = "2.0.0-rc.0"
features = ["postgres"]
optional = true
@ -46,7 +46,7 @@ optional = true
[dependencies.bb8-postgres]
version = "0.8"
features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"]
features = ["with-serde_json-1" , "with-uuid-1" , "with-chrono-0_4"]
optional = true
[dependencies.postgres-types]

View file

@ -1,5 +0,0 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"

View file

@ -28,7 +28,7 @@ use std::env;
pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone, TypedBuilder)]
#[table_name = "fang_tasks"]
#[diesel(table_name = fang_tasks)]
pub struct Task {
#[builder(setter(into))]
pub id: Uuid,
@ -51,7 +51,7 @@ pub struct Task {
}
#[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)]
#[table_name = "fang_tasks"]
#[diesel(table_name = fang_tasks)]
pub struct NewTask {
#[builder(setter(into))]
metadata: serde_json::Value,
@ -109,62 +109,62 @@ pub struct Queue {
impl Queueable for Queue {
fn fetch_and_touch_task(&self, task_type: String) -> Result<Option<Task>, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::fetch_and_touch_query(&connection, task_type)
Self::fetch_and_touch_query(&mut connection, task_type)
}
fn insert_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::insert_query(&connection, params, Utc::now())
Self::insert_query(&mut connection, params, Utc::now())
}
fn schedule_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::schedule_task_query(&connection, params)
Self::schedule_task_query(&mut connection, params)
}
fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::remove_all_scheduled_tasks_query(&connection)
Self::remove_all_scheduled_tasks_query(&mut connection)
}
fn remove_all_tasks(&self) -> Result<usize, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::remove_all_tasks_query(&connection)
Self::remove_all_tasks_query(&mut connection)
}
fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::remove_tasks_of_type_query(&connection, task_type)
Self::remove_tasks_of_type_query(&mut connection, task_type)
}
fn remove_task(&self, id: Uuid) -> Result<usize, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::remove_task_query(&connection, id)
Self::remove_task_query(&mut connection, id)
}
fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::update_task_state_query(&connection, task, state)
Self::update_task_state_query(&mut connection, task, state)
}
fn fail_task(&self, task: &Task, error: String) -> Result<Task, QueueError> {
let connection = self.get_connection()?;
let mut connection = self.get_connection()?;
Self::fail_task_query(&connection, task, error)
Self::fail_task_query(&mut connection, task, error)
}
fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
let connection = self.get_connection().unwrap();
let mut connection = self.get_connection().unwrap();
Self::find_task_by_id_query(&connection, id)
Self::find_task_by_id_query(&mut connection, id)
}
}
@ -181,7 +181,7 @@ impl Queue {
}
pub fn schedule_task_query(
connection: &PgConnection,
connection: &mut PgConnection,
params: &dyn Runnable,
) -> Result<Task, QueueError> {
let scheduled_at = match params.cron() {
@ -212,7 +212,7 @@ impl Queue {
}
pub fn insert_query(
connection: &PgConnection,
connection: &mut PgConnection,
params: &dyn Runnable,
scheduled_at: DateTime<Utc>,
) -> Result<Task, QueueError> {
@ -250,23 +250,23 @@ impl Queue {
}
}
pub fn fetch_task_query(connection: &PgConnection, task_type: String) -> Option<Task> {
pub fn fetch_task_query(connection: &mut PgConnection, task_type: String) -> Option<Task> {
Self::fetch_task_of_type_query(connection, &task_type)
}
pub fn fetch_and_touch_query(
connection: &PgConnection,
connection: &mut PgConnection,
task_type: String,
) -> Result<Option<Task>, QueueError> {
connection.transaction::<Option<Task>, QueueError, _>(|| {
let found_task = Self::fetch_task_query(connection, task_type);
connection.transaction::<Option<Task>, QueueError, _>(|conn| {
let found_task = Self::fetch_task_query(conn, task_type);
if found_task.is_none() {
return Ok(None);
}
match Self::update_task_state_query(
connection,
conn,
&found_task.unwrap(),
FangTaskState::InProgress,
) {
@ -276,19 +276,19 @@ impl Queue {
})
}
pub fn find_task_by_id_query(connection: &PgConnection, id: Uuid) -> Option<Task> {
pub fn find_task_by_id_query(connection: &mut PgConnection, id: Uuid) -> Option<Task> {
fang_tasks::table
.filter(fang_tasks::id.eq(id))
.first::<Task>(connection)
.ok()
}
pub fn remove_all_tasks_query(connection: &PgConnection) -> Result<usize, QueueError> {
pub fn remove_all_tasks_query(connection: &mut PgConnection) -> Result<usize, QueueError> {
Ok(diesel::delete(fang_tasks::table).execute(connection)?)
}
pub fn remove_all_scheduled_tasks_query(
connection: &PgConnection,
connection: &mut PgConnection,
) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now()));
@ -296,7 +296,7 @@ impl Queue {
}
pub fn remove_tasks_of_type_query(
connection: &PgConnection,
connection: &mut PgConnection,
task_type: &str,
) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type));
@ -304,14 +304,14 @@ impl Queue {
Ok(diesel::delete(query).execute(connection)?)
}
pub fn remove_task_query(connection: &PgConnection, id: Uuid) -> Result<usize, QueueError> {
pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
Ok(diesel::delete(query).execute(connection)?)
}
pub fn update_task_state_query(
connection: &PgConnection,
connection: &mut PgConnection,
task: &Task,
state: FangTaskState,
) -> Result<Task, QueueError> {
@ -324,7 +324,7 @@ impl Queue {
}
pub fn fail_task_query(
connection: &PgConnection,
connection: &mut PgConnection,
task: &Task,
error: String,
) -> Result<Task, QueueError> {
@ -355,7 +355,7 @@ impl Queue {
.unwrap()
}
fn fetch_task_of_type_query(connection: &PgConnection, task_type: &str) -> Option<Task> {
fn fetch_task_of_type_query(connection: &mut PgConnection, task_type: &str) -> Option<Task> {
fang_tasks::table
.order(fang_tasks::created_at.asc())
.order(fang_tasks::scheduled_at.asc())
@ -369,7 +369,10 @@ impl Queue {
.ok()
}
fn find_task_by_uniq_hash_query(connection: &PgConnection, uniq_hash: &str) -> Option<Task> {
fn find_task_by_uniq_hash_query(
connection: &mut PgConnection,
uniq_hash: &str,
) -> Option<Task> {
fang_tasks::table
.filter(fang_tasks::uniq_hash.eq(uniq_hash))
.filter(fang_tasks::state.eq(FangTaskState::New))
@ -462,17 +465,17 @@ mod queue_tests {
}
#[test]
fn insert_inserts_task() {
fn insert_task_test() {
let task = PepeTask { number: 10 };
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task = Queue::insert_query(conn, &task, Utc::now()).unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
@ -496,14 +499,13 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap();
let _task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap();
let _task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap();
let found_task =
Queue::fetch_task_query(&queue_pooled_connection, COMMON_TYPE.to_string()).unwrap();
let found_task = Queue::fetch_task_query(conn, COMMON_TYPE.to_string()).unwrap();
assert_eq!(found_task.id, task1.id);
Ok(())
});
@ -517,17 +519,13 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task = Queue::insert_query(conn, &task, Utc::now()).unwrap();
let found_task = Queue::update_task_state_query(
&queue_pooled_connection,
&task,
FangTaskState::Finished,
)
.unwrap();
let found_task =
Queue::update_task_state_query(conn, &task, FangTaskState::Finished).unwrap();
let metadata = found_task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
@ -550,15 +548,14 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task = Queue::insert_query(conn, &task, Utc::now()).unwrap();
let error = "Failed".to_string();
let found_task =
Queue::fail_task_query(&queue_pooled_connection, &task, error.clone()).unwrap();
let found_task = Queue::fail_task_query(conn, &task, error.clone()).unwrap();
let metadata = found_task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
@ -582,15 +579,14 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task = Queue::insert_query(conn, &task, Utc::now()).unwrap();
let found_task =
Queue::fetch_and_touch_query(&queue_pooled_connection, COMMON_TYPE.to_string())
.unwrap()
.unwrap();
let found_task = Queue::fetch_and_touch_query(conn, COMMON_TYPE.to_string())
.unwrap()
.unwrap();
let metadata = found_task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
@ -611,12 +607,10 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let found_task =
Queue::fetch_and_touch_query(&queue_pooled_connection, COMMON_TYPE.to_string())
.unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let found_task = Queue::fetch_and_touch_query(conn, COMMON_TYPE.to_string()).unwrap();
assert_eq!(None, found_task);
@ -632,11 +626,11 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task1 = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
let task2 = Queue::insert_query(&queue_pooled_connection, &task, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &task, Utc::now()).unwrap();
let task2 = Queue::insert_query(conn, &task, Utc::now()).unwrap();
assert_eq!(task2.id, task1.id);
Ok(())
@ -648,9 +642,9 @@ mod queue_tests {
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
let task = &ScheduledPepeTask {
@ -658,7 +652,7 @@ mod queue_tests {
datetime: datetime.to_string(),
};
let task = Queue::schedule_task_query(&queue_pooled_connection, task).unwrap();
let task = Queue::schedule_task_query(conn, task).unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
@ -677,9 +671,9 @@ mod queue_tests {
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
let task1 = &ScheduledPepeTask {
@ -692,10 +686,10 @@ mod queue_tests {
datetime: datetime.to_string(),
};
Queue::schedule_task_query(&queue_pooled_connection, task1).unwrap();
Queue::schedule_task_query(&queue_pooled_connection, task2).unwrap();
Queue::schedule_task_query(conn, task1).unwrap();
Queue::schedule_task_query(conn, task2).unwrap();
let number = Queue::remove_all_scheduled_tasks_query(&queue_pooled_connection).unwrap();
let number = Queue::remove_all_scheduled_tasks_query(conn).unwrap();
assert_eq!(2, number);
@ -712,23 +706,17 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap();
let result = Queue::remove_all_tasks_query(&queue_pooled_connection).unwrap();
let result = Queue::remove_all_tasks_query(conn).unwrap();
assert_eq!(2, result);
assert_eq!(
None,
Queue::find_task_by_id_query(&queue_pooled_connection, task1.id)
);
assert_eq!(
None,
Queue::find_task_by_id_query(&queue_pooled_connection, task2.id)
);
assert_eq!(None, Queue::find_task_by_id_query(conn, task1.id));
assert_eq!(None, Queue::find_task_by_id_query(conn, task2.id));
Ok(())
});
@ -743,19 +731,19 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap();
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some());
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task1.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_some());
Queue::remove_task_query(&queue_pooled_connection, task1.id).unwrap();
Queue::remove_task_query(conn, task1.id).unwrap();
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_none());
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task1.id).is_none());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_some());
Ok(())
});
@ -770,19 +758,19 @@ mod queue_tests {
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
let mut queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let task1 = Queue::insert_query(&queue_pooled_connection, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(&queue_pooled_connection, &task2, Utc::now()).unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|conn| {
let task1 = Queue::insert_query(conn, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(conn, &task2, Utc::now()).unwrap();
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some());
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task1.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_some());
Queue::remove_tasks_of_type_query(&queue_pooled_connection, "weirdo").unwrap();
Queue::remove_tasks_of_type_query(conn, "weirdo").unwrap();
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task1.id).is_some());
assert!(Queue::find_task_by_id_query(&queue_pooled_connection, task2.id).is_none());
assert!(Queue::find_task_by_id_query(conn, task1.id).is_some());
assert!(Queue::find_task_by_id_query(conn, task2.id).is_none());
Ok(())
});

View file

@ -1,6 +1,11 @@
use diesel_derive_enum::DbEnum;
pub mod sql_types {
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "fang_task_state"))]
pub struct FangTaskState;
}
#[derive(DbEnum, Debug, Eq, PartialEq, Clone)]
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
#[DieselTypePath = "crate::blocking::schema::sql_types::FangTaskState"]
pub enum FangTaskState {
New,
InProgress,
@ -9,22 +14,22 @@ pub enum FangTaskState {
}
table! {
use super::FangTaskStateMapping;
use super::sql_types::FangTaskState;
use diesel::sql_types::Jsonb;
use diesel::sql_types::Nullable;
use diesel::sql_types::Text;
use diesel::sql_types::Timestamptz;
use diesel::sql_types::Uuid;
use diesel::sql_types::Varchar;
use diesel::pg::sql_types::Bpchar;
fang_tasks (id) {
id -> Uuid,
metadata -> Jsonb,
error_message -> Nullable<Text>,
state -> FangTaskStateMapping,
state -> FangTaskState,
task_type -> Varchar,
uniq_hash -> Nullable<Text>,
uniq_hash -> Nullable<Bpchar>,
scheduled_at -> Timestamptz,
created_at -> Timestamptz,
updated_at -> Timestamptz,

View file

@ -235,20 +235,20 @@ mod worker_tests {
.retention_mode(RetentionMode::KeepAll)
.task_type(task.task_type())
.build();
let pooled_connection = worker.queue.connection_pool.get().unwrap();
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap();
let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
assert_eq!(FangTaskState::New, task.state);
// this operation commits and thats why need to commit this test
worker.run(task.clone());
let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap();
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
assert_eq!(FangTaskState::Finished, found_task.state);
Queue::remove_tasks_of_type_query(&pooled_connection, "worker_task").unwrap();
Queue::remove_tasks_of_type_query(&mut pooled_connection, "worker_task").unwrap();
}
#[test]
@ -267,10 +267,10 @@ mod worker_tests {
.retention_mode(RetentionMode::KeepAll)
.build();
let pooled_connection = worker.queue.connection_pool.get().unwrap();
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
let task1 = Queue::insert_query(&pooled_connection, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(&pooled_connection, &task2, Utc::now()).unwrap();
let task1 = Queue::insert_query(&mut pooled_connection, &task1, Utc::now()).unwrap();
let task2 = Queue::insert_query(&mut pooled_connection, &task2, Utc::now()).unwrap();
assert_eq!(FangTaskState::New, task1.state);
assert_eq!(FangTaskState::New, task2.state);
@ -279,14 +279,14 @@ mod worker_tests {
std::thread::sleep(std::time::Duration::from_millis(1000));
let found_task1 = Queue::find_task_by_id_query(&pooled_connection, task1.id).unwrap();
let found_task1 = Queue::find_task_by_id_query(&mut pooled_connection, task1.id).unwrap();
assert_eq!(FangTaskState::Finished, found_task1.state);
let found_task2 = Queue::find_task_by_id_query(&pooled_connection, task2.id).unwrap();
let found_task2 = Queue::find_task_by_id_query(&mut pooled_connection, task2.id).unwrap();
assert_eq!(FangTaskState::New, found_task2.state);
Queue::remove_tasks_of_type_query(&pooled_connection, "type1").unwrap();
Queue::remove_tasks_of_type_query(&pooled_connection, "type2").unwrap();
Queue::remove_tasks_of_type_query(&mut pooled_connection, "type1").unwrap();
Queue::remove_tasks_of_type_query(&mut pooled_connection, "type2").unwrap();
}
#[test]
@ -304,15 +304,15 @@ mod worker_tests {
.task_type(task.task_type())
.build();
let pooled_connection = worker.queue.connection_pool.get().unwrap();
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap();
let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
assert_eq!(FangTaskState::New, task.state);
worker.run(task.clone());
let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap();
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
assert_eq!(FangTaskState::Failed, found_task.state);
assert_eq!(
@ -320,6 +320,6 @@ mod worker_tests {
found_task.error_message.unwrap()
);
Queue::remove_tasks_of_type_query(&pooled_connection, "F_task").unwrap();
Queue::remove_tasks_of_type_query(&mut pooled_connection, "F_task").unwrap();
}
}