Add chain ID column to subscription table

Set chain ID to "eip155:31337" for existing entries.
This commit is contained in:
silverpill 2022-08-30 20:33:45 +00:00
parent 2ed8d44001
commit 2eb7ec2f64
9 changed files with 144 additions and 7 deletions

View file

@ -0,0 +1,2 @@
ALTER TABLE subscription ADD COLUMN chain_id VARCHAR(50) NOT NULL DEFAULT 'eip155:31337';
ALTER TABLE subscription ALTER COLUMN chain_id DROP DEFAULT;

View file

@ -141,6 +141,7 @@ CREATE TABLE subscription (
sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE, sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE,
sender_address VARCHAR(100) NOT NULL, sender_address VARCHAR(100) NOT NULL,
recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE, recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE,
chain_id VARCHAR(50) NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL, expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL, updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE (sender_id, recipient_id) UNIQUE (sender_id, recipient_id)

View file

@ -91,6 +91,7 @@ pub struct ContractSet {
#[derive(Clone)] #[derive(Clone)]
pub struct Blockchain { pub struct Blockchain {
pub config: EthereumConfig,
pub contract_set: ContractSet, pub contract_set: ContractSet,
pub sync_state: SyncState, pub sync_state: SyncState,
} }
@ -193,5 +194,9 @@ pub async fn get_contracts(
subscription: maybe_subscription, subscription: maybe_subscription,
subscription_adapter: maybe_subscription_adapter, subscription_adapter: maybe_subscription_adapter,
}; };
Ok(Blockchain { contract_set, sync_state }) Ok(Blockchain {
config: config.clone(),
contract_set,
sync_state,
})
} }

View file

@ -40,6 +40,7 @@ use crate::models::users::queries::{
get_user_by_wallet_address, get_user_by_wallet_address,
}; };
use crate::models::users::types::User; use crate::models::users::types::User;
use crate::utils::caip2::ChainId;
use crate::utils::currencies::Currency; use crate::utils::currencies::Currency;
use super::contracts::ContractSet; use super::contracts::ContractSet;
use super::errors::EthereumError; use super::errors::EthereumError;
@ -86,6 +87,7 @@ async fn send_subscription_notifications(
/// Search for subscription update events /// Search for subscription update events
pub async fn check_ethereum_subscriptions( pub async fn check_ethereum_subscriptions(
config: &EthereumConfig,
instance: &Instance, instance: &Instance,
web3: &Web3<Http>, web3: &Web3<Http>,
contract: &Contract<Http>, contract: &Contract<Http>,
@ -179,14 +181,24 @@ pub async fn check_ethereum_subscriptions(
); );
continue; continue;
}; };
if subscription.chain_id != config.chain_id &&
subscription.chain_id != ChainId::ethereum_devnet()
{
// Switching from from devnet is allowed during migration
// because there's no persistent state
log::error!("can't switch to another chain");
continue;
};
if subscription.updated_at >= block_date { if subscription.updated_at >= block_date {
// Event already processed // Event already processed
continue; continue;
}; };
// Update subscription expiration date // Update subscription expiration date
// TODO: disallow automatic chain ID updates after migration
update_subscription( update_subscription(
db_client, db_client,
subscription.id, subscription.id,
&config.chain_id,
&expires_at, &expires_at,
&block_date, &block_date,
).await?; ).await?;
@ -218,6 +230,7 @@ pub async fn check_ethereum_subscriptions(
&sender.id, &sender.id,
&sender_address, &sender_address,
&recipient.id, &recipient.id,
&config.chain_id,
&expires_at, &expires_at,
&block_date, &block_date,
).await?; ).await?;

View file

@ -19,7 +19,7 @@ pub enum ChainIdError {
/// Parses CAIP-2 chain ID /// Parses CAIP-2 chain ID
pub fn parse_caip2_chain_id(chain_id: &ChainId) -> Result<u32, ChainIdError> { pub fn parse_caip2_chain_id(chain_id: &ChainId) -> Result<u32, ChainIdError> {
if chain_id.namespace != "eip155" { if !chain_id.is_ethereum() {
return Err(ChainIdError::UnsupportedChain); return Err(ChainIdError::UnsupportedChain);
}; };
let eth_chain_id: u32 = chain_id.reference.parse()?; let eth_chain_id: u32 = chain_id.reference.parse()?;

View file

@ -8,6 +8,7 @@ use crate::database::catch_unique_violation;
use crate::errors::DatabaseError; use crate::errors::DatabaseError;
use crate::models::relationships::queries::{subscribe, subscribe_opt}; use crate::models::relationships::queries::{subscribe, subscribe_opt};
use crate::models::relationships::types::RelationshipType; use crate::models::relationships::types::RelationshipType;
use crate::utils::caip2::ChainId;
use super::types::{DbSubscription, Subscription}; use super::types::{DbSubscription, Subscription};
pub async fn create_subscription( pub async fn create_subscription(
@ -15,6 +16,7 @@ pub async fn create_subscription(
sender_id: &Uuid, sender_id: &Uuid,
sender_address: &str, sender_address: &str,
recipient_id: &Uuid, recipient_id: &Uuid,
chain_id: &ChainId,
expires_at: &DateTime<Utc>, expires_at: &DateTime<Utc>,
updated_at: &DateTime<Utc>, updated_at: &DateTime<Utc>,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
@ -25,15 +27,17 @@ pub async fn create_subscription(
sender_id, sender_id,
sender_address, sender_address,
recipient_id, recipient_id,
chain_id,
expires_at, expires_at,
updated_at updated_at
) )
VALUES ($1, $2, $3, $4, $5) VALUES ($1, $2, $3, $4, $5, $6)
", ",
&[ &[
&sender_id, &sender_id,
&sender_address, &sender_address,
&recipient_id, &recipient_id,
&chain_id,
&expires_at, &expires_at,
&updated_at, &updated_at,
], ],
@ -46,6 +50,7 @@ pub async fn create_subscription(
pub async fn update_subscription( pub async fn update_subscription(
db_client: &mut impl GenericClient, db_client: &mut impl GenericClient,
subscription_id: i32, subscription_id: i32,
chain_id: &ChainId,
expires_at: &DateTime<Utc>, expires_at: &DateTime<Utc>,
updated_at: &DateTime<Utc>, updated_at: &DateTime<Utc>,
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
@ -54,15 +59,17 @@ pub async fn update_subscription(
" "
UPDATE subscription UPDATE subscription
SET SET
expires_at = $1, chain_id = $2,
updated_at = $2 expires_at = $3,
WHERE id = $3 updated_at = $4
WHERE id = $1
RETURNING sender_id, recipient_id RETURNING sender_id, recipient_id
", ",
&[ &[
&subscription_id,
&chain_id,
&expires_at, &expires_at,
&updated_at, &updated_at,
&subscription_id,
], ],
).await?; ).await?;
let row = maybe_row.ok_or(DatabaseError::NotFound("subscription"))?; let row = maybe_row.ok_or(DatabaseError::NotFound("subscription"))?;
@ -139,3 +146,55 @@ pub async fn get_incoming_subscriptions(
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
Ok(subscriptions) Ok(subscriptions)
} }
#[cfg(test)]
mod tests {
use serial_test::serial;
use crate::database::test_utils::create_test_database;
use crate::models::{
profiles::queries::create_profile,
profiles::types::ProfileCreateData,
relationships::queries::has_relationship,
relationships::types::RelationshipType,
users::queries::create_user,
users::types::UserCreateData,
};
use super::*;
#[tokio::test]
#[serial]
async fn test_create_subscription() {
let db_client = &mut create_test_database().await;
let sender_data = ProfileCreateData {
username: "sender".to_string(),
..Default::default()
};
let sender = create_profile(db_client, sender_data).await.unwrap();
let sender_address = "0xb9c5714089478a327f09197987f16f9e5d936e8a";
let recipient_data = UserCreateData {
username: "recipient".to_string(),
..Default::default()
};
let recipient = create_user(db_client, recipient_data).await.unwrap();
let chain_id = ChainId::ethereum_mainnet();
let expires_at = Utc::now();
let updated_at = Utc::now();
create_subscription(
db_client,
&sender.id,
sender_address,
&recipient.id,
&chain_id,
&expires_at,
&updated_at,
).await.unwrap();
let is_subscribed = has_relationship(
db_client,
&sender.id,
&recipient.id,
RelationshipType::Subscription,
).await.unwrap();
assert_eq!(is_subscribed, true);
}
}

View file

@ -7,6 +7,7 @@ use uuid::Uuid;
use crate::errors::DatabaseError; use crate::errors::DatabaseError;
use crate::models::profiles::types::DbActorProfile; use crate::models::profiles::types::DbActorProfile;
use crate::utils::caip2::ChainId;
#[derive(FromSql)] #[derive(FromSql)]
#[postgres(name = "subscription")] #[postgres(name = "subscription")]
@ -15,6 +16,7 @@ pub struct DbSubscription {
pub sender_id: Uuid, pub sender_id: Uuid,
pub sender_address: String, pub sender_address: String,
pub recipient_id: Uuid, pub recipient_id: Uuid,
pub chain_id: ChainId,
pub expires_at: DateTime<Utc>, pub expires_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }

View file

@ -79,6 +79,7 @@ async fn ethereum_subscription_monitor_task(
None => return Ok(()), // feature not enabled None => return Ok(()), // feature not enabled
}; };
check_ethereum_subscriptions( check_ethereum_subscriptions(
&blockchain.config,
instance, instance,
&blockchain.contract_set.web3, &blockchain.contract_set.web3,
subscription, subscription,

View file

@ -13,6 +13,7 @@ use serde::{
const CAIP2_RE: &str = r"(?P<namespace>[-a-z0-9]{3,8}):(?P<reference>[-a-zA-Z0-9]{1,32})"; const CAIP2_RE: &str = r"(?P<namespace>[-a-z0-9]{3,8}):(?P<reference>[-a-zA-Z0-9]{1,32})";
const CAIP2_ETHEREUM_NAMESPACE: &str = "eip155"; const CAIP2_ETHEREUM_NAMESPACE: &str = "eip155";
const ETHEREUM_MAINNET_ID: i32 = 1; const ETHEREUM_MAINNET_ID: i32 = 1;
const ETHEREUM_DEVNET_ID: i32 = 31337;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct ChainId { pub struct ChainId {
@ -27,6 +28,17 @@ impl ChainId {
reference: ETHEREUM_MAINNET_ID.to_string(), reference: ETHEREUM_MAINNET_ID.to_string(),
} }
} }
pub fn ethereum_devnet() -> Self {
Self {
namespace: CAIP2_ETHEREUM_NAMESPACE.to_string(),
reference: ETHEREUM_DEVNET_ID.to_string(),
}
}
pub fn is_ethereum(&self) -> bool {
self.namespace == CAIP2_ETHEREUM_NAMESPACE
}
} }
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
@ -70,6 +82,48 @@ impl<'de> Deserialize<'de> for ChainId {
} }
} }
mod sql {
use postgres_protocol::types::{text_from_sql, text_to_sql};
use postgres_types::{
accepts,
private::BytesMut,
to_sql_checked,
FromSql,
IsNull,
ToSql,
Type,
};
use super::ChainId;
impl<'a> FromSql<'a> for ChainId {
fn from_sql(
_: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
let value_str = text_from_sql(raw)?;
let value: Self = value_str.parse()?;
Ok(value)
}
accepts!(VARCHAR);
}
impl ToSql for ChainId {
fn to_sql(
&self,
_: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
let value_str = self.to_string();
text_to_sql(&value_str, out);
Ok(IsNull::No)
}
accepts!(VARCHAR, TEXT);
to_sql_checked!();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;