Save latest ethereum block number to database instead of file

This commit is contained in:
silverpill 2023-03-12 01:53:59 +00:00
parent 0e9879bacb
commit 2b6de17fb9
7 changed files with 57 additions and 35 deletions

View file

@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased] ## [Unreleased]
### Changed
- Save latest ethereum block number to database instead of file.
### Deprecated
- Reading ethereum block number from `current_block` file.
### Removed ### Removed
- Disabled post tokenization (can be re-enabled with `ethereum-extras` feature). - Disabled post tokenization (can be re-enabled with `ethereum-extras` feature).

View file

@ -493,10 +493,10 @@ pub struct UpdateCurrentBlock {
impl UpdateCurrentBlock { impl UpdateCurrentBlock {
pub async fn execute( pub async fn execute(
&self, &self,
config: &Config, _config: &Config,
_db_client: &impl DatabaseClient, db_client: &impl DatabaseClient,
) -> Result<(), Error> { ) -> Result<(), Error> {
save_current_block_number(&config.storage_dir, self.number)?; save_current_block_number(db_client, self.number).await?;
println!("current block updated"); println!("current block updated");
Ok(()) Ok(())
} }

View file

@ -10,6 +10,7 @@ use web3::{
use mitra_config::EthereumConfig; use mitra_config::EthereumConfig;
use crate::database::DatabaseClient;
use super::api::connect; use super::api::connect;
use super::errors::EthereumError; use super::errors::EthereumError;
use super::sync::{ use super::sync::{
@ -98,6 +99,7 @@ pub struct Blockchain {
} }
pub async fn get_contracts( pub async fn get_contracts(
db_client: &impl DatabaseClient,
config: &EthereumConfig, config: &EthereumConfig,
storage_dir: &Path, storage_dir: &Path,
) -> Result<Blockchain, EthereumError> { ) -> Result<Blockchain, EthereumError> {
@ -181,13 +183,12 @@ pub async fn get_contracts(
maybe_subscription_adapter = Some(subscription_adapter); maybe_subscription_adapter = Some(subscription_adapter);
}; };
let current_block = get_current_block_number(&web3, storage_dir).await?; let current_block = get_current_block_number(db_client, &web3, storage_dir).await?;
let sync_state = SyncState::new( let sync_state = SyncState::new(
current_block, current_block,
sync_targets, sync_targets,
config.chain_sync_step, config.chain_sync_step,
config.chain_reorg_max_depth, config.chain_reorg_max_depth,
storage_dir,
); );
let contract_set = ContractSet { let contract_set = ContractSet {

View file

@ -142,7 +142,7 @@ pub async fn process_nft_events(
TOKEN_WAITLIST_MAP_PROPERTY_NAME, TOKEN_WAITLIST_MAP_PROPERTY_NAME,
&token_waitlist_map, &token_waitlist_map,
).await?; ).await?;
sync_state.update(&contract.address(), to_block)?; sync_state.update(db_client, &contract.address(), to_block).await?;
Ok(()) Ok(())
} }

View file

@ -253,7 +253,7 @@ pub async fn check_ethereum_subscriptions(
}; };
}; };
sync_state.update(&contract.address(), to_block)?; sync_state.update(db_client, &contract.address(), to_block).await?;
Ok(()) Ok(())
} }

View file

@ -1,29 +1,44 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path, PathBuf}; use std::path::Path;
use web3::{api::Web3, transports::Http, types::Address}; use web3::{api::Web3, transports::Http, types::Address};
use mitra_utils::files::write_file; use crate::database::DatabaseClient;
use crate::models::properties::queries::{
get_internal_property,
set_internal_property,
};
use super::errors::EthereumError; use super::errors::EthereumError;
const BLOCK_NUMBER_FILE_NAME: &str = "current_block"; const BLOCK_NUMBER_FILE_NAME: &str = "current_block";
const CURRENT_BLOCK_PROPERTY_NAME: &str = "ethereum_current_block";
pub fn save_current_block_number( pub async fn save_current_block_number(
storage_dir: &Path, db_client: &impl DatabaseClient,
block_number: u64, block_number: u64,
) -> Result<(), EthereumError> { ) -> Result<(), EthereumError> {
let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); set_internal_property(
write_file(block_number.to_string().as_bytes(), &file_path) db_client,
.map_err(|_| EthereumError::OtherError("failed to save current block"))?; CURRENT_BLOCK_PROPERTY_NAME,
&block_number,
).await?;
Ok(()) Ok(())
} }
fn read_current_block_number( async fn read_current_block_number(
db_client: &impl DatabaseClient,
storage_dir: &Path, storage_dir: &Path,
) -> Result<Option<u64>, EthereumError> { ) -> Result<Option<u64>, EthereumError> {
let maybe_block_number = get_internal_property(
db_client,
CURRENT_BLOCK_PROPERTY_NAME,
).await?;
if maybe_block_number.is_some() {
return Ok(maybe_block_number);
};
// Try to read from file if internal property is not set
let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME); let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME);
let block_number = if file_path.exists() { let maybe_block_number = if file_path.exists() {
let block_number: u64 = std::fs::read_to_string(&file_path) let block_number: u64 = std::fs::read_to_string(&file_path)
.map_err(|_| EthereumError::OtherError("failed to read current block"))? .map_err(|_| EthereumError::OtherError("failed to read current block"))?
.parse() .parse()
@ -32,19 +47,20 @@ fn read_current_block_number(
} else { } else {
None None
}; };
Ok(block_number) Ok(maybe_block_number)
} }
pub async fn get_current_block_number( pub async fn get_current_block_number(
db_client: &impl DatabaseClient,
web3: &Web3<Http>, web3: &Web3<Http>,
storage_dir: &Path, storage_dir: &Path,
) -> Result<u64, EthereumError> { ) -> Result<u64, EthereumError> {
let block_number = match read_current_block_number(storage_dir)? { let block_number = match read_current_block_number(db_client, storage_dir).await? {
Some(block_number) => block_number, Some(block_number) => block_number,
None => { None => {
// Save block number when connecting to the node for the first time // Save block number when connecting to the node for the first time
let block_number = web3.eth().block_number().await?.as_u64(); let block_number = web3.eth().block_number().await?.as_u64();
save_current_block_number(storage_dir, block_number)?; save_current_block_number(db_client, block_number).await?;
block_number block_number
}, },
}; };
@ -57,8 +73,6 @@ pub struct SyncState {
contracts: HashMap<Address, u64>, contracts: HashMap<Address, u64>,
sync_step: u64, sync_step: u64,
reorg_max_depth: u64, reorg_max_depth: u64,
storage_dir: PathBuf,
} }
impl SyncState { impl SyncState {
@ -67,7 +81,6 @@ impl SyncState {
contracts: Vec<Address>, contracts: Vec<Address>,
sync_step: u64, sync_step: u64,
reorg_max_depth: u64, reorg_max_depth: u64,
storage_dir: &Path,
) -> Self { ) -> Self {
log::info!("current block is {}", current_block); log::info!("current block is {}", current_block);
let mut contract_map = HashMap::new(); let mut contract_map = HashMap::new();
@ -79,7 +92,6 @@ impl SyncState {
contracts: contract_map, contracts: contract_map,
sync_step, sync_step,
reorg_max_depth, reorg_max_depth,
storage_dir: storage_dir.to_path_buf(),
} }
} }
@ -109,8 +121,9 @@ impl SyncState {
true true
} }
pub fn update( pub async fn update(
&mut self, &mut self,
db_client: &impl DatabaseClient,
contract_address: &Address, contract_address: &Address,
block_number: u64, block_number: u64,
) -> Result<(), EthereumError> { ) -> Result<(), EthereumError> {
@ -118,7 +131,7 @@ impl SyncState {
if let Some(min_value) = self.contracts.values().min().copied() { if let Some(min_value) = self.contracts.values().min().copied() {
if min_value > self.current_block { if min_value > self.current_block {
self.current_block = min_value; self.current_block = min_value;
save_current_block_number(&self.storage_dir, self.current_block)?; save_current_block_number(db_client, self.current_block).await?;
log::info!("synced to block {}", self.current_block); log::info!("synced to block {}", self.current_block);
}; };
}; };
@ -138,7 +151,6 @@ mod tests {
vec![address.clone()], vec![address.clone()],
100, 100,
10, 10,
Path::new("test"),
); );
let (from_block, to_block) = sync_state.get_scan_range(&address, 555); let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 0); assert_eq!(from_block, 0);
@ -153,7 +165,6 @@ mod tests {
vec![address.clone()], vec![address.clone()],
100, 100,
10, 10,
Path::new("test"),
); );
let (from_block, to_block) = sync_state.get_scan_range(&address, 555); let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 500); assert_eq!(from_block, 500);

View file

@ -54,21 +54,16 @@ async fn main() -> std::io::Result<()> {
let db_pool = create_pool(&config.database_url); let db_pool = create_pool(&config.database_url);
let mut db_client = get_database_client(&db_pool).await.unwrap(); let mut db_client = get_database_client(&db_pool).await.unwrap();
apply_migrations(&mut db_client).await; apply_migrations(&mut db_client).await;
std::mem::drop(db_client);
if !config.media_dir().exists() { if !config.media_dir().exists() {
std::fs::create_dir(config.media_dir()) std::fs::create_dir(config.media_dir())
.expect("failed to create media directory"); .expect("failed to create media directory");
}; };
log::info!(
"app initialized; version {}, environment = '{:?}'",
MITRA_VERSION,
config.environment,
);
let maybe_blockchain = if let Some(blockchain_config) = config.blockchain() { let maybe_blockchain = if let Some(blockchain_config) = config.blockchain() {
if let Some(ethereum_config) = blockchain_config.ethereum_config() { if let Some(ethereum_config) = blockchain_config.ethereum_config() {
// Create blockchain interface // Create blockchain interface
get_contracts(ethereum_config, &config.storage_dir).await get_contracts(&**db_client, ethereum_config, &config.storage_dir).await
.map(Some).unwrap() .map(Some).unwrap()
} else { } else {
None None
@ -79,6 +74,13 @@ async fn main() -> std::io::Result<()> {
let maybe_contract_set = maybe_blockchain.clone() let maybe_contract_set = maybe_blockchain.clone()
.map(|blockchain| blockchain.contract_set); .map(|blockchain| blockchain.contract_set);
std::mem::drop(db_client);
log::info!(
"app initialized; version {}, environment = '{:?}'",
MITRA_VERSION,
config.environment,
);
scheduler::run(config.clone(), maybe_blockchain, db_pool.clone()); scheduler::run(config.clone(), maybe_blockchain, db_pool.clone());
log::info!("scheduler started"); log::info!("scheduler started");