Retry migrate_file, remove variant before relating new one

This commit is contained in:
asonix 2023-01-04 18:58:05 -06:00
parent 28f7a139a0
commit 5e5dd27a05
4 changed files with 45 additions and 13 deletions

2
Cargo.lock generated
View file

@ -1591,7 +1591,7 @@ dependencies = [
[[package]]
name = "pict-rs"
version = "0.4.0-beta.13"
version = "0.4.0-beta.14"
dependencies = [
"actix-form-data",
"actix-rt",

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.4.0-beta.13"
version = "0.4.0-beta.14"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"

View file

@ -42,7 +42,7 @@ use std::{
path::Path,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, Instant, SystemTime},
time::{Duration, SystemTime},
};
use tokio::sync::Semaphore;
use tracing_actix_web::TracingLogger;
@ -1383,6 +1383,8 @@ where
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
.await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?;
@ -1417,18 +1419,38 @@ where
S1: Store,
S2: Store,
{
const CONST_TIME: Duration = Duration::from_millis(250);
let mut failure_count = 0;
let start = Instant::now();
loop {
match do_migrate_file(from, to, identifier).await {
Ok(identifier) => return Ok(identifier),
Err(e) => {
failure_count += 1;
tokio::time::sleep(Duration::from_secs(5)).await;
if failure_count > 50 {
tracing::error!("Error migrating file: {}", e.to_string());
return Err(e);
}
}
}
}
}
async fn do_migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
where
S1: Store,
S2: Store,
{
let stream = from.to_stream(identifier, None, None).await?;
let new_identifier = to.save_stream(stream).await?;
let elapsed = start.elapsed();
if elapsed < CONST_TIME {
tokio::time::sleep(CONST_TIME - elapsed).await;
}
Ok(new_identifier)
}

View file

@ -682,10 +682,20 @@ impl HashRepo for SledRepo {
.scan_prefix(&hash)
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = I::from_bytes(ivec.to_vec()).ok()?;
let variant = variant_from_key(&hash, &key)?;
let identifier = I::from_bytes(ivec.to_vec()).ok();
if identifier.is_none() {
tracing::warn!(
"Skipping an identifier: {}",
String::from_utf8_lossy(&ivec)
);
}
Some((variant, identifier))
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, sled::Error>
);