From 90aac4d16246cc67a1a0eeb20c4087cf7c265970 Mon Sep 17 00:00:00 2001 From: silverpill Date: Wed, 29 Sep 2021 01:03:47 +0000 Subject: [PATCH] Stop waiting for token after 10 minutes --- src/ethereum/nft.rs | 36 ++++++++++++++++++++++++++++++++---- src/models/posts/queries.rs | 14 ++++++++------ src/scheduler.rs | 11 ++++++++++- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/ethereum/nft.rs b/src/ethereum/nft.rs index d66a7b5..c9d51d8 100644 --- a/src/ethereum/nft.rs +++ b/src/ethereum/nft.rs @@ -1,7 +1,10 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::fs; use std::path::PathBuf; +use chrono::{DateTime, Utc}; +use uuid::Uuid; use web3::{ api::Web3, contract::{Contract, Options}, @@ -17,7 +20,7 @@ use crate::ipfs::utils::parse_ipfs_url; use crate::models::posts::queries::{ get_post_by_ipfs_cid, update_post, - is_waiting_for_token, + get_token_waitlist, }; use super::api::connect; use super::utils::{ @@ -27,6 +30,7 @@ use super::utils::{ pub const COLLECTIBLE: &str = "Collectible"; pub const MINTER: &str = "Minter"; +const TOKEN_WAIT_TIME: i64 = 10; // in minutes #[derive(thiserror::Error, Debug)] pub enum EthereumError { @@ -120,11 +124,30 @@ pub async fn process_events( web3: &Web3, contract: &Contract, db_pool: &Pool, + token_waitlist_map: &mut HashMap>, ) -> Result<(), EthereumError> { let db_client = &**get_database_client(&db_pool).await?; - if !is_waiting_for_token(db_client).await? { - return Ok(()); + + // Create/update token waitlist map + let token_waitlist = get_token_waitlist(db_client).await?; + for post_id in token_waitlist { + if !token_waitlist_map.contains_key(&post_id) { + token_waitlist_map.insert(post_id, Utc::now()); + } } + let token_waitlist_active_count = token_waitlist_map.values() + .filter(|waiting_since| { + let duration = Utc::now() - **waiting_since; + duration.num_minutes() < TOKEN_WAIT_TIME + }) + .count(); + if token_waitlist_active_count == 0 { + return Ok(()) + } + log::info!( + "{} posts are waiting for token to be minted", + token_waitlist_active_count, + ); // Search for Transfer events let event_abi_params = vec![ @@ -191,8 +214,12 @@ pub async fn process_events( .map_err(|_| EthereumError::TokenUriParsingError)?; let mut post = match get_post_by_ipfs_cid(db_client, &ipfs_cid).await { Ok(post) => post, + Err(DatabaseError::NotFound(_)) => { + // Post was deleted + continue; + }, Err(err) => { - // Post not found or some other error + // Unexpected error log::error!("{}", err); continue; }, @@ -204,6 +231,7 @@ pub async fn process_events( post.token_id = Some(token_id); post.token_tx_id = Some(tx_id); update_post(db_client, &post).await?; + token_waitlist_map.remove(&post.id); }; }; }; diff --git a/src/models/posts/queries.rs b/src/models/posts/queries.rs index 6ea6d1c..cb9c2bc 100644 --- a/src/models/posts/queries.rs +++ b/src/models/posts/queries.rs @@ -277,19 +277,21 @@ pub async fn update_reply_count( Ok(()) } -pub async fn is_waiting_for_token( +pub async fn get_token_waitlist( db_client: &impl GenericClient, -) -> Result { - let row = db_client.query_one( +) -> Result, DatabaseError> { + let rows = db_client.query( " - SELECT count(post) > 0 AS is_waiting + SELECT post.id FROM post WHERE ipfs_cid IS NOT NULL AND token_id IS NULL ", &[], ).await?; - let is_waiting: bool = row.try_get("is_waiting")?; - Ok(is_waiting) + let waitlist: Vec = rows.iter() + .map(|row| row.try_get("id")) + .collect::>()?; + Ok(waitlist) } /// Deletes post from database and returns list of orphaned files. diff --git a/src/scheduler.rs b/src/scheduler.rs index f45b9dd..3235823 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,5 +1,9 @@ +use std::collections::HashMap; use std::time::Duration; +use chrono::{DateTime, Utc}; +use uuid::Uuid; + use crate::config::Config; use crate::database::Pool; use crate::ethereum::nft::{get_nft_contract, process_events}; @@ -11,11 +15,16 @@ pub fn run(config: Config, db_pool: Pool) -> () { let web3_contract = get_nft_contract(&config).await .map_err(|err| log::error!("{}", err)) .ok(); + let mut token_waitlist_map: HashMap> = HashMap::new(); loop { interval.tick().await; // Process events only if contract is properly configured if let Some((web3, contract)) = web3_contract.as_ref() { - process_events(web3, contract, &db_pool).await.unwrap_or_else(|err| { + process_events( + web3, contract, + &db_pool, + &mut token_waitlist_map, + ).await.unwrap_or_else(|err| { log::error!("{}", err); }); }