use tokio_postgres::GenericClient; use uuid::Uuid; use crate::database::catch_unique_violation; use crate::errors::DatabaseError; use crate::ethereum::identity::DidPkh; use crate::models::cleanup::{ find_orphaned_files, find_orphaned_ipfs_objects, DeletionQueue, }; use crate::models::relationships::types::RelationshipType; use crate::utils::id::new_uuid; use super::currencies::{get_currency_field_name, Currency}; use super::types::{ DbActorProfile, ExtraFields, IdentityProofs, ProfileCreateData, ProfileUpdateData, }; /// Create new profile using given Client or Transaction. pub async fn create_profile( db_client: &impl GenericClient, profile_data: ProfileCreateData, ) -> Result { let profile_id = new_uuid(); let row = db_client.query_one( " INSERT INTO actor_profile ( id, username, display_name, acct, bio, bio_source, avatar_file_name, banner_file_name, identity_proofs, extra_fields, actor_json ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING actor_profile ", &[ &profile_id, &profile_data.username, &profile_data.display_name, &profile_data.acct, &profile_data.bio, &profile_data.bio, &profile_data.avatar, &profile_data.banner, &IdentityProofs(profile_data.identity_proofs), &ExtraFields(profile_data.extra_fields), &profile_data.actor_json, ], ).await.map_err(catch_unique_violation("profile"))?; let profile = row.try_get("actor_profile")?; Ok(profile) } pub async fn update_profile( db_client: &impl GenericClient, profile_id: &Uuid, data: ProfileUpdateData, ) -> Result { let maybe_row = db_client.query_opt( " UPDATE actor_profile SET display_name = $1, bio = $2, bio_source = $3, avatar_file_name = $4, banner_file_name = $5, identity_proofs = $6, extra_fields = $7, actor_json = $8 WHERE id = $9 RETURNING actor_profile ", &[ &data.display_name, &data.bio, &data.bio_source, &data.avatar, &data.banner, &IdentityProofs(data.identity_proofs), &ExtraFields(data.extra_fields), &data.actor_json, &profile_id, ], ).await?; let profile = match maybe_row { Some(row) => row.try_get("actor_profile")?, None => return Err(DatabaseError::NotFound("profile")), }; Ok(profile) } pub async fn get_profile_by_id( db_client: &impl GenericClient, profile_id: &Uuid, ) -> Result { let result = db_client.query_opt( " SELECT actor_profile FROM actor_profile WHERE id = $1 ", &[&profile_id], ).await?; let profile = match result { Some(row) => row.try_get("actor_profile")?, None => return Err(DatabaseError::NotFound("profile")), }; Ok(profile) } pub async fn get_profile_by_actor_id( db_client: &impl GenericClient, actor_id: &str, ) -> Result { let result = db_client.query_opt( " SELECT actor_profile FROM actor_profile WHERE actor_id = $1 ", &[&actor_id], ).await?; let profile = match result { Some(row) => row.try_get("actor_profile")?, None => return Err(DatabaseError::NotFound("profile")), }; Ok(profile) } pub async fn get_profile_by_acct( db_client: &impl GenericClient, acct: &str, ) -> Result { let result = db_client.query_opt( " SELECT actor_profile FROM actor_profile WHERE actor_profile.acct = $1 ", &[&acct], ).await?; let profile = match result { Some(row) => row.try_get("actor_profile")?, None => return Err(DatabaseError::NotFound("profile")), }; Ok(profile) } pub async fn get_profiles( db_client: &impl GenericClient, offset: i64, limit: i64, ) -> Result, DatabaseError> { let rows = db_client.query( " SELECT actor_profile FROM actor_profile ORDER BY username LIMIT $1 OFFSET $2 ", &[&limit, &offset], ).await?; let profiles = rows.iter() .map(|row| row.try_get("actor_profile")) .collect::, _>>()?; Ok(profiles) } pub async fn get_profiles_by_accts( db_client: &impl GenericClient, accts: Vec, ) -> Result, DatabaseError> { let rows = db_client.query( " SELECT actor_profile FROM actor_profile WHERE acct = ANY($1) ", &[&accts], ).await?; let profiles = rows.iter() .map(|row| row.try_get("actor_profile")) .collect::>()?; Ok(profiles) } /// Deletes profile from database and returns collection of orphaned objects. pub async fn delete_profile( db_client: &mut impl GenericClient, profile_id: &Uuid, ) -> Result { let transaction = db_client.transaction().await?; // Select all posts authored by given actor, // their descendants and reposts. let posts_rows = transaction.query( " WITH RECURSIVE context (post_id) AS ( SELECT post.id FROM post WHERE post.author_id = $1 UNION SELECT post.id FROM post JOIN context ON ( post.in_reply_to_id = context.post_id OR post.repost_of_id = context.post_id ) ) SELECT post_id FROM context ", &[&profile_id], ).await?; let posts: Vec = posts_rows.iter() .map(|row| row.try_get("post_id")) .collect::>()?; // Get list of media files let files_rows = transaction.query( " SELECT unnest(array_remove(ARRAY[avatar_file_name, banner_file_name], NULL)) AS file_name FROM actor_profile WHERE id = $1 UNION ALL SELECT file_name FROM media_attachment WHERE post_id = ANY($2) ", &[&profile_id, &posts], ).await?; let files: Vec = files_rows.iter() .map(|row| row.try_get("file_name")) .collect::>()?; // Get list of IPFS objects let ipfs_objects_rows = transaction.query( " SELECT ipfs_cid FROM media_attachment WHERE post_id = ANY($1) AND ipfs_cid IS NOT NULL UNION ALL SELECT ipfs_cid FROM post WHERE id = ANY($1) AND ipfs_cid IS NOT NULL ", &[&posts], ).await?; let ipfs_objects: Vec = ipfs_objects_rows.iter() .map(|row| row.try_get("ipfs_cid")) .collect::>()?; // Update post counters transaction.execute( " UPDATE actor_profile SET post_count = post_count - post.count FROM ( SELECT post.author_id, count(*) FROM post WHERE post.id = ANY($1) GROUP BY post.author_id ) AS post WHERE actor_profile.id = post.author_id ", &[&posts], ).await?; // Update counters transaction.execute( " UPDATE actor_profile SET follower_count = follower_count - 1 FROM relationship WHERE relationship.source_id = $1 AND relationship.target_id = actor_profile.id AND relationship.relationship_type = $2 ", &[&profile_id, &RelationshipType::Follow], ).await?; transaction.execute( " UPDATE actor_profile SET following_count = following_count - 1 FROM relationship WHERE relationship.source_id = actor_profile.id AND relationship.target_id = $1 AND relationship.relationship_type = $2 ", &[&profile_id, &RelationshipType::Follow], ).await?; transaction.execute( " UPDATE post SET reply_count = reply_count - reply.count FROM ( SELECT in_reply_to_id, count(*) FROM post WHERE author_id = $1 AND in_reply_to_id IS NOT NULL GROUP BY in_reply_to_id ) AS reply WHERE post.id = reply.in_reply_to_id ", &[&profile_id], ).await?; transaction.execute( " UPDATE post SET reaction_count = reaction_count - 1 FROM post_reaction WHERE post_reaction.post_id = post.id AND post_reaction.author_id = $1 ", &[&profile_id], ).await?; transaction.execute( " UPDATE post SET repost_count = post.repost_count - 1 FROM post AS repost WHERE repost.repost_of_id = post.id AND repost.author_id = $1 ", &[&profile_id], ).await?; // Delete profile let deleted_count = transaction.execute( " DELETE FROM actor_profile WHERE id = $1 RETURNING actor_profile ", &[&profile_id], ).await?; if deleted_count == 0 { return Err(DatabaseError::NotFound("profile")); } let orphaned_files = find_orphaned_files(&transaction, files).await?; let orphaned_ipfs_objects = find_orphaned_ipfs_objects(&transaction, ipfs_objects).await?; transaction.commit().await?; Ok(DeletionQueue { files: orphaned_files, ipfs_objects: orphaned_ipfs_objects, }) } pub async fn search_profile( db_client: &impl GenericClient, username: &str, instance: Option<&String>, ) -> Result, DatabaseError> { let db_search_query = match instance { Some(instance) => { // Search for exact actor address format!("{}@{}", username, instance) }, None => { // Fuzzy search for username format!("%{}%", username) }, }; let rows = db_client.query( " SELECT actor_profile FROM actor_profile WHERE acct ILIKE $1 ", &[&db_search_query], ).await?; let profiles: Vec = rows.iter() .map(|row| row.try_get("actor_profile")) .collect::>()?; Ok(profiles) } pub async fn search_profile_by_wallet_address( db_client: &impl GenericClient, currency: &Currency, wallet_address: &str, prefer_verified: bool, ) -> Result, DatabaseError> { let field_name = get_currency_field_name(currency); let did_str = DidPkh::from_address(currency, wallet_address).to_string(); // If currency is Ethereum, // search over extra fields must be case insensitive. // This query does not scan user_account.wallet_address because // login addresses are private by default. let rows = db_client.query( " SELECT actor_profile, TRUE AS is_verified FROM actor_profile WHERE EXISTS ( SELECT 1 FROM jsonb_array_elements(actor_profile.identity_proofs) AS proof WHERE proof ->> 'issuer' = $3 ) UNION ALL SELECT actor_profile, FALSE FROM actor_profile WHERE EXISTS ( SELECT 1 FROM jsonb_array_elements(actor_profile.extra_fields) AS field WHERE field ->> 'name' ILIKE $1 AND field ->> 'value' ILIKE $2 ) ", &[&field_name, &wallet_address, &did_str], ).await?; let mut verified = vec![]; let mut unverified = vec![]; for row in rows { let profile: DbActorProfile = row.try_get("actor_profile")?; let is_verified: bool = row.try_get("is_verified")?; if is_verified { verified.push(profile); } else if !verified.iter().any(|item| item.id == profile.id) { unverified.push(profile); }; }; let results = if prefer_verified && verified.len() > 0 { verified } else { [verified, unverified].concat() }; Ok(results) } pub async fn update_follower_count( db_client: &impl GenericClient, profile_id: &Uuid, change: i32, ) -> Result { let maybe_row = db_client.query_opt( " UPDATE actor_profile SET follower_count = follower_count + $1 WHERE id = $2 RETURNING actor_profile ", &[&change, &profile_id], ).await?; let row = maybe_row.ok_or(DatabaseError::NotFound("profile"))?; let profile = row.try_get("actor_profile")?; Ok(profile) } pub async fn update_following_count( db_client: &impl GenericClient, profile_id: &Uuid, change: i32, ) -> Result { let maybe_row = db_client.query_opt( " UPDATE actor_profile SET following_count = following_count + $1 WHERE id = $2 RETURNING actor_profile ", &[&change, &profile_id], ).await?; let row = maybe_row.ok_or(DatabaseError::NotFound("profile"))?; let profile = row.try_get("actor_profile")?; Ok(profile) } pub async fn update_post_count( db_client: &impl GenericClient, profile_id: &Uuid, change: i32, ) -> Result { let maybe_row = db_client.query_opt( " UPDATE actor_profile SET post_count = post_count + $1 WHERE id = $2 RETURNING actor_profile ", &[&change, &profile_id], ).await?; let row = maybe_row.ok_or(DatabaseError::NotFound("profile"))?; let profile = row.try_get("actor_profile")?; Ok(profile) } #[cfg(test)] mod tests { use serial_test::serial; use crate::activitypub::actor::Actor; use crate::database::test_utils::create_test_database; use crate::models::profiles::queries::create_profile; use crate::models::profiles::types::{ ExtraField, IdentityProof, ProfileCreateData, }; use crate::models::users::queries::create_user; use crate::models::users::types::UserCreateData; use super::*; #[tokio::test] #[serial] async fn test_create_profile() { let profile_data = ProfileCreateData { username: "test".to_string(), ..Default::default() }; let db_client = create_test_database().await; let profile = create_profile(&db_client, profile_data).await.unwrap(); assert_eq!(profile.username, "test"); assert_eq!(profile.identity_proofs.into_inner().len(), 0); assert_eq!(profile.extra_fields.into_inner().len(), 0); } #[tokio::test] #[serial] async fn test_actor_id_unique() { let db_client = create_test_database().await; let actor_id = "https://example.com/users/test"; let create_actor = |actor_id: &str| { Actor { id: actor_id.to_string(), ..Default::default() } }; let profile_data_1 = ProfileCreateData { username: "test-1".to_string(), acct: "test-1@example.com".to_string(), actor_json: Some(create_actor(actor_id)), ..Default::default() }; create_profile(&db_client, profile_data_1).await.unwrap(); let profile_data_2 = ProfileCreateData { username: "test-2".to_string(), acct: "test-2@example.com".to_string(), actor_json: Some(create_actor(actor_id)), ..Default::default() }; let error = create_profile(&db_client, profile_data_2).await.err().unwrap(); assert_eq!(error.to_string(), "profile already exists"); } #[tokio::test] #[serial] async fn test_delete_profile() { let profile_data = ProfileCreateData::default(); let mut db_client = create_test_database().await; let profile = create_profile(&db_client, profile_data).await.unwrap(); let deletion_queue = delete_profile(&mut db_client, &profile.id).await.unwrap(); assert_eq!(deletion_queue.files.len(), 0); assert_eq!(deletion_queue.ipfs_objects.len(), 0); } const ETHEREUM: Currency = Currency::Ethereum; #[tokio::test] #[serial] async fn test_search_profile_by_wallet_address_local() { let db_client = &mut create_test_database().await; let wallet_address = "0x1234abcd"; let user_data = UserCreateData { wallet_address: Some(wallet_address.to_string()), ..Default::default() }; let _user = create_user(db_client, user_data).await.unwrap(); let profiles = search_profile_by_wallet_address( db_client, ÐEREUM, wallet_address, false).await.unwrap(); // Login address must not be exposed assert_eq!(profiles.len(), 0); } #[tokio::test] #[serial] async fn test_search_profile_by_wallet_address_remote() { let db_client = &mut create_test_database().await; let extra_field = ExtraField { name: "$eth".to_string(), value: "0x1234aBcD".to_string(), value_source: None, }; let profile_data = ProfileCreateData { extra_fields: vec![extra_field], ..Default::default() }; let profile = create_profile(db_client, profile_data).await.unwrap(); let profiles = search_profile_by_wallet_address( db_client, ÐEREUM, "0x1234abcd", false).await.unwrap(); assert_eq!(profiles.len(), 1); assert_eq!(profiles[0].id, profile.id); } #[tokio::test] #[serial] async fn test_search_profile_by_wallet_address_identity_proof() { let db_client = &mut create_test_database().await; let identity_proof = IdentityProof { issuer: DidPkh::from_address(ÐEREUM, "0x1234abcd"), proof_type: "ethereum".to_string(), value: "13590013185bdea963".to_string(), }; let profile_data = ProfileCreateData { identity_proofs: vec![identity_proof], ..Default::default() }; let profile = create_profile(db_client, profile_data).await.unwrap(); let profiles = search_profile_by_wallet_address( db_client, ÐEREUM, "0x1234abcd", false).await.unwrap(); assert_eq!(profiles.len(), 1); assert_eq!(profiles[0].id, profile.id); } }