Fuck blockchain

This commit is contained in:
Rafael Caricio 2023-04-08 21:20:12 +02:00
parent cdb728a70a
commit 5ef024d923
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
35 changed files with 730 additions and 3574 deletions

1880
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -50,8 +50,6 @@ env_logger = { version = "0.9.0", default-features = false }
ed25519-dalek = "1.0.1"
ed25519 = "1.5.3"
blake2 = "0.10.5"
# Used to query Monero node
monero-rpc = "0.3.2"
# Used to determine the number of CPUs on the system
num_cpus = "1.13.0"
# Used for working with regular expressions
@ -60,8 +58,6 @@ regex = "1.6.0"
reqwest = { version = "0.11.13", features = ["json", "multipart", "socks"] }
# Used for working with RSA keys
rsa = "0.5.0"
# Used for working with ethereum keys
secp256k1 = { version = "0.21.3", features = ["rand", "rand-std"] }
# Used for serialization/deserialization
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.89"
@ -77,8 +73,6 @@ tokio = { version = "1.20.4", features = ["macros"] }
url = "2.2.2"
# Used to work with UUIDs
uuid = { version = "1.1.2", features = ["serde", "v4"] }
# Used to query ethereum node
web3 = { version = "0.18.0", default-features = false, features = ["http", "http-tls", "signing"] }
[dev-dependencies]
mitra-config = { path = "mitra-config", features = ["test-utils"] }
@ -88,7 +82,4 @@ mitra-utils = { path = "mitra-utils", features = ["test-utils"] }
serial_test = "0.7.0"
[features]
ethereum-extras = []
fep-e232 = []
production = ["mitra-config/production"]

View file

@ -8,27 +8,19 @@ Features:
- Micro-blogging service (includes support for quote posts, custom emojis and more).
- Mastodon API.
- Content subscription service. Subscriptions provide a way to receive monthly payments from subscribers and to publish private content made exclusively for them.
- Supported payment methods: [Monero](https://www.getmonero.org/get-started/what-is-monero/) and [ERC-20](https://ethereum.org/en/developers/docs/standards/tokens/erc-20/) tokens (on Ethereum and other EVM-compatible blockchains).
- [Sign-in with a wallet](https://eips.ethereum.org/EIPS/eip-4361).
- Donation buttons.
- Account migrations (from one server to another). Identity can be detached from the server.
- Federation over Tor.
Follow: [@mitra@mitra.social](https://mitra.social/@mitra)
Matrix chat: [#mitra:halogen.city](https://matrix.to/#/#mitra:halogen.city)
## Instances
- [FediList](http://demo.fedilist.com/instance?software=mitra)
- [Fediverse Observer](https://mitra.fediverse.observer/list)
- [FediList](http://demo.fedilist.com/instance?software=reef)
- [Fediverse Observer](https://reef.fediverse.observer/list)
Demo instance: https://public.mitra.social/ ([invite-only](https://public.mitra.social/about))
Demo instance: https://nullpointer.social/ ([invite-only](https://nullpointer.social/about))
## Code
Server: https://codeberg.org/silverpill/mitra (this repo)
Server: (this repo)
Web client: https://codeberg.org/silverpill/mitra-web

View file

@ -1,16 +0,0 @@
# https://monerodocs.org/interacting/monero-wallet-rpc-reference/
daemon-address=http://example.tld:18081
untrusted-daemon=1
non-interactive=1
rpc-bind-port=18082
disable-rpc-login=1
wallet-dir=/var/lib/monero-wallet/wallet
log-file=/var/lib/monero-wallet/wallet-rpc.log
log-level=0
max-log-file-size=10000000
max-log-files=50

View file

@ -8,20 +8,11 @@ use mitra::activitypub::{
builders::delete_person::prepare_delete_person,
fetcher::fetchers::fetch_actor,
};
use mitra::ethereum::{
signatures::generate_ecdsa_key,
sync::save_current_block_number,
utils::key_to_ethereum_address,
};
use mitra::media::{
remove_files,
remove_media,
MediaStorage,
};
use mitra::monero::{
helpers::check_expired_invoice,
wallet::create_monero_wallet,
};
use mitra::validators::emojis::EMOJI_LOCAL_MAX_SIZE;
use mitra_config::Config;
use mitra_models::{
@ -114,12 +105,7 @@ pub struct GenerateEthereumAddress;
impl GenerateEthereumAddress {
pub fn execute(&self) -> () {
let private_key = generate_ecdsa_key();
let address = key_to_ethereum_address(&private_key);
println!(
"address {:?}; private key {}",
address, private_key.display_secret(),
);
println!("dummy");
}
}
@ -526,9 +512,8 @@ impl UpdateCurrentBlock {
pub async fn execute(
&self,
_config: &Config,
db_client: &impl DatabaseClient,
_db_client: &impl DatabaseClient,
) -> Result<(), Error> {
save_current_block_number(db_client, self.number).await?;
println!("current block updated");
Ok(())
}
@ -565,16 +550,8 @@ pub struct CreateMoneroWallet {
impl CreateMoneroWallet {
pub async fn execute(
&self,
config: &Config,
_config: &Config,
) -> Result<(), Error> {
let monero_config = config.blockchain()
.and_then(|conf| conf.monero_config())
.ok_or(anyhow!("monero configuration not found"))?;
create_monero_wallet(
monero_config,
self.name.clone(),
self.password.clone(),
).await?;
println!("wallet created");
Ok(())
}
@ -589,17 +566,9 @@ pub struct CheckExpiredInvoice {
impl CheckExpiredInvoice {
pub async fn execute(
&self,
config: &Config,
db_client: &impl DatabaseClient,
_config: &Config,
_db_client: &impl DatabaseClient,
) -> Result<(), Error> {
let monero_config = config.blockchain()
.and_then(|conf| conf.monero_config())
.ok_or(anyhow!("monero configuration not found"))?;
check_expired_invoice(
monero_config,
db_client,
&self.id,
).await?;
Ok(())
}
}

View file

@ -13,7 +13,6 @@ use crate::activitypub::vocabulary::{
PROPERTY_VALUE,
};
use crate::errors::ValidationError;
use crate::ethereum::identity::verify_eip191_signature;
use crate::identity::{
claims::create_identity_claim,
minisign::{
@ -79,15 +78,8 @@ pub fn parse_identity_proof(
&signature_bin,
).map_err(|_| ValidationError("invalid identity proof"))?;
},
Did::Pkh(ref did_pkh) => {
if !matches!(proof_type, IdentityProofType::LegacyEip191IdentityProof) {
Did::Pkh(ref _did_pkh) => {
return Err(ValidationError("incorrect proof type"));
};
verify_eip191_signature(
did_pkh,
&message,
signature,
).map_err(|_| ValidationError("invalid identity proof"))?;
},
};
let proof = IdentityProof {

View file

@ -1,10 +0,0 @@
use web3::{
api::Web3,
transports::Http,
};
pub fn connect(json_rpc_url: &str) -> Result<Web3<Http>, web3::Error> {
let transport = Http::new(json_rpc_url)?;
let connection = Web3::new(transport);
Ok(connection)
}

View file

@ -1,206 +0,0 @@
use std::fs;
use std::path::Path;
use web3::{
api::Web3,
contract::{Contract, Error as ContractError, Options},
ethabi,
transports::Http,
};
use mitra_config::EthereumConfig;
use mitra_models::database::DatabaseClient;
use super::api::connect;
use super::errors::EthereumError;
use super::sync::{
get_current_block_number,
SyncState,
};
use super::utils::parse_address;
const ERC165: &str = "IERC165";
const GATE: &str = "IGate";
const MINTER: &str = "IMinter";
const SUBSCRIPTION_ADAPTER: &str = "ISubscriptionAdapter";
const SUBSCRIPTION: &str = "ISubscription";
const ERC721: &str = "IERC721Metadata";
#[derive(thiserror::Error, Debug)]
pub enum ArtifactError {
#[error("io error")]
IoError(#[from] std::io::Error),
#[error("json error")]
JsonError(#[from] serde_json::Error),
#[error("key error")]
KeyError,
#[error(transparent)]
AbiError(#[from] ethabi::Error),
}
fn load_abi(
contract_dir: &Path,
contract_name: &str,
) -> Result<ethabi::Contract, ArtifactError> {
let artifact_path = contract_dir.join(format!("{}.json", contract_name));
let artifact = fs::read_to_string(artifact_path)?;
let artifact_value: serde_json::Value =
serde_json::from_str(&artifact)?;
let abi_json = artifact_value.get("abi")
.ok_or(ArtifactError::KeyError)?
.to_string();
let abi = ethabi::Contract::load(abi_json.as_bytes())?;
Ok(abi)
}
// https://eips.ethereum.org/EIPS/eip-165
// Interface identifier is the XOR of all function selectors in the interface
fn interface_signature(interface: &ethabi::Contract) -> [u8; 4] {
interface.functions()
.map(|func| func.short_signature())
.fold([0; 4], |mut acc, item| {
for i in 0..4 {
acc[i] ^= item[i];
};
acc
})
}
/// Returns true if contract supports interface (per ERC-165)
async fn is_interface_supported(
contract: &Contract<Http>,
interface: &ethabi::Contract,
) -> Result<bool, ContractError> {
let signature = interface_signature(interface);
contract.query(
"supportsInterface",
(signature,), None, Options::default(), None,
).await
}
#[derive(Clone)]
pub struct ContractSet {
pub web3: Web3<Http>,
pub gate: Option<Contract<Http>>,
pub collectible: Option<Contract<Http>>,
pub subscription: Option<Contract<Http>>,
pub subscription_adapter: Option<Contract<Http>>,
}
#[derive(Clone)]
pub struct Blockchain {
pub config: EthereumConfig,
pub contract_set: ContractSet,
pub sync_state: SyncState,
}
pub async fn get_contracts(
db_client: &impl DatabaseClient,
config: &EthereumConfig,
storage_dir: &Path,
) -> Result<Blockchain, EthereumError> {
let web3 = connect(&config.api_url)?;
let chain_id = web3.eth().chain_id().await?;
if chain_id != config.ethereum_chain_id().into() {
return Err(EthereumError::ImproperlyConfigured("incorrect chain ID"));
};
let adapter_address = parse_address(&config.contract_address)?;
let erc165_abi = load_abi(&config.contract_dir, ERC165)?;
let erc165 = Contract::new(
web3.eth(),
adapter_address,
erc165_abi,
);
let mut maybe_gate = None;
let mut maybe_collectible = None;
let mut maybe_subscription = None;
let mut maybe_subscription_adapter = None;
let mut sync_targets = vec![];
let gate_abi = load_abi(&config.contract_dir, GATE)?;
if is_interface_supported(&erc165, &gate_abi).await? {
let gate = Contract::new(
web3.eth(),
adapter_address,
gate_abi,
);
maybe_gate = Some(gate);
log::info!("found gate interface");
};
let minter_abi = load_abi(&config.contract_dir, MINTER)?;
if cfg!(feature = "ethereum-extras") &&
is_interface_supported(&erc165, &minter_abi).await?
{
let minter = Contract::new(
web3.eth(),
adapter_address,
minter_abi,
);
log::info!("found minter interface");
let collectible_address = minter.query(
"collectible",
(), None, Options::default(), None,
).await?;
let collectible_abi = load_abi(&config.contract_dir, ERC721)?;
let collectible = Contract::new(
web3.eth(),
collectible_address,
collectible_abi,
);
log::info!("collectible item contract address is {:?}", collectible.address());
sync_targets.push(collectible.address());
maybe_collectible = Some(collectible);
};
let subscription_adapter_abi = load_abi(&config.contract_dir, SUBSCRIPTION_ADAPTER)?;
if is_interface_supported(&erc165, &subscription_adapter_abi).await? {
let subscription_adapter = Contract::new(
web3.eth(),
adapter_address,
subscription_adapter_abi,
);
log::info!("found subscription interface");
let subscription_address = subscription_adapter.query(
"subscription",
(), None, Options::default(), None,
).await?;
let subscription_abi = load_abi(&config.contract_dir, SUBSCRIPTION)?;
let subscription = Contract::new(
web3.eth(),
subscription_address,
subscription_abi,
);
log::info!("subscription contract address is {:?}", subscription.address());
sync_targets.push(subscription.address());
maybe_subscription = Some(subscription);
maybe_subscription_adapter = Some(subscription_adapter);
};
let current_block = get_current_block_number(db_client, &web3, storage_dir).await?;
let sync_state = SyncState::new(
current_block,
sync_targets,
config.chain_sync_step,
config.chain_reorg_max_depth,
);
let contract_set = ContractSet {
web3,
gate: maybe_gate,
collectible: maybe_collectible,
subscription: maybe_subscription,
subscription_adapter: maybe_subscription_adapter,
};
Ok(Blockchain {
config: config.clone(),
contract_set,
sync_state,
})
}

View file

@ -1,80 +0,0 @@
/// Sign-In with Ethereum https://eips.ethereum.org/EIPS/eip-4361
use hex::FromHex;
use siwe::Message;
use web3::types::H160;
use crate::errors::ValidationError;
use super::utils::address_to_string;
/// Verifies EIP-4361 signature and returns wallet address
pub fn verify_eip4361_signature(
message: &str,
signature: &str,
instance_hostname: &str,
login_message: &str,
) -> Result<String, ValidationError> {
let message: Message = message.parse()
.map_err(|_| ValidationError("invalid EIP-4361 message"))?;
let signature_bytes = <[u8; 65]>::from_hex(signature.trim_start_matches("0x"))
.map_err(|_| ValidationError("invalid signature string"))?;
if message.domain != instance_hostname {
return Err(ValidationError("domain doesn't match instance hostname"));
};
let statement = message.statement.as_ref()
.ok_or(ValidationError("statement is missing"))?;
if statement != login_message {
return Err(ValidationError("statement doesn't match login message"));
};
if !message.valid_now() {
return Err(ValidationError("message is not currently valid"));
};
if message.not_before.is_some() || message.expiration_time.is_some() {
return Err(ValidationError("message shouldn't have expiration time"));
};
message.verify_eip191(&signature_bytes)
.map_err(|_| ValidationError("invalid signature"))?;
// Return wallet address in lower case
let wallet_address = address_to_string(H160(message.address));
Ok(wallet_address)
}
#[cfg(test)]
mod tests {
use super::*;
const INSTANCE_HOSTNAME: &str = "example.com";
const LOGIN_MESSAGE: &str = "test";
#[test]
fn test_verify_eip4361_signature() {
let message = "example.com wants you to sign in with your Ethereum account:
0x70997970C51812dc3A010C7d01b50e0d17dc79C8
test
URI: https://example.com
Version: 1
Chain ID: 1
Nonce: 3cb7760eac2f
Issued At: 2022-02-14T22:27:35.500Z";
let signature = "0x9059c9a69c31e87d887262a574abcc33f320d5b778bea8a35c6fbdea94a17e9652b99f7cdd146ed67fa8e4bb02462774b958a129c421fe8d743a43bf67dcbcd61c";
let wallet_address = verify_eip4361_signature(
message, signature,
INSTANCE_HOSTNAME,
LOGIN_MESSAGE,
).unwrap();
assert_eq!(wallet_address, "0x70997970c51812dc3a010c7d01b50e0d17dc79c8");
}
#[test]
fn test_verify_eip4361_signature_invalid() {
let message = "abc";
let signature = "xyz";
let error = verify_eip4361_signature(
message, signature,
INSTANCE_HOSTNAME,
LOGIN_MESSAGE,
).unwrap_err();
assert_eq!(error.to_string(), "invalid EIP-4361 message");
}
}

View file

@ -1,38 +0,0 @@
use mitra_models::database::DatabaseError;
use super::contracts::ArtifactError;
use super::signatures::SignatureError;
use super::utils::AddressError;
#[derive(thiserror::Error, Debug)]
pub enum EthereumError {
#[error("{0}")]
ImproperlyConfigured(&'static str),
#[error("invalid address")]
InvalidAddress(#[from] AddressError),
#[error(transparent)]
Web3Error(#[from] web3::Error),
#[error("artifact error")]
ArtifactError(#[from] ArtifactError),
#[error("abi error")]
AbiError(#[from] web3::ethabi::Error),
#[error("contract error")]
ContractError(#[from] web3::contract::Error),
#[error("data conversion error")]
ConversionError,
#[error(transparent)]
DatabaseError(#[from] DatabaseError),
#[error("signature error")]
SignatureError(#[from] SignatureError),
#[error("{0}")]
OtherError(&'static str),
}

View file

@ -1,19 +0,0 @@
use web3::{
contract::{Contract, Options},
transports::Http,
};
use super::errors::EthereumError;
use super::utils::parse_address;
pub async fn is_allowed_user(
gate: &Contract<Http>,
user_address: &str,
) -> Result<bool, EthereumError> {
let user_address = parse_address(user_address)?;
let result: bool = gate.query(
"isAllowedUser", (user_address,),
None, Options::default(), None,
).await?;
Ok(result)
}

View file

@ -1,56 +0,0 @@
use mitra_utils::did_pkh::DidPkh;
use super::signatures::{recover_address, SignatureError};
use super::utils::address_to_string;
#[derive(thiserror::Error, Debug)]
pub enum Eip191VerificationError {
#[error(transparent)]
InvalidSignature(#[from] SignatureError),
#[error("invalid signer")]
InvalidSigner,
}
pub fn verify_eip191_signature(
did: &DidPkh,
message: &str,
signature_hex: &str,
) -> Result<(), Eip191VerificationError> {
let signature_data = signature_hex.parse()?;
let signer = recover_address(message.as_bytes(), &signature_data)?;
if address_to_string(signer) != did.address.to_lowercase() {
return Err(Eip191VerificationError::InvalidSigner);
};
Ok(())
}
#[cfg(test)]
mod tests {
use web3::signing::{Key, SecretKeyRef};
use mitra_utils::currencies::Currency;
use crate::ethereum::{
signatures::{
generate_ecdsa_key,
sign_message,
},
utils::address_to_string,
};
use super::*;
const ETHEREUM: Currency = Currency::Ethereum;
#[test]
fn test_verify_eip191_signature() {
let message = "test";
let secret_key = generate_ecdsa_key();
let secret_key_ref = SecretKeyRef::new(&secret_key);
let secret_key_str = secret_key.display_secret().to_string();
let address = address_to_string(secret_key_ref.address());
let did = DidPkh::from_address(&ETHEREUM, &address);
let signature = sign_message(&secret_key_str, message.as_bytes())
.unwrap().to_string();
let result = verify_eip191_signature(&did, message, &signature);
assert_eq!(result.is_ok(), true);
}
}

View file

@ -1,13 +0,0 @@
mod api;
pub mod contracts;
pub mod eip4361;
mod errors;
pub mod gate;
pub mod identity;
pub mod signatures;
pub mod subscriptions;
pub mod sync;
pub mod utils;
#[cfg(feature = "ethereum-extras")]
pub mod nft;

View file

@ -1,168 +0,0 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use web3::{
api::Web3,
contract::{Contract, Options},
ethabi::RawLog,
transports::Http,
types::{BlockNumber, FilterBuilder},
};
use mitra_config::EthereumConfig;
use mitra_models::{
database::{get_database_client, DatabaseError, DbPool},
posts::queries::{
get_post_by_ipfs_cid,
get_token_waitlist,
set_post_token_id,
set_post_token_tx_id,
},
properties::queries::{
get_internal_property,
set_internal_property,
},
};
use crate::ipfs::utils::parse_ipfs_url;
use super::errors::EthereumError;
use super::signatures::{sign_contract_call, CallArgs, SignatureData};
use super::sync::SyncState;
use super::utils::parse_address;
const TOKEN_WAITLIST_MAP_PROPERTY_NAME: &str = "token_waitlist_map";
const TOKEN_WAIT_TIME: i64 = 10; // in minutes
const TOKEN_WAIT_RESET_TIME: i64 = 12 * 60; // in minutes
/// Finds posts awaiting tokenization
/// and looks for corresponding Mint events
pub async fn process_nft_events(
web3: &Web3<Http>,
contract: &Contract<Http>,
sync_state: &mut SyncState,
db_pool: &DbPool,
) -> Result<(), EthereumError> {
let db_client = &**get_database_client(db_pool).await?;
// Create/update token waitlist map
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> =
get_internal_property(db_client, TOKEN_WAITLIST_MAP_PROPERTY_NAME)
.await?.unwrap_or_default();
token_waitlist_map.retain(|_, waiting_since| {
// Re-add token to waitlist if waiting for too long
let duration = Utc::now() - *waiting_since;
duration.num_minutes() < TOKEN_WAIT_RESET_TIME
});
let token_waitlist = get_token_waitlist(db_client).await?;
for post_id in token_waitlist {
if !token_waitlist_map.contains_key(&post_id) {
token_waitlist_map.insert(post_id, Utc::now());
};
};
let token_waitlist_active_count = token_waitlist_map.values()
.filter(|waiting_since| {
let duration = Utc::now() - **waiting_since;
duration.num_minutes() < TOKEN_WAIT_TIME
})
.count();
if token_waitlist_active_count > 0 {
log::info!(
"{} posts are waiting for confirmation of tokenization tx",
token_waitlist_active_count,
);
} else if !sync_state.is_out_of_sync(&contract.address()) {
// Don't scan blockchain if already in sync and waitlist is empty
return Ok(());
};
// Search for Transfer events
let event_abi = contract.abi().event("Transfer")?;
let (from_block, to_block) = sync_state.get_scan_range(
&contract.address(),
web3.eth().block_number().await?.as_u64(),
);
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Number(from_block.into()))
.to_block(BlockNumber::Number(to_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {
let raw_log = RawLog {
topics: log.topics.clone(),
data: log.data.clone().0,
};
let event = event_abi.parse_log(raw_log)?;
let from_address = event.params[0].value.clone().into_address()
.ok_or(EthereumError::ConversionError)?;
if from_address.is_zero() {
// Mint event found
let token_id_u256 = event.params[2].value.clone().into_uint()
.ok_or(EthereumError::ConversionError)?;
let token_uri: String = contract.query(
"tokenURI", (token_id_u256,),
None, Options::default(), None,
).await?;
let tx_id_h256 = log.transaction_hash
.ok_or(EthereumError::ConversionError)?;
let tx_id = hex::encode(tx_id_h256.as_bytes());
let ipfs_cid = parse_ipfs_url(&token_uri)
.map_err(|_| EthereumError::ConversionError)?;
let post = match get_post_by_ipfs_cid(db_client, &ipfs_cid).await {
Ok(post) => post,
Err(DatabaseError::NotFound(_)) => {
// Post was deleted
continue;
},
Err(err) => {
// Unexpected error
log::error!("{}", err);
continue;
},
};
if post.token_id.is_none() {
log::info!("post {} was tokenized via {}", post.id, tx_id);
let token_id: i32 = token_id_u256.try_into()
.map_err(|_| EthereumError::ConversionError)?;
set_post_token_id(db_client, &post.id, token_id).await?;
if post.token_tx_id.as_ref() != Some(&tx_id) {
log::warn!("overwriting incorrect tx id {:?}", post.token_tx_id);
set_post_token_tx_id(db_client, &post.id, &tx_id).await?;
};
token_waitlist_map.remove(&post.id);
};
};
};
set_internal_property(
db_client,
TOKEN_WAITLIST_MAP_PROPERTY_NAME,
&token_waitlist_map,
).await?;
sync_state.update(db_client, &contract.address(), to_block).await?;
Ok(())
}
pub fn create_mint_signature(
blockchain_config: &EthereumConfig,
user_address: &str,
token_uri: &str,
) -> Result<SignatureData, EthereumError> {
let user_address = parse_address(user_address)?;
let call_args: CallArgs = vec![
Box::new(user_address),
Box::new(token_uri.to_string()),
];
let signature = sign_contract_call(
&blockchain_config.signing_key,
blockchain_config.ethereum_chain_id(),
&blockchain_config.contract_address,
"mint",
call_args,
)?;
Ok(signature)
}

View file

@ -1,221 +0,0 @@
use std::convert::TryInto;
use std::str::FromStr;
use secp256k1::{Error as KeyError, SecretKey, rand::rngs::OsRng};
use serde::Serialize;
use web3::ethabi::{token::Token, encode as encode_tokens};
use web3::signing::{
keccak256,
recover,
Key,
RecoveryError,
SecretKeyRef,
SigningError,
};
use web3::types::{Address, H256, Recovery};
/// Generates signing key
pub fn generate_ecdsa_key() -> SecretKey {
let mut rng = OsRng::new().expect("failed to initialize RNG");
SecretKey::new(&mut rng)
}
#[derive(Serialize)]
pub struct SignatureData {
pub v: u64,
#[serde(serialize_with = "hex::serde::serialize")]
pub r: [u8; 32],
#[serde(serialize_with = "hex::serde::serialize")]
pub s: [u8; 32],
}
#[derive(thiserror::Error, Debug)]
pub enum SignatureError {
#[error("invalid key")]
InvalidKey(#[from] KeyError),
#[error("invalid data")]
InvalidData,
#[error("signing error")]
SigningError(#[from] SigningError),
#[error("invalid signature")]
InvalidSignature,
#[error("recovery error")]
RecoveryError(#[from] RecoveryError),
}
impl ToString for SignatureData {
fn to_string(&self) -> String {
let mut bytes = Vec::with_capacity(65);
bytes.extend_from_slice(&self.r);
bytes.extend_from_slice(&self.s);
let v: u8 = self.v.try_into()
.expect("signature recovery in electrum notation always fits in a u8");
bytes.push(v);
hex::encode(bytes)
}
}
impl FromStr for SignatureData {
type Err = SignatureError;
fn from_str(value_hex: &str) -> Result<Self, Self::Err> {
let mut bytes = [0u8; 65];
hex::decode_to_slice(value_hex, &mut bytes)
.map_err(|_| Self::Err::InvalidSignature)?;
let v = bytes[64].into();
let r = bytes[0..32].try_into()
.map_err(|_| Self::Err::InvalidSignature)?;
let s = bytes[32..64].try_into()
.map_err(|_| Self::Err::InvalidSignature)?;
let signature_data = Self { v, r, s };
Ok(signature_data)
}
}
fn prepare_message(message: &[u8]) -> [u8; 32] {
let eip_191_message = [
"\x19Ethereum Signed Message:\n".as_bytes(),
message.len().to_string().as_bytes(),
message,
].concat();
let eip_191_message_hash = keccak256(&eip_191_message);
eip_191_message_hash
}
/// Create EIP-191 signature
/// https://eips.ethereum.org/EIPS/eip-191
pub fn sign_message(
signing_key: &str,
message: &[u8],
) -> Result<SignatureData, SignatureError> {
let key = SecretKey::from_str(signing_key)?;
let key_ref = SecretKeyRef::new(&key);
let eip_191_message_hash = prepare_message(message);
// Create signature without replay protection (chain ID is None)
let signature = key_ref.sign(&eip_191_message_hash, None)?;
let signature_data = SignatureData {
v: signature.v,
r: signature.r.to_fixed_bytes(),
s: signature.s.to_fixed_bytes(),
};
Ok(signature_data)
}
/// Verify EIP-191 signature
pub fn recover_address(
message: &[u8],
signature: &SignatureData,
) -> Result<Address, SignatureError> {
let eip_191_message_hash = prepare_message(message);
let recovery = Recovery::new(
"", // this message is not used
signature.v,
H256(signature.r),
H256(signature.s),
);
let (signature_raw, recovery_id) = recovery.as_signature()
.ok_or(SignatureError::InvalidSignature)?;
let address = recover(
&eip_191_message_hash,
&signature_raw,
recovery_id,
)?;
Ok(address)
}
pub type CallArgs = Vec<Box<dyn AsRef<[u8]>>>;
pub fn encode_uint256(value: u64) -> Vec<u8> {
let token = Token::Uint(value.into());
encode_tokens(&[token])
}
pub fn sign_contract_call(
signing_key: &str,
chain_id: u32,
contract_address: &str,
method_name: &str,
method_args: CallArgs,
) -> Result<SignatureData, SignatureError> {
let chain_id_bin = encode_uint256(chain_id.into());
let contract_address = Address::from_str(contract_address)
.map_err(|_| SignatureError::InvalidData)?;
let mut message = [
&chain_id_bin,
contract_address.as_bytes(),
method_name.as_bytes(),
].concat();
for arg in method_args {
message.extend(arg.as_ref().as_ref());
};
let message_hash = keccak256(&message);
let signature = sign_message(signing_key, &message_hash)?;
Ok(signature)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_signature_string_conversion() {
let v = 28;
let r: [u8; 32] = hex::decode("b91467e570a6466aa9e9876cbcd013baba02900b8979d43fe208a4a4f339f5fd")
.unwrap().try_into().unwrap();
let s: [u8; 32] = hex::decode("6007e74cd82e037b800186422fc2da167c747ef045e5d18a5f5d4300f8e1a029")
.unwrap().try_into().unwrap();
let expected_signature =
"b91467e570a6466aa9e9876cbcd013baba02900b8979d43fe208a4a4f339f5fd6007e74cd82e037b800186422fc2da167c747ef045e5d18a5f5d4300f8e1a0291c";
let signature_data = SignatureData { v, r, s };
let signature_str = signature_data.to_string();
assert_eq!(signature_str, expected_signature);
let parsed = signature_str.parse::<SignatureData>().unwrap();
assert_eq!(parsed.v, v);
assert_eq!(parsed.r, r);
assert_eq!(parsed.s, s);
}
#[test]
fn test_signature_from_string_with_0x_prefix() {
let signature_str = "0xb91467e570a6466aa9e9876cbcd013baba02900b8979d43fe208a4a4f339f5fd6007e74cd82e037b800186422fc2da167c747ef045e5d18a5f5d4300f8e1a0291c";
let result = signature_str.parse::<SignatureData>();
assert_eq!(result.is_err(), true);
}
#[test]
fn test_sign_message() {
let signing_key = generate_ecdsa_key();
let message = "test_message";
let result = sign_message(
&signing_key.display_secret().to_string(),
message.as_bytes(),
).unwrap();
assert!(result.v == 27 || result.v == 28);
let recovered = recover_address(message.as_bytes(), &result).unwrap();
assert_eq!(recovered, SecretKeyRef::new(&signing_key).address());
}
#[test]
fn test_sign_contract_call() {
let signing_key = generate_ecdsa_key().display_secret().to_string();
let chain_id = 1;
let contract_address = "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0";
let method_name = "test";
let method_args: CallArgs = vec![Box::new("arg1"), Box::new("arg2")];
let result = sign_contract_call(
&signing_key,
chain_id,
contract_address,
method_name,
method_args,
).unwrap();
assert!(result.v == 27 || result.v == 28);
}
}

View file

@ -1,328 +0,0 @@
use std::convert::TryInto;
use chrono::{DateTime, TimeZone, Utc};
use web3::{
api::Web3,
contract::{Contract, Options},
ethabi::RawLog,
transports::Http,
types::{BlockId, BlockNumber, FilterBuilder, U256},
};
use mitra_config::{EthereumConfig, Instance};
use mitra_models::{
database::{
get_database_client,
DatabaseClient,
DatabaseError,
DbPool,
},
notifications::queries::{
create_subscription_notification,
create_subscription_expiration_notification,
},
profiles::queries::{
get_profile_by_id,
search_profiles_by_wallet_address,
},
profiles::types::DbActorProfile,
relationships::queries::unsubscribe,
subscriptions::queries::{
create_subscription,
update_subscription,
get_expired_subscriptions,
get_subscription_by_participants,
},
users::queries::{
get_user_by_id,
get_user_by_public_wallet_address,
},
users::types::User,
};
use mitra_utils::currencies::Currency;
use crate::activitypub::{
builders::{
add_person::prepare_add_person,
remove_person::prepare_remove_person,
},
identifiers::LocalActorCollection,
};
use crate::errors::ConversionError;
use super::contracts::ContractSet;
use super::errors::EthereumError;
use super::signatures::{
encode_uint256,
sign_contract_call,
CallArgs,
SignatureData,
};
use super::sync::SyncState;
use super::utils::{address_to_string, parse_address};
const ETHEREUM: Currency = Currency::Ethereum;
fn u256_to_date(value: U256) -> Result<DateTime<Utc>, ConversionError> {
let timestamp: i64 = value.try_into().map_err(|_| ConversionError)?;
let datetime = Utc.timestamp_opt(timestamp, 0)
.single()
.ok_or(ConversionError)?;
Ok(datetime)
}
pub async fn send_subscription_notifications(
db_client: &impl DatabaseClient,
instance: &Instance,
sender: &DbActorProfile,
recipient: &User,
) -> Result<(), DatabaseError> {
create_subscription_notification(
db_client,
&sender.id,
&recipient.id,
).await?;
if let Some(ref remote_sender) = sender.actor_json {
prepare_add_person(
instance,
recipient,
remote_sender,
LocalActorCollection::Subscribers,
).enqueue(db_client).await?;
};
Ok(())
}
/// Search for subscription update events
pub async fn check_ethereum_subscriptions(
config: &EthereumConfig,
instance: &Instance,
web3: &Web3<Http>,
contract: &Contract<Http>,
sync_state: &mut SyncState,
db_pool: &DbPool,
) -> Result<(), EthereumError> {
let db_client = &mut **get_database_client(db_pool).await?;
let event_abi = contract.abi().event("UpdateSubscription")?;
let (from_block, to_block) = sync_state.get_scan_range(
&contract.address(),
web3.eth().block_number().await?.as_u64(),
);
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(Some(vec![event_abi.signature()]), None, None, None)
.from_block(BlockNumber::Number(from_block.into()))
.to_block(BlockNumber::Number(to_block.into()))
.build();
let logs = web3.eth().logs(filter).await?;
for log in logs {
let block_number = if let Some(block_number) = log.block_number {
block_number
} else {
// Skips logs without block number
continue;
};
let raw_log = RawLog {
topics: log.topics.clone(),
data: log.data.clone().0,
};
let event = event_abi.parse_log(raw_log)?;
let sender_address = event.params[0].value.clone().into_address()
.map(address_to_string)
.ok_or(EthereumError::ConversionError)?;
let recipient_address = event.params[1].value.clone().into_address()
.map(address_to_string)
.ok_or(EthereumError::ConversionError)?;
let expires_at_timestamp = event.params[2].value.clone().into_uint()
.ok_or(EthereumError::ConversionError)?;
let expires_at = u256_to_date(expires_at_timestamp)
.map_err(|_| EthereumError::ConversionError)?;
let block_id = BlockId::Number(BlockNumber::Number(block_number));
let block_timestamp = web3.eth().block(block_id).await?
.ok_or(EthereumError::ConversionError)?
.timestamp;
let block_date = u256_to_date(block_timestamp)
.map_err(|_| EthereumError::ConversionError)?;
let profiles = search_profiles_by_wallet_address(
db_client,
&ETHEREUM,
&sender_address,
true, // prefer verified addresses
).await?;
let sender = match &profiles[..] {
[profile] => profile,
[] => {
// Profile not found, skip event
log::error!("unknown subscriber {}", sender_address);
continue;
},
_ => {
// Ambiguous results, skip event
log::error!(
"search returned multiple results for address {}",
sender_address,
);
continue;
},
};
let recipient = get_user_by_public_wallet_address(
db_client,
&ETHEREUM,
&recipient_address,
).await?;
match get_subscription_by_participants(
db_client,
&sender.id,
&recipient.id,
).await {
Ok(subscription) => {
if subscription.chain_id != config.chain_id {
log::error!("can't switch to another chain");
continue;
};
let current_sender_address =
subscription.sender_address.unwrap_or("''".to_string());
if current_sender_address != sender_address {
// Trust only key/address that was linked to profile
// when first subscription event occured.
// Key rotation is not supported.
log::error!(
"subscriber address changed from {} to {}",
current_sender_address,
sender_address,
);
continue;
};
if subscription.updated_at >= block_date {
// Event already processed
continue;
};
// Update subscription expiration date
update_subscription(
db_client,
subscription.id,
&expires_at,
&block_date,
).await?;
#[allow(clippy::comparison_chain)]
if expires_at > subscription.expires_at {
log::info!(
"subscription extended: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
send_subscription_notifications(
db_client,
instance,
sender,
&recipient,
).await?;
} else if expires_at < subscription.expires_at {
log::info!(
"subscription cancelled: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
};
},
Err(DatabaseError::NotFound(_)) => {
// New subscription
create_subscription(
db_client,
&sender.id,
Some(&sender_address),
&recipient.id,
&config.chain_id,
&expires_at,
&block_date,
).await?;
log::info!(
"subscription created: {0} to {1}",
sender.id,
recipient.id,
);
send_subscription_notifications(
db_client,
instance,
sender,
&recipient,
).await?;
},
Err(other_error) => return Err(other_error.into()),
};
};
sync_state.update(db_client, &contract.address(), to_block).await?;
Ok(())
}
pub async fn update_expired_subscriptions(
instance: &Instance,
db_pool: &DbPool,
) -> Result<(), EthereumError> {
let db_client = &mut **get_database_client(db_pool).await?;
for subscription in get_expired_subscriptions(db_client).await? {
// Remove relationship
unsubscribe(db_client, &subscription.sender_id, &subscription.recipient_id).await?;
log::info!(
"subscription expired: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
let sender = get_profile_by_id(db_client, &subscription.sender_id).await?;
if let Some(ref remote_sender) = sender.actor_json {
let recipient = get_user_by_id(db_client, &subscription.recipient_id).await?;
prepare_remove_person(
instance,
&recipient,
remote_sender,
LocalActorCollection::Subscribers,
).enqueue(db_client).await?;
} else {
create_subscription_expiration_notification(
db_client,
&subscription.recipient_id,
&subscription.sender_id,
).await?;
};
};
Ok(())
}
pub fn create_subscription_signature(
blockchain_config: &EthereumConfig,
user_address: &str,
price: u64,
) -> Result<SignatureData, EthereumError> {
let user_address = parse_address(user_address)?;
let call_args: CallArgs = vec![
Box::new(user_address),
Box::new(encode_uint256(price)),
];
let signature = sign_contract_call(
&blockchain_config.signing_key,
blockchain_config.ethereum_chain_id(),
&blockchain_config.contract_address,
"configureSubscription",
call_args,
)?;
Ok(signature)
}
pub async fn is_registered_recipient(
contract_set: &ContractSet,
user_address: &str,
) -> Result<bool, EthereumError> {
let adapter = match &contract_set.subscription_adapter {
Some(contract) => contract,
None => return Ok(false),
};
let user_address = parse_address(user_address)?;
let result: bool = adapter.query(
"isSubscriptionConfigured", (user_address,),
None, Options::default(), None,
).await?;
Ok(result)
}

View file

@ -1,176 +0,0 @@
use std::collections::HashMap;
use std::path::Path;
use web3::{api::Web3, transports::Http, types::Address};
use mitra_models::{
database::DatabaseClient,
properties::queries::{
get_internal_property,
set_internal_property,
},
};
use super::errors::EthereumError;
const BLOCK_NUMBER_FILE_NAME: &str = "current_block";
const CURRENT_BLOCK_PROPERTY_NAME: &str = "ethereum_current_block";
pub async fn save_current_block_number(
db_client: &impl DatabaseClient,
block_number: u64,
) -> Result<(), EthereumError> {
set_internal_property(
db_client,
CURRENT_BLOCK_PROPERTY_NAME,
&block_number,
).await?;
Ok(())
}
async fn read_current_block_number(
db_client: &impl DatabaseClient,
storage_dir: &Path,
) -> Result<Option<u64>, EthereumError> {
let maybe_block_number = get_internal_property(
db_client,
CURRENT_BLOCK_PROPERTY_NAME,
).await?;
if maybe_block_number.is_some() {
return Ok(maybe_block_number);
};
// Try to read from file if internal property is not set
let file_path = storage_dir.join(BLOCK_NUMBER_FILE_NAME);
let maybe_block_number = if file_path.exists() {
let block_number: u64 = std::fs::read_to_string(&file_path)
.map_err(|_| EthereumError::OtherError("failed to read current block"))?
.parse()
.map_err(|_| EthereumError::OtherError("failed to parse block number"))?;
Some(block_number)
} else {
None
};
Ok(maybe_block_number)
}
pub async fn get_current_block_number(
db_client: &impl DatabaseClient,
web3: &Web3<Http>,
storage_dir: &Path,
) -> Result<u64, EthereumError> {
let block_number = match read_current_block_number(db_client, storage_dir).await? {
Some(block_number) => block_number,
None => {
// Save block number when connecting to the node for the first time
let block_number = web3.eth().block_number().await?.as_u64();
save_current_block_number(db_client, block_number).await?;
block_number
},
};
Ok(block_number)
}
#[derive(Clone)]
pub struct SyncState {
pub current_block: u64,
contracts: HashMap<Address, u64>,
sync_step: u64,
reorg_max_depth: u64,
}
impl SyncState {
pub fn new(
current_block: u64,
contracts: Vec<Address>,
sync_step: u64,
reorg_max_depth: u64,
) -> Self {
log::info!("current block is {}", current_block);
let mut contract_map = HashMap::new();
for address in contracts {
contract_map.insert(address, current_block);
};
Self {
current_block,
contracts: contract_map,
sync_step,
reorg_max_depth,
}
}
pub fn get_scan_range(
&self,
contract_address: &Address,
latest_block: u64,
) -> (u64, u64) {
let current_block = self.contracts[contract_address];
// Take reorgs into account
let latest_safe_block = latest_block.saturating_sub(self.reorg_max_depth);
let next_block = std::cmp::min(
current_block + self.sync_step,
latest_safe_block,
);
// Next block should not be less than current block
let next_block = std::cmp::max(current_block, next_block);
(current_block, next_block)
}
pub fn is_out_of_sync(&self, contract_address: &Address) -> bool {
if let Some(max_value) = self.contracts.values().max().copied() {
if self.contracts[contract_address] == max_value {
return false;
};
};
true
}
pub async fn update(
&mut self,
db_client: &impl DatabaseClient,
contract_address: &Address,
block_number: u64,
) -> Result<(), EthereumError> {
self.contracts.insert(*contract_address, block_number);
if let Some(min_value) = self.contracts.values().min().copied() {
if min_value > self.current_block {
self.current_block = min_value;
save_current_block_number(db_client, self.current_block).await?;
log::info!("synced to block {}", self.current_block);
};
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_scan_range_from_zero() {
let address = Address::default();
let sync_state = SyncState::new(
0,
vec![address.clone()],
100,
10,
);
let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 0);
assert_eq!(to_block, 100);
}
#[test]
fn test_get_scan_range() {
let address = Address::default();
let sync_state = SyncState::new(
500,
vec![address.clone()],
100,
10,
);
let (from_block, to_block) = sync_state.get_scan_range(&address, 555);
assert_eq!(from_block, 500);
assert_eq!(to_block, 545);
}
}

View file

@ -1,62 +0,0 @@
use std::str::FromStr;
use regex::Regex;
use secp256k1::SecretKey;
use web3::{
signing::Key,
types::Address,
};
use crate::errors::ValidationError;
pub fn key_to_ethereum_address(private_key: &SecretKey) -> Address {
private_key.address()
}
#[derive(thiserror::Error, Debug)]
#[error("address error")]
pub struct AddressError;
pub fn parse_address(address: &str) -> Result<Address, AddressError> {
Address::from_str(address).map_err(|_| AddressError)
}
/// Converts address object to lowercase hex string
pub fn address_to_string(address: Address) -> String {
format!("{:#x}", address)
}
pub fn validate_ethereum_address(
wallet_address: &str,
) -> Result<(), ValidationError> {
let address_regexp = Regex::new(r"^0x[a-fA-F0-9]{40}$").unwrap();
if !address_regexp.is_match(wallet_address) {
return Err(ValidationError("invalid address"));
};
// Address should be lowercase
if wallet_address.to_lowercase() != wallet_address {
return Err(ValidationError("address is not lowercase"));
};
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_ethereum_address() {
let result_1 = validate_ethereum_address("0xab5801a7d398351b8be11c439e05c5b3259aec9b");
assert_eq!(result_1.is_ok(), true);
let result_2 = validate_ethereum_address("ab5801a7d398351b8be11c439e05c5b3259aec9b");
assert_eq!(
result_2.err().unwrap().to_string(),
"invalid address",
);
let result_3 = validate_ethereum_address("0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B");
assert_eq!(
result_3.err().unwrap().to_string(),
"address is not lowercase",
);
}
}

View file

@ -20,92 +20,7 @@ use crate::activitypub::queues::{
process_queued_incoming_activities,
process_queued_outgoing_activities,
};
use crate::ethereum::{
contracts::Blockchain,
subscriptions::{
check_ethereum_subscriptions,
update_expired_subscriptions,
},
};
use crate::media::remove_media;
use crate::monero::subscriptions::check_monero_subscriptions;
#[cfg(feature = "ethereum-extras")]
use crate::ethereum::nft::process_nft_events;
#[cfg(feature = "ethereum-extras")]
pub async fn nft_monitor(
maybe_blockchain: Option<&mut Blockchain>,
db_pool: &DbPool,
) -> Result<(), Error> {
let blockchain = match maybe_blockchain {
Some(blockchain) => blockchain,
None => return Ok(()),
};
let collectible = match &blockchain.contract_set.collectible {
Some(contract) => contract,
None => return Ok(()), // feature not enabled
};
process_nft_events(
&blockchain.contract_set.web3,
collectible,
&mut blockchain.sync_state,
db_pool,
).await?;
Ok(())
}
pub async fn ethereum_subscription_monitor(
config: &Config,
maybe_blockchain: Option<&mut Blockchain>,
db_pool: &DbPool,
) -> Result<(), Error> {
let blockchain = match maybe_blockchain {
Some(blockchain) => blockchain,
None => return Ok(()),
};
let subscription = match &blockchain.contract_set.subscription {
Some(contract) => contract,
None => return Ok(()), // feature not enabled
};
check_ethereum_subscriptions(
&blockchain.config,
&config.instance(),
&blockchain.contract_set.web3,
subscription,
&mut blockchain.sync_state,
db_pool,
).await.map_err(Error::from)
}
pub async fn subscription_expiration_monitor(
config: &Config,
db_pool: &DbPool,
) -> Result<(), Error> {
update_expired_subscriptions(
&config.instance(),
db_pool,
).await?;
Ok(())
}
pub async fn monero_payment_monitor(
config: &Config,
db_pool: &DbPool,
) -> Result<(), Error> {
let maybe_monero_config = config.blockchain()
.and_then(|conf| conf.monero_config());
let monero_config = match maybe_monero_config {
Some(monero_config) => monero_config,
None => return Ok(()), // not configured
};
check_monero_subscriptions(
&config.instance(),
monero_config,
db_pool,
).await?;
Ok(())
}
pub async fn incoming_activity_queue_executor(
config: &Config,

View file

@ -6,39 +6,28 @@ use chrono::{DateTime, Utc};
use mitra_config::Config;
use mitra_models::database::DbPool;
use crate::ethereum::contracts::Blockchain;
use super::periodic_tasks::*;
#[derive(Debug, Eq, Hash, PartialEq)]
enum PeriodicTask {
EthereumSubscriptionMonitor,
SubscriptionExpirationMonitor,
MoneroPaymentMonitor,
IncomingActivityQueueExecutor,
OutgoingActivityQueueExecutor,
DeleteExtraneousPosts,
DeleteEmptyProfiles,
PruneRemoteEmojis,
#[cfg(feature = "ethereum-extras")]
NftMonitor,
}
impl PeriodicTask {
/// Returns task period (in seconds)
fn period(&self) -> i64 {
match self {
Self::EthereumSubscriptionMonitor => 300,
Self::SubscriptionExpirationMonitor => 300,
Self::MoneroPaymentMonitor => 30,
Self::IncomingActivityQueueExecutor => 5,
Self::OutgoingActivityQueueExecutor => 5,
Self::DeleteExtraneousPosts => 3600,
Self::DeleteEmptyProfiles => 3600,
Self::PruneRemoteEmojis => 3600,
#[cfg(feature = "ethereum-extras")]
Self::NftMonitor => 30,
}
}
@ -55,20 +44,14 @@ impl PeriodicTask {
pub fn run(
config: Config,
mut maybe_blockchain: Option<Blockchain>,
db_pool: DbPool,
) -> () {
tokio::spawn(async move {
let mut scheduler_state = HashMap::from([
(PeriodicTask::EthereumSubscriptionMonitor, None),
(PeriodicTask::SubscriptionExpirationMonitor, None),
(PeriodicTask::MoneroPaymentMonitor, None),
(PeriodicTask::IncomingActivityQueueExecutor, None),
(PeriodicTask::OutgoingActivityQueueExecutor, None),
(PeriodicTask::PruneRemoteEmojis, None),
#[cfg(feature = "ethereum-extras")]
(PeriodicTask::NftMonitor, None),
]);
if config.retention.extraneous_posts.is_some() {
scheduler_state.insert(PeriodicTask::DeleteExtraneousPosts, None);
@ -86,19 +69,6 @@ pub fn run(
continue;
};
let task_result = match task {
PeriodicTask::EthereumSubscriptionMonitor => {
ethereum_subscription_monitor(
&config,
maybe_blockchain.as_mut(),
&db_pool,
).await
},
PeriodicTask::SubscriptionExpirationMonitor => {
subscription_expiration_monitor(&config, &db_pool).await
},
PeriodicTask::MoneroPaymentMonitor => {
monero_payment_monitor(&config, &db_pool).await
},
PeriodicTask::IncomingActivityQueueExecutor => {
incoming_activity_queue_executor(&config, &db_pool).await
},
@ -114,13 +84,7 @@ pub fn run(
PeriodicTask::PruneRemoteEmojis => {
prune_remote_emojis(&config, &db_pool).await
},
#[cfg(feature = "ethereum-extras")]
PeriodicTask::NftMonitor => {
nft_monitor(
maybe_blockchain.as_mut(),
&db_pool,
).await
},
PeriodicTask::SubscriptionExpirationMonitor => Ok(()),
};
task_result.unwrap_or_else(|err| {
log::error!("{:?}: {}", task, err);

View file

@ -16,7 +16,6 @@ use mitra_utils::{
multibase::{decode_multibase_base58btc, MultibaseError},
};
use crate::ethereum::identity::verify_eip191_signature;
use crate::identity::{
minisign::verify_minisign_signature,
};
@ -112,13 +111,11 @@ pub fn verify_rsa_json_signature(
}
pub fn verify_eip191_json_signature(
signer: &DidPkh,
message: &str,
signature: &[u8],
_signer: &DidPkh,
_message: &str,
_signature: &[u8],
) -> Result<(), VerificationError> {
let signature_hex = hex::encode(signature);
verify_eip191_signature(signer, message, &signature_hex)
.map_err(|_| VerificationError::InvalidSignature)
Err(VerificationError::InvalidSignature)
}
pub fn verify_ed25519_json_signature(

View file

@ -1,7 +1,6 @@
pub mod activitypub;
pub mod atom;
mod errors;
pub mod ethereum;
pub mod http;
mod http_signatures;
mod identity;
@ -11,7 +10,6 @@ mod json_signatures;
pub mod logger;
pub mod mastodon_api;
pub mod media;
pub mod monero;
pub mod nodeinfo;
pub mod validators;
pub mod webfinger;

View file

@ -12,7 +12,6 @@ use tokio::sync::Mutex;
use mitra::activitypub::views as activitypub;
use mitra::atom::views::atom_scope;
use mitra::ethereum::contracts::get_contracts;
use mitra::http::{
create_auth_error_handler,
create_default_headers_middleware,
@ -60,21 +59,6 @@ async fn main() -> std::io::Result<()> {
std::fs::create_dir(config.media_dir())
.expect("failed to create media directory");
};
let maybe_blockchain = if let Some(blockchain_config) = config.blockchain() {
if let Some(ethereum_config) = blockchain_config.ethereum_config() {
// Create blockchain interface
get_contracts(&**db_client, ethereum_config, &config.storage_dir).await
.map(Some).unwrap()
} else {
None
}
} else {
None
};
let maybe_contract_set = maybe_blockchain.clone()
.map(|blockchain| blockchain.contract_set);
std::mem::drop(db_client);
log::info!(
"app initialized; version {}, environment = '{:?}'",
@ -82,7 +66,7 @@ async fn main() -> std::io::Result<()> {
config.environment,
);
scheduler::run(config.clone(), maybe_blockchain, db_pool.clone());
scheduler::run(config.clone(), db_pool.clone());
log::info!("scheduler started");
let num_workers = std::cmp::max(num_cpus::get(), 4);
@ -146,7 +130,6 @@ async fn main() -> std::io::Result<()> {
)
.app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(db_pool.clone()))
.app_data(web::Data::new(maybe_contract_set.clone()))
.app_data(web::Data::clone(&inbox_mutex))
.service(actix_files::Files::new(
"/media",
@ -184,15 +167,6 @@ async fn main() -> std::io::Result<()> {
web::resource("/.well-known/{path}")
.to(HttpResponse::NotFound)
);
if let Some(blockchain_config) = config.blockchain() {
if let Some(ethereum_config) = blockchain_config.ethereum_config() {
// Serve artifacts if available
app = app.service(actix_files::Files::new(
"/contracts",
&ethereum_config.contract_dir,
));
};
};
if let Some(ref web_client_dir) = config.web_client_dir {
app = app.service(web_client::static_service(web_client_dir));
};

View file

@ -70,12 +70,6 @@ use crate::activitypub::{
identifiers::local_actor_id,
};
use crate::errors::ValidationError;
use crate::ethereum::{
contracts::ContractSet,
eip4361::verify_eip4361_signature,
gate::is_allowed_user,
identity::verify_eip191_signature,
};
use crate::http::get_request_base_url;
use crate::identity::{
claims::create_identity_claim,
@ -130,7 +124,6 @@ pub async fn create_account(
connection_info: ConnectionInfo,
config: web::Data<Config>,
db_pool: web::Data<DbPool>,
maybe_blockchain: web::Data<Option<ContractSet>>,
account_data: web::Json<AccountCreateData>,
) -> Result<HttpResponse, MastodonError> {
let db_client = &mut **get_database_client(&db_pool).await?;
@ -151,36 +144,6 @@ pub async fn create_account(
} else {
None
};
let maybe_wallet_address = if let Some(message) = account_data.message.as_ref() {
let signature = account_data.signature.as_ref()
.ok_or(ValidationError("signature is required"))?;
let wallet_address = verify_eip4361_signature(
message,
signature,
&config.instance().hostname(),
&config.login_message,
)?;
Some(wallet_address)
} else {
None
};
if maybe_wallet_address.is_some() == maybe_password_hash.is_some() {
// Either password or EIP-4361 auth must be used (but not both)
return Err(ValidationError("invalid login data").into());
};
if let Some(contract_set) = maybe_blockchain.as_ref() {
if let Some(ref gate) = contract_set.gate {
// Wallet address is required if token gate is present
let wallet_address = maybe_wallet_address.as_ref()
.ok_or(ValidationError("wallet address is required"))?;
let is_allowed = is_allowed_user(gate, wallet_address).await
.map_err(|_| MastodonError::InternalError)?;
if !is_allowed {
return Err(ValidationError("not allowed to sign up").into());
};
};
};
// Generate RSA private key for actor
let private_key = match web::block(generate_rsa_key).await {
@ -200,7 +163,7 @@ pub async fn create_account(
username,
password_hash: maybe_password_hash,
private_key_pem,
wallet_address: maybe_wallet_address,
wallet_address: None,
invite_code,
role,
};
@ -443,12 +406,7 @@ async fn create_identity_proof(
return Err(ValidationError("DID doesn't match current identity").into());
};
};
verify_eip191_signature(
did_pkh,
&message,
&proof_data.signature,
).map_err(|_| ValidationError("invalid signature"))?;
IdentityProofType::LegacyEip191IdentityProof
return Err(ValidationError("invalid signature").into());
},
};

View file

@ -1,15 +1,12 @@
use serde::Serialize;
use serde_json::{to_value, Value};
use mitra_config::{
BlockchainConfig,
Config,
RegistrationType,
MITRA_VERSION,
};
use mitra_utils::markdown::markdown_to_html;
use crate::ethereum::contracts::ContractSet;
use crate::mastodon_api::MASTODON_API_VERSION;
use crate::media::SUPPORTED_MEDIA_TYPES;
use crate::validators::posts::ATTACHMENTS_MAX_NUM;
@ -39,21 +36,6 @@ struct InstanceConfiguration {
media_attachments: InstanceMediaLimits,
}
#[derive(Serialize)]
struct BlockchainFeatures {
gate: bool,
minter: bool,
subscriptions: bool,
}
#[derive(Serialize)]
struct BlockchainInfo {
chain_id: String,
chain_metadata: Option<Value>,
contract_address: Option<String>,
features: BlockchainFeatures,
}
/// https://docs.joinmastodon.org/entities/V1_Instance/
#[derive(Serialize)]
pub struct InstanceInfo {
@ -71,7 +53,6 @@ pub struct InstanceInfo {
login_message: String,
post_character_limit: usize, // deprecated
blockchains: Vec<BlockchainInfo>,
ipfs_gateway_url: Option<String>,
}
@ -86,53 +67,10 @@ fn get_full_api_version(version: &str) -> String {
impl InstanceInfo {
pub fn create(
config: &Config,
maybe_blockchain: Option<&ContractSet>,
user_count: i64,
post_count: i64,
peer_count: i64,
) -> Self {
let mut blockchains = vec![];
match config.blockchain() {
Some(BlockchainConfig::Ethereum(ethereum_config)) => {
let features = if let Some(contract_set) = maybe_blockchain {
BlockchainFeatures {
gate: contract_set.gate.is_some(),
minter: contract_set.collectible.is_some(),
subscriptions: contract_set.subscription.is_some(),
}
} else {
BlockchainFeatures {
gate: false,
minter: false,
subscriptions: false,
}
};
let maybe_chain_metadata = ethereum_config
.chain_metadata.as_ref()
.and_then(|metadata| to_value(metadata).ok());
blockchains.push(BlockchainInfo {
chain_id: ethereum_config.chain_id.to_string(),
chain_metadata: maybe_chain_metadata,
contract_address:
Some(ethereum_config.contract_address.clone()),
features: features,
});
},
Some(BlockchainConfig::Monero(monero_config)) => {
let features = BlockchainFeatures {
gate: false,
minter: false,
subscriptions: true,
};
blockchains.push(BlockchainInfo {
chain_id: monero_config.chain_id.to_string(),
chain_metadata: None,
contract_address: None,
features: features,
})
},
None => (),
};
Self {
uri: config.instance().hostname(),
title: config.instance_title.clone(),
@ -165,7 +103,6 @@ impl InstanceInfo {
},
login_message: config.login_message.clone(),
post_character_limit: config.limits.posts.character_limit,
blockchains: blockchains,
ipfs_gateway_url: config.ipfs_gateway_url.clone(),
}
}

View file

@ -8,7 +8,6 @@ use mitra_models::{
users::queries::get_user_count,
};
use crate::ethereum::contracts::ContractSet;
use crate::mastodon_api::errors::MastodonError;
use super::types::InstanceInfo;
@ -18,7 +17,6 @@ use super::types::InstanceInfo;
async fn instance_view(
config: web::Data<Config>,
db_pool: web::Data<DbPool>,
maybe_blockchain: web::Data<Option<ContractSet>>,
) -> Result<HttpResponse, MastodonError> {
let db_client = &**get_database_client(&db_pool).await?;
let user_count = get_user_count(db_client).await?;
@ -26,7 +24,6 @@ async fn instance_view(
let peer_count = get_peer_count(db_client).await?;
let instance = InstanceInfo::create(
config.as_ref(),
maybe_blockchain.as_ref().as_ref(),
user_count,
post_count,
peer_count,

View file

@ -21,16 +21,11 @@ use mitra_models::{
},
users::queries::{
get_user_by_name,
get_user_by_login_address,
},
};
use mitra_utils::passwords::verify_password;
use crate::errors::ValidationError;
use crate::ethereum::{
eip4361::verify_eip4361_signature,
utils::validate_ethereum_address,
};
use crate::http::FormOrJson;
use crate::mastodon_api::errors::MastodonError;
@ -115,7 +110,7 @@ const ACCESS_TOKEN_EXPIRES_IN: i64 = 86400 * 7;
/// https://oauth.net/2/grant-types/password/
#[post("/token")]
async fn token_view(
config: web::Data<Config>,
_config: web::Data<Config>,
db_pool: web::Data<DbPool>,
request_data: FormOrJson<TokenRequest>,
) -> Result<HttpResponse, MastodonError> {
@ -135,26 +130,6 @@ async fn token_view(
.ok_or(ValidationError("username is required"))?;
get_user_by_name(db_client, username).await?
},
"ethereum" => {
// DEPRECATED
let wallet_address = request_data.wallet_address.as_ref()
.ok_or(ValidationError("wallet address is required"))?;
validate_ethereum_address(wallet_address)?;
get_user_by_login_address(db_client, wallet_address).await?
},
"eip4361" => {
let message = request_data.message.as_ref()
.ok_or(ValidationError("message is required"))?;
let signature = request_data.signature.as_ref()
.ok_or(ValidationError("signature is required"))?;
let wallet_address = verify_eip4361_signature(
message,
signature,
&config.instance().hostname(),
&config.login_message,
)?;
get_user_by_login_address(db_client, &wallet_address).await?
},
_ => {
return Err(ValidationError("unsupported grant type").into());
},

View file

@ -13,7 +13,6 @@ use mitra_models::{
profiles::queries::{
search_profiles,
search_profiles_by_did_only,
search_profiles_by_wallet_address,
},
profiles::types::DbActorProfile,
tags::queries::search_tags,
@ -23,7 +22,6 @@ use mitra_models::{
},
};
use mitra_utils::{
currencies::Currency,
did::Did,
};
@ -37,7 +35,6 @@ use crate::activitypub::{
HandlerError,
};
use crate::errors::ValidationError;
use crate::ethereum::utils::validate_ethereum_address;
use crate::media::MediaStorage;
use crate::webfinger::types::ActorAddress;
@ -47,7 +44,6 @@ enum SearchQuery {
ProfileQuery(String, Option<String>),
TagQuery(String),
Url(String),
WalletAddress(String),
Did(Did),
Unknown,
}
@ -87,12 +83,6 @@ fn parse_search_query(search_query: &str) -> SearchQuery {
if Url::parse(search_query).is_ok() {
return SearchQuery::Url(search_query.to_string());
};
// TODO: support other currencies
if validate_ethereum_address(
&search_query.to_lowercase(),
).is_ok() {
return SearchQuery::WalletAddress(search_query.to_string());
};
if let Ok(tag) = parse_tag_query(search_query) {
return SearchQuery::TagQuery(tag);
};
@ -267,16 +257,6 @@ pub async fn search(
};
};
},
SearchQuery::WalletAddress(address) => {
// Search by wallet address, assuming it's ethereum address
// TODO: support other currencies
profiles = search_profiles_by_wallet_address(
db_client,
&Currency::Ethereum,
&address,
false,
).await?;
},
SearchQuery::Did(did) => {
profiles = search_profiles_by_did_only(
db_client,

View file

@ -1,4 +1,4 @@
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -7,7 +7,6 @@ use mitra_models::{
profiles::types::PaymentOption,
};
use crate::monero::subscriptions::MONERO_INVOICE_TIMEOUT;
#[derive(Deserialize)]
pub struct InvoiceData {
@ -35,12 +34,7 @@ impl From<DbInvoice> for Invoice {
InvoiceStatus::Forwarded => "forwarded",
InvoiceStatus::Timeout => "timeout",
};
let expires_at = if value.chain_id.inner().is_monero() {
value.created_at + Duration::seconds(MONERO_INVOICE_TIMEOUT)
} else {
// Epoch 0
Default::default()
};
let expires_at = Default::default();
Self {
id: value.id,
sender_id: value.sender_id,

View file

@ -12,45 +12,19 @@ use uuid::Uuid;
use mitra_config::Config;
use mitra_models::{
database::{get_database_client, DbPool},
invoices::queries::{create_invoice, get_invoice_by_id},
profiles::queries::{
get_profile_by_id,
update_profile,
},
profiles::types::{
MoneroSubscription,
PaymentOption,
PaymentType,
ProfileUpdateData,
},
invoices::queries::{get_invoice_by_id},
subscriptions::queries::get_subscription_by_participants,
users::queries::get_user_by_id,
users::types::Permission,
};
use mitra_utils::currencies::Currency;
use crate::activitypub::builders::update_person::prepare_update_person;
use crate::errors::ValidationError;
use crate::ethereum::{
contracts::ContractSet,
subscriptions::{
create_subscription_signature,
is_registered_recipient,
},
};
use crate::http::get_request_base_url;
use crate::mastodon_api::{
accounts::types::Account,
errors::MastodonError,
oauth::auth::get_current_user,
};
use crate::monero::{
helpers::validate_monero_address,
wallet::create_monero_address,
};
use super::types::{
Invoice,
InvoiceData,
SubscriptionAuthorizationQueryParams,
SubscriptionDetails,
SubscriptionOption,
@ -60,28 +34,17 @@ use super::types::{
#[get("/authorize")]
pub async fn authorize_subscription(
auth: BearerAuth,
config: web::Data<Config>,
_config: web::Data<Config>,
db_pool: web::Data<DbPool>,
query_params: web::Query<SubscriptionAuthorizationQueryParams>,
_query_params: web::Query<SubscriptionAuthorizationQueryParams>,
) -> Result<HttpResponse, MastodonError> {
let db_client = &**get_database_client(&db_pool).await?;
let current_user = get_current_user(db_client, auth.token()).await?;
let ethereum_config = config.blockchain()
.ok_or(MastodonError::NotSupported)?
.ethereum_config()
.ok_or(MastodonError::NotSupported)?;
let _current_user = get_current_user(db_client, auth.token()).await?;
// The user must have a public ethereum address,
// because subscribers should be able
// to verify that payments are actually sent to the recipient.
let wallet_address = current_user
.public_wallet_address(&Currency::Ethereum)
.ok_or(MastodonError::PermissionError)?;
let signature = create_subscription_signature(
ethereum_config,
&wallet_address,
query_params.price,
).map_err(|_| MastodonError::InternalError)?;
Ok(HttpResponse::Ok().json(signature))
return Err(MastodonError::PermissionError);
}
#[get("/options")]
@ -104,77 +67,14 @@ pub async fn register_subscription_option(
connection_info: ConnectionInfo,
config: web::Data<Config>,
db_pool: web::Data<DbPool>,
maybe_blockchain: web::Data<Option<ContractSet>>,
subscription_option: web::Json<SubscriptionOption>,
_subscription_option: web::Json<SubscriptionOption>,
) -> Result<HttpResponse, MastodonError> {
let db_client = &mut **get_database_client(&db_pool).await?;
let mut current_user = get_current_user(db_client, auth.token()).await?;
let current_user = get_current_user(db_client, auth.token()).await?;
if !current_user.role.has_permission(Permission::ManageSubscriptionOptions) {
return Err(MastodonError::PermissionError);
};
let maybe_payment_option = match subscription_option.into_inner() {
SubscriptionOption::Ethereum => {
let ethereum_config = config.blockchain()
.and_then(|conf| conf.ethereum_config())
.ok_or(MastodonError::NotSupported)?;
let contract_set = maybe_blockchain.as_ref().as_ref()
.ok_or(MastodonError::NotSupported)?;
let wallet_address = current_user
.public_wallet_address(&Currency::Ethereum)
.ok_or(MastodonError::PermissionError)?;
if current_user.profile.payment_options
.any(PaymentType::EthereumSubscription)
{
// Ignore attempts to update payment option
None
} else {
let is_registered = is_registered_recipient(
contract_set,
&wallet_address,
).await.map_err(|_| MastodonError::InternalError)?;
if !is_registered {
return Err(ValidationError("recipient is not registered").into());
};
Some(PaymentOption::ethereum_subscription(
ethereum_config.chain_id.clone(),
))
}
},
SubscriptionOption::Monero { price, payout_address } => {
let monero_config = config.blockchain()
.and_then(|conf| conf.monero_config())
.ok_or(MastodonError::NotSupported)?;
if price == 0 {
return Err(ValidationError("price must be greater than 0").into());
};
validate_monero_address(&payout_address)?;
let payment_info = MoneroSubscription {
chain_id: monero_config.chain_id.clone(),
price,
payout_address,
};
Some(PaymentOption::MoneroSubscription(payment_info))
},
};
if let Some(payment_option) = maybe_payment_option {
let mut profile_data = ProfileUpdateData::from(&current_user.profile);
profile_data.add_payment_option(payment_option);
current_user.profile = update_profile(
db_client,
&current_user.id,
profile_data,
).await?;
// Federate
prepare_update_person(
db_client,
&config.instance(),
&current_user,
None,
).await?.enqueue(db_client).await?;
};
let account = Account::from_user(
&get_request_base_url(connection_info),
&config.instance_url(),
@ -201,44 +101,6 @@ async fn find_subscription(
Ok(HttpResponse::Ok().json(details))
}
#[post("/invoices")]
async fn create_invoice_view(
config: web::Data<Config>,
db_pool: web::Data<DbPool>,
invoice_data: web::Json<InvoiceData>,
) -> Result<HttpResponse, MastodonError> {
let monero_config = config.blockchain()
.ok_or(MastodonError::NotSupported)?
.monero_config()
.ok_or(MastodonError::NotSupported)?;
if invoice_data.sender_id == invoice_data.recipient_id {
return Err(ValidationError("sender must be different from recipient").into());
};
if invoice_data.amount <= 0 {
return Err(ValidationError("amount must be positive").into());
};
let db_client = &**get_database_client(&db_pool).await?;
let sender = get_profile_by_id(db_client, &invoice_data.sender_id).await?;
let recipient = get_user_by_id(db_client, &invoice_data.recipient_id).await?;
if !recipient.profile.payment_options.any(PaymentType::MoneroSubscription) {
let error_message = "recipient can't accept subscription payments";
return Err(ValidationError(error_message).into());
};
let payment_address = create_monero_address(monero_config).await
.map_err(|_| MastodonError::InternalError)?
.to_string();
let db_invoice = create_invoice(
db_client,
&sender.id,
&recipient.id,
&monero_config.chain_id,
&payment_address,
invoice_data.amount,
).await?;
let invoice = Invoice::from(db_invoice);
Ok(HttpResponse::Ok().json(invoice))
}
#[get("/invoices/{invoice_id}")]
async fn get_invoice(
@ -257,6 +119,5 @@ pub fn subscription_api_scope() -> Scope {
.service(get_subscription_options)
.service(register_subscription_option)
.service(find_subscription)
.service(create_invoice_view)
.service(get_invoice)
}

View file

@ -1,69 +0,0 @@
use std::str::FromStr;
use monero_rpc::TransferType;
use monero_rpc::monero::Address;
use uuid::Uuid;
use mitra_config::MoneroConfig;
use mitra_models::{
database::DatabaseClient,
invoices::queries::{
get_invoice_by_id,
set_invoice_status,
},
invoices::types::InvoiceStatus,
};
use crate::errors::ValidationError;
use super::wallet::{
open_monero_wallet,
MoneroError,
};
pub fn validate_monero_address(address: &str)
-> Result<(), ValidationError>
{
Address::from_str(address)
.map_err(|_| ValidationError("invalid monero address"))?;
Ok(())
}
pub async fn check_expired_invoice(
config: &MoneroConfig,
db_client: &impl DatabaseClient,
invoice_id: &Uuid,
) -> Result<(), MoneroError> {
let wallet_client = open_monero_wallet(config).await?;
let invoice = get_invoice_by_id(db_client, invoice_id).await?;
if invoice.chain_id != config.chain_id ||
invoice.invoice_status != InvoiceStatus::Timeout
{
return Err(MoneroError::OtherError("can't process invoice"));
};
let address = Address::from_str(&invoice.payment_address)?;
let address_index = wallet_client.get_address_index(address).await?;
let transfers = wallet_client.incoming_transfers(
TransferType::Available,
Some(config.account_index),
Some(vec![address_index.minor]),
).await?
.transfers
.unwrap_or_default();
if transfers.is_empty() {
log::info!("no incoming transfers");
} else {
for transfer in transfers {
if transfer.subaddr_index != address_index {
return Err(MoneroError::WalletRpcError("unexpected transfer"));
};
log::info!(
"received payment for invoice {}: {}",
invoice.id,
transfer.amount,
);
};
set_invoice_status(db_client, &invoice.id, InvoiceStatus::Paid).await?;
};
Ok(())
}

View file

@ -1,3 +0,0 @@
pub mod helpers;
pub mod subscriptions;
pub mod wallet;

View file

@ -1,221 +0,0 @@
use std::str::FromStr;
use chrono::{Duration, Utc};
use monero_rpc::TransferType;
use monero_rpc::monero::{Address, Amount};
use mitra_config::{Instance, MoneroConfig};
use mitra_models::{
database::{get_database_client, DatabaseError, DbPool},
invoices::queries::{
get_invoice_by_address,
get_invoices_by_status,
set_invoice_status,
},
invoices::types::InvoiceStatus,
profiles::queries::get_profile_by_id,
profiles::types::PaymentOption,
subscriptions::queries::{
create_subscription,
get_subscription_by_participants,
update_subscription,
},
users::queries::get_user_by_id,
};
use crate::ethereum::subscriptions::send_subscription_notifications;
use super::wallet::{
get_single_item,
get_subaddress_balance,
open_monero_wallet,
send_monero,
MoneroError,
};
pub const MONERO_INVOICE_TIMEOUT: i64 = 3 * 60 * 60; // 3 hours
pub async fn check_monero_subscriptions(
instance: &Instance,
config: &MoneroConfig,
db_pool: &DbPool,
) -> Result<(), MoneroError> {
let db_client = &mut **get_database_client(db_pool).await?;
let wallet_client = open_monero_wallet(config).await?;
// Invoices waiting for payment
let mut address_waitlist = vec![];
let open_invoices = get_invoices_by_status(
db_client,
&config.chain_id,
InvoiceStatus::Open,
).await?;
for invoice in open_invoices {
let invoice_age = Utc::now() - invoice.created_at;
if invoice_age.num_seconds() >= MONERO_INVOICE_TIMEOUT {
set_invoice_status(
db_client,
&invoice.id,
InvoiceStatus::Timeout,
).await?;
continue;
};
let address = Address::from_str(&invoice.payment_address)?;
let address_index = wallet_client.get_address_index(address).await?;
address_waitlist.push(address_index.minor);
};
let maybe_incoming_transfers = if !address_waitlist.is_empty() {
log::info!("{} invoices are waiting for payment", address_waitlist.len());
let incoming_transfers = wallet_client.incoming_transfers(
TransferType::Available,
Some(config.account_index),
Some(address_waitlist),
).await?;
incoming_transfers.transfers
} else {
None
};
if let Some(transfers) = maybe_incoming_transfers {
for transfer in transfers {
let address_data = wallet_client.get_address(
config.account_index,
Some(vec![transfer.subaddr_index.minor]),
).await?;
let subaddress_data = get_single_item(address_data.addresses)?;
let subaddress = subaddress_data.address;
let invoice = get_invoice_by_address(
db_client,
&config.chain_id,
&subaddress.to_string(),
).await?;
log::info!(
"received payment for invoice {}: {}",
invoice.id,
transfer.amount,
);
if invoice.invoice_status == InvoiceStatus::Open {
set_invoice_status(db_client, &invoice.id, InvoiceStatus::Paid).await?;
} else {
log::warn!("invoice has already been paid");
};
};
};
// Invoices waiting to be forwarded
let paid_invoices = get_invoices_by_status(
db_client,
&config.chain_id,
InvoiceStatus::Paid,
).await?;
for invoice in paid_invoices {
let address = Address::from_str(&invoice.payment_address)?;
let address_index = wallet_client.get_address_index(address).await?;
let balance_data = get_subaddress_balance(
&wallet_client,
&address_index,
).await?;
if balance_data.balance != balance_data.unlocked_balance ||
balance_data.balance == Amount::ZERO
{
// Don't forward payment until all outputs are unlocked
continue;
};
let sender = get_profile_by_id(db_client, &invoice.sender_id).await?;
let recipient = get_user_by_id(db_client, &invoice.recipient_id).await?;
let maybe_payment_info = recipient.profile.payment_options.clone()
.into_inner().into_iter()
.find_map(|option| match option {
PaymentOption::MoneroSubscription(payment_info) => {
if payment_info.chain_id == config.chain_id {
Some(payment_info)
} else {
None
}
},
_ => None,
});
let payment_info = if let Some(payment_info) = maybe_payment_info {
payment_info
} else {
log::error!("subscription is not configured for user {}", recipient.id);
continue;
};
let payout_address = Address::from_str(&payment_info.payout_address)?;
let payout_amount = send_monero(
&wallet_client,
config.account_index,
address_index.minor,
payout_address,
).await?;
let duration_secs = (payout_amount.as_pico() / payment_info.price)
.try_into()
.map_err(|_| MoneroError::OtherError("invalid duration"))?;
set_invoice_status(
db_client,
&invoice.id,
InvoiceStatus::Forwarded,
).await?;
log::info!("processed payment for invoice {}", invoice.id);
match get_subscription_by_participants(
db_client,
&sender.id,
&recipient.id,
).await {
Ok(subscription) => {
if subscription.chain_id != config.chain_id {
log::error!("can't switch to another chain");
continue;
};
// Update subscription expiration date
let expires_at =
std::cmp::max(subscription.expires_at, Utc::now()) +
Duration::seconds(duration_secs);
update_subscription(
db_client,
subscription.id,
&expires_at,
&Utc::now(),
).await?;
log::info!(
"subscription updated: {0} to {1}",
subscription.sender_id,
subscription.recipient_id,
);
send_subscription_notifications(
db_client,
instance,
&sender,
&recipient,
).await?;
},
Err(DatabaseError::NotFound(_)) => {
// New subscription
let expires_at = Utc::now() + Duration::seconds(duration_secs);
create_subscription(
db_client,
&sender.id,
None, // matching by address is not required
&recipient.id,
&config.chain_id,
&expires_at,
&Utc::now(),
).await?;
log::info!(
"subscription created: {0} to {1}",
sender.id,
recipient.id,
);
send_subscription_notifications(
db_client,
instance,
&sender,
&recipient,
).await?;
},
Err(other_error) => return Err(other_error.into()),
};
};
Ok(())
}

View file

@ -1,166 +0,0 @@
use monero_rpc::{
HashString,
RpcClientBuilder,
SubaddressBalanceData,
SweepAllArgs,
TransferPriority,
WalletClient,
};
use monero_rpc::monero::{
cryptonote::subaddress::Index,
util::address::Error as AddressError,
Address,
Amount,
};
use mitra_config::MoneroConfig;
use mitra_models::database::DatabaseError;
#[derive(thiserror::Error, Debug)]
pub enum MoneroError {
#[error(transparent)]
WalletError(#[from] anyhow::Error),
#[error("{0}")]
WalletRpcError(&'static str),
#[error(transparent)]
AddressError(#[from] AddressError),
#[error(transparent)]
DatabaseError(#[from] DatabaseError),
#[error("other error")]
OtherError(&'static str),
}
/// https://monerodocs.org/interacting/monero-wallet-rpc-reference/#create_wallet
pub async fn create_monero_wallet(
config: &MoneroConfig,
name: String,
password: Option<String>,
) -> Result<(), MoneroError> {
let wallet_client = RpcClientBuilder::new()
.build(config.wallet_url.clone())?
.wallet();
let language = "English".to_string();
wallet_client.create_wallet(name, password, language).await?;
Ok(())
}
/// https://monerodocs.org/interacting/monero-wallet-rpc-reference/#open_wallet
pub async fn open_monero_wallet(
config: &MoneroConfig,
) -> Result<WalletClient, MoneroError> {
let wallet_client = RpcClientBuilder::new()
.build(config.wallet_url.clone())?
.wallet();
if let Err(error) = wallet_client.refresh(None).await {
if error.to_string() == "Server error: No wallet file" {
// Try to open wallet
if let Some(ref wallet_name) = config.wallet_name {
wallet_client.open_wallet(
wallet_name.clone(),
config.wallet_password.clone(),
).await?;
} else {
return Err(MoneroError::WalletRpcError("wallet file is required"));
};
} else {
return Err(error.into());
};
};
Ok(wallet_client)
}
pub async fn create_monero_address(
config: &MoneroConfig,
) -> Result<Address, MoneroError> {
let wallet_client = open_monero_wallet(config).await?;
let account_index = config.account_index;
let (address, address_index) =
wallet_client.create_address(account_index, None).await?;
log::info!("created monero address {}/{}", account_index, address_index);
// Save wallet
wallet_client.close_wallet().await?;
Ok(address)
}
pub fn get_single_item<T: Clone>(items: Vec<T>) -> Result<T, MoneroError> {
if let [item] = &items[..] {
Ok(item.clone())
} else {
Err(MoneroError::WalletRpcError("expected single item"))
}
}
pub async fn get_subaddress_balance(
wallet_client: &WalletClient,
subaddress_index: &Index,
) -> Result<SubaddressBalanceData, MoneroError> {
let balance_data = wallet_client.get_balance(
subaddress_index.major,
Some(vec![subaddress_index.minor]),
).await?;
let subaddress_data = get_single_item(balance_data.per_subaddress)?;
Ok(subaddress_data)
}
/// https://monerodocs.org/interacting/monero-wallet-rpc-reference/#sweep_all
pub async fn send_monero(
wallet_client: &WalletClient,
from_account: u32,
from_address: u32,
to_address: Address,
) -> Result<Amount, MoneroError> {
let sweep_args = SweepAllArgs {
address: to_address,
account_index: from_account,
subaddr_indices: Some(vec![from_address]),
priority: TransferPriority::Default,
mixin: 15,
ring_size: 16,
unlock_time: 1,
get_tx_keys: None,
below_amount: None,
do_not_relay: None,
get_tx_hex: None,
get_tx_metadata: None,
};
let sweep_data = wallet_client.sweep_all(sweep_args).await?;
let HashString(tx_hash) = get_single_item(sweep_data.tx_hash_list)?;
let amount = get_single_item(sweep_data.amount_list)?;
let fee = get_single_item(sweep_data.fee_list)?;
// TODO: transaction can fail
// https://github.com/monero-project/monero/issues/8372
let maybe_transfer = wallet_client.get_transfer(
tx_hash,
Some(from_account),
).await?;
let transfer_status = maybe_transfer
.map(|data| data.transfer_type.into())
.unwrap_or("dropped");
if transfer_status == "dropped" || transfer_status == "failed" {
log::error!(
"sent transaction {:x} from {}/{}, {}",
tx_hash,
from_account,
from_address,
transfer_status,
);
return Err(MoneroError::WalletRpcError("transaction failed"));
};
log::info!(
"sent transaction {:x} from {}/{}, amount {}, fee {}",
tx_hash,
from_account,
from_address,
amount,
fee,
);
// Save wallet
wallet_client.close_wallet().await?;
Ok(amount)
}

View file

@ -4,8 +4,8 @@ use serde::Serialize;
use mitra_config::{Config, RegistrationType, MITRA_VERSION};
const MITRA_NAME: &str = "mitra";
const MITRA_REPOSITORY: &str = "https://codeberg.org/silverpill/mitra";
const MITRA_NAME: &str = "reef";
const MITRA_REPOSITORY: &str = "https://code.caric.io/reef/reef";
const ATOM_SERVICE: &str = "atom1.0";
const ACTIVITYPUB_PROTOCOL: &str = "activitypub";