Move variant methods into variant repo trait

This commit is contained in:
asonix 2024-03-28 12:04:40 -05:00
parent 525deffd8d
commit 6e9239fa36
3 changed files with 245 additions and 226 deletions

View file

@ -103,6 +103,7 @@ pub(crate) trait FullRepo:
+ AliasRepo
+ QueueRepo
+ HashRepo
+ VariantRepo
+ StoreMigrationRepo
+ AliasAccessRepo
+ VariantAccessRepo
@ -653,20 +654,6 @@ pub(crate) trait HashRepo: BaseRepo {
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError>;
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError>;
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError>;
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError>;
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError>;
@ -726,6 +713,56 @@ where
T::identifier(self, hash).await
}
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
T::relate_blurhash(self, hash, blurhash).await
}
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::blurhash(self, hash).await
}
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &Arc<str>,
) -> Result<(), RepoError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::motion_identifier(self, hash).await
}
async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
T::cleanup_hash(self, hash).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait VariantRepo: BaseRepo {
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError>;
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError>;
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> VariantRepo for Arc<T>
where
T: VariantRepo,
{
async fn relate_variant_identifier(
&self,
hash: Hash,
@ -750,30 +787,6 @@ where
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::remove_variant(self, hash, variant).await
}
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
T::relate_blurhash(self, hash, blurhash).await
}
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::blurhash(self, hash).await
}
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &Arc<str>,
) -> Result<(), RepoError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::motion_identifier(self, hash).await
}
async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
T::cleanup_hash(self, hash).await
}
}
#[async_trait::async_trait(?Send)]

View file

@ -46,7 +46,7 @@ use super::{
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
#[derive(Clone)]
@ -863,110 +863,6 @@ impl HashRepo for PostgresRepo {
Ok(opt.map(Arc::from))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
input_identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants)
.values((
hash.eq(&input_hash),
variant.eq(&input_variant),
identifier.eq(input_identifier.as_ref()),
))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let opt = variants
.select(identifier)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.get_result::<String>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.map(Arc::from);
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let vec = variants
.select((variant, identifier))
.filter(hash.eq(&input_hash))
.get_results::<(String, String)>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?
.into_iter()
.map(|(s, i)| (s, Arc::from(i)))
.collect();
Ok(vec)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove_variant(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<(), RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(variants)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_blurhash(
&self,
@ -1083,6 +979,113 @@ impl HashRepo for PostgresRepo {
}
}
#[async_trait::async_trait(?Send)]
impl VariantRepo for PostgresRepo {
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
input_identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants)
.values((
hash.eq(&input_hash),
variant.eq(&input_variant),
identifier.eq(input_identifier.as_ref()),
))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let opt = variants
.select(identifier)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.get_result::<String>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.map(Arc::from);
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let vec = variants
.select((variant, identifier))
.filter(hash.eq(&input_hash))
.get_results::<(String, String)>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?
.into_iter()
.map(|(s, i)| (s, Arc::from(i)))
.collect();
Ok(vec)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove_variant(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<(), RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(variants)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl AliasRepo for PostgresRepo {
#[tracing::instrument(level = "debug", skip(self))]

View file

@ -25,7 +25,7 @@ use super::{
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
macro_rules! b {
@ -1331,88 +1331,6 @@ impl HashRepo for SledRepo {
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let value = identifier.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
crate::sync::spawn_blocking("sled-io", move || {
hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
.map(|res| res.map_err(|_| VariantAlreadyExists))
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(SledError::from)
.map_err(RepoError::from)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let opt = b!(
self.hash_variant_identifiers,
hash_variant_identifiers.get(key)
);
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
let hash = hash.to_ivec();
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
.scan_prefix(hash.clone())
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = try_into_arc_str(ivec).ok();
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
Ok(vec)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.remove(key)
);
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
b!(
@ -1528,6 +1446,91 @@ impl HashRepo for SledRepo {
}
}
#[async_trait::async_trait(?Send)]
impl VariantRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let value = identifier.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
crate::sync::spawn_blocking("sled-io", move || {
hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
.map(|res| res.map_err(|_| VariantAlreadyExists))
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(SledError::from)
.map_err(RepoError::from)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let opt = b!(
self.hash_variant_identifiers,
hash_variant_identifiers.get(key)
);
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
let hash = hash.to_ivec();
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
.scan_prefix(hash.clone())
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = try_into_arc_str(ivec).ok();
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
Ok(vec)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.remove(key)
);
Ok(())
}
}
fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec<u8> {
let mut v = hash.to_vec();
v.extend_from_slice(alias);