lock the selected task record
This commit is contained in:
parent
506fd1c4cb
commit
27f3e1479f
5 changed files with 156 additions and 0 deletions
|
@ -10,6 +10,7 @@ license = "MIT"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
diesel = { version = "1.4.6", features = ["postgres", "serde_json", "chrono", "uuidv07"] }
|
diesel = { version = "1.4.6", features = ["postgres", "serde_json", "chrono", "uuidv07"] }
|
||||||
|
diesel-derive-enum = { version = "1", features = ["postgres"] }
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
DROP TABLE fang_tasks;
|
DROP TABLE fang_tasks;
|
||||||
|
DROP TYPE fang_task_state;
|
||||||
|
|
|
@ -1,8 +1,14 @@
|
||||||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||||
|
|
||||||
|
CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');
|
||||||
|
|
||||||
CREATE TABLE fang_tasks (
|
CREATE TABLE fang_tasks (
|
||||||
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
||||||
metadata jsonb NOT NULL,
|
metadata jsonb NOT NULL,
|
||||||
|
error_message TEXT,
|
||||||
|
state fang_task_state default 'new' NOT NULL,
|
||||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
|
||||||
|
|
128
src/postgres.rs
128
src/postgres.rs
|
@ -1,4 +1,5 @@
|
||||||
use crate::schema::fang_tasks;
|
use crate::schema::fang_tasks;
|
||||||
|
use crate::schema::FangTaskState;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use diesel::pg::PgConnection;
|
use diesel::pg::PgConnection;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
@ -12,6 +13,8 @@ use uuid::Uuid;
|
||||||
pub struct Task {
|
pub struct Task {
|
||||||
pub id: Uuid,
|
pub id: Uuid,
|
||||||
pub metadata: serde_json::Value,
|
pub metadata: serde_json::Value,
|
||||||
|
pub error_message: Option<String>,
|
||||||
|
pub state: FangTaskState,
|
||||||
pub created_at: DateTime<Utc>,
|
pub created_at: DateTime<Utc>,
|
||||||
pub updated_at: DateTime<Utc>,
|
pub updated_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
@ -54,4 +57,129 @@ impl Postgres {
|
||||||
.values(params)
|
.values(params)
|
||||||
.get_result::<Task>(&self.connection)
|
.get_result::<Task>(&self.connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn fetch_task(&self) -> Option<Task> {
|
||||||
|
match fang_tasks::table
|
||||||
|
.order(fang_tasks::created_at.asc())
|
||||||
|
.limit(1)
|
||||||
|
.for_update()
|
||||||
|
.get_result::<Task>(&self.connection)
|
||||||
|
{
|
||||||
|
Ok(record) => Some(record),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod postgres_tests {
|
||||||
|
use super::NewTask;
|
||||||
|
use super::Postgres;
|
||||||
|
use super::Task;
|
||||||
|
use crate::schema::fang_tasks;
|
||||||
|
use crate::schema::FangTaskState;
|
||||||
|
use chrono::{Duration, Utc};
|
||||||
|
use diesel::connection::Connection;
|
||||||
|
use diesel::prelude::*;
|
||||||
|
use diesel::result::Error;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn insert_inserts_task() {
|
||||||
|
let postgres = Postgres::new(None);
|
||||||
|
|
||||||
|
let new_task = NewTask {
|
||||||
|
metadata: serde_json::json!(true),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = postgres
|
||||||
|
.connection
|
||||||
|
.test_transaction::<Task, Error, _>(|| postgres.insert(&new_task));
|
||||||
|
|
||||||
|
assert_eq!(result.state, FangTaskState::New);
|
||||||
|
assert_eq!(result.error_message, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fetch_task_fetches_the_oldest_task() {
|
||||||
|
let postgres = Postgres::new(None);
|
||||||
|
|
||||||
|
postgres.connection.test_transaction::<(), Error, _>(|| {
|
||||||
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
|
let task1 = diesel::insert_into(fang_tasks::table)
|
||||||
|
.values(&vec![(
|
||||||
|
fang_tasks::metadata.eq(serde_json::json!(true)),
|
||||||
|
fang_tasks::created_at.eq(timestamp1),
|
||||||
|
)])
|
||||||
|
.get_result::<Task>(&postgres.connection)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let timestamp2 = Utc::now() - Duration::hours(20);
|
||||||
|
|
||||||
|
diesel::insert_into(fang_tasks::table)
|
||||||
|
.values(&vec![(
|
||||||
|
fang_tasks::metadata.eq(serde_json::json!(false)),
|
||||||
|
fang_tasks::created_at.eq(timestamp2),
|
||||||
|
)])
|
||||||
|
.get_result::<Task>(&postgres.connection)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let found_task = postgres.fetch_task().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(found_task.id, task1.id);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn fetch_task_locks_the_record() {
|
||||||
|
let postgres = Postgres::new(None);
|
||||||
|
let timestamp1 = Utc::now() - Duration::hours(40);
|
||||||
|
|
||||||
|
let task1 = diesel::insert_into(fang_tasks::table)
|
||||||
|
.values(&vec![(
|
||||||
|
fang_tasks::metadata.eq(serde_json::json!(true)),
|
||||||
|
fang_tasks::created_at.eq(timestamp1),
|
||||||
|
)])
|
||||||
|
.get_result::<Task>(&postgres.connection)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let timestamp2 = Utc::now() - Duration::hours(20);
|
||||||
|
|
||||||
|
let task2 = diesel::insert_into(fang_tasks::table)
|
||||||
|
.values(&vec![(
|
||||||
|
fang_tasks::metadata.eq(serde_json::json!(false)),
|
||||||
|
fang_tasks::created_at.eq(timestamp2),
|
||||||
|
)])
|
||||||
|
.get_result::<Task>(&postgres.connection)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let thread = std::thread::spawn(move || {
|
||||||
|
let postgres = Postgres::new(None);
|
||||||
|
|
||||||
|
postgres.connection.test_transaction::<(), Error, _>(|| {
|
||||||
|
let found_task = postgres.fetch_task().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(found_task.id, task2.id);
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(5000));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||||
|
|
||||||
|
let found_task = postgres.fetch_task().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(found_task.id, task1.id);
|
||||||
|
|
||||||
|
let result = thread.join();
|
||||||
|
|
||||||
|
eprintln!("{:?}", result);
|
||||||
|
|
||||||
|
// assert_eq!(Ok(()), result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,27 @@
|
||||||
|
use diesel_derive_enum::DbEnum;
|
||||||
|
|
||||||
|
#[derive(DbEnum, Debug, Eq, PartialEq)]
|
||||||
|
pub enum FangTaskState {
|
||||||
|
New,
|
||||||
|
InProgress,
|
||||||
|
Failed,
|
||||||
|
Finished,
|
||||||
|
}
|
||||||
|
|
||||||
table! {
|
table! {
|
||||||
|
use super::FangTaskStateMapping;
|
||||||
|
use diesel::sql_types::Jsonb;
|
||||||
|
use diesel::sql_types::Nullable;
|
||||||
|
use diesel::sql_types::Text;
|
||||||
|
use diesel::sql_types::Timestamptz;
|
||||||
|
use diesel::sql_types::Uuid;
|
||||||
|
|
||||||
|
|
||||||
fang_tasks (id) {
|
fang_tasks (id) {
|
||||||
id -> Uuid,
|
id -> Uuid,
|
||||||
metadata -> Jsonb,
|
metadata -> Jsonb,
|
||||||
|
error_message -> Nullable<Text>,
|
||||||
|
state -> FangTaskStateMapping,
|
||||||
created_at -> Timestamptz,
|
created_at -> Timestamptz,
|
||||||
updated_at -> Timestamptz,
|
updated_at -> Timestamptz,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue