Allow to schedule tasks directly with pg connection

This commit is contained in:
Rafael Caricio 2023-03-22 10:50:52 +01:00
parent 7a9eddf9e4
commit 3285647117
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
4 changed files with 34 additions and 5 deletions

View file

@ -7,7 +7,10 @@ use chrono::Duration;
use chrono::Utc; use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use diesel::ExpressionMethods; use diesel::ExpressionMethods;
use diesel_async::{pg::AsyncPgConnection, RunQueryDsl}; use diesel::query_builder::{Query, QueryFragment, QueryId};
use diesel::sql_types::{HasSqlType, SingleValue};
use diesel_async::return_futures::GetResult;
use diesel_async::{pg::AsyncPgConnection, AsyncConnection, RunQueryDsl};
impl Task { impl Task {
pub(crate) async fn remove( pub(crate) async fn remove(

View file

@ -25,7 +25,7 @@ where
{ {
// TODO: Add option to specify the timeout of a task // TODO: Add option to specify the timeout of a task
self.task_store self.task_store
.create_task(NewTask::new(background_task, Duration::from_secs(10))?) .create_task(NewTask::with_timeout(background_task, Duration::from_secs(10))?)
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -1,5 +1,7 @@
use crate::errors::AsyncQueueError; use crate::errors::AsyncQueueError;
use crate::task::{NewTask, Task, TaskId, TaskState}; use crate::task::{NewTask, Task, TaskId, TaskState};
use crate::BackgroundTask;
use async_trait::async_trait;
use diesel::result::Error::QueryBuilderError; use diesel::result::Error::QueryBuilderError;
use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection; use diesel_async::AsyncConnection;
@ -17,6 +19,23 @@ impl PgTaskStore {
} }
} }
/// A trait that is used to enqueue tasks for the PostgreSQL backend.
#[async_trait::async_trait]
pub trait PgQueueTask {
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError>;
}
impl<T> PgQueueTask for T
where
T: BackgroundTask,
{
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError> {
let new_task = NewTask::new::<T>(self)?;
Task::insert(connection, new_task).await?;
Ok(())
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl TaskStore for PgTaskStore { impl TaskStore for PgTaskStore {
async fn pull_next_task( async fn pull_next_task(

View file

@ -117,9 +117,9 @@ pub struct NewTask {
} }
impl NewTask { impl NewTask {
pub(crate) fn new<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error> pub(crate) fn with_timeout<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error>
where where
T: BackgroundTask, T: BackgroundTask,
{ {
let max_retries = background_task.max_retries(); let max_retries = background_task.max_retries();
let uniq_hash = background_task.uniq(); let uniq_hash = background_task.uniq();
@ -134,6 +134,13 @@ impl NewTask {
max_retries, max_retries,
}) })
} }
pub(crate) fn new<T>(background_task: T) -> Result<Self, serde_json::Error>
where
T: BackgroundTask,
{
Self::with_timeout(background_task, Duration::from_secs(120))
}
} }
#[cfg(test)] #[cfg(test)]