diff --git a/src/ethereum/contracts.rs b/src/ethereum/contracts.rs index cd7ebf3..376efb6 100644 --- a/src/ethereum/contracts.rs +++ b/src/ethereum/contracts.rs @@ -10,6 +10,10 @@ use web3::{ use crate::config::BlockchainConfig; use super::api::connect; use super::errors::EthereumError; +use super::sync::{ + get_current_block_number, + CHAIN_REORG_MAX_DEPTH, +}; use super::utils::parse_address; pub const ADAPTER: &str = "IAdapter"; @@ -45,6 +49,8 @@ pub fn load_abi( pub struct ContractSet { pub web3: Web3, + // Last synced block + pub current_block: u64, #[allow(dead_code)] pub adapter: Contract, @@ -55,6 +61,7 @@ pub struct ContractSet { pub async fn get_contracts( config: &BlockchainConfig, + storage_dir: &Path, ) -> Result { let web3 = connect(&config.api_url)?; let chain_id = web3.eth().chain_id().await?; @@ -93,8 +100,14 @@ pub async fn get_contracts( )?; log::info!("subscription contract address is {:?}", subscription.address()); + let current_block = get_current_block_number(&web3, storage_dir).await?; + log::info!("current block is {}", current_block); + // Take reorgs into account + let current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH); + let contract_set = ContractSet { web3, + current_block, adapter, collectible, subscription, diff --git a/src/ethereum/errors.rs b/src/ethereum/errors.rs index 127025c..d0756a4 100644 --- a/src/ethereum/errors.rs +++ b/src/ethereum/errors.rs @@ -30,5 +30,8 @@ pub enum EthereumError { DatabaseError(#[from] DatabaseError), #[error("signature error")] - SigError(#[from] SignatureError), + SignatureError(#[from] SignatureError), + + #[error("{0}")] + OtherError(&'static str), } diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index 326c63e..5cd3369 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -7,4 +7,5 @@ pub mod identity; pub mod nft; pub mod signatures; pub mod subscriptions; +mod sync; pub mod utils; diff --git a/src/ethereum/nft.rs b/src/ethereum/nft.rs index 0936a9f..433d55b 100644 --- a/src/ethereum/nft.rs +++ b/src/ethereum/nft.rs @@ -32,6 +32,7 @@ const TOKEN_WAIT_TIME: i64 = 10; // in minutes pub async fn process_nft_events( web3: &Web3, contract: &Contract, + from_block: u64, db_pool: &Pool, token_waitlist_map: &mut HashMap>, ) -> Result<(), EthereumError> { @@ -63,7 +64,7 @@ pub async fn process_nft_events( let filter = FilterBuilder::default() .address(vec![contract.address()]) .topics(Some(vec![event_abi.signature()]), None, None, None) - .from_block(BlockNumber::Earliest) + .from_block(BlockNumber::Number(from_block.into())) .build(); let logs = web3.eth().logs(filter).await?; for log in logs { diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index cc5a269..acc4945 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -42,6 +42,7 @@ fn u256_to_date(value: U256) -> Result, ConversionError> { pub async fn check_subscriptions( web3: &Web3, contract: &Contract, + from_block: u64, db_pool: &Pool, ) -> Result<(), EthereumError> { let db_client = &mut **get_database_client(db_pool).await?; @@ -49,7 +50,7 @@ pub async fn check_subscriptions( let filter = FilterBuilder::default() .address(vec![contract.address()]) .topics(Some(vec![event_abi.signature()]), None, None, None) - .from_block(BlockNumber::Earliest) + .from_block(BlockNumber::Number(from_block.into())) .build(); let logs = web3.eth().logs(filter).await?; for log in logs { diff --git a/src/ethereum/sync.rs b/src/ethereum/sync.rs new file mode 100644 index 0000000..fa1577d --- /dev/null +++ b/src/ethereum/sync.rs @@ -0,0 +1,50 @@ +use std::path::Path; + +use web3::{api::Web3, transports::Http}; + +use crate::utils::files::write_file; +use super::errors::EthereumError; + +const BLOCK_NUMBER_FILE_NAME: &str = "current_block"; +pub const CHAIN_REORG_MAX_DEPTH: u64 = 100; + +fn save_current_block_number( + storage_dir: &Path, + block_number: u64, +) -> Result<(), EthereumError> { + let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); + write_file(block_number.to_string().as_bytes(), &file_path) + .map_err(|_| EthereumError::OtherError("failed to save current block"))?; + Ok(()) +} + +fn read_current_block_number( + storage_dir: &Path, +) -> Result, EthereumError> { + let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); + let block_number = if file_path.exists() { + let block_number: u64 = std::fs::read_to_string(&file_path) + .map_err(|_| EthereumError::OtherError("failed to read current block"))? + .parse() + .map_err(|_| EthereumError::OtherError("failed to parse block number"))?; + Some(block_number) + } else { + None + }; + Ok(block_number) +} + +pub async fn get_current_block_number( + web3: &Web3, + storage_dir: &Path, +) -> Result { + let block_number = match read_current_block_number(storage_dir)? { + Some(block_number) => block_number, + None => { + let block_number = web3.eth().block_number().await?.as_u64(); + save_current_block_number(storage_dir, block_number)?; + block_number + }, + }; + Ok(block_number) +} diff --git a/src/main.rs b/src/main.rs index 2aca903..cbbd520 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> std::io::Result<()> { let maybe_contract_set = if let Some(blockchain_config) = &config.blockchain { // Create blockchain interface - get_contracts(blockchain_config).await + get_contracts(blockchain_config, &config.storage_dir).await .map_err(|err| log::error!("{}", err)) .ok() } else { diff --git a/src/scheduler.rs b/src/scheduler.rs index 3b2bca6..b996a19 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -26,6 +26,7 @@ pub fn run( process_nft_events( &contract_set.web3, &contract_set.collectible, + contract_set.current_block, &db_pool, &mut token_waitlist_map, ).await.unwrap_or_else(|err| { @@ -34,6 +35,7 @@ pub fn run( check_subscriptions( &contract_set.web3, &contract_set.subscription, + contract_set.current_block, &db_pool, ).await.unwrap_or_else(|err| { log::error!("{}", err);