From db5284e162979664dd36020be01e86119f865a30 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 10 Jul 2020 20:00:31 -0500 Subject: [PATCH] Use memory job store because why not --- src/jobs/mod.rs | 9 ++- src/jobs/storage.rs | 171 -------------------------------------------- src/main.rs | 2 +- 3 files changed, 5 insertions(+), 177 deletions(-) delete mode 100644 src/jobs/storage.rs diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 08f12bb..ca53b72 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -5,7 +5,6 @@ mod deliver_many; mod instance; mod nodeinfo; mod process_listeners; -mod storage; pub use self::{ cache_media::CacheMedia, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, @@ -17,14 +16,14 @@ use crate::{ data::{ActorCache, Media, NodeCache, State}, db::Db, error::MyError, - jobs::{process_listeners::Listeners, storage::Storage}, + jobs::process_listeners::Listeners, requests::Requests, }; -use background_jobs::{Job, QueueHandle, WorkerConfig}; +use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig}; use std::time::Duration; -pub fn create_server(db: Db) -> JobServer { - let shared = background_jobs::create_server(Storage::new(db)); +pub fn create_server() -> JobServer { + let shared = background_jobs::create_server(Storage::new()); shared.every(Duration::from_secs(60 * 5), Listeners); diff --git a/src/jobs/storage.rs b/src/jobs/storage.rs deleted file mode 100644 index a25e1ab..0000000 --- a/src/jobs/storage.rs +++ /dev/null @@ -1,171 +0,0 @@ -use crate::{db::Db, error::MyError}; -use background_jobs::{dev::JobInfo, Stats}; -use log::debug; -use tokio_postgres::types::Json; -use uuid::Uuid; - -#[derive(Clone)] -pub struct Storage { - db: Db, -} - -impl Storage { - pub fn new(db: Db) -> Self { - Storage { db } - } -} - -#[async_trait::async_trait] -impl background_jobs::dev::Storage for Storage { - type Error = MyError; - - async fn generate_id(&self) -> Result { - // TODO: Ensure unique job id - Ok(Uuid::new_v4()) - } - - async fn save_job(&self, job: JobInfo) -> Result<(), MyError> { - debug!( - "Inserting job {} status {} for queue {}", - job.id(), - job.status(), - job.queue() - ); - self.db.pool().get().await?.execute( - "INSERT INTO jobs - (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at) - VALUES - ($1::UUID, $2::TEXT, $3::BIGINT, $4::TIMESTAMP, $5::TEXT, $6::TIMESTAMP, $7::JSONB, 'now') - ON CONFLICT (job_id) - DO UPDATE SET - job_updated = $4::TIMESTAMP, - job_status = $5::TEXT, - job_next_run = $6::TIMESTAMP, - job_value = $7::JSONB;", - &[&job.id(), &job.queue(), &job.timeout(), &job.updated_at().naive_utc(), &job.status().to_string(), &job.next_queue().map(|q| q.naive_utc()), &Json(&job)], - ) - .await?; - - Ok(()) - } - - async fn fetch_job(&self, id: Uuid) -> Result, MyError> { - debug!( - "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]", - id - ); - let row_opt = self - .db - .pool() - .get() - .await? - .query_opt( - "SELECT job_value - FROM jobs - WHERE job_id = $1::UUID - LIMIT 1;", - &[&id], - ) - .await?; - - let row = if let Some(row) = row_opt { - row - } else { - return Ok(None); - }; - - let value: Json = row.try_get(0)?; - - Ok(Some(value.0)) - } - - async fn fetch_job_from_queue(&self, queue: &str) -> Result, MyError> { - let row_opt = self - .db - .pool() - .get() - .await? - .query_opt( - "UPDATE jobs - SET - job_status = 'Running', - job_updated = 'now' - WHERE - job_id = ( - SELECT job_id - FROM jobs - WHERE - job_queue = $1::TEXT - AND - ( - job_next_run IS NULL - OR - job_next_run < now() - ) - AND - ( - job_status = 'Pending' - OR - ( - job_status = 'Running' - AND - NOW() > (INTERVAL '1 millisecond' * job_timeout + job_updated) - ) - ) - LIMIT 1 - FOR UPDATE SKIP LOCKED - ) - RETURNING job_value;", - &[&queue], - ) - .await?; - - let row = if let Some(row) = row_opt { - row - } else { - return Ok(None); - }; - - let value: Json = row.try_get(0)?; - let job = value.0; - - debug!("Found job {} in queue {}", job.id(), queue); - - Ok(Some(job)) - } - - async fn queue_job(&self, _queue: &str, _id: Uuid) -> Result<(), MyError> { - // Queue Job is a no-op, since jobs are always in their queue - Ok(()) - } - - async fn run_job(&self, _id: Uuid, _runner_id: Uuid) -> Result<(), MyError> { - // Run Job is a no-op, since jobs are marked running at fetch - Ok(()) - } - - async fn delete_job(&self, id: Uuid) -> Result<(), MyError> { - debug!("Deleting job {}", id); - self.db - .pool() - .get() - .await? - .execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id]) - .await?; - - Ok(()) - } - - async fn get_stats(&self) -> Result { - // TODO: Stats are unimplemented - Ok(Stats::default()) - } - - async fn update_stats(&self, _f: F) -> Result<(), MyError> - where - F: Fn(Stats) -> Stats + Send, - { - // TODO: Stats are unimplemented - Ok(()) - } -} diff --git a/src/main.rs b/src/main.rs index c34a29b..f3aa48a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<(), anyhow::Error> { let media = Media::new(db.clone()); let state = State::hydrate(config.clone(), &db).await?; let actors = ActorCache::new(db.clone()); - let job_server = create_server(db.clone()); + let job_server = create_server(); notify::Notifier::new(config.database_url().parse()?) .register(notify::NewBlocks(state.clone()))