Switch to incremental blockchain synchronization

This commit is contained in:
silverpill 2022-06-24 23:27:09 +00:00
parent b618c5b31f
commit 2f532307a1
6 changed files with 123 additions and 31 deletions

View file

@ -12,7 +12,7 @@ use super::api::connect;
use super::errors::EthereumError;
use super::sync::{
get_current_block_number,
CHAIN_REORG_MAX_DEPTH,
SyncState,
};
use super::utils::parse_address;
@ -49,18 +49,22 @@ fn load_abi(
#[derive(Clone)]
pub struct ContractSet {
pub web3: Web3<Http>,
// Last synced block
pub current_block: u64,
pub adapter: Contract<Http>,
pub collectible: Contract<Http>,
pub subscription: Contract<Http>,
}
#[derive(Clone)]
pub struct Blockchain {
pub contract_set: ContractSet,
pub sync_state: SyncState,
}
pub async fn get_contracts(
config: &BlockchainConfig,
storage_dir: &Path,
) -> Result<ContractSet, EthereumError> {
) -> Result<Blockchain, EthereumError> {
let web3 = connect(&config.api_url)?;
let chain_id = web3.eth().chain_id().await?;
if chain_id != config.ethereum_chain_id().into() {
@ -100,15 +104,17 @@ pub async fn get_contracts(
let current_block = get_current_block_number(&web3, storage_dir).await?;
log::info!("current block is {}", current_block);
// Take reorgs into account
let current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH);
let sync_state = SyncState::new(
current_block,
vec![collectible.address(), subscription.address()],
storage_dir,
);
let contract_set = ContractSet {
web3,
current_block,
adapter,
collectible,
subscription,
};
Ok(contract_set)
Ok(Blockchain { contract_set, sync_state })
}

View file

@ -23,6 +23,7 @@ use crate::models::posts::queries::{
};
use super::errors::EthereumError;
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
use super::sync::{save_current_block_number, SyncState};
use super::utils::parse_address;
const TOKEN_WAIT_TIME: i64 = 10; // in minutes
@ -32,7 +33,7 @@ const TOKEN_WAIT_TIME: i64 = 10; // in minutes
pub async fn process_nft_events(
web3: &Web3<Http>,
contract: &Contract<Http>,
from_block: u64,
sync_state: &mut SyncState,
db_pool: &Pool,
token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>,
) -> Result<(), EthereumError> {
@ -51,20 +52,28 @@ pub async fn process_nft_events(
duration.num_minutes() < TOKEN_WAIT_TIME
})
.count();
if token_waitlist_active_count == 0 {
return Ok(())
if token_waitlist_active_count > 0 {
log::info!(
"{} posts are waiting for confirmation of tokenization tx",
token_waitlist_active_count,
);
} else if !sync_state.is_out_of_sync(&contract.address()) {
// Don't scan blockchain if already in sync and waitlist is empty
return Ok(());
};
log::info!(
"{} posts are waiting for confirmation of tokenization tx",
token_waitlist_active_count,
);
// Search for Transfer events
let event_abi = contract.abi().event("Transfer")?;
let (from_block, to_block) = sync_state.get_scan_range(&contract.address());
let to_block = std::cmp::min(
web3.eth().block_number().await?.as_u64(),
to_block,
);
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Number(from_block.into()))
.to_block(BlockNumber::Number(to_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {
@ -113,6 +122,10 @@ pub async fn process_nft_events(
};
};
};
if sync_state.update(&contract.address(), to_block) {
save_current_block_number(&sync_state.storage_dir, sync_state.current_block)?;
};
Ok(())
}

View file

@ -26,6 +26,7 @@ use crate::models::subscriptions::queries::{
use crate::models::users::queries::get_user_by_wallet_address;
use super::errors::EthereumError;
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
use super::sync::{save_current_block_number, SyncState};
use super::utils::{address_to_string, parse_address};
const ETHEREUM: Currency = Currency::Ethereum;
@ -42,15 +43,21 @@ fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
pub async fn check_subscriptions(
web3: &Web3<Http>,
contract: &Contract<Http>,
from_block: u64,
sync_state: &mut SyncState,
db_pool: &Pool,
) -> Result<(), EthereumError> {
let db_client = &mut **get_database_client(db_pool).await?;
let event_abi = contract.abi().event("UpdateSubscription")?;
let (from_block, to_block) = sync_state.get_scan_range(&contract.address());
let to_block = std::cmp::min(
web3.eth().block_number().await?.as_u64(),
to_block,
);
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Number(from_block.into()))
.to_block(BlockNumber::Number(to_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {
@ -180,6 +187,10 @@ pub async fn check_subscriptions(
subscription.recipient_id,
);
};
if sync_state.update(&contract.address(), to_block) {
save_current_block_number(&sync_state.storage_dir, sync_state.current_block)?;
};
Ok(())
}

View file

@ -1,12 +1,14 @@
use std::path::Path;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use web3::{api::Web3, transports::Http};
use web3::{api::Web3, transports::Http, types::Address};
use crate::utils::files::write_file;
use super::errors::EthereumError;
const BLOCK_NUMBER_FILE_NAME: &str = "current_block";
pub const CHAIN_REORG_MAX_DEPTH: u64 = 100;
const CHAIN_REORG_MAX_DEPTH: u64 = 100;
const CHAIN_SYNC_STEP: u64 = 1000;
pub fn save_current_block_number(
storage_dir: &Path,
@ -49,3 +51,61 @@ pub async fn get_current_block_number(
};
Ok(block_number)
}
#[derive(Clone)]
pub struct SyncState {
pub current_block: u64,
contracts: HashMap<Address, u64>,
pub storage_dir: PathBuf,
}
impl SyncState {
pub fn new(
current_block: u64,
contracts: Vec<Address>,
storage_dir: &Path,
) -> Self {
let mut contract_map = HashMap::new();
for address in contracts {
contract_map.insert(address, current_block);
};
Self {
current_block,
contracts: contract_map,
storage_dir: storage_dir.to_path_buf(),
}
}
pub fn get_scan_range(&self, contract_address: &Address) -> (u64, u64) {
let current_block = self.contracts[contract_address];
// Take reorgs into account
let safe_current_block = current_block.saturating_sub(CHAIN_REORG_MAX_DEPTH);
(safe_current_block, safe_current_block + CHAIN_SYNC_STEP)
}
pub fn is_out_of_sync(&self, contract_address: &Address) -> bool {
if let Some(max_value) = self.contracts.values().max().copied() {
if self.contracts[contract_address] == max_value {
return false;
};
};
true
}
pub fn update(
&mut self,
contract_address: &Address,
block_number: u64,
) -> bool {
self.contracts.insert(*contract_address, block_number);
if let Some(min_value) = self.contracts.values().min().copied() {
if min_value > self.current_block {
self.current_block = min_value;
log::info!("synced to block {}", self.current_block);
return true;
};
};
false
}
}

View file

@ -58,8 +58,10 @@ async fn main() -> std::io::Result<()> {
} else {
None
};
let maybe_contract_set = maybe_blockchain.clone()
.map(|blockchain| blockchain.contract_set);
scheduler::run(config.clone(), maybe_blockchain.clone(), db_pool.clone());
scheduler::run(config.clone(), maybe_blockchain, db_pool.clone());
log::info!("scheduler started");
let http_socket_addr = format!(
@ -116,7 +118,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::JsonConfig::default().limit(MAX_UPLOAD_SIZE))
.app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(db_pool.clone()))
.app_data(web::Data::new(maybe_blockchain.clone()))
.app_data(web::Data::new(maybe_contract_set.clone()))
.app_data(web::Data::clone(&inbox_mutex))
.service(actix_files::Files::new(
"/media",

View file

@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::config::Config;
use crate::database::Pool;
use crate::ethereum::contracts::ContractSet;
use crate::ethereum::contracts::Blockchain;
use crate::ethereum::nft::process_nft_events;
use crate::ethereum::subscriptions::check_subscriptions;
@ -39,7 +39,7 @@ fn is_task_ready(last_run: &Option<DateTime<Utc>>, period: i64) -> bool {
pub fn run(
_config: Config,
maybe_contract_set: Option<ContractSet>,
mut maybe_blockchain: Option<Blockchain>,
db_pool: Pool,
) -> () {
tokio::spawn(async move {
@ -58,23 +58,23 @@ pub fn run(
};
let task_result = match task {
Task::NftMonitor => {
if let Some(contract_set) = maybe_contract_set.as_ref() {
if let Some(blockchain) = maybe_blockchain.as_mut() {
// Monitor events only if ethereum integration is enabled
process_nft_events(
&contract_set.web3,
&contract_set.collectible,
contract_set.current_block,
&blockchain.contract_set.web3,
&blockchain.contract_set.collectible,
&mut blockchain.sync_state,
&db_pool,
&mut token_waitlist_map,
).await.map_err(Error::from)
} else { Ok(()) }
},
Task::SubscriptionMonitor => {
if let Some(contract_set) = maybe_contract_set.as_ref() {
if let Some(blockchain) = maybe_blockchain.as_mut() {
check_subscriptions(
&contract_set.web3,
&contract_set.subscription,
contract_set.current_block,
&blockchain.contract_set.web3,
&blockchain.contract_set.subscription,
&mut blockchain.sync_state,
&db_pool,
).await.map_err(Error::from)
} else { Ok(()) }