Save subscription state to database when processing blockchain event

This commit is contained in:
silverpill 2022-02-04 21:43:18 +00:00
parent 4e49f113e7
commit 71fc2d9dad
8 changed files with 254 additions and 7 deletions

View file

@ -0,0 +1,9 @@
CREATE TABLE subscription (
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE,
sender_address VARCHAR(100) NOT NULL,
recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE (sender_id, recipient_id)
);

View file

@ -124,3 +124,13 @@ CREATE TABLE timeline_marker (
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
UNIQUE (user_id, timeline)
);
CREATE TABLE subscription (
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE,
sender_address VARCHAR(100) NOT NULL,
recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE (sender_id, recipient_id)
);

View file

@ -12,7 +12,17 @@ use web3::{
use crate::config::BlockchainConfig;
use crate::database::{Pool, get_database_client};
use crate::errors::ConversionError;
use crate::errors::{ConversionError, DatabaseError};
use crate::models::profiles::queries::search_profile_by_wallet_address;
use crate::models::relationships::queries::unsubscribe;
use crate::models::subscriptions::queries::{
create_subscription,
update_subscription,
get_expired_subscriptions,
get_subscription_by_addresses,
};
use crate::models::users::queries::get_user_by_wallet_address;
use crate::models::users::types::WALLET_CURRENCY_CODE;
use super::errors::EthereumError;
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
use super::utils::parse_address;
@ -36,7 +46,7 @@ pub async fn check_subscriptions(
contract: &Contract<Http>,
db_pool: &Pool,
) -> Result<(), EthereumError> {
let _db_client = &**get_database_client(db_pool).await?;
let db_client = &mut **get_database_client(db_pool).await?;
let event_abi = contract.abi().event("UpdateSubscription")?;
let filter = FilterBuilder::default()
.address(vec![contract.address()])
@ -72,12 +82,77 @@ pub async fn check_subscriptions(
.timestamp;
let block_date = u256_to_date(block_timestamp)
.map_err(|_| EthereumError::ConversionError)?;
match get_subscription_by_addresses(
db_client,
&sender_address,
&recipient_address,
).await {
Ok(subscription) => {
if subscription.updated_at < block_date {
// Update subscription expiration date
update_subscription(
db_client,
subscription.id,
&expires_at,
&block_date,
).await?;
log::info!(
"subscription: from {0} to {1}, expires at {2}, updated at {3}",
"subscription updated: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
};
},
Err(DatabaseError::NotFound(_)) => {
// New subscription
let profiles = search_profile_by_wallet_address(
db_client,
WALLET_CURRENCY_CODE,
&sender_address,
).await?;
let sender = match &profiles[..] {
[profile] => profile,
[] => {
// Profile not found, skip event
log::error!("unknown subscriber {}", sender_address);
continue;
},
_ => {
// Ambiguous results, skip event
log::error!(
"search returned multiple results for address {}",
sender_address,
recipient_address,
expires_at,
block_date,
);
continue;
},
};
let recipient = get_user_by_wallet_address(db_client, &recipient_address).await?;
create_subscription(
db_client,
&sender.id,
&sender_address,
&recipient.id,
&expires_at,
&block_date,
).await?;
log::info!(
"subscription created: {0} to {1}",
sender.id,
recipient.id,
);
},
Err(other_error) => return Err(other_error.into()),
};
};
for subscription in get_expired_subscriptions(db_client).await? {
// Remove relationship
unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?;
log::info!(
"subscription expired: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
};
Ok(())

View file

@ -7,4 +7,5 @@ pub mod posts;
pub mod profiles;
pub mod reactions;
pub mod relationships;
pub mod subscriptions;
pub mod users;

View file

@ -331,6 +331,22 @@ pub async fn subscribe(
Ok(())
}
pub async fn subscribe_opt(
db_client: &impl GenericClient,
source_id: &Uuid,
target_id: &Uuid,
) -> Result<(), DatabaseError> {
db_client.execute(
"
INSERT INTO relationship (source_id, target_id, relationship_type)
VALUES ($1, $2, $3)
ON CONFLICT (source_id, target_id, relationship_type) DO NOTHING
",
&[&source_id, &target_id, &RelationshipType::Subscription],
).await?;
Ok(())
}
pub async fn unsubscribe(
db_client: &impl GenericClient,
source_id: &Uuid,

View file

@ -0,0 +1,2 @@
pub mod queries;
mod types;

View file

@ -0,0 +1,119 @@
use chrono::{DateTime, Utc};
use tokio_postgres::GenericClient;
use uuid::Uuid;
use crate::database::catch_unique_violation;
use crate::errors::DatabaseError;
use crate::models::relationships::queries::{subscribe, subscribe_opt};
use crate::models::relationships::types::RelationshipType;
use super::types::DbSubscription;
pub async fn create_subscription(
db_client: &mut impl GenericClient,
sender_id: &Uuid,
sender_address: &str,
recipient_id: &Uuid,
expires_at: &DateTime<Utc>,
updated_at: &DateTime<Utc>,
) -> Result<(), DatabaseError> {
let transaction = db_client.transaction().await?;
transaction.execute(
"
INSERT INTO subscription (
sender_id,
sender_address,
recipient_id,
expires_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5)
",
&[
&sender_id,
&sender_address,
&recipient_id,
&expires_at,
&updated_at,
],
).await.map_err(catch_unique_violation("subscription"))?;
subscribe(&transaction, sender_id, recipient_id).await?;
transaction.commit().await?;
Ok(())
}
pub async fn update_subscription(
db_client: &mut impl GenericClient,
subscription_id: i32,
expires_at: &DateTime<Utc>,
updated_at: &DateTime<Utc>,
) -> Result<(), DatabaseError> {
let transaction = db_client.transaction().await?;
let maybe_row = transaction.query_opt(
"
UPDATE subscription
SET
expires_at = $1,
updated_at = $2
WHERE id = $3
RETURNING sender_id, recipient_id
",
&[
&expires_at,
&updated_at,
&subscription_id,
],
).await?;
let row = maybe_row.ok_or(DatabaseError::NotFound("subscription"))?;
let sender_id: Uuid = row.try_get("sender_id")?;
let recipient_id: Uuid = row.try_get("recipient_id")?;
subscribe_opt(&transaction, &sender_id, &recipient_id).await?;
transaction.commit().await?;
Ok(())
}
/// Find subscription by participants' addresses.
/// The query is case-sensitive.
pub async fn get_subscription_by_addresses(
db_client: &impl GenericClient,
sender_address: &str,
recipient_address: &str,
) -> Result<DbSubscription, DatabaseError> {
let maybe_row = db_client.query_opt(
"
SELECT subscription
FROM subscription
JOIN user_account AS recipient
ON (subscription.recipient_id = recipient.id)
WHERE
subscription.sender_address = $1
AND recipient.wallet_address = $2
",
&[&sender_address, &recipient_address],
).await?;
let row = maybe_row.ok_or(DatabaseError::NotFound("subscription"))?;
let subscription: DbSubscription = row.try_get("subscription")?;
Ok(subscription)
}
pub async fn get_expired_subscriptions(
db_client: &impl GenericClient,
) -> Result<Vec<DbSubscription>, DatabaseError> {
let rows = db_client.query(
"
SELECT subscription
FROM subscription
JOIN relationship
ON (
relationship.source_id = subscription.sender_id
AND relationship.target_id = subscription.recipient_id
AND relationship.relationship_type = $1
)
WHERE subscription.expires_at <= CURRENT_TIMESTAMP
",
&[&RelationshipType::Subscription],
).await?;
let subscriptions = rows.iter()
.map(|row| row.try_get("subscription"))
.collect::<Result<_, _>>()?;
Ok(subscriptions)
}

View file

@ -0,0 +1,15 @@
use chrono::{DateTime, Utc};
use postgres_types::FromSql;
use uuid::Uuid;
#[derive(FromSql)]
#[postgres(name = "subscription")]
pub struct DbSubscription {
pub id: i32,
pub sender_id: Uuid,
#[allow(dead_code)]
sender_address: String,
pub recipient_id: Uuid,
pub expires_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}