diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs
index 06eeeff..4bb10d8 100644
--- a/src/concurrent_processor.rs
+++ b/src/concurrent_processor.rs
@@ -1,6 +1,7 @@
 use crate::{
     details::Details,
     error::{Error, UploadError},
+    repo::Hash,
 };
 use actix_web::web;
 use dashmap::{mapref::entry::Entry, DashMap};
@@ -16,7 +17,7 @@ use tracing::Span;
 
 type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
 
-type ProcessMapKey = (Vec<u8>, PathBuf);
+type ProcessMapKey = (Hash, PathBuf);
 
 type ProcessMapInner = DashMap<ProcessMapKey, OutcomeReceiver>;
 
@@ -32,14 +33,14 @@ impl ProcessMap {
 
     pub(super) async fn process<Fut>(
        &self,
-        hash: &[u8],
+        hash: Hash,
        path: PathBuf,
        fut: Fut,
     ) -> Result<(Details, web::Bytes), Error>
     where
        Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
     {
-        let key = (hash.to_vec(), path.clone());
+        let key = (hash.clone(), path.clone());
 
        let (sender, receiver) = flume::bounded(1);
 
@@ -51,8 +52,8 @@ impl ProcessMap {
 
                let span = tracing::info_span!(
                    "Processing image",
-                    hash = &tracing::field::debug(&hex::encode(hash)),
-                    path = &tracing::field::debug(&path),
+                    hash = ?hash,
+                    path = ?path,
                    completed = &tracing::field::Empty,
                );
@@ -63,8 +64,8 @@ impl ProcessMap {
            Entry::Occupied(receiver) => {
                let span = tracing::info_span!(
                    "Waiting for processed image",
-                    hash = &tracing::field::debug(&hex::encode(hash)),
-                    path = &tracing::field::debug(&path),
+                    hash = ?hash,
+                    path = ?path,
                );
 
                let receiver = receiver.get().clone().into_recv_async();
diff --git a/src/formats.rs b/src/formats.rs
index 7826e20..3ea46e5 100644
--- a/src/formats.rs
+++ b/src/formats.rs
@@ -7,7 +7,9 @@ use std::str::FromStr;
 
 pub(crate) use animation::{AnimationFormat, AnimationOutput};
 pub(crate) use image::{ImageFormat, ImageInput, ImageOutput};
-pub(crate) use video::{InternalVideoFormat, OutputVideoFormat, VideoFormat, VideoCodec, AudioCodec};
+pub(crate) use video::{
+    AudioCodec, InternalVideoFormat, OutputVideoFormat, VideoCodec, VideoFormat,
+};
 
 #[derive(Clone, Debug)]
 pub(crate) struct Validations<'a> {
@@ -23,10 +25,12 @@ pub(crate) enum InputFile {
     Video(VideoFormat),
 }
 
-#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[derive(
+    Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
+)]
 pub(crate) enum InternalFormat {
-    Image(ImageFormat),
     Animation(AnimationFormat),
+    Image(ImageFormat),
     Video(InternalVideoFormat),
 }
 
@@ -75,6 +79,39 @@ impl InternalFormat {
        }
     }
 
+    pub(crate) const fn to_bytes(self) -> &'static [u8] {
+        match self {
+            Self::Animation(AnimationFormat::Apng) => b"a-apng",
+            Self::Animation(AnimationFormat::Avif) => b"a-avif",
+            Self::Animation(AnimationFormat::Gif) => b"a-gif",
+            Self::Animation(AnimationFormat::Webp) => b"a-webp",
+            Self::Image(ImageFormat::Avif) => b"i-avif",
+            Self::Image(ImageFormat::Jpeg) => b"i-jpeg",
+            Self::Image(ImageFormat::Jxl) => b"i-jxl",
+            Self::Image(ImageFormat::Png) => b"i-png",
+            Self::Image(ImageFormat::Webp) => b"i-webp",
+            Self::Video(InternalVideoFormat::Mp4) => b"v-mp4",
+            Self::Video(InternalVideoFormat::Webm) => b"v-webm",
+        }
+    }
+
+    pub(crate) const fn from_bytes(bytes: &[u8]) -> Option<Self> {
+        match bytes {
+            b"a-apng" => Some(Self::Animation(AnimationFormat::Apng)),
+            b"a-avif" => Some(Self::Animation(AnimationFormat::Avif)),
+            b"a-gif" => Some(Self::Animation(AnimationFormat::Gif)),
+            b"a-webp" => Some(Self::Animation(AnimationFormat::Webp)),
+            b"i-avif" => Some(Self::Image(ImageFormat::Avif)),
+            b"i-jpeg" => Some(Self::Image(ImageFormat::Jpeg)),
+            b"i-jxl" => Some(Self::Image(ImageFormat::Jxl)),
+            b"i-png" => Some(Self::Image(ImageFormat::Png)),
+            b"i-webp" => Some(Self::Image(ImageFormat::Webp)),
+            b"v-mp4" => Some(Self::Video(InternalVideoFormat::Mp4)),
+            b"v-webm" => Some(Self::Video(InternalVideoFormat::Webm)),
+            _ => None,
+        }
+    }
+
     pub(crate) fn maybe_from_media_type(mime: &mime::Mime, has_frames: bool) -> Option<Self> {
        match (mime.type_(), mime.subtype().as_str(), has_frames) {
            (mime::IMAGE, "apng", _) => Some(Self::Animation(AnimationFormat::Apng)),
diff --git a/src/generate.rs b/src/generate.rs
index 3498c1b..be12297 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -4,7 +4,7 @@ use crate::{
     error::{Error, UploadError},
     ffmpeg::ThumbnailFormat,
     formats::{InputProcessableFormat, InternalVideoFormat},
-    repo::{Alias, FullRepo},
+    repo::{Alias, FullRepo, Hash},
     store::Store,
 };
 use actix_web::web::Bytes;
@@ -51,7 +51,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
     input_format: Option<InternalVideoFormat>,
     thumbnail_format: Option<ThumbnailFormat>,
     media: &crate::config::Media,
-    hash: R::Bytes,
+    hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
     let process_fut = process(
        repo,
@@ -67,7 +67,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
     );
 
     let (details, bytes) = process_map
-        .process(hash.as_ref(), thumbnail_path, process_fut)
+        .process(hash, thumbnail_path, process_fut)
        .await?;
 
     Ok((details, bytes))
@@ -85,7 +85,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
     input_format: Option<InternalVideoFormat>,
     thumbnail_format: Option<ThumbnailFormat>,
     media: &crate::config::Media,
-    hash: R::Bytes,
+    hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
     let guard = MetricsGuard::guard();
     let permit = crate::PROCESS_SEMAPHORE.acquire().await;
diff --git a/src/ingest.rs b/src/ingest.rs
index e797791..714af5e 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -3,12 +3,11 @@ use crate::{
     either::Either,
     error::{Error, UploadError},
     formats::{InternalFormat, Validations},
-    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
+    repo::{Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo},
     store::Store,
 };
 use actix_web::web::Bytes;
 use futures_util::{Stream, StreamExt};
-use sha2::{Digest, Sha256};
 use tracing::{Instrument, Span};
 
 mod hasher;
@@ -22,7 +21,7 @@ where
 {
     repo: R,
     delete_token: DeleteToken,
-    hash: Option<Vec<u8>>,
+    hash: Option<Hash>,
     alias: Option<Alias>,
     identifier: Option<S::Identifier>,
 }
@@ -97,8 +96,8 @@ where
        Either::right(validated_reader)
     };
 
-    let hasher_reader = Hasher::new(processed_reader, Sha256::new());
-    let hasher = hasher_reader.hasher();
+    let hasher_reader = Hasher::new(processed_reader);
+    let state = hasher_reader.state();
 
     let identifier = store
        .save_async_read(hasher_reader, input_type.media_type())
@@ -114,14 +113,16 @@ where
        identifier: Some(identifier.clone()),
     };
 
-    let hash = hasher.borrow_mut().finalize_reset().to_vec();
+    let (hash, size) = state.borrow_mut().finalize_reset();
 
-    save_upload(&mut session, repo, store, &hash, &identifier).await?;
+    let hash = Hash::new(hash, size, input_type);
+
+    save_upload(&mut session, repo, store, hash.clone(), &identifier).await?;
 
     if let Some(alias) = declared_alias {
-        session.add_existing_alias(&hash, alias).await?
+        session.add_existing_alias(hash, alias).await?
     } else {
-        session.create_alias(&hash, input_type).await?
+        session.create_alias(hash, input_type).await?
     };
 
     Ok(session)
@@ -132,14 +133,14 @@ async fn save_upload<R, S>(
     session: &mut Session<R, S>,
     repo: &R,
     store: &S,
-    hash: &[u8],
+    hash: Hash,
     identifier: &S::Identifier,
 ) -> Result<(), Error>
 where
     S: Store,
     R: FullRepo,
 {
-    if HashRepo::create(repo, hash.to_vec().into(), identifier)
+    if HashRepo::create(repo, hash.clone(), identifier)
        .await?
        .is_err()
     {
@@ -150,7 +151,7 @@ where
     }
 
     // Set hash after upload uniquness check so we don't clean existing files on failure
-    session.hash = Some(Vec::from(hash));
+    session.hash = Some(hash);
 
     Ok(())
 }
@@ -177,10 +178,8 @@ where
     }
 
     #[tracing::instrument(skip(self, hash))]
-    async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
-        let hash: R::Bytes = hash.to_vec().into();
-
-        AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone())
+    async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> {
+        AliasRepo::create(&self.repo, &alias, &self.delete_token, hash)
            .await?
            .map_err(|_| UploadError::DuplicateAlias)?;
 
@@ -190,9 +189,7 @@ where
     }
 
     #[tracing::instrument(level = "debug", skip(self, hash))]
-    async fn create_alias(&mut self, hash: &[u8], input_type: InternalFormat) -> Result<(), Error> {
-        let hash: R::Bytes = hash.to_vec().into();
-
+    async fn create_alias(&mut self, hash: Hash, input_type: InternalFormat) -> Result<(), Error> {
        loop {
            let alias = Alias::generate(input_type.file_extension().to_string());
 
@@ -232,7 +229,7 @@ where
        tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
            actix_rt::spawn(
                async move {
-                    let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
+                    let _ = crate::queue::cleanup_hash(&repo, hash).await;
                }
                .instrument(cleanup_span),
            )
diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs
index 811600b..b5b809c 100644
--- a/src/ingest/hasher.rs
+++ b/src/ingest/hasher.rs
@@ -1,4 +1,4 @@
-use sha2::{digest::FixedOutputReset, Digest};
+use sha2::{digest::FixedOutputReset, Digest, Sha256};
 use std::{
     cell::RefCell,
     pin::Pin,
@@ -7,35 +7,47 @@ use std::{
 };
 use tokio::io::{AsyncRead, ReadBuf};
 
+pub(super) struct State {
+    hasher: Sha256,
+    size: u64,
+}
+
 pin_project_lite::pin_project! {
-    pub(crate) struct Hasher<I, D> {
+    pub(crate) struct Hasher<I> {
        #[pin]
        inner: I,
 
-        hasher: Rc<RefCell<D>>,
+        state: Rc<RefCell<State>>,
     }
 }
 
-impl<I, D> Hasher<I, D>
-where
-    D: Digest + FixedOutputReset + Send + 'static,
-{
-    pub(super) fn new(reader: I, digest: D) -> Self {
+impl<I> Hasher<I> {
+    pub(super) fn new(reader: I) -> Self {
        Hasher {
            inner: reader,
-            hasher: Rc::new(RefCell::new(digest)),
+            state: Rc::new(RefCell::new(State {
+                hasher: Sha256::new(),
+                size: 0,
+            })),
        }
     }
 
-    pub(super) fn hasher(&self) -> Rc<RefCell<D>> {
-        Rc::clone(&self.hasher)
+    pub(super) fn state(&self) -> Rc<RefCell<State>> {
+        Rc::clone(&self.state)
     }
 }
 
-impl<I, D> AsyncRead for Hasher<I, D>
+impl State {
+    pub(super) fn finalize_reset(&mut self) -> ([u8; 32], u64) {
+        let arr = self.hasher.finalize_fixed_reset().into();
+
+        (arr, self.size)
+    }
+}
+
+impl<I> AsyncRead for Hasher<I>
 where
     I: AsyncRead,
-    D: Digest,
 {
     fn poll_read(
        mut self: Pin<&mut Self>,
@@ -45,15 +57,15 @@ where
        let this = self.as_mut().project();
 
        let reader = this.inner;
-        let hasher = this.hasher;
+        let state = this.state;
 
        let before_len = buf.filled().len();
        let poll_res = reader.poll_read(cx, buf);
        let after_len = buf.filled().len();
        if after_len > before_len {
-            hasher
-                .borrow_mut()
-                .update(&buf.filled()[before_len..after_len]);
+            let mut guard = state.borrow_mut();
+            guard.hasher.update(&buf.filled()[before_len..after_len]);
+            guard.size += u64::try_from(after_len - before_len).expect("Size is reasonable");
        }
        poll_res
     }
@@ -86,24 +98,26 @@ mod test {
 
     #[test]
     fn hasher_works() {
-        let hash = test_on_arbiter!(async move {
+        let (hash, size) = test_on_arbiter!(async move {
            let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
 
-            let mut reader = Hasher::new(file1, Sha256::new());
+            let mut reader = Hasher::new(file1);
 
            tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
 
-            Ok(reader.hasher().borrow_mut().finalize_reset().to_vec()) as std::io::Result<_>
+            Ok(reader.state().borrow_mut().finalize_reset()) as std::io::Result<_>
        })
        .unwrap();
 
        let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap();
        let mut vec = Vec::new();
        file.read_to_end(&mut vec).unwrap();
+        let correct_size = vec.len() as u64;
 
        let mut hasher = Sha256::new();
        hasher.update(vec);
 
-        let correct_hash = hasher.finalize_reset().to_vec();
+        let correct_hash: [u8; 32] = hasher.finalize_reset().into();
 
        assert_eq!(hash, correct_hash);
+        assert_eq!(size, correct_size);
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 89818ca..d1adaab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -68,8 +68,8 @@ use self::{
     migrate_store::migrate_store,
     queue::queue_generate,
     repo::{
-        sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
-        Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo,
+        sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, Hash, HashRepo,
+        IdentifierRepo, Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo,
     },
     serde_str::Serde,
     store::{
@@ -696,7 +696,7 @@ async fn process_details<R: FullRepo, S: Store>(
     Ok(HttpResponse::Ok().json(&details))
 }
 
-async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, R::Bytes)>, Error> {
+async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, Hash)>, Error> {
     let Some(not_found) = repo.get(NOT_FOUND_KEY).await? else {
        return Ok(None);
     };
@@ -1115,8 +1114,7 @@ async fn do_serve<R: FullRepo, S: Store + 'static>(
     let Some(identifier) = repo.identifier(hash.clone()).await? else {
        tracing::warn!(
-            "Original File identifier for hash {} is missing, queue cleanup task",
-            hex::encode(&hash)
+            "Original File identifier for hash {hash:?} is missing, queue cleanup task",
        );
        crate::queue::cleanup_hash(&repo, hash).await?;
        return Ok(HttpResponse::NotFound().finish());
diff --git a/src/migrate_store.rs b/src/migrate_store.rs
index a716b12..27846f1 100644
--- a/src/migrate_store.rs
+++ b/src/migrate_store.rs
@@ -8,7 +8,7 @@ use std::{
 use crate::{
     details::Details,
     error::{Error, UploadError},
-    repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
+    repo::{Hash, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
     store::{Identifier, Store},
 };
 
@@ -125,7 +125,7 @@ where
     let mut joinset = tokio::task::JoinSet::new();
 
     while let Some(hash) = stream.next().await {
-        let hash = hash?.as_ref().to_vec();
+        let hash = hash?;
 
        if joinset.len() >= 32 {
            if let Some(res) = joinset.join_next().await {
@@ -149,11 +149,8 @@ where
     Ok(())
 }
 
-#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
-async fn migrate_hash<R, S1, S2>(
-    state: &MigrateState<R, S1, S2>,
-    hash: Vec<u8>,
-) -> Result<(), Error>
+#[tracing::instrument(skip(state))]
+async fn migrate_hash<R, S1, S2>(state: &MigrateState<R, S1, S2>, hash: Hash) -> Result<(), Error>
 where
     S1: Store,
     S2: Store,
@@ -175,14 +172,13 @@ where
 
     let current_index = index.fetch_add(1, Ordering::Relaxed);
 
-    let original_identifier = match repo.identifier(hash.clone().into()).await {
+    let original_identifier = match repo.identifier(hash.clone()).await {
        Ok(Some(identifier)) => identifier,
        Ok(None) => {
            tracing::warn!(
-                "Original File identifier for hash {} is missing, queue cleanup task",
-                hex::encode(&hash)
+                "Original File identifier for hash {hash:?} is missing, queue cleanup task",
            );
-            crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
+            crate::queue::cleanup_hash(repo, hash.clone()).await?;
            return Ok(());
        }
        Err(e) => return Err(e.into()),
@@ -221,24 +217,21 @@ where
        }
     }
 
-    if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
+    if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
        if !repo.is_migrated(&identifier).await? {
            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
                Ok(new_identifier) => {
                    migrate_details(repo, &identifier, &new_identifier).await?;
-                    repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
+                    repo.relate_motion_identifier(hash.clone(), &new_identifier)
                        .await?;
 
                    repo.mark_migrated(&identifier, &new_identifier).await?;
                }
                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
-                    tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
+                    tracing::warn!("Skipping motion file for hash {hash:?}");
                }
                Err(MigrateError::Details(e)) => {
-                    tracing::warn!(
-                        "Error generating details for motion file for hash {}",
-                        hex::encode(&hash)
-                    );
+                    tracing::warn!("Error generating details for motion file for hash {hash:?}");
                    return Err(e);
                }
                Err(MigrateError::From(e)) => {
@@ -253,30 +246,22 @@ where
        }
     }
 
-    for (variant, identifier) in repo.variants(hash.clone().into()).await? {
+    for (variant, identifier) in repo.variants(hash.clone()).await? {
        if !repo.is_migrated(&identifier).await? {
            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
                Ok(new_identifier) => {
                    migrate_details(repo, &identifier, &new_identifier).await?;
-                    repo.remove_variant(hash.clone().into(), variant.clone())
-                        .await?;
-                    repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
+                    repo.remove_variant(hash.clone(), variant.clone()).await?;
+                    repo.relate_variant_identifier(hash.clone(), variant, &new_identifier)
                        .await?;
 
                    repo.mark_migrated(&identifier, &new_identifier).await?;
                }
                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
-                    tracing::warn!(
-                        "Skipping variant {} for hash {}",
-                        variant,
-                        hex::encode(&hash)
-                    );
+                    tracing::warn!("Skipping variant {variant} for hash {hash:?}",);
                }
                Err(MigrateError::Details(e)) => {
-                    tracing::warn!(
-                        "Error generating details for motion file for hash {}",
-                        hex::encode(&hash)
-                    );
+                    tracing::warn!("Error generating details for motion file for hash {hash:?}",);
                    return Err(e);
                }
                Err(MigrateError::From(e)) => {
@@ -303,19 +288,16 @@ where
     {
        Ok(new_identifier) => {
            migrate_details(repo, &original_identifier, &new_identifier).await?;
-            repo.update_identifier(hash.clone().into(), &new_identifier)
+            repo.update_identifier(hash.clone(), &new_identifier)
                .await?;
            repo.mark_migrated(&original_identifier, &new_identifier)
                .await?;
        }
        Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
-            tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
+            tracing::warn!("Skipping original file for hash {hash:?}");
        }
        Err(MigrateError::Details(e)) => {
-            tracing::warn!(
-                "Error generating details for motion file for hash {}",
-                hex::encode(&hash)
-            );
+            tracing::warn!("Error generating details for motion file for hash {hash:?}",);
            return Err(e);
        }
        Err(MigrateError::From(e)) => {
diff --git a/src/queue.rs b/src/queue.rs
index ba932ce..8ec3c72 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -4,7 +4,7 @@ use crate::{
     error::Error,
     formats::InputProcessableFormat,
     repo::{
-        Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, JobId, QueueRepo,
+        Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, JobId, QueueRepo,
        UploadId,
     },
     serde_str::Serde,
@@ -54,7 +54,7 @@ const PROCESS_QUEUE: &str = "process";
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Cleanup {
     Hash {
-        hash: Base64Bytes,
+        hash: Hash,
     },
     Identifier {
        identifier: Base64Bytes,
@@ -64,7 +64,7 @@ enum Cleanup {
        token: Serde<DeleteToken>,
     },
     Variant {
-        hash: Base64Bytes,
+        hash: Hash,
        #[serde(skip_serializing_if = "Option::is_none")]
        variant: Option<String>,
     },
@@ -101,10 +101,8 @@ pub(crate) async fn cleanup_alias<R: QueueRepo>(
     Ok(())
 }
 
-pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
-    let job = serde_json::to_vec(&Cleanup::Hash {
-        hash: Base64Bytes(hash.as_ref().to_vec()),
-    })?;
+pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: Hash) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Cleanup::Hash { hash })?;
     repo.push(CLEANUP_QUEUE, job.into()).await?;
     Ok(())
 }
@@ -122,13 +120,10 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
 
 async fn cleanup_variants<R: QueueRepo>(
     repo: &R,
-    hash: R::Bytes,
+    hash: Hash,
     variant: Option<String>,
 ) -> Result<(), Error> {
-    let job = serde_json::to_vec(&Cleanup::Variant {
-        hash: Base64Bytes(hash.as_ref().to_vec()),
-        variant,
-    })?;
+    let job = serde_json::to_vec(&Cleanup::Variant { hash, variant })?;
     repo.push(CLEANUP_QUEUE, job.into()).await?;
     Ok(())
 }
@@ -218,7 +213,6 @@ async fn process_jobs<R, S, F>(
     callback: F,
 ) where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
     S: Store,
     for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
        + Copy,
@@ -279,35 +273,42 @@ async fn job_loop<R, S, F>(
 ) -> Result<(), Error>
 where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
     S: Store,
     for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
        + Copy,
 {
     loop {
-        let (job_id, bytes) = repo.pop(queue).await?;
+        let fut = async {
+            let (job_id, bytes) = repo.pop(queue, worker_id).await?;
 
-        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
+            let span = tracing::info_span!("Running Job");
 
-        let guard = MetricsGuard::guard(worker_id, queue);
+            let guard = MetricsGuard::guard(worker_id, queue);
 
-        let res = span
-            .in_scope(|| {
-                heartbeat(
-                    repo,
-                    queue,
-                    job_id,
-                    (callback)(repo, store, config, bytes.as_ref()),
-                )
-            })
-            .instrument(span)
-            .await;
+            let res = span
+                .in_scope(|| {
+                    heartbeat(
+                        repo,
+                        queue,
+                        worker_id,
+                        job_id,
+                        (callback)(repo, store, config, bytes.as_ref()),
+                    )
+                })
+                .instrument(span)
+                .await;
 
-        repo.complete_job(queue, job_id).await?;
+            repo.complete_job(queue, worker_id, job_id).await?;
 
-        res?;
+            res?;
 
-        guard.disarm();
+            guard.disarm();
+
+            Ok(()) as Result<(), Error>
+        };
+
+        fut.instrument(tracing::info_span!("tick", worker_id = %worker_id))
+            .await?;
     }
 }
@@ -320,7 +321,6 @@ async fn process_image_jobs<R, S, F>(
     callback: F,
 ) where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
     S: Store,
     for<'a> F: Fn(
            &'a R,
@@ -358,7 +358,6 @@ async fn image_job_loop<R, S, F>(
 ) -> Result<(), Error>
 where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
     S: Store,
     for<'a> F: Fn(
            &'a R,
@@ -370,38 +369,52 @@ where
        + Copy,
 {
     loop {
-        let (job_id, bytes) = repo.pop(queue).await?;
+        let fut = async {
+            let (job_id, bytes) = repo.pop(queue, worker_id).await?;
 
-        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
+            let span = tracing::info_span!("Running Job");
 
-        let guard = MetricsGuard::guard(worker_id, queue);
+            let guard = MetricsGuard::guard(worker_id, queue);
 
-        let res = span
-            .in_scope(|| {
-                heartbeat(
-                    repo,
-                    queue,
-                    job_id,
-                    (callback)(repo, store, process_map, config, bytes.as_ref()),
-                )
-            })
-            .instrument(span)
-            .await;
+            let res = span
+                .in_scope(|| {
+                    heartbeat(
+                        repo,
+                        queue,
+                        worker_id,
+                        job_id,
+                        (callback)(repo, store, process_map, config, bytes.as_ref()),
+                    )
+                })
+                .instrument(span)
+                .await;
 
-        repo.complete_job(queue, job_id).await?;
+            repo.complete_job(queue, worker_id, job_id).await?;
 
-        res?;
+            res?;
 
-        guard.disarm();
+            guard.disarm();
+            Ok(()) as Result<(), Error>
+        };
+
+        fut.instrument(tracing::info_span!("tick", worker_id = %worker_id))
+            .await?;
     }
 }
 
-async fn heartbeat<R, Fut>(repo: &R, queue: &'static str, job_id: JobId, fut: Fut) -> Fut::Output
+async fn heartbeat<R, Fut>(
+    repo: &R,
+    queue: &'static str,
+    worker_id: uuid::Uuid,
+    job_id: JobId,
+    fut: Fut,
+) -> Fut::Output
 where
     R: QueueRepo,
     Fut: std::future::Future,
 {
-    let mut fut = std::pin::pin!(fut);
+    let mut fut =
+        std::pin::pin!(fut.instrument(tracing::info_span!("job-future", job_id = ?job_id)));
 
     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
 
@@ -414,10 +427,12 @@ where
            }
            _ = interval.tick() => {
                if hb.is_none() {
-                    hb = Some(repo.heartbeat(queue, job_id));
+                    hb = Some(repo.heartbeat(queue, worker_id, job_id));
                }
            }
            opt = poll_opt(hb.as_mut()), if hb.is_some() => {
+                hb.take();
+
                if let Some(Err(e)) = opt {
                    tracing::warn!("Failed heartbeat\n{}", format!("{e:?}"));
                }
@@ -432,6 +447,6 @@ where
 {
     match opt {
        None => None,
-        Some(fut) => std::future::poll_fn(|cx| Pin::new(&mut *fut).poll(cx).map(Some)).await,
+        Some(fut) => Some(fut.await),
     }
 }
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index 3bb051c..bb5f9e7 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -3,7 +3,7 @@ use crate::{
     error::{Error, UploadError},
     queue::{Base64Bytes, Cleanup, LocalBoxFuture},
     repo::{
-        Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
+        Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo,
        VariantAccessRepo,
     },
     serde_str::Serde,
@@ -24,9 +24,7 @@ where
     Box::pin(async move {
        match serde_json::from_slice(job) {
            Ok(job) => match job {
-                Cleanup::Hash {
-                    hash: Base64Bytes(in_hash),
-                } => hash::<R, S>(repo, in_hash).await?,
+                Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
                Cleanup::Identifier {
                    identifier: Base64Bytes(in_identifier),
                } => identifier(repo, store, in_identifier).await?,
@@ -41,10 +39,9 @@ where
                    )
                    .await?
                }
-                Cleanup::Variant {
-                    hash: Base64Bytes(hash),
-                    variant,
-                } => hash_variant::<R, S>(repo, hash, variant).await?,
+                Cleanup::Variant { hash, variant } => {
+                    hash_variant::<R, S>(repo, hash, variant).await?
+                }
                Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
                Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?,
                Cleanup::OutdatedProxies => outdated_proxies::<R, S>(repo, configuration).await?,
@@ -76,26 +73,19 @@ where
            errors.push(e);
        }
 
-    if !errors.is_empty() {
-        let span = tracing::error_span!("Error deleting files");
-        span.in_scope(|| {
-            for error in errors {
-                tracing::error!("{}", format!("{error}"));
-            }
-        });
+    for error in errors {
+        tracing::error!("{}", format!("{error:?}"));
     }
 
     Ok(())
 }
 
 #[tracing::instrument(skip_all)]
-async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
+async fn hash<R, S>(repo: &R, hash: Hash) -> Result<(), Error>
 where
     R: FullRepo,
     S: Store,
 {
-    let hash: R::Bytes = hash.into();
-
     let aliases = repo.for_hash(hash.clone()).await?;
 
     if !aliases.is_empty() {
@@ -221,15 +211,13 @@ where
 #[tracing::instrument(skip_all)]
 async fn hash_variant<R, S>(
     repo: &R,
-    hash: Vec<u8>,
+    hash: Hash,
     target_variant: Option<String>,
 ) -> Result<(), Error>
 where
     R: FullRepo,
     S: Store,
 {
-    let hash: R::Bytes = hash.into();
-
     if let Some(target_variant) = target_variant {
        if let Some(identifier) = repo
            .variant_identifier::<S::Identifier>(hash.clone(), target_variant.clone())
diff --git a/src/repo.rs b/src/repo.rs
index 69d446c..6b8f9c1 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -4,12 +4,15 @@ use crate::{
     store::{Identifier, StoreError},
 };
 use futures_util::Stream;
-use std::fmt::Debug;
+use std::{fmt::Debug, sync::Arc};
 use url::Url;
 use uuid::Uuid;
 
+mod hash;
 pub(crate) mod sled;
 
+pub(crate) use hash::Hash;
+
 #[derive(Clone, Debug)]
 pub(crate) enum Repo {
     Sled(self::sled::SledRepo),
@@ -128,16 +131,9 @@ where
     }
 }
 
-pub(crate) trait BaseRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
-}
+pub(crate) trait BaseRepo {}
 
-impl<T> BaseRepo for actix_web::web::Data<T>
-where
-    T: BaseRepo,
-{
-    type Bytes = T::Bytes;
-}
+impl<T> BaseRepo for actix_web::web::Data<T> where T: BaseRepo {}
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait ProxyRepo: BaseRepo {
@@ -205,19 +201,18 @@ where
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait VariantAccessRepo: BaseRepo {
-    type VariantAccessStream: Stream<Item = Result<(Self::Bytes, String), RepoError>>;
+    type VariantAccessStream: Stream<Item = Result<(Hash, String), RepoError>>;
 
-    async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>;
+    async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
 
-    async fn contains_variant(&self, hash: Self::Bytes, variant: String)
-        -> Result<bool, RepoError>;
+    async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError>;
 
     async fn older_variants(
        &self,
        timestamp: time::OffsetDateTime,
     ) -> Result<Self::VariantAccessStream, RepoError>;
 
-    async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>;
+    async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
 }
 
 #[async_trait::async_trait(?Send)]
@@ -227,15 +222,11 @@ where
 {
     type VariantAccessStream = T::VariantAccessStream;
 
-    async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
+    async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
        T::accessed(self, hash, variant).await
     }
 
-    async fn contains_variant(
-        &self,
-        hash: Self::Bytes,
-        variant: String,
-    ) -> Result<bool, RepoError> {
+    async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError> {
        T::contains_variant(self, hash, variant).await
     }
 
@@ -246,7 +237,7 @@ where
        T::older_variants(self, timestamp).await
     }
 
-    async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
+    async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
        T::remove_access(self, hash, variant).await
     }
 }
@@ -303,13 +294,27 @@ impl JobId {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait QueueRepo: BaseRepo {
-    async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<JobId, RepoError>;
+    async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError>;
 
-    async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError>;
+    async fn pop(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+    ) -> Result<(JobId, Arc<[u8]>), RepoError>;
 
-    async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>;
+    async fn heartbeat(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError>;
 
-    async fn complete_job(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>;
+    async fn complete_job(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError>;
 }
 
 #[async_trait::async_trait(?Send)]
@@ -317,27 +322,41 @@ impl<T> QueueRepo for actix_web::web::Data<T>
 where
     T: QueueRepo,
 {
-    async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<JobId, RepoError> {
+    async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError> {
        T::push(self, queue, job).await
     }
 
-    async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
-        T::pop(self, queue).await
+    async fn pop(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+    ) -> Result<(JobId, Arc<[u8]>), RepoError> {
+        T::pop(self, queue, worker_id).await
     }
 
-    async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError> {
-        T::heartbeat(self, queue, job_id).await
+    async fn heartbeat(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError> {
+        T::heartbeat(self, queue, worker_id, job_id).await
     }
 
-    async fn complete_job(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError> {
-        T::complete_job(self, queue, job_id).await
+    async fn complete_job(
+        &self,
+        queue: &'static str,
+        worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError> {
+        T::complete_job(self, queue, worker_id, job_id).await
     }
 }
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait SettingsRepo: BaseRepo {
-    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>;
-    async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError>;
+    async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
+    async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
 
     async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
 }
@@ -346,11 +365,11 @@ impl<T> SettingsRepo for actix_web::web::Data<T>
 where
     T: SettingsRepo,
 {
-    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
+    async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
        T::set(self, key, value).await
     }
 
-    async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
+    async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
        T::get(self, key).await
     }
 
@@ -436,7 +455,7 @@ where
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait HashRepo: BaseRepo {
-    type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;
+    type Stream: Stream<Item = Result<Hash, RepoError>>;
 
     async fn size(&self) -> Result<u64, RepoError>;
 
@@ -444,49 +463,49 @@ pub(crate) trait HashRepo: BaseRepo {
 
     async fn create<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<Result<(), HashAlreadyExists>, StoreError>;
 
     async fn update_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError>;
 
     async fn identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError>;
 
     async fn relate_variant_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
        identifier: &I,
     ) -> Result<(), StoreError>;
 
     async fn variant_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
     ) -> Result<Option<I>, StoreError>;
 
     async fn variants<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Vec<(String, I)>, StoreError>;
 
-    async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>;
+    async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
 
     async fn relate_motion_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError>;
 
     async fn motion_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError>;
 
-    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>;
+    async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>;
 }
 
 #[async_trait::async_trait(?Send)]
@@ -506,7 +525,7 @@ where
 
     async fn create<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<Result<(), HashAlreadyExists>, StoreError> {
        T::create(self, hash, identifier).await
     }
 
@@ -514,7 +533,7 @@ where
     async fn update_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError> {
        T::update_identifier(self, hash, identifier).await
     }
 
@@ -522,14 +541,14 @@ where
     async fn identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError> {
        T::identifier(self, hash).await
     }
 
     async fn relate_variant_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
        identifier: &I,
     ) -> Result<(), StoreError> {
@@ -538,7 +557,7 @@ where
     async fn variant_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
     ) -> Result<Option<I>, StoreError> {
        T::variant_identifier(self, hash, variant).await
     }
 
@@ -546,18 +565,18 @@ where
     async fn variants<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Vec<(String, I)>, StoreError> {
        T::variants(self, hash).await
     }
 
-    async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
+    async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
        T::remove_variant(self, hash, variant).await
     }
 
     async fn relate_motion_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError> {
        T::relate_motion_identifier(self, hash, identifier).await
     }
 
@@ -565,12 +584,12 @@ where
     async fn motion_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError> {
        T::motion_identifier(self, hash).await
     }
 
-    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
+    async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> {
        T::cleanup(self, hash).await
     }
 }
@@ -581,14 +600,14 @@ pub(crate) trait AliasRepo: BaseRepo {
        &self,
        alias: &Alias,
        delete_token: &DeleteToken,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Result<(), AliasAlreadyExists>, RepoError>;
 
     async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError>;
 
-    async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError>;
+    async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError>;
 
-    async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>;
+    async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError>;
 
     async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>;
 }
@@ -602,7 +621,7 @@ where
        &self,
        alias: &Alias,
        delete_token: &DeleteToken,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
        T::create(self, alias, delete_token, hash).await
     }
@@ -611,11 +630,11 @@ where
        T::delete_token(self, alias).await
     }
 
-    async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError> {
+    async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError> {
        T::hash(self, alias).await
     }
 
-    async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
+    async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> {
        T::for_hash(self, hash).await
     }
 
diff --git a/src/repo/hash.rs b/src/repo/hash.rs
new file mode 100644
index 0000000..12c7550
--- /dev/null
+++ b/src/repo/hash.rs
@@ -0,0 +1,157 @@
+use crate::formats::InternalFormat;
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(crate) struct Hash {
+    hash: Arc<[u8; 32]>,
+    size: u64,
+    format: InternalFormat,
+}
+
+impl Hash {
+    pub(crate) fn new(hash: [u8; 32], size: u64, format: InternalFormat) -> Self {
+        Self {
+            hash: Arc::new(hash),
+            format,
+            size,
+        }
+    }
+
+    #[cfg(test)]
+    pub(crate) fn test_value() -> Self {
+        Self {
+            hash: Arc::new([0u8; 32]),
+            format: InternalFormat::Image(crate::formats::ImageFormat::Jxl),
+            size: 1234,
+        }
+    }
+
+    pub(super) fn to_bytes(&self) -> Vec<u8> {
+        let format = self.format.to_bytes();
+
+        let mut vec = Vec::with_capacity(32 + 8 + format.len());
+
+        vec.extend_from_slice(&self.hash[..]);
+        vec.extend(self.size.to_be_bytes());
+        vec.extend(format);
+
+        vec
+    }
+
+    pub(super) fn to_ivec(&self) -> sled::IVec {
+        sled::IVec::from(self.to_bytes())
+    }
+
+    pub(super) fn from_ivec(ivec: sled::IVec) -> Option<Self> {
+        Self::from_bytes(&ivec)
+    }
+
+    pub(super) fn from_bytes(bytes: &[u8]) -> Option<Self> {
+        if bytes.len() < 32 + 8 + 5 {
+            return None;
+        }
+
+        let hash = &bytes[..32];
+        let size = &bytes[32..40];
+        let format = &bytes[40..];
+
+        let hash: [u8; 32] = hash.try_into().expect("Correct length");
+        let size: [u8; 8] = size.try_into().expect("Correct length");
+        let format = InternalFormat::from_bytes(format)?;
+
+        Some(Self {
+            hash: Arc::new(hash),
+            size: u64::from_be_bytes(size),
+            format,
+        })
+    }
+}
+
+impl std::fmt::Debug for Hash {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Hash")
+            .field("hash", &hex::encode(&*self.hash))
+            .field("format", &self.format)
+            .field("size", &self.size)
+            .finish()
+    }
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+struct SerdeHash {
+    hash: String,
+    size: u64,
+    format: InternalFormat,
+}
+
+impl serde::Serialize for Hash {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let hash = hex::encode(&self.hash[..]);
+
+        SerdeHash {
+            hash,
+            size: self.size,
+            format: self.format,
+        }
+        .serialize(serializer)
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for Hash {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        use serde::de::Error;
+
+        let SerdeHash { hash, size, format } = SerdeHash::deserialize(deserializer)?;
+        let hash = hex::decode(hash)
+            .map_err(D::Error::custom)?
+            .try_into()
+            .map_err(|_| D::Error::custom("Invalid hash size"))?;
+
+        Ok(Hash::new(hash, size, format))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Hash;
+
+    #[test]
+    fn round_trip() {
+        let hashes = [
+            Hash {
+                hash: std::sync::Arc::from([0u8; 32]),
+                size: 1234,
+                format: crate::formats::InternalFormat::Image(crate::formats::ImageFormat::Jxl),
+            },
+            Hash {
+                hash: std::sync::Arc::from([255u8; 32]),
+                size: 1234,
+                format: crate::formats::InternalFormat::Animation(
+                    crate::formats::AnimationFormat::Avif,
+                ),
+            },
+            Hash {
+                hash: std::sync::Arc::from([99u8; 32]),
+                size: 1234,
+                format: crate::formats::InternalFormat::Video(
+                    crate::formats::InternalVideoFormat::Mp4,
+                ),
+            },
+        ];
+
+        for hash in hashes {
+            let bytes = hash.to_bytes();
+            let new_hash = Hash::from_bytes(&bytes).expect("From bytes");
+            let new_bytes = new_hash.to_bytes();
+
+            assert_eq!(hash, new_hash, "Hash mismatch");
+            assert_eq!(bytes, new_bytes, "Bytes mismatch");
+        }
+    }
+}
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 986a75b..6033d38 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,9 +1,10 @@
 use crate::{
     details::MaybeHumanDate,
     repo::{
-        Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo,
-        HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, MigrationRepo, QueueRepo,
-        SettingsRepo, UploadId, UploadRepo, UploadResult,
+        hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
+        Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId,
+        MigrationRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, UploadId, UploadRepo,
+        UploadResult, VariantAccessRepo,
     },
     serde_str::Serde,
     store::StoreError,
@@ -23,8 +24,7 @@ use std::{
 };
 use tokio::{sync::Notify, task::JoinHandle};
 use url::Url;
-
-use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo};
+use uuid::Uuid;
 
 macro_rules! b {
b { ($self:ident.$ident:ident, $expr:expr) => {{ @@ -182,9 +182,7 @@ impl SledRepo { } } -impl BaseRepo for SledRepo { - type Bytes = IVec; -} +impl BaseRepo for SledRepo {} #[async_trait::async_trait(?Send)] impl FullRepo for SledRepo { @@ -317,7 +315,7 @@ impl futures_util::Stream for AliasAccessStream { } impl futures_util::Stream for VariantAccessStream { - type Item = Result<(IVec, String), RepoError>; + type Item = Result<(Hash, String), RepoError>; fn poll_next( mut self: Pin<&mut Self>, @@ -405,8 +403,9 @@ impl AliasAccessRepo for SledRepo { impl VariantAccessRepo for SledRepo { type VariantAccessStream = VariantAccessStream; - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let now_string = time::OffsetDateTime::now_utc() @@ -428,12 +427,9 @@ impl VariantAccessRepo for SledRepo { .map_err(RepoError::from) } - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn contains_variant( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result { + #[tracing::instrument(level = "debug", skip(self))] + async fn contains_variant(&self, hash: Hash, variant: String) -> Result { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let timestamp = b!(self.variant_access, variant_access.get(key)); @@ -465,8 +461,9 @@ impl VariantAccessRepo for SledRepo { }) } - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let variant_access = self.variant_access.clone(); @@ -629,7 +626,7 @@ impl UploadRepo for SledRepo { enum JobState { Pending, - Running([u8; 8]), + Running([u8; 24]), } impl JobState { @@ -637,12 +634,26 @@ impl JobState { Self::Pending } - fn running() -> Self { - Self::Running( - time::OffsetDateTime::now_utc() - .unix_timestamp() - .to_be_bytes(), - ) + fn running(worker_id: Uuid) -> Self { + let first_eight = time::OffsetDateTime::now_utc() + .unix_timestamp() + .to_be_bytes(); + + let next_sixteen = worker_id.into_bytes(); + + let mut bytes = [0u8; 24]; + + bytes[0..8] + .iter_mut() + .zip(&first_eight) + .for_each(|(dest, src)| *dest = *src); + + bytes[8..24] + .iter_mut() + .zip(&next_sixteen) + .for_each(|(dest, src)| *dest = *src); + + Self::Running(bytes) } fn as_bytes(&self) -> &[u8] { @@ -663,7 +674,7 @@ fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> { #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] - async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result { + async fn push(&self, queue_name: &'static str, job: Arc<[u8]>) -> Result { let metrics_guard = PushMetricsGuard::guard(queue_name); let id = JobId::gen(); @@ -676,7 +687,7 @@ impl QueueRepo for SledRepo { (&queue, &job_state).transaction(|(queue, job_state)| { let state = 
-            queue.insert(&key[..], &job)?;
+            queue.insert(&key[..], &job[..])?;
            job_state.insert(&key[..], state.as_bytes())?;
 
            Ok(())
@@ -707,8 +718,12 @@ impl QueueRepo for SledRepo {
        Ok(id)
     }
 
-    #[tracing::instrument(skip(self))]
-    async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
+    #[tracing::instrument(skip(self, worker_id), fields(job_id))]
+    async fn pop(
+        &self,
+        queue_name: &'static str,
+        worker_id: Uuid,
+    ) -> Result<(JobId, Arc<[u8]>), RepoError> {
        let metrics_guard = PopMetricsGuard::guard(queue_name);
 
        let now = time::OffsetDateTime::now_utc();
@@ -717,13 +732,15 @@ impl QueueRepo for SledRepo {
        let queue = self.queue.clone();
        let job_state = self.job_state.clone();
 
+        let span = tracing::Span::current();
        let opt = actix_rt::task::spawn_blocking(move || {
+            let _guard = span.enter();
            // Job IDs are generated with Uuid version 7 - defining their first bits as a
            // timestamp. Scanning a prefix should give us jobs in the order they were queued.
            for res in job_state.scan_prefix(queue_name) {
                let (key, value) = res?;
 
-                if value.len() == 8 {
+                if value.len() > 8 {
                    let unix_timestamp =
                        i64::from_be_bytes(value[0..8].try_into().expect("Verified length"));
 
@@ -738,13 +755,14 @@ impl QueueRepo for SledRepo {
                    }
                }
 
-                let state = JobState::running();
+                let state = JobState::running(worker_id);
 
-                match job_state.compare_and_swap(&key, Some(value), Some(state.as_bytes())) {
-                    Ok(_) => {
+                match job_state.compare_and_swap(&key, Some(value), Some(state.as_bytes()))? {
+                    Ok(()) => {
                        // acquired job
                    }
                    Err(_) => {
+                        tracing::debug!("Contested");
                        // someone else acquired job
                        continue;
                    }
@@ -756,9 +774,13 @@ impl QueueRepo for SledRepo {
 
                let job_id = JobId::from_bytes(id_bytes);
 
-                let opt = queue.get(&key)?.map(|job_bytes| (job_id, job_bytes));
+                tracing::Span::current().record("job_id", &format!("{job_id:?}"));
 
-                return Ok(opt) as Result<Option<(JobId, IVec)>, SledError>;
+                let opt = queue
+                    .get(&key)?
+                    .map(|job_bytes| (job_id, Arc::from(job_bytes.to_vec())));
+
+                return Ok(opt) as Result<Option<(JobId, Arc<[u8]>)>, SledError>;
            }
 
            Ok(None)
@@ -792,18 +814,23 @@ impl QueueRepo for SledRepo {
        }
     }
 
-    #[tracing::instrument(skip(self))]
-    async fn heartbeat(&self, queue_name: &'static str, job_id: JobId) -> Result<(), RepoError> {
+    #[tracing::instrument(skip(self, worker_id))]
+    async fn heartbeat(
+        &self,
+        queue_name: &'static str,
+        worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError> {
        let key = job_key(queue_name, job_id);
 
        let job_state = self.job_state.clone();
 
        actix_rt::task::spawn_blocking(move || {
            if let Some(state) = job_state.get(&key)? {
-                let new_state = JobState::running();
+                let new_state = JobState::running(worker_id);
 
                match job_state.compare_and_swap(&key, Some(state), Some(new_state.as_bytes()))? {
-                    Ok(_) => Ok(()),
+                    Ok(()) => Ok(()),
                    Err(_) => Err(SledError::Conflict),
                }
            } else {
@@ -816,8 +843,13 @@ impl QueueRepo for SledRepo {
        Ok(())
     }
 
-    #[tracing::instrument(skip(self))]
-    async fn complete_job(&self, queue_name: &'static str, job_id: JobId) -> Result<(), RepoError> {
+    #[tracing::instrument(skip(self, _worker_id))]
+    async fn complete_job(
+        &self,
+        queue_name: &'static str,
+        _worker_id: Uuid,
+        job_id: JobId,
+    ) -> Result<(), RepoError> {
        let key = job_key(queue_name, job_id);
 
        let queue = self.queue.clone();
@@ -844,17 +876,17 @@ impl QueueRepo for SledRepo {
 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for SledRepo {
     #[tracing::instrument(level = "trace", skip(value))]
-    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
-        b!(self.settings, settings.insert(key, value));
+    async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
+        b!(self.settings, settings.insert(key, &value[..]));
 
        Ok(())
     }
 
     #[tracing::instrument(level = "trace", skip(self))]
-    async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
+    async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
        let opt = b!(self.settings, settings.get(key));
 
-        Ok(opt)
+        Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
     }
 
     #[tracing::instrument(level = "trace", skip(self))]
@@ -889,9 +921,12 @@ pub(crate) enum VariantKeyError {
 
     #[error("Invalid utf8 in Variant")]
     Utf8,
+
+    #[error("Hash format is invalid")]
+    InvalidHash,
 }
 
-fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyError> {
+fn parse_variant_access_key(bytes: IVec) -> Result<(Hash, String), VariantKeyError> {
     if bytes.len() < 8 {
        return Err(VariantKeyError::TooShort);
     }
@@ -905,6 +940,8 @@ fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyErr
 
     let hash = bytes.subslice(8, hash_len);
 
+    let hash = Hash::from_ivec(hash).ok_or(VariantKeyError::InvalidHash)?;
+
     let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len);
 
     if variant_len == 0 {
@@ -1012,7 +1049,7 @@ impl MigrationRepo for SledRepo {
     }
 }
 
-type StreamItem = Result<IVec, RepoError>;
+type StreamItem = Result<Hash, RepoError>;
 type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
 
 #[async_trait::async_trait(?Send)]
@@ -1028,19 +1065,20 @@ impl HashRepo for SledRepo {
     }
 
     async fn hashes(&self) -> Self::Stream {
-        let iter = self
-            .hashes
-            .iter()
-            .keys()
-            .map(|res| res.map_err(SledError::from).map_err(RepoError::from));
+        let iter = self.hashes.iter().keys().filter_map(|res| {
+            res.map_err(SledError::from)
+                .map_err(RepoError::from)
+                .map(Hash::from_ivec)
+                .transpose()
+        });
 
        Box::pin(from_iterator(iter, 8))
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn create<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<Result<(), HashAlreadyExists>, StoreError> {
        let identifier: sled::IVec = identifier.to_bytes()?.into();
 
        let hashes = self.hashes.clone();
        let hash_identifiers = self.hash_identifiers.clone();
 
+        let hash = hash.to_ivec();
+
        let res = actix_web::web::block(move || {
            (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
-                if hashes.get(&hash)?.is_some() {
+                if hashes.get(hash.clone())?.is_some() {
                    return Ok(Err(HashAlreadyExists));
                }
 
-                hashes.insert(&hash, &hash)?;
-                hash_identifiers.insert(&hash, &identifier)?;
+                hashes.insert(hash.clone(), hash.clone())?;
+                hash_identifiers.insert(hash.clone(), &identifier)?;
 
                Ok(Ok(()))
            })
@@ -1073,11 +1113,13 @@ impl HashRepo for SledRepo {
 
     async fn update_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError> {
        let identifier = identifier.to_bytes()?;
 
+        let hash = hash.to_ivec();
+
        b!(
            self.hash_identifiers,
            hash_identifiers.insert(hash, identifier)
@@ -1086,11 +1128,13 @@ impl HashRepo for SledRepo {
        Ok(())
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError> {
+        let hash = hash.to_ivec();
+
        let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else {
            return Ok(None);
        };
@@ -1098,13 +1142,15 @@ impl HashRepo for SledRepo {
        Ok(Some(I::from_bytes(ivec.to_vec())?))
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
+    #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
     async fn relate_variant_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
        identifier: &I,
     ) -> Result<(), StoreError> {
+        let hash = hash.to_bytes();
+
        let key = variant_key(&hash, &variant);
        let value = identifier.to_bytes()?;
 
@@ -1116,12 +1162,14 @@ impl HashRepo for SledRepo {
        Ok(())
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn variant_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        variant: String,
     ) -> Result<Option<I>, StoreError> {
+        let hash = hash.to_bytes();
+
        let key = variant_key(&hash, &variant);
 
        let opt = b!(
@@ -1132,15 +1180,17 @@ impl HashRepo for SledRepo {
        opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
     }
 
-    #[tracing::instrument(level = "debug", skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(level = "debug", skip(self))]
     async fn variants<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Vec<(String, I)>, StoreError> {
+        let hash = hash.to_ivec();
+
        let vec = b!(
            self.hash_variant_identifiers,
            Ok(hash_variant_identifiers
-                .scan_prefix(&hash)
+                .scan_prefix(hash.clone())
                .filter_map(|res| res.ok())
                .filter_map(|(key, ivec)| {
                    let identifier = I::from_bytes(ivec.to_vec()).ok();
@@ -1164,8 +1214,10 @@ impl HashRepo for SledRepo {
        Ok(vec)
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
+    #[tracing::instrument(level = "trace", skip(self))]
+    async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
+        let hash = hash.to_bytes();
+
        let key = variant_key(&hash, &variant);
 
        b!(
@@ -1176,12 +1228,13 @@ impl HashRepo for SledRepo {
        Ok(())
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
+    #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
     async fn relate_motion_identifier<I: Identifier>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
        identifier: &I,
     ) -> Result<(), StoreError> {
+        let hash = hash.to_ivec();
        let bytes = identifier.to_bytes()?;
 
        b!(
@@ -1192,11 +1245,13 @@ impl HashRepo for SledRepo {
        Ok(())
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn motion_identifier<I: Identifier + 'static>(
        &self,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Option<I>, StoreError> {
+        let hash = hash.to_ivec();
+
        let opt = b!(
            self.hash_motion_identifiers,
            hash_motion_identifiers.get(hash)
@@ -1205,8 +1260,10 @@ impl HashRepo for SledRepo {
        opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
     }
 
-    #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
+    #[tracing::instrument(skip(self))]
+    async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> {
+        let hash = hash.to_ivec();
+
        let hashes = self.hashes.clone();
        let hash_identifiers = self.hash_identifiers.clone();
        let hash_motion_identifiers = self.hash_motion_identifiers.clone();
@@ -1274,8 +1331,9 @@ impl AliasRepo for SledRepo {
        &self,
        alias: &Alias,
        delete_token: &DeleteToken,
-        hash: Self::Bytes,
+        hash: Hash,
     ) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
+        let hash = hash.to_ivec();
        let alias: sled::IVec = alias.to_bytes().into();
        let delete_token: sled::IVec = delete_token.to_bytes().into();
 
@@ -1328,16 +1386,18 @@ impl AliasRepo for SledRepo {
     }
 
     #[tracing::instrument(level = "trace", skip(self))]
-    async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError> {
+    async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError> {
        let key = alias.to_bytes();
 
        let opt = b!(self.alias_hashes, alias_hashes.get(key));
 
-        Ok(opt)
+        Ok(opt.and_then(Hash::from_ivec))
     }
 
     #[tracing::instrument(skip_all)]
-    async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
+    async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> {
+        let hash = hash.to_ivec();
+
        let v = b!(self.hash_aliases, {
            Ok(hash_aliases
                .scan_prefix(hash)
@@ -1399,10 +1459,10 @@ impl From for SledError {
 mod tests {
     #[test]
     fn round_trip() {
-        let hash = sled::IVec::from(b"some hash value");
+        let hash = crate::repo::Hash::test_value();
        let variant = String::from("some string value");
 
-        let key = super::variant_access_key(&hash, &variant);
+        let key = super::variant_access_key(&hash.to_bytes(), &variant);
 
        let (out_hash, out_variant) =
            super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes");
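
For reference, the byte layout that `Hash::to_bytes`/`Hash::from_bytes` round-trip in the new `src/repo/hash.rs` is: the 32-byte SHA-256 digest, then the file size as a big-endian `u64`, then the variable-length `InternalFormat` tag (e.g. `b"i-png"`), with anything shorter than 32 + 8 + 5 bytes rejected. The standalone sketch below mirrors that layout for illustration only; the `encode`/`decode` helper names are hypothetical and not part of pict-rs.

```rust
// Sketch of the Hash byte layout from src/repo/hash.rs, under the assumptions
// stated above: 32 digest bytes | u64 size (big-endian) | format tag.

fn encode(hash: &[u8; 32], size: u64, format: &'static [u8]) -> Vec<u8> {
    let mut vec = Vec::with_capacity(32 + 8 + format.len());
    vec.extend_from_slice(hash);
    vec.extend_from_slice(&size.to_be_bytes());
    vec.extend_from_slice(format);
    vec
}

fn decode(bytes: &[u8]) -> Option<([u8; 32], u64, &[u8])> {
    // Shortest format tag shown in the diff is 5 bytes (e.g. b"i-png").
    if bytes.len() < 32 + 8 + 5 {
        return None;
    }
    let hash: [u8; 32] = bytes[..32].try_into().ok()?;
    let size = u64::from_be_bytes(bytes[32..40].try_into().ok()?);
    Some((hash, size, &bytes[40..]))
}

fn main() {
    let bytes = encode(&[7u8; 32], 1234, b"i-png");
    let (hash, size, format) = decode(&bytes).expect("valid layout");
    assert_eq!(hash, [7u8; 32]);
    assert_eq!(size, 1234);
    assert_eq!(format, b"i-png");
    println!("round-tripped {} bytes", bytes.len());
}
```

Because the format tag is the variable-length tail, it must come last; the fixed-width digest and size prefixes are also what let sled keys for the same digest sort together when the encoded hash is used as a key prefix.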