From 2f532307a16fc8c51a5d0765f94f7ea00759f637 Mon Sep 17 00:00:00 2001 From: silverpill Date: Fri, 24 Jun 2022 23:27:09 +0000 Subject: [PATCH] Switch to incremental blockchain synchronization --- src/ethereum/contracts.rs | 22 +++++++----- src/ethereum/nft.rs | 27 ++++++++++---- src/ethereum/subscriptions.rs | 13 ++++++- src/ethereum/sync.rs | 66 +++++++++++++++++++++++++++++++++-- src/main.rs | 6 ++-- src/scheduler.rs | 20 +++++------ 6 files changed, 123 insertions(+), 31 deletions(-) diff --git a/src/ethereum/contracts.rs b/src/ethereum/contracts.rs index 241d973..380c974 100644 --- a/src/ethereum/contracts.rs +++ b/src/ethereum/contracts.rs @@ -12,7 +12,7 @@ use super::api::connect; use super::errors::EthereumError; use super::sync::{ get_current_block_number, - CHAIN_REORG_MAX_DEPTH, + SyncState, }; use super::utils::parse_address; @@ -49,18 +49,22 @@ fn load_abi( #[derive(Clone)] pub struct ContractSet { pub web3: Web3, - // Last synced block - pub current_block: u64, pub adapter: Contract, pub collectible: Contract, pub subscription: Contract, } +#[derive(Clone)] +pub struct Blockchain { + pub contract_set: ContractSet, + pub sync_state: SyncState, +} + pub async fn get_contracts( config: &BlockchainConfig, storage_dir: &Path, -) -> Result { +) -> Result { let web3 = connect(&config.api_url)?; let chain_id = web3.eth().chain_id().await?; if chain_id != config.ethereum_chain_id().into() { @@ -100,15 +104,17 @@ pub async fn get_contracts( let current_block = get_current_block_number(&web3, storage_dir).await?; log::info!("current block is {}", current_block); - // Take reorgs into account - let current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH); + let sync_state = SyncState::new( + current_block, + vec![collectible.address(), subscription.address()], + storage_dir, + ); let contract_set = ContractSet { web3, - current_block, adapter, collectible, subscription, }; - Ok(contract_set) + Ok(Blockchain { contract_set, sync_state }) } diff --git a/src/ethereum/nft.rs b/src/ethereum/nft.rs index 433d55b..e8291f6 100644 --- a/src/ethereum/nft.rs +++ b/src/ethereum/nft.rs @@ -23,6 +23,7 @@ use crate::models::posts::queries::{ }; use super::errors::EthereumError; use super::signatures::{sign_contract_call, CallArgs, SignatureData}; +use super::sync::{save_current_block_number, SyncState}; use super::utils::parse_address; const TOKEN_WAIT_TIME: i64 = 10; // in minutes @@ -32,7 +33,7 @@ const TOKEN_WAIT_TIME: i64 = 10; // in minutes pub async fn process_nft_events( web3: &Web3, contract: &Contract, - from_block: u64, + sync_state: &mut SyncState, db_pool: &Pool, token_waitlist_map: &mut HashMap>, ) -> Result<(), EthereumError> { @@ -51,20 +52,28 @@ pub async fn process_nft_events( duration.num_minutes() < TOKEN_WAIT_TIME }) .count(); - if token_waitlist_active_count == 0 { - return Ok(()) + if token_waitlist_active_count > 0 { + log::info!( + "{} posts are waiting for confirmation of tokenization tx", + token_waitlist_active_count, + ); + } else if !sync_state.is_out_of_sync(&contract.address()) { + // Don't scan blockchain if already in sync and waitlist is empty + return Ok(()); }; - log::info!( - "{} posts are waiting for confirmation of tokenization tx", - token_waitlist_active_count, - ); // Search for Transfer events let event_abi = contract.abi().event("Transfer")?; + let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); + let to_block = std::cmp::min( + web3.eth().block_number().await?.as_u64(), + to_block, + ); let filter = FilterBuilder::default() .address(vec![contract.address()]) .topics(Some(vec![event_abi.signature()]), None, None, None) .from_block(BlockNumber::Number(from_block.into())) + .to_block(BlockNumber::Number(to_block.into())) .build(); let logs = web3.eth().logs(filter).await?; for log in logs { @@ -113,6 +122,10 @@ pub async fn process_nft_events( }; }; }; + + if sync_state.update(&contract.address(), to_block) { + save_current_block_number(&sync_state.storage_dir, sync_state.current_block)?; + }; Ok(()) } diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index acc4945..eee7876 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -26,6 +26,7 @@ use crate::models::subscriptions::queries::{ use crate::models::users::queries::get_user_by_wallet_address; use super::errors::EthereumError; use super::signatures::{sign_contract_call, CallArgs, SignatureData}; +use super::sync::{save_current_block_number, SyncState}; use super::utils::{address_to_string, parse_address}; const ETHEREUM: Currency = Currency::Ethereum; @@ -42,15 +43,21 @@ fn u256_to_date(value: U256) -> Result, ConversionError> { pub async fn check_subscriptions( web3: &Web3, contract: &Contract, - from_block: u64, + sync_state: &mut SyncState, db_pool: &Pool, ) -> Result<(), EthereumError> { let db_client = &mut **get_database_client(db_pool).await?; let event_abi = contract.abi().event("UpdateSubscription")?; + let (from_block, to_block) = sync_state.get_scan_range(&contract.address()); + let to_block = std::cmp::min( + web3.eth().block_number().await?.as_u64(), + to_block, + ); let filter = FilterBuilder::default() .address(vec![contract.address()]) .topics(Some(vec![event_abi.signature()]), None, None, None) .from_block(BlockNumber::Number(from_block.into())) + .to_block(BlockNumber::Number(to_block.into())) .build(); let logs = web3.eth().logs(filter).await?; for log in logs { @@ -180,6 +187,10 @@ pub async fn check_subscriptions( subscription.recipient_id, ); }; + + if sync_state.update(&contract.address(), to_block) { + save_current_block_number(&sync_state.storage_dir, sync_state.current_block)?; + }; Ok(()) } diff --git a/src/ethereum/sync.rs b/src/ethereum/sync.rs index ca8a5b8..a156b44 100644 --- a/src/ethereum/sync.rs +++ b/src/ethereum/sync.rs @@ -1,12 +1,14 @@ -use std::path::Path; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; -use web3::{api::Web3, transports::Http}; +use web3::{api::Web3, transports::Http, types::Address}; use crate::utils::files::write_file; use super::errors::EthereumError; const BLOCK_NUMBER_FILE_NAME: &str = "current_block"; -pub const CHAIN_REORG_MAX_DEPTH: u64 = 100; +const CHAIN_REORG_MAX_DEPTH: u64 = 100; +const CHAIN_SYNC_STEP: u64 = 1000; pub fn save_current_block_number( storage_dir: &Path, @@ -49,3 +51,61 @@ pub async fn get_current_block_number( }; Ok(block_number) } + +#[derive(Clone)] +pub struct SyncState { + pub current_block: u64, + contracts: HashMap, + + pub storage_dir: PathBuf, +} + +impl SyncState { + pub fn new( + current_block: u64, + contracts: Vec
, + storage_dir: &Path, + ) -> Self { + let mut contract_map = HashMap::new(); + for address in contracts { + contract_map.insert(address, current_block); + }; + Self { + current_block, + contracts: contract_map, + storage_dir: storage_dir.to_path_buf(), + } + } + + pub fn get_scan_range(&self, contract_address: &Address) -> (u64, u64) { + let current_block = self.contracts[contract_address]; + // Take reorgs into account + let safe_current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH); + (safe_current_block, safe_current_block + CHAIN_SYNC_STEP) + } + + pub fn is_out_of_sync(&self, contract_address: &Address) -> bool { + if let Some(max_value) = self.contracts.values().max().copied() { + if self.contracts[contract_address] == max_value { + return false; + }; + }; + true + } + + pub fn update( + &mut self, + contract_address: &Address, + block_number: u64, + ) -> bool { + self.contracts.insert(*contract_address, block_number); + if let Some(min_value) = self.contracts.values().min().copied() { + if min_value > self.current_block { + self.current_block = min_value; + log::info!("synced to block {}", self.current_block); + return true; + }; + }; + false + } +} diff --git a/src/main.rs b/src/main.rs index 61f8d26..c516ccd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -58,8 +58,10 @@ async fn main() -> std::io::Result<()> { } else { None }; + let maybe_contract_set = maybe_blockchain.clone() + .map(|blockchain| blockchain.contract_set); - scheduler::run(config.clone(), maybe_blockchain.clone(), db_pool.clone()); + scheduler::run(config.clone(), maybe_blockchain, db_pool.clone()); log::info!("scheduler started"); let http_socket_addr = format!( @@ -116,7 +118,7 @@ async fn main() -> std::io::Result<()> { .app_data(web::JsonConfig::default().limit(MAX_UPLOAD_SIZE)) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(db_pool.clone())) - .app_data(web::Data::new(maybe_blockchain.clone())) + .app_data(web::Data::new(maybe_contract_set.clone())) .app_data(web::Data::clone(&inbox_mutex)) .service(actix_files::Files::new( "/media", diff --git a/src/scheduler.rs b/src/scheduler.rs index b1b4f21..534afde 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::config::Config; use crate::database::Pool; -use crate::ethereum::contracts::ContractSet; +use crate::ethereum::contracts::Blockchain; use crate::ethereum::nft::process_nft_events; use crate::ethereum::subscriptions::check_subscriptions; @@ -39,7 +39,7 @@ fn is_task_ready(last_run: &Option>, period: i64) -> bool { pub fn run( _config: Config, - maybe_contract_set: Option, + mut maybe_blockchain: Option, db_pool: Pool, ) -> () { tokio::spawn(async move { @@ -58,23 +58,23 @@ pub fn run( }; let task_result = match task { Task::NftMonitor => { - if let Some(contract_set) = maybe_contract_set.as_ref() { + if let Some(blockchain) = maybe_blockchain.as_mut() { // Monitor events only if ethereum integration is enabled process_nft_events( - &contract_set.web3, - &contract_set.collectible, - contract_set.current_block, + &blockchain.contract_set.web3, + &blockchain.contract_set.collectible, + &mut blockchain.sync_state, &db_pool, &mut token_waitlist_map, ).await.map_err(Error::from) } else { Ok(()) } }, Task::SubscriptionMonitor => { - if let Some(contract_set) = maybe_contract_set.as_ref() { + if let Some(blockchain) = maybe_blockchain.as_mut() { check_subscriptions( - &contract_set.web3, - &contract_set.subscription, - contract_set.current_block, + &blockchain.contract_set.web3, + &blockchain.contract_set.subscription, + &mut blockchain.sync_state, &db_pool, ).await.map_err(Error::from) } else { Ok(()) }