Add paged hash access

This commit is contained in:
asonix 2023-08-28 16:02:11 -05:00
parent e302ab1f3d
commit 24812afeba
4 changed files with 250 additions and 15 deletions

View file

@ -575,6 +575,80 @@ async fn do_download_backgrounded<S: Store + 'static>(
})))
}
#[derive(Debug, serde::Deserialize)]
struct PageQuery {
slug: Option<String>,
limit: Option<usize>,
}
#[derive(serde::Serialize)]
struct PageJson {
limit: usize,
#[serde(skip_serializing_if = "Option::is_none")]
prev: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
next: Option<String>,
hashes: Vec<HashJson>,
}
#[derive(serde::Serialize)]
struct HashJson {
hex: String,
aliases: Vec<String>,
details: Option<Details>,
}
/// Get a page of hashes
#[tracing::instrument(name = "Hash Page", skip(repo))]
async fn page(
repo: web::Data<ArcRepo>,
web::Query(PageQuery { slug, limit }): web::Query<PageQuery>,
) -> Result<HttpResponse, Error> {
let limit = limit.unwrap_or(20);
let page = repo.hash_page(slug, limit).await?;
let mut hashes = Vec::with_capacity(page.hashes.len());
for hash in &page.hashes {
let hex = hash.to_hex();
let aliases = repo
.for_hash(hash.clone())
.await?
.into_iter()
.map(|a| a.to_string())
.collect();
let identifier = repo.identifier(hash.clone()).await?;
let details = if let Some(identifier) = identifier {
repo.details(&identifier).await?
} else {
None
};
hashes.push(HashJson {
hex,
aliases,
details,
});
}
let page = PageJson {
limit: page.limit,
prev: page.prev(),
next: page.next(),
hashes,
};
Ok(HttpResponse::Ok().json(serde_json::json!({
"msg": "ok",
"page": page,
})))
}
/// Delete aliases and files
#[tracing::instrument(name = "Deleting file", skip(repo, config))]
async fn delete(
@ -1558,6 +1632,7 @@ fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
.service(web::resource("/aliases").route(web::get().to(aliases)))
.service(web::resource("/identifier").route(web::get().to(identifier::<S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found)))
.service(web::resource("/hashes").route(web::get().to(page)))
.configure(extra_config),
);
}

View file

@ -4,6 +4,7 @@ use crate::{
store::{Identifier, StoreError},
stream::LocalBoxStream,
};
use base64::Engine;
use std::{fmt::Debug, sync::Arc};
use url::Url;
use uuid::Uuid;
@ -499,12 +500,63 @@ where
}
}
#[derive(Clone)]
pub(crate) struct OrderedHash {
timestamp: time::OffsetDateTime,
hash: Hash,
}
pub(crate) struct HashPage {
pub(crate) limit: usize,
prev: Option<OrderedHash>,
next: Option<OrderedHash>,
pub(crate) hashes: Vec<Hash>,
}
fn ordered_hash_to_string(OrderedHash { timestamp, hash }: &OrderedHash) -> String {
let mut bytes: Vec<u8> = timestamp.unix_timestamp_nanos().to_be_bytes().into();
bytes.extend(hash.to_bytes());
base64::prelude::BASE64_URL_SAFE.encode(bytes)
}
fn ordered_hash_from_string(s: &str) -> Option<OrderedHash> {
let bytes = base64::prelude::BASE64_URL_SAFE.decode(s).ok()?;
let timestamp: [u8; 16] = bytes[0..16].try_into().ok()?;
let timestamp = i128::from_be_bytes(timestamp);
let timestamp = time::OffsetDateTime::from_unix_timestamp_nanos(timestamp).ok()?;
let hash = Hash::from_bytes(&bytes[16..])?;
Some(OrderedHash { timestamp, hash })
}
impl HashPage {
pub(crate) fn next(&self) -> Option<String> {
self.next.as_ref().map(ordered_hash_to_string)
}
pub(crate) fn prev(&self) -> Option<String> {
self.prev.as_ref().map(ordered_hash_to_string)
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo: BaseRepo {
async fn size(&self) -> Result<u64, RepoError>;
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>>;
async fn hash_page(&self, slug: Option<String>, limit: usize) -> Result<HashPage, RepoError> {
let bound = slug.as_deref().and_then(ordered_hash_from_string);
self.hashes_ordered(bound, limit).await
}
async fn hashes_ordered(
&self,
bound: Option<OrderedHash>,
limit: usize,
) -> Result<HashPage, RepoError>;
async fn create_hash(
&self,
hash: Hash,
@ -556,6 +608,14 @@ where
T::hashes(self).await
}
async fn hashes_ordered(
&self,
bound: Option<OrderedHash>,
limit: usize,
) -> Result<HashPage, RepoError> {
T::hashes_ordered(self, bound, limit).await
}
async fn create_hash(
&self,
hash: Hash,

View file

@ -26,6 +26,10 @@ impl Hash {
}
}
pub(crate) fn to_hex(&self) -> String {
hex::encode(self.to_bytes())
}
pub(super) fn to_bytes(&self) -> Vec<u8> {
let format = self.format.to_bytes();

View file

@ -1,11 +1,5 @@
use crate::{
details::MaybeHumanDate,
repo::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, DetailsRepo, FullRepo, HashAlreadyExists, HashRepo, Identifier, JobId, ProxyRepo,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
VariantAccessRepo, VariantAlreadyExists,
},
serde_str::Serde,
store::StoreError,
stream::{from_iterator, LocalBoxStream},
@ -24,6 +18,13 @@ use tokio::sync::Notify;
use url::Url;
use uuid::Uuid;
use super::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, Identifier, JobId,
OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId,
UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists,
};
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
@ -64,6 +65,7 @@ pub(crate) struct SledRepo {
settings: Tree,
identifier_details: Tree,
hashes: Tree,
hashes_inverse: Tree,
hash_aliases: Tree,
hash_identifiers: Tree,
hash_variant_identifiers: Tree,
@ -102,6 +104,7 @@ impl SledRepo {
settings: db.open_tree("pict-rs-settings-tree")?,
identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
hashes: db.open_tree("pict-rs-hashes-tree")?,
hashes_inverse: db.open_tree("pict-rs-hashes-inverse-tree")?,
hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
@ -996,6 +999,32 @@ impl StoreMigrationRepo for SledRepo {
}
}
fn parse_ordered_hash(key: IVec) -> Option<OrderedHash> {
if key.len() < 16 {
return None;
}
let timestamp_bytes: [u8; 16] = key[0..16].try_into().ok()?;
let timestamp_i128 = i128::from_be_bytes(timestamp_bytes);
let timestamp = time::OffsetDateTime::from_unix_timestamp_nanos(timestamp_i128).ok()?;
let hash = Hash::from_bytes(&key[16..])?;
Some(OrderedHash { timestamp, hash })
}
fn serialize_ordered_hash(ordered_hash: &OrderedHash) -> IVec {
let mut bytes: Vec<u8> = ordered_hash
.timestamp
.unix_timestamp_nanos()
.to_be_bytes()
.into();
bytes.extend(ordered_hash.hash.to_bytes());
IVec::from(bytes)
}
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
async fn size(&self) -> Result<u64, RepoError> {
@ -1017,6 +1046,58 @@ impl HashRepo for SledRepo {
Box::pin(from_iterator(iter, 8))
}
async fn hashes_ordered(
&self,
bound: Option<OrderedHash>,
limit: usize,
) -> Result<HashPage, RepoError> {
let (page_iter, prev_iter) = match &bound {
Some(ordered_hash) => {
let hash_bytes = serialize_ordered_hash(ordered_hash);
(
self.hashes_inverse.range(..hash_bytes.clone()),
Some(self.hashes_inverse.range(hash_bytes..)),
)
}
None => (self.hashes_inverse.iter(), None),
};
actix_rt::task::spawn_blocking(move || {
let page_iter = page_iter
.keys()
.rev()
.filter_map(|res| res.map(parse_ordered_hash).transpose())
.take(limit);
let prev = prev_iter
.and_then(|prev_iter| {
prev_iter
.keys()
.filter_map(|res| res.map(parse_ordered_hash).transpose())
.take(limit)
.last()
})
.transpose()?;
let hashes = page_iter.collect::<Result<Vec<_>, _>>()?;
let next = hashes.last().cloned();
Ok(HashPage {
limit,
prev,
next,
hashes: hashes
.into_iter()
.map(|OrderedHash { hash, .. }| hash)
.collect(),
}) as Result<HashPage, SledError>
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(RepoError::from)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn create_hash(
&self,
@ -1026,21 +1107,30 @@ impl HashRepo for SledRepo {
let identifier: sled::IVec = identifier.to_bytes()?.into();
let hashes = self.hashes.clone();
let hashes_inverse = self.hashes_inverse.clone();
let hash_identifiers = self.hash_identifiers.clone();
let created_key = serialize_ordered_hash(&OrderedHash {
timestamp: time::OffsetDateTime::now_utc(),
hash: hash.clone(),
});
let hash = hash.to_ivec();
let res = actix_web::web::block(move || {
(&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
(&hashes, &hashes_inverse, &hash_identifiers).transaction(
|(hashes, hashes_inverse, hash_identifiers)| {
if hashes.get(hash.clone())?.is_some() {
return Ok(Err(HashAlreadyExists));
}
hashes.insert(hash.clone(), hash.clone())?;
hashes.insert(hash.clone(), created_key.clone())?;
hashes_inverse.insert(created_key.clone(), hash.clone())?;
hash_identifiers.insert(hash.clone(), &identifier)?;
Ok(Ok(()))
})
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;
@ -1198,6 +1288,7 @@ impl HashRepo for SledRepo {
let hash = hash.to_ivec();
let hashes = self.hashes.clone();
let hashes_inverse = self.hashes_inverse.clone();
let hash_identifiers = self.hash_identifiers.clone();
let hash_motion_identifiers = self.hash_motion_identifiers.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
@ -1216,6 +1307,7 @@ impl HashRepo for SledRepo {
let res = actix_web::web::block(move || {
(
&hashes,
&hashes_inverse,
&hash_identifiers,
&hash_motion_identifiers,
&hash_variant_identifiers,
@ -1223,11 +1315,15 @@ impl HashRepo for SledRepo {
.transaction(
|(
hashes,
hashes_inverse,
hash_identifiers,
hash_motion_identifiers,
hash_variant_identifiers,
)| {
hashes.remove(&hash)?;
if let Some(value) = hashes.remove(&hash)? {
hashes_inverse.remove(value)?;
}
hash_identifiers.remove(&hash)?;
hash_motion_identifiers.remove(&hash)?;