Use postgres for job storage
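
Jobs were previously kept in background-jobs' in-memory storage and so were lost on restart. This adds a jobs table (with a diesel migration and an arm64v8 migrations image), implements background_jobs_core::Storage on top of the existing bb8-postgres pool in src/jobs/storage.rs, and wires the Db handle into create_server so queued deliveries are persisted.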

asonix 2020-03-22 16:18:36 -05:00
parent d00961ae86
commit 6511e7f32e
11 changed files with 249 additions and 19 deletions

Cargo.lock (generated)

@@ -455,7 +455,7 @@ dependencies = [
 [[package]]
 name = "background-jobs"
 version = "0.8.0-alpha.0"
-source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
+source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f"
 dependencies = [
  "background-jobs-actix",
  "background-jobs-core",
@@ -464,7 +464,7 @@ dependencies = [
 [[package]]
 name = "background-jobs-actix"
 version = "0.7.0-alpha.0"
-source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
+source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f"
 dependencies = [
  "actix",
  "actix-rt",
@@ -479,12 +479,13 @@ dependencies = [
  "serde_json",
  "thiserror",
  "tokio",
+ "uuid",
 ]
 
 [[package]]
 name = "background-jobs-core"
 version = "0.7.0"
-source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
+source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -494,6 +495,7 @@ dependencies = [
  "serde 1.0.105",
  "serde_json",
  "thiserror",
+ "uuid",
 ]
 
 [[package]]
@@ -1664,8 +1666,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e634590e8812c500088d88db721195979223dabb05149f43cb50931d0ff5865d"
 dependencies = [
  "bytes",
+ "chrono",
  "fallible-iterator",
  "postgres-protocol",
+ "serde 1.0.105",
+ "serde_json",
+ "uuid",
 ]
 
 [[package]]
@@ -1712,14 +1718,9 @@ dependencies = [
 [[package]]
 name = "proc-macro-hack"
-version = "0.5.12"
+version = "0.5.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f918f2b601f93baa836c1c2945faef682ba5b6d4828ecb45eeb7cc3c71b811b4"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
+checksum = "fcfdefadc3d57ca21cf17990a28ef4c0f7c61383a28cb7604cf4a18e6ede1420"
 
 [[package]]
 name = "proc-macro-nested"
@@ -1826,7 +1827,9 @@ dependencies = [
  "actix-web",
  "actix-webfinger",
  "anyhow",
+ "async-trait",
  "background-jobs",
+ "background-jobs-core",
  "base64 0.12.0",
  "bb8-postgres",
  "config",
@@ -2640,6 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
 dependencies = [
  "rand",
+ "serde 1.0.105",
 ]
 
 [[package]]

Cargo.toml

@@ -19,9 +19,11 @@ actix-rt = "1.0.0"
 actix-web = { version = "3.0.0-alpha.1", features = ["rustls"] }
 actix-webfinger = "0.3.0-alpha.3"
 activitystreams = "0.5.0-alpha.11"
+async-trait = "0.1.24"
 background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] }
+background-jobs-core = { version = "0.7.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" }
 base64 = "0.12"
-bb8-postgres = "0.4.0"
+bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] }
 config = "0.10.1"
 dotenv = "0.15.0"
 env_logger = "0.7.1"

@@ -43,7 +45,7 @@ structopt = "0.3.12"
 thiserror = "1.0"
 tokio = { version = "0.2.13", features = ["sync"] }
 ttl_cache = "0.5.1"
-uuid = { version = "0.8", features = ["v4"] }
+uuid = { version = "0.8", features = ["v4", "serde"] }
 
 [build-dependencies]
 anyhow = "1.0"

Dockerfile.migrations.arm64v8 (new file)

@@ -0,0 +1,5 @@
+FROM asonix/diesel-cli:v1.4.0-r0-arm64v8
+
+COPY migrations /migrations
+
+CMD ["diesel", "migration", "run", "--migration-dir", "/migrations"]

(arm64v8 docker build script; path not shown)

@@ -32,6 +32,7 @@ cross build \
 mkdir -p artifacts
 cp ./target/aarch64-unknown-linux-musl/release/relay artifacts/relay
+cp -r ./migrations artifacts/migrations
 
 # from `sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes`
 docker build \
@@ -39,12 +40,23 @@ docker build \
   --no-cache \
   --build-arg BUILD_DATE="${BUILD_DATE}" \
   --build-arg TAG="${TAG}" \
-  -f "Dockerfile.arm64v8" \
+  -f Dockerfile.arm64v8 \
   -t "asonix/relay:${VERSION}-arm64v8" \
   -t "asonix/relay:latest-arm64v8" \
   -t "asonix/relay:latest" \
   ./artifacts
+
+docker build \
+  --pull \
+  --no-cache \
+  --build-arg BUILD_DATE="${BUILD_DATE}" \
+  --build-arg TAG="${TAG}" \
+  -f Dockerfile.migrations.arm64v8 \
+  -t "asonix/relay-migrations:${VERSION}-arm64v8" \
+  -t "asonix/relay-migrations:latest-arm64v8" \
+  -t "asonix/relay-migrations:latest" \
+  ./artifacts
 
 docker push "asonix/relay:${VERSION}-arm64v8"
 docker push "asonix/relay:latest-arm64v8"
 docker push "asonix/relay:latest"

down.sql (new migration file; migration directory name not shown)

@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+DROP INDEX jobs_queue_status_index;
+DROP TABLE jobs;

up.sql (new migration file; migration directory name not shown)

@@ -0,0 +1,17 @@
+-- Your SQL goes here
+CREATE TABLE jobs (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    job_id UUID UNIQUE NOT NULL,
+    job_queue TEXT NOT NULL,
+    job_timeout BIGINT NOT NULL,
+    job_updated TIMESTAMP NOT NULL,
+    job_status TEXT NOT NULL,
+    job_value JSONB NOT NULL,
+    job_next_run TIMESTAMP,
+    created_at TIMESTAMP NOT NULL,
+    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX jobs_queue_status_index ON jobs(job_queue, job_status);
+
+SELECT diesel_manage_updated_at('jobs');
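
Note that gen_random_uuid() comes from the pgcrypto extension on Postgres versions of this era (it only became built in with Postgres 13), so the extension is presumably already enabled, and diesel_manage_updated_at is the helper that diesel setup installs to keep updated_at current via a BEFORE UPDATE trigger.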

src/db.rs

@@ -33,6 +33,10 @@ impl Db {
         Ok(Db { pool })
     }
 
+    pub fn pool(&self) -> &Pool {
+        &self.pool
+    }
+
     pub async fn remove_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> {
         let conn = self.pool.get().await?;
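
The new pool() accessor exposes the underlying bb8 pool so the job storage can check out connections directly instead of going through Db's higher-level query methods.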

src/jobs/mod.rs

@@ -1,17 +1,19 @@
 mod deliver;
 mod deliver_many;
+mod storage;
 
 pub use self::{deliver::Deliver, deliver_many::DeliverMany};
 
 use crate::{
+    db::Db,
     error::MyError,
-    jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor},
+    jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor, storage::Storage},
     requests::Requests,
     state::State,
 };
-use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
+use background_jobs::{Job, QueueHandle, WorkerConfig};
 
-pub fn create_server() -> JobServer {
-    JobServer::new(background_jobs::create_server(Storage::new()))
+pub fn create_server(db: Db) -> JobServer {
+    JobServer::new(background_jobs::create_server(Storage::new(db)))
 }
 
 pub fn create_workers(state: State, job_server: JobServer) {

src/jobs/storage.rs (new file)

@@ -0,0 +1,165 @@
+use crate::{db::Db, error::MyError};
+use background_jobs_core::{JobInfo, Stats};
+use log::debug;
+use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct Storage {
+    db: Db,
+}
+
+impl Storage {
+    pub fn new(db: Db) -> Self {
+        Storage { db }
+    }
+}
+
+#[async_trait::async_trait]
+impl background_jobs_core::Storage for Storage {
+    type Error = MyError;
+
+    async fn generate_id(&self) -> Result<Uuid, MyError> {
+        // TODO: Ensure unique job id
+        Ok(Uuid::new_v4())
+    }
+
+    async fn save_job(&self, job: JobInfo) -> Result<(), MyError> {
+        let id = job.id();
+        let queue = job.queue().to_owned();
+        let timeout = job.timeout();
+        let updated = job.updated_at().naive_utc();
+        let status = job.status().to_string();
+        let next_queue = job.next_queue().map(|q| q.naive_utc());
+        let value = serde_json::to_value(job)?;
+
+        let conn = self.db.pool().get().await?;
+
+        debug!("Inserting job {} status {} for queue {}", id, status, queue);
+        conn.execute(
+            "INSERT INTO jobs
+                (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at)
+            VALUES
+                ($1::UUID, $2::TEXT, $3::BIGINT, $4::TIMESTAMP, $5::TEXT, $6::TIMESTAMP, $7::JSONB, 'now')
+            ON CONFLICT (job_id)
+            DO UPDATE SET
+                job_updated = $4::TIMESTAMP,
+                job_status = $5::TEXT,
+                job_next_run = $6::TIMESTAMP,
+                job_value = $7::JSONB;",
+            &[&id, &queue, &timeout, &updated, &status, &next_queue, &value],
+        )
+        .await?;
+
+        Ok(())
+    }
+
+    async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, MyError> {
+        let conn = self.db.pool().get().await?;
+
+        debug!(
+            "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]",
+            id
+        );
+        let rows = conn
+            .query(
+                "SELECT job_value
+                FROM jobs
+                WHERE job_id = $1::UUID
+                LIMIT 1;",
+                &[&id],
+            )
+            .await?;
+
+        let row = if let Some(row) = rows.into_iter().next() {
+            row
+        } else {
+            return Ok(None);
+        };
+
+        let value = row.try_get(0)?;
+
+        Ok(Some(serde_json::from_value(value)?))
+    }
+
+    async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, MyError> {
+        let conn = self.db.pool().get().await?;
+
+        let row = conn
+            .query_opt(
+                "UPDATE jobs
+                SET
+                    job_status = 'Running',
+                    job_updated = 'now'
+                WHERE
+                    job_id = (
+                        SELECT job_id
+                        FROM jobs
+                        WHERE
+                            job_queue = $1::TEXT
+                        AND
+                            (
+                                job_next_run IS NULL
+                            OR
+                                job_next_run < now()
+                            )
+                        AND
+                            (
+                                job_status = 'Pending'
+                            OR
+                                (
+                                    job_status = 'Running'
+                                AND
+                                    NOW() > (INTERVAL '1 millisecond' * job_timeout + job_updated)
+                                )
+                            )
+                        LIMIT 1
+                        FOR UPDATE SKIP LOCKED
+                    )
+                RETURNING job_value;",
+                &[&queue],
+            )
+            .await?;
+
+        let row = if let Some(row) = row {
+            row
+        } else {
+            return Ok(None);
+        };
+
+        let value = row.try_get(0)?;
+        let job: JobInfo = serde_json::from_value(value)?;
+
+        debug!("Found job {} in queue {}", job.id(), queue);
+
+        Ok(Some(job))
+    }
+
+    async fn queue_job(&self, _queue: &str, _id: Uuid) -> Result<(), MyError> {
+        // Queue Job is a no-op, since jobs are always in their queue
+        Ok(())
+    }
+
+    async fn run_job(&self, _id: Uuid, _runner_id: Uuid) -> Result<(), MyError> {
+        // Run Job is a no-op, since jobs are marked running at fetch
+        Ok(())
+    }
+
+    async fn delete_job(&self, id: Uuid) -> Result<(), MyError> {
+        let conn = self.db.pool().get().await?;
+
+        debug!("Deleting job {}", id);
+        conn.execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id])
+            .await?;
+
+        Ok(())
+    }
+
+    async fn get_stats(&self) -> Result<Stats, MyError> {
+        // TODO: Stats are unimplemented
+        Ok(Stats::default())
+    }
+
+    async fn update_stats<F>(&self, _f: F) -> Result<(), MyError>
+    where
+        F: Fn(Stats) -> Stats + Send,
+    {
+        // TODO: Stats are unimplemented
+        Ok(())
+    }
+}
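
The claim-and-run logic lives entirely in fetch_job_from_queue: FOR UPDATE SKIP LOCKED lets any number of workers poll the same queue concurrently without blocking on, or double-claiming, each other's rows, and a Running job whose job_timeout (in milliseconds) has elapsed since job_updated is treated as stale and reclaimed. Below is a minimal, self-contained sketch of the same claim pattern using plain tokio-postgres; it is not part of the commit, the connection string and "default" queue name are made-up placeholders, and reading job_value as serde_json::Value assumes the with-serde_json-1 feature is enabled.

// Sketch only: claims one job from a queue the way fetch_job_from_queue does.
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder URL; tokio-postgres splits the client from the connection,
    // and the connection future must be driven on its own task.
    let (client, connection) =
        tokio_postgres::connect("postgres://localhost/relay", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // Atomically mark one claimable job Running and return its payload.
    // SKIP LOCKED makes concurrent workers skip rows another worker has
    // already locked instead of waiting on them.
    let row = client
        .query_opt(
            "UPDATE jobs
             SET job_status = 'Running', job_updated = now()
             WHERE job_id = (
                 SELECT job_id FROM jobs
                 WHERE job_queue = $1::TEXT
                   AND (job_next_run IS NULL OR job_next_run < now())
                   AND (job_status = 'Pending'
                        OR (job_status = 'Running'
                            AND now() > (INTERVAL '1 millisecond' * job_timeout + job_updated)))
                 LIMIT 1
                 FOR UPDATE SKIP LOCKED
             )
             RETURNING job_value;",
            &[&"default"],
        )
        .await?;

    match row {
        Some(row) => {
            let value: serde_json::Value = row.try_get(0)?;
            println!("claimed job: {}", value);
        }
        None => println!("no claimable job in queue"),
    }

    Ok(())
}

Because the claim happens in the UPDATE itself, queue_job and run_job above can be no-ops: a job is "queued" by existing as a Pending row, and "running" the moment the claim query returns it.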

src/main.rs

@@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> {
     let config = Config::build()?;
 
     if config.debug() {
-        std::env::set_var("RUST_LOG", "debug")
+        std::env::set_var("RUST_LOG", "debug,tokio_postgres=info")
     } else {
        std::env::set_var("RUST_LOG", "info")
     }
@@ -109,7 +109,7 @@ async fn main() -> Result<(), anyhow::Error> {
 
     rehydrate::spawn(db.clone(), state.clone());
 
-    let job_server = create_server();
+    let job_server = create_server(db.clone());
 
     let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone());
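
In the debug filter, the tokio_postgres=info override keeps tokio-postgres's own debug output from drowning the relay's logs now that every job fetch and save goes through Postgres; env_logger applies comma-separated crate=level directives on top of the default level.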

src/schema.rs

@@ -7,6 +7,21 @@ table! {
     }
 }
 
+table! {
+    jobs (id) {
+        id -> Uuid,
+        job_id -> Uuid,
+        job_queue -> Text,
+        job_timeout -> Int8,
+        job_updated -> Timestamp,
+        job_status -> Text,
+        job_value -> Jsonb,
+        job_next_run -> Nullable<Timestamp>,
+        created_at -> Timestamp,
+        updated_at -> Timestamp,
+    }
+}
+
 table! {
     listeners (id) {
         id -> Uuid,

@@ -37,6 +52,7 @@
 allow_tables_to_appear_in_same_query!(
     blocks,
+    jobs,
     listeners,
     settings,
     whitelists,
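
The jobs table! block mirrors the migration exactly; although the queries in src/jobs/storage.rs go through tokio-postgres rather than diesel, running the migration presumably regenerates this schema file, so the new table definition lands here alongside the existing ones.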