2023-03-04 19:46:09 +00:00
|
|
|
use crate::errors::AsyncQueueError;
|
2023-03-07 15:41:20 +00:00
|
|
|
use crate::runnable::RunnableTask;
|
|
|
|
use crate::schema::backie_tasks;
|
|
|
|
use crate::task::Task;
|
2023-03-07 16:52:26 +00:00
|
|
|
use crate::task::{NewTask, TaskHash, TaskId, TaskType};
|
2023-03-04 18:07:17 +00:00
|
|
|
use chrono::DateTime;
|
|
|
|
use chrono::Duration;
|
|
|
|
use chrono::Utc;
|
|
|
|
use diesel::prelude::*;
|
|
|
|
use diesel::ExpressionMethods;
|
2023-03-04 19:46:09 +00:00
|
|
|
use diesel_async::{pg::AsyncPgConnection, RunQueryDsl};
|
2023-03-04 18:07:17 +00:00
|
|
|
|
|
|
|
impl Task {
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn remove_all(
|
2023-03-05 00:19:35 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
|
|
|
) -> Result<u64, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
Ok(diesel::delete(backie_tasks::table)
|
2023-03-05 00:19:35 +00:00
|
|
|
.execute(connection)
|
|
|
|
.await? as u64)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn remove_all_scheduled(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
|
|
|
) -> Result<u64, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
let query = backie_tasks::table.filter(backie_tasks::running_at.is_null());
|
2023-03-04 18:07:17 +00:00
|
|
|
Ok(diesel::delete(query).execute(connection).await? as u64)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn remove(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
id: TaskId,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Result<u64, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
let query = backie_tasks::table.filter(backie_tasks::id.eq(id));
|
2023-03-04 18:07:17 +00:00
|
|
|
Ok(diesel::delete(query).execute(connection).await? as u64)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn remove_by_hash(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
task_hash: TaskHash,
|
|
|
|
) -> Result<bool, AsyncQueueError> {
|
|
|
|
let query = backie_tasks::table.filter(backie_tasks::uniq_hash.eq(task_hash));
|
|
|
|
let qty = diesel::delete(query).execute(connection).await?;
|
|
|
|
Ok(qty > 0)
|
2023-03-04 18:07:17 +00:00
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn remove_by_type(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
task_type: TaskType,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Result<u64, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
let query = backie_tasks::table.filter(backie_tasks::task_type.eq(task_type));
|
2023-03-04 18:07:17 +00:00
|
|
|
Ok(diesel::delete(query).execute(connection).await? as u64)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn find_by_id(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
id: TaskId,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Result<Task, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
let task = backie_tasks::table
|
|
|
|
.filter(backie_tasks::id.eq(id))
|
2023-03-04 18:07:17 +00:00
|
|
|
.first::<Task>(connection)
|
|
|
|
.await?;
|
|
|
|
Ok(task)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn fail_with_message(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
id: TaskId,
|
2023-03-04 18:07:17 +00:00
|
|
|
error_message: &str,
|
|
|
|
) -> Result<Task, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
let query = backie_tasks::table.filter(backie_tasks::id.eq(id));
|
|
|
|
Ok(diesel::update(query)
|
2023-03-04 18:07:17 +00:00
|
|
|
.set((
|
2023-03-07 15:41:20 +00:00
|
|
|
backie_tasks::error_message.eq(error_message),
|
|
|
|
backie_tasks::done_at.eq(Utc::now()),
|
2023-03-04 18:07:17 +00:00
|
|
|
))
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn schedule_retry(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
id: TaskId,
|
2023-03-04 18:07:17 +00:00
|
|
|
backoff_seconds: u32,
|
|
|
|
error: &str,
|
|
|
|
) -> Result<Task, AsyncQueueError> {
|
2023-03-07 15:41:20 +00:00
|
|
|
use crate::schema::backie_tasks::dsl;
|
|
|
|
|
2023-03-04 18:07:17 +00:00
|
|
|
let now = Utc::now();
|
|
|
|
let scheduled_at = now + Duration::seconds(backoff_seconds as i64);
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
let task = diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id)))
|
2023-03-04 18:07:17 +00:00
|
|
|
.set((
|
2023-03-07 15:41:20 +00:00
|
|
|
backie_tasks::error_message.eq(error),
|
|
|
|
backie_tasks::retries.eq(dsl::retries + 1),
|
|
|
|
backie_tasks::created_at.eq(scheduled_at),
|
|
|
|
backie_tasks::running_at.eq::<Option<DateTime<Utc>>>(None),
|
2023-03-04 18:07:17 +00:00
|
|
|
))
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(task)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn fetch_next_pending(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
task_type: TaskType,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Option<Task> {
|
2023-03-07 15:41:20 +00:00
|
|
|
backie_tasks::table
|
|
|
|
.filter(backie_tasks::created_at.lt(Utc::now())) // skip tasks scheduled for the future
|
|
|
|
.order(backie_tasks::created_at.asc()) // get the oldest task first
|
|
|
|
.filter(backie_tasks::running_at.is_null()) // that is not marked as running already
|
|
|
|
.filter(backie_tasks::done_at.is_null()) // and not marked as done
|
|
|
|
.filter(backie_tasks::task_type.eq(task_type))
|
2023-03-04 18:07:17 +00:00
|
|
|
.limit(1)
|
|
|
|
.for_update()
|
|
|
|
.skip_locked()
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await
|
|
|
|
.ok()
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn set_running(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
|
|
|
task: Task,
|
|
|
|
) -> Result<Task, AsyncQueueError> {
|
|
|
|
Ok(diesel::update(&task)
|
2023-03-07 16:52:26 +00:00
|
|
|
.set((backie_tasks::running_at.eq(Utc::now()),))
|
2023-03-04 18:07:17 +00:00
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn set_done(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
id: TaskId,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Result<Task, AsyncQueueError> {
|
2023-03-07 16:52:26 +00:00
|
|
|
Ok(
|
|
|
|
diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id)))
|
|
|
|
.set((backie_tasks::done_at.eq(Utc::now()),))
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?,
|
|
|
|
)
|
2023-03-07 15:41:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn insert(
|
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 16:52:26 +00:00
|
|
|
runnable: &dyn RunnableTask,
|
2023-03-07 15:41:20 +00:00
|
|
|
) -> Result<Task, AsyncQueueError> {
|
|
|
|
let payload = serde_json::to_value(runnable)?;
|
|
|
|
match runnable.uniq() {
|
|
|
|
None => {
|
|
|
|
let new_task = NewTask::builder()
|
|
|
|
.uniq_hash(None)
|
|
|
|
.task_type(runnable.task_type())
|
|
|
|
.payload(payload)
|
|
|
|
.build();
|
|
|
|
|
|
|
|
Ok(diesel::insert_into(backie_tasks::table)
|
|
|
|
.values(new_task)
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?)
|
|
|
|
}
|
2023-03-07 16:52:26 +00:00
|
|
|
Some(hash) => match Self::find_by_uniq_hash(connection, hash.clone()).await {
|
|
|
|
Some(task) => Ok(task),
|
|
|
|
None => {
|
|
|
|
let new_task = NewTask::builder()
|
|
|
|
.uniq_hash(Some(hash))
|
|
|
|
.task_type(runnable.task_type())
|
|
|
|
.payload(payload)
|
|
|
|
.build();
|
|
|
|
|
|
|
|
Ok(diesel::insert_into(backie_tasks::table)
|
|
|
|
.values(new_task)
|
|
|
|
.get_result::<Task>(connection)
|
|
|
|
.await?)
|
2023-03-04 18:07:17 +00:00
|
|
|
}
|
2023-03-07 16:52:26 +00:00
|
|
|
},
|
2023-03-04 18:07:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-07 15:41:20 +00:00
|
|
|
pub(crate) async fn find_by_uniq_hash(
|
2023-03-04 18:07:17 +00:00
|
|
|
connection: &mut AsyncPgConnection,
|
2023-03-07 15:41:20 +00:00
|
|
|
hash: TaskHash,
|
2023-03-04 18:07:17 +00:00
|
|
|
) -> Option<Task> {
|
2023-03-07 15:41:20 +00:00
|
|
|
backie_tasks::table
|
|
|
|
.filter(backie_tasks::uniq_hash.eq(hash))
|
2023-03-04 18:07:17 +00:00
|
|
|
.first::<Task>(connection)
|
|
|
|
.await
|
|
|
|
.ok()
|
|
|
|
}
|
|
|
|
}
|