Add periodic task for monitoring Monero payments
This commit is contained in:
parent
cd09fe0801
commit
6b60e9a0da
5 changed files with 217 additions and 4 deletions
|
@ -5,7 +5,7 @@ use crate::database::catch_unique_violation;
|
||||||
use crate::errors::DatabaseError;
|
use crate::errors::DatabaseError;
|
||||||
use crate::utils::caip2::ChainId;
|
use crate::utils::caip2::ChainId;
|
||||||
use crate::utils::id::new_uuid;
|
use crate::utils::id::new_uuid;
|
||||||
use super::types::DbInvoice;
|
use super::types::{DbInvoice, InvoiceStatus};
|
||||||
|
|
||||||
pub async fn create_invoice(
|
pub async fn create_invoice(
|
||||||
db_client: &impl GenericClient,
|
db_client: &impl GenericClient,
|
||||||
|
@ -39,12 +39,64 @@ pub async fn create_invoice(
|
||||||
Ok(invoice)
|
Ok(invoice)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_invoice_by_address(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
chain_id: &ChainId,
|
||||||
|
payment_address: &str,
|
||||||
|
) -> Result<DbInvoice, DatabaseError> {
|
||||||
|
let maybe_row = db_client.query_opt(
|
||||||
|
"
|
||||||
|
SELECT invoice
|
||||||
|
FROM invoice WHERE chain_id = $1 AND payment_address = $2
|
||||||
|
",
|
||||||
|
&[&chain_id, &payment_address],
|
||||||
|
).await?;
|
||||||
|
let row = maybe_row.ok_or(DatabaseError::NotFound("invoice"))?;
|
||||||
|
let invoice = row.try_get("invoice")?;
|
||||||
|
Ok(invoice)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_invoices_by_status(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
chain_id: &ChainId,
|
||||||
|
status: InvoiceStatus,
|
||||||
|
) -> Result<Vec<DbInvoice>, DatabaseError> {
|
||||||
|
let rows = db_client.query(
|
||||||
|
"
|
||||||
|
SELECT invoice
|
||||||
|
FROM invoice WHERE chain_id = $1 AND invoice_status = $2
|
||||||
|
",
|
||||||
|
&[&chain_id, &status],
|
||||||
|
).await?;
|
||||||
|
let invoices = rows.iter()
|
||||||
|
.map(|row| row.try_get("invoice"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
Ok(invoices)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_invoice_status(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
invoice_id: &Uuid,
|
||||||
|
status: InvoiceStatus,
|
||||||
|
) -> Result<(), DatabaseError> {
|
||||||
|
let updated_count = db_client.execute(
|
||||||
|
"
|
||||||
|
UPDATE invoice SET invoice_status = $2
|
||||||
|
WHERE id = $1
|
||||||
|
",
|
||||||
|
&[&invoice_id, &status],
|
||||||
|
).await?;
|
||||||
|
if updated_count == 0 {
|
||||||
|
return Err(DatabaseError::NotFound("invoice"));
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
use crate::database::test_utils::create_test_database;
|
use crate::database::test_utils::create_test_database;
|
||||||
use crate::models::{
|
use crate::models::{
|
||||||
invoices::types::InvoiceStatus,
|
|
||||||
profiles::queries::create_profile,
|
profiles::queries::create_profile,
|
||||||
profiles::types::ProfileCreateData,
|
profiles::types::ProfileCreateData,
|
||||||
users::queries::create_user,
|
users::queries::create_user,
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
|
pub mod subscriptions;
|
||||||
pub mod wallet;
|
pub mod wallet;
|
||||||
|
|
129
src/monero/subscriptions.rs
Normal file
129
src/monero/subscriptions.rs
Normal file
|
@ -0,0 +1,129 @@
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use monero_rpc::{RpcClient, TransferType};
|
||||||
|
use monero_rpc::monero::{Address, Amount};
|
||||||
|
|
||||||
|
use crate::config::MoneroConfig;
|
||||||
|
use crate::database::{get_database_client, Pool};
|
||||||
|
use crate::models::{
|
||||||
|
invoices::queries::{
|
||||||
|
get_invoice_by_address,
|
||||||
|
get_invoices_by_status,
|
||||||
|
set_invoice_status,
|
||||||
|
},
|
||||||
|
invoices::types::InvoiceStatus,
|
||||||
|
profiles::types::PaymentOption,
|
||||||
|
users::queries::get_user_by_id,
|
||||||
|
};
|
||||||
|
use super::wallet::{DEFAULT_ACCOUNT, MoneroError};
|
||||||
|
|
||||||
|
pub async fn check_monero_subscriptions(
|
||||||
|
config: &MoneroConfig,
|
||||||
|
db_pool: &Pool,
|
||||||
|
) -> Result<(), MoneroError> {
|
||||||
|
let db_client = &**get_database_client(db_pool).await?;
|
||||||
|
|
||||||
|
let wallet_client = RpcClient::new(config.wallet_url.clone()).wallet();
|
||||||
|
wallet_client.open_wallet(
|
||||||
|
config.wallet_name.clone(),
|
||||||
|
config.wallet_password.clone(),
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
// Invoices waiting for payment
|
||||||
|
let mut address_waitlist = vec![];
|
||||||
|
let open_invoices = get_invoices_by_status(
|
||||||
|
db_client,
|
||||||
|
&config.chain_id,
|
||||||
|
InvoiceStatus::Open,
|
||||||
|
).await?;
|
||||||
|
for invoice in open_invoices {
|
||||||
|
let address = Address::from_str(&invoice.payment_address)?;
|
||||||
|
let address_index = wallet_client.get_address_index(address).await?;
|
||||||
|
address_waitlist.push(address_index.minor);
|
||||||
|
};
|
||||||
|
let maybe_incoming_transfers = if !address_waitlist.is_empty() {
|
||||||
|
log::info!("{} invoices are waiting for payment", address_waitlist.len());
|
||||||
|
let incoming_transfers = wallet_client.incoming_transfers(
|
||||||
|
TransferType::Available,
|
||||||
|
Some(DEFAULT_ACCOUNT),
|
||||||
|
Some(address_waitlist),
|
||||||
|
).await?;
|
||||||
|
incoming_transfers.transfers
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
if let Some(transfers) = maybe_incoming_transfers {
|
||||||
|
for transfer in transfers {
|
||||||
|
let address_data = wallet_client.get_address(
|
||||||
|
DEFAULT_ACCOUNT,
|
||||||
|
Some(vec![transfer.subaddr_index.minor]),
|
||||||
|
).await?;
|
||||||
|
let subaddress = if let [subaddress_data] = &address_data.addresses[..] {
|
||||||
|
subaddress_data.address
|
||||||
|
} else {
|
||||||
|
return Err(MoneroError::OtherError("invalid response from wallet"));
|
||||||
|
};
|
||||||
|
let invoice = get_invoice_by_address(
|
||||||
|
db_client,
|
||||||
|
&config.chain_id,
|
||||||
|
&subaddress.to_string(),
|
||||||
|
).await?;
|
||||||
|
log::info!(
|
||||||
|
"received payment for invoice {}: {}",
|
||||||
|
invoice.id,
|
||||||
|
transfer.amount,
|
||||||
|
);
|
||||||
|
set_invoice_status(db_client, &invoice.id, InvoiceStatus::Paid).await?;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
// Invoices waiting to be forwarded
|
||||||
|
let paid_invoices = get_invoices_by_status(
|
||||||
|
db_client,
|
||||||
|
&config.chain_id,
|
||||||
|
InvoiceStatus::Paid,
|
||||||
|
).await?;
|
||||||
|
for invoice in paid_invoices {
|
||||||
|
let address = Address::from_str(&invoice.payment_address)?;
|
||||||
|
let address_index = wallet_client.get_address_index(address).await?;
|
||||||
|
let balance_data = wallet_client.get_balance(
|
||||||
|
address_index.major,
|
||||||
|
Some(vec![address_index.minor]),
|
||||||
|
).await?;
|
||||||
|
let unlocked_balance = if let [subaddress_data] = &balance_data.per_subaddress[..] {
|
||||||
|
subaddress_data.unlocked_balance
|
||||||
|
} else {
|
||||||
|
return Err(MoneroError::OtherError("invalid response from wallet"));
|
||||||
|
};
|
||||||
|
if unlocked_balance == Amount::ZERO {
|
||||||
|
// Not ready for forwarding
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let recipient = get_user_by_id(db_client, &invoice.recipient_id).await?;
|
||||||
|
let maybe_payment_info = recipient.profile.payment_options
|
||||||
|
.into_inner().into_iter()
|
||||||
|
.find_map(|option| match option {
|
||||||
|
PaymentOption::MoneroSubscription(payment_info) => {
|
||||||
|
if payment_info.chain_id == config.chain_id {
|
||||||
|
Some(payment_info)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => None,
|
||||||
|
});
|
||||||
|
let payment_info = if let Some(payment_info) = maybe_payment_info {
|
||||||
|
payment_info
|
||||||
|
} else {
|
||||||
|
log::error!("subscription is not configured for user {}", recipient.id);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let payout_address = Address::from_str(&payment_info.payout_address)?;
|
||||||
|
log::info!(
|
||||||
|
"invoice {}, payout address {}",
|
||||||
|
invoice.id,
|
||||||
|
payout_address,
|
||||||
|
);
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,14 +1,24 @@
|
||||||
use monero_rpc::RpcClient;
|
use monero_rpc::RpcClient;
|
||||||
use monero_rpc::monero::Address;
|
use monero_rpc::monero::{Address, util::address::Error as AddressError};
|
||||||
|
|
||||||
use crate::config::MoneroConfig;
|
use crate::config::MoneroConfig;
|
||||||
|
use crate::errors::DatabaseError;
|
||||||
|
|
||||||
const DEFAULT_ACCOUNT: u32 = 0;
|
pub const DEFAULT_ACCOUNT: u32 = 0;
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum MoneroError {
|
pub enum MoneroError {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
WalletError(#[from] anyhow::Error),
|
WalletError(#[from] anyhow::Error),
|
||||||
|
|
||||||
|
#[error(transparent)]
|
||||||
|
AddressError(#[from] AddressError),
|
||||||
|
|
||||||
|
#[error(transparent)]
|
||||||
|
DatabaseError(#[from] DatabaseError),
|
||||||
|
|
||||||
|
#[error("other error")]
|
||||||
|
OtherError(&'static str),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// http://monerotoruzizulg5ttgat2emf4d6fbmiea25detrmmy7erypseyteyd.onion/resources/developer-guides/wallet-rpc.html#create_wallet
|
/// http://monerotoruzizulg5ttgat2emf4d6fbmiea25detrmmy7erypseyteyd.onion/resources/developer-guides/wallet-rpc.html#create_wallet
|
||||||
|
|
|
@ -13,12 +13,14 @@ use crate::ethereum::subscriptions::{
|
||||||
check_ethereum_subscriptions,
|
check_ethereum_subscriptions,
|
||||||
update_expired_subscriptions,
|
update_expired_subscriptions,
|
||||||
};
|
};
|
||||||
|
use crate::monero::subscriptions::check_monero_subscriptions;
|
||||||
|
|
||||||
#[derive(Debug, Eq, Hash, PartialEq)]
|
#[derive(Debug, Eq, Hash, PartialEq)]
|
||||||
enum Task {
|
enum Task {
|
||||||
NftMonitor,
|
NftMonitor,
|
||||||
EthereumSubscriptionMonitor,
|
EthereumSubscriptionMonitor,
|
||||||
SubscriptionExpirationMonitor,
|
SubscriptionExpirationMonitor,
|
||||||
|
MoneroPaymentMonitor,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Task {
|
impl Task {
|
||||||
|
@ -28,6 +30,7 @@ impl Task {
|
||||||
Self::NftMonitor => 30,
|
Self::NftMonitor => 30,
|
||||||
Self::EthereumSubscriptionMonitor => 300,
|
Self::EthereumSubscriptionMonitor => 300,
|
||||||
Self::SubscriptionExpirationMonitor => 300,
|
Self::SubscriptionExpirationMonitor => 300,
|
||||||
|
Self::MoneroPaymentMonitor => 30,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,6 +91,20 @@ async fn ethereum_subscription_monitor_task(
|
||||||
).await.map_err(Error::from)
|
).await.map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn monero_payment_monitor_task(
|
||||||
|
config: &Config,
|
||||||
|
db_pool: &Pool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let maybe_monero_config = config.blockchain()
|
||||||
|
.and_then(|conf| conf.monero_config());
|
||||||
|
let monero_config = match maybe_monero_config {
|
||||||
|
Some(monero_config) => monero_config,
|
||||||
|
None => return Ok(()), // not configured
|
||||||
|
};
|
||||||
|
check_monero_subscriptions(monero_config, db_pool).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn run(
|
pub fn run(
|
||||||
config: Config,
|
config: Config,
|
||||||
mut maybe_blockchain: Option<Blockchain>,
|
mut maybe_blockchain: Option<Blockchain>,
|
||||||
|
@ -98,6 +115,7 @@ pub fn run(
|
||||||
scheduler_state.insert(Task::NftMonitor, None);
|
scheduler_state.insert(Task::NftMonitor, None);
|
||||||
scheduler_state.insert(Task::EthereumSubscriptionMonitor, None);
|
scheduler_state.insert(Task::EthereumSubscriptionMonitor, None);
|
||||||
scheduler_state.insert(Task::SubscriptionExpirationMonitor, None);
|
scheduler_state.insert(Task::SubscriptionExpirationMonitor, None);
|
||||||
|
scheduler_state.insert(Task::MoneroPaymentMonitor, None);
|
||||||
|
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||||
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
|
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
|
||||||
|
@ -129,6 +147,9 @@ pub fn run(
|
||||||
&db_pool,
|
&db_pool,
|
||||||
).await.map_err(Error::from)
|
).await.map_err(Error::from)
|
||||||
},
|
},
|
||||||
|
Task::MoneroPaymentMonitor => {
|
||||||
|
monero_payment_monitor_task(&config, &db_pool).await
|
||||||
|
},
|
||||||
};
|
};
|
||||||
task_result.unwrap_or_else(|err| {
|
task_result.unwrap_or_else(|err| {
|
||||||
log::error!("{:?}: {}", task, err);
|
log::error!("{:?}: {}", task, err);
|
||||||
|
|
Loading…
Reference in a new issue