Wait for required number of confirmations before processing block

This commit is contained in:
silverpill 2022-07-16 22:44:15 +00:00
parent c47822ff53
commit 85d35f9733
3 changed files with 21 additions and 13 deletions

View file

@ -64,10 +64,9 @@ pub async fn process_nft_events(
// Search for Transfer events // Search for Transfer events
let event_abi = contract.abi().event("Transfer")?; let event_abi = contract.abi().event("Transfer")?;
let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); let (from_block, to_block) = sync_state.get_scan_range(
let to_block = std::cmp::min( &contract.address(),
web3.eth().block_number().await?.as_u64(), web3.eth().block_number().await?.as_u64(),
to_block,
); );
let filter = FilterBuilder::default() let filter = FilterBuilder::default()
.address(vec![contract.address()]) .address(vec![contract.address()])

View file

@ -69,10 +69,9 @@ pub async fn check_subscriptions(
) -> Result<(), EthereumError> { ) -> Result<(), EthereumError> {
let db_client = &mut **get_database_client(db_pool).await?; let db_client = &mut **get_database_client(db_pool).await?;
let event_abi = contract.abi().event("UpdateSubscription")?; let event_abi = contract.abi().event("UpdateSubscription")?;
let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); let (from_block, to_block) = sync_state.get_scan_range(
let to_block = std::cmp::min( &contract.address(),
web3.eth().block_number().await?.as_u64(), web3.eth().block_number().await?.as_u64(),
to_block,
); );
let filter = FilterBuilder::default() let filter = FilterBuilder::default()
.address(vec![contract.address()]) .address(vec![contract.address()])

View file

@ -82,11 +82,21 @@ impl SyncState {
} }
} }
pub fn get_scan_range(&self, contract_address: &Address) -> (u64, u64) { pub fn get_scan_range(
&self,
contract_address: &Address,
latest_block: u64,
) -> (u64, u64) {
let current_block = self.contracts[contract_address]; let current_block = self.contracts[contract_address];
// Take reorgs into account // Take reorgs into account
let safe_current_block = current_block.saturating_sub(self.reorg_max_depth); let latest_safe_block = latest_block.saturating_sub(self.reorg_max_depth);
(safe_current_block, safe_current_block + self.sync_step) let next_block = std::cmp::min(
current_block + self.sync_step,
latest_safe_block,
);
// Next block should not be less than current block
let next_block = std::cmp::max(current_block, next_block);
(current_block, next_block)
} }
pub fn is_out_of_sync(&self, contract_address: &Address) -> bool { pub fn is_out_of_sync(&self, contract_address: &Address) -> bool {
@ -129,7 +139,7 @@ mod tests {
10, 10,
Path::new("test"), Path::new("test"),
); );
let (from_block, to_block) = sync_state.get_scan_range(&address); let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 0); assert_eq!(from_block, 0);
assert_eq!(to_block, 100); assert_eq!(to_block, 100);
} }
@ -144,8 +154,8 @@ mod tests {
10, 10,
Path::new("test"), Path::new("test"),
); );
let (from_block, to_block) = sync_state.get_scan_range(&address); let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 490); assert_eq!(from_block, 500);
assert_eq!(to_block, 590); assert_eq!(to_block, 545);
} }
} }