Implement job queue

This commit is contained in:
silverpill 2022-12-11 21:13:24 +00:00
parent e7d9733f96
commit c28585ef92
6 changed files with 219 additions and 0 deletions

View file

@ -0,0 +1,8 @@
CREATE TABLE background_job (
id UUID PRIMARY KEY,
job_type SMALLINT NOT NULL,
job_data JSONB NOT NULL,
job_status SMALLINT NOT NULL DEFAULT 1,
scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

View file

@ -1,3 +1,12 @@
CREATE TABLE background_job (
id UUID PRIMARY KEY,
job_type SMALLINT NOT NULL,
job_data JSONB NOT NULL,
job_status SMALLINT NOT NULL DEFAULT 1,
scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE instance ( CREATE TABLE instance (
hostname VARCHAR(100) PRIMARY KEY hostname VARCHAR(100) PRIMARY KEY
); );

View file

@ -0,0 +1,2 @@
pub mod queries;
pub mod types;

View file

@ -0,0 +1,118 @@
use chrono::{DateTime, Utc};
use serde_json::Value;
use tokio_postgres::GenericClient;
use uuid::Uuid;
use crate::database::DatabaseError;
use super::types::{DbBackgroundJob, JobStatus, JobType};
pub async fn enqueue_job(
db_client: &impl GenericClient,
job_type: &JobType,
job_data: &Value,
scheduled_for: &DateTime<Utc>,
) -> Result<(), DatabaseError> {
let job_id = Uuid::new_v4();
db_client.execute(
"
INSERT INTO background_job (
id,
job_type,
job_data,
scheduled_for
)
VALUES ($1, $2, $3, $4)
",
&[&job_id, &job_type, &job_data, &scheduled_for],
).await?;
Ok(())
}
pub async fn get_job_batch(
db_client: &impl GenericClient,
job_type: &JobType,
batch_size: i64,
) -> Result<Vec<DbBackgroundJob>, DatabaseError> {
let rows = db_client.query(
"
UPDATE background_job
SET
job_status = $1,
updated_at = CURRENT_TIMESTAMP
WHERE id IN (
SELECT id
FROM background_job
WHERE
job_type = $2
AND job_status = $3
AND scheduled_for < CURRENT_TIMESTAMP
ORDER BY scheduled_for ASC
LIMIT $4
)
RETURNING background_job
",
&[
&JobStatus::Running,
&job_type,
&JobStatus::Queued,
&batch_size,
],
).await?;
let jobs = rows.iter()
.map(|row| row.try_get("background_job"))
.collect::<Result<_, _>>()?;
Ok(jobs)
}
pub async fn delete_job_from_queue(
db_client: &impl GenericClient,
job_id: &Uuid,
) -> Result<(), DatabaseError> {
let deleted_count = db_client.execute(
"
DELETE FROM background_job
WHERE id = $1
",
&[&job_id],
).await?;
if deleted_count == 0 {
return Err(DatabaseError::NotFound("background job"));
};
Ok(())
}
#[cfg(test)]
mod tests {
use serde_json::json;
use serial_test::serial;
use crate::database::test_utils::create_test_database;
use super::*;
#[tokio::test]
#[serial]
async fn test_queue() {
let db_client = &create_test_database().await;
let job_type = JobType::IncomingActivity;
let job_data = json!({
"activity": {},
"is_authenticated": true,
"failure_count": 0,
});
let scheduled_for = Utc::now();
enqueue_job(db_client, &job_type, &job_data, &scheduled_for).await.unwrap();
let batch_1 = get_job_batch(db_client, &job_type, 10).await.unwrap();
assert_eq!(batch_1.len(), 1);
let job = &batch_1[0];
assert_eq!(job.job_type, job_type);
assert_eq!(job.job_data, job_data);
assert_eq!(job.job_status, JobStatus::Running);
let batch_2 = get_job_batch(db_client, &job_type, 10).await.unwrap();
assert_eq!(batch_2.len(), 0);
delete_job_from_queue(db_client, &job.id).await.unwrap();
let batch_3 = get_job_batch(db_client, &job_type, 10).await.unwrap();
assert_eq!(batch_3.len(), 0);
}
}

View file

@ -0,0 +1,81 @@
use std::convert::TryFrom;
use chrono::{DateTime, Utc};
use serde_json::Value;
use postgres_types::FromSql;
use uuid::Uuid;
use crate::database::{
int_enum::{int_enum_from_sql, int_enum_to_sql},
DatabaseTypeError,
};
#[derive(Debug, PartialEq)]
pub enum JobType {
IncomingActivity,
}
impl From<&JobType> for i16 {
fn from(value: &JobType) -> i16 {
match value {
JobType::IncomingActivity => 1,
}
}
}
impl TryFrom<i16> for JobType {
type Error = DatabaseTypeError;
fn try_from(value: i16) -> Result<Self, Self::Error> {
let job_type = match value {
1 => Self::IncomingActivity,
_ => return Err(DatabaseTypeError),
};
Ok(job_type)
}
}
int_enum_from_sql!(JobType);
int_enum_to_sql!(JobType);
#[derive(Debug, PartialEq)]
pub enum JobStatus {
Queued,
Running,
}
impl From<&JobStatus> for i16 {
fn from(value: &JobStatus) -> i16 {
match value {
JobStatus::Queued => 1,
JobStatus::Running => 2,
}
}
}
impl TryFrom<i16> for JobStatus {
type Error = DatabaseTypeError;
fn try_from(value: i16) -> Result<Self, Self::Error> {
let job_status = match value {
1 => Self::Queued,
2 => Self::Running,
_ => return Err(DatabaseTypeError),
};
Ok(job_status)
}
}
int_enum_from_sql!(JobStatus);
int_enum_to_sql!(JobStatus);
#[derive(FromSql)]
#[postgres(name = "background_job")]
pub struct DbBackgroundJob {
pub id: Uuid,
pub job_type: JobType,
pub job_data: Value,
pub job_status: JobStatus,
pub scheduled_for: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

View file

@ -1,4 +1,5 @@
pub mod attachments; pub mod attachments;
pub mod background_jobs;
pub mod cleanup; pub mod cleanup;
pub mod instances; pub mod instances;
pub mod invoices; pub mod invoices;