From 2b6de17fb967e17e6d1c09d2360abba371d67933 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sun, 12 Mar 2023 01:53:59 +0000 Subject: [PATCH] Save latest ethereum block number to database instead of file --- CHANGELOG.md | 8 ++++++ mitra-cli/src/cli.rs | 6 ++-- src/ethereum/contracts.rs | 5 ++-- src/ethereum/nft.rs | 2 +- src/ethereum/subscriptions.rs | 2 +- src/ethereum/sync.rs | 53 +++++++++++++++++++++-------------- src/main.rs | 16 ++++++----- 7 files changed, 57 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f90277..7a1e384 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] +### Changed + +- Save latest ethereum block number to database instead of file. + +### Deprecated + +- Reading ethereum block number from `current_block` file. + ### Removed - Disabled post tokenization (can be re-enabled with `ethereum-extras` feature). diff --git a/mitra-cli/src/cli.rs b/mitra-cli/src/cli.rs index 3e079fc..93ff8d2 100644 --- a/mitra-cli/src/cli.rs +++ b/mitra-cli/src/cli.rs @@ -493,10 +493,10 @@ pub struct UpdateCurrentBlock { impl UpdateCurrentBlock { pub async fn execute( &self, - config: &Config, - _db_client: &impl DatabaseClient, + _config: &Config, + db_client: &impl DatabaseClient, ) -> Result<(), Error> { - save_current_block_number(&config.storage_dir, self.number)?; + save_current_block_number(db_client, self.number).await?; println!("current block updated"); Ok(()) } diff --git a/src/ethereum/contracts.rs b/src/ethereum/contracts.rs index 9a66b45..4651c35 100644 --- a/src/ethereum/contracts.rs +++ b/src/ethereum/contracts.rs @@ -10,6 +10,7 @@ use web3::{ use mitra_config::EthereumConfig; +use crate::database::DatabaseClient; use super::api::connect; use super::errors::EthereumError; use super::sync::{ @@ -98,6 +99,7 @@ pub struct Blockchain { } pub async fn get_contracts( + db_client: &impl DatabaseClient, config: &EthereumConfig, storage_dir: &Path, ) -> Result { @@ -181,13 +183,12 @@ pub async fn get_contracts( maybe_subscription_adapter = Some(subscription_adapter); }; - let current_block = get_current_block_number(&web3, storage_dir).await?; + let current_block = get_current_block_number(db_client, &web3, storage_dir).await?; let sync_state = SyncState::new( current_block, sync_targets, config.chain_sync_step, config.chain_reorg_max_depth, - storage_dir, ); let contract_set = ContractSet { diff --git a/src/ethereum/nft.rs b/src/ethereum/nft.rs index 242eb6c..9e98a4d 100644 --- a/src/ethereum/nft.rs +++ b/src/ethereum/nft.rs @@ -142,7 +142,7 @@ pub async fn process_nft_events( TOKEN_WAITLIST_MAP_PROPERTY_NAME, &token_waitlist_map, ).await?; - sync_state.update(&contract.address(), to_block)?; + sync_state.update(db_client, &contract.address(), to_block).await?; Ok(()) } diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index 0e9a7d5..9b70597 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -253,7 +253,7 @@ pub async fn check_ethereum_subscriptions( }; }; - sync_state.update(&contract.address(), to_block)?; + sync_state.update(db_client, &contract.address(), to_block).await?; Ok(()) } diff --git a/src/ethereum/sync.rs b/src/ethereum/sync.rs index 4dbfb37..298d602 100644 --- a/src/ethereum/sync.rs +++ b/src/ethereum/sync.rs @@ -1,29 +1,44 @@ use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::Path; use web3::{api::Web3, transports::Http, types::Address}; -use mitra_utils::files::write_file; - +use crate::database::DatabaseClient; +use crate::models::properties::queries::{ + get_internal_property, + set_internal_property, +}; use super::errors::EthereumError; const BLOCK_NUMBER_FILE_NAME: &str = "current_block"; +const CURRENT_BLOCK_PROPERTY_NAME: &str = "ethereum_current_block"; -pub fn save_current_block_number( - storage_dir: &Path, +pub async fn save_current_block_number( + db_client: &impl DatabaseClient, block_number: u64, ) -> Result<(), EthereumError> { - let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); - write_file(block_number.to_string().as_bytes(), &file_path) - .map_err(|_| EthereumError::OtherError("failed to save current block"))?; + set_internal_property( + db_client, + CURRENT_BLOCK_PROPERTY_NAME, + &block_number, + ).await?; Ok(()) } -fn read_current_block_number( +async fn read_current_block_number( + db_client: &impl DatabaseClient, storage_dir: &Path, ) -> Result, EthereumError> { + let maybe_block_number = get_internal_property( + db_client, + CURRENT_BLOCK_PROPERTY_NAME, + ).await?; + if maybe_block_number.is_some() { + return Ok(maybe_block_number); + }; + // Try to read from file if internal property is not set let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); - let block_number = if file_path.exists() { + let maybe_block_number = if file_path.exists() { let block_number: u64 = std::fs::read_to_string(&file_path) .map_err(|_| EthereumError::OtherError("failed to read current block"))? .parse() @@ -32,19 +47,20 @@ fn read_current_block_number( } else { None }; - Ok(block_number) + Ok(maybe_block_number) } pub async fn get_current_block_number( + db_client: &impl DatabaseClient, web3: &Web3, storage_dir: &Path, ) -> Result { - let block_number = match read_current_block_number(storage_dir)? { + let block_number = match read_current_block_number(db_client, storage_dir).await? { Some(block_number) => block_number, None => { // Save block number when connecting to the node for the first time let block_number = web3.eth().block_number().await?.as_u64(); - save_current_block_number(storage_dir, block_number)?; + save_current_block_number(db_client, block_number).await?; block_number }, }; @@ -57,8 +73,6 @@ pub struct SyncState { contracts: HashMap, sync_step: u64, reorg_max_depth: u64, - - storage_dir: PathBuf, } impl SyncState { @@ -67,7 +81,6 @@ impl SyncState { contracts: Vec
, sync_step: u64, reorg_max_depth: u64, - storage_dir: &Path, ) -> Self { log::info!("current block is {}", current_block); let mut contract_map = HashMap::new(); @@ -79,7 +92,6 @@ impl SyncState { contracts: contract_map, sync_step, reorg_max_depth, - storage_dir: storage_dir.to_path_buf(), } } @@ -109,8 +121,9 @@ impl SyncState { true } - pub fn update( + pub async fn update( &mut self, + db_client: &impl DatabaseClient, contract_address: &Address, block_number: u64, ) -> Result<(), EthereumError> { @@ -118,7 +131,7 @@ impl SyncState { if let Some(min_value) = self.contracts.values().min().copied() { if min_value > self.current_block { self.current_block = min_value; - save_current_block_number(&self.storage_dir, self.current_block)?; + save_current_block_number(db_client, self.current_block).await?; log::info!("synced to block {}", self.current_block); }; }; @@ -138,7 +151,6 @@ mod tests { vec![address.clone()], 100, 10, - Path::new("test"), ); let (from_block, to_block) = sync_state.get_scan_range(&address, 555); assert_eq!(from_block, 0); @@ -153,7 +165,6 @@ mod tests { vec![address.clone()], 100, 10, - Path::new("test"), ); let (from_block, to_block) = sync_state.get_scan_range(&address, 555); assert_eq!(from_block, 500); diff --git a/src/main.rs b/src/main.rs index c15ca9c..b7ab133 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,21 +54,16 @@ async fn main() -> std::io::Result<()> { let db_pool = create_pool(&config.database_url); let mut db_client = get_database_client(&db_pool).await.unwrap(); apply_migrations(&mut db_client).await; - std::mem::drop(db_client); + if !config.media_dir().exists() { std::fs::create_dir(config.media_dir()) .expect("failed to create media directory"); }; - log::info!( - "app initialized; version {}, environment = '{:?}'", - MITRA_VERSION, - config.environment, - ); let maybe_blockchain = if let Some(blockchain_config) = config.blockchain() { if let Some(ethereum_config) = blockchain_config.ethereum_config() { // Create blockchain interface - get_contracts(ethereum_config, &config.storage_dir).await + get_contracts(&**db_client, ethereum_config, &config.storage_dir).await .map(Some).unwrap() } else { None @@ -79,6 +74,13 @@ async fn main() -> std::io::Result<()> { let maybe_contract_set = maybe_blockchain.clone() .map(|blockchain| blockchain.contract_set); + std::mem::drop(db_client); + log::info!( + "app initialized; version {}, environment = '{:?}'", + MITRA_VERSION, + config.environment, + ); + scheduler::run(config.clone(), maybe_blockchain, db_pool.clone()); log::info!("scheduler started");