Add periodic task that monitors subscription events
This commit is contained in:
parent
4e0f14df4c
commit
a095ea7061
3 changed files with 105 additions and 0 deletions
|
@ -13,6 +13,8 @@ use super::errors::EthereumError;
|
||||||
use super::utils::parse_address;
|
use super::utils::parse_address;
|
||||||
|
|
||||||
pub const ADAPTER: &str = "IAdapter";
|
pub const ADAPTER: &str = "IAdapter";
|
||||||
|
pub const SUBSCRIPTION: &str = "ISubscription";
|
||||||
|
pub const ERC20: &str = "IERC20";
|
||||||
pub const ERC721: &str = "IERC721Metadata";
|
pub const ERC721: &str = "IERC721Metadata";
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
@ -48,6 +50,7 @@ pub struct ContractSet {
|
||||||
pub adapter: Contract<Http>,
|
pub adapter: Contract<Http>,
|
||||||
|
|
||||||
pub collectible: Contract<Http>,
|
pub collectible: Contract<Http>,
|
||||||
|
pub subscription: Contract<Http>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_contracts(
|
pub async fn get_contracts(
|
||||||
|
@ -73,10 +76,24 @@ pub async fn get_contracts(
|
||||||
&collectibe_abi,
|
&collectibe_abi,
|
||||||
)?;
|
)?;
|
||||||
log::info!("collectible item contract address is {:?}", collectible.address());
|
log::info!("collectible item contract address is {:?}", collectible.address());
|
||||||
|
|
||||||
|
let subscription_address = adapter.query(
|
||||||
|
"subscription",
|
||||||
|
(), None, Options::default(), None,
|
||||||
|
).await?;
|
||||||
|
let subscription_abi = load_abi(&config.contract_dir, SUBSCRIPTION)?;
|
||||||
|
let subscription = Contract::from_json(
|
||||||
|
web3.eth(),
|
||||||
|
subscription_address,
|
||||||
|
&subscription_abi,
|
||||||
|
)?;
|
||||||
|
log::info!("subscription contract address is {:?}", subscription.address());
|
||||||
|
|
||||||
let contract_set = ContractSet {
|
let contract_set = ContractSet {
|
||||||
web3,
|
web3,
|
||||||
adapter,
|
adapter,
|
||||||
collectible,
|
collectible,
|
||||||
|
subscription,
|
||||||
};
|
};
|
||||||
Ok(contract_set)
|
Ok(contract_set)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,88 @@
|
||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
use chrono::{DateTime, TimeZone, Utc};
|
||||||
|
|
||||||
|
use web3::{
|
||||||
|
api::Web3,
|
||||||
|
contract::Contract,
|
||||||
|
ethabi::RawLog,
|
||||||
|
transports::Http,
|
||||||
|
types::{Address, BlockId, BlockNumber, FilterBuilder, U256},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::config::BlockchainConfig;
|
use crate::config::BlockchainConfig;
|
||||||
|
use crate::database::{Pool, get_database_client};
|
||||||
|
use crate::errors::ConversionError;
|
||||||
use super::errors::EthereumError;
|
use super::errors::EthereumError;
|
||||||
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
|
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
|
||||||
use super::utils::parse_address;
|
use super::utils::parse_address;
|
||||||
|
|
||||||
|
/// Converts address object to lowercase hex string
|
||||||
|
fn address_to_string(address: Address) -> String {
|
||||||
|
format!("{:#x}", address)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
|
||||||
|
let timestamp: i64 = value.try_into().map_err(|_| ConversionError)?;
|
||||||
|
let datetime = Utc.timestamp_opt(timestamp, 0)
|
||||||
|
.single()
|
||||||
|
.ok_or(ConversionError)?;
|
||||||
|
Ok(datetime)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Search for subscription update events
|
||||||
|
pub async fn check_subscriptions(
|
||||||
|
web3: &Web3<Http>,
|
||||||
|
contract: &Contract<Http>,
|
||||||
|
db_pool: &Pool,
|
||||||
|
) -> Result<(), EthereumError> {
|
||||||
|
let _db_client = &**get_database_client(db_pool).await?;
|
||||||
|
let event_abi = contract.abi().event("UpdateSubscription")?;
|
||||||
|
let filter = FilterBuilder::default()
|
||||||
|
.address(vec![contract.address()])
|
||||||
|
.topics(Some(vec![event_abi.signature()]), None, None, None)
|
||||||
|
.from_block(BlockNumber::Earliest)
|
||||||
|
.build();
|
||||||
|
let logs = web3.eth().logs(filter).await?;
|
||||||
|
for log in logs {
|
||||||
|
let block_number = if let Some(block_number) = log.block_number {
|
||||||
|
block_number
|
||||||
|
} else {
|
||||||
|
// Skips logs without block number
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let raw_log = RawLog {
|
||||||
|
topics: log.topics.clone(),
|
||||||
|
data: log.data.clone().0,
|
||||||
|
};
|
||||||
|
let event = event_abi.parse_log(raw_log)?;
|
||||||
|
let sender_address = event.params[0].value.clone().into_address()
|
||||||
|
.map(address_to_string)
|
||||||
|
.ok_or(EthereumError::ConversionError)?;
|
||||||
|
let recipient_address = event.params[1].value.clone().into_address()
|
||||||
|
.map(address_to_string)
|
||||||
|
.ok_or(EthereumError::ConversionError)?;
|
||||||
|
let expires_at_timestamp = event.params[2].value.clone().into_uint()
|
||||||
|
.ok_or(EthereumError::ConversionError)?;
|
||||||
|
let expires_at = u256_to_date(expires_at_timestamp)
|
||||||
|
.map_err(|_| EthereumError::ConversionError)?;
|
||||||
|
let block_id = BlockId::Number(BlockNumber::Number(block_number));
|
||||||
|
let block_timestamp = web3.eth().block(block_id).await?
|
||||||
|
.ok_or(EthereumError::ConversionError)?
|
||||||
|
.timestamp;
|
||||||
|
let block_date = u256_to_date(block_timestamp)
|
||||||
|
.map_err(|_| EthereumError::ConversionError)?;
|
||||||
|
log::info!(
|
||||||
|
"subscription: from {0} to {1}, expires at {2}, updated at {3}",
|
||||||
|
sender_address,
|
||||||
|
recipient_address,
|
||||||
|
expires_at,
|
||||||
|
block_date,
|
||||||
|
);
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create_subscription_signature(
|
pub fn create_subscription_signature(
|
||||||
blockchain_config: &BlockchainConfig,
|
blockchain_config: &BlockchainConfig,
|
||||||
user_address: &str,
|
user_address: &str,
|
||||||
|
|
|
@ -8,6 +8,7 @@ use crate::config::Config;
|
||||||
use crate::database::Pool;
|
use crate::database::Pool;
|
||||||
use crate::ethereum::contracts::get_contracts;
|
use crate::ethereum::contracts::get_contracts;
|
||||||
use crate::ethereum::nft::process_nft_events;
|
use crate::ethereum::nft::process_nft_events;
|
||||||
|
use crate::ethereum::subscriptions::check_subscriptions;
|
||||||
|
|
||||||
pub fn run(config: Config, db_pool: Pool) -> () {
|
pub fn run(config: Config, db_pool: Pool) -> () {
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
|
@ -34,6 +35,13 @@ pub fn run(config: Config, db_pool: Pool) -> () {
|
||||||
).await.unwrap_or_else(|err| {
|
).await.unwrap_or_else(|err| {
|
||||||
log::error!("{}", err);
|
log::error!("{}", err);
|
||||||
});
|
});
|
||||||
|
check_subscriptions(
|
||||||
|
&contract_set.web3,
|
||||||
|
&contract_set.subscription,
|
||||||
|
&db_pool,
|
||||||
|
).await.unwrap_or_else(|err| {
|
||||||
|
log::error!("{}", err);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue