From a095ea70619289b27d0e9a97a4c7de3f18308096 Mon Sep 17 00:00:00 2001 From: silverpill Date: Sun, 30 Jan 2022 20:58:05 +0000 Subject: [PATCH] Add periodic task that monitors subscription events --- src/ethereum/contracts.rs | 17 ++++++++ src/ethereum/subscriptions.rs | 80 +++++++++++++++++++++++++++++++++++ src/scheduler.rs | 8 ++++ 3 files changed, 105 insertions(+) diff --git a/src/ethereum/contracts.rs b/src/ethereum/contracts.rs index e2b1780..1a41bd3 100644 --- a/src/ethereum/contracts.rs +++ b/src/ethereum/contracts.rs @@ -13,6 +13,8 @@ use super::errors::EthereumError; use super::utils::parse_address; pub const ADAPTER: &str = "IAdapter"; +pub const SUBSCRIPTION: &str = "ISubscription"; +pub const ERC20: &str = "IERC20"; pub const ERC721: &str = "IERC721Metadata"; #[derive(thiserror::Error, Debug)] @@ -48,6 +50,7 @@ pub struct ContractSet { pub adapter: Contract, pub collectible: Contract, + pub subscription: Contract, } pub async fn get_contracts( @@ -73,10 +76,24 @@ pub async fn get_contracts( &collectibe_abi, )?; log::info!("collectible item contract address is {:?}", collectible.address()); + + let subscription_address = adapter.query( + "subscription", + (), None, Options::default(), None, + ).await?; + let subscription_abi = load_abi(&config.contract_dir, SUBSCRIPTION)?; + let subscription = Contract::from_json( + web3.eth(), + subscription_address, + &subscription_abi, + )?; + log::info!("subscription contract address is {:?}", subscription.address()); + let contract_set = ContractSet { web3, adapter, collectible, + subscription, }; Ok(contract_set) } diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index f3e14d3..e351f01 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -1,8 +1,88 @@ +use std::convert::TryInto; + +use chrono::{DateTime, TimeZone, Utc}; + +use web3::{ + api::Web3, + contract::Contract, + ethabi::RawLog, + transports::Http, + types::{Address, BlockId, BlockNumber, FilterBuilder, U256}, +}; + use crate::config::BlockchainConfig; +use crate::database::{Pool, get_database_client}; +use crate::errors::ConversionError; use super::errors::EthereumError; use super::signatures::{sign_contract_call, CallArgs, SignatureData}; use super::utils::parse_address; +/// Converts address object to lowercase hex string +fn address_to_string(address: Address) -> String { + format!("{:#x}", address) +} + +fn u256_to_date(value: U256) -> Result, ConversionError> { + let timestamp: i64 = value.try_into().map_err(|_| ConversionError)?; + let datetime = Utc.timestamp_opt(timestamp, 0) + .single() + .ok_or(ConversionError)?; + Ok(datetime) +} + +/// Search for subscription update events +pub async fn check_subscriptions( + web3: &Web3, + contract: &Contract, + db_pool: &Pool, +) -> Result<(), EthereumError> { + let _db_client = &**get_database_client(db_pool).await?; + let event_abi = contract.abi().event("UpdateSubscription")?; + let filter = FilterBuilder::default() + .address(vec![contract.address()]) + .topics(Some(vec![event_abi.signature()]), None, None, None) + .from_block(BlockNumber::Earliest) + .build(); + let logs = web3.eth().logs(filter).await?; + for log in logs { + let block_number = if let Some(block_number) = log.block_number { + block_number + } else { + // Skips logs without block number + continue; + }; + let raw_log = RawLog { + topics: log.topics.clone(), + data: log.data.clone().0, + }; + let event = event_abi.parse_log(raw_log)?; + let sender_address = event.params[0].value.clone().into_address() + .map(address_to_string) + .ok_or(EthereumError::ConversionError)?; + let recipient_address = event.params[1].value.clone().into_address() + .map(address_to_string) + .ok_or(EthereumError::ConversionError)?; + let expires_at_timestamp = event.params[2].value.clone().into_uint() + .ok_or(EthereumError::ConversionError)?; + let expires_at = u256_to_date(expires_at_timestamp) + .map_err(|_| EthereumError::ConversionError)?; + let block_id = BlockId::Number(BlockNumber::Number(block_number)); + let block_timestamp = web3.eth().block(block_id).await? + .ok_or(EthereumError::ConversionError)? + .timestamp; + let block_date = u256_to_date(block_timestamp) + .map_err(|_| EthereumError::ConversionError)?; + log::info!( + "subscription: from {0} to {1}, expires at {2}, updated at {3}", + sender_address, + recipient_address, + expires_at, + block_date, + ); + }; + Ok(()) +} + pub fn create_subscription_signature( blockchain_config: &BlockchainConfig, user_address: &str, diff --git a/src/scheduler.rs b/src/scheduler.rs index e53d5fd..9a70133 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -8,6 +8,7 @@ use crate::config::Config; use crate::database::Pool; use crate::ethereum::contracts::get_contracts; use crate::ethereum::nft::process_nft_events; +use crate::ethereum::subscriptions::check_subscriptions; pub fn run(config: Config, db_pool: Pool) -> () { actix_rt::spawn(async move { @@ -34,6 +35,13 @@ pub fn run(config: Config, db_pool: Pool) -> () { ).await.unwrap_or_else(|err| { log::error!("{}", err); }); + check_subscriptions( + &contract_set.web3, + &contract_set.subscription, + &db_pool, + ).await.unwrap_or_else(|err| { + log::error!("{}", err); + }); } } });