diff --git a/migrations/V0029__subscription__add_chain_id.sql b/migrations/V0029__subscription__add_chain_id.sql new file mode 100644 index 0000000..52410bb --- /dev/null +++ b/migrations/V0029__subscription__add_chain_id.sql @@ -0,0 +1,2 @@ +ALTER TABLE subscription ADD COLUMN chain_id VARCHAR(50) NOT NULL DEFAULT 'eip155:31337'; +ALTER TABLE subscription ALTER COLUMN chain_id DROP DEFAULT; diff --git a/migrations/schema.sql b/migrations/schema.sql index 557ceae..77f2e79 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -141,6 +141,7 @@ CREATE TABLE subscription ( sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE, sender_address VARCHAR(100) NOT NULL, recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE, + chain_id VARCHAR(50) NOT NULL, expires_at TIMESTAMP WITH TIME ZONE NOT NULL, updated_at TIMESTAMP WITH TIME ZONE NOT NULL, UNIQUE (sender_id, recipient_id) diff --git a/src/ethereum/contracts.rs b/src/ethereum/contracts.rs index 55cdbfe..8168707 100644 --- a/src/ethereum/contracts.rs +++ b/src/ethereum/contracts.rs @@ -91,6 +91,7 @@ pub struct ContractSet { #[derive(Clone)] pub struct Blockchain { + pub config: EthereumConfig, pub contract_set: ContractSet, pub sync_state: SyncState, } @@ -193,5 +194,9 @@ pub async fn get_contracts( subscription: maybe_subscription, subscription_adapter: maybe_subscription_adapter, }; - Ok(Blockchain { contract_set, sync_state }) + Ok(Blockchain { + config: config.clone(), + contract_set, + sync_state, + }) } diff --git a/src/ethereum/subscriptions.rs b/src/ethereum/subscriptions.rs index d3d78ca..515e1ec 100644 --- a/src/ethereum/subscriptions.rs +++ b/src/ethereum/subscriptions.rs @@ -40,6 +40,7 @@ use crate::models::users::queries::{ get_user_by_wallet_address, }; use crate::models::users::types::User; +use crate::utils::caip2::ChainId; use crate::utils::currencies::Currency; use super::contracts::ContractSet; use super::errors::EthereumError; @@ -86,6 +87,7 @@ async fn send_subscription_notifications( /// Search for subscription update events pub async fn check_ethereum_subscriptions( + config: &EthereumConfig, instance: &Instance, web3: &Web3, contract: &Contract, @@ -179,14 +181,24 @@ pub async fn check_ethereum_subscriptions( ); continue; }; + if subscription.chain_id != config.chain_id && + subscription.chain_id != ChainId::ethereum_devnet() + { + // Switching from from devnet is allowed during migration + // because there's no persistent state + log::error!("can't switch to another chain"); + continue; + }; if subscription.updated_at >= block_date { // Event already processed continue; }; // Update subscription expiration date + // TODO: disallow automatic chain ID updates after migration update_subscription( db_client, subscription.id, + &config.chain_id, &expires_at, &block_date, ).await?; @@ -218,6 +230,7 @@ pub async fn check_ethereum_subscriptions( &sender.id, &sender_address, &recipient.id, + &config.chain_id, &expires_at, &block_date, ).await?; diff --git a/src/ethereum/utils.rs b/src/ethereum/utils.rs index 6dd849c..478b73d 100644 --- a/src/ethereum/utils.rs +++ b/src/ethereum/utils.rs @@ -19,7 +19,7 @@ pub enum ChainIdError { /// Parses CAIP-2 chain ID pub fn parse_caip2_chain_id(chain_id: &ChainId) -> Result { - if chain_id.namespace != "eip155" { + if !chain_id.is_ethereum() { return Err(ChainIdError::UnsupportedChain); }; let eth_chain_id: u32 = chain_id.reference.parse()?; diff --git a/src/models/subscriptions/queries.rs b/src/models/subscriptions/queries.rs index 3710f10..eccfe3a 100644 --- a/src/models/subscriptions/queries.rs +++ b/src/models/subscriptions/queries.rs @@ -8,6 +8,7 @@ use crate::database::catch_unique_violation; use crate::errors::DatabaseError; use crate::models::relationships::queries::{subscribe, subscribe_opt}; use crate::models::relationships::types::RelationshipType; +use crate::utils::caip2::ChainId; use super::types::{DbSubscription, Subscription}; pub async fn create_subscription( @@ -15,6 +16,7 @@ pub async fn create_subscription( sender_id: &Uuid, sender_address: &str, recipient_id: &Uuid, + chain_id: &ChainId, expires_at: &DateTime, updated_at: &DateTime, ) -> Result<(), DatabaseError> { @@ -25,15 +27,17 @@ pub async fn create_subscription( sender_id, sender_address, recipient_id, + chain_id, expires_at, updated_at ) - VALUES ($1, $2, $3, $4, $5) + VALUES ($1, $2, $3, $4, $5, $6) ", &[ &sender_id, &sender_address, &recipient_id, + &chain_id, &expires_at, &updated_at, ], @@ -46,6 +50,7 @@ pub async fn create_subscription( pub async fn update_subscription( db_client: &mut impl GenericClient, subscription_id: i32, + chain_id: &ChainId, expires_at: &DateTime, updated_at: &DateTime, ) -> Result<(), DatabaseError> { @@ -54,15 +59,17 @@ pub async fn update_subscription( " UPDATE subscription SET - expires_at = $1, - updated_at = $2 - WHERE id = $3 + chain_id = $2, + expires_at = $3, + updated_at = $4 + WHERE id = $1 RETURNING sender_id, recipient_id ", &[ + &subscription_id, + &chain_id, &expires_at, &updated_at, - &subscription_id, ], ).await?; let row = maybe_row.ok_or(DatabaseError::NotFound("subscription"))?; @@ -139,3 +146,55 @@ pub async fn get_incoming_subscriptions( .collect::>()?; Ok(subscriptions) } + +#[cfg(test)] +mod tests { + use serial_test::serial; + use crate::database::test_utils::create_test_database; + use crate::models::{ + profiles::queries::create_profile, + profiles::types::ProfileCreateData, + relationships::queries::has_relationship, + relationships::types::RelationshipType, + users::queries::create_user, + users::types::UserCreateData, + }; + use super::*; + + #[tokio::test] + #[serial] + async fn test_create_subscription() { + let db_client = &mut create_test_database().await; + let sender_data = ProfileCreateData { + username: "sender".to_string(), + ..Default::default() + }; + let sender = create_profile(db_client, sender_data).await.unwrap(); + let sender_address = "0xb9c5714089478a327f09197987f16f9e5d936e8a"; + let recipient_data = UserCreateData { + username: "recipient".to_string(), + ..Default::default() + }; + let recipient = create_user(db_client, recipient_data).await.unwrap(); + let chain_id = ChainId::ethereum_mainnet(); + let expires_at = Utc::now(); + let updated_at = Utc::now(); + create_subscription( + db_client, + &sender.id, + sender_address, + &recipient.id, + &chain_id, + &expires_at, + &updated_at, + ).await.unwrap(); + + let is_subscribed = has_relationship( + db_client, + &sender.id, + &recipient.id, + RelationshipType::Subscription, + ).await.unwrap(); + assert_eq!(is_subscribed, true); + } +} diff --git a/src/models/subscriptions/types.rs b/src/models/subscriptions/types.rs index 03d70fa..9555716 100644 --- a/src/models/subscriptions/types.rs +++ b/src/models/subscriptions/types.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use crate::errors::DatabaseError; use crate::models::profiles::types::DbActorProfile; +use crate::utils::caip2::ChainId; #[derive(FromSql)] #[postgres(name = "subscription")] @@ -15,6 +16,7 @@ pub struct DbSubscription { pub sender_id: Uuid, pub sender_address: String, pub recipient_id: Uuid, + pub chain_id: ChainId, pub expires_at: DateTime, pub updated_at: DateTime, } diff --git a/src/scheduler.rs b/src/scheduler.rs index 7aa6bd4..6cb6eda 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -79,6 +79,7 @@ async fn ethereum_subscription_monitor_task( None => return Ok(()), // feature not enabled }; check_ethereum_subscriptions( + &blockchain.config, instance, &blockchain.contract_set.web3, subscription, diff --git a/src/utils/caip2.rs b/src/utils/caip2.rs index 8c27d16..5a515b6 100644 --- a/src/utils/caip2.rs +++ b/src/utils/caip2.rs @@ -13,6 +13,7 @@ use serde::{ const CAIP2_RE: &str = r"(?P[-a-z0-9]{3,8}):(?P[-a-zA-Z0-9]{1,32})"; const CAIP2_ETHEREUM_NAMESPACE: &str = "eip155"; const ETHEREUM_MAINNET_ID: i32 = 1; +const ETHEREUM_DEVNET_ID: i32 = 31337; #[derive(Clone, Debug, PartialEq)] pub struct ChainId { @@ -27,6 +28,17 @@ impl ChainId { reference: ETHEREUM_MAINNET_ID.to_string(), } } + + pub fn ethereum_devnet() -> Self { + Self { + namespace: CAIP2_ETHEREUM_NAMESPACE.to_string(), + reference: ETHEREUM_DEVNET_ID.to_string(), + } + } + + pub fn is_ethereum(&self) -> bool { + self.namespace == CAIP2_ETHEREUM_NAMESPACE + } } #[derive(thiserror::Error, Debug)] @@ -70,6 +82,48 @@ impl<'de> Deserialize<'de> for ChainId { } } +mod sql { + use postgres_protocol::types::{text_from_sql, text_to_sql}; + use postgres_types::{ + accepts, + private::BytesMut, + to_sql_checked, + FromSql, + IsNull, + ToSql, + Type, + }; + use super::ChainId; + + impl<'a> FromSql<'a> for ChainId { + fn from_sql( + _: &Type, + raw: &'a [u8], + ) -> Result> { + let value_str = text_from_sql(raw)?; + let value: Self = value_str.parse()?; + Ok(value) + } + + accepts!(VARCHAR); + } + + impl ToSql for ChainId { + fn to_sql( + &self, + _: &Type, + out: &mut BytesMut, + ) -> Result> { + let value_str = self.to_string(); + text_to_sql(&value_str, out); + Ok(IsNull::No) + } + + accepts!(VARCHAR, TEXT); + to_sql_checked!(); + } +} + #[cfg(test)] mod tests { use super::*;