Stop waiting for token after 10 minutes
This commit is contained in:
parent
f87284b07c
commit
90aac4d162
3 changed files with 50 additions and 11 deletions
|
@ -1,7 +1,10 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use uuid::Uuid;
|
||||||
use web3::{
|
use web3::{
|
||||||
api::Web3,
|
api::Web3,
|
||||||
contract::{Contract, Options},
|
contract::{Contract, Options},
|
||||||
|
@ -17,7 +20,7 @@ use crate::ipfs::utils::parse_ipfs_url;
|
||||||
use crate::models::posts::queries::{
|
use crate::models::posts::queries::{
|
||||||
get_post_by_ipfs_cid,
|
get_post_by_ipfs_cid,
|
||||||
update_post,
|
update_post,
|
||||||
is_waiting_for_token,
|
get_token_waitlist,
|
||||||
};
|
};
|
||||||
use super::api::connect;
|
use super::api::connect;
|
||||||
use super::utils::{
|
use super::utils::{
|
||||||
|
@ -27,6 +30,7 @@ use super::utils::{
|
||||||
|
|
||||||
pub const COLLECTIBLE: &str = "Collectible";
|
pub const COLLECTIBLE: &str = "Collectible";
|
||||||
pub const MINTER: &str = "Minter";
|
pub const MINTER: &str = "Minter";
|
||||||
|
const TOKEN_WAIT_TIME: i64 = 10; // in minutes
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum EthereumError {
|
pub enum EthereumError {
|
||||||
|
@ -120,11 +124,30 @@ pub async fn process_events(
|
||||||
web3: &Web3<Http>,
|
web3: &Web3<Http>,
|
||||||
contract: &Contract<Http>,
|
contract: &Contract<Http>,
|
||||||
db_pool: &Pool,
|
db_pool: &Pool,
|
||||||
|
token_waitlist_map: &mut HashMap<Uuid, DateTime<Utc>>,
|
||||||
) -> Result<(), EthereumError> {
|
) -> Result<(), EthereumError> {
|
||||||
let db_client = &**get_database_client(&db_pool).await?;
|
let db_client = &**get_database_client(&db_pool).await?;
|
||||||
if !is_waiting_for_token(db_client).await? {
|
|
||||||
return Ok(());
|
// Create/update token waitlist map
|
||||||
|
let token_waitlist = get_token_waitlist(db_client).await?;
|
||||||
|
for post_id in token_waitlist {
|
||||||
|
if !token_waitlist_map.contains_key(&post_id) {
|
||||||
|
token_waitlist_map.insert(post_id, Utc::now());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
let token_waitlist_active_count = token_waitlist_map.values()
|
||||||
|
.filter(|waiting_since| {
|
||||||
|
let duration = Utc::now() - **waiting_since;
|
||||||
|
duration.num_minutes() < TOKEN_WAIT_TIME
|
||||||
|
})
|
||||||
|
.count();
|
||||||
|
if token_waitlist_active_count == 0 {
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
log::info!(
|
||||||
|
"{} posts are waiting for token to be minted",
|
||||||
|
token_waitlist_active_count,
|
||||||
|
);
|
||||||
|
|
||||||
// Search for Transfer events
|
// Search for Transfer events
|
||||||
let event_abi_params = vec![
|
let event_abi_params = vec![
|
||||||
|
@ -191,8 +214,12 @@ pub async fn process_events(
|
||||||
.map_err(|_| EthereumError::TokenUriParsingError)?;
|
.map_err(|_| EthereumError::TokenUriParsingError)?;
|
||||||
let mut post = match get_post_by_ipfs_cid(db_client, &ipfs_cid).await {
|
let mut post = match get_post_by_ipfs_cid(db_client, &ipfs_cid).await {
|
||||||
Ok(post) => post,
|
Ok(post) => post,
|
||||||
|
Err(DatabaseError::NotFound(_)) => {
|
||||||
|
// Post was deleted
|
||||||
|
continue;
|
||||||
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Post not found or some other error
|
// Unexpected error
|
||||||
log::error!("{}", err);
|
log::error!("{}", err);
|
||||||
continue;
|
continue;
|
||||||
},
|
},
|
||||||
|
@ -204,6 +231,7 @@ pub async fn process_events(
|
||||||
post.token_id = Some(token_id);
|
post.token_id = Some(token_id);
|
||||||
post.token_tx_id = Some(tx_id);
|
post.token_tx_id = Some(tx_id);
|
||||||
update_post(db_client, &post).await?;
|
update_post(db_client, &post).await?;
|
||||||
|
token_waitlist_map.remove(&post.id);
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
|
@ -277,19 +277,21 @@ pub async fn update_reply_count(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn is_waiting_for_token(
|
pub async fn get_token_waitlist(
|
||||||
db_client: &impl GenericClient,
|
db_client: &impl GenericClient,
|
||||||
) -> Result<bool, DatabaseError> {
|
) -> Result<Vec<Uuid>, DatabaseError> {
|
||||||
let row = db_client.query_one(
|
let rows = db_client.query(
|
||||||
"
|
"
|
||||||
SELECT count(post) > 0 AS is_waiting
|
SELECT post.id
|
||||||
FROM post
|
FROM post
|
||||||
WHERE ipfs_cid IS NOT NULL AND token_id IS NULL
|
WHERE ipfs_cid IS NOT NULL AND token_id IS NULL
|
||||||
",
|
",
|
||||||
&[],
|
&[],
|
||||||
).await?;
|
).await?;
|
||||||
let is_waiting: bool = row.try_get("is_waiting")?;
|
let waitlist: Vec<Uuid> = rows.iter()
|
||||||
Ok(is_waiting)
|
.map(|row| row.try_get("id"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
Ok(waitlist)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes post from database and returns list of orphaned files.
|
/// Deletes post from database and returns list of orphaned files.
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::database::Pool;
|
use crate::database::Pool;
|
||||||
use crate::ethereum::nft::{get_nft_contract, process_events};
|
use crate::ethereum::nft::{get_nft_contract, process_events};
|
||||||
|
@ -11,11 +15,16 @@ pub fn run(config: Config, db_pool: Pool) -> () {
|
||||||
let web3_contract = get_nft_contract(&config).await
|
let web3_contract = get_nft_contract(&config).await
|
||||||
.map_err(|err| log::error!("{}", err))
|
.map_err(|err| log::error!("{}", err))
|
||||||
.ok();
|
.ok();
|
||||||
|
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
// Process events only if contract is properly configured
|
// Process events only if contract is properly configured
|
||||||
if let Some((web3, contract)) = web3_contract.as_ref() {
|
if let Some((web3, contract)) = web3_contract.as_ref() {
|
||||||
process_events(web3, contract, &db_pool).await.unwrap_or_else(|err| {
|
process_events(
|
||||||
|
web3, contract,
|
||||||
|
&db_pool,
|
||||||
|
&mut token_waitlist_map,
|
||||||
|
).await.unwrap_or_else(|err| {
|
||||||
log::error!("{}", err);
|
log::error!("{}", err);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue