From 85d35f9733bcfd68cc94eae9abc3307158a6b007 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sat, 16 Jul 2022 22:44:15 +0000 Subject: [PATCH] Wait for required number of confirmations before processing block --- src/ethereum/nft.rs | 5 ++--- src/ethereum/subscriptions.rs | 5 ++--- src/ethereum/sync.rs | 24 +++++++++++++++++------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/ethereum/nft.rs b/src/ethereum/nft.rs index 0eb1469..6184826 100644 --- a/src/ethereum/nft.rs +++ b/src/ethereum/nft.rs @@ -64,10 +64,9 @@ pub async fn process_nft_events( // Search for Transfer events let event_abi = contract.abi().event("Transfer")?; - let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); - let to_block = std::cmp::min( + let (from_block, to_block) = sync_state.get_scan_range( + &contract.address(), web3.eth().block_number().await?.as_u64(), - to_block, ); let filter = FilterBuilder::default() .address(vec![contract.address()]) diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index f9170b8..92d4632 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -69,10 +69,9 @@ pub async fn check_subscriptions( ) -> Result<(), EthereumError> { let db_client = &mut **get_database_client(db_pool).await?; let event_abi = contract.abi().event("UpdateSubscription")?; - let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); - let to_block = std::cmp::min( + let (from_block, to_block) = sync_state.get_scan_range( + &contract.address(), web3.eth().block_number().await?.as_u64(), - to_block, ); let filter = FilterBuilder::default() .address(vec![contract.address()]) diff --git a/src/ethereum/sync.rs b/src/ethereum/sync.rs index 14cd181..6e97e26 100644 --- a/src/ethereum/sync.rs +++ b/src/ethereum/sync.rs @@ -82,11 +82,21 @@ impl SyncState { } } - pub fn get_scan_range(&self, contract_address: &Address) -> (u64, u64) { + pub fn get_scan_range( + &self, + contract_address: &Address, + latest_block: u64, + ) -> (u64, u64) { let current_block = self.contracts[contract_address]; // Take reorgs into account - let safe_current_block = current_block.saturating_sub(self.reorg_max_depth); - (safe_current_block, safe_current_block + self.sync_step) + let latest_safe_block = latest_block.saturating_sub(self.reorg_max_depth); + let next_block = std::cmp::min( + current_block + self.sync_step, + latest_safe_block, + ); + // Next block should not be less than current block + let next_block = std::cmp::max(current_block, next_block); + (current_block, next_block) } pub fn is_out_of_sync(&self, contract_address: &Address) -> bool { @@ -129,7 +139,7 @@ mod tests { 10, Path::new("test"), ); - let (from_block, to_block) = sync_state.get_scan_range(&address); + let (from_block, to_block) = sync_state.get_scan_range(&address, 555); assert_eq!(from_block, 0); assert_eq!(to_block, 100); } @@ -144,8 +154,8 @@ mod tests { 10, Path::new("test"), ); - let (from_block, to_block) = sync_state.get_scan_range(&address); - assert_eq!(from_block, 490); - assert_eq!(to_block, 590); + let (from_block, to_block) = sync_state.get_scan_range(&address, 555); + assert_eq!(from_block, 500); + assert_eq!(to_block, 545); } }