remove all scheduled tasks, necessary for El Monitorro bot

This commit is contained in:
pxp9 2022-09-01 01:01:15 +02:00
parent b975e893e2
commit 553f5f7de1
No known key found for this signature in database
GPG key ID: 2DC73C49C2A9A6D4
3 changed files with 171 additions and 0 deletions

View file

@ -27,6 +27,8 @@ const INSERT_TASK_UNIQ_QUERY: &str = include_str!("queries/insert_task_uniq.sql"
const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql"); const UPDATE_TASK_STATE_QUERY: &str = include_str!("queries/update_task_state.sql");
const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql"); const FAIL_TASK_QUERY: &str = include_str!("queries/fail_task.sql");
const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql"); const REMOVE_ALL_TASK_QUERY: &str = include_str!("queries/remove_all_tasks.sql");
const REMOVE_ALL_SCHEDULED_TASK_QUERY: &str =
include_str!("queries/remove_all_scheduled_tasks.sql");
const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql"); const REMOVE_TASK_QUERY: &str = include_str!("queries/remove_task.sql");
const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql"); const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sql");
const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
@ -113,6 +115,8 @@ pub trait AsyncQueueable: Send {
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>; async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>; async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError>;
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>; async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;
@ -249,6 +253,12 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await AsyncQueue::<NoTls>::remove_all_tasks_query(transaction).await
} }
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
let transaction = &mut self.transaction;
AsyncQueue::<NoTls>::remove_all_scheduled_tasks_query(transaction).await
}
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> { async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
let transaction = &mut self.transaction; let transaction = &mut self.transaction;
@ -314,6 +324,18 @@ where
Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await Self::execute_query(transaction, REMOVE_ALL_TASK_QUERY, &[], None).await
} }
async fn remove_all_scheduled_tasks_query(
transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> {
Self::execute_query(
transaction,
REMOVE_ALL_SCHEDULED_TASK_QUERY,
&[&Utc::now()],
None,
)
.await
}
async fn remove_task_query( async fn remove_task_query(
transaction: &mut Transaction<'_>, transaction: &mut Transaction<'_>,
task: Task, task: Task,
@ -637,6 +659,18 @@ where
Ok(result) Ok(result)
} }
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?;
let mut transaction = connection.transaction().await?;
let result = Self::remove_all_scheduled_tasks_query(&mut transaction).await?;
transaction.commit().await?;
Ok(result)
}
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> { async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
self.check_if_connection()?; self.check_if_connection()?;
let mut connection = self.pool.as_ref().unwrap().get().await?; let mut connection = self.pool.as_ref().unwrap().get().await?;
@ -876,6 +910,34 @@ mod async_queue_tests {
assert_eq!(task.scheduled_at, datetime); assert_eq!(task.scheduled_at, datetime);
} }
#[tokio::test]
async fn remove_all_scheduled_tasks_test() {
let pool = pool().await;
let mut connection = pool.get().await.unwrap();
let transaction = connection.transaction().await.unwrap();
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
let task1 = &AsyncTaskSchedule {
number: 1,
datetime: datetime.to_string(),
};
let task2 = &AsyncTaskSchedule {
number: 2,
datetime: datetime.to_string(),
};
test.schedule_task(task1).await.unwrap();
test.schedule_task(task2).await.unwrap();
let number = test.remove_all_scheduled_tasks().await.unwrap();
assert_eq!(2, number);
}
#[tokio::test] #[tokio::test]
async fn fetch_and_touch_test() { async fn fetch_and_touch_test() {
let pool = pool().await; let pool = pool().await;

View file

@ -0,0 +1 @@
DELETE FROM "fang_tasks" WHERE scheduled_at > $1

View file

@ -86,6 +86,8 @@ pub trait Queueable {
fn remove_all_tasks(&self) -> Result<usize, QueueError>; fn remove_all_tasks(&self) -> Result<usize, QueueError>;
fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError>;
fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError>; fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError>;
fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>; fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>;
@ -122,6 +124,13 @@ impl Queueable for Queue {
Self::schedule_task_query(&connection, params) Self::schedule_task_query(&connection, params)
} }
fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError> {
let connection = self.get_connection()?;
Self::remove_all_scheduled_tasks_query(&connection)
}
fn remove_all_tasks(&self) -> Result<usize, QueueError> { fn remove_all_tasks(&self) -> Result<usize, QueueError> {
let connection = self.get_connection()?; let connection = self.get_connection()?;
@ -278,6 +287,14 @@ impl Queue {
Ok(diesel::delete(fang_tasks::table).execute(connection)?) Ok(diesel::delete(fang_tasks::table).execute(connection)?)
} }
pub fn remove_all_scheduled_tasks_query(
connection: &PgConnection,
) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now()));
Ok(diesel::delete(query).execute(connection)?)
}
pub fn remove_tasks_of_type_query( pub fn remove_tasks_of_type_query(
connection: &PgConnection, connection: &PgConnection,
task_type: &str, task_type: &str,
@ -365,11 +382,15 @@ impl Queue {
mod queue_tests { mod queue_tests {
use super::Queue; use super::Queue;
use super::Queueable; use super::Queueable;
use crate::chrono::SubsecRound;
use crate::runnable::Runnable; use crate::runnable::Runnable;
use crate::runnable::COMMON_TYPE; use crate::runnable::COMMON_TYPE;
use crate::schema::FangTaskState; use crate::schema::FangTaskState;
use crate::typetag; use crate::typetag;
use crate::FangError; use crate::FangError;
use crate::Scheduled;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc; use chrono::Utc;
use diesel::connection::Connection; use diesel::connection::Connection;
use diesel::result::Error; use diesel::result::Error;
@ -413,6 +434,33 @@ mod queue_tests {
} }
} }
#[derive(Serialize, Deserialize)]
struct ScheduledPepeTask {
pub number: u16,
pub datetime: String,
}
#[typetag::serde]
impl Runnable for ScheduledPepeTask {
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
println!("the number is {}", self.number);
Ok(())
}
fn uniq(&self) -> bool {
true
}
fn task_type(&self) -> String {
"scheduled".to_string()
}
fn cron(&self) -> Option<Scheduled> {
let datetime = self.datetime.parse::<DateTime<Utc>>().ok()?;
Some(Scheduled::ScheduleOnce(datetime))
}
}
#[test] #[test]
fn insert_inserts_task() { fn insert_inserts_task() {
let task = PepeTask { number: 10 }; let task = PepeTask { number: 10 };
@ -595,6 +643,66 @@ mod queue_tests {
}); });
} }
#[test]
fn schedule_task_test() {
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
let task = &ScheduledPepeTask {
number: 10,
datetime: datetime.to_string(),
};
let task = Queue::schedule_task_query(&queue_pooled_connection, task).unwrap();
let metadata = task.metadata.as_object().unwrap();
let number = metadata["number"].as_u64();
let type_task = metadata["type"].as_str();
assert_eq!(Some(10), number);
assert_eq!(Some("ScheduledPepeTask"), type_task);
assert_eq!(task.scheduled_at, datetime);
Ok(())
});
}
#[test]
fn remove_all_scheduled_tasks_test() {
let pool = Queue::connection_pool(5);
let queue = Queue::builder().connection_pool(pool).build();
let queue_pooled_connection = queue.connection_pool.get().unwrap();
queue_pooled_connection.test_transaction::<(), Error, _>(|| {
let datetime = (Utc::now() + Duration::seconds(7)).round_subsecs(0);
let task1 = &ScheduledPepeTask {
number: 10,
datetime: datetime.to_string(),
};
let task2 = &ScheduledPepeTask {
number: 11,
datetime: datetime.to_string(),
};
Queue::schedule_task_query(&queue_pooled_connection, task1).unwrap();
Queue::schedule_task_query(&queue_pooled_connection, task2).unwrap();
let number = Queue::remove_all_scheduled_tasks_query(&queue_pooled_connection).unwrap();
assert_eq!(2, number);
Ok(())
});
}
#[test] #[test]
fn remove_all_tasks_test() { fn remove_all_tasks_test() {
let task1 = PepeTask { number: 10 }; let task1 = PepeTask { number: 10 };