Rename Task enum to PeriodicTask

This commit is contained in:
silverpill 2023-02-05 09:29:46 +00:00
parent b91e6e77b5
commit 3f89d97a5c

View file

@ -20,16 +20,16 @@ use crate::ethereum::subscriptions::{
use crate::monero::subscriptions::check_monero_subscriptions; use crate::monero::subscriptions::check_monero_subscriptions;
#[derive(Debug, Eq, Hash, PartialEq)] #[derive(Debug, Eq, Hash, PartialEq)]
enum Task { enum PeriodicTask {
NftMonitor, NftMonitor,
EthereumSubscriptionMonitor, EthereumSubscriptionMonitor,
SubscriptionExpirationMonitor, SubscriptionExpirationMonitor,
MoneroPaymentMonitor, MoneroPaymentMonitor,
IncomingActivityQueue, IncomingActivityQueueExecutor,
OutgoingActivityQueue, OutgoingActivityQueueExecutor,
} }
impl Task { impl PeriodicTask {
/// Returns task period (in seconds) /// Returns task period (in seconds)
fn period(&self) -> i64 { fn period(&self) -> i64 {
match self { match self {
@ -37,23 +37,23 @@ impl Task {
Self::EthereumSubscriptionMonitor => 300, Self::EthereumSubscriptionMonitor => 300,
Self::SubscriptionExpirationMonitor => 300, Self::SubscriptionExpirationMonitor => 300,
Self::MoneroPaymentMonitor => 30, Self::MoneroPaymentMonitor => 30,
Self::IncomingActivityQueue => 5, Self::IncomingActivityQueueExecutor => 5,
Self::OutgoingActivityQueue => 5, Self::OutgoingActivityQueueExecutor => 5,
}
}
fn is_ready(&self, last_run: &Option<DateTime<Utc>>) -> bool {
match last_run {
Some(last_run) => {
let time_passed = Utc::now() - *last_run;
time_passed.num_seconds() >= self.period()
},
None => true,
} }
} }
} }
fn is_task_ready(last_run: &Option<DateTime<Utc>>, period: i64) -> bool { async fn nft_monitor(
match last_run {
Some(last_run) => {
let time_passed = Utc::now() - *last_run;
time_passed.num_seconds() >= period
},
None => true,
}
}
async fn nft_monitor_task(
maybe_blockchain: Option<&mut Blockchain>, maybe_blockchain: Option<&mut Blockchain>,
db_pool: &DbPool, db_pool: &DbPool,
token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>, token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>,
@ -76,7 +76,7 @@ async fn nft_monitor_task(
Ok(()) Ok(())
} }
async fn ethereum_subscription_monitor_task( async fn ethereum_subscription_monitor(
instance: &Instance, instance: &Instance,
maybe_blockchain: Option<&mut Blockchain>, maybe_blockchain: Option<&mut Blockchain>,
db_pool: &DbPool, db_pool: &DbPool,
@ -99,7 +99,18 @@ async fn ethereum_subscription_monitor_task(
).await.map_err(Error::from) ).await.map_err(Error::from)
} }
async fn monero_payment_monitor_task( async fn subscription_expiration_monitor(
config: &Config,
db_pool: &DbPool,
) -> Result<(), Error> {
update_expired_subscriptions(
&config.instance(),
db_pool,
).await?;
Ok(())
}
async fn monero_payment_monitor(
config: &Config, config: &Config,
db_pool: &DbPool, db_pool: &DbPool,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -117,21 +128,21 @@ async fn monero_payment_monitor_task(
Ok(()) Ok(())
} }
async fn incoming_activity_queue_task( async fn incoming_activity_queue_executor(
config: &Config, config: &Config,
db_pool: &DbPool, db_pool: &DbPool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let db_client = &mut **get_database_client(db_pool).await?; let db_client = &mut **get_database_client(db_pool).await?;
let duration_max = Duration::from_secs(600); let duration_max = Duration::from_secs(600);
let task_completed = process_queued_incoming_activities(config, db_client); let completed = process_queued_incoming_activities(config, db_client);
match tokio::time::timeout(duration_max, task_completed).await { match tokio::time::timeout(duration_max, completed).await {
Ok(result) => result?, Ok(result) => result?,
Err(_) => log::error!("task timeout: IncomingActivityQueue"), Err(_) => log::error!("incoming activity queue executor timeout"),
}; };
Ok(()) Ok(())
} }
async fn outgoing_activity_queue_task( async fn outgoing_activity_queue_executor(
config: &Config, config: &Config,
db_pool: &DbPool, db_pool: &DbPool,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -146,12 +157,12 @@ pub fn run(
) -> () { ) -> () {
tokio::spawn(async move { tokio::spawn(async move {
let mut scheduler_state = HashMap::from([ let mut scheduler_state = HashMap::from([
(Task::NftMonitor, None), (PeriodicTask::NftMonitor, None),
(Task::EthereumSubscriptionMonitor, None), (PeriodicTask::EthereumSubscriptionMonitor, None),
(Task::SubscriptionExpirationMonitor, None), (PeriodicTask::SubscriptionExpirationMonitor, None),
(Task::MoneroPaymentMonitor, None), (PeriodicTask::MoneroPaymentMonitor, None),
(Task::IncomingActivityQueue, None), (PeriodicTask::IncomingActivityQueueExecutor, None),
(Task::OutgoingActivityQueue, None), (PeriodicTask::OutgoingActivityQueueExecutor, None),
]); ]);
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
@ -160,38 +171,35 @@ pub fn run(
interval.tick().await; interval.tick().await;
for (task, last_run) in scheduler_state.iter_mut() { for (task, last_run) in scheduler_state.iter_mut() {
if !is_task_ready(last_run, task.period()) { if !task.is_ready(last_run) {
continue; continue;
}; };
let task_result = match task { let task_result = match task {
Task::NftMonitor => { PeriodicTask::NftMonitor => {
nft_monitor_task( nft_monitor(
maybe_blockchain.as_mut(), maybe_blockchain.as_mut(),
&db_pool, &db_pool,
&mut token_waitlist_map, &mut token_waitlist_map,
).await ).await
}, },
Task::EthereumSubscriptionMonitor => { PeriodicTask::EthereumSubscriptionMonitor => {
ethereum_subscription_monitor_task( ethereum_subscription_monitor(
&config.instance(), &config.instance(),
maybe_blockchain.as_mut(), maybe_blockchain.as_mut(),
&db_pool, &db_pool,
).await ).await
}, },
Task::SubscriptionExpirationMonitor => { PeriodicTask::SubscriptionExpirationMonitor => {
update_expired_subscriptions( subscription_expiration_monitor(&config, &db_pool).await
&config.instance(),
&db_pool,
).await.map_err(Error::from)
}, },
Task::MoneroPaymentMonitor => { PeriodicTask::MoneroPaymentMonitor => {
monero_payment_monitor_task(&config, &db_pool).await monero_payment_monitor(&config, &db_pool).await
}, },
Task::IncomingActivityQueue => { PeriodicTask::IncomingActivityQueueExecutor => {
incoming_activity_queue_task(&config, &db_pool).await incoming_activity_queue_executor(&config, &db_pool).await
}, },
Task::OutgoingActivityQueue => { PeriodicTask::OutgoingActivityQueueExecutor => {
outgoing_activity_queue_task(&config, &db_pool).await outgoing_activity_queue_executor(&config, &db_pool).await
}, },
}; };
task_result.unwrap_or_else(|err| { task_result.unwrap_or_else(|err| {