From abe324fc57886b5f077d5cf8b7b14bd240cce1ce Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Tue, 19 Jul 2022 13:49:43 +0000 Subject: [PATCH] Fetch task (#26) * fetch task async and renaming every Job/job to Task/task * fixing cargo clippy * Task instead Job in readme * change one task because if not test will fail, redefined all tasks structs * derive feature * deleting schema * changing query * task builder * fix bug enum fang task state * fetch_test based on metadata, good improve testing fetch_task * deleting toSql FromSql derives that are useless for Task * builders and fail_task change --- Cargo.toml | 3 +- README.md | 32 ++--- src/asynk/async_queue.rs | 174 ++++++++++++++++++++++++-- src/asynk/queries/fetch_task_type.sql | 1 + src/lib.rs | 40 ------ src/sync/queue.rs | 38 +++++- 6 files changed, 219 insertions(+), 69 deletions(-) create mode 100644 src/asynk/queries/fetch_task_type.sql diff --git a/Cargo.toml b/Cargo.toml index 919566c..6741d8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,8 @@ typetag = "0.2" log = "0.4" serde = { version = "1", features = ["derive"] } thiserror = "1.0" -bb8-postgres = {version = "0.8", features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4"]} +bb8-postgres = {version = "0.8", features = ["with-serde_json-1" , "with-uuid-0_8" , "with-chrono-0_4" ]} +postgres-types = { version = "0.2", features = ["derive"] } tokio = { version = "1.20", features = ["full"] } async-trait = "0.1" typed-builder = "0.10" diff --git a/README.md b/README.md index e836446..f7c2d2d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ # Fang -Background job processing library for Rust. It uses Postgres DB as a task queue. +Background task processing library for Rust. It uses Postgres DB as a task queue. 
## Installation @@ -24,9 +24,9 @@ serde = { version = "1.0", features = ["derive"] } ## Usage -### Defining a job +### Defining a task -Every job should implement `fang::Runnable` trait which is used by `fang` to execute it. +Every task should implement `fang::Runnable` trait which is used by `fang` to execute it. ```rust use fang::Error; @@ -36,12 +36,12 @@ use fang::PgConnection; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] -struct Job { +struct MyTask { pub number: u16, } #[typetag::serde] -impl Runnable for Job { +impl Runnable for MyTask { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { println!("the number is {}", self.number); @@ -50,13 +50,13 @@ impl Runnable for Job { } ``` -As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the job. +As you can see from the example above, the trait implementation has `#[typetag::serde]` attribute which is used to deserialize the task. -The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the job queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it. +The second parameter of the `run` function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new task during the current task's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it. -### Enqueuing a job +### Enqueuing a task -To enqueue a job use `Queue::enqueue_task` +To enqueue a task use `Queue::enqueue_task` ```rust use fang::Queue; ... -Queue::enqueue_task(&Job { number: 10 }).unwrap(); +Queue::enqueue_task(&MyTask { number: 10 }).unwrap(); ``` -The example above creates a new postgres connection on every call. 
If you want to reuse the same postgres connection to enqueue several jobs use Postgres struct instance: +The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several tasks use Postgres struct instance: ```rust let queue = Queue::new(); for id in &unsynced_feed_ids { - queue.push_task(&SyncFeedJob { feed_id: *id }).unwrap(); + queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap(); } ``` @@ -82,7 +82,7 @@ for id in &unsynced_feed_ids { Or you can use `PgConnection` struct: ```rust -Queue::push_task_query(pg_connection, &new_job).unwrap(); +Queue::push_task_query(pg_connection, &new_task).unwrap(); ``` ### Starting workers @@ -131,7 +131,7 @@ Add `task_type` method to the `Runnable` trait implementation: ... #[typetag::serde] -impl Runnable for Job { +impl Runnable for MyTask { fn run(&self) -> Result<(), Error> { println!("the number is {}", self.number); @@ -222,11 +222,11 @@ use fang::Queue; let queue = Queue::new(); queue - .push_periodic_task(&SyncJob::default(), 120) + .push_periodic_task(&SyncMyTask::default(), 120) .unwrap(); queue - .push_periodic_task(&DeliverJob::default(), 60) + .push_periodic_task(&DeliverMyTask::default(), 60) .unwrap(); Scheduler::start(10, 5); diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 99e4fef..1bb4386 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,17 +1,85 @@ use crate::asynk::AsyncRunnable; -use crate::Task; use bb8_postgres::bb8::Pool; use bb8_postgres::bb8::RunError; +use bb8_postgres::tokio_postgres::row::Row; use bb8_postgres::tokio_postgres::tls::MakeTlsConnect; use bb8_postgres::tokio_postgres::tls::TlsConnect; -use bb8_postgres::tokio_postgres::types::ToSql; use bb8_postgres::tokio_postgres::Socket; use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; +use chrono::DateTime; use chrono::Utc; +use postgres_types::{FromSql, ToSql}; use thiserror::Error; use 
typed_builder::TypedBuilder; +use uuid::Uuid; +#[derive(Debug, Eq, PartialEq, Clone, ToSql, FromSql)] +#[postgres(name = "fang_task_state")] +pub enum FangTaskState { + #[postgres(name = "new")] + New, + #[postgres(name = "in_progress")] + InProgress, + #[postgres(name = "failed")] + Failed, + #[postgres(name = "finished")] + Finished, +} +impl Default for FangTaskState { + fn default() -> Self { + FangTaskState::New + } +} +#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] +pub struct Task { + #[builder(setter(into))] + pub id: Uuid, + #[builder(setter(into))] + pub metadata: serde_json::Value, + #[builder(setter(into))] + pub error_message: Option, + #[builder(default, setter(into))] + pub state: FangTaskState, + #[builder(setter(into))] + pub task_type: String, + #[builder(setter(into))] + pub created_at: DateTime, + #[builder(setter(into))] + pub updated_at: DateTime, +} + +#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] +pub struct PeriodicTask { + #[builder(setter(into))] + pub id: Uuid, + #[builder(setter(into))] + pub metadata: serde_json::Value, + #[builder(setter(into))] + pub period_in_seconds: i32, + #[builder(setter(into))] + pub scheduled_at: Option>, + #[builder(setter(into))] + pub created_at: DateTime, + #[builder(setter(into))] + pub updated_at: DateTime, +} + +#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] +pub struct NewTask { + #[builder(setter(into))] + pub metadata: serde_json::Value, + #[builder(setter(into))] + pub task_type: String, +} + +#[derive(TypedBuilder, Debug, Eq, PartialEq, Clone)] +pub struct NewPeriodicTask { + #[builder(setter(into))] + pub metadata: serde_json::Value, + #[builder(setter(into))] + pub period_in_seconds: i32, +} #[derive(Debug, Error)] pub enum AsyncQueueError { #[error(transparent)] @@ -46,6 +114,7 @@ const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql"); const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql"); const REMOVE_TASK_QUERY: &str = 
include_str!("queries/remove_task.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); +const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); impl<'a, Tls> AsyncQueue<'a, Tls> where @@ -83,6 +152,58 @@ where None => Err(AsyncQueueError::TransactionEmpty), } } + pub async fn fetch_task( + &mut self, + task_type: &Option, + ) -> Result { + let mut task = match task_type { + None => self.get_task_type("common").await?, + Some(task_type_str) => self.get_task_type(task_type_str).await?, + }; + self.update_task_state(&task, FangTaskState::InProgress) + .await?; + task.state = FangTaskState::InProgress; + Ok(task) + } + pub async fn get_task_type(&mut self, task_type: &str) -> Result { + let row: Row = self.get_row(FETCH_TASK_TYPE_QUERY, &[&task_type]).await?; + let id: Uuid = row.get("id"); + let metadata: serde_json::Value = row.get("metadata"); + let error_message: Option = match row.try_get("error_message") { + Ok(error_message) => Some(error_message), + Err(_) => None, + }; + let state: FangTaskState = FangTaskState::New; + let task_type: String = row.get("task_type"); + let created_at: DateTime = row.get("created_at"); + let updated_at: DateTime = row.get("updated_at"); + let task = Task::builder() + .id(id) + .metadata(metadata) + .error_message(error_message) + .state(state) + .task_type(task_type) + .created_at(created_at) + .updated_at(updated_at) + .build(); + Ok(task) + } + pub async fn get_row( + &mut self, + query: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result { + let row: Row = if let Some(pool) = &self.pool { + let connection = pool.get().await?; + + connection.query_one(query, params).await? + } else if let Some(transaction) = &self.transaction { + transaction.query_one(query, params).await? 
+ } else { + return Err(AsyncQueueError::PoolAndTransactionEmpty); + }; + Ok(row) + } pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { let metadata = serde_json::to_value(task).unwrap(); let task_type = task.task_type(); @@ -93,7 +214,7 @@ where pub async fn update_task_state( &mut self, task: &Task, - state: &str, + state: FangTaskState, ) -> Result { let updated_at = Utc::now(); self.execute( @@ -117,7 +238,12 @@ where let updated_at = Utc::now(); self.execute( FAIL_TASK_QUERY, - &[&"failed", &task.error_message, &updated_at, &task.id], + &[ + &FangTaskState::Failed, + &task.error_message, + &updated_at, + &task.id, + ], Some(1), ) .await @@ -163,13 +289,13 @@ mod async_queue_tests { use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] - struct Job { + struct AsyncTask { pub number: u16, } #[typetag::serde] #[async_trait] - impl AsyncRunnable for Job { + impl AsyncRunnable for AsyncTask { async fn run(&self, _connection: &Client) -> Result<(), Error> { Ok(()) } @@ -182,7 +308,7 @@ mod async_queue_tests { let transaction = connection.transaction().await.unwrap(); let mut queue = AsyncQueue::::new_with_transaction(transaction); - let result = queue.insert_task(&Job { number: 1 }).await.unwrap(); + let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); assert_eq!(1, result); queue.rollback().await.unwrap(); @@ -195,14 +321,40 @@ mod async_queue_tests { let transaction = connection.transaction().await.unwrap(); let mut queue = AsyncQueue::::new_with_transaction(transaction); - let result = queue.insert_task(&Job { number: 1 }).await.unwrap(); + let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); assert_eq!(1, result); - let result = queue.insert_task(&Job { number: 2 }).await.unwrap(); + let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap(); assert_eq!(1, result); let result = queue.remove_all_tasks().await.unwrap(); assert_eq!(2, result); 
queue.rollback().await.unwrap(); } + + #[tokio::test] + async fn fetch_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + let mut queue = AsyncQueue::::new_with_transaction(transaction); + + let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); + assert_eq!(1, result); + let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap(); + assert_eq!(1, result); + let task = queue.fetch_task(&None).await.unwrap(); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + let task = queue.fetch_task(&None).await.unwrap(); + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + assert_eq!(Some(2), number); + assert_eq!(Some("AsyncTask"), type_task); + queue.rollback().await.unwrap(); + } #[tokio::test] async fn remove_tasks_type_test() { let pool = pool().await; @@ -210,9 +362,9 @@ mod async_queue_tests { let transaction = connection.transaction().await.unwrap(); let mut queue = AsyncQueue::::new_with_transaction(transaction); - let result = queue.insert_task(&Job { number: 1 }).await.unwrap(); + let result = queue.insert_task(&AsyncTask { number: 1 }).await.unwrap(); assert_eq!(1, result); - let result = queue.insert_task(&Job { number: 2 }).await.unwrap(); + let result = queue.insert_task(&AsyncTask { number: 2 }).await.unwrap(); assert_eq!(1, result); let result = queue.remove_tasks_type("common").await.unwrap(); assert_eq!(2, result); diff --git a/src/asynk/queries/fetch_task_type.sql b/src/asynk/queries/fetch_task_type.sql new file mode 100644 index 0000000..360a5fa --- /dev/null +++ b/src/asynk/queries/fetch_task_type.sql @@ -0,0 +1 @@ +SELECT * FROM fang_tasks WHERE state = 'new' AND task_type = 
$1 ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/src/lib.rs b/src/lib.rs index 14694f9..a6d3bf1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,46 +14,6 @@ // pub use scheduler::*; // pub use schema::*; // pub use worker_pool::*; -use chrono::DateTime; -use chrono::Utc; -use uuid::Uuid; - -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] -#[table_name = "fang_tasks"] -pub struct Task { - pub id: Uuid, - pub metadata: serde_json::Value, - pub error_message: Option, - pub state: FangTaskState, - pub task_type: String, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] -#[table_name = "fang_periodic_tasks"] -pub struct PeriodicTask { - pub id: Uuid, - pub metadata: serde_json::Value, - pub period_in_seconds: i32, - pub scheduled_at: Option>, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -#[derive(Insertable)] -#[table_name = "fang_tasks"] -pub struct NewTask { - pub metadata: serde_json::Value, - pub task_type: String, -} - -#[derive(Insertable)] -#[table_name = "fang_periodic_tasks"] -pub struct NewPeriodicTask { - pub metadata: serde_json::Value, - pub period_in_seconds: i32, -} #[macro_use] extern crate diesel; diff --git a/src/sync/queue.rs b/src/sync/queue.rs index 62a93d9..8302078 100644 --- a/src/sync/queue.rs +++ b/src/sync/queue.rs @@ -2,7 +2,6 @@ use crate::executor::Runnable; use crate::schema::fang_periodic_tasks; use crate::schema::fang_tasks; use crate::schema::FangTaskState; -use crate::{NewPeriodicTask, NewTask, PeriodicTask, Task}; use chrono::DateTime; use chrono::Duration; use chrono::Utc; @@ -14,6 +13,43 @@ use dotenv::dotenv; use std::env; use uuid::Uuid; +#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] +#[table_name = "fang_tasks"] +pub struct Task { + pub id: Uuid, + pub metadata: serde_json::Value, + pub error_message: Option, + pub state: FangTaskState, + pub task_type: String, + pub created_at: 
DateTime, + pub updated_at: DateTime, +} + +#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)] +#[table_name = "fang_periodic_tasks"] +pub struct PeriodicTask { + pub id: Uuid, + pub metadata: serde_json::Value, + pub period_in_seconds: i32, + pub scheduled_at: Option>, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Insertable)] +#[table_name = "fang_tasks"] +pub struct NewTask { + pub metadata: serde_json::Value, + pub task_type: String, +} + +#[derive(Insertable)] +#[table_name = "fang_periodic_tasks"] +pub struct NewPeriodicTask { + pub metadata: serde_json::Value, + pub period_in_seconds: i32, +} + pub struct Queue { pub connection: PgConnection, }