diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index 5feef77..a19caa3 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -20,16 +20,16 @@ use crate::ethereum::subscriptions::{ use crate::monero::subscriptions::check_monero_subscriptions; #[derive(Debug, Eq, Hash, PartialEq)] -enum Task { +enum PeriodicTask { NftMonitor, EthereumSubscriptionMonitor, SubscriptionExpirationMonitor, MoneroPaymentMonitor, - IncomingActivityQueue, - OutgoingActivityQueue, + IncomingActivityQueueExecutor, + OutgoingActivityQueueExecutor, } -impl Task { +impl PeriodicTask { /// Returns task period (in seconds) fn period(&self) -> i64 { match self { @@ -37,23 +37,23 @@ impl Task { Self::EthereumSubscriptionMonitor => 300, Self::SubscriptionExpirationMonitor => 300, Self::MoneroPaymentMonitor => 30, - Self::IncomingActivityQueue => 5, - Self::OutgoingActivityQueue => 5, + Self::IncomingActivityQueueExecutor => 5, + Self::OutgoingActivityQueueExecutor => 5, + } + } + + fn is_ready(&self, last_run: &Option>) -> bool { + match last_run { + Some(last_run) => { + let time_passed = Utc::now() - *last_run; + time_passed.num_seconds() >= self.period() + }, + None => true, } } } -fn is_task_ready(last_run: &Option>, period: i64) -> bool { - match last_run { - Some(last_run) => { - let time_passed = Utc::now() - *last_run; - time_passed.num_seconds() >= period - }, - None => true, - } -} - -async fn nft_monitor_task( +async fn nft_monitor( maybe_blockchain: Option<&mut Blockchain>, db_pool: &DbPool, token_waitlist_map: &mut HashMap>, @@ -76,7 +76,7 @@ async fn nft_monitor_task( Ok(()) } -async fn ethereum_subscription_monitor_task( +async fn ethereum_subscription_monitor( instance: &Instance, maybe_blockchain: Option<&mut Blockchain>, db_pool: &DbPool, @@ -99,7 +99,18 @@ async fn ethereum_subscription_monitor_task( ).await.map_err(Error::from) } -async fn monero_payment_monitor_task( +async fn subscription_expiration_monitor( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + update_expired_subscriptions( + &config.instance(), + db_pool, + ).await?; + Ok(()) +} + +async fn monero_payment_monitor( config: &Config, db_pool: &DbPool, ) -> Result<(), Error> { @@ -117,21 +128,21 @@ async fn monero_payment_monitor_task( Ok(()) } -async fn incoming_activity_queue_task( +async fn incoming_activity_queue_executor( config: &Config, db_pool: &DbPool, ) -> Result<(), Error> { let db_client = &mut **get_database_client(db_pool).await?; let duration_max = Duration::from_secs(600); - let task_completed = process_queued_incoming_activities(config, db_client); - match tokio::time::timeout(duration_max, task_completed).await { + let completed = process_queued_incoming_activities(config, db_client); + match tokio::time::timeout(duration_max, completed).await { Ok(result) => result?, - Err(_) => log::error!("task timeout: IncomingActivityQueue"), + Err(_) => log::error!("incoming activity queue executor timeout"), }; Ok(()) } -async fn outgoing_activity_queue_task( +async fn outgoing_activity_queue_executor( config: &Config, db_pool: &DbPool, ) -> Result<(), Error> { @@ -146,12 +157,12 @@ pub fn run( ) -> () { tokio::spawn(async move { let mut scheduler_state = HashMap::from([ - (Task::NftMonitor, None), - (Task::EthereumSubscriptionMonitor, None), - (Task::SubscriptionExpirationMonitor, None), - (Task::MoneroPaymentMonitor, None), - (Task::IncomingActivityQueue, None), - (Task::OutgoingActivityQueue, None), + (PeriodicTask::NftMonitor, None), + (PeriodicTask::EthereumSubscriptionMonitor, None), + (PeriodicTask::SubscriptionExpirationMonitor, None), + (PeriodicTask::MoneroPaymentMonitor, None), + (PeriodicTask::IncomingActivityQueueExecutor, None), + (PeriodicTask::OutgoingActivityQueueExecutor, None), ]); let mut interval = tokio::time::interval(Duration::from_secs(5)); @@ -160,38 +171,35 @@ pub fn run( interval.tick().await; for (task, last_run) in scheduler_state.iter_mut() { - if !is_task_ready(last_run, task.period()) { + if !task.is_ready(last_run) { continue; }; let task_result = match task { - Task::NftMonitor => { - nft_monitor_task( + PeriodicTask::NftMonitor => { + nft_monitor( maybe_blockchain.as_mut(), &db_pool, &mut token_waitlist_map, ).await }, - Task::EthereumSubscriptionMonitor => { - ethereum_subscription_monitor_task( + PeriodicTask::EthereumSubscriptionMonitor => { + ethereum_subscription_monitor( &config.instance(), maybe_blockchain.as_mut(), &db_pool, ).await }, - Task::SubscriptionExpirationMonitor => { - update_expired_subscriptions( - &config.instance(), - &db_pool, - ).await.map_err(Error::from) + PeriodicTask::SubscriptionExpirationMonitor => { + subscription_expiration_monitor(&config, &db_pool).await }, - Task::MoneroPaymentMonitor => { - monero_payment_monitor_task(&config, &db_pool).await + PeriodicTask::MoneroPaymentMonitor => { + monero_payment_monitor(&config, &db_pool).await }, - Task::IncomingActivityQueue => { - incoming_activity_queue_task(&config, &db_pool).await + PeriodicTask::IncomingActivityQueueExecutor => { + incoming_activity_queue_executor(&config, &db_pool).await }, - Task::OutgoingActivityQueue => { - outgoing_activity_queue_task(&config, &db_pool).await + PeriodicTask::OutgoingActivityQueueExecutor => { + outgoing_activity_queue_executor(&config, &db_pool).await }, }; task_result.unwrap_or_else(|err| {