From c28585ef92710980618caecdd007984f58dab970 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sun, 11 Dec 2022 21:13:24 +0000 Subject: [PATCH] Implement job queue --- migrations/V0035__background_job.sql | 8 ++ migrations/schema.sql | 9 ++ src/models/background_jobs/mod.rs | 2 + src/models/background_jobs/queries.rs | 118 ++++++++++++++++++++++++++ src/models/background_jobs/types.rs | 81 ++++++++++++++++++ src/models/mod.rs | 1 + 6 files changed, 219 insertions(+) create mode 100644 migrations/V0035__background_job.sql create mode 100644 src/models/background_jobs/mod.rs create mode 100644 src/models/background_jobs/queries.rs create mode 100644 src/models/background_jobs/types.rs diff --git a/migrations/V0035__background_job.sql b/migrations/V0035__background_job.sql new file mode 100644 index 0000000..e370610 --- /dev/null +++ b/migrations/V0035__background_job.sql @@ -0,0 +1,8 @@ +CREATE TABLE background_job ( + id UUID PRIMARY KEY, + job_type SMALLINT NOT NULL, + job_data JSONB NOT NULL, + job_status SMALLINT NOT NULL DEFAULT 1, + scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); diff --git a/migrations/schema.sql b/migrations/schema.sql index 0d5aaa8..c9c9798 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -1,3 +1,12 @@ +CREATE TABLE background_job ( + id UUID PRIMARY KEY, + job_type SMALLINT NOT NULL, + job_data JSONB NOT NULL, + job_status SMALLINT NOT NULL DEFAULT 1, + scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + CREATE TABLE instance ( hostname VARCHAR(100) PRIMARY KEY ); diff --git a/src/models/background_jobs/mod.rs b/src/models/background_jobs/mod.rs new file mode 100644 index 0000000..0333ab5 --- /dev/null +++ b/src/models/background_jobs/mod.rs @@ -0,0 +1,2 @@ +pub mod queries; +pub mod types; diff --git a/src/models/background_jobs/queries.rs b/src/models/background_jobs/queries.rs new file mode 100644 index 0000000..9d58002 --- /dev/null +++ b/src/models/background_jobs/queries.rs @@ -0,0 +1,118 @@ +use chrono::{DateTime, Utc}; +use serde_json::Value; +use tokio_postgres::GenericClient; +use uuid::Uuid; + +use crate::database::DatabaseError; +use super::types::{DbBackgroundJob, JobStatus, JobType}; + +pub async fn enqueue_job( + db_client: &impl GenericClient, + job_type: &JobType, + job_data: &Value, + scheduled_for: &DateTime, +) -> Result<(), DatabaseError> { + let job_id = Uuid::new_v4(); + db_client.execute( + " + INSERT INTO background_job ( + id, + job_type, + job_data, + scheduled_for + ) + VALUES ($1, $2, $3, $4) + ", + &[&job_id, &job_type, &job_data, &scheduled_for], + ).await?; + Ok(()) +} + +pub async fn get_job_batch( + db_client: &impl GenericClient, + job_type: &JobType, + batch_size: i64, +) -> Result, DatabaseError> { + let rows = db_client.query( + " + UPDATE background_job + SET + job_status = $1, + updated_at = CURRENT_TIMESTAMP + WHERE id IN ( + SELECT id + FROM background_job + WHERE + job_type = $2 + AND job_status = $3 + AND scheduled_for < CURRENT_TIMESTAMP + ORDER BY scheduled_for ASC + LIMIT $4 + ) + RETURNING background_job + ", + &[ + &JobStatus::Running, + &job_type, + &JobStatus::Queued, + &batch_size, + ], + ).await?; + let jobs = rows.iter() + .map(|row| row.try_get("background_job")) + .collect::>()?; + Ok(jobs) +} + +pub async fn delete_job_from_queue( + db_client: &impl GenericClient, + job_id: &Uuid, +) -> Result<(), DatabaseError> { + let deleted_count = db_client.execute( + " + DELETE FROM background_job + WHERE id = $1 + ", + &[&job_id], + ).await?; + if deleted_count == 0 { + return Err(DatabaseError::NotFound("background job")); + }; + Ok(()) +} + +#[cfg(test)] +mod tests { + use serde_json::json; + use serial_test::serial; + use crate::database::test_utils::create_test_database; + use super::*; + + #[tokio::test] + #[serial] + async fn test_queue() { + let db_client = &create_test_database().await; + let job_type = JobType::IncomingActivity; + let job_data = json!({ + "activity": {}, + "is_authenticated": true, + "failure_count": 0, + }); + let scheduled_for = Utc::now(); + enqueue_job(db_client, &job_type, &job_data, &scheduled_for).await.unwrap(); + + let batch_1 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + assert_eq!(batch_1.len(), 1); + let job = &batch_1[0]; + assert_eq!(job.job_type, job_type); + assert_eq!(job.job_data, job_data); + assert_eq!(job.job_status, JobStatus::Running); + + let batch_2 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + assert_eq!(batch_2.len(), 0); + + delete_job_from_queue(db_client, &job.id).await.unwrap(); + let batch_3 = get_job_batch(db_client, &job_type, 10).await.unwrap(); + assert_eq!(batch_3.len(), 0); + } +} diff --git a/src/models/background_jobs/types.rs b/src/models/background_jobs/types.rs new file mode 100644 index 0000000..2d251ce --- /dev/null +++ b/src/models/background_jobs/types.rs @@ -0,0 +1,81 @@ +use std::convert::TryFrom; + +use chrono::{DateTime, Utc}; +use serde_json::Value; +use postgres_types::FromSql; +use uuid::Uuid; + +use crate::database::{ + int_enum::{int_enum_from_sql, int_enum_to_sql}, + DatabaseTypeError, +}; + +#[derive(Debug, PartialEq)] +pub enum JobType { + IncomingActivity, +} + +impl From<&JobType> for i16 { + fn from(value: &JobType) -> i16 { + match value { + JobType::IncomingActivity => 1, + } + } +} + +impl TryFrom for JobType { + type Error = DatabaseTypeError; + + fn try_from(value: i16) -> Result { + let job_type = match value { + 1 => Self::IncomingActivity, + _ => return Err(DatabaseTypeError), + }; + Ok(job_type) + } +} + +int_enum_from_sql!(JobType); +int_enum_to_sql!(JobType); + +#[derive(Debug, PartialEq)] +pub enum JobStatus { + Queued, + Running, +} + +impl From<&JobStatus> for i16 { + fn from(value: &JobStatus) -> i16 { + match value { + JobStatus::Queued => 1, + JobStatus::Running => 2, + } + } +} + +impl TryFrom for JobStatus { + type Error = DatabaseTypeError; + + fn try_from(value: i16) -> Result { + let job_status = match value { + 1 => Self::Queued, + 2 => Self::Running, + _ => return Err(DatabaseTypeError), + }; + Ok(job_status) + } +} + +int_enum_from_sql!(JobStatus); +int_enum_to_sql!(JobStatus); + +#[derive(FromSql)] +#[postgres(name = "background_job")] +pub struct DbBackgroundJob { + pub id: Uuid, + pub job_type: JobType, + pub job_data: Value, + pub job_status: JobStatus, + pub scheduled_for: DateTime, + pub updated_at: DateTime, +} diff --git a/src/models/mod.rs b/src/models/mod.rs index 3cc757a..f626378 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,4 +1,5 @@ pub mod attachments; +pub mod background_jobs; pub mod cleanup; pub mod instances; pub mod invoices;