From 32856471178eb5f865b472a342b0a2415ba3d5e4 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Wed, 22 Mar 2023 10:50:52 +0100 Subject: [PATCH] Allow to schedule tasks directly with pg connection --- src/queries.rs | 5 ++++- src/queue.rs | 2 +- src/store.rs | 19 +++++++++++++++++++ src/task.rs | 13 ++++++++++--- 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/src/queries.rs b/src/queries.rs index a58ca60..3de578e 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -7,7 +7,10 @@ use chrono::Duration; use chrono::Utc; use diesel::prelude::*; use diesel::ExpressionMethods; -use diesel_async::{pg::AsyncPgConnection, RunQueryDsl}; +use diesel::query_builder::{Query, QueryFragment, QueryId}; +use diesel::sql_types::{HasSqlType, SingleValue}; +use diesel_async::return_futures::GetResult; +use diesel_async::{pg::AsyncPgConnection, AsyncConnection, RunQueryDsl}; impl Task { pub(crate) async fn remove( diff --git a/src/queue.rs b/src/queue.rs index 547d6e7..b69eaf0 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -25,7 +25,7 @@ where { // TODO: Add option to specify the timeout of a task self.task_store - .create_task(NewTask::new(background_task, Duration::from_secs(10))?) + .create_task(NewTask::with_timeout(background_task, Duration::from_secs(10))?) .await?; Ok(()) } diff --git a/src/store.rs b/src/store.rs index 1fd0b0b..07e1d84 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,5 +1,7 @@ use crate::errors::AsyncQueueError; use crate::task::{NewTask, Task, TaskId, TaskState}; +use crate::BackgroundTask; +use async_trait::async_trait; use diesel::result::Error::QueryBuilderError; use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::AsyncConnection; @@ -17,6 +19,23 @@ impl PgTaskStore { } } +/// A trait that is used to enqueue tasks for the PostgreSQL backend. +#[async_trait::async_trait] +pub trait PgQueueTask { + async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError>; +} + +impl PgQueueTask for T +where + T: BackgroundTask, +{ + async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError> { + let new_task = NewTask::new::(self)?; + Task::insert(connection, new_task).await?; + Ok(()) + } +} + #[async_trait::async_trait] impl TaskStore for PgTaskStore { async fn pull_next_task( diff --git a/src/task.rs b/src/task.rs index ebe553c..a09c4d8 100644 --- a/src/task.rs +++ b/src/task.rs @@ -117,9 +117,9 @@ pub struct NewTask { } impl NewTask { - pub(crate) fn new(background_task: T, timeout: Duration) -> Result - where - T: BackgroundTask, + pub(crate) fn with_timeout(background_task: T, timeout: Duration) -> Result + where + T: BackgroundTask, { let max_retries = background_task.max_retries(); let uniq_hash = background_task.uniq(); @@ -134,6 +134,13 @@ impl NewTask { max_retries, }) } + + pub(crate) fn new(background_task: T) -> Result + where + T: BackgroundTask, + { + Self::with_timeout(background_task, Duration::from_secs(120)) + } } #[cfg(test)]