Use initial block number as lower bound when making log query

This commit is contained in:
silverpill 2022-06-14 18:55:04 +00:00
parent 91309e1a7b
commit 0148343655
8 changed files with 75 additions and 4 deletions

View file

@ -10,6 +10,10 @@ use web3::{
use crate::config::BlockchainConfig;
use super::api::connect;
use super::errors::EthereumError;
use super::sync::{
get_current_block_number,
CHAIN_REORG_MAX_DEPTH,
};
use super::utils::parse_address;
pub const ADAPTER: &str = "IAdapter";
@ -45,6 +49,8 @@ pub fn load_abi(
pub struct ContractSet {
pub web3: Web3<Http>,
// Last synced block
pub current_block: u64,
#[allow(dead_code)]
pub adapter: Contract<Http>,
@ -55,6 +61,7 @@ pub struct ContractSet {
pub async fn get_contracts(
config: &BlockchainConfig,
storage_dir: &Path,
) -> Result<ContractSet, EthereumError> {
let web3 = connect(&config.api_url)?;
let chain_id = web3.eth().chain_id().await?;
@ -93,8 +100,14 @@ pub async fn get_contracts(
)?;
log::info!("subscription contract address is {:?}", subscription.address());
let current_block = get_current_block_number(&web3, storage_dir).await?;
log::info!("current block is {}", current_block);
// Take reorgs into account
let current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH);
let contract_set = ContractSet {
web3,
current_block,
adapter,
collectible,
subscription,

View file

@ -30,5 +30,8 @@ pub enum EthereumError {
DatabaseError(#[from] DatabaseError),
#[error("signature error")]
SigError(#[from] SignatureError),
SignatureError(#[from] SignatureError),
#[error("{0}")]
OtherError(&'static str),
}

View file

@ -7,4 +7,5 @@ pub mod identity;
pub mod nft;
pub mod signatures;
pub mod subscriptions;
mod sync;
pub mod utils;

View file

@ -32,6 +32,7 @@ const TOKEN_WAIT_TIME: i64 = 10; // in minutes
pub async fn process_nft_events(
web3: &Web3<Http>,
contract: &Contract<Http>,
from_block: u64,
db_pool: &Pool,
token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>,
) -> Result<(), EthereumError> {
@ -63,7 +64,7 @@ pub async fn process_nft_events(
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Earliest)
.from_block(BlockNumber::Number(from_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {

View file

@ -42,6 +42,7 @@ fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
pub async fn check_subscriptions(
web3: &Web3<Http>,
contract: &Contract<Http>,
from_block: u64,
db_pool: &Pool,
) -> Result<(), EthereumError> {
let db_client = &mut **get_database_client(db_pool).await?;
@ -49,7 +50,7 @@ pub async fn check_subscriptions(
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Earliest)
.from_block(BlockNumber::Number(from_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {

50
src/ethereum/sync.rs Normal file
View file

@ -0,0 +1,50 @@
use std::path::Path;
use web3::{api::Web3, transports::Http};
use crate::utils::files::write_file;
use super::errors::EthereumError;
const BLOCK_NUMBER_FILE_NAME: &str = "current_block";
pub const CHAIN_REORG_MAX_DEPTH: u64 = 100;
fn save_current_block_number(
storage_dir: &Path,
block_number: u64,
) -> Result<(), EthereumError> {
let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME);
write_file(block_number.to_string().as_bytes(), &file_path)
.map_err(|_| EthereumError::OtherError("failed to save current block"))?;
Ok(())
}
fn read_current_block_number(
storage_dir: &Path,
) -> Result<Option<u64>, EthereumError> {
let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME);
let block_number = if file_path.exists() {
let block_number: u64 = std::fs::read_to_string(&file_path)
.map_err(|_| EthereumError::OtherError("failed to read current block"))?
.parse()
.map_err(|_| EthereumError::OtherError("failed to parse block number"))?;
Some(block_number)
} else {
None
};
Ok(block_number)
}
pub async fn get_current_block_number(
web3: &Web3<Http>,
storage_dir: &Path,
) -> Result<u64, EthereumError> {
let block_number = match read_current_block_number(storage_dir)? {
Some(block_number) => block_number,
None => {
let block_number = web3.eth().block_number().await?.as_u64();
save_current_block_number(storage_dir, block_number)?;
block_number
},
};
Ok(block_number)
}

View file

@ -53,7 +53,7 @@ async fn main() -> std::io::Result<()> {
let maybe_contract_set = if let Some(blockchain_config) = &config.blockchain {
// Create blockchain interface
get_contracts(blockchain_config).await
get_contracts(blockchain_config, &config.storage_dir).await
.map_err(|err| log::error!("{}", err))
.ok()
} else {

View file

@ -26,6 +26,7 @@ pub fn run(
process_nft_events(
&contract_set.web3,
&contract_set.collectible,
contract_set.current_block,
&db_pool,
&mut token_waitlist_map,
).await.unwrap_or_else(|err| {
@ -34,6 +35,7 @@ pub fn run(
check_subscriptions(
&contract_set.web3,
&contract_set.subscription,
contract_set.current_block,
&db_pool,
).await.unwrap_or_else(|err| {
log::error!("{}", err);