diff --git a/src/repo.rs b/src/repo.rs index ed87b7a..b659e3d 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -103,6 +103,7 @@ pub(crate) trait FullRepo: + AliasRepo + QueueRepo + HashRepo + + VariantRepo + StoreMigrationRepo + AliasAccessRepo + VariantAccessRepo @@ -653,20 +654,6 @@ pub(crate) trait HashRepo: BaseRepo { async fn identifier(&self, hash: Hash) -> Result>, RepoError>; - async fn relate_variant_identifier( - &self, - hash: Hash, - variant: String, - identifier: &Arc, - ) -> Result, RepoError>; - async fn variant_identifier( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError>; - async fn variants(&self, hash: Hash) -> Result)>, RepoError>; - async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; - async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError>; async fn blurhash(&self, hash: Hash) -> Result>, RepoError>; @@ -726,6 +713,56 @@ where T::identifier(self, hash).await } + async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError> { + T::relate_blurhash(self, hash, blurhash).await + } + + async fn blurhash(&self, hash: Hash) -> Result>, RepoError> { + T::blurhash(self, hash).await + } + + async fn relate_motion_identifier( + &self, + hash: Hash, + identifier: &Arc, + ) -> Result<(), RepoError> { + T::relate_motion_identifier(self, hash, identifier).await + } + + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { + T::motion_identifier(self, hash).await + } + + async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { + T::cleanup_hash(self, hash).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait VariantRepo: BaseRepo { + async fn relate_variant_identifier( + &self, + hash: Hash, + variant: String, + identifier: &Arc, + ) -> Result, RepoError>; + + async fn variant_identifier( + &self, + hash: Hash, + variant: String, + ) -> Result>, RepoError>; + + async fn variants(&self, hash: Hash) -> Result)>, RepoError>; + + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl VariantRepo for Arc +where + T: VariantRepo, +{ async fn relate_variant_identifier( &self, hash: Hash, @@ -750,30 +787,6 @@ where async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::remove_variant(self, hash, variant).await } - - async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError> { - T::relate_blurhash(self, hash, blurhash).await - } - - async fn blurhash(&self, hash: Hash) -> Result>, RepoError> { - T::blurhash(self, hash).await - } - - async fn relate_motion_identifier( - &self, - hash: Hash, - identifier: &Arc, - ) -> Result<(), RepoError> { - T::relate_motion_identifier(self, hash, identifier).await - } - - async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { - T::motion_identifier(self, hash).await - } - - async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { - T::cleanup_hash(self, hash).await - } } #[async_trait::async_trait(?Send)] diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index faa5792..a14c5b2 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -46,7 +46,7 @@ use super::{ Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, + UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; #[derive(Clone)] @@ -863,110 +863,6 @@ impl HashRepo for PostgresRepo { Ok(opt.map(Arc::from)) } - #[tracing::instrument(level = "debug", skip(self))] - async fn relate_variant_identifier( - &self, - input_hash: Hash, - input_variant: String, - input_identifier: &Arc, - ) -> Result, RepoError> { - use schema::variants::dsl::*; - - let mut conn = self.get_connection().await?; - - let res = diesel::insert_into(variants) - .values(( - hash.eq(&input_hash), - variant.eq(&input_variant), - identifier.eq(input_identifier.as_ref()), - )) - .execute(&mut conn) - .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)?; - - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(VariantAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), - } - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn variant_identifier( - &self, - input_hash: Hash, - input_variant: String, - ) -> Result>, RepoError> { - use schema::variants::dsl::*; - - let mut conn = self.get_connection().await?; - - let opt = variants - .select(identifier) - .filter(hash.eq(&input_hash)) - .filter(variant.eq(&input_variant)) - .get_result::(&mut conn) - .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)? - .optional() - .map_err(PostgresError::Diesel)? - .map(Arc::from); - - Ok(opt) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn variants(&self, input_hash: Hash) -> Result)>, RepoError> { - use schema::variants::dsl::*; - - let mut conn = self.get_connection().await?; - - let vec = variants - .select((variant, identifier)) - .filter(hash.eq(&input_hash)) - .get_results::<(String, String)>(&mut conn) - .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)? - .into_iter() - .map(|(s, i)| (s, Arc::from(i))) - .collect(); - - Ok(vec) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn remove_variant( - &self, - input_hash: Hash, - input_variant: String, - ) -> Result<(), RepoError> { - use schema::variants::dsl::*; - - let mut conn = self.get_connection().await?; - - diesel::delete(variants) - .filter(hash.eq(&input_hash)) - .filter(variant.eq(&input_variant)) - .execute(&mut conn) - .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; - - Ok(()) - } - #[tracing::instrument(level = "debug", skip(self))] async fn relate_blurhash( &self, @@ -1083,6 +979,113 @@ impl HashRepo for PostgresRepo { } } +#[async_trait::async_trait(?Send)] +impl VariantRepo for PostgresRepo { + #[tracing::instrument(level = "debug", skip(self))] + async fn relate_variant_identifier( + &self, + input_hash: Hash, + input_variant: String, + input_identifier: &Arc, + ) -> Result, RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.get_connection().await?; + + let res = diesel::insert_into(variants) + .values(( + hash.eq(&input_hash), + variant.eq(&input_variant), + identifier.eq(input_identifier.as_ref()), + )) + .execute(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)?; + + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(VariantAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn variant_identifier( + &self, + input_hash: Hash, + input_variant: String, + ) -> Result>, RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.get_connection().await?; + + let opt = variants + .select(identifier) + .filter(hash.eq(&input_hash)) + .filter(variant.eq(&input_variant)) + .get_result::(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .optional() + .map_err(PostgresError::Diesel)? + .map(Arc::from); + + Ok(opt) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn variants(&self, input_hash: Hash) -> Result)>, RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.get_connection().await?; + + let vec = variants + .select((variant, identifier)) + .filter(hash.eq(&input_hash)) + .get_results::<(String, String)>(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)? + .into_iter() + .map(|(s, i)| (s, Arc::from(i))) + .collect(); + + Ok(vec) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn remove_variant( + &self, + input_hash: Hash, + input_variant: String, + ) -> Result<(), RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.get_connection().await?; + + diesel::delete(variants) + .filter(hash.eq(&input_hash)) + .filter(variant.eq(&input_variant)) + .execute(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; + + Ok(()) + } +} + #[async_trait::async_trait(?Send)] impl AliasRepo for PostgresRepo { #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 6042116..2555438 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -25,7 +25,7 @@ use super::{ Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, + UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; macro_rules! b { @@ -1331,88 +1331,6 @@ impl HashRepo for SledRepo { Ok(opt.map(try_into_arc_str).transpose()?) } - #[tracing::instrument(level = "trace", skip(self))] - async fn relate_variant_identifier( - &self, - hash: Hash, - variant: String, - identifier: &Arc, - ) -> Result, RepoError> { - let hash = hash.to_bytes(); - - let key = variant_key(&hash, &variant); - let value = identifier.clone(); - - let hash_variant_identifiers = self.hash_variant_identifiers.clone(); - - crate::sync::spawn_blocking("sled-io", move || { - hash_variant_identifiers - .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) - .map(|res| res.map_err(|_| VariantAlreadyExists)) - }) - .await - .map_err(|_| RepoError::Canceled)? - .map_err(SledError::from) - .map_err(RepoError::from) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn variant_identifier( - &self, - hash: Hash, - variant: String, - ) -> Result>, RepoError> { - let hash = hash.to_bytes(); - - let key = variant_key(&hash, &variant); - - let opt = b!( - self.hash_variant_identifiers, - hash_variant_identifiers.get(key) - ); - - Ok(opt.map(try_into_arc_str).transpose()?) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn variants(&self, hash: Hash) -> Result)>, RepoError> { - let hash = hash.to_ivec(); - - let vec = b!( - self.hash_variant_identifiers, - Ok(hash_variant_identifiers - .scan_prefix(hash.clone()) - .filter_map(|res| res.ok()) - .filter_map(|(key, ivec)| { - let identifier = try_into_arc_str(ivec).ok(); - - let variant = variant_from_key(&hash, &key); - if variant.is_none() { - tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); - } - - Some((variant?, identifier?)) - }) - .collect::>()) as Result, SledError> - ); - - Ok(vec) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - let hash = hash.to_bytes(); - - let key = variant_key(&hash, &variant); - - b!( - self.hash_variant_identifiers, - hash_variant_identifiers.remove(key) - ); - - Ok(()) - } - #[tracing::instrument(level = "trace", skip(self))] async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError> { b!( @@ -1528,6 +1446,91 @@ impl HashRepo for SledRepo { } } +#[async_trait::async_trait(?Send)] +impl VariantRepo for SledRepo { + #[tracing::instrument(level = "trace", skip(self))] + async fn relate_variant_identifier( + &self, + hash: Hash, + variant: String, + identifier: &Arc, + ) -> Result, RepoError> { + let hash = hash.to_bytes(); + + let key = variant_key(&hash, &variant); + let value = identifier.clone(); + + let hash_variant_identifiers = self.hash_variant_identifiers.clone(); + + crate::sync::spawn_blocking("sled-io", move || { + hash_variant_identifiers + .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) + .map(|res| res.map_err(|_| VariantAlreadyExists)) + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(SledError::from) + .map_err(RepoError::from) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn variant_identifier( + &self, + hash: Hash, + variant: String, + ) -> Result>, RepoError> { + let hash = hash.to_bytes(); + + let key = variant_key(&hash, &variant); + + let opt = b!( + self.hash_variant_identifiers, + hash_variant_identifiers.get(key) + ); + + Ok(opt.map(try_into_arc_str).transpose()?) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { + let hash = hash.to_ivec(); + + let vec = b!( + self.hash_variant_identifiers, + Ok(hash_variant_identifiers + .scan_prefix(hash.clone()) + .filter_map(|res| res.ok()) + .filter_map(|(key, ivec)| { + let identifier = try_into_arc_str(ivec).ok(); + + let variant = variant_from_key(&hash, &key); + if variant.is_none() { + tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); + } + + Some((variant?, identifier?)) + }) + .collect::>()) as Result, SledError> + ); + + Ok(vec) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); + + let key = variant_key(&hash, &variant); + + b!( + self.hash_variant_identifiers, + hash_variant_identifiers.remove(key) + ); + + Ok(()) + } +} + fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec { let mut v = hash.to_vec(); v.extend_from_slice(alias);