Refactor ethereum subscription monitor

This commit is contained in:
silverpill 2022-08-28 12:01:26 +00:00
parent c5c3911de6
commit cc6d9d7688
2 changed files with 86 additions and 48 deletions

View file

@ -1,6 +1,7 @@
use std::convert::TryInto; use std::convert::TryInto;
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use tokio_postgres::GenericClient;
use web3::{ use web3::{
api::Web3, api::Web3,
@ -26,6 +27,7 @@ use crate::models::profiles::queries::{
get_profile_by_id, get_profile_by_id,
search_profile_by_wallet_address, search_profile_by_wallet_address,
}; };
use crate::models::profiles::types::DbActorProfile;
use crate::models::relationships::queries::unsubscribe; use crate::models::relationships::queries::unsubscribe;
use crate::models::subscriptions::queries::{ use crate::models::subscriptions::queries::{
create_subscription, create_subscription,
@ -37,6 +39,7 @@ use crate::models::users::queries::{
get_user_by_id, get_user_by_id,
get_user_by_wallet_address, get_user_by_wallet_address,
}; };
use crate::models::users::types::User;
use crate::utils::currencies::Currency; use crate::utils::currencies::Currency;
use super::contracts::ContractSet; use super::contracts::ContractSet;
use super::errors::EthereumError; use super::errors::EthereumError;
@ -59,8 +62,30 @@ fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
Ok(datetime) Ok(datetime)
} }
async fn send_subscription_notifications(
db_client: &impl GenericClient,
instance: &Instance,
sender: &DbActorProfile,
recipient: &User,
) -> Result<(), DatabaseError> {
create_subscription_notification(
db_client,
&sender.id,
&recipient.id,
).await?;
if let Some(ref remote_sender) = sender.actor_json {
prepare_add_person(
instance,
recipient,
remote_sender,
LocalActorCollection::Subscribers,
).spawn_deliver();
};
Ok(())
}
/// Search for subscription update events /// Search for subscription update events
pub async fn check_subscriptions( pub async fn check_ethereum_subscriptions(
instance: &Instance, instance: &Instance,
web3: &Web3<Http>, web3: &Web3<Http>,
contract: &Contract<Http>, contract: &Contract<Http>,
@ -154,7 +179,10 @@ pub async fn check_subscriptions(
); );
continue; continue;
}; };
if subscription.updated_at < block_date { if subscription.updated_at >= block_date {
// Event already processed
continue;
};
// Update subscription expiration date // Update subscription expiration date
update_subscription( update_subscription(
db_client, db_client,
@ -162,27 +190,25 @@ pub async fn check_subscriptions(
&expires_at, &expires_at,
&block_date, &block_date,
).await?; ).await?;
#[allow(clippy::comparison_chain)]
if expires_at > subscription.expires_at {
log::info!( log::info!(
"subscription updated: {0} to {1}", "subscription extended: {0} to {1}",
subscription.sender_id, subscription.sender_id,
subscription.recipient_id, subscription.recipient_id,
); );
if expires_at > subscription.expires_at { send_subscription_notifications(
// Subscription was extended
create_subscription_notification(
db_client, db_client,
&subscription.sender_id,
&subscription.recipient_id,
).await?;
if let Some(ref remote_sender) = sender.actor_json {
prepare_add_person(
instance, instance,
sender,
&recipient, &recipient,
remote_sender, ).await?;
LocalActorCollection::Subscribers, } else if expires_at < subscription.expires_at {
).spawn_deliver(); log::info!(
}; "subscription cancelled: {0} to {1}",
}; subscription.sender_id,
subscription.recipient_id,
);
}; };
}, },
Err(DatabaseError::NotFound(_)) => { Err(DatabaseError::NotFound(_)) => {
@ -200,24 +226,26 @@ pub async fn check_subscriptions(
sender.id, sender.id,
recipient.id, recipient.id,
); );
create_subscription_notification( send_subscription_notifications(
db_client, db_client,
&sender.id,
&recipient.id,
).await?;
if let Some(ref remote_sender) = sender.actor_json {
prepare_add_person(
instance, instance,
sender,
&recipient, &recipient,
remote_sender, ).await?;
LocalActorCollection::Subscribers,
).spawn_deliver();
};
}, },
Err(other_error) => return Err(other_error.into()), Err(other_error) => return Err(other_error.into()),
}; };
}; };
sync_state.update(&contract.address(), to_block)?;
Ok(())
}
pub async fn update_expired_subscriptions(
instance: &Instance,
db_pool: &Pool,
) -> Result<(), EthereumError> {
let db_client = &**get_database_client(db_pool).await?;
for subscription in get_expired_subscriptions(db_client).await? { for subscription in get_expired_subscriptions(db_client).await? {
// Remove relationship // Remove relationship
unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?; unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?;
@ -243,8 +271,6 @@ pub async fn check_subscriptions(
).await?; ).await?;
}; };
}; };
sync_state.update(&contract.address(), to_block)?;
Ok(()) Ok(())
} }

View file

@ -9,12 +9,16 @@ use crate::config::{Config, Instance};
use crate::database::Pool; use crate::database::Pool;
use crate::ethereum::contracts::Blockchain; use crate::ethereum::contracts::Blockchain;
use crate::ethereum::nft::process_nft_events; use crate::ethereum::nft::process_nft_events;
use crate::ethereum::subscriptions::check_subscriptions; use crate::ethereum::subscriptions::{
check_ethereum_subscriptions,
update_expired_subscriptions,
};
#[derive(Debug, Eq, Hash, PartialEq)] #[derive(Debug, Eq, Hash, PartialEq)]
enum Task { enum Task {
NftMonitor, NftMonitor,
SubscriptionMonitor, EthereumSubscriptionMonitor,
SubscriptionExpirationMonitor,
} }
impl Task { impl Task {
@ -22,7 +26,8 @@ impl Task {
fn period(&self) -> i64 { fn period(&self) -> i64 {
match self { match self {
Self::NftMonitor => 30, Self::NftMonitor => 30,
Self::SubscriptionMonitor => 300, Self::EthereumSubscriptionMonitor => 300,
Self::SubscriptionExpirationMonitor => 300,
} }
} }
} }
@ -60,7 +65,7 @@ async fn nft_monitor_task(
Ok(()) Ok(())
} }
async fn subscription_monitor_task( async fn ethereum_subscription_monitor_task(
instance: &Instance, instance: &Instance,
maybe_blockchain: Option<&mut Blockchain>, maybe_blockchain: Option<&mut Blockchain>,
db_pool: &Pool, db_pool: &Pool,
@ -73,7 +78,7 @@ async fn subscription_monitor_task(
Some(contract) => contract, Some(contract) => contract,
None => return Ok(()), // feature not enabled None => return Ok(()), // feature not enabled
}; };
check_subscriptions( check_ethereum_subscriptions(
instance, instance,
&blockchain.contract_set.web3, &blockchain.contract_set.web3,
subscription, subscription,
@ -90,7 +95,8 @@ pub fn run(
tokio::spawn(async move { tokio::spawn(async move {
let mut scheduler_state = HashMap::new(); let mut scheduler_state = HashMap::new();
scheduler_state.insert(Task::NftMonitor, None); scheduler_state.insert(Task::NftMonitor, None);
scheduler_state.insert(Task::SubscriptionMonitor, None); scheduler_state.insert(Task::EthereumSubscriptionMonitor, None);
scheduler_state.insert(Task::SubscriptionExpirationMonitor, None);
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new(); let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
@ -109,19 +115,25 @@ pub fn run(
&mut token_waitlist_map, &mut token_waitlist_map,
).await ).await
}, },
Task::SubscriptionMonitor => { Task::EthereumSubscriptionMonitor => {
subscription_monitor_task( ethereum_subscription_monitor_task(
&config.instance(), &config.instance(),
maybe_blockchain.as_mut(), maybe_blockchain.as_mut(),
&db_pool, &db_pool,
).await ).await
}, },
Task::SubscriptionExpirationMonitor => {
update_expired_subscriptions(
&config.instance(),
&db_pool,
).await.map_err(Error::from)
},
}; };
task_result.unwrap_or_else(|err| { task_result.unwrap_or_else(|err| {
log::error!("{:?}: {}", task, err); log::error!("{:?}: {}", task, err);
}); });
*last_run = Some(Utc::now()); *last_run = Some(Utc::now());
}; };
} };
}); });
} }