Update subscription state after processing invoice
This commit is contained in:
parent
4e73bff32e
commit
64fb51e92a
9 changed files with 97 additions and 15 deletions
|
@ -1127,6 +1127,7 @@ components:
|
||||||
sender_address:
|
sender_address:
|
||||||
description: Sender address.
|
description: Sender address.
|
||||||
type: string
|
type: string
|
||||||
|
nullable: true
|
||||||
example: '0xd8da6bf...'
|
example: '0xd8da6bf...'
|
||||||
expires_at:
|
expires_at:
|
||||||
description: The date when subscription expires.
|
description: The date when subscription expires.
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
ALTER TABLE subscription ALTER COLUMN sender_address DROP NOT NULL;
|
|
@ -150,7 +150,7 @@ CREATE TABLE invoice (
|
||||||
CREATE TABLE subscription (
|
CREATE TABLE subscription (
|
||||||
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE,
|
sender_id UUID NOT NULL REFERENCES actor_profile (id) ON DELETE CASCADE,
|
||||||
sender_address VARCHAR(100) NOT NULL,
|
sender_address VARCHAR(100),
|
||||||
recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE,
|
recipient_id UUID NOT NULL REFERENCES user_account (id) ON DELETE CASCADE,
|
||||||
chain_id VARCHAR(50) NOT NULL,
|
chain_id VARCHAR(50) NOT NULL,
|
||||||
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||||
|
|
|
@ -63,7 +63,7 @@ fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
|
||||||
Ok(datetime)
|
Ok(datetime)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_subscription_notifications(
|
pub async fn send_subscription_notifications(
|
||||||
db_client: &impl GenericClient,
|
db_client: &impl GenericClient,
|
||||||
instance: &Instance,
|
instance: &Instance,
|
||||||
sender: &DbActorProfile,
|
sender: &DbActorProfile,
|
||||||
|
@ -170,13 +170,15 @@ pub async fn check_ethereum_subscriptions(
|
||||||
&recipient.id,
|
&recipient.id,
|
||||||
).await {
|
).await {
|
||||||
Ok(subscription) => {
|
Ok(subscription) => {
|
||||||
if subscription.sender_address != sender_address {
|
let current_sender_address =
|
||||||
|
subscription.sender_address.unwrap_or("''".to_string());
|
||||||
|
if current_sender_address != sender_address {
|
||||||
// Trust only key/address that was linked to profile
|
// Trust only key/address that was linked to profile
|
||||||
// when first subscription event occured.
|
// when first subscription event occured.
|
||||||
// Key rotation is not supported.
|
// Key rotation is not supported.
|
||||||
log::error!(
|
log::error!(
|
||||||
"subscriber address changed from {} to {}",
|
"subscriber address changed from {} to {}",
|
||||||
subscription.sender_address,
|
current_sender_address,
|
||||||
sender_address,
|
sender_address,
|
||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
|
@ -228,7 +230,7 @@ pub async fn check_ethereum_subscriptions(
|
||||||
create_subscription(
|
create_subscription(
|
||||||
db_client,
|
db_client,
|
||||||
&sender.id,
|
&sender.id,
|
||||||
&sender_address,
|
Some(&sender_address),
|
||||||
&recipient.id,
|
&recipient.id,
|
||||||
&config.chain_id,
|
&config.chain_id,
|
||||||
&expires_at,
|
&expires_at,
|
||||||
|
|
|
@ -337,7 +337,7 @@ pub struct FollowListQueryParams {
|
||||||
pub struct ApiSubscription {
|
pub struct ApiSubscription {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub sender: Account,
|
pub sender: Account,
|
||||||
pub sender_address: String,
|
pub sender_address: Option<String>,
|
||||||
pub expires_at: DateTime<Utc>,
|
pub expires_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,12 +14,13 @@ use super::types::{DbSubscription, Subscription};
|
||||||
pub async fn create_subscription(
|
pub async fn create_subscription(
|
||||||
db_client: &mut impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
sender_id: &Uuid,
|
sender_id: &Uuid,
|
||||||
sender_address: &str,
|
sender_address: Option<&str>,
|
||||||
recipient_id: &Uuid,
|
recipient_id: &Uuid,
|
||||||
chain_id: &ChainId,
|
chain_id: &ChainId,
|
||||||
expires_at: &DateTime<Utc>,
|
expires_at: &DateTime<Utc>,
|
||||||
updated_at: &DateTime<Utc>,
|
updated_at: &DateTime<Utc>,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
|
assert!(chain_id.is_ethereum() == sender_address.is_some());
|
||||||
let transaction = db_client.transaction().await?;
|
let transaction = db_client.transaction().await?;
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
|
@ -182,7 +183,7 @@ mod tests {
|
||||||
create_subscription(
|
create_subscription(
|
||||||
db_client,
|
db_client,
|
||||||
&sender.id,
|
&sender.id,
|
||||||
sender_address,
|
Some(sender_address),
|
||||||
&recipient.id,
|
&recipient.id,
|
||||||
&chain_id,
|
&chain_id,
|
||||||
&expires_at,
|
&expires_at,
|
||||||
|
|
|
@ -14,7 +14,7 @@ use crate::utils::caip2::ChainId;
|
||||||
pub struct DbSubscription {
|
pub struct DbSubscription {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub sender_id: Uuid,
|
pub sender_id: Uuid,
|
||||||
pub sender_address: String,
|
pub sender_address: Option<String>,
|
||||||
pub recipient_id: Uuid,
|
pub recipient_id: Uuid,
|
||||||
pub chain_id: ChainId,
|
pub chain_id: ChainId,
|
||||||
pub expires_at: DateTime<Utc>,
|
pub expires_at: DateTime<Utc>,
|
||||||
|
@ -24,7 +24,7 @@ pub struct DbSubscription {
|
||||||
pub struct Subscription {
|
pub struct Subscription {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub sender: DbActorProfile,
|
pub sender: DbActorProfile,
|
||||||
pub sender_address: String,
|
pub sender_address: Option<String>,
|
||||||
pub expires_at: DateTime<Utc>,
|
pub expires_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use chrono::{Duration, Utc};
|
||||||
use monero_rpc::{RpcClient, TransferType};
|
use monero_rpc::{RpcClient, TransferType};
|
||||||
use monero_rpc::monero::{Address, Amount};
|
use monero_rpc::monero::{Address, Amount};
|
||||||
|
|
||||||
use crate::config::MoneroConfig;
|
use crate::config::{Instance, MoneroConfig};
|
||||||
use crate::database::{get_database_client, Pool};
|
use crate::database::{get_database_client, Pool};
|
||||||
|
use crate::errors::DatabaseError;
|
||||||
|
use crate::ethereum::subscriptions::send_subscription_notifications;
|
||||||
use crate::models::{
|
use crate::models::{
|
||||||
invoices::queries::{
|
invoices::queries::{
|
||||||
get_invoice_by_address,
|
get_invoice_by_address,
|
||||||
|
@ -12,16 +16,23 @@ use crate::models::{
|
||||||
set_invoice_status,
|
set_invoice_status,
|
||||||
},
|
},
|
||||||
invoices::types::InvoiceStatus,
|
invoices::types::InvoiceStatus,
|
||||||
|
profiles::queries::get_profile_by_id,
|
||||||
profiles::types::PaymentOption,
|
profiles::types::PaymentOption,
|
||||||
|
subscriptions::queries::{
|
||||||
|
create_subscription,
|
||||||
|
get_subscription_by_participants,
|
||||||
|
update_subscription,
|
||||||
|
},
|
||||||
users::queries::get_user_by_id,
|
users::queries::get_user_by_id,
|
||||||
};
|
};
|
||||||
use super::wallet::{send_monero, DEFAULT_ACCOUNT, MoneroError};
|
use super::wallet::{send_monero, DEFAULT_ACCOUNT, MoneroError};
|
||||||
|
|
||||||
pub async fn check_monero_subscriptions(
|
pub async fn check_monero_subscriptions(
|
||||||
|
instance: &Instance,
|
||||||
config: &MoneroConfig,
|
config: &MoneroConfig,
|
||||||
db_pool: &Pool,
|
db_pool: &Pool,
|
||||||
) -> Result<(), MoneroError> {
|
) -> Result<(), MoneroError> {
|
||||||
let db_client = &**get_database_client(db_pool).await?;
|
let db_client = &mut **get_database_client(db_pool).await?;
|
||||||
|
|
||||||
let wallet_client = RpcClient::new(config.wallet_url.clone()).wallet();
|
let wallet_client = RpcClient::new(config.wallet_url.clone()).wallet();
|
||||||
wallet_client.open_wallet(
|
wallet_client.open_wallet(
|
||||||
|
@ -99,8 +110,9 @@ pub async fn check_monero_subscriptions(
|
||||||
// Not ready for forwarding
|
// Not ready for forwarding
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
let sender = get_profile_by_id(db_client, &invoice.sender_id).await?;
|
||||||
let recipient = get_user_by_id(db_client, &invoice.recipient_id).await?;
|
let recipient = get_user_by_id(db_client, &invoice.recipient_id).await?;
|
||||||
let maybe_payment_info = recipient.profile.payment_options
|
let maybe_payment_info = recipient.profile.payment_options.clone()
|
||||||
.into_inner().into_iter()
|
.into_inner().into_iter()
|
||||||
.find_map(|option| match option {
|
.find_map(|option| match option {
|
||||||
PaymentOption::MoneroSubscription(payment_info) => {
|
PaymentOption::MoneroSubscription(payment_info) => {
|
||||||
|
@ -119,17 +131,78 @@ pub async fn check_monero_subscriptions(
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
let payout_address = Address::from_str(&payment_info.payout_address)?;
|
let payout_address = Address::from_str(&payment_info.payout_address)?;
|
||||||
let _payout_amount = send_monero(
|
let payout_amount = send_monero(
|
||||||
&wallet_client,
|
&wallet_client,
|
||||||
address_index.minor,
|
address_index.minor,
|
||||||
payout_address,
|
payout_address,
|
||||||
).await?;
|
).await?;
|
||||||
|
let duration_secs = (payout_amount.as_pico() / payment_info.price)
|
||||||
|
.try_into()
|
||||||
|
.map_err(|_| MoneroError::OtherError("invalid duration"))?;
|
||||||
|
let expires_at = Utc::now() + Duration::seconds(duration_secs);
|
||||||
|
|
||||||
set_invoice_status(
|
set_invoice_status(
|
||||||
db_client,
|
db_client,
|
||||||
&invoice.id,
|
&invoice.id,
|
||||||
InvoiceStatus::Forwarded,
|
InvoiceStatus::Forwarded,
|
||||||
).await?;
|
).await?;
|
||||||
log::info!("processed payment for invoice {}", invoice.id);
|
log::info!("processed payment for invoice {}", invoice.id);
|
||||||
|
|
||||||
|
match get_subscription_by_participants(
|
||||||
|
db_client,
|
||||||
|
&sender.id,
|
||||||
|
&recipient.id,
|
||||||
|
).await {
|
||||||
|
Ok(subscription) => {
|
||||||
|
if subscription.chain_id != config.chain_id {
|
||||||
|
log::error!("can't switch to another chain");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
// Update subscription expiration date
|
||||||
|
update_subscription(
|
||||||
|
db_client,
|
||||||
|
subscription.id,
|
||||||
|
&subscription.chain_id,
|
||||||
|
&expires_at,
|
||||||
|
&Utc::now(),
|
||||||
|
).await?;
|
||||||
|
log::info!(
|
||||||
|
"subscription updated: {0} to {1}",
|
||||||
|
subscription.sender_id,
|
||||||
|
subscription.recipient_id,
|
||||||
|
);
|
||||||
|
send_subscription_notifications(
|
||||||
|
db_client,
|
||||||
|
instance,
|
||||||
|
&sender,
|
||||||
|
&recipient,
|
||||||
|
).await?;
|
||||||
|
},
|
||||||
|
Err(DatabaseError::NotFound(_)) => {
|
||||||
|
// New subscription
|
||||||
|
create_subscription(
|
||||||
|
db_client,
|
||||||
|
&sender.id,
|
||||||
|
None, // matching by address is not required
|
||||||
|
&recipient.id,
|
||||||
|
&config.chain_id,
|
||||||
|
&expires_at,
|
||||||
|
&Utc::now(),
|
||||||
|
).await?;
|
||||||
|
log::info!(
|
||||||
|
"subscription created: {0} to {1}",
|
||||||
|
sender.id,
|
||||||
|
recipient.id,
|
||||||
|
);
|
||||||
|
send_subscription_notifications(
|
||||||
|
db_client,
|
||||||
|
instance,
|
||||||
|
&sender,
|
||||||
|
&recipient,
|
||||||
|
).await?;
|
||||||
|
},
|
||||||
|
Err(other_error) => return Err(other_error.into()),
|
||||||
|
};
|
||||||
};
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,11 @@ async fn monero_payment_monitor_task(
|
||||||
Some(monero_config) => monero_config,
|
Some(monero_config) => monero_config,
|
||||||
None => return Ok(()), // not configured
|
None => return Ok(()), // not configured
|
||||||
};
|
};
|
||||||
check_monero_subscriptions(monero_config, db_pool).await?;
|
check_monero_subscriptions(
|
||||||
|
&config.instance(),
|
||||||
|
monero_config,
|
||||||
|
db_pool,
|
||||||
|
).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue