diff --git a/src/bin/mitractl.rs b/src/bin/mitractl.rs index 0c6b808..e39267b 100644 --- a/src/bin/mitractl.rs +++ b/src/bin/mitractl.rs @@ -13,7 +13,6 @@ use mitra::models::users::queries::{ generate_invite_code, get_invite_codes, }; -use mitra::utils::files::remove_files; /// Admin CLI tool #[derive(Clap)] @@ -68,13 +67,13 @@ async fn main() { match opts.subcmd { SubCommand::DeleteProfile(subopts) => { - let orphaned_files = delete_profile(db_client, &subopts.id).await.unwrap(); - remove_files(orphaned_files, &config.media_dir()); + let deletion_queue = delete_profile(db_client, &subopts.id).await.unwrap(); + deletion_queue.process(&config).await; println!("profile deleted"); }, SubCommand::DeletePost(subopts) => { - let orphaned_files = delete_post(db_client, &subopts.id).await.unwrap(); - remove_files(orphaned_files, &config.media_dir()); + let deletion_queue = delete_post(db_client, &subopts.id).await.unwrap(); + deletion_queue.process(&config).await; println!("post deleted"); }, SubCommand::GenerateInviteCode(_) => { diff --git a/src/ipfs/store.rs b/src/ipfs/store.rs index 8a6c351..2b87f48 100644 --- a/src/ipfs/store.rs +++ b/src/ipfs/store.rs @@ -9,7 +9,7 @@ struct ObjectAdded { hash: String, } -/// Add file to IPFS. +/// Adds file to IPFS. /// Returns CID v1 of the object. pub async fn add(ipfs_api_url: &str, data: Vec) -> Result { let client = Client::new(); @@ -19,8 +19,34 @@ pub async fn add(ipfs_api_url: &str, data: Vec) -> Result, +) -> Result<(), reqwest::Error> { + let client = Client::new(); + let remove_pin_url = format!("{}/api/v0/pin/rm", ipfs_api_url); + let mut remove_pin_args = vec![]; + for cid in cids { + log::info!("removing {} from IPFS node", cid); + remove_pin_args.push(("arg", cid)); + } + let remove_pin_response = client.post(&remove_pin_url) + .query(&remove_pin_args) + .query(&[("recursive", true)]) + .send().await?; + remove_pin_response.error_for_status()?; + let gc_url = format!("{}/api/v0/repo/gc", ipfs_api_url); + // Garbage collecting can take a long time + // https://github.com/ipfs/go-ipfs/issues/7752 + let gc_response = client.post(&gc_url) + .send().await?; + gc_response.error_for_status()?; + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 307068b..19f3791 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,5 +10,5 @@ pub mod mastodon_api; pub mod models; pub mod nodeinfo; pub mod scheduler; -pub mod utils; +mod utils; pub mod webfinger; diff --git a/src/models/attachments/queries.rs b/src/models/attachments/queries.rs index d78d3c9..4acfb1d 100644 --- a/src/models/attachments/queries.rs +++ b/src/models/attachments/queries.rs @@ -23,31 +23,6 @@ pub async fn create_attachment( Ok(db_attachment) } -pub async fn find_orphaned_files( - db_client: &impl GenericClient, - files: Vec, -) -> Result, DatabaseError> { - let rows = db_client.query( - " - SELECT fname - FROM unnest($1::text[]) AS fname - WHERE - NOT EXISTS ( - SELECT 1 FROM media_attachment WHERE file_name = fname - ) - AND NOT EXISTS ( - SELECT 1 FROM actor_profile - WHERE avatar_file_name = fname OR banner_file_name = fname - ) - ", - &[&files], - ).await?; - let orphaned_files = rows.iter() - .map(|row| row.try_get("fname")) - .collect::>()?; - Ok(orphaned_files) -} - pub async fn set_attachment_ipfs_cid( db_client: &impl GenericClient, attachment_id: &Uuid, diff --git a/src/models/cleanup.rs b/src/models/cleanup.rs new file mode 100644 index 0000000..c51d336 --- /dev/null +++ b/src/models/cleanup.rs @@ -0,0 +1,80 @@ +use tokio_postgres::GenericClient; + +use crate::config::Config; +use crate::errors::DatabaseError; +use crate::ipfs::store as ipfs_store; +use crate::utils::files::remove_files; + +pub struct DeletionQueue { + pub files: Vec, + pub ipfs_objects: Vec, +} + +impl DeletionQueue { + pub async fn process(self, config: &Config) -> () { + remove_files(self.files, &config.media_dir()); + if self.ipfs_objects.len() > 0 { + match &config.ipfs_api_url { + Some(ipfs_api_url) => { + ipfs_store::remove(ipfs_api_url, self.ipfs_objects).await + .unwrap_or_else(|err| log::error!("{}", err)); + }, + None => { + log::error!( + "can not remove objects because IPFS API URL is not set: {:?}", + self.ipfs_objects, + ); + }, + } + } + } +} + +pub async fn find_orphaned_files( + db_client: &impl GenericClient, + files: Vec, +) -> Result, DatabaseError> { + let rows = db_client.query( + " + SELECT fname + FROM unnest($1::text[]) AS fname + WHERE + NOT EXISTS ( + SELECT 1 FROM media_attachment WHERE file_name = fname + ) + AND NOT EXISTS ( + SELECT 1 FROM actor_profile + WHERE avatar_file_name = fname OR banner_file_name = fname + ) + ", + &[&files], + ).await?; + let orphaned_files = rows.iter() + .map(|row| row.try_get("fname")) + .collect::>()?; + Ok(orphaned_files) +} + +pub async fn find_orphaned_ipfs_objects( + db_client: &impl GenericClient, + ipfs_objects: Vec, +) -> Result, DatabaseError> { + let rows = db_client.query( + " + SELECT cid + FROM unnest($1::text[]) AS cid + WHERE + NOT EXISTS ( + SELECT 1 FROM media_attachment WHERE ipfs_cid = cid + ) + AND NOT EXISTS ( + SELECT 1 FROM post WHERE ipfs_cid = cid + ) + ", + &[&ipfs_objects], + ).await?; + let orphaned_ipfs_objects = rows.iter() + .map(|row| row.try_get("cid")) + .collect::>()?; + Ok(orphaned_ipfs_objects) +} diff --git a/src/models/mod.rs b/src/models/mod.rs index 72f013f..3bd5279 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,4 +1,5 @@ pub mod attachments; +mod cleanup; pub mod posts; pub mod profiles; pub mod relationships; diff --git a/src/models/posts/queries.rs b/src/models/posts/queries.rs index cb9c2bc..1cc215b 100644 --- a/src/models/posts/queries.rs +++ b/src/models/posts/queries.rs @@ -5,8 +5,12 @@ use tokio_postgres::GenericClient; use uuid::Uuid; use crate::errors::DatabaseError; -use crate::models::attachments::queries::find_orphaned_files; use crate::models::attachments::types::DbMediaAttachment; +use crate::models::cleanup::{ + find_orphaned_files, + find_orphaned_ipfs_objects, + DeletionQueue, +}; use crate::models::profiles::queries::update_post_count; use super::types::{DbPost, Post, PostCreateData}; @@ -294,11 +298,11 @@ pub async fn get_token_waitlist( Ok(waitlist) } -/// Deletes post from database and returns list of orphaned files. +/// Deletes post from database and returns collection of orphaned objects. pub async fn delete_post( db_client: &mut impl GenericClient, post_id: &Uuid, -) -> Result, DatabaseError> { +) -> Result { let transaction = db_client.transaction().await?; // Get list of attached files let files_rows = transaction.query( @@ -311,6 +315,22 @@ pub async fn delete_post( let files: Vec = files_rows.iter() .map(|row| row.try_get("file_name")) .collect::>()?; + // Get list of linked IPFS objects + let ipfs_objects_rows = transaction.query( + " + SELECT ipfs_cid + FROM media_attachment + WHERE post_id = $1 AND ipfs_cid IS NOT NULL + UNION ALL + SELECT ipfs_cid + FROM post + WHERE id = $1 AND ipfs_cid IS NOT NULL + ", + &[&post_id], + ).await?; + let ipfs_objects: Vec = ipfs_objects_rows.iter() + .map(|row| row.try_get("ipfs_cid")) + .collect::>()?; // Delete post let maybe_post_row = transaction.query_opt( " @@ -327,6 +347,10 @@ pub async fn delete_post( } update_post_count(&transaction, &db_post.author_id, -1).await?; let orphaned_files = find_orphaned_files(&transaction, files).await?; + let orphaned_ipfs_objects = find_orphaned_ipfs_objects(&transaction, ipfs_objects).await?; transaction.commit().await?; - Ok(orphaned_files) + Ok(DeletionQueue { + files: orphaned_files, + ipfs_objects: orphaned_ipfs_objects, + }) } diff --git a/src/models/profiles/queries.rs b/src/models/profiles/queries.rs index 1f6ca67..52bdc44 100644 --- a/src/models/profiles/queries.rs +++ b/src/models/profiles/queries.rs @@ -2,7 +2,11 @@ use tokio_postgres::GenericClient; use uuid::Uuid; use crate::errors::DatabaseError; -use crate::models::attachments::queries::find_orphaned_files; +use crate::models::cleanup::{ + find_orphaned_files, + find_orphaned_ipfs_objects, + DeletionQueue, +}; use super::types::{ ExtraFields, DbActorProfile, @@ -180,11 +184,11 @@ pub async fn get_followers( Ok(profiles) } -/// Deletes profile from database and returns list of orphaned files. +/// Deletes profile from database and returns collection of orphaned objects. pub async fn delete_profile( db_client: &mut impl GenericClient, profile_id: &Uuid, -) -> Result, DatabaseError> { +) -> Result { let transaction = db_client.transaction().await?; // Get list of media files owned by actor let files_rows = transaction.query( @@ -200,6 +204,22 @@ pub async fn delete_profile( let files: Vec = files_rows.iter() .map(|row| row.try_get("file_name")) .collect::>()?; + // Get list of IPFS objects owned by actor + let ipfs_objects_rows = transaction.query( + " + SELECT ipfs_cid + FROM media_attachment + WHERE owner_id = $1 AND ipfs_cid IS NOT NULL + UNION ALL + SELECT ipfs_cid + FROM post + WHERE author_id = $1 AND ipfs_cid IS NOT NULL + ", + &[&profile_id], + ).await?; + let ipfs_objects: Vec = ipfs_objects_rows.iter() + .map(|row| row.try_get("ipfs_cid")) + .collect::>()?; // Update counters transaction.execute( " @@ -235,8 +255,12 @@ pub async fn delete_profile( return Err(DatabaseError::NotFound("profile")); } let orphaned_files = find_orphaned_files(&transaction, files).await?; + let orphaned_ipfs_objects = find_orphaned_ipfs_objects(&transaction, ipfs_objects).await?; transaction.commit().await?; - Ok(orphaned_files) + Ok(DeletionQueue { + files: orphaned_files, + ipfs_objects: orphaned_ipfs_objects, + }) } pub async fn search_profile(