Refactor scheduler module
This commit is contained in:
parent
6615f372e2
commit
0f74175b29
1 changed files with 44 additions and 18 deletions
|
@ -37,6 +37,41 @@ fn is_task_ready(last_run: &Option<DateTime<Utc>>, period: i64) -> bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn nft_monitor_task(
|
||||||
|
maybe_blockchain: Option<&mut Blockchain>,
|
||||||
|
db_pool: &Pool,
|
||||||
|
token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let blockchain = match maybe_blockchain {
|
||||||
|
Some(blockchain) => blockchain,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
process_nft_events(
|
||||||
|
&blockchain.contract_set.web3,
|
||||||
|
&blockchain.contract_set.collectible,
|
||||||
|
&mut blockchain.sync_state,
|
||||||
|
db_pool,
|
||||||
|
token_waitlist_map,
|
||||||
|
).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn subscription_monitor_task(
|
||||||
|
maybe_blockchain: Option<&mut Blockchain>,
|
||||||
|
db_pool: &Pool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let blockchain = match maybe_blockchain {
|
||||||
|
Some(blockchain) => blockchain,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
check_subscriptions(
|
||||||
|
&blockchain.contract_set.web3,
|
||||||
|
&blockchain.contract_set.subscription,
|
||||||
|
&mut blockchain.sync_state,
|
||||||
|
db_pool,
|
||||||
|
).await.map_err(Error::from)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn run(
|
pub fn run(
|
||||||
_config: Config,
|
_config: Config,
|
||||||
mut maybe_blockchain: Option<Blockchain>,
|
mut maybe_blockchain: Option<Blockchain>,
|
||||||
|
@ -58,26 +93,17 @@ pub fn run(
|
||||||
};
|
};
|
||||||
let task_result = match task {
|
let task_result = match task {
|
||||||
Task::NftMonitor => {
|
Task::NftMonitor => {
|
||||||
if let Some(blockchain) = maybe_blockchain.as_mut() {
|
nft_monitor_task(
|
||||||
// Monitor events only if ethereum integration is enabled
|
maybe_blockchain.as_mut(),
|
||||||
process_nft_events(
|
&db_pool,
|
||||||
&blockchain.contract_set.web3,
|
&mut token_waitlist_map,
|
||||||
&blockchain.contract_set.collectible,
|
).await
|
||||||
&mut blockchain.sync_state,
|
|
||||||
&db_pool,
|
|
||||||
&mut token_waitlist_map,
|
|
||||||
).await.map_err(Error::from)
|
|
||||||
} else { Ok(()) }
|
|
||||||
},
|
},
|
||||||
Task::SubscriptionMonitor => {
|
Task::SubscriptionMonitor => {
|
||||||
if let Some(blockchain) = maybe_blockchain.as_mut() {
|
subscription_monitor_task(
|
||||||
check_subscriptions(
|
maybe_blockchain.as_mut(),
|
||||||
&blockchain.contract_set.web3,
|
&db_pool,
|
||||||
&blockchain.contract_set.subscription,
|
).await
|
||||||
&mut blockchain.sync_state,
|
|
||||||
&db_pool,
|
|
||||||
).await.map_err(Error::from)
|
|
||||||
} else { Ok(()) }
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
task_result.unwrap_or_else(|err| {
|
task_result.unwrap_or_else(|err| {
|
||||||
|
|
Loading…
Reference in a new issue