mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2025-01-22 09:18:08 +00:00
Make it compile
This commit is contained in:
parent
3129f7844e
commit
da876fd553
12 changed files with 203 additions and 132 deletions
|
@ -1,6 +1,7 @@
|
|||
use crate::{
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
repo::Hash,
|
||||
};
|
||||
use actix_web::web;
|
||||
use dashmap::{mapref::entry::Entry, DashMap};
|
||||
|
@ -16,7 +17,7 @@ use tracing::Span;
|
|||
|
||||
type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
|
||||
|
||||
type ProcessMapKey = (Vec<u8>, PathBuf);
|
||||
type ProcessMapKey = (Hash, PathBuf);
|
||||
|
||||
type ProcessMapInner = DashMap<ProcessMapKey, OutcomeReceiver>;
|
||||
|
||||
|
@ -32,14 +33,14 @@ impl ProcessMap {
|
|||
|
||||
pub(super) async fn process<Fut>(
|
||||
&self,
|
||||
hash: &[u8],
|
||||
hash: Hash,
|
||||
path: PathBuf,
|
||||
fut: Fut,
|
||||
) -> Result<(Details, web::Bytes), Error>
|
||||
where
|
||||
Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
|
||||
{
|
||||
let key = (hash.to_vec(), path.clone());
|
||||
let key = (hash.clone(), path.clone());
|
||||
|
||||
let (sender, receiver) = flume::bounded(1);
|
||||
|
||||
|
@ -51,8 +52,8 @@ impl ProcessMap {
|
|||
|
||||
let span = tracing::info_span!(
|
||||
"Processing image",
|
||||
hash = &tracing::field::debug(&hex::encode(hash)),
|
||||
path = &tracing::field::debug(&path),
|
||||
hash = ?hash,
|
||||
path = ?path,
|
||||
completed = &tracing::field::Empty,
|
||||
);
|
||||
|
||||
|
@ -63,8 +64,8 @@ impl ProcessMap {
|
|||
Entry::Occupied(receiver) => {
|
||||
let span = tracing::info_span!(
|
||||
"Waiting for processed image",
|
||||
hash = &tracing::field::debug(&hex::encode(hash)),
|
||||
path = &tracing::field::debug(&path),
|
||||
hash = ?hash,
|
||||
path = ?path,
|
||||
);
|
||||
|
||||
let receiver = receiver.get().clone().into_recv_async();
|
||||
|
|
|
@ -25,10 +25,12 @@ pub(crate) enum InputFile {
|
|||
Video(VideoFormat),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(
|
||||
Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
|
||||
)]
|
||||
pub(crate) enum InternalFormat {
|
||||
Image(ImageFormat),
|
||||
Animation(AnimationFormat),
|
||||
Image(ImageFormat),
|
||||
Video(InternalVideoFormat),
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
|||
error::{Error, UploadError},
|
||||
ffmpeg::ThumbnailFormat,
|
||||
formats::{InputProcessableFormat, InternalVideoFormat},
|
||||
repo::{Alias, FullRepo},
|
||||
repo::{Alias, FullRepo, Hash},
|
||||
store::Store,
|
||||
};
|
||||
use actix_web::web::Bytes;
|
||||
|
@ -51,7 +51,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
|
|||
input_format: Option<InternalVideoFormat>,
|
||||
thumbnail_format: Option<ThumbnailFormat>,
|
||||
media: &crate::config::Media,
|
||||
hash: R::Bytes,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
let process_fut = process(
|
||||
repo,
|
||||
|
@ -67,7 +67,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
|
|||
);
|
||||
|
||||
let (details, bytes) = process_map
|
||||
.process(hash.as_ref(), thumbnail_path, process_fut)
|
||||
.process(hash, thumbnail_path, process_fut)
|
||||
.await?;
|
||||
|
||||
Ok((details, bytes))
|
||||
|
@ -85,7 +85,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
|||
input_format: Option<InternalVideoFormat>,
|
||||
thumbnail_format: Option<ThumbnailFormat>,
|
||||
media: &crate::config::Media,
|
||||
hash: R::Bytes,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
let guard = MetricsGuard::guard();
|
||||
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
|
||||
|
|
|
@ -3,12 +3,11 @@ use crate::{
|
|||
either::Either,
|
||||
error::{Error, UploadError},
|
||||
formats::{InternalFormat, Validations},
|
||||
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
|
||||
repo::{Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo},
|
||||
store::Store,
|
||||
};
|
||||
use actix_web::web::Bytes;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{Instrument, Span};
|
||||
|
||||
mod hasher;
|
||||
|
@ -22,7 +21,7 @@ where
|
|||
{
|
||||
repo: R,
|
||||
delete_token: DeleteToken,
|
||||
hash: Option<Vec<u8>>,
|
||||
hash: Option<Hash>,
|
||||
alias: Option<Alias>,
|
||||
identifier: Option<S::Identifier>,
|
||||
}
|
||||
|
@ -97,8 +96,8 @@ where
|
|||
Either::right(validated_reader)
|
||||
};
|
||||
|
||||
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
|
||||
let hasher = hasher_reader.hasher();
|
||||
let hasher_reader = Hasher::new(processed_reader);
|
||||
let state = hasher_reader.state();
|
||||
|
||||
let identifier = store
|
||||
.save_async_read(hasher_reader, input_type.media_type())
|
||||
|
@ -114,14 +113,16 @@ where
|
|||
identifier: Some(identifier.clone()),
|
||||
};
|
||||
|
||||
let hash = hasher.borrow_mut().finalize_reset().to_vec();
|
||||
let (hash, size) = state.borrow_mut().finalize_reset();
|
||||
|
||||
save_upload(&mut session, repo, store, &hash, &identifier).await?;
|
||||
let hash = Hash::new(hash, size, input_type);
|
||||
|
||||
save_upload(&mut session, repo, store, hash.clone(), &identifier).await?;
|
||||
|
||||
if let Some(alias) = declared_alias {
|
||||
session.add_existing_alias(&hash, alias).await?
|
||||
session.add_existing_alias(hash, alias).await?
|
||||
} else {
|
||||
session.create_alias(&hash, input_type).await?
|
||||
session.create_alias(hash, input_type).await?
|
||||
};
|
||||
|
||||
Ok(session)
|
||||
|
@ -132,14 +133,14 @@ async fn save_upload<R, S>(
|
|||
session: &mut Session<R, S>,
|
||||
repo: &R,
|
||||
store: &S,
|
||||
hash: &[u8],
|
||||
hash: Hash,
|
||||
identifier: &S::Identifier,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
R: FullRepo,
|
||||
{
|
||||
if HashRepo::create(repo, hash.to_vec().into(), identifier)
|
||||
if HashRepo::create(repo, hash.clone(), identifier)
|
||||
.await?
|
||||
.is_err()
|
||||
{
|
||||
|
@ -150,7 +151,7 @@ where
|
|||
}
|
||||
|
||||
// Set hash after upload uniquness check so we don't clean existing files on failure
|
||||
session.hash = Some(Vec::from(hash));
|
||||
session.hash = Some(hash);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -177,10 +178,8 @@ where
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip(self, hash))]
|
||||
async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
|
||||
let hash: R::Bytes = hash.to_vec().into();
|
||||
|
||||
AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone())
|
||||
async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> {
|
||||
AliasRepo::create(&self.repo, &alias, &self.delete_token, hash)
|
||||
.await?
|
||||
.map_err(|_| UploadError::DuplicateAlias)?;
|
||||
|
||||
|
@ -190,9 +189,7 @@ where
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, hash))]
|
||||
async fn create_alias(&mut self, hash: &[u8], input_type: InternalFormat) -> Result<(), Error> {
|
||||
let hash: R::Bytes = hash.to_vec().into();
|
||||
|
||||
async fn create_alias(&mut self, hash: Hash, input_type: InternalFormat) -> Result<(), Error> {
|
||||
loop {
|
||||
let alias = Alias::generate(input_type.file_extension().to_string());
|
||||
|
||||
|
@ -232,7 +229,7 @@ where
|
|||
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
||||
actix_rt::spawn(
|
||||
async move {
|
||||
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
|
||||
let _ = crate::queue::cleanup_hash(&repo, hash).await;
|
||||
}
|
||||
.instrument(cleanup_span),
|
||||
)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use sha2::{digest::FixedOutputReset, Digest};
|
||||
use sha2::{digest::FixedOutputReset, Digest, Sha256};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
pin::Pin,
|
||||
|
@ -7,43 +7,47 @@ use std::{
|
|||
};
|
||||
use tokio::io::{AsyncRead, ReadBuf};
|
||||
|
||||
struct State<D> {
|
||||
hasher: D,
|
||||
pub(super) struct State {
|
||||
hasher: Sha256,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
pub(crate) struct Hasher<I, D> {
|
||||
pub(crate) struct Hasher<I> {
|
||||
#[pin]
|
||||
inner: I,
|
||||
|
||||
state: Rc<RefCell<State<D>>>,
|
||||
state: Rc<RefCell<State>>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, D> Hasher<I, D>
|
||||
where
|
||||
D: Digest + FixedOutputReset + Send + 'static,
|
||||
{
|
||||
pub(super) fn new(reader: I, digest: D) -> Self {
|
||||
impl<I> Hasher<I> {
|
||||
pub(super) fn new(reader: I) -> Self {
|
||||
Hasher {
|
||||
inner: reader,
|
||||
state: Rc::new(RefCell::new(State {
|
||||
hasher: digest,
|
||||
hasher: Sha256::new(),
|
||||
size: 0,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn state(&self) -> Rc<RefCell<State<D>>> {
|
||||
pub(super) fn state(&self) -> Rc<RefCell<State>> {
|
||||
Rc::clone(&self.state)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, D> AsyncRead for Hasher<I, D>
|
||||
impl State {
|
||||
pub(super) fn finalize_reset(&mut self) -> ([u8; 32], u64) {
|
||||
let arr = self.hasher.finalize_fixed_reset().into();
|
||||
|
||||
(arr, self.size)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I> AsyncRead for Hasher<I>
|
||||
where
|
||||
I: AsyncRead,
|
||||
D: Digest,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
|
@ -94,24 +98,26 @@ mod test {
|
|||
|
||||
#[test]
|
||||
fn hasher_works() {
|
||||
let hash = test_on_arbiter!(async move {
|
||||
let (hash, size) = test_on_arbiter!(async move {
|
||||
let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
|
||||
|
||||
let mut reader = Hasher::new(file1, Sha256::new());
|
||||
let mut reader = Hasher::new(file1);
|
||||
|
||||
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
|
||||
|
||||
Ok(reader.state().borrow_mut().hasher.finalize_reset().to_vec()) as std::io::Result<_>
|
||||
Ok(reader.state().borrow_mut().finalize_reset()) as std::io::Result<_>
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap();
|
||||
let mut vec = Vec::new();
|
||||
file.read_to_end(&mut vec).unwrap();
|
||||
let correct_size = vec.len() as u64;
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(vec);
|
||||
let correct_hash = hasher.finalize_reset().to_vec();
|
||||
let correct_hash: [u8; 32] = hasher.finalize_reset().into();
|
||||
|
||||
assert_eq!(hash, correct_hash);
|
||||
assert_eq!(size, correct_size);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,8 +68,8 @@ use self::{
|
|||
migrate_store::migrate_store,
|
||||
queue::queue_generate,
|
||||
repo::{
|
||||
sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
|
||||
Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo,
|
||||
sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, Hash, HashRepo,
|
||||
IdentifierRepo, Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo,
|
||||
},
|
||||
serde_str::Serde,
|
||||
store::{
|
||||
|
@ -696,7 +696,7 @@ async fn process_details<R: FullRepo, S: Store>(
|
|||
Ok(HttpResponse::Ok().json(&details))
|
||||
}
|
||||
|
||||
async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, R::Bytes)>, Error> {
|
||||
async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, Hash)>, Error> {
|
||||
let Some(not_found) = repo.get(NOT_FOUND_KEY).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
@ -1115,8 +1115,7 @@ async fn do_serve<R: FullRepo, S: Store + 'static>(
|
|||
|
||||
let Some(identifier) = repo.identifier(hash.clone()).await? else {
|
||||
tracing::warn!(
|
||||
"Original File identifier for hash {} is missing, queue cleanup task",
|
||||
hex::encode(&hash)
|
||||
"Original File identifier for hash {hash:?} is missing, queue cleanup task",
|
||||
);
|
||||
crate::queue::cleanup_hash(&repo, hash).await?;
|
||||
return Ok(HttpResponse::NotFound().finish());
|
||||
|
|
|
@ -8,7 +8,7 @@ use std::{
|
|||
use crate::{
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
|
||||
repo::{Hash, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
|
||||
store::{Identifier, Store},
|
||||
};
|
||||
|
||||
|
@ -125,7 +125,7 @@ where
|
|||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
while let Some(hash) = stream.next().await {
|
||||
let hash = hash?.as_ref().to_vec();
|
||||
let hash = hash?;
|
||||
|
||||
if joinset.len() >= 32 {
|
||||
if let Some(res) = joinset.join_next().await {
|
||||
|
@ -149,11 +149,8 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
|
||||
async fn migrate_hash<R, S1, S2>(
|
||||
state: &MigrateState<R, S1, S2>,
|
||||
hash: Vec<u8>,
|
||||
) -> Result<(), Error>
|
||||
#[tracing::instrument(skip(state))]
|
||||
async fn migrate_hash<R, S1, S2>(state: &MigrateState<R, S1, S2>, hash: Hash) -> Result<(), Error>
|
||||
where
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
|
@ -175,14 +172,13 @@ where
|
|||
|
||||
let current_index = index.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let original_identifier = match repo.identifier(hash.clone().into()).await {
|
||||
let original_identifier = match repo.identifier(hash.clone()).await {
|
||||
Ok(Some(identifier)) => identifier,
|
||||
Ok(None) => {
|
||||
tracing::warn!(
|
||||
"Original File identifier for hash {} is missing, queue cleanup task",
|
||||
hex::encode(&hash)
|
||||
"Original File identifier for hash {hash:?} is missing, queue cleanup task",
|
||||
);
|
||||
crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
|
||||
crate::queue::cleanup_hash(repo, hash.clone()).await?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
|
@ -221,24 +217,21 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
|
||||
if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
|
||||
repo.relate_motion_identifier(hash.clone(), &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
||||
tracing::warn!("Skipping motion file for hash {hash:?}");
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error generating details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
tracing::warn!("Error generating details for motion file for hash {hash:?}");
|
||||
return Err(e);
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
|
@ -253,30 +246,22 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
for (variant, identifier) in repo.variants(hash.clone().into()).await? {
|
||||
for (variant, identifier) in repo.variants(hash.clone()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.remove_variant(hash.clone().into(), variant.clone())
|
||||
.await?;
|
||||
repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
|
||||
repo.remove_variant(hash.clone(), variant.clone()).await?;
|
||||
repo.relate_variant_identifier(hash.clone(), variant, &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!(
|
||||
"Skipping variant {} for hash {}",
|
||||
variant,
|
||||
hex::encode(&hash)
|
||||
);
|
||||
tracing::warn!("Skipping variant {variant} for hash {hash:?}",);
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error generating details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
tracing::warn!("Error generating details for motion file for hash {hash:?}",);
|
||||
return Err(e);
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
|
@ -303,19 +288,16 @@ where
|
|||
{
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &original_identifier, &new_identifier).await?;
|
||||
repo.update_identifier(hash.clone().into(), &new_identifier)
|
||||
repo.update_identifier(hash.clone(), &new_identifier)
|
||||
.await?;
|
||||
repo.mark_migrated(&original_identifier, &new_identifier)
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
||||
tracing::warn!("Skipping original file for hash {hash:?}");
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error generating details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
tracing::warn!("Error generating details for motion file for hash {hash:?}",);
|
||||
return Err(e);
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
|
|
19
src/queue.rs
19
src/queue.rs
|
@ -4,7 +4,7 @@ use crate::{
|
|||
error::Error,
|
||||
formats::InputProcessableFormat,
|
||||
repo::{
|
||||
Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, JobId, QueueRepo,
|
||||
Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, JobId, QueueRepo,
|
||||
UploadId,
|
||||
},
|
||||
serde_str::Serde,
|
||||
|
@ -54,7 +54,7 @@ const PROCESS_QUEUE: &str = "process";
|
|||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
enum Cleanup {
|
||||
Hash {
|
||||
hash: Base64Bytes,
|
||||
hash: Hash,
|
||||
},
|
||||
Identifier {
|
||||
identifier: Base64Bytes,
|
||||
|
@ -64,7 +64,7 @@ enum Cleanup {
|
|||
token: Serde<DeleteToken>,
|
||||
},
|
||||
Variant {
|
||||
hash: Base64Bytes,
|
||||
hash: Hash,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
variant: Option<String>,
|
||||
},
|
||||
|
@ -101,10 +101,8 @@ pub(crate) async fn cleanup_alias<R: QueueRepo>(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
|
||||
let job = serde_json::to_vec(&Cleanup::Hash {
|
||||
hash: Base64Bytes(hash.as_ref().to_vec()),
|
||||
})?;
|
||||
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: Hash) -> Result<(), Error> {
|
||||
let job = serde_json::to_vec(&Cleanup::Hash { hash })?;
|
||||
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -122,13 +120,10 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
|
|||
|
||||
async fn cleanup_variants<R: QueueRepo>(
|
||||
repo: &R,
|
||||
hash: R::Bytes,
|
||||
hash: Hash,
|
||||
variant: Option<String>,
|
||||
) -> Result<(), Error> {
|
||||
let job = serde_json::to_vec(&Cleanup::Variant {
|
||||
hash: Base64Bytes(hash.as_ref().to_vec()),
|
||||
variant,
|
||||
})?;
|
||||
let job = serde_json::to_vec(&Cleanup::Variant { hash, variant })?;
|
||||
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
|||
error::{Error, UploadError},
|
||||
queue::{Base64Bytes, Cleanup, LocalBoxFuture},
|
||||
repo::{
|
||||
Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
|
||||
Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo,
|
||||
VariantAccessRepo,
|
||||
},
|
||||
serde_str::Serde,
|
||||
|
@ -24,9 +24,7 @@ where
|
|||
Box::pin(async move {
|
||||
match serde_json::from_slice(job) {
|
||||
Ok(job) => match job {
|
||||
Cleanup::Hash {
|
||||
hash: Base64Bytes(in_hash),
|
||||
} => hash::<R, S>(repo, in_hash).await?,
|
||||
Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
|
||||
Cleanup::Identifier {
|
||||
identifier: Base64Bytes(in_identifier),
|
||||
} => identifier(repo, store, in_identifier).await?,
|
||||
|
@ -41,10 +39,9 @@ where
|
|||
)
|
||||
.await?
|
||||
}
|
||||
Cleanup::Variant {
|
||||
hash: Base64Bytes(hash),
|
||||
variant,
|
||||
} => hash_variant::<R, S>(repo, hash, variant).await?,
|
||||
Cleanup::Variant { hash, variant } => {
|
||||
hash_variant::<R, S>(repo, hash, variant).await?
|
||||
}
|
||||
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
|
||||
Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?,
|
||||
Cleanup::OutdatedProxies => outdated_proxies::<R, S>(repo, configuration).await?,
|
||||
|
@ -89,13 +86,11 @@ where
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
|
||||
async fn hash<R, S>(repo: &R, hash: Hash) -> Result<(), Error>
|
||||
where
|
||||
R: FullRepo,
|
||||
S: Store,
|
||||
{
|
||||
let hash: R::Bytes = hash.into();
|
||||
|
||||
let aliases = repo.for_hash(hash.clone()).await?;
|
||||
|
||||
if !aliases.is_empty() {
|
||||
|
@ -221,15 +216,13 @@ where
|
|||
#[tracing::instrument(skip_all)]
|
||||
async fn hash_variant<R, S>(
|
||||
repo: &R,
|
||||
hash: Vec<u8>,
|
||||
hash: Hash,
|
||||
target_variant: Option<String>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
R: FullRepo,
|
||||
S: Store,
|
||||
{
|
||||
let hash: R::Bytes = hash.into();
|
||||
|
||||
if let Some(target_variant) = target_variant {
|
||||
if let Some(identifier) = repo
|
||||
.variant_identifier::<S::Identifier>(hash.clone(), target_variant.clone())
|
||||
|
|
|
@ -208,7 +208,7 @@ where
|
|||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
pub(crate) trait VariantAccessRepo: BaseRepo {
|
||||
type VariantAccessStream: Stream<Item = Result<(Self::Bytes, String), RepoError>>;
|
||||
type VariantAccessStream: Stream<Item = Result<(Hash, String), RepoError>>;
|
||||
|
||||
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::formats::InternalFormat;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub(crate) struct Hash {
|
||||
hash: Arc<[u8; 32]>,
|
||||
size: u64,
|
||||
|
@ -9,8 +9,21 @@ pub(crate) struct Hash {
|
|||
}
|
||||
|
||||
impl Hash {
|
||||
pub(crate) fn new(hash: Arc<[u8; 32]>, size: u64, format: InternalFormat) -> Self {
|
||||
Self { hash, format, size }
|
||||
pub(crate) fn new(hash: [u8; 32], size: u64, format: InternalFormat) -> Self {
|
||||
Self {
|
||||
hash: Arc::new(hash),
|
||||
format,
|
||||
size,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn test_value() -> Self {
|
||||
Self {
|
||||
hash: Arc::new([0u8; 32]),
|
||||
format: InternalFormat::Image(crate::formats::ImageFormat::Jxl),
|
||||
size: 1234,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn to_bytes(&self) -> Vec<u8> {
|
||||
|
@ -63,3 +76,82 @@ impl std::fmt::Debug for Hash {
|
|||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
struct SerdeHash {
|
||||
hash: String,
|
||||
size: u64,
|
||||
format: InternalFormat,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Hash {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let hash = hex::encode(&self.hash[..]);
|
||||
|
||||
SerdeHash {
|
||||
hash,
|
||||
size: self.size,
|
||||
format: self.format,
|
||||
}
|
||||
.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for Hash {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
use serde::de::Error;
|
||||
|
||||
let SerdeHash { hash, size, format } = SerdeHash::deserialize(deserializer)?;
|
||||
let hash = hex::decode(hash)
|
||||
.map_err(D::Error::custom)?
|
||||
.try_into()
|
||||
.map_err(|_| D::Error::custom("Invalid hash size"))?;
|
||||
|
||||
Ok(Hash::new(hash, size, format))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Hash;
|
||||
|
||||
#[test]
|
||||
fn round_trip() {
|
||||
let hashes = [
|
||||
Hash {
|
||||
hash: std::sync::Arc::from([0u8; 32]),
|
||||
size: 1234,
|
||||
format: crate::formats::InternalFormat::Image(crate::formats::ImageFormat::Jxl),
|
||||
},
|
||||
Hash {
|
||||
hash: std::sync::Arc::from([255u8; 32]),
|
||||
size: 1234,
|
||||
format: crate::formats::InternalFormat::Animation(
|
||||
crate::formats::AnimationFormat::Avif,
|
||||
),
|
||||
},
|
||||
Hash {
|
||||
hash: std::sync::Arc::from([99u8; 32]),
|
||||
size: 1234,
|
||||
format: crate::formats::InternalFormat::Video(
|
||||
crate::formats::InternalVideoFormat::Mp4,
|
||||
),
|
||||
},
|
||||
];
|
||||
|
||||
for hash in hashes {
|
||||
let bytes = hash.to_bytes();
|
||||
let new_hash = Hash::from_bytes(&bytes).expect("From bytes");
|
||||
let new_bytes = new_hash.to_bytes();
|
||||
|
||||
assert_eq!(hash, new_hash, "Hash mismatch");
|
||||
assert_eq!(bytes, new_bytes, "Bytes mismatch");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use crate::{
|
||||
details::MaybeHumanDate,
|
||||
repo::{
|
||||
hash::Hash, Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo,
|
||||
HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, MigrationRepo, QueueRepo,
|
||||
SettingsRepo, UploadId, UploadRepo, UploadResult,
|
||||
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
|
||||
Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId,
|
||||
MigrationRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, UploadId, UploadRepo,
|
||||
UploadResult, VariantAccessRepo,
|
||||
},
|
||||
serde_str::Serde,
|
||||
store::StoreError,
|
||||
|
@ -24,8 +25,6 @@ use std::{
|
|||
use tokio::{sync::Notify, task::JoinHandle};
|
||||
use url::Url;
|
||||
|
||||
use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo};
|
||||
|
||||
macro_rules! b {
|
||||
($self:ident.$ident:ident, $expr:expr) => {{
|
||||
let $ident = $self.$ident.clone();
|
||||
|
@ -317,7 +316,7 @@ impl futures_util::Stream for AliasAccessStream {
|
|||
}
|
||||
|
||||
impl futures_util::Stream for VariantAccessStream {
|
||||
type Item = Result<(IVec, String), RepoError>;
|
||||
type Item = Result<(Hash, String), RepoError>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
|
@ -888,9 +887,12 @@ pub(crate) enum VariantKeyError {
|
|||
|
||||
#[error("Invalid utf8 in Variant")]
|
||||
Utf8,
|
||||
|
||||
#[error("Hash format is invalid")]
|
||||
InvalidHash,
|
||||
}
|
||||
|
||||
fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyError> {
|
||||
fn parse_variant_access_key(bytes: IVec) -> Result<(Hash, String), VariantKeyError> {
|
||||
if bytes.len() < 8 {
|
||||
return Err(VariantKeyError::TooShort);
|
||||
}
|
||||
|
@ -904,6 +906,8 @@ fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyErr
|
|||
|
||||
let hash = bytes.subslice(8, hash_len);
|
||||
|
||||
let hash = Hash::from_ivec(hash).ok_or(VariantKeyError::InvalidHash)?;
|
||||
|
||||
let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len);
|
||||
|
||||
if variant_len == 0 {
|
||||
|
@ -1421,10 +1425,10 @@ impl From<actix_rt::task::JoinError> for SledError {
|
|||
mod tests {
|
||||
#[test]
|
||||
fn round_trip() {
|
||||
let hash = sled::IVec::from(b"some hash value");
|
||||
let hash = crate::repo::Hash::test_value();
|
||||
let variant = String::from("some string value");
|
||||
|
||||
let key = super::variant_access_key(&hash, &variant);
|
||||
let key = super::variant_access_key(&hash.to_bytes(), &variant);
|
||||
|
||||
let (out_hash, out_variant) =
|
||||
super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes");
|
||||
|
|
Loading…
Reference in a new issue