Remove is_missing error, make Identifier and DeleteToken return Option on fetch

This commit is contained in:
asonix 2023-07-07 13:17:26 -05:00
parent d16e81be20
commit 9e7376d411
8 changed files with 105 additions and 79 deletions

View file

@ -81,6 +81,9 @@ pub(crate) enum UploadError {
#[error("Requested a file that doesn't exist")]
MissingAlias,
#[error("Requested a file that pict-rs lost track of")]
MissingIdentifier,
#[error("Provided token did not match expected token")]
InvalidToken,
@ -169,12 +172,7 @@ impl ResponseError for Error {
| UploadError::Repo(crate::repo::RepoError::AlreadyClaimed)
| UploadError::SilentVideoDisabled,
) => StatusCode::BAD_REQUEST,
Some(
UploadError::Repo(crate::repo::RepoError::SledError(
crate::repo::sled::SledError::Missing(_),
))
| UploadError::MissingAlias,
) => StatusCode::NOT_FOUND,
Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND,
Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN,
Some(UploadError::Range) => StatusCode::RANGE_NOT_SATISFIABLE,
_ => StatusCode::INTERNAL_SERVER_ERROR,

View file

@ -2,7 +2,7 @@ use crate::{
concurrent_processor::CancelSafeProcessor,
config::ImageFormat,
details::Details,
error::Error,
error::{Error, UploadError},
ffmpeg::{ThumbnailFormat, VideoFormat},
repo::{Alias, FullRepo},
store::Store,
@ -64,7 +64,10 @@ async fn process<R: FullRepo, S: Store + 'static>(
{
identifier
} else {
let identifier = repo.identifier(hash.clone()).await?;
let Some(identifier) = repo.identifier(hash.clone()).await? else {
return Err(UploadError::MissingIdentifier.into());
};
let reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,

View file

@ -3,7 +3,7 @@ use crate::{
either::Either,
error::{Error, UploadError},
magick::ValidInputType,
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, FullRepo, HashRepo},
store::Store,
CONFIG,
};
@ -156,8 +156,7 @@ where
tracing::trace!("Saving delete token");
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
if res.is_err() {
let delete_token = self.repo.delete_token(&alias).await?;
if let Err(AlreadyExists(delete_token)) = res {
tracing::trace!("Returning existing delete token, {:?}", delete_token);
return Ok(delete_token);
}
@ -232,7 +231,7 @@ where
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Ok(token) = repo.delete_token(&alias).await {
if let Ok(Some(token)) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();

View file

@ -822,7 +822,14 @@ async fn serve<R: FullRepo, S: Store + 'static>(
(hash, alias, true)
};
let identifier = repo.identifier(hash).await?;
let Some(identifier) = repo.identifier(hash.clone()).await? else {
tracing::warn!(
"Original File identifier for hash {} is missing, queue cleanup task",
hex::encode(&hash)
);
crate::queue::cleanup_hash(&repo, hash).await?;
return Ok(HttpResponse::NotFound().finish());
};
let details = ensure_details(&repo, &store, &alias).await?;
@ -1548,8 +1555,8 @@ where
}
let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await {
Ok(identifier) => identifier,
Err(e) if e.is_missing() => {
Ok(Some(identifier)) => identifier,
Ok(None) => {
tracing::warn!(
"Original File identifier for hash {} is missing, queue cleanup task",
hex::encode(&hash)

View file

@ -92,8 +92,10 @@ where
if !aliases.is_empty() {
for alias in aliases {
let token = repo.delete_token(&alias).await?;
super::cleanup_alias(repo, alias, token).await?;
// TODO: decide if it is okay to skip aliases without tokens
if let Some(token) = repo.delete_token(&alias).await? {
super::cleanup_alias(repo, alias, token).await?;
}
}
// Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned
return Ok(());
@ -105,7 +107,7 @@ where
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
idents.push(repo.identifier(hash.clone()).await?);
idents.extend(repo.identifier(hash.clone()).await?);
idents.extend(repo.motion_identifier(hash.clone()).await?);
for identifier in idents {
@ -123,7 +125,8 @@ where
R: FullRepo,
{
let saved_delete_token = repo.delete_token(&alias).await?;
if saved_delete_token != token {
if saved_delete_token.is_some() && saved_delete_token != Some(token) {
return Err(UploadError::InvalidToken.into());
}

View file

@ -34,7 +34,10 @@ pub(crate) struct DeleteToken {
id: MaybeUuid,
}
pub(crate) struct AlreadyExists;
pub(crate) struct HashAlreadyExists;
pub(crate) struct AliasAlreadyExists;
pub(crate) struct AlreadyExists(pub(super) DeleteToken);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
@ -56,15 +59,9 @@ pub(crate) enum RepoError {
#[error("Panic in blocking operation")]
Canceled,
}
impl RepoError {
pub(crate) const fn is_missing(&self) -> bool {
match self {
Self::SledError(e) => e.is_missing(),
_ => false,
}
}
#[error("Required field is missing {0}")]
Missing(&'static str),
}
#[async_trait::async_trait(?Send)]
@ -91,7 +88,7 @@ pub(crate) trait FullRepo:
return Ok(None);
};
self.identifier(hash).await.map(Some)
self.identifier(hash).await
}
#[tracing::instrument(skip(self))]
@ -112,7 +109,9 @@ pub(crate) trait FullRepo:
return Ok(None);
};
let identifier = self.identifier::<I>(hash.clone()).await?;
let Some(identifier) = self.identifier::<I>(hash.clone()).await? else {
return Ok(None);
};
match self.details(&identifier).await? {
Some(details) if details.is_motion() => self.motion_identifier::<I>(hash).await,
@ -270,7 +269,7 @@ pub(crate) trait HashRepo: BaseRepo {
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
@ -281,8 +280,10 @@ pub(crate) trait HashRepo: BaseRepo {
hash: Self::Bytes,
identifier: &I,
) -> Result<(), StoreError>;
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes)
-> Result<I, StoreError>;
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, StoreError>;
async fn relate_variant_identifier<I: Identifier>(
&self,
@ -329,7 +330,7 @@ where
T::hashes(self).await
}
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError> {
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError> {
T::create(self, hash).await
}
@ -356,7 +357,7 @@ where
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, StoreError> {
) -> Result<Option<I>, StoreError> {
T::identifier(self, hash).await
}
@ -410,14 +411,14 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo: BaseRepo {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError>;
async fn relate_delete_token(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError>;
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>;
async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError>;
@ -430,7 +431,7 @@ impl<T> AliasRepo for actix_web::web::Data<T>
where
T: AliasRepo,
{
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError> {
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
T::create(self, alias).await
}
@ -442,7 +443,7 @@ where
T::relate_delete_token(self, alias, delete_token).await
}
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError> {
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
T::delete_token(self, alias).await
}
@ -612,7 +613,11 @@ where
}
}
let main_identifier = repo.identifier::<FileId>(hash.clone()).await?;
let Some(main_identifier) = repo.identifier::<FileId>(hash.clone()).await? else {
tracing::warn!("Missing identifier for hash {}, queueing cleanup", hex::encode(&hash));
crate::queue::cleanup_hash(repo, hash.clone()).await?;
return Err(RepoError::Missing("hash -> identifier").into());
};
if let Some(new_main_identifier) = main_identifier.normalize_for_migration() {
migrate_identifier_details(repo, &main_identifier, &new_main_identifier).await?;

View file

@ -1,14 +1,15 @@
use crate::{
repo::{
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo,
UploadId, UploadRepo, UploadResult,
},
serde_str::Serde,
store::StoreError,
stream::from_iterator,
};
use futures_util::Stream;
use sled::{Db, IVec, Tree};
use sled::{CompareAndSwapError, Db, IVec, Tree};
use std::{
collections::HashMap,
pin::Pin,
@ -44,19 +45,10 @@ pub(crate) enum SledError {
#[error("Invalid details json")]
Details(#[from] serde_json::Error),
#[error("Required field was not present: {0}")]
Missing(&'static str),
#[error("Operation panicked")]
Panic,
}
impl SledError {
pub(super) const fn is_missing(&self) -> bool {
matches!(self, Self::Missing(_))
}
}
#[derive(Clone)]
pub(crate) struct SledRepo {
healthz_count: Arc<AtomicU64>,
@ -456,13 +448,13 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError> {
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
hashes.compare_and_swap(hash, None as Option<Self::Bytes>, Some(hash2))
});
Ok(res.map_err(|_| AlreadyExists))
Ok(res.map_err(|_| HashAlreadyExists))
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
@ -515,13 +507,12 @@ impl HashRepo for SledRepo {
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, StoreError> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
) -> Result<Option<I>, StoreError> {
let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else {
return Ok(None);
};
opt.ok_or(SledError::Missing("hash -> identifier"))
.map_err(RepoError::from)
.map_err(StoreError::from)
.and_then(|ivec| I::from_bytes(ivec.to_vec()))
Ok(Some(I::from_bytes(ivec.to_vec())?))
}
#[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
@ -679,7 +670,7 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError> {
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -688,7 +679,7 @@ impl AliasRepo for SledRepo {
aliases.compare_and_swap(bytes, None as Option<Self::Bytes>, Some(bytes2))
);
Ok(res.map_err(|_| AlreadyExists))
Ok(res.map_err(|_| AliasAlreadyExists))
}
#[tracing::instrument(level = "trace", skip(self))]
@ -700,23 +691,50 @@ impl AliasRepo for SledRepo {
let key = alias.to_bytes();
let token = delete_token.to_bytes();
let res = b!(
self.alias_delete_tokens,
alias_delete_tokens.compare_and_swap(key, None as Option<Self::Bytes>, Some(token))
);
let res = b!(self.alias_delete_tokens, {
let mut prev: Option<Self::Bytes> = None;
Ok(res.map_err(|_| AlreadyExists))
loop {
let key = key.clone();
let token = token.clone();
let res = alias_delete_tokens.compare_and_swap(key, prev, Some(token))?;
match res {
Ok(()) => return Ok(Ok(())) as Result<_, SledError>,
Err(CompareAndSwapError {
current: Some(token),
..
}) => {
if let Some(token) = DeleteToken::from_slice(&token) {
return Ok(Err(AlreadyExists(token)));
} else {
prev = Some(token);
};
}
Err(CompareAndSwapError { current: None, .. }) => {
prev = None;
}
}
}
});
Ok(res)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError> {
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
let key = alias.to_bytes();
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
let Some(ivec) = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)) else {
return Ok(None);
};
opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
.ok_or(SledError::Missing("alias -> delete-token"))
.map_err(RepoError::from)
let Some(token) = DeleteToken::from_slice(&ivec) else {
return Ok(None);
};
Ok(Some(token))
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]

View file

@ -28,13 +28,6 @@ impl StoreError {
pub(crate) const fn is_not_found(&self) -> bool {
matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_))
}
pub(crate) const fn is_missing(&self) -> bool {
match self {
Self::Repo(e) => e.is_missing(),
_ => false,
}
}
}
impl From<crate::store::file_store::FileError> for StoreError {