Don't overwrite existing variants

This commit is contained in:
asonix 2023-08-16 15:12:16 -05:00
parent 22cfbe979d
commit 1559d57f0a
5 changed files with 33 additions and 18 deletions

View file

@ -4,7 +4,7 @@ use crate::{
error::{Error, UploadError},
ffmpeg::ThumbnailFormat,
formats::{InputProcessableFormat, InternalVideoFormat},
repo::{Alias, ArcRepo, Hash},
repo::{Alias, ArcRepo, Hash, VariantAlreadyExists},
store::{Identifier, Store},
};
use actix_web::web::Bytes;
@ -167,13 +167,19 @@ async fn process<S: Store + 'static>(
let identifier = store
.save_bytes(bytes.clone(), details.media_type())
.await?;
if let Err(VariantAlreadyExists) = repo
.relate_variant_identifier(
hash,
thumbnail_path.to_string_lossy().to_string(),
&identifier,
)
.await?
{
store.remove(&identifier).await?;
}
repo.relate_details(&identifier, &details).await?;
repo.relate_variant_identifier(
hash,
thumbnail_path.to_string_lossy().to_string(),
&identifier,
)
.await?;
guard.disarm();

View file

@ -253,7 +253,8 @@ where
Ok(new_identifier) => {
migrate_details(repo, &identifier, &new_identifier).await?;
repo.remove_variant(hash.clone(), variant.clone()).await?;
repo.relate_variant_identifier(hash.clone(), variant, &new_identifier)
let _ = repo
.relate_variant_identifier(hash.clone(), variant, &new_identifier)
.await?;
repo.mark_migrated(&identifier, &new_identifier).await?;

View file

@ -43,6 +43,8 @@ pub(crate) struct DeleteToken {
pub(crate) struct HashAlreadyExists;
#[derive(Debug)]
pub(crate) struct AliasAlreadyExists;
#[derive(Debug)]
pub(crate) struct VariantAlreadyExists;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
@ -474,7 +476,7 @@ pub(crate) trait HashRepo: BaseRepo {
hash: Hash,
variant: String,
identifier: &dyn Identifier,
) -> Result<(), StoreError>;
) -> Result<Result<(), VariantAlreadyExists>, StoreError>;
async fn variant_identifier(
&self,
hash: Hash,
@ -531,7 +533,7 @@ where
hash: Hash,
variant: String,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
) -> Result<Result<(), VariantAlreadyExists>, StoreError> {
T::relate_variant_identifier(self, hash, variant, identifier).await
}

View file

@ -173,7 +173,7 @@ async fn do_migrate_hash_04<S: Store>(
}
for (variant, identifier) in variants {
new_repo
let _ = new_repo
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
.await?;

View file

@ -4,7 +4,7 @@ use crate::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
},
serde_str::Serde,
store::StoreError,
@ -1053,18 +1053,24 @@ impl HashRepo for SledRepo {
hash: Hash,
variant: String,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
) -> Result<Result<(), VariantAlreadyExists>, StoreError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let value = identifier.to_bytes()?;
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.insert(key, value)
);
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
Ok(())
actix_rt::task::spawn_blocking(move || {
hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value))
.map(|res| res.map_err(|_| VariantAlreadyExists))
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(SledError::from)
.map_err(RepoError::from)
.map_err(StoreError::from)
}
#[tracing::instrument(level = "trace", skip(self))]