diff --git a/src/postgres.rs b/src/postgres.rs index f0c837a..5305742 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -63,6 +63,7 @@ impl Postgres { .order(fang_tasks::created_at.asc()) .limit(1) .for_update() + .skip_locked() .get_result::(&self.connection) { Ok(record) => Some(record), @@ -78,7 +79,7 @@ mod postgres_tests { use super::Task; use crate::schema::fang_tasks; use crate::schema::FangTaskState; - use chrono::{Duration, Utc}; + use chrono::{DateTime, Duration, Utc}; use diesel::connection::Connection; use diesel::prelude::*; use diesel::result::Error; @@ -106,23 +107,11 @@ mod postgres_tests { postgres.connection.test_transaction::<(), Error, _>(|| { let timestamp1 = Utc::now() - Duration::hours(40); - let task1 = diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(serde_json::json!(true)), - fang_tasks::created_at.eq(timestamp1), - )]) - .get_result::(&postgres.connection) - .unwrap(); + let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection); let timestamp2 = Utc::now() - Duration::hours(20); - diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(serde_json::json!(false)), - fang_tasks::created_at.eq(timestamp2), - )]) - .get_result::(&postgres.connection) - .unwrap(); + insert_job(serde_json::json!(false), timestamp2, &postgres.connection); let found_task = postgres.fetch_task().unwrap(); @@ -132,37 +121,28 @@ mod postgres_tests { }); } + // this test ignored because it commits data to the db #[test] #[ignore] fn fetch_task_locks_the_record() { let postgres = Postgres::new(None); let timestamp1 = Utc::now() - Duration::hours(40); - let task1 = diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(serde_json::json!(true)), - fang_tasks::created_at.eq(timestamp1), - )]) - .get_result::(&postgres.connection) - .unwrap(); + let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection); + + let task1_id = task1.id; let timestamp2 = Utc::now() - Duration::hours(20); - let task2 = diesel::insert_into(fang_tasks::table) - .values(&vec![( - fang_tasks::metadata.eq(serde_json::json!(false)), - fang_tasks::created_at.eq(timestamp2), - )]) - .get_result::(&postgres.connection) - .unwrap(); + let task2 = insert_job(serde_json::json!(false), timestamp2, &postgres.connection); let thread = std::thread::spawn(move || { let postgres = Postgres::new(None); - postgres.connection.test_transaction::<(), Error, _>(|| { + postgres.connection.transaction::<(), Error, _>(|| { let found_task = postgres.fetch_task().unwrap(); - assert_eq!(found_task.id, task2.id); + assert_eq!(found_task.id, task1.id); std::thread::sleep(std::time::Duration::from_millis(5000)); @@ -174,12 +154,28 @@ mod postgres_tests { let found_task = postgres.fetch_task().unwrap(); - assert_eq!(found_task.id, task1.id); + assert_eq!(found_task.id, task2.id); - let result = thread.join(); + let _result = thread.join(); - eprintln!("{:?}", result); + // returns unlocked record - // assert_eq!(Ok(()), result); + let found_task = postgres.fetch_task().unwrap(); + + assert_eq!(found_task.id, task1_id); + } + + fn insert_job( + metadata: serde_json::Value, + timestamp: DateTime, + connection: &PgConnection, + ) -> Task { + diesel::insert_into(fang_tasks::table) + .values(&vec![( + fang_tasks::metadata.eq(metadata), + fang_tasks::created_at.eq(timestamp), + )]) + .get_result::(connection) + .unwrap() } }