Enhance migrations to be more resilient to panics

asonix 2020-10-16 15:27:54 -05:00
parent e19f03ba6c
commit 577d358da1
6 changed files with 175 additions and 51 deletions

Cargo.lock (generated)

@@ -1446,7 +1446,7 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pict-rs"
version = "0.2.2"
version = "0.2.3"
dependencies = [
"actix-form-data",
"actix-fs",


@@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.2.2"
version = "0.2.3"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"


@@ -47,6 +47,8 @@ trait SledTree {
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>;
fn flush(&self) -> Result<(), UploadError>;
}
pub(crate) struct LatestDb {
@@ -64,10 +66,20 @@ impl LatestDb {
pub(crate) fn migrate(self) -> Result<sled::Db, UploadError> {
let LatestDb { root_dir, version } = self;
version.migrate(root_dir)
loop {
let root_dir2 = root_dir.clone();
let res = std::panic::catch_unwind(move || {
version.migrate(root_dir2)
});
if let Ok(res) = res {
return res;
}
}
}
}
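The retry loop above leans on std::panic::catch_unwind: a panic inside the closure comes back as Err(_) so the migration can simply be attempted again, while a normal return (including the migration's own Result) passes through as Ok(..). A minimal sketch of the same pattern, with a hypothetical do_work() standing in for version.migrate(root_dir2):

use std::panic;

fn run_with_retry() -> Result<(), String> {
    loop {
        // A panic inside the closure surfaces here as Err(_); do_work's own
        // Result comes back wrapped in Ok(..) and is returned as-is.
        let outcome = panic::catch_unwind(|| do_work());
        if let Ok(res) = outcome {
            return res;
        }
        // The closure panicked: try the whole unit of work again.
    }
}

fn do_work() -> Result<(), String> {
    // hypothetical fallible operation
    Ok(())
}

In the commit, root_dir is cloned on every iteration because the closure takes ownership of it, and DbVersion derives Copy so version can be moved into the closure on each retry; capturing only owned values like this also satisfies catch_unwind's UnwindSafe bound.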
#[derive(Clone, Copy)]
enum DbVersion {
Sled0320Rc1,
Sled032,
@@ -77,7 +89,7 @@ enum DbVersion {
impl DbVersion {
fn exists(root: PathBuf) -> Self {
if s034::exists(root.clone()) {
if s034::exists(root.clone()) && !s034::migrating(root.clone()) {
return DbVersion::Sled034;
}
@@ -127,20 +139,48 @@ where
let old_alias_tree = old_db.open_tree("alias")?;
let new_alias_tree = new_db.open_tree("alias")?;
let old_fname_tree = old_db.open_tree("filename")?;
let new_fname_tree = new_db.open_tree("filename")?;
let new_migrate_tree = new_db.open_tree("migrate")?;
if let Some(_) = new_migrate_tree.get("done")? {
return Ok(new_db);
}
for res in old_alias_tree.iter() {
let (iterator, mut counter) = if let Some(last_migrated) = new_migrate_tree.get("last_migrated")? {
let mut last_migrated = String::from_utf8_lossy(&last_migrated).to_string();
info!("Previous migration failed after {}, attempting to skip", last_migrated);
if let Some(index) = last_migrated.find('.') {
last_migrated = last_migrated.split_at(index).0.to_owned();
}
if last_migrated.len() > 3 {
last_migrated = last_migrated.split_at(3).0.to_owned();
}
let last_migrated = increment_alphanumeric(&last_migrated).as_bytes().to_owned();
new_migrate_tree.insert("last_migrated", last_migrated.clone())?;
new_migrate_tree.flush()?;
if let Some(count) = new_migrate_tree.get("counter")? {
(old_alias_tree.range(last_migrated..), String::from_utf8_lossy(&count).parse::<usize>().unwrap())
} else {
(old_alias_tree.range(last_migrated..), 0)
}
} else {
(old_alias_tree.iter(), 0)
};
for res in iterator {
let (k, _) = res?;
if let Some(v) = old_alias_tree.get(&k)? {
if !k.contains(&b"/"[0]) {
// k is an alias
migrate_main_tree(&k, &v, &old_db, &new_db)?;
debug!(
"Moving alias -> hash for alias {}",
String::from_utf8_lossy(k.as_ref()),
);
if let Some(id) = old_alias_tree.get(alias_id_key(&String::from_utf8_lossy(&k)))? {
counter += 1;
info!("Migrating alias #{}", counter);
// k is an alias
migrate_main_tree(&k, &v, &old_db, &new_db, &String::from_utf8_lossy(&id))?;
debug!(
"Moving alias -> hash for alias {}",
String::from_utf8_lossy(k.as_ref()),
);
new_migrate_tree.insert("counter", format!("{}", counter))?;
}
} else {
debug!(
"Moving {}, {}",
@@ -148,24 +188,16 @@ where
String::from_utf8_lossy(v.as_ref())
);
}
new_alias_tree.insert(k, v)?;
new_alias_tree.insert(k.clone(), v)?;
new_migrate_tree.insert("last_migrated", k)?;
new_migrate_tree.flush()?;
} else {
warn!("MISSING {}", String::from_utf8_lossy(k.as_ref()));
}
}
info!("Moved {} unique aliases", counter);
for res in old_fname_tree.iter() {
let (k, _) = res?;
if let Some(v) = old_fname_tree.get(&k)? {
debug!(
"Moving file -> hash for file {}",
String::from_utf8_lossy(k.as_ref()),
);
new_fname_tree.insert(&k, &v)?;
} else {
warn!("MISSING {}", String::from_utf8_lossy(k.as_ref()));
}
}
new_migrate_tree.insert("done", "true")?;
Ok(new_db)
}
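The resume path near the top of this function derives its starting point from the last alias written before the crash: anything after the first '.' is dropped, the remainder is cut to at most three characters, and that prefix is incremented with increment_alphanumeric (defined further down) so the old_alias_tree range scan restarts past every alias sharing the prefix. A condensed sketch of that derivation, with hypothetical names and assuming ASCII aliases:

fn resume_key(last_migrated: &str) -> Vec<u8> {
    let mut prefix = last_migrated.to_owned();
    if let Some(index) = prefix.find('.') {
        prefix.truncate(index); // drop the extension: "AbCdEf.png" -> "AbCdEf"
    }
    if prefix.len() > 3 {
        prefix.truncate(3); // keep a short prefix: "AbCdEf" -> "AbC"
    }
    // "AbC" -> "AbD"; range(resume_key..) then skips work that already finished
    increment_alphanumeric(&prefix).into_bytes()
}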
@@ -175,6 +207,7 @@ fn migrate_main_tree<Old, New>(
hash: &[u8],
old_db: Old,
new_db: New,
id: &str,
) -> Result<(), UploadError>
where
Old: SledDb,
@@ -182,28 +215,38 @@ where
{
let main_tree = new_db.open_tree("main")?;
let new_fname_tree = new_db.open_tree("filename")?;
debug!(
"Migrating files for {}",
String::from_utf8_lossy(alias.as_ref())
);
if let Some(v) = old_db.self_tree().get(&hash)? {
main_tree.insert(&hash, v)?;
if let Some(filename) = old_db.self_tree().get(&hash)? {
main_tree.insert(&hash, filename.clone())?;
new_fname_tree.insert(filename, hash.clone())?;
} else {
warn!("Missing filename");
}
let (start, end) = alias_key_bounds(&hash);
for res in old_db.self_tree().range(start..end) {
let (k, v) = res?;
debug!("Moving alias {}", String::from_utf8_lossy(v.as_ref()));
main_tree.insert(k, v)?;
let key = alias_key(&hash, id);
if let Some(v) = old_db.self_tree().get(&key)? {
main_tree.insert(key, v)?;
} else {
warn!("Not migrating alias {} id {}", String::from_utf8_lossy(&alias), id);
return Ok(());
}
let (start, end) = variant_key_bounds(&hash);
for res in old_db.self_tree().range(start..end) {
let (k, v) = res?;
debug!("Moving variant {}", String::from_utf8_lossy(v.as_ref()));
main_tree.insert(k, v)?;
if main_tree.range(start.clone()..end.clone()).next().is_none() {
let mut counter = 0;
for res in old_db.self_tree().range(start.clone()..end.clone()) {
counter += 1;
let (k, v) = res?;
debug!("Moving variant #{} for {}", counter, String::from_utf8_lossy(v.as_ref()));
main_tree.insert(k, v)?;
}
info!("Moved {} variants for {}", counter, String::from_utf8_lossy(alias.as_ref()));
}
Ok(())
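The is_none() guard keeps the variant copy idempotent across retries: if an earlier (possibly panicked) run already wrote any variant for this hash, the whole range is left alone rather than copied again. The same pattern in isolation, using the sled crate directly instead of the SledDb/SledTree shims in this module:

fn copy_range_once(src: &sled::Tree, dest: &sled::Tree, start: Vec<u8>, end: Vec<u8>) -> sled::Result<()> {
    // Only copy the key range when the destination has nothing in it yet,
    // so a retried migration does not redo finished work.
    if dest.range(start.clone()..end.clone()).next().is_none() {
        for entry in src.range(start..end) {
            let (k, v) = entry?;
            dest.insert(k, v)?;
        }
    }
    Ok(())
}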
@@ -228,3 +271,65 @@ pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
(start, end)
}
pub(crate) fn alias_id_key(alias: &str) -> String {
format!("{}/id", alias)
}
pub(crate) fn alias_key(hash: &[u8], id: &str) -> Vec<u8> {
let mut key = hash.to_vec();
// add a separator to the key between the hash and the ID
key.extend(&[0]);
key.extend(id.as_bytes());
key
}
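As a quick check on the shape of these keys: alias_key is the hash bytes, one zero separator byte, then the id's UTF-8 bytes, so for hypothetical inputs:

assert_eq!(alias_key(&[0xAA, 0xBB], "7"), vec![0xAA, 0xBB, 0x00, b'7']);

The *_key_bounds helpers produce the start/end pair that the range scans in migrate_main_tree use to walk all entries for one hash.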
const VALID: &[char] = &[
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
fn increment_alphanumeric(input: &str) -> String {
let (_, output) = input.chars().rev().fold((true, String::new()), |(incr_next, mut acc), item| {
if incr_next {
let mut index = None;
for (i, test) in VALID.iter().enumerate() {
if *test == item {
index = Some(i);
}
}
let index = index.unwrap_or(0);
let (set_incr_next, next_index) = if index == (VALID.len() - 1) {
(true, 0)
} else {
(false, index + 1)
};
acc.extend(&[VALID[next_index]]);
(set_incr_next, acc)
} else {
acc.extend(&[item]);
(false, acc)
}
});
output.chars().rev().collect()
}
#[cfg(test)]
mod tests {
use super::increment_alphanumeric;
#[test]
fn increments() {
assert_eq!(increment_alphanumeric("hello"), "hellp");
assert_eq!(increment_alphanumeric("0"), "1");
assert_eq!(increment_alphanumeric("9"), "A");
assert_eq!(increment_alphanumeric("Z"), "a");
assert_eq!(increment_alphanumeric("z"), "0");
assert_eq!(increment_alphanumeric("az"), "b0");
assert_eq!(increment_alphanumeric("19"), "1A");
assert_eq!(increment_alphanumeric("AZ"), "Aa");
}
}


@@ -78,4 +78,8 @@ impl SledTree for sled032::Tree {
.map_err(UploadError::from)
}))
}
fn flush(&self) -> Result<(), UploadError> {
sled032::Tree::flush(self).map(|_| ()).map_err(UploadError::from)
}
}


@@ -14,6 +14,18 @@ pub(crate) fn exists(mut base: PathBuf) -> bool {
std::fs::metadata(base).is_ok()
}
pub(crate) fn migrating(base: PathBuf) -> bool {
if let Ok(db) = open(base) {
if let Ok(tree) = db.open_tree("migrate") {
if let Ok(Some(_)) = tree.get("done") {
return false;
}
}
}
true
}
pub(crate) fn open(mut base: PathBuf) -> Result<sled034::Db, UploadError> {
base.push("sled");
base.push(SLED_034);
@@ -66,4 +78,8 @@ impl SledTree for sled034::Tree {
.map_err(UploadError::from)
}))
}
fn flush(&self) -> Result<(), UploadError> {
sled034::Tree::flush(self).map(|_| ()).map_err(UploadError::from)
}
}


@@ -1,7 +1,7 @@ use crate::{
use crate::{
config::Format,
error::UploadError,
migrate::{alias_key_bounds, variant_key_bounds, LatestDb},
migrate::{alias_key_bounds, variant_key_bounds, LatestDb, alias_id_key, alias_key},
to_ext,
validate::validate_image,
};
@@ -11,6 +11,18 @@ use sha2::Digest;
use std::{path::PathBuf, pin::Pin, sync::Arc};
use tracing::{debug, error, info, instrument, warn, Span};
// TREE STRUCTURE
// - Alias Tree
// - alias -> hash
// - alias / id -> u64(id)
// - alias / delete -> delete token
// - Main Tree
// - hash -> filename
// - hash 0 u64(id) -> alias
// - hash 2 variant path -> variant path
// - Filename Tree
// - filename -> hash
#[derive(Clone)]
pub struct UploadManager {
inner: Arc<UploadManagerInner>,
@@ -740,19 +752,6 @@ fn file_name(name: String, content_type: mime::Mime) -> Result<String, UploadErr
Ok(format!("{}{}", name, to_ext(content_type)?))
}
fn alias_key(hash: &[u8], id: &str) -> Vec<u8> {
let mut key = hash.to_vec();
// add a separator to the key between the hash and the ID
key.extend(&[0]);
key.extend(id.as_bytes());
key
}
fn alias_id_key(alias: &str) -> String {
format!("{}/id", alias)
}
fn delete_key(alias: &str) -> String {
format!("{}/delete", alias)
}
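Tying the TREE STRUCTURE comment to the helpers that now live in migrate.rs: for a single upload with hash H, id "7", and alias "abc.png" (hypothetical values), the stores hold roughly

alias tree:    "abc.png"           -> H
               "abc.png/id"        -> "7"           (alias_id_key)
               "abc.png/delete"    -> delete token  (delete_key)
main tree:     H                   -> filename
               H ++ [0] ++ "7"     -> "abc.png"     (alias_key)
               H ++ [2] ++ variant -> variant path
filename tree: filename            -> H

This is a sketch of the key shapes, not the exact on-disk encoding.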