From 0ab374e9eaa810647f0b41dc6cfdf29b3cc38d18 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sun, 5 Feb 2023 12:11:30 +0000 Subject: [PATCH] Move periodic tasks to job_queue::periodic_tasks module --- src/job_queue/mod.rs | 1 + src/job_queue/periodic_tasks.rs | 117 ++++++++++++++++++++++++++++++++ src/job_queue/scheduler.rs | 116 ++----------------------------- 3 files changed, 123 insertions(+), 111 deletions(-) create mode 100644 src/job_queue/periodic_tasks.rs diff --git a/src/job_queue/mod.rs b/src/job_queue/mod.rs index 81b3546..fff7672 100644 --- a/src/job_queue/mod.rs +++ b/src/job_queue/mod.rs @@ -1 +1,2 @@ +mod periodic_tasks; pub mod scheduler; diff --git a/src/job_queue/periodic_tasks.rs b/src/job_queue/periodic_tasks.rs new file mode 100644 index 0000000..06fd597 --- /dev/null +++ b/src/job_queue/periodic_tasks.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::Error; +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +use crate::activitypub::queues::{ + process_queued_incoming_activities, + process_queued_outgoing_activities, +}; +use crate::config::Config; +use crate::database::{get_database_client, DbPool}; +use crate::ethereum::contracts::Blockchain; +use crate::ethereum::nft::process_nft_events; +use crate::ethereum::subscriptions::{ + check_ethereum_subscriptions, + update_expired_subscriptions, +}; +use crate::monero::subscriptions::check_monero_subscriptions; + +pub async fn nft_monitor( + maybe_blockchain: Option<&mut Blockchain>, + db_pool: &DbPool, + token_waitlist_map: &mut HashMap>, +) -> Result<(), Error> { + let blockchain = match maybe_blockchain { + Some(blockchain) => blockchain, + None => return Ok(()), + }; + let collectible = match &blockchain.contract_set.collectible { + Some(contract) => contract, + None => return Ok(()), // feature not enabled + }; + process_nft_events( + &blockchain.contract_set.web3, + collectible, + &mut blockchain.sync_state, + db_pool, + token_waitlist_map, + ).await?; + Ok(()) +} + +pub async fn ethereum_subscription_monitor( + config: &Config, + maybe_blockchain: Option<&mut Blockchain>, + db_pool: &DbPool, +) -> Result<(), Error> { + let blockchain = match maybe_blockchain { + Some(blockchain) => blockchain, + None => return Ok(()), + }; + let subscription = match &blockchain.contract_set.subscription { + Some(contract) => contract, + None => return Ok(()), // feature not enabled + }; + check_ethereum_subscriptions( + &blockchain.config, + &config.instance(), + &blockchain.contract_set.web3, + subscription, + &mut blockchain.sync_state, + db_pool, + ).await.map_err(Error::from) +} + +pub async fn subscription_expiration_monitor( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + update_expired_subscriptions( + &config.instance(), + db_pool, + ).await?; + Ok(()) +} + +pub async fn monero_payment_monitor( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + let maybe_monero_config = config.blockchain() + .and_then(|conf| conf.monero_config()); + let monero_config = match maybe_monero_config { + Some(monero_config) => monero_config, + None => return Ok(()), // not configured + }; + check_monero_subscriptions( + &config.instance(), + monero_config, + db_pool, + ).await?; + Ok(()) +} + +pub async fn incoming_activity_queue_executor( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + let db_client = &mut **get_database_client(db_pool).await?; + let duration_max = Duration::from_secs(600); + let completed = process_queued_incoming_activities(config, db_client); + match tokio::time::timeout(duration_max, completed).await { + Ok(result) => result?, + Err(_) => log::error!("incoming activity queue executor timeout"), + }; + Ok(()) +} + +pub async fn outgoing_activity_queue_executor( + config: &Config, + db_pool: &DbPool, +) -> Result<(), Error> { + process_queued_outgoing_activities(config, db_pool).await?; + Ok(()) +} diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index a19caa3..f8ad0c8 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -1,23 +1,14 @@ use std::collections::HashMap; use std::time::Duration; -use anyhow::Error; use chrono::{DateTime, Utc}; use uuid::Uuid; -use crate::activitypub::queues::{ - process_queued_incoming_activities, - process_queued_outgoing_activities, -}; -use crate::config::{Config, Instance}; -use crate::database::{get_database_client, DbPool}; +use crate::config::Config; +use crate::database::DbPool; use crate::ethereum::contracts::Blockchain; -use crate::ethereum::nft::process_nft_events; -use crate::ethereum::subscriptions::{ - check_ethereum_subscriptions, - update_expired_subscriptions, -}; -use crate::monero::subscriptions::check_monero_subscriptions; + +use super::periodic_tasks::*; #[derive(Debug, Eq, Hash, PartialEq)] enum PeriodicTask { @@ -53,103 +44,6 @@ impl PeriodicTask { } } -async fn nft_monitor( - maybe_blockchain: Option<&mut Blockchain>, - db_pool: &DbPool, - token_waitlist_map: &mut HashMap>, -) -> Result<(), Error> { - let blockchain = match maybe_blockchain { - Some(blockchain) => blockchain, - None => return Ok(()), - }; - let collectible = match &blockchain.contract_set.collectible { - Some(contract) => contract, - None => return Ok(()), // feature not enabled - }; - process_nft_events( - &blockchain.contract_set.web3, - collectible, - &mut blockchain.sync_state, - db_pool, - token_waitlist_map, - ).await?; - Ok(()) -} - -async fn ethereum_subscription_monitor( - instance: &Instance, - maybe_blockchain: Option<&mut Blockchain>, - db_pool: &DbPool, -) -> Result<(), Error> { - let blockchain = match maybe_blockchain { - Some(blockchain) => blockchain, - None => return Ok(()), - }; - let subscription = match &blockchain.contract_set.subscription { - Some(contract) => contract, - None => return Ok(()), // feature not enabled - }; - check_ethereum_subscriptions( - &blockchain.config, - instance, - &blockchain.contract_set.web3, - subscription, - &mut blockchain.sync_state, - db_pool, - ).await.map_err(Error::from) -} - -async fn subscription_expiration_monitor( - config: &Config, - db_pool: &DbPool, -) -> Result<(), Error> { - update_expired_subscriptions( - &config.instance(), - db_pool, - ).await?; - Ok(()) -} - -async fn monero_payment_monitor( - config: &Config, - db_pool: &DbPool, -) -> Result<(), Error> { - let maybe_monero_config = config.blockchain() - .and_then(|conf| conf.monero_config()); - let monero_config = match maybe_monero_config { - Some(monero_config) => monero_config, - None => return Ok(()), // not configured - }; - check_monero_subscriptions( - &config.instance(), - monero_config, - db_pool, - ).await?; - Ok(()) -} - -async fn incoming_activity_queue_executor( - config: &Config, - db_pool: &DbPool, -) -> Result<(), Error> { - let db_client = &mut **get_database_client(db_pool).await?; - let duration_max = Duration::from_secs(600); - let completed = process_queued_incoming_activities(config, db_client); - match tokio::time::timeout(duration_max, completed).await { - Ok(result) => result?, - Err(_) => log::error!("incoming activity queue executor timeout"), - }; - Ok(()) -} - -async fn outgoing_activity_queue_executor( - config: &Config, - db_pool: &DbPool, -) -> Result<(), Error> { - process_queued_outgoing_activities(config, db_pool).await?; - Ok(()) -} - pub fn run( config: Config, mut maybe_blockchain: Option, @@ -184,7 +78,7 @@ pub fn run( }, PeriodicTask::EthereumSubscriptionMonitor => { ethereum_subscription_monitor( - &config.instance(), + &config, maybe_blockchain.as_mut(), &db_pool, ).await