Rename some repo methods, generate UploadId in repo

This commit is contained in:
asonix 2023-09-01 18:41:04 -05:00
parent cbb66f1b75
commit fd74161c61
9 changed files with 39 additions and 26 deletions

View file

@ -42,7 +42,7 @@ where
let mut this = Self { let mut this = Self {
repo, repo,
identifier: None, identifier: None,
upload_id: Some(UploadId::generate()), upload_id: None,
}; };
this.do_proxy(store, stream).await?; this.do_proxy(store, stream).await?;
@ -54,9 +54,7 @@ where
where where
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static, P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{ {
self.repo self.upload_id = Some(self.repo.create_upload().await?);
.create_upload(self.upload_id.expect("Upload id exists"))
.await?;
let stream = let stream =
stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));

View file

@ -629,7 +629,7 @@ async fn page(
for hash in &page.hashes { for hash in &page.hashes {
let hex = hash.to_hex(); let hex = hash.to_hex();
let aliases = repo let aliases = repo
.for_hash(hash.clone()) .aliases_for_hash(hash.clone())
.await? .await?
.into_iter() .into_iter()
.map(|a| a.to_string()) .map(|a| a.to_string())

View file

@ -75,7 +75,7 @@ where
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let aliases = repo.for_hash(hash.clone()).await?; let aliases = repo.aliases_for_hash(hash.clone()).await?;
if !aliases.is_empty() { if !aliases.is_empty() {
for alias in aliases { for alias in aliases {
@ -127,7 +127,7 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E
return Ok(()); return Ok(());
}; };
if repo.for_hash(hash.clone()).await?.is_empty() { if repo.aliases_for_hash(hash.clone()).await?.is_empty() {
super::cleanup_hash(repo, hash).await?; super::cleanup_hash(repo, hash).await?;
} }

View file

@ -121,7 +121,7 @@ where
} }
}; };
repo.complete(upload_id, result).await?; repo.complete_upload(upload_id, result).await?;
Ok(()) Ok(())
} }

View file

@ -102,7 +102,7 @@ pub(crate) trait FullRepo:
return Ok(vec![]); return Ok(vec![]);
}; };
self.for_hash(hash).await self.aliases_for_hash(hash).await
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
@ -291,13 +291,17 @@ where
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo { pub(crate) trait UploadRepo: BaseRepo {
async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError>; async fn create_upload(&self) -> Result<UploadId, RepoError>;
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError>; async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError>;
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>; async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>;
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>; async fn complete_upload(
&self,
upload_id: UploadId,
result: UploadResult,
) -> Result<(), RepoError>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -305,8 +309,8 @@ impl<T> UploadRepo for Arc<T>
where where
T: UploadRepo, T: UploadRepo,
{ {
async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { async fn create_upload(&self) -> Result<UploadId, RepoError> {
T::create_upload(self, upload_id).await T::create_upload(self).await
} }
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> { async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
@ -317,8 +321,12 @@ where
T::claim(self, upload_id).await T::claim(self, upload_id).await
} }
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { async fn complete_upload(
T::complete(self, upload_id, result).await &self,
upload_id: UploadId,
result: UploadResult,
) -> Result<(), RepoError> {
T::complete_upload(self, upload_id, result).await
} }
} }
@ -727,7 +735,7 @@ pub(crate) trait AliasRepo: BaseRepo {
async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError>; async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError>;
async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError>; async fn aliases_for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError>;
async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError>; async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError>;
} }
@ -754,8 +762,8 @@ where
T::hash(self, alias).await T::hash(self, alias).await
} }
async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> { async fn aliases_for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> {
T::for_hash(self, hash).await T::aliases_for_hash(self, hash).await
} }
async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> { async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> {

View file

@ -220,7 +220,7 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) ->
} }
} }
for alias in old_repo.for_hash(hash.clone()).await? { for alias in old_repo.aliases_for_hash(hash.clone()).await? {
let delete_token = old_repo let delete_token = old_repo
.delete_token(&alias) .delete_token(&alias)
.await? .await?
@ -275,7 +275,7 @@ async fn do_migrate_hash_04<S: Store>(
let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?; let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?;
let aliases = old_repo.for_hash(old_hash.clone()).await?; let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?;
let variants = old_repo.variants::<S::Identifier>(old_hash.clone()).await?; let variants = old_repo.variants::<S::Identifier>(old_hash.clone()).await?;
let motion_identifier = old_repo let motion_identifier = old_repo
.motion_identifier::<S::Identifier>(old_hash.clone()) .motion_identifier::<S::Identifier>(old_hash.clone())

View file

@ -520,9 +520,12 @@ impl Drop for PopMetricsGuard {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo { impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { async fn create_upload(&self) -> Result<UploadId, RepoError> {
let upload_id = UploadId::generate();
b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
Ok(())
Ok(upload_id)
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
@ -567,7 +570,11 @@ impl UploadRepo for SledRepo {
} }
#[tracing::instrument(level = "trace", skip(self, result))] #[tracing::instrument(level = "trace", skip(self, result))]
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { async fn complete_upload(
&self,
upload_id: UploadId,
result: UploadResult,
) -> Result<(), RepoError> {
let result: InnerUploadResult = result.into(); let result: InnerUploadResult = result.into();
let result = serde_json::to_vec(&result).map_err(SledError::from)?; let result = serde_json::to_vec(&result).map_err(SledError::from)?;
@ -1496,7 +1503,7 @@ impl AliasRepo for SledRepo {
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> { async fn aliases_for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> {
let hash = hash.to_ivec(); let hash = hash.to_ivec();
let v = b!(self.hash_aliases, { let v = b!(self.hash_aliases, {

View file

@ -77,5 +77,5 @@ pub(crate) trait HashRepo: BaseRepo {
pub(crate) trait AliasRepo: BaseRepo { pub(crate) trait AliasRepo: BaseRepo {
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError>; async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError>;
async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>; async fn aliases_for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>;
} }

View file

@ -294,7 +294,7 @@ impl AliasRepo for SledRepo {
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> { async fn aliases_for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
let v = b!(self.hash_aliases, { let v = b!(self.hash_aliases, {
Ok(hash_aliases Ok(hash_aliases
.scan_prefix(hash) .scan_prefix(hash)