backie/src/queries.rs

117 lines
3.8 KiB
Rust
Raw Normal View History

2023-03-04 19:46:09 +00:00
use crate::errors::AsyncQueueError;
use crate::schema::backie_tasks;
use crate::task::Task;
2023-03-11 15:38:32 +00:00
use crate::task::{NewTask, TaskId};
2023-03-04 18:07:17 +00:00
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use diesel::prelude::*;
use diesel::ExpressionMethods;
2023-03-04 19:46:09 +00:00
use diesel_async::{pg::AsyncPgConnection, RunQueryDsl};
2023-03-04 18:07:17 +00:00
impl Task {
pub(crate) async fn remove(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
id: TaskId,
2023-03-04 18:07:17 +00:00
) -> Result<u64, AsyncQueueError> {
let query = backie_tasks::table.filter(backie_tasks::id.eq(id));
2023-03-04 18:07:17 +00:00
Ok(diesel::delete(query).execute(connection).await? as u64)
}
pub(crate) async fn fail_with_message(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
id: TaskId,
2023-03-04 18:07:17 +00:00
error_message: &str,
) -> Result<Task, AsyncQueueError> {
let error = serde_json::json!({
"error": error_message,
});
let query = backie_tasks::table.filter(backie_tasks::id.eq(id));
Ok(diesel::update(query)
2023-03-04 18:07:17 +00:00
.set((
backie_tasks::error_info.eq(Some(error)),
backie_tasks::done_at.eq(Utc::now()),
2023-03-04 18:07:17 +00:00
))
.get_result::<Task>(connection)
.await?)
}
pub(crate) async fn schedule_retry(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
id: TaskId,
2023-03-04 18:07:17 +00:00
backoff_seconds: u32,
error_message: &str,
2023-03-04 18:07:17 +00:00
) -> Result<Task, AsyncQueueError> {
use crate::schema::backie_tasks::dsl;
let error = serde_json::json!({
"error": error_message,
});
let task = diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id)))
2023-03-04 18:07:17 +00:00
.set((
backie_tasks::error_info.eq(Some(error)),
backie_tasks::retries.eq(dsl::retries + 1),
2023-03-11 16:49:23 +00:00
backie_tasks::scheduled_at
.eq(Utc::now() + Duration::seconds(backoff_seconds as i64)),
backie_tasks::running_at.eq::<Option<DateTime<Utc>>>(None),
2023-03-04 18:07:17 +00:00
))
.get_result::<Task>(connection)
.await?;
Ok(task)
}
pub(crate) async fn fetch_next_pending(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
queue_name: &str,
2023-03-11 21:33:25 +00:00
task_names: &[String],
2023-03-04 18:07:17 +00:00
) -> Option<Task> {
backie_tasks::table
2023-03-11 21:22:25 +00:00
.filter(backie_tasks::task_name.eq_any(task_names))
.filter(backie_tasks::scheduled_at.lt(Utc::now())) // skip tasks scheduled for the future
.order(backie_tasks::created_at.asc()) // get the oldest task first
.filter(backie_tasks::running_at.is_null()) // that is not marked as running already
.filter(backie_tasks::done_at.is_null()) // and not marked as done
.filter(backie_tasks::queue_name.eq(queue_name))
2023-03-04 18:07:17 +00:00
.limit(1)
.for_update()
.skip_locked()
.get_result::<Task>(connection)
.await
.ok()
}
pub(crate) async fn set_running(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
task: Task,
) -> Result<Task, AsyncQueueError> {
Ok(diesel::update(&task)
2023-03-07 16:52:26 +00:00
.set((backie_tasks::running_at.eq(Utc::now()),))
2023-03-04 18:07:17 +00:00
.get_result::<Task>(connection)
.await?)
}
pub(crate) async fn set_done(
2023-03-04 18:07:17 +00:00
connection: &mut AsyncPgConnection,
id: TaskId,
2023-03-04 18:07:17 +00:00
) -> Result<Task, AsyncQueueError> {
2023-03-07 16:52:26 +00:00
Ok(
diesel::update(backie_tasks::table.filter(backie_tasks::id.eq(id)))
.set((backie_tasks::done_at.eq(Utc::now()),))
.get_result::<Task>(connection)
.await?,
)
}
pub(crate) async fn insert(
connection: &mut AsyncPgConnection,
new_task: NewTask,
) -> Result<Task, AsyncQueueError> {
Ok(diesel::insert_into(backie_tasks::table)
.values(new_task)
.get_result::<Task>(connection)
.await?)
2023-03-04 18:07:17 +00:00
}
}