Add internal endpoint for cleaning variants

This commit is contained in:
Aode (Lion) 2022-04-11 16:56:39 -05:00
parent 25b588154c
commit b3cbda1337
5 changed files with 77 additions and 4 deletions

View file

@ -603,6 +603,12 @@ where
.streaming(stream)
}
#[instrument(name = "Spawning variant cleanup", skip(repo))]
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
queue::cleanup_all_variants(&**repo).await?;
Ok(HttpResponse::NoContent().finish())
}
#[derive(Debug, serde::Deserialize)]
struct AliasQuery {
alias: Serde<Alias>,
@ -840,6 +846,9 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
.wrap(import_form.clone())
.route(web::post().to(upload::<R, S>)),
)
.service(
web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
)
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>))),
)

View file

@ -28,6 +28,10 @@ enum Cleanup {
alias: Serde<Alias>,
token: Serde<DeleteToken>,
},
Variant {
hash: Vec<u8>,
},
AllVariants,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
@ -79,6 +83,20 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
Ok(())
}
async fn cleanup_variants<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Variant {
hash: hash.as_ref().to_vec(),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_all_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::AllVariants)?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
identifier: Vec<u8>,

View file

@ -5,6 +5,7 @@ use crate::{
serde_str::Serde,
store::{Identifier, Store},
};
use futures_util::StreamExt;
use tracing::error;
pub(super) fn perform<'a, R, S>(
@ -22,7 +23,7 @@ where
Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::Identifier {
identifier: in_identifier,
} => identifier(repo, &store, in_identifier).await?,
} => identifier(repo, store, in_identifier).await?,
Cleanup::Alias {
alias: stored_alias,
token,
@ -34,6 +35,8 @@ where
)
.await?
}
Cleanup::Variant { hash } => variant::<R, S>(repo, hash).await?,
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
@ -87,7 +90,7 @@ where
if !aliases.is_empty() {
for alias in aliases {
let token = repo.delete_token(&alias).await?;
crate::queue::cleanup_alias(repo, alias, token).await?;
super::cleanup_alias(repo, alias, token).await?;
}
// Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned
return Ok(());
@ -103,7 +106,7 @@ where
idents.extend(repo.motion_identifier(hash.clone()).await?);
for identifier in idents {
let _ = crate::queue::cleanup_identifier(repo, identifier).await;
let _ = super::cleanup_identifier(repo, identifier).await;
}
HashRepo::cleanup(repo, hash).await?;
@ -126,7 +129,37 @@ where
repo.remove_alias(hash.clone(), &alias).await?;
if repo.aliases(hash.clone()).await?.is_empty() {
crate::queue::cleanup_hash(repo, hash).await?;
super::cleanup_hash(repo, hash).await?;
}
Ok(())
}
async fn all_variants<R, S>(repo: &R) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
let mut hash_stream = Box::pin(repo.hashes().await);
while let Some(res) = hash_stream.next().await {
let hash = res?;
super::cleanup_variants(repo, hash).await?;
}
Ok(())
}
async fn variant<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
let hash: R::Bytes = hash.into();
for (variant, identifier) in repo.variants::<S::Identifier>(hash.clone()).await? {
repo.remove_variant(hash.clone(), variant).await?;
super::cleanup_identifier(repo, identifier).await?;
}
Ok(())

View file

@ -183,6 +183,7 @@ pub(crate) trait HashRepo: BaseRepo {
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Error>;
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error>;
async fn relate_motion_identifier<I: Identifier>(
&self,

View file

@ -689,6 +689,18 @@ impl HashRepo for SledRepo {
Ok(vec)
}
#[tracing::instrument]
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> {
let key = variant_key(&hash, &variant);
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.remove(key)
);
Ok(())
}
#[tracing::instrument]
async fn relate_motion_identifier<I: Identifier>(
&self,