Sleep with Duration (#67)
* Sleep in millis * SleepParams Duration :D * Readme updated! * Changing Schedule and SleepParams to std::time::Duration * Millis period in Periodic tasks * Readme! * Error management * period_in_millis i64 * fix clippy
This commit is contained in:
parent
32b12182e0
commit
1dc513c4a7
12 changed files with 122 additions and 109 deletions
38
README.md
38
README.md
|
@ -315,10 +315,10 @@ You can use use `SleepParams` to confugure sleep values:
|
|||
|
||||
```rust
|
||||
pub struct SleepParams {
|
||||
pub sleep_period: u64, \\ default value is 5
|
||||
pub max_sleep_period: u64, \\ default value is 15
|
||||
pub min_sleep_period: u64, \\ default value is 5
|
||||
pub sleep_step: u64, \\ default value is 5
|
||||
pub sleep_period: Duration, \\ default value is 5 seconds
|
||||
pub max_sleep_period: Duration, \\ default value is 15 seconds
|
||||
pub min_sleep_period: Duration, \\ default value is 5 seconds
|
||||
pub sleep_step: Duration, \\ default value is 5 seconds
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -328,10 +328,10 @@ If there are no tasks in the DB, a worker sleeps for `sleep_period` and each tim
|
|||
Use `set_sleep_params` to set it:
|
||||
```rust
|
||||
let sleep_params = SleepParams {
|
||||
sleep_period: 2,
|
||||
max_sleep_period: 6,
|
||||
min_sleep_period: 2,
|
||||
sleep_step: 1,
|
||||
sleep_period: Duration::from_secs(2),
|
||||
max_sleep_period: Duration::from_secs(6),
|
||||
min_sleep_period: Duration::from_secs(2),
|
||||
sleep_step: Duration::from_secs(1),
|
||||
};
|
||||
let mut worker_params = WorkerParams::new();
|
||||
worker_params.set_sleep_params(sleep_params);
|
||||
|
@ -357,27 +357,29 @@ use fang::Queue;
|
|||
let queue = Queue::new();
|
||||
|
||||
queue
|
||||
.push_periodic_task(&SyncMyTask::default(), 120)
|
||||
.push_periodic_task(&SyncMyTask::default(), 120000)
|
||||
.unwrap();
|
||||
|
||||
queue
|
||||
.push_periodic_task(&DeliverMyTask::default(), 60)
|
||||
.push_periodic_task(&DeliverMyTask::default(), 60000)
|
||||
.unwrap();
|
||||
|
||||
Scheduler::start(10, 5);
|
||||
Scheduler::start(Duration::from_secs(10), Duration::from_secs(5));
|
||||
```
|
||||
|
||||
In the example above, `push_periodic_task` is used to save the specified task to the `fang_periodic_tasks` table which will be enqueued (saved to `fang_tasks` table) every specied number of seconds.
|
||||
In the example above, `push_periodic_task` is used to save the specified task to the `fang_periodic_tasks` table which will be enqueued (saved to `fang_tasks` table) every specied number of milliseconds.
|
||||
|
||||
`Scheduler::start(10, 5)` starts scheduler. It accepts two parameters:
|
||||
- Db check period in seconds
|
||||
- Acceptable error limit in seconds - |current_time - scheduled_time| < error
|
||||
`Scheduler::start(Duration::from_secs(10), Duration::from_secs(5))` starts scheduler. It accepts two parameters:
|
||||
- Db check period
|
||||
- Acceptable error limit - |current_time - scheduled_time| < error
|
||||
|
||||
#### Asynk feature
|
||||
```rust
|
||||
use fang::asynk::async_scheduler::Scheduler;
|
||||
use fang::asynk::async_queue::AsyncQueueable;
|
||||
use fang::asynk::async_queue::AsyncQueue;
|
||||
use std::time::Duration;
|
||||
use chrono::Duration as OtherDuration;
|
||||
|
||||
// Build a AsyncQueue as before
|
||||
|
||||
|
@ -386,7 +388,7 @@ let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
|
|||
let _periodic_task = queue.insert_periodic_task(
|
||||
&AsyncTask { number: 1 },
|
||||
schedule_in_future,
|
||||
10,
|
||||
10000, // period in milliseconds
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -394,8 +396,8 @@ let check_period: u64 = 1;
|
|||
let error_margin_seconds: u64 = 2;
|
||||
|
||||
let mut scheduler = Scheduler::builder()
|
||||
.check_period(check_period)
|
||||
.error_margin_seconds(error_margin_seconds)
|
||||
.check_period(Duration::from_secs(check_period))
|
||||
.error_margin_seconds(Duration::from_secs(error_margin_seconds))
|
||||
.queue(&mut queue as &mut dyn AsyncQueueable)
|
||||
.build();
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
CREATE TABLE fang_periodic_tasks (
|
||||
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
||||
metadata jsonb NOT NULL,
|
||||
period_in_seconds INTEGER NOT NULL,
|
||||
period_in_millis BIGINT NOT NULL,
|
||||
scheduled_at TIMESTAMP WITH TIME ZONE,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||
|
|
|
@ -12,6 +12,7 @@ use chrono::DateTime;
|
|||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use postgres_types::{FromSql, ToSql};
|
||||
use std::time::Duration as StdDuration;
|
||||
use thiserror::Error;
|
||||
use typed_builder::TypedBuilder;
|
||||
use uuid::Uuid;
|
||||
|
@ -82,7 +83,7 @@ pub struct PeriodicTask {
|
|||
#[builder(setter(into))]
|
||||
pub metadata: serde_json::Value,
|
||||
#[builder(setter(into))]
|
||||
pub period_in_seconds: i32,
|
||||
pub period_in_millis: i64,
|
||||
#[builder(setter(into))]
|
||||
pub scheduled_at: Option<DateTime<Utc>>,
|
||||
#[builder(setter(into))]
|
||||
|
@ -105,6 +106,8 @@ pub enum AsyncQueueError {
|
|||
"AsyncQueue is not connected :( , call connect() method first and then perform operations"
|
||||
)]
|
||||
NotConnectedError,
|
||||
#[error("Can not convert `std::time::Duration` to `chrono::Duration`")]
|
||||
TimeError,
|
||||
}
|
||||
|
||||
impl From<AsyncQueueError> for FangError {
|
||||
|
@ -141,14 +144,14 @@ pub trait AsyncQueueable: Send {
|
|||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin_seconds: i64,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError>;
|
||||
|
||||
async fn insert_periodic_task(
|
||||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError>;
|
||||
|
||||
async fn schedule_next_task(
|
||||
|
@ -258,7 +261,7 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
|
@ -277,13 +280,12 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin_seconds: i64,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
let periodic_task =
|
||||
AsyncQueue::<NoTls>::fetch_periodic_tasks_query(transaction, error_margin_seconds)
|
||||
.await?;
|
||||
AsyncQueue::<NoTls>::fetch_periodic_tasks_query(transaction, error_margin).await?;
|
||||
|
||||
Ok(periodic_task)
|
||||
}
|
||||
|
@ -472,7 +474,7 @@ where
|
|||
periodic_task: PeriodicTask,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let updated_at = Utc::now();
|
||||
let scheduled_at = updated_at + Duration::seconds(periodic_task.period_in_seconds.into());
|
||||
let scheduled_at = updated_at + Duration::milliseconds(periodic_task.period_in_millis);
|
||||
|
||||
let row: Row = transaction
|
||||
.query_one(SCHEDULE_NEXT_TASK_QUERY, &[&scheduled_at, &updated_at])
|
||||
|
@ -486,7 +488,7 @@ where
|
|||
transaction: &mut Transaction<'_>,
|
||||
metadata: serde_json::Value,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
let row: Row = transaction
|
||||
.query_one(
|
||||
|
@ -500,12 +502,17 @@ where
|
|||
|
||||
async fn fetch_periodic_tasks_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
error_margin_seconds: i64,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
let current_time = Utc::now();
|
||||
|
||||
let low_limit = current_time - Duration::seconds(error_margin_seconds);
|
||||
let high_limit = current_time + Duration::seconds(error_margin_seconds);
|
||||
let margin: Duration = match Duration::from_std(error_margin) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(_) => Err(AsyncQueueError::TimeError),
|
||||
}?;
|
||||
|
||||
let low_limit = current_time - margin;
|
||||
let high_limit = current_time + margin;
|
||||
let rows: Vec<Row> = transaction
|
||||
.query(FETCH_PERIODIC_TASKS_QUERY, &[&low_limit, &high_limit])
|
||||
.await?;
|
||||
|
@ -569,7 +576,7 @@ where
|
|||
fn row_to_periodic_task(row: Row) -> PeriodicTask {
|
||||
let id: Uuid = row.get("id");
|
||||
let metadata: serde_json::Value = row.get("metadata");
|
||||
let period_in_seconds: i32 = row.get("period_in_seconds");
|
||||
let period_in_millis: i64 = row.get("period_in_millis");
|
||||
let scheduled_at: Option<DateTime<Utc>> = match row.try_get("scheduled_at") {
|
||||
Ok(datetime) => Some(datetime),
|
||||
Err(_) => None,
|
||||
|
@ -580,7 +587,7 @@ where
|
|||
PeriodicTask::builder()
|
||||
.id(id)
|
||||
.metadata(metadata)
|
||||
.period_in_seconds(period_in_seconds)
|
||||
.period_in_millis(period_in_millis)
|
||||
.scheduled_at(scheduled_at)
|
||||
.created_at(created_at)
|
||||
.updated_at(updated_at)
|
||||
|
@ -657,7 +664,7 @@ where
|
|||
&mut self,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
|
@ -690,14 +697,14 @@ where
|
|||
|
||||
async fn fetch_periodic_tasks(
|
||||
&mut self,
|
||||
error_margin_seconds: i64,
|
||||
error_margin: StdDuration,
|
||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let periodic_task =
|
||||
Self::fetch_periodic_tasks_query(&mut transaction, error_margin_seconds).await?;
|
||||
Self::fetch_periodic_tasks_query(&mut transaction, error_margin).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
|
|
|
@ -15,9 +15,9 @@ where
|
|||
AQueue: AsyncQueueable + Clone + Sync + 'static,
|
||||
{
|
||||
#[builder(setter(into))]
|
||||
pub check_period: u64,
|
||||
pub check_period: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub error_margin_seconds: u64,
|
||||
pub error_margin: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub queue: AQueue,
|
||||
#[builder(default = 0, setter(into))]
|
||||
|
@ -66,7 +66,7 @@ where
|
|||
pub async fn schedule_loop(&mut self) -> JoinHandle<Result<(), Error>> {
|
||||
let mut scheduler = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let sleep_duration = Duration::from_secs(scheduler.check_period);
|
||||
let sleep_duration = scheduler.check_period;
|
||||
|
||||
loop {
|
||||
scheduler.schedule().await?;
|
||||
|
@ -77,11 +77,7 @@ where
|
|||
}
|
||||
|
||||
pub async fn schedule(&mut self) -> Result<(), Error> {
|
||||
if let Some(tasks) = self
|
||||
.queue
|
||||
.fetch_periodic_tasks(self.error_margin_seconds as i64)
|
||||
.await?
|
||||
{
|
||||
if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin).await? {
|
||||
for task in tasks {
|
||||
self.process_task(task).await?;
|
||||
}
|
||||
|
@ -111,9 +107,9 @@ where
|
|||
#[derive(TypedBuilder)]
|
||||
pub struct SchedulerTest<'a> {
|
||||
#[builder(setter(into))]
|
||||
pub check_period: u64,
|
||||
pub check_period: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub error_margin_seconds: u64,
|
||||
pub error_margin: Duration,
|
||||
#[builder(setter(into))]
|
||||
pub queue: &'a mut dyn AsyncQueueable,
|
||||
#[builder(default = 0, setter(into))]
|
||||
|
@ -123,14 +119,10 @@ pub struct SchedulerTest<'a> {
|
|||
#[cfg(test)]
|
||||
impl<'a> SchedulerTest<'a> {
|
||||
async fn schedule_test(&mut self) -> Result<(), Error> {
|
||||
let sleep_duration = Duration::from_secs(self.check_period);
|
||||
let sleep_duration = self.check_period;
|
||||
|
||||
loop {
|
||||
match self
|
||||
.queue
|
||||
.fetch_periodic_tasks(self.error_margin_seconds as i64)
|
||||
.await?
|
||||
{
|
||||
match self.queue.fetch_periodic_tasks(self.error_margin).await? {
|
||||
Some(tasks) => {
|
||||
for task in tasks {
|
||||
self.process_task(task).await?;
|
||||
|
@ -179,6 +171,7 @@ mod async_scheduler_tests {
|
|||
use chrono::Duration as OtherDuration;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AsyncScheduledTask {
|
||||
|
@ -210,7 +203,7 @@ mod async_scheduler_tests {
|
|||
&mut test,
|
||||
&AsyncScheduledTask { number: 1 },
|
||||
schedule_in_future,
|
||||
10,
|
||||
10000,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -218,8 +211,8 @@ mod async_scheduler_tests {
|
|||
let error_margin_seconds: u64 = 2;
|
||||
|
||||
let mut scheduler = SchedulerTest::builder()
|
||||
.check_period(check_period)
|
||||
.error_margin_seconds(error_margin_seconds)
|
||||
.check_period(Duration::from_secs(check_period))
|
||||
.error_margin(Duration::from_secs(error_margin_seconds))
|
||||
.queue(&mut test as &mut dyn AsyncQueueable)
|
||||
.build();
|
||||
// Scheduler start tricky not loop :)
|
||||
|
@ -248,9 +241,9 @@ mod async_scheduler_tests {
|
|||
test: &mut AsyncQueueTest<'_>,
|
||||
task: &dyn AsyncRunnable,
|
||||
timestamp: DateTime<Utc>,
|
||||
period_in_seconds: i32,
|
||||
period_in_millis: i64,
|
||||
) -> PeriodicTask {
|
||||
test.insert_periodic_task(task, timestamp, period_in_seconds)
|
||||
test.insert_periodic_task(task, timestamp, period_in_millis)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ use crate::asynk::async_runnable::AsyncRunnable;
|
|||
use crate::asynk::AsyncError as Error;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
use log::error;
|
||||
use std::time::Duration;
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
#[derive(TypedBuilder)]
|
||||
|
@ -84,7 +83,7 @@ where
|
|||
pub async fn sleep(&mut self) {
|
||||
self.sleep_params.maybe_increase_sleep_period();
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await;
|
||||
tokio::time::sleep(self.sleep_params.sleep_period).await;
|
||||
}
|
||||
|
||||
pub async fn run_tasks(&mut self) -> Result<(), Error> {
|
||||
|
@ -183,8 +182,9 @@ impl<'a> AsyncWorkerTest<'a> {
|
|||
pub async fn sleep(&mut self) {
|
||||
self.sleep_params.maybe_increase_sleep_period();
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await;
|
||||
tokio::time::sleep(self.sleep_params.sleep_period).await;
|
||||
}
|
||||
|
||||
pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self
|
||||
|
|
|
@ -1 +1 @@
|
|||
INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_seconds") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at
|
||||
INSERT INTO "fang_periodic_tasks" ("metadata", "scheduled_at", "period_in_millis") VALUES ($1, $2, $3) RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at
|
||||
|
|
|
@ -1 +1 @@
|
|||
UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_seconds , scheduled_at , created_at , updated_at
|
||||
UPDATE "fang_periodic_tasks" SET "scheduled_at" = $1 , "updated_at" = $2 RETURNING id , metadata , period_in_millis , scheduled_at , created_at , updated_at
|
||||
|
|
|
@ -7,7 +7,6 @@ use diesel::pg::PgConnection;
|
|||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
||||
use log::error;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct Executor {
|
||||
pub pooled_connection: PooledConnection<ConnectionManager<PgConnection>>,
|
||||
|
@ -97,7 +96,7 @@ impl Executor {
|
|||
pub fn sleep(&mut self) {
|
||||
self.sleep_params.maybe_increase_sleep_period();
|
||||
|
||||
thread::sleep(Duration::from_secs(self.sleep_params.sleep_period));
|
||||
thread::sleep(self.sleep_params.sleep_period);
|
||||
}
|
||||
|
||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||
|
|
|
@ -11,6 +11,7 @@ use diesel::r2d2;
|
|||
use diesel::result::Error;
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
use std::time::Duration as StdDuration;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
|
||||
|
@ -30,7 +31,7 @@ pub struct Task {
|
|||
pub struct PeriodicTask {
|
||||
pub id: Uuid,
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
pub period_in_millis: i64,
|
||||
pub scheduled_at: Option<DateTime<Utc>>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
|
@ -47,7 +48,7 @@ pub struct NewTask {
|
|||
#[table_name = "fang_periodic_tasks"]
|
||||
pub struct NewPeriodicTask {
|
||||
pub metadata: serde_json::Value,
|
||||
pub period_in_seconds: i32,
|
||||
pub period_in_millis: i64,
|
||||
}
|
||||
|
||||
pub struct Queue {
|
||||
|
@ -99,7 +100,7 @@ impl Queue {
|
|||
pub fn push_periodic_task(
|
||||
&self,
|
||||
task: &dyn Runnable,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, Error> {
|
||||
Self::push_periodic_task_query(&self.connection, task, period)
|
||||
}
|
||||
|
@ -107,7 +108,7 @@ impl Queue {
|
|||
pub fn push_periodic_task_query(
|
||||
connection: &PgConnection,
|
||||
task: &dyn Runnable,
|
||||
period: i32,
|
||||
period: i64,
|
||||
) -> Result<PeriodicTask, Error> {
|
||||
let json_task = serde_json::to_value(task).unwrap();
|
||||
|
||||
|
@ -116,7 +117,7 @@ impl Queue {
|
|||
None => {
|
||||
let new_task = NewPeriodicTask {
|
||||
metadata: json_task,
|
||||
period_in_seconds: period,
|
||||
period_in_millis: period,
|
||||
};
|
||||
|
||||
diesel::insert_into(fang_periodic_tasks::table)
|
||||
|
@ -198,18 +199,20 @@ impl Queue {
|
|||
.ok()
|
||||
}
|
||||
|
||||
pub fn fetch_periodic_tasks(&self, error_margin_seconds: i64) -> Option<Vec<PeriodicTask>> {
|
||||
Self::fetch_periodic_tasks_query(&self.connection, error_margin_seconds)
|
||||
pub fn fetch_periodic_tasks(&self, error_margin: StdDuration) -> Option<Vec<PeriodicTask>> {
|
||||
Self::fetch_periodic_tasks_query(&self.connection, error_margin)
|
||||
}
|
||||
|
||||
pub fn fetch_periodic_tasks_query(
|
||||
connection: &PgConnection,
|
||||
error_margin_seconds: i64,
|
||||
error_margin: StdDuration,
|
||||
) -> Option<Vec<PeriodicTask>> {
|
||||
let current_time = Self::current_time();
|
||||
|
||||
let low_limit = current_time - Duration::seconds(error_margin_seconds);
|
||||
let high_limit = current_time + Duration::seconds(error_margin_seconds);
|
||||
let margin = Duration::from_std(error_margin).unwrap();
|
||||
|
||||
let low_limit = current_time - margin;
|
||||
let high_limit = current_time + margin;
|
||||
|
||||
fang_periodic_tasks::table
|
||||
.filter(
|
||||
|
@ -224,7 +227,7 @@ impl Queue {
|
|||
|
||||
pub fn schedule_next_task_execution(&self, task: &PeriodicTask) -> Result<PeriodicTask, Error> {
|
||||
let current_time = Self::current_time();
|
||||
let scheduled_at = current_time + Duration::seconds(task.period_in_seconds.into());
|
||||
let scheduled_at = current_time + Duration::milliseconds(task.period_in_millis);
|
||||
|
||||
diesel::update(task)
|
||||
.set((
|
||||
|
@ -415,6 +418,7 @@ mod queue_tests {
|
|||
use diesel::prelude::*;
|
||||
use diesel::result::Error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration as StdDuration;
|
||||
|
||||
#[test]
|
||||
fn insert_inserts_task() {
|
||||
|
@ -560,9 +564,9 @@ mod queue_tests {
|
|||
|
||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||
let task = PepeTask { number: 10 };
|
||||
let task = queue.push_periodic_task(&task, 60).unwrap();
|
||||
let task = queue.push_periodic_task(&task, 60_i64).unwrap();
|
||||
|
||||
assert_eq!(task.period_in_seconds, 60);
|
||||
assert_eq!(task.period_in_millis, 60_i64);
|
||||
assert!(queue.find_periodic_task_by_id(task.id).is_some());
|
||||
|
||||
Ok(())
|
||||
|
@ -602,7 +606,9 @@ mod queue_tests {
|
|||
&queue.connection,
|
||||
);
|
||||
|
||||
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
||||
let tasks = queue
|
||||
.fetch_periodic_tasks(StdDuration::from_secs(100))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].id, task.id);
|
||||
|
@ -616,13 +622,17 @@ mod queue_tests {
|
|||
let queue = Queue::new();
|
||||
|
||||
queue.connection.test_transaction::<(), Error, _>(|| {
|
||||
let task =
|
||||
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||
let task = insert_periodic_task(
|
||||
serde_json::json!(true),
|
||||
Utc::now(),
|
||||
100000,
|
||||
&queue.connection,
|
||||
);
|
||||
|
||||
let updated_task = queue.schedule_next_task_execution(&task).unwrap();
|
||||
|
||||
let next_schedule = (task.scheduled_at.unwrap()
|
||||
+ Duration::seconds(task.period_in_seconds.into()))
|
||||
+ Duration::milliseconds(task.period_in_millis))
|
||||
.round_subsecs(0);
|
||||
|
||||
assert_eq!(
|
||||
|
@ -685,7 +695,9 @@ mod queue_tests {
|
|||
let task =
|
||||
insert_periodic_task(serde_json::json!(true), Utc::now(), 100, &queue.connection);
|
||||
|
||||
let tasks = queue.fetch_periodic_tasks(100).unwrap();
|
||||
let tasks = queue
|
||||
.fetch_periodic_tasks(StdDuration::from_secs(100))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].id, task.id);
|
||||
|
@ -838,14 +850,14 @@ mod queue_tests {
|
|||
fn insert_periodic_task(
|
||||
metadata: serde_json::Value,
|
||||
timestamp: DateTime<Utc>,
|
||||
period_in_seconds: i32,
|
||||
period_in_millis: i64,
|
||||
connection: &PgConnection,
|
||||
) -> PeriodicTask {
|
||||
diesel::insert_into(fang_periodic_tasks::table)
|
||||
.values(&vec![(
|
||||
fang_periodic_tasks::metadata.eq(metadata),
|
||||
fang_periodic_tasks::scheduled_at.eq(timestamp),
|
||||
fang_periodic_tasks::period_in_seconds.eq(period_in_seconds),
|
||||
fang_periodic_tasks::period_in_millis.eq(period_in_millis),
|
||||
)])
|
||||
.get_result::<PeriodicTask>(connection)
|
||||
.unwrap()
|
||||
|
|
|
@ -5,54 +5,49 @@ use std::thread;
|
|||
use std::time::Duration;
|
||||
|
||||
pub struct Scheduler {
|
||||
pub check_period: u64,
|
||||
pub error_margin_seconds: u64,
|
||||
pub check_period: Duration,
|
||||
pub error_margin: Duration,
|
||||
pub queue: Queue,
|
||||
}
|
||||
|
||||
impl Drop for Scheduler {
|
||||
fn drop(&mut self) {
|
||||
Scheduler::start(self.check_period, self.error_margin_seconds)
|
||||
Scheduler::start(self.check_period, self.error_margin)
|
||||
}
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn start(check_period: u64, error_margin_seconds: u64) {
|
||||
pub fn start(check_period: Duration, error_margin: Duration) {
|
||||
let queue = Queue::new();
|
||||
let builder = thread::Builder::new().name("scheduler".to_string());
|
||||
|
||||
builder
|
||||
.spawn(move || {
|
||||
let scheduler = Self::new(check_period, error_margin_seconds, queue);
|
||||
let scheduler = Self::new(check_period, error_margin, queue);
|
||||
|
||||
scheduler.schedule_loop();
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn new(check_period: u64, error_margin_seconds: u64, queue: Queue) -> Self {
|
||||
pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self {
|
||||
Self {
|
||||
check_period,
|
||||
queue,
|
||||
error_margin_seconds,
|
||||
error_margin,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schedule_loop(&self) {
|
||||
let sleep_duration = Duration::from_secs(self.check_period);
|
||||
|
||||
loop {
|
||||
self.schedule();
|
||||
|
||||
thread::sleep(sleep_duration);
|
||||
thread::sleep(self.check_period);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schedule(&self) {
|
||||
if let Some(tasks) = self
|
||||
.queue
|
||||
.fetch_periodic_tasks(self.error_margin_seconds as i64)
|
||||
{
|
||||
if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin) {
|
||||
for task in tasks {
|
||||
self.process_task(task);
|
||||
}
|
||||
|
@ -110,8 +105,8 @@ mod task_scheduler_tests {
|
|||
fn schedules_tasks() {
|
||||
let queue = Queue::new();
|
||||
|
||||
queue.push_periodic_task(&ScheduledTask {}, 10).unwrap();
|
||||
Scheduler::start(1, 2);
|
||||
queue.push_periodic_task(&ScheduledTask {}, 10000).unwrap();
|
||||
Scheduler::start(Duration::from_secs(1), Duration::from_secs(2));
|
||||
|
||||
let sleep_duration = Duration::from_secs(15);
|
||||
thread::sleep(sleep_duration);
|
||||
|
|
|
@ -33,7 +33,7 @@ table! {
|
|||
fang_periodic_tasks (id) {
|
||||
id -> Uuid,
|
||||
metadata -> Jsonb,
|
||||
period_in_seconds -> Int4,
|
||||
period_in_millis -> Int8,
|
||||
scheduled_at -> Nullable<Timestamptz>,
|
||||
created_at -> Timestamptz,
|
||||
updated_at -> Timestamptz,
|
||||
|
|
21
src/lib.rs
21
src/lib.rs
|
@ -1,4 +1,7 @@
|
|||
#![allow(clippy::extra_unused_lifetimes)]
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum RetentionMode {
|
||||
KeepAll,
|
||||
|
@ -12,11 +15,12 @@ impl Default for RetentionMode {
|
|||
}
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SleepParams {
|
||||
pub sleep_period: u64,
|
||||
pub max_sleep_period: u64,
|
||||
pub min_sleep_period: u64,
|
||||
pub sleep_step: u64,
|
||||
pub sleep_period: Duration,
|
||||
pub max_sleep_period: Duration,
|
||||
pub min_sleep_period: Duration,
|
||||
pub sleep_step: Duration,
|
||||
}
|
||||
|
||||
impl SleepParams {
|
||||
pub fn maybe_reset_sleep_period(&mut self) {
|
||||
if self.sleep_period != self.min_sleep_period {
|
||||
|
@ -30,13 +34,14 @@ impl SleepParams {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SleepParams {
|
||||
fn default() -> Self {
|
||||
SleepParams {
|
||||
sleep_period: 5,
|
||||
max_sleep_period: 15,
|
||||
min_sleep_period: 5,
|
||||
sleep_step: 5,
|
||||
sleep_period: Duration::from_secs(5),
|
||||
max_sleep_period: Duration::from_secs(15),
|
||||
min_sleep_period: Duration::from_secs(5),
|
||||
sleep_step: Duration::from_secs(5),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue