Unpin and remove orphaned IPFS objects from local node
This commit is contained in:
parent
90aac4d162
commit
c41cb16d23
8 changed files with 171 additions and 42 deletions
|
@ -13,7 +13,6 @@ use mitra::models::users::queries::{
|
||||||
generate_invite_code,
|
generate_invite_code,
|
||||||
get_invite_codes,
|
get_invite_codes,
|
||||||
};
|
};
|
||||||
use mitra::utils::files::remove_files;
|
|
||||||
|
|
||||||
/// Admin CLI tool
|
/// Admin CLI tool
|
||||||
#[derive(Clap)]
|
#[derive(Clap)]
|
||||||
|
@ -68,13 +67,13 @@ async fn main() {
|
||||||
|
|
||||||
match opts.subcmd {
|
match opts.subcmd {
|
||||||
SubCommand::DeleteProfile(subopts) => {
|
SubCommand::DeleteProfile(subopts) => {
|
||||||
let orphaned_files = delete_profile(db_client, &subopts.id).await.unwrap();
|
let deletion_queue = delete_profile(db_client, &subopts.id).await.unwrap();
|
||||||
remove_files(orphaned_files, &config.media_dir());
|
deletion_queue.process(&config).await;
|
||||||
println!("profile deleted");
|
println!("profile deleted");
|
||||||
},
|
},
|
||||||
SubCommand::DeletePost(subopts) => {
|
SubCommand::DeletePost(subopts) => {
|
||||||
let orphaned_files = delete_post(db_client, &subopts.id).await.unwrap();
|
let deletion_queue = delete_post(db_client, &subopts.id).await.unwrap();
|
||||||
remove_files(orphaned_files, &config.media_dir());
|
deletion_queue.process(&config).await;
|
||||||
println!("post deleted");
|
println!("post deleted");
|
||||||
},
|
},
|
||||||
SubCommand::GenerateInviteCode(_) => {
|
SubCommand::GenerateInviteCode(_) => {
|
||||||
|
|
|
@ -9,7 +9,7 @@ struct ObjectAdded {
|
||||||
hash: String,
|
hash: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add file to IPFS.
|
/// Adds file to IPFS.
|
||||||
/// Returns CID v1 of the object.
|
/// Returns CID v1 of the object.
|
||||||
pub async fn add(ipfs_api_url: &str, data: Vec<u8>) -> Result<String, reqwest::Error> {
|
pub async fn add(ipfs_api_url: &str, data: Vec<u8>) -> Result<String, reqwest::Error> {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
|
@ -19,8 +19,34 @@ pub async fn add(ipfs_api_url: &str, data: Vec<u8>) -> Result<String, reqwest::E
|
||||||
let response = client.post(&url)
|
let response = client.post(&url)
|
||||||
.query(&[("cid-version", 1)])
|
.query(&[("cid-version", 1)])
|
||||||
.multipart(form)
|
.multipart(form)
|
||||||
.send()
|
.send().await?;
|
||||||
.await?;
|
response.error_for_status_ref()?;
|
||||||
let info: ObjectAdded = response.json().await?;
|
let info: ObjectAdded = response.json().await?;
|
||||||
Ok(info.hash)
|
Ok(info.hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unpins and removes files from local IPFS node.
|
||||||
|
pub async fn remove(
|
||||||
|
ipfs_api_url: &str,
|
||||||
|
cids: Vec<String>,
|
||||||
|
) -> Result<(), reqwest::Error> {
|
||||||
|
let client = Client::new();
|
||||||
|
let remove_pin_url = format!("{}/api/v0/pin/rm", ipfs_api_url);
|
||||||
|
let mut remove_pin_args = vec![];
|
||||||
|
for cid in cids {
|
||||||
|
log::info!("removing {} from IPFS node", cid);
|
||||||
|
remove_pin_args.push(("arg", cid));
|
||||||
|
}
|
||||||
|
let remove_pin_response = client.post(&remove_pin_url)
|
||||||
|
.query(&remove_pin_args)
|
||||||
|
.query(&[("recursive", true)])
|
||||||
|
.send().await?;
|
||||||
|
remove_pin_response.error_for_status()?;
|
||||||
|
let gc_url = format!("{}/api/v0/repo/gc", ipfs_api_url);
|
||||||
|
// Garbage collecting can take a long time
|
||||||
|
// https://github.com/ipfs/go-ipfs/issues/7752
|
||||||
|
let gc_response = client.post(&gc_url)
|
||||||
|
.send().await?;
|
||||||
|
gc_response.error_for_status()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
@ -10,5 +10,5 @@ pub mod mastodon_api;
|
||||||
pub mod models;
|
pub mod models;
|
||||||
pub mod nodeinfo;
|
pub mod nodeinfo;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod utils;
|
mod utils;
|
||||||
pub mod webfinger;
|
pub mod webfinger;
|
||||||
|
|
|
@ -23,31 +23,6 @@ pub async fn create_attachment(
|
||||||
Ok(db_attachment)
|
Ok(db_attachment)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn find_orphaned_files(
|
|
||||||
db_client: &impl GenericClient,
|
|
||||||
files: Vec<String>,
|
|
||||||
) -> Result<Vec<String>, DatabaseError> {
|
|
||||||
let rows = db_client.query(
|
|
||||||
"
|
|
||||||
SELECT fname
|
|
||||||
FROM unnest($1::text[]) AS fname
|
|
||||||
WHERE
|
|
||||||
NOT EXISTS (
|
|
||||||
SELECT 1 FROM media_attachment WHERE file_name = fname
|
|
||||||
)
|
|
||||||
AND NOT EXISTS (
|
|
||||||
SELECT 1 FROM actor_profile
|
|
||||||
WHERE avatar_file_name = fname OR banner_file_name = fname
|
|
||||||
)
|
|
||||||
",
|
|
||||||
&[&files],
|
|
||||||
).await?;
|
|
||||||
let orphaned_files = rows.iter()
|
|
||||||
.map(|row| row.try_get("fname"))
|
|
||||||
.collect::<Result<_, _>>()?;
|
|
||||||
Ok(orphaned_files)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn set_attachment_ipfs_cid(
|
pub async fn set_attachment_ipfs_cid(
|
||||||
db_client: &impl GenericClient,
|
db_client: &impl GenericClient,
|
||||||
attachment_id: &Uuid,
|
attachment_id: &Uuid,
|
||||||
|
|
80
src/models/cleanup.rs
Normal file
80
src/models/cleanup.rs
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
use tokio_postgres::GenericClient;
|
||||||
|
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::errors::DatabaseError;
|
||||||
|
use crate::ipfs::store as ipfs_store;
|
||||||
|
use crate::utils::files::remove_files;
|
||||||
|
|
||||||
|
pub struct DeletionQueue {
|
||||||
|
pub files: Vec<String>,
|
||||||
|
pub ipfs_objects: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DeletionQueue {
|
||||||
|
pub async fn process(self, config: &Config) -> () {
|
||||||
|
remove_files(self.files, &config.media_dir());
|
||||||
|
if self.ipfs_objects.len() > 0 {
|
||||||
|
match &config.ipfs_api_url {
|
||||||
|
Some(ipfs_api_url) => {
|
||||||
|
ipfs_store::remove(ipfs_api_url, self.ipfs_objects).await
|
||||||
|
.unwrap_or_else(|err| log::error!("{}", err));
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"can not remove objects because IPFS API URL is not set: {:?}",
|
||||||
|
self.ipfs_objects,
|
||||||
|
);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn find_orphaned_files(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
files: Vec<String>,
|
||||||
|
) -> Result<Vec<String>, DatabaseError> {
|
||||||
|
let rows = db_client.query(
|
||||||
|
"
|
||||||
|
SELECT fname
|
||||||
|
FROM unnest($1::text[]) AS fname
|
||||||
|
WHERE
|
||||||
|
NOT EXISTS (
|
||||||
|
SELECT 1 FROM media_attachment WHERE file_name = fname
|
||||||
|
)
|
||||||
|
AND NOT EXISTS (
|
||||||
|
SELECT 1 FROM actor_profile
|
||||||
|
WHERE avatar_file_name = fname OR banner_file_name = fname
|
||||||
|
)
|
||||||
|
",
|
||||||
|
&[&files],
|
||||||
|
).await?;
|
||||||
|
let orphaned_files = rows.iter()
|
||||||
|
.map(|row| row.try_get("fname"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
Ok(orphaned_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn find_orphaned_ipfs_objects(
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
ipfs_objects: Vec<String>,
|
||||||
|
) -> Result<Vec<String>, DatabaseError> {
|
||||||
|
let rows = db_client.query(
|
||||||
|
"
|
||||||
|
SELECT cid
|
||||||
|
FROM unnest($1::text[]) AS cid
|
||||||
|
WHERE
|
||||||
|
NOT EXISTS (
|
||||||
|
SELECT 1 FROM media_attachment WHERE ipfs_cid = cid
|
||||||
|
)
|
||||||
|
AND NOT EXISTS (
|
||||||
|
SELECT 1 FROM post WHERE ipfs_cid = cid
|
||||||
|
)
|
||||||
|
",
|
||||||
|
&[&ipfs_objects],
|
||||||
|
).await?;
|
||||||
|
let orphaned_ipfs_objects = rows.iter()
|
||||||
|
.map(|row| row.try_get("cid"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
Ok(orphaned_ipfs_objects)
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
pub mod attachments;
|
pub mod attachments;
|
||||||
|
mod cleanup;
|
||||||
pub mod posts;
|
pub mod posts;
|
||||||
pub mod profiles;
|
pub mod profiles;
|
||||||
pub mod relationships;
|
pub mod relationships;
|
||||||
|
|
|
@ -5,8 +5,12 @@ use tokio_postgres::GenericClient;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::errors::DatabaseError;
|
use crate::errors::DatabaseError;
|
||||||
use crate::models::attachments::queries::find_orphaned_files;
|
|
||||||
use crate::models::attachments::types::DbMediaAttachment;
|
use crate::models::attachments::types::DbMediaAttachment;
|
||||||
|
use crate::models::cleanup::{
|
||||||
|
find_orphaned_files,
|
||||||
|
find_orphaned_ipfs_objects,
|
||||||
|
DeletionQueue,
|
||||||
|
};
|
||||||
use crate::models::profiles::queries::update_post_count;
|
use crate::models::profiles::queries::update_post_count;
|
||||||
use super::types::{DbPost, Post, PostCreateData};
|
use super::types::{DbPost, Post, PostCreateData};
|
||||||
|
|
||||||
|
@ -294,11 +298,11 @@ pub async fn get_token_waitlist(
|
||||||
Ok(waitlist)
|
Ok(waitlist)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes post from database and returns list of orphaned files.
|
/// Deletes post from database and returns collection of orphaned objects.
|
||||||
pub async fn delete_post(
|
pub async fn delete_post(
|
||||||
db_client: &mut impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
post_id: &Uuid,
|
post_id: &Uuid,
|
||||||
) -> Result<Vec<String>, DatabaseError> {
|
) -> Result<DeletionQueue, DatabaseError> {
|
||||||
let transaction = db_client.transaction().await?;
|
let transaction = db_client.transaction().await?;
|
||||||
// Get list of attached files
|
// Get list of attached files
|
||||||
let files_rows = transaction.query(
|
let files_rows = transaction.query(
|
||||||
|
@ -311,6 +315,22 @@ pub async fn delete_post(
|
||||||
let files: Vec<String> = files_rows.iter()
|
let files: Vec<String> = files_rows.iter()
|
||||||
.map(|row| row.try_get("file_name"))
|
.map(|row| row.try_get("file_name"))
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
// Get list of linked IPFS objects
|
||||||
|
let ipfs_objects_rows = transaction.query(
|
||||||
|
"
|
||||||
|
SELECT ipfs_cid
|
||||||
|
FROM media_attachment
|
||||||
|
WHERE post_id = $1 AND ipfs_cid IS NOT NULL
|
||||||
|
UNION ALL
|
||||||
|
SELECT ipfs_cid
|
||||||
|
FROM post
|
||||||
|
WHERE id = $1 AND ipfs_cid IS NOT NULL
|
||||||
|
",
|
||||||
|
&[&post_id],
|
||||||
|
).await?;
|
||||||
|
let ipfs_objects: Vec<String> = ipfs_objects_rows.iter()
|
||||||
|
.map(|row| row.try_get("ipfs_cid"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
// Delete post
|
// Delete post
|
||||||
let maybe_post_row = transaction.query_opt(
|
let maybe_post_row = transaction.query_opt(
|
||||||
"
|
"
|
||||||
|
@ -327,6 +347,10 @@ pub async fn delete_post(
|
||||||
}
|
}
|
||||||
update_post_count(&transaction, &db_post.author_id, -1).await?;
|
update_post_count(&transaction, &db_post.author_id, -1).await?;
|
||||||
let orphaned_files = find_orphaned_files(&transaction, files).await?;
|
let orphaned_files = find_orphaned_files(&transaction, files).await?;
|
||||||
|
let orphaned_ipfs_objects = find_orphaned_ipfs_objects(&transaction, ipfs_objects).await?;
|
||||||
transaction.commit().await?;
|
transaction.commit().await?;
|
||||||
Ok(orphaned_files)
|
Ok(DeletionQueue {
|
||||||
|
files: orphaned_files,
|
||||||
|
ipfs_objects: orphaned_ipfs_objects,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,11 @@ use tokio_postgres::GenericClient;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::errors::DatabaseError;
|
use crate::errors::DatabaseError;
|
||||||
use crate::models::attachments::queries::find_orphaned_files;
|
use crate::models::cleanup::{
|
||||||
|
find_orphaned_files,
|
||||||
|
find_orphaned_ipfs_objects,
|
||||||
|
DeletionQueue,
|
||||||
|
};
|
||||||
use super::types::{
|
use super::types::{
|
||||||
ExtraFields,
|
ExtraFields,
|
||||||
DbActorProfile,
|
DbActorProfile,
|
||||||
|
@ -180,11 +184,11 @@ pub async fn get_followers(
|
||||||
Ok(profiles)
|
Ok(profiles)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes profile from database and returns list of orphaned files.
|
/// Deletes profile from database and returns collection of orphaned objects.
|
||||||
pub async fn delete_profile(
|
pub async fn delete_profile(
|
||||||
db_client: &mut impl GenericClient,
|
db_client: &mut impl GenericClient,
|
||||||
profile_id: &Uuid,
|
profile_id: &Uuid,
|
||||||
) -> Result<Vec<String>, DatabaseError> {
|
) -> Result<DeletionQueue, DatabaseError> {
|
||||||
let transaction = db_client.transaction().await?;
|
let transaction = db_client.transaction().await?;
|
||||||
// Get list of media files owned by actor
|
// Get list of media files owned by actor
|
||||||
let files_rows = transaction.query(
|
let files_rows = transaction.query(
|
||||||
|
@ -200,6 +204,22 @@ pub async fn delete_profile(
|
||||||
let files: Vec<String> = files_rows.iter()
|
let files: Vec<String> = files_rows.iter()
|
||||||
.map(|row| row.try_get("file_name"))
|
.map(|row| row.try_get("file_name"))
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
// Get list of IPFS objects owned by actor
|
||||||
|
let ipfs_objects_rows = transaction.query(
|
||||||
|
"
|
||||||
|
SELECT ipfs_cid
|
||||||
|
FROM media_attachment
|
||||||
|
WHERE owner_id = $1 AND ipfs_cid IS NOT NULL
|
||||||
|
UNION ALL
|
||||||
|
SELECT ipfs_cid
|
||||||
|
FROM post
|
||||||
|
WHERE author_id = $1 AND ipfs_cid IS NOT NULL
|
||||||
|
",
|
||||||
|
&[&profile_id],
|
||||||
|
).await?;
|
||||||
|
let ipfs_objects: Vec<String> = ipfs_objects_rows.iter()
|
||||||
|
.map(|row| row.try_get("ipfs_cid"))
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
// Update counters
|
// Update counters
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"
|
"
|
||||||
|
@ -235,8 +255,12 @@ pub async fn delete_profile(
|
||||||
return Err(DatabaseError::NotFound("profile"));
|
return Err(DatabaseError::NotFound("profile"));
|
||||||
}
|
}
|
||||||
let orphaned_files = find_orphaned_files(&transaction, files).await?;
|
let orphaned_files = find_orphaned_files(&transaction, files).await?;
|
||||||
|
let orphaned_ipfs_objects = find_orphaned_ipfs_objects(&transaction, ipfs_objects).await?;
|
||||||
transaction.commit().await?;
|
transaction.commit().await?;
|
||||||
Ok(orphaned_files)
|
Ok(DeletionQueue {
|
||||||
|
files: orphaned_files,
|
||||||
|
ipfs_objects: orphaned_ipfs_objects,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn search_profile(
|
pub async fn search_profile(
|
||||||
|
|
Loading…
Reference in a new issue