Increase client timeout, attempt to keep track of migration progress for resuming

This commit is contained in:
asonix 2023-01-04 15:58:32 -06:00
parent 7808cf1ee9
commit c55b1115d1

View file

@ -995,6 +995,7 @@ fn build_client() -> awc::Client {
Client::builder() Client::builder()
.wrap(Tracing) .wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.4.0-main")) .add_default_header(("User-Agent", "pict-rs v0.4.0-main"))
.timeout(Duration::from_secs(30))
.finish() .finish()
} }
@ -1310,6 +1311,8 @@ pub async fn run() -> color_eyre::Result<()> {
} }
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
const STORE_MIGRATION_MOTION: &str = "store-migration-motion";
const STORE_MIGRATION_VARIANT: &str = "store-migration-variant";
async fn migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error> async fn migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
where where
@ -1320,23 +1323,49 @@ where
let stream = repo.hashes().await; let stream = repo.hashes().await;
let mut stream = Box::pin(stream); let mut stream = Box::pin(stream);
let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?;
while let Some(hash) = stream.next().await { while let Some(hash) = stream.next().await {
let hash = hash?; let hash = hash?;
if let Some(progress) = &progress_opt {
if progress.as_ref() == hash.as_ref() {
progress_opt.take();
}
continue;
}
if let Some(identifier) = repo if let Some(identifier) = repo
.motion_identifier(hash.as_ref().to_vec().into()) .motion_identifier(hash.as_ref().to_vec().into())
.await? .await?
{ {
let new_identifier = migrate_file(&from, &to, &identifier).await?; if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
migrate_details(repo, identifier, &new_identifier).await?; let new_identifier = migrate_file(&from, &to, &identifier).await?;
repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier) migrate_details(repo, identifier, &new_identifier).await?;
.await?; repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
.await?;
}
} }
let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?;
for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? { for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
if let Some(variant_progress) = &variant_progress_opt {
if variant.as_bytes() == variant_progress.as_ref() {
variant_progress_opt.take();
}
continue;
}
let new_identifier = migrate_file(&from, &to, &identifier).await?; let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?; migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier) repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?; .await?;
repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
.await?;
} }
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
@ -1347,6 +1376,8 @@ where
repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into()) repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
.await?; .await?;
repo.remove(STORE_MIGRATION_VARIANT).await?;
repo.remove(STORE_MIGRATION_MOTION).await?;
} }
// clean up the migration key to avoid interfering with future migrations // clean up the migration key to avoid interfering with future migrations