1
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs.git synced 2025-04-23 00:24:36 +00:00

try adding concurrency to repo migration

This commit is contained in:
asonix 2025-04-04 13:59:59 -05:00
parent 0633784f8b
commit c9fae6d36e
3 changed files with 48 additions and 14 deletions

View file

@ -398,7 +398,7 @@ impl Args {
}
}
}
Command::MigrateRepo(MigrateRepo { repo }) => {
Command::MigrateRepo(MigrateRepo { concurrency, repo }) => {
let server = Server::default();
let client = Client::default();
let upgrade = Upgrade::default();
@ -420,6 +420,7 @@ impl Args {
store: None,
},
operation: Operation::MigrateRepo {
concurrency,
from: from.into(),
to: to.into(),
},
@ -439,6 +440,7 @@ impl Args {
store: None,
},
operation: Operation::MigrateRepo {
concurrency,
from: from.into(),
to: to.into(),
},
@ -460,6 +462,7 @@ impl Args {
store: None,
},
operation: Operation::MigrateRepo {
concurrency,
from: from.into(),
to: to.into(),
},
@ -479,6 +482,7 @@ impl Args {
store: None,
},
operation: Operation::MigrateRepo {
concurrency,
from: from.into(),
to: to.into(),
},
@ -510,6 +514,7 @@ pub(crate) enum Operation {
to: crate::config::primitives::Store,
},
MigrateRepo {
concurrency: usize,
from: crate::config::file::Repo,
to: crate::config::file::Repo,
},
@ -1231,8 +1236,9 @@ struct Run {
#[arg(long)]
read_only: bool,
/// Allow running without ffmpeg, imagemagick, or exiftool. This will allow hosting arbitrary
/// files and provide inaccurate metadata for uploaded media
/// Allow running without ffmpeg, imagemagick, or exiftool.
///
/// This will allow hosting arbitrary files and provide inaccurate metadata for uploaded media
#[arg(long)]
danger_dummy_mode: bool,
@ -1273,9 +1279,10 @@ struct MigrateStore {
#[arg(long)]
skip_missing_files: bool,
/// How many hashes pict-rs should attempt to migrate at the same time. This does not
/// correspond to a thread count, but instead how many in-flight migrations can happen.
/// Increasing this number may improve throughput
/// How many hashes pict-rs should attempt to migrate at the same time.
///
/// This does not correspond to a thread count, but instead how many in-flight migrations can
/// happen. Increasing this number may improve throughput
#[arg(long, default_value = "32")]
concurrency: usize,
@ -1285,6 +1292,13 @@ struct MigrateStore {
#[derive(Debug, Parser)]
// Clap argument group for the `migrate-repo` command. NOTE: the `///` doc
// comments below are emitted verbatim by clap as CLI help text, so they are
// behavioral and must not be reworded; `//` comments are ignored by clap.
struct MigrateRepo {
    /// How many hashes pict-rs should attempt to migrate at the same time.
    ///
    /// This does not correspond to a thread count, but instead how many in-flight migrations can
    /// happen. Increasing this number may improve throughput
    // Bounds the number of concurrently spawned per-hash migration tasks
    // (default 32, matching MigrateStore's concurrency default).
    #[arg(short, long, default_value = "32")]
    concurrency: usize,
    // Subcommand carrying the source/target repo configuration — presumably
    // selects which repo backend to migrate from; confirm against
    // MigrateRepoFrom's definition (not visible in this view).
    #[command(subcommand)]
    repo: MigrateRepoFrom,
}

View file

@ -2208,11 +2208,15 @@ impl PictRsConfiguration {
return Ok(());
}
Operation::MigrateRepo { from, to } => {
Operation::MigrateRepo {
concurrency,
from,
to,
} => {
let from = Repo::open(from).await?.to_arc();
let to = Repo::open(to).await?.to_arc();
repo::migrate_repo(from, to).await?;
repo::migrate_repo(from, to, concurrency).await?;
return Ok(());
}
}

View file

@ -19,7 +19,11 @@ use crate::{
};
#[tracing::instrument(skip_all)]
pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result<(), Error> {
pub(crate) async fn migrate_repo(
old_repo: ArcRepo,
new_repo: ArcRepo,
concurrency: usize,
) -> Result<(), Error> {
tracing::info!("Running checks");
if let Err(e) = old_repo.health_check().await {
tracing::warn!("Old repo is not configured correctly");
@ -35,6 +39,8 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
tracing::info!("Checks complete, migrating repo");
tracing::info!("{total_size} hashes will be migrated");
let mut set = JoinSet::new();
let hash_stream = std::pin::pin!(old_repo.hashes());
let mut hash_stream = hash_stream.into_streamer();
@ -43,17 +49,24 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
tracing::trace!("migrate_repo: looping");
if let Ok(hash) = res {
migrate_hash(old_repo.clone(), new_repo.clone(), hash).await;
set.spawn_local(migrate_hash(old_repo.clone(), new_repo.clone(), hash));
} else {
tracing::warn!("Failed to read hash, skipping");
}
index += 1;
while set.len() >= concurrency {
tracing::trace!("migrate: join looping");
if set.join_next().await.is_some() {
index += 1;
if index % pct == 0 {
let percent = index / pct;
if index % pct == 0 {
let percent = index / pct;
tracing::info!("Migration {percent}% complete - {index}/{total_size}");
tracing::info!("Migration {percent}% complete - {index}/{total_size}");
}
} else {
break;
}
}
}
@ -121,6 +134,8 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
tracing::info!("Migration {percent}% complete - {index}/{total_size}");
}
} else {
break;
}
}
}
@ -149,6 +164,7 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
Ok(())
}
#[tracing::instrument(skip_all)]
async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
let mut hash_failures = 0;