diff --git a/src/lib.rs b/src/lib.rs index 311fc3c..bd30421 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ mod processor; mod queue; mod range; mod repo; +mod repo_04; mod serde_str; mod store; mod stream; @@ -40,7 +41,6 @@ use futures_util::{ use metrics_exporter_prometheus::PrometheusBuilder; use middleware::Metrics; use once_cell::sync::Lazy; -use repo::AliasAccessRepo; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rusty_s3::UrlStyle; @@ -69,8 +69,8 @@ use self::{ migrate_store::migrate_store, queue::queue_generate, repo::{ - sled::SledRepo, Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, - SettingsRepo, UploadId, UploadResult, VariantAccessRepo, + sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, + Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo, }, serde_str::Serde, store::{ @@ -1974,9 +1974,6 @@ impl PictRsConfiguration { let store = FileStore::build(path, repo.clone()).await?; match repo { Repo::Sled(sled_repo) => { - sled_repo - .requeue_in_progress(config.server.worker_id.as_bytes().to_vec()) - .await?; sled_repo .mark_accessed::<::Identifier>() .await?; @@ -2019,9 +2016,6 @@ impl PictRsConfiguration { match repo { Repo::Sled(sled_repo) => { - sled_repo - .requeue_in_progress(config.server.worker_id.as_bytes().to_vec()) - .await?; sled_repo .mark_accessed::<::Identifier>() .await?; diff --git a/src/queue.rs b/src/queue.rs index 211d415..1613bd5 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -4,13 +4,19 @@ use crate::{ error::Error, formats::InputProcessableFormat, repo::{ - Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, UploadId, + Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, JobId, QueueRepo, + UploadId, }, serde_str::Serde, store::{Identifier, Store}, }; use base64::{prelude::BASE64_STANDARD, Engine}; -use std::{future::Future, path::PathBuf, pin::Pin, time::Instant}; +use std::{ + future::Future, + path::PathBuf, + pin::Pin, + time::{Duration, Instant}, +}; use tracing::Instrument; mod cleanup; @@ -289,15 +295,21 @@ where + Copy, { loop { - let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; + let (job_id, bytes) = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; let span = tracing::info_span!("Running Job", worker_id = ?worker_id); let guard = MetricsGuard::guard(worker_id.clone(), queue); - span.in_scope(|| (callback)(repo, store, config, bytes.as_ref())) - .instrument(span) - .await?; + span.in_scope(|| { + heartbeat( + repo, + job_id, + (callback)(repo, store, config, bytes.as_ref()), + ) + }) + .instrument(span) + .await?; guard.disarm(); } @@ -369,16 +381,62 @@ where + Copy, { loop { - let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; + let (job_id, bytes) = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; let span = tracing::info_span!("Running Job", worker_id = ?worker_id); let guard = MetricsGuard::guard(worker_id.clone(), queue); - span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref())) - .instrument(span) - .await?; + span.in_scope(|| { + heartbeat( + repo, + job_id, + (callback)(repo, store, process_map, config, bytes.as_ref()), + ) + }) + .instrument(span) + .await?; guard.disarm(); } } + +async fn heartbeat(repo: &R, job_id: JobId, fut: Fut) -> Fut::Output +where + R: QueueRepo, + Fut: std::future::Future, +{ + let mut fut = std::pin::pin!(fut); + + let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + + let mut hb = None; + + loop { + tokio::select! { + output = &mut fut => { + return output; + } + _ = interval.tick() => { + if hb.is_none() { + hb = Some(repo.heartbeat(job_id)); + } + } + opt = poll_opt(hb.as_mut()), if hb.is_some() => { + if let Some(Err(e)) = opt { + tracing::warn!("Failed heartbeat\n{}", format!("{e:?}")); + } + } + } + } +} + +async fn poll_opt(opt: Option<&mut Fut>) -> Option +where + Fut: std::future::Future + Unpin, +{ + match opt { + None => None, + Some(fut) => std::future::poll_fn(|cx| Pin::new(&mut *fut).poll(cx).map(Some)).await, + } +} diff --git a/src/repo.rs b/src/repo.rs index a53fbcc..122b0fd 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -3,7 +3,6 @@ use crate::{ details::Details, store::{Identifier, StoreError}, }; -use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::Stream; use std::fmt::Debug; use url::Url; @@ -285,13 +284,38 @@ where } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct JobId(Uuid); + +impl JobId { + pub(crate) fn gen() -> Self { + Self(Uuid::new_v4()) + } + + pub(crate) const fn as_bytes(&self) -> &[u8; 16] { + self.0.as_bytes() + } + + pub(crate) const fn from_bytes(bytes: [u8; 16]) -> Self { + Self(Uuid::from_bytes(bytes)) + } +} + #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError>; + async fn requeue_timed_out(&self, worker_prefix: Vec) -> Result<(), RepoError>; async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>; - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result; + async fn pop( + &self, + queue: &'static str, + worker_id: Vec, + ) -> Result<(JobId, Self::Bytes), RepoError>; + + async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError>; + + async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -299,17 +323,29 @@ impl QueueRepo for actix_web::web::Data where T: QueueRepo, { - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { - T::requeue_in_progress(self, worker_prefix).await + async fn requeue_timed_out(&self, worker_prefix: Vec) -> Result<(), RepoError> { + T::requeue_timed_out(self, worker_prefix).await } async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> { T::push(self, queue, job).await } - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result { + async fn pop( + &self, + queue: &'static str, + worker_id: Vec, + ) -> Result<(JobId, Self::Bytes), RepoError> { T::pop(self, queue, worker_id).await } + + async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError> { + T::heartbeat(self, job_id).await + } + + async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError> { + T::complete_job(self, job_id).await + } } #[async_trait::async_trait(?Send)] @@ -801,23 +837,6 @@ impl std::fmt::Display for Alias { } } -impl Identifier for Vec { - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized, - { - Ok(bytes) - } - - fn to_bytes(&self) -> Result, StoreError> { - Ok(self.clone()) - } - - fn string_repr(&self) -> String { - BASE64_STANDARD.encode(self.as_slice()) - } -} - #[cfg(test)] mod tests { use super::{Alias, DeleteToken, MaybeUuid, Uuid}; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 567fc4f..515a898 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -2,7 +2,7 @@ use crate::{ details::MaybeHumanDate, repo::{ Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, - HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, QueueRepo, + HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, MigrationRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, }, serde_str::Serde, @@ -74,13 +74,13 @@ pub(crate) struct SledRepo { alias_hashes: Tree, alias_delete_tokens: Tree, queue: Tree, + job_state: Tree, alias_access: Tree, inverse_alias_access: Tree, variant_access: Tree, inverse_variant_access: Tree, proxy: Tree, inverse_proxy: Tree, - in_progress_queue: Tree, queue_notifier: Arc>>>, uploads: Tree, migration_identifiers: Tree, @@ -112,13 +112,13 @@ impl SledRepo { alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?, alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?, queue: db.open_tree("pict-rs-queue-tree")?, + job_state: db.open_tree("pict-rs-job-state-tree")?, alias_access: db.open_tree("pict-rs-alias-access-tree")?, inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, variant_access: db.open_tree("pict-rs-variant-access-tree")?, inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?, proxy: db.open_tree("pict-rs-proxy-tree")?, inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?, - in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, queue_notifier: Arc::new(RwLock::new(HashMap::new())), uploads: db.open_tree("pict-rs-uploads-tree")?, migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?, @@ -270,7 +270,7 @@ impl futures_util::Stream for IterStream { std::task::Poll::Ready(None) } } else if let Some(mut iter) = self.iter.take() { - self.next = Some(tokio::task::spawn_blocking(move || { + self.next = Some(actix_rt::task::spawn_blocking(move || { let opt = iter .next() .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); @@ -624,62 +624,68 @@ impl UploadRepo for SledRepo { } } +enum JobState { + Pending, + Running(Vec), +} + +impl JobState { + const fn pending() -> Self { + Self::Pending + } + + fn running() -> Self { + Self::Running( + time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .expect("Can format") + .into_bytes(), + ) + } + + fn as_bytes(&self) -> &[u8] { + match self { + Self::Pending => b"pending", + Self::Running(ref bytes) => bytes, + } + } +} + #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))] - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { - let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, { - let vec = in_progress_queue - .scan_prefix(worker_prefix) - .values() - .filter_map(Result::ok) - .filter_map(|ivec| { - let index = ivec.as_ref().iter().enumerate().find_map(|(index, byte)| { - if *byte == 0 { - Some(index) - } else { - None - } - })?; - - let (queue, job) = ivec.split_at(index); - if queue.is_empty() || job.len() <= 1 { - return None; - } - let job = &job[1..]; - - Some((String::from_utf8_lossy(queue).to_string(), IVec::from(job))) - }) - .collect::>(); - - Ok(vec) as Result<_, SledError> - }); - - let db = self.db.clone(); - b!(self.queue, { - for (queue_name, job) in vec { - let id = db.generate_id()?; - let mut key = queue_name.as_bytes().to_vec(); - key.extend(id.to_be_bytes()); - - queue.insert(key, job)?; - } - - Ok(()) as Result<(), SledError> - }); - - Ok(()) + async fn requeue_timed_out(&self, worker_prefix: Vec) -> Result<(), RepoError> { + todo!() } #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> { let metrics_guard = PushMetricsGuard::guard(queue_name); - let id = self.db.generate_id().map_err(SledError::from)?; + let id = JobId::gen(); let mut key = queue_name.as_bytes().to_vec(); - key.extend(id.to_be_bytes()); + key.push(0); + key.extend(id.as_bytes()); - b!(self.queue, queue.insert(key, job)); + let queue = self.queue.clone(); + let job_state = self.job_state.clone(); + + let res = actix_rt::task::spawn_blocking(move || { + (&queue, &job_state).transaction(|(queue, job_state)| { + let state = JobState::pending(); + + queue.insert(key.as_slice(), &job)?; + job_state.insert(key.as_slice(), state.as_bytes())?; + + Ok(()) + }) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { + return Err(RepoError::from(SledError::from(e))); + } if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) { notifier.notify_one(); @@ -704,40 +710,53 @@ impl QueueRepo for SledRepo { &self, queue_name: &'static str, worker_id: Vec, - ) -> Result { + ) -> Result<(JobId, Self::Bytes), RepoError> { let metrics_guard = PopMetricsGuard::guard(queue_name); loop { - let in_progress_queue = self.in_progress_queue.clone(); + let queue = self.queue.clone(); + let job_state = self.job_state.clone(); - let worker_id = worker_id.clone(); - let job = b!(self.queue, { - in_progress_queue.remove(&worker_id)?; - in_progress_queue.flush()?; + let opt = actix_rt::task::spawn_blocking(move || { + for res in job_state.scan_prefix(queue_name) { + let (key, value) = res?; - while let Some((key, job)) = queue - .scan_prefix(queue_name.as_bytes()) - .find_map(Result::ok) - { - let mut in_progress_value = queue_name.as_bytes().to_vec(); - in_progress_value.push(0); - in_progress_value.extend_from_slice(&job); - - in_progress_queue.insert(&worker_id, in_progress_value)?; - - if queue.remove(key)?.is_some() { - return Ok(Some(job)); + if value != "pending" { + // TODO: requeue dead jobs + continue; } - in_progress_queue.remove(&worker_id)?; + let state = JobState::running(); + + match job_state.compare_and_swap(&key, Some(value), Some(state.as_bytes())) { + Ok(_) => { + // acquired job + } + Err(_) => { + // someone else acquired job + continue; + } + } + + let id_bytes = &key[queue_name.len() + 1..]; + + let id_bytes: [u8; 16] = id_bytes.try_into().expect("Key length"); + + let job_id = JobId::from_bytes(id_bytes); + + let opt = queue.get(&key)?.map(|job_bytes| (job_id, job_bytes)); + + return Ok(opt) as Result, SledError>; } - Ok(None) as Result<_, SledError> - }); + Ok(None) + }) + .await + .map_err(|_| RepoError::Canceled)??; - if let Some(job) = job { + if let Some(tup) = opt { metrics_guard.disarm(); - return Ok(job); + return Ok(tup); } let opt = self @@ -760,6 +779,14 @@ impl QueueRepo for SledRepo { notify.notified().await } } + + async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError> { + todo!() + } + + async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError> { + todo!() + } } #[async_trait::async_trait(?Send)] diff --git a/src/repo_04.rs b/src/repo_04.rs new file mode 100644 index 0000000..384c2dd --- /dev/null +++ b/src/repo_04.rs @@ -0,0 +1,1019 @@ +use crate::{ + config, + details::Details, + store::{Identifier, StoreError}, +}; +use futures_util::Stream; +use std::fmt::Debug; +use url::Url; +use uuid::Uuid; + +pub(crate) mod sled; + +#[derive(Clone, Debug)] +pub(crate) enum Repo { + Sled(self::sled::SledRepo), +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum MaybeUuid { + Uuid(Uuid), + Name(String), +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct Alias { + id: MaybeUuid, + extension: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct DeleteToken { + id: MaybeUuid, +} + +pub(crate) struct HashAlreadyExists; +pub(crate) struct AliasAlreadyExists; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct UploadId { + id: Uuid, +} + +pub(crate) enum UploadResult { + Success { alias: Alias, token: DeleteToken }, + Failure { message: String }, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum RepoError { + #[error("Error in sled")] + SledError(#[from] self::sled::SledError), + + #[error("Upload was already claimed")] + AlreadyClaimed, + + #[error("Panic in blocking operation")] + Canceled, +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait FullRepo: + UploadRepo + + SettingsRepo + + IdentifierRepo + + AliasRepo + + QueueRepo + + HashRepo + + MigrationRepo + + AliasAccessRepo + + VariantAccessRepo + + ProxyRepo + + Send + + Sync + + Clone + + Debug +{ + async fn health_check(&self) -> Result<(), RepoError>; + + #[tracing::instrument(skip(self))] + async fn identifier_from_alias( + &self, + alias: &Alias, + ) -> Result, StoreError> { + let Some(hash) = self.hash(alias).await? else { + return Ok(None); + }; + + self.identifier(hash).await + } + + #[tracing::instrument(skip(self))] + async fn aliases_from_alias(&self, alias: &Alias) -> Result, RepoError> { + let Some(hash) = self.hash(alias).await? else { + return Ok(vec![]); + }; + + self.for_hash(hash).await + } + + #[tracing::instrument(skip(self))] + async fn still_identifier_from_alias( + &self, + alias: &Alias, + ) -> Result, StoreError> { + let Some(hash) = self.hash(alias).await? else { + return Ok(None); + }; + + let Some(identifier) = self.identifier::(hash.clone()).await? else { + return Ok(None); + }; + + match self.details(&identifier).await? { + Some(details) if details.is_video() => self.motion_identifier::(hash).await, + Some(_) => Ok(Some(identifier)), + None => Ok(None), + } + } +} + +#[async_trait::async_trait(?Send)] +impl FullRepo for actix_web::web::Data +where + T: FullRepo, +{ + async fn health_check(&self) -> Result<(), RepoError> { + T::health_check(self).await + } +} + +pub(crate) trait BaseRepo { + type Bytes: AsRef<[u8]> + From> + Clone; +} + +impl BaseRepo for actix_web::web::Data +where + T: BaseRepo, +{ + type Bytes = T::Bytes; +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait ProxyRepo: BaseRepo { + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>; + + async fn related(&self, url: Url) -> Result, RepoError>; + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl ProxyRepo for actix_web::web::Data +where + T: ProxyRepo, +{ + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + T::relate_url(self, url, alias).await + } + + async fn related(&self, url: Url) -> Result, RepoError> { + T::related(self, url).await + } + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { + T::remove_relation(self, alias).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait AliasAccessRepo: BaseRepo { + type AliasAccessStream: Stream>; + + async fn accessed(&self, alias: Alias) -> Result<(), RepoError>; + + async fn older_aliases( + &self, + timestamp: time::OffsetDateTime, + ) -> Result; + + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl AliasAccessRepo for actix_web::web::Data +where + T: AliasAccessRepo, +{ + type AliasAccessStream = T::AliasAccessStream; + + async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { + T::accessed(self, alias).await + } + + async fn older_aliases( + &self, + timestamp: time::OffsetDateTime, + ) -> Result { + T::older_aliases(self, timestamp).await + } + + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { + T::remove_access(self, alias).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait VariantAccessRepo: BaseRepo { + type VariantAccessStream: Stream>; + + async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + + async fn contains_variant(&self, hash: Self::Bytes, variant: String) + -> Result; + + async fn older_variants( + &self, + timestamp: time::OffsetDateTime, + ) -> Result; + + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl VariantAccessRepo for actix_web::web::Data +where + T: VariantAccessRepo, +{ + type VariantAccessStream = T::VariantAccessStream; + + async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + T::accessed(self, hash, variant).await + } + + async fn contains_variant( + &self, + hash: Self::Bytes, + variant: String, + ) -> Result { + T::contains_variant(self, hash, variant).await + } + + async fn older_variants( + &self, + timestamp: time::OffsetDateTime, + ) -> Result { + T::older_variants(self, timestamp).await + } + + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + T::remove_access(self, hash, variant).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait UploadRepo: BaseRepo { + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>; + + async fn wait(&self, upload_id: UploadId) -> Result; + + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>; + + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl UploadRepo for actix_web::web::Data +where + T: UploadRepo, +{ + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { + T::create(self, upload_id).await + } + + async fn wait(&self, upload_id: UploadId) -> Result { + T::wait(self, upload_id).await + } + + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { + T::claim(self, upload_id).await + } + + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { + T::complete(self, upload_id, result).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait QueueRepo: BaseRepo { + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError>; + + async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>; + + async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result; +} + +#[async_trait::async_trait(?Send)] +impl QueueRepo for actix_web::web::Data +where + T: QueueRepo, +{ + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { + T::requeue_in_progress(self, worker_prefix).await + } + + async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> { + T::push(self, queue, job).await + } + + async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result { + T::pop(self, queue, worker_id).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait SettingsRepo: BaseRepo { + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>; + async fn get(&self, key: &'static str) -> Result, RepoError>; + async fn remove(&self, key: &'static str) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl SettingsRepo for actix_web::web::Data +where + T: SettingsRepo, +{ + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { + T::set(self, key, value).await + } + + async fn get(&self, key: &'static str) -> Result, RepoError> { + T::get(self, key).await + } + + async fn remove(&self, key: &'static str) -> Result<(), RepoError> { + T::remove(self, key).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait IdentifierRepo: BaseRepo { + async fn relate_details( + &self, + identifier: &I, + details: &Details, + ) -> Result<(), StoreError>; + async fn details(&self, identifier: &I) -> Result, StoreError>; + + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError>; +} + +#[async_trait::async_trait(?Send)] +impl IdentifierRepo for actix_web::web::Data +where + T: IdentifierRepo, +{ + async fn relate_details( + &self, + identifier: &I, + details: &Details, + ) -> Result<(), StoreError> { + T::relate_details(self, identifier, details).await + } + + async fn details(&self, identifier: &I) -> Result, StoreError> { + T::details(self, identifier).await + } + + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { + T::cleanup(self, identifier).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait MigrationRepo: BaseRepo { + async fn is_continuing_migration(&self) -> Result; + + async fn mark_migrated( + &self, + old_identifier: &I1, + new_identifier: &I2, + ) -> Result<(), StoreError>; + + async fn is_migrated(&self, identifier: &I) -> Result; + + async fn clear(&self) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl MigrationRepo for actix_web::web::Data +where + T: MigrationRepo, +{ + async fn is_continuing_migration(&self) -> Result { + T::is_continuing_migration(self).await + } + + async fn mark_migrated( + &self, + old_identifier: &I1, + new_identifier: &I2, + ) -> Result<(), StoreError> { + T::mark_migrated(self, old_identifier, new_identifier).await + } + + async fn is_migrated(&self, identifier: &I) -> Result { + T::is_migrated(self, identifier).await + } + + async fn clear(&self) -> Result<(), RepoError> { + T::clear(self).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait HashRepo: BaseRepo { + type Stream: Stream>; + + async fn size(&self) -> Result; + + async fn hashes(&self) -> Self::Stream; + + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError>; + + async fn update_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError>; + + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError>; + + async fn relate_variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + identifier: &I, + ) -> Result<(), StoreError>; + async fn variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + ) -> Result, StoreError>; + async fn variants( + &self, + hash: Self::Bytes, + ) -> Result, StoreError>; + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + + async fn relate_motion_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError>; + async fn motion_identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError>; + + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl HashRepo for actix_web::web::Data +where + T: HashRepo, +{ + type Stream = T::Stream; + + async fn size(&self) -> Result { + T::size(self).await + } + + async fn hashes(&self) -> Self::Stream { + T::hashes(self).await + } + + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError> { + T::create(self, hash, identifier).await + } + + async fn update_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError> { + T::update_identifier(self, hash, identifier).await + } + + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + T::identifier(self, hash).await + } + + async fn relate_variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + identifier: &I, + ) -> Result<(), StoreError> { + T::relate_variant_identifier(self, hash, variant, identifier).await + } + + async fn variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + ) -> Result, StoreError> { + T::variant_identifier(self, hash, variant).await + } + + async fn variants( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + T::variants(self, hash).await + } + + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + T::remove_variant(self, hash, variant).await + } + + async fn relate_motion_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError> { + T::relate_motion_identifier(self, hash, identifier).await + } + + async fn motion_identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + T::motion_identifier(self, hash).await + } + + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { + T::cleanup(self, hash).await + } +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait AliasRepo: BaseRepo { + async fn create( + &self, + alias: &Alias, + delete_token: &DeleteToken, + hash: Self::Bytes, + ) -> Result, RepoError>; + + async fn delete_token(&self, alias: &Alias) -> Result, RepoError>; + + async fn hash(&self, alias: &Alias) -> Result, RepoError>; + + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; + + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl AliasRepo for actix_web::web::Data +where + T: AliasRepo, +{ + async fn create( + &self, + alias: &Alias, + delete_token: &DeleteToken, + hash: Self::Bytes, + ) -> Result, RepoError> { + T::create(self, alias, delete_token, hash).await + } + + async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { + T::delete_token(self, alias).await + } + + async fn hash(&self, alias: &Alias) -> Result, RepoError> { + T::hash(self, alias).await + } + + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + T::for_hash(self, hash).await + } + + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { + T::cleanup(self, alias).await + } +} + +impl Repo { + #[tracing::instrument] + pub(crate) fn open(config: config::Repo) -> color_eyre::Result { + match config { + config::Repo::Sled(config::Sled { + path, + cache_capacity, + export_path, + }) => { + let repo = self::sled::SledRepo::build(path, cache_capacity, export_path)?; + + Ok(Self::Sled(repo)) + } + } + } +} + +impl MaybeUuid { + fn from_str(s: &str) -> Self { + if let Ok(uuid) = Uuid::parse_str(s) { + MaybeUuid::Uuid(uuid) + } else { + MaybeUuid::Name(s.into()) + } + } + + fn as_bytes(&self) -> &[u8] { + match self { + Self::Uuid(uuid) => &uuid.as_bytes()[..], + Self::Name(name) => name.as_bytes(), + } + } +} + +fn split_at_dot(s: &str) -> Option<(&str, &str)> { + let index = s.find('.')?; + + Some(s.split_at(index)) +} + +impl Alias { + pub(crate) fn generate(extension: String) -> Self { + Alias { + id: MaybeUuid::Uuid(Uuid::new_v4()), + extension: Some(extension), + } + } + + pub(crate) fn from_existing(alias: &str) -> Self { + if let Some((start, end)) = split_at_dot(alias) { + Alias { + id: MaybeUuid::from_str(start), + extension: Some(end.into()), + } + } else { + Alias { + id: MaybeUuid::from_str(alias), + extension: None, + } + } + } + + pub(crate) fn extension(&self) -> Option<&str> { + self.extension.as_deref() + } + + pub(crate) fn to_bytes(&self) -> Vec { + let mut v = self.id.as_bytes().to_vec(); + + if let Some(ext) = self.extension() { + v.extend_from_slice(ext.as_bytes()); + } + + v + } + + pub(crate) fn from_slice(bytes: &[u8]) -> Option { + if let Ok(s) = std::str::from_utf8(bytes) { + Some(Self::from_existing(s)) + } else if bytes.len() >= 16 { + let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length"); + + let extension = if bytes.len() > 16 { + Some(String::from_utf8_lossy(&bytes[16..]).to_string()) + } else { + None + }; + + Some(Self { + id: MaybeUuid::Uuid(id), + extension, + }) + } else { + None + } + } +} + +impl DeleteToken { + pub(crate) fn from_existing(existing: &str) -> Self { + if let Ok(uuid) = Uuid::parse_str(existing) { + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + } else { + DeleteToken { + id: MaybeUuid::Name(existing.into()), + } + } + } + + pub(crate) fn generate() -> Self { + Self { + id: MaybeUuid::Uuid(Uuid::new_v4()), + } + } + + fn to_bytes(&self) -> Vec { + self.id.as_bytes().to_vec() + } + + fn from_slice(bytes: &[u8]) -> Option { + if let Ok(s) = std::str::from_utf8(bytes) { + Some(DeleteToken::from_existing(s)) + } else if bytes.len() == 16 { + Some(DeleteToken { + id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?), + }) + } else { + None + } + } +} + +impl UploadId { + pub(crate) fn generate() -> Self { + Self { id: Uuid::new_v4() } + } + + pub(crate) fn as_bytes(&self) -> &[u8] { + &self.id.as_bytes()[..] + } +} + +impl std::str::FromStr for UploadId { + type Err = ::Err; + + fn from_str(s: &str) -> Result { + Ok(UploadId { id: s.parse()? }) + } +} + +impl std::fmt::Display for UploadId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.id, f) + } +} + +impl std::fmt::Display for MaybeUuid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Uuid(id) => write!(f, "{id}"), + Self::Name(name) => write!(f, "{name}"), + } + } +} + +impl std::str::FromStr for DeleteToken { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + Ok(DeleteToken::from_existing(s)) + } +} + +impl std::fmt::Display for DeleteToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} + +impl std::str::FromStr for Alias { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + Ok(Alias::from_existing(s)) + } +} + +impl std::fmt::Display for Alias { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ext) = self.extension() { + write!(f, "{}{ext}", self.id) + } else { + write!(f, "{}", self.id) + } + } +} + +#[cfg(test)] +mod tests { + use super::{Alias, DeleteToken, MaybeUuid, Uuid}; + + #[test] + fn string_delete_token() { + let delete_token = DeleteToken::from_existing("blah"); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) + } + + #[test] + fn uuid_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_existing(&uuid.to_string()); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn bytes_delete_token() { + let delete_token = DeleteToken::from_slice(b"blah").unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) + } + + #[test] + fn uuid_bytes_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn uuid_bytes_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn string_alias() { + let alias = Alias::from_existing("blah"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn string_alias_ext() { + let alias = Alias::from_existing("blah.mp4"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_existing(&uuid.to_string()); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{uuid}.mp4"); + let alias = Alias::from_existing(&alias_str); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn bytes_alias() { + let alias = Alias::from_slice(b"blah").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn bytes_alias_ext() { + let alias = Alias::from_slice(b"blah.mp4").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_bytes_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_alias_ext() { + let uuid = Uuid::new_v4(); + + let mut alias_bytes = uuid.as_bytes().to_vec(); + alias_bytes.extend_from_slice(b".mp4"); + + let alias = Alias::from_slice(&alias_bytes).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn uuid_bytes_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{uuid}.mp4"); + let alias = Alias::from_slice(alias_str.as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } +} diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs new file mode 100644 index 0000000..64cc817 --- /dev/null +++ b/src/repo_04/sled.rs @@ -0,0 +1,1334 @@ +use crate::{ + details::MaybeHumanDate, + repo_04::{ + Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, + HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, QueueRepo, + SettingsRepo, UploadId, UploadRepo, UploadResult, + }, + serde_str::Serde, + store::StoreError, + stream::from_iterator, +}; +use futures_util::{Future, Stream}; +use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree}; +use std::{ + collections::HashMap, + path::PathBuf, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, + time::Instant, +}; +use tokio::{sync::Notify, task::JoinHandle}; +use url::Url; + +use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo}; + +macro_rules! b { + ($self:ident.$ident:ident, $expr:expr) => {{ + let $ident = $self.$ident.clone(); + + let span = tracing::Span::current(); + + actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr)) + .await + .map_err(SledError::from) + .map_err(RepoError::from)? + .map_err(SledError::from) + .map_err(RepoError::from)? + }}; +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum SledError { + #[error("Error in database")] + Sled(#[from] sled::Error), + + #[error("Invalid details json")] + Details(#[from] serde_json::Error), + + #[error("Error formatting timestamp")] + Format(#[source] time::error::Format), + + #[error("Error parsing variant key")] + VariantKey(#[from] VariantKeyError), + + #[error("Operation panicked")] + Panic, +} + +#[derive(Clone)] +pub(crate) struct SledRepo { + healthz_count: Arc, + healthz: Tree, + settings: Tree, + identifier_details: Tree, + hashes: Tree, + hash_aliases: Tree, + hash_identifiers: Tree, + hash_variant_identifiers: Tree, + hash_motion_identifiers: Tree, + aliases: Tree, + alias_hashes: Tree, + alias_delete_tokens: Tree, + queue: Tree, + alias_access: Tree, + inverse_alias_access: Tree, + variant_access: Tree, + inverse_variant_access: Tree, + proxy: Tree, + inverse_proxy: Tree, + in_progress_queue: Tree, + queue_notifier: Arc>>>, + uploads: Tree, + migration_identifiers: Tree, + cache_capacity: u64, + export_path: PathBuf, + db: Db, +} + +impl SledRepo { + #[tracing::instrument] + pub(crate) fn build( + path: PathBuf, + cache_capacity: u64, + export_path: PathBuf, + ) -> color_eyre::Result { + let db = Self::open(path, cache_capacity)?; + + Ok(SledRepo { + healthz_count: Arc::new(AtomicU64::new(0)), + healthz: db.open_tree("pict-rs-healthz-tree")?, + settings: db.open_tree("pict-rs-settings-tree")?, + identifier_details: db.open_tree("pict-rs-identifier-details-tree")?, + hashes: db.open_tree("pict-rs-hashes-tree")?, + hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?, + hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?, + hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?, + hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?, + aliases: db.open_tree("pict-rs-aliases-tree")?, + alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?, + alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?, + queue: db.open_tree("pict-rs-queue-tree")?, + alias_access: db.open_tree("pict-rs-alias-access-tree")?, + inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, + variant_access: db.open_tree("pict-rs-variant-access-tree")?, + inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?, + proxy: db.open_tree("pict-rs-proxy-tree")?, + inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?, + in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, + queue_notifier: Arc::new(RwLock::new(HashMap::new())), + uploads: db.open_tree("pict-rs-uploads-tree")?, + migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?, + cache_capacity, + export_path, + db, + }) + } + + fn open(mut path: PathBuf, cache_capacity: u64) -> Result { + path.push("v0.4.0-alpha.1"); + + let db = ::sled::Config::new() + .cache_capacity(cache_capacity) + .path(path) + .open()?; + + Ok(db) + } + + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) async fn mark_accessed(&self) -> Result<(), StoreError> { + use futures_util::StreamExt; + + let mut stream = self.hashes().await; + + while let Some(res) = stream.next().await { + let hash = res?; + + for (variant, _) in self.variants::(hash.clone()).await? { + if !self.contains_variant(hash.clone(), variant.clone()).await? { + VariantAccessRepo::accessed(self, hash.clone(), variant).await?; + } + } + } + + Ok(()) + } + + #[tracing::instrument(level = "warn", skip_all)] + pub(crate) async fn export(&self) -> Result<(), RepoError> { + let path = self + .export_path + .join(MaybeHumanDate::HumanDate(time::OffsetDateTime::now_utc()).to_string()); + + let export_db = Self::open(path, self.cache_capacity)?; + + let this = self.db.clone(); + + actix_rt::task::spawn_blocking(move || { + let export = this.export(); + export_db.import(export); + }) + .await + .map_err(SledError::from)?; + + Ok(()) + } +} + +impl BaseRepo for SledRepo { + type Bytes = IVec; +} + +#[async_trait::async_trait(?Send)] +impl FullRepo for SledRepo { + async fn health_check(&self) -> Result<(), RepoError> { + let next = self.healthz_count.fetch_add(1, Ordering::Relaxed); + b!(self.healthz, { + healthz.insert("healthz", &next.to_be_bytes()[..]) + }); + self.healthz.flush_async().await.map_err(SledError::from)?; + b!(self.healthz, healthz.get("healthz")); + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl ProxyRepo for SledRepo { + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + let proxy = self.proxy.clone(); + let inverse_proxy = self.inverse_proxy.clone(); + + actix_web::web::block(move || { + proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?; + inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; + + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)??; + + Ok(()) + } + + async fn related(&self, url: Url) -> Result, RepoError> { + let opt = b!(self.proxy, proxy.get(url.as_str().as_bytes())); + + Ok(opt.and_then(|ivec| Alias::from_slice(&ivec))) + } + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { + let proxy = self.proxy.clone(); + let inverse_proxy = self.inverse_proxy.clone(); + + actix_web::web::block(move || { + if let Some(url) = inverse_proxy.remove(alias.to_bytes())? { + proxy.remove(url)?; + } + + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)??; + + Ok(()) + } +} + +type IterValue = Option<(sled::Iter, Result)>; + +pub(crate) struct IterStream { + iter: Option, + next: Option>, +} + +impl futures_util::Stream for IterStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if let Some(ref mut next) = self.next { + let res = std::task::ready!(Pin::new(next).poll(cx)); + + self.next.take(); + + let opt = match res { + Ok(opt) => opt, + Err(_) => return std::task::Poll::Ready(Some(Err(RepoError::Canceled))), + }; + + if let Some((iter, res)) = opt { + self.iter = Some(iter); + + std::task::Poll::Ready(Some(res)) + } else { + std::task::Poll::Ready(None) + } + } else if let Some(mut iter) = self.iter.take() { + self.next = Some(tokio::task::spawn_blocking(move || { + let opt = iter + .next() + .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); + + opt.map(|res| (iter, res.map(|(_, value)| value))) + })); + self.poll_next(cx) + } else { + std::task::Poll::Ready(None) + } + } +} + +pub(crate) struct AliasAccessStream { + iter: IterStream, +} + +pub(crate) struct VariantAccessStream { + iter: IterStream, +} + +impl futures_util::Stream for AliasAccessStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { + Some(Ok(bytes)) => { + if let Some(alias) = Alias::from_slice(&bytes) { + std::task::Poll::Ready(Some(Ok(alias))) + } else { + self.poll_next(cx) + } + } + Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), + None => std::task::Poll::Ready(None), + } + } +} + +impl futures_util::Stream for VariantAccessStream { + type Item = Result<(IVec, String), RepoError>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { + Some(Ok(bytes)) => std::task::Poll::Ready(Some( + parse_variant_access_key(bytes) + .map_err(SledError::from) + .map_err(RepoError::from), + )), + Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), + None => std::task::Poll::Ready(None), + } + } +} + +#[async_trait::async_trait(?Send)] +impl AliasAccessRepo for SledRepo { + type AliasAccessStream = AliasAccessStream; + + #[tracing::instrument(level = "debug", skip(self))] + async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { + let now_string = time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .map_err(SledError::Format)?; + + let alias_access = self.alias_access.clone(); + let inverse_alias_access = self.inverse_alias_access.clone(); + + actix_rt::task::spawn_blocking(move || { + if let Some(old) = alias_access.insert(alias.to_bytes(), now_string.as_bytes())? { + inverse_alias_access.remove(old)?; + } + inverse_alias_access.insert(now_string, alias.to_bytes())?; + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(RepoError::from) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn older_aliases( + &self, + timestamp: time::OffsetDateTime, + ) -> Result { + let time_string = timestamp + .format(&time::format_description::well_known::Rfc3339) + .map_err(SledError::Format)?; + + let inverse_alias_access = self.inverse_alias_access.clone(); + + let iter = + actix_rt::task::spawn_blocking(move || inverse_alias_access.range(..=time_string)) + .await + .map_err(|_| RepoError::Canceled)?; + + Ok(AliasAccessStream { + iter: IterStream { + iter: Some(iter), + next: None, + }, + }) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { + let alias_access = self.alias_access.clone(); + let inverse_alias_access = self.inverse_alias_access.clone(); + + actix_rt::task::spawn_blocking(move || { + if let Some(old) = alias_access.remove(alias.to_bytes())? { + inverse_alias_access.remove(old)?; + } + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(RepoError::from) + } +} + +#[async_trait::async_trait(?Send)] +impl VariantAccessRepo for SledRepo { + type VariantAccessStream = VariantAccessStream; + + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] + async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + let key = variant_access_key(&hash, &variant); + + let now_string = time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .map_err(SledError::Format)?; + + let variant_access = self.variant_access.clone(); + let inverse_variant_access = self.inverse_variant_access.clone(); + + actix_rt::task::spawn_blocking(move || { + if let Some(old) = variant_access.insert(&key, now_string.as_bytes())? { + inverse_variant_access.remove(old)?; + } + inverse_variant_access.insert(now_string, key)?; + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(RepoError::from) + } + + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] + async fn contains_variant( + &self, + hash: Self::Bytes, + variant: String, + ) -> Result { + let key = variant_access_key(&hash, &variant); + + let timestamp = b!(self.variant_access, variant_access.get(key)); + + Ok(timestamp.is_some()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn older_variants( + &self, + timestamp: time::OffsetDateTime, + ) -> Result { + let time_string = timestamp + .format(&time::format_description::well_known::Rfc3339) + .map_err(SledError::Format)?; + + let inverse_variant_access = self.inverse_variant_access.clone(); + + let iter = + actix_rt::task::spawn_blocking(move || inverse_variant_access.range(..=time_string)) + .await + .map_err(|_| RepoError::Canceled)?; + + Ok(VariantAccessStream { + iter: IterStream { + iter: Some(iter), + next: None, + }, + }) + } + + #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] + async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + let key = variant_access_key(&hash, &variant); + + let variant_access = self.variant_access.clone(); + let inverse_variant_access = self.inverse_variant_access.clone(); + + actix_rt::task::spawn_blocking(move || { + if let Some(old) = variant_access.remove(key)? { + inverse_variant_access.remove(old)?; + } + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(RepoError::from) + } +} + +#[derive(serde::Deserialize, serde::Serialize)] +enum InnerUploadResult { + Success { + alias: Serde, + token: Serde, + }, + Failure { + message: String, + }, +} + +impl From for InnerUploadResult { + fn from(u: UploadResult) -> Self { + match u { + UploadResult::Success { alias, token } => InnerUploadResult::Success { + alias: Serde::new(alias), + token: Serde::new(token), + }, + UploadResult::Failure { message } => InnerUploadResult::Failure { message }, + } + } +} + +impl From for UploadResult { + fn from(i: InnerUploadResult) -> Self { + match i { + InnerUploadResult::Success { alias, token } => UploadResult::Success { + alias: Serde::into_inner(alias), + token: Serde::into_inner(token), + }, + InnerUploadResult::Failure { message } => UploadResult::Failure { message }, + } + } +} + +struct PushMetricsGuard { + queue: &'static str, + armed: bool, +} + +struct PopMetricsGuard { + queue: &'static str, + start: Instant, + armed: bool, +} + +impl PushMetricsGuard { + fn guard(queue: &'static str) -> Self { + Self { queue, armed: true } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl PopMetricsGuard { + fn guard(queue: &'static str) -> Self { + Self { + queue, + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for PushMetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + +impl Drop for PopMetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue); + metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + +#[async_trait::async_trait(?Send)] +impl UploadRepo for SledRepo { + #[tracing::instrument(level = "trace", skip(self))] + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { + b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn wait(&self, upload_id: UploadId) -> Result { + let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes()); + + let bytes = upload_id.as_bytes().to_vec(); + let opt = b!(self.uploads, uploads.get(bytes)); + + if let Some(bytes) = opt { + if bytes != b"1" { + let result: InnerUploadResult = + serde_json::from_slice(&bytes).map_err(SledError::from)?; + return Ok(result.into()); + } + } else { + return Err(RepoError::AlreadyClaimed); + } + + while let Some(event) = (&mut subscriber).await { + match event { + sled::Event::Remove { .. } => { + return Err(RepoError::AlreadyClaimed); + } + sled::Event::Insert { value, .. } => { + if value != b"1" { + let result: InnerUploadResult = + serde_json::from_slice(&value).map_err(SledError::from)?; + return Ok(result.into()); + } + } + } + } + + Err(RepoError::Canceled) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { + b!(self.uploads, uploads.remove(upload_id.as_bytes())); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, result))] + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { + let result: InnerUploadResult = result.into(); + let result = serde_json::to_vec(&result).map_err(SledError::from)?; + + b!(self.uploads, uploads.insert(upload_id.as_bytes(), result)); + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl QueueRepo for SledRepo { + #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))] + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { + let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, { + let vec = in_progress_queue + .scan_prefix(worker_prefix) + .values() + .filter_map(Result::ok) + .filter_map(|ivec| { + let index = ivec.as_ref().iter().enumerate().find_map(|(index, byte)| { + if *byte == 0 { + Some(index) + } else { + None + } + })?; + + let (queue, job) = ivec.split_at(index); + if queue.is_empty() || job.len() <= 1 { + return None; + } + let job = &job[1..]; + + Some((String::from_utf8_lossy(queue).to_string(), IVec::from(job))) + }) + .collect::>(); + + Ok(vec) as Result<_, SledError> + }); + + let db = self.db.clone(); + b!(self.queue, { + for (queue_name, job) in vec { + let id = db.generate_id()?; + let mut key = queue_name.as_bytes().to_vec(); + key.extend(id.to_be_bytes()); + + queue.insert(key, job)?; + } + + Ok(()) as Result<(), SledError> + }); + + Ok(()) + } + + #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] + async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> { + let metrics_guard = PushMetricsGuard::guard(queue_name); + + let id = self.db.generate_id().map_err(SledError::from)?; + let mut key = queue_name.as_bytes().to_vec(); + key.extend(id.to_be_bytes()); + + b!(self.queue, queue.insert(key, job)); + + if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) { + notifier.notify_one(); + metrics_guard.disarm(); + return Ok(()); + } + + self.queue_notifier + .write() + .unwrap() + .entry(queue_name) + .or_insert_with(|| Arc::new(Notify::new())) + .notify_one(); + + metrics_guard.disarm(); + + Ok(()) + } + + #[tracing::instrument(skip(self, worker_id), fields(worker_id = %String::from_utf8_lossy(&worker_id)))] + async fn pop( + &self, + queue_name: &'static str, + worker_id: Vec, + ) -> Result { + let metrics_guard = PopMetricsGuard::guard(queue_name); + + loop { + let in_progress_queue = self.in_progress_queue.clone(); + + let worker_id = worker_id.clone(); + let job = b!(self.queue, { + in_progress_queue.remove(&worker_id)?; + in_progress_queue.flush()?; + + while let Some((key, job)) = queue + .scan_prefix(queue_name.as_bytes()) + .find_map(Result::ok) + { + let mut in_progress_value = queue_name.as_bytes().to_vec(); + in_progress_value.push(0); + in_progress_value.extend_from_slice(&job); + + in_progress_queue.insert(&worker_id, in_progress_value)?; + + if queue.remove(key)?.is_some() { + return Ok(Some(job)); + } + + in_progress_queue.remove(&worker_id)?; + } + + Ok(None) as Result<_, SledError> + }); + + if let Some(job) = job { + metrics_guard.disarm(); + return Ok(job); + } + + let opt = self + .queue_notifier + .read() + .unwrap() + .get(&queue_name) + .map(Arc::clone); + + let notify = if let Some(notify) = opt { + notify + } else { + let mut guard = self.queue_notifier.write().unwrap(); + let entry = guard + .entry(queue_name) + .or_insert_with(|| Arc::new(Notify::new())); + Arc::clone(entry) + }; + + notify.notified().await + } + } +} + +#[async_trait::async_trait(?Send)] +impl SettingsRepo for SledRepo { + #[tracing::instrument(level = "trace", skip(value))] + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { + b!(self.settings, settings.insert(key, value)); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get(&self, key: &'static str) -> Result, RepoError> { + let opt = b!(self.settings, settings.get(key)); + + Ok(opt) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn remove(&self, key: &'static str) -> Result<(), RepoError> { + b!(self.settings, settings.remove(key)); + + Ok(()) + } +} + +fn variant_access_key(hash: &[u8], variant: &str) -> Vec { + let variant = variant.as_bytes(); + + let hash_len: u64 = u64::try_from(hash.len()).expect("Length is reasonable"); + + let mut out = Vec::with_capacity(8 + hash.len() + variant.len()); + + let hash_length_bytes: [u8; 8] = hash_len.to_be_bytes(); + out.extend(hash_length_bytes); + out.extend(hash); + out.extend(variant); + out +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum VariantKeyError { + #[error("Bytes too short to be VariantAccessKey")] + TooShort, + + #[error("Prefix Length is longer than backing bytes")] + InvalidLength, + + #[error("Invalid utf8 in Variant")] + Utf8, +} + +fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyError> { + if bytes.len() < 8 { + return Err(VariantKeyError::TooShort); + } + + let hash_len = u64::from_be_bytes(bytes[..8].try_into().expect("Verified length")); + let hash_len: usize = usize::try_from(hash_len).expect("Length is reasonable"); + + if (hash_len + 8) > bytes.len() { + return Err(VariantKeyError::InvalidLength); + } + + let hash = bytes.subslice(8, hash_len); + + let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len); + + if variant_len == 0 { + return Ok((hash, String::new())); + } + + let variant_start = 8 + hash_len; + + let variant = std::str::from_utf8(&bytes[variant_start..]) + .map_err(|_| VariantKeyError::Utf8)? + .to_string(); + + Ok((hash, variant)) +} + +fn variant_key(hash: &[u8], variant: &str) -> Vec { + let mut bytes = hash.to_vec(); + bytes.push(b'/'); + bytes.extend_from_slice(variant.as_bytes()); + bytes +} + +fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { + let prefix_len = hash.len() + 1; + let variant_bytes = key.get(prefix_len..)?.to_vec(); + String::from_utf8(variant_bytes).ok() +} + +#[async_trait::async_trait(?Send)] +impl IdentifierRepo for SledRepo { + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + async fn relate_details( + &self, + identifier: &I, + details: &Details, + ) -> Result<(), StoreError> { + let key = identifier.to_bytes()?; + let details = serde_json::to_vec(&details) + .map_err(SledError::from) + .map_err(RepoError::from)?; + + b!( + self.identifier_details, + identifier_details.insert(key, details) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + async fn details(&self, identifier: &I) -> Result, StoreError> { + let key = identifier.to_bytes()?; + + let opt = b!(self.identifier_details, identifier_details.get(key)); + + opt.map(|ivec| serde_json::from_slice(&ivec)) + .transpose() + .map_err(SledError::from) + .map_err(RepoError::from) + .map_err(StoreError::from) + } + + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { + let key = identifier.to_bytes()?; + + b!(self.identifier_details, identifier_details.remove(key)); + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl MigrationRepo for SledRepo { + async fn is_continuing_migration(&self) -> Result { + Ok(!self.migration_identifiers.is_empty()) + } + + async fn mark_migrated( + &self, + old_identifier: &I1, + new_identifier: &I2, + ) -> Result<(), StoreError> { + let key = new_identifier.to_bytes()?; + let value = old_identifier.to_bytes()?; + + b!( + self.migration_identifiers, + migration_identifiers.insert(key, value) + ); + + Ok(()) + } + + async fn is_migrated(&self, identifier: &I) -> Result { + let key = identifier.to_bytes()?; + + Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some()) + } + + async fn clear(&self) -> Result<(), RepoError> { + b!(self.migration_identifiers, migration_identifiers.clear()); + + Ok(()) + } +} + +type StreamItem = Result; +type LocalBoxStream<'a, T> = Pin + 'a>>; + +#[async_trait::async_trait(?Send)] +impl HashRepo for SledRepo { + type Stream = LocalBoxStream<'static, StreamItem>; + + async fn size(&self) -> Result { + Ok(b!( + self.hashes, + Ok(u64::try_from(hashes.len()).expect("Length is reasonable")) + as Result + )) + } + + async fn hashes(&self) -> Self::Stream { + let iter = self + .hashes + .iter() + .keys() + .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); + + Box::pin(from_iterator(iter, 8)) + } + + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn create( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result, StoreError> { + let identifier: sled::IVec = identifier.to_bytes()?.into(); + + let hashes = self.hashes.clone(); + let hash_identifiers = self.hash_identifiers.clone(); + + let res = actix_web::web::block(move || { + (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| { + if hashes.get(&hash)?.is_some() { + return Ok(Err(HashAlreadyExists)); + } + + hashes.insert(&hash, &hash)?; + hash_identifiers.insert(&hash, &identifier)?; + + Ok(Ok(())) + }) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(res) => Ok(res), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(StoreError::from(RepoError::from(SledError::from(e)))) + } + } + } + + async fn update_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError> { + let identifier = identifier.to_bytes()?; + + b!( + self.hash_identifiers, + hash_identifiers.insert(hash, identifier) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { + return Ok(None); + }; + + Ok(Some(I::from_bytes(ivec.to_vec())?)) + } + + #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + async fn relate_variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + identifier: &I, + ) -> Result<(), StoreError> { + let key = variant_key(&hash, &variant); + let value = identifier.to_bytes()?; + + b!( + self.hash_variant_identifiers, + hash_variant_identifiers.insert(key, value) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn variant_identifier( + &self, + hash: Self::Bytes, + variant: String, + ) -> Result, StoreError> { + let key = variant_key(&hash, &variant); + + let opt = b!( + self.hash_variant_identifiers, + hash_variant_identifiers.get(key) + ); + + opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() + } + + #[tracing::instrument(level = "debug", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn variants( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + let vec = b!( + self.hash_variant_identifiers, + Ok(hash_variant_identifiers + .scan_prefix(&hash) + .filter_map(|res| res.ok()) + .filter_map(|(key, ivec)| { + let identifier = I::from_bytes(ivec.to_vec()).ok(); + if identifier.is_none() { + tracing::warn!( + "Skipping an identifier: {}", + String::from_utf8_lossy(&ivec) + ); + } + + let variant = variant_from_key(&hash, &key); + if variant.is_none() { + tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); + } + + Some((variant?, identifier?)) + }) + .collect::>()) as Result, SledError> + ); + + Ok(vec) + } + + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + let key = variant_key(&hash, &variant); + + b!( + self.hash_variant_identifiers, + hash_variant_identifiers.remove(key) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + async fn relate_motion_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), StoreError> { + let bytes = identifier.to_bytes()?; + + b!( + self.hash_motion_identifiers, + hash_motion_identifiers.insert(hash, bytes) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn motion_identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError> { + let opt = b!( + self.hash_motion_identifiers, + hash_motion_identifiers.get(hash) + ); + + opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() + } + + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { + let hashes = self.hashes.clone(); + let hash_identifiers = self.hash_identifiers.clone(); + let hash_motion_identifiers = self.hash_motion_identifiers.clone(); + let hash_variant_identifiers = self.hash_variant_identifiers.clone(); + + let hash2 = hash.clone(); + let variant_keys = b!(self.hash_variant_identifiers, { + let v = hash_variant_identifiers + .scan_prefix(hash2) + .keys() + .filter_map(Result::ok) + .collect::>(); + + Ok(v) as Result, SledError> + }); + + let res = actix_web::web::block(move || { + ( + &hashes, + &hash_identifiers, + &hash_motion_identifiers, + &hash_variant_identifiers, + ) + .transaction( + |( + hashes, + hash_identifiers, + hash_motion_identifiers, + hash_variant_identifiers, + )| { + hashes.remove(&hash)?; + hash_identifiers.remove(&hash)?; + hash_motion_identifiers.remove(&hash)?; + + for key in &variant_keys { + hash_variant_identifiers.remove(key)?; + } + + Ok(()) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) + } + } + } +} + +fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec { + let mut v = hash.to_vec(); + v.extend_from_slice(alias); + v +} + +#[async_trait::async_trait(?Send)] +impl AliasRepo for SledRepo { + #[tracing::instrument(level = "trace", skip(self))] + async fn create( + &self, + alias: &Alias, + delete_token: &DeleteToken, + hash: Self::Bytes, + ) -> Result, RepoError> { + let alias: sled::IVec = alias.to_bytes().into(); + let delete_token: sled::IVec = delete_token.to_bytes().into(); + + let aliases = self.aliases.clone(); + let alias_hashes = self.alias_hashes.clone(); + let hash_aliases = self.hash_aliases.clone(); + let alias_delete_tokens = self.alias_delete_tokens.clone(); + + let res = actix_web::web::block(move || { + (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction( + |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| { + if aliases.get(&alias)?.is_some() { + return Ok(Err(AliasAlreadyExists)); + } + + aliases.insert(&alias, &alias)?; + alias_hashes.insert(&alias, &hash)?; + + hash_aliases.insert(hash_alias_key(&hash, &alias), &alias)?; + alias_delete_tokens.insert(&alias, &delete_token)?; + + Ok(Ok(())) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(res) => Ok(res), + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) + } + } + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { + let key = alias.to_bytes(); + + let Some(ivec) = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)) else { + return Ok(None); + }; + + let Some(token) = DeleteToken::from_slice(&ivec) else { + return Ok(None); + }; + + Ok(Some(token)) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn hash(&self, alias: &Alias) -> Result, RepoError> { + let key = alias.to_bytes(); + + let opt = b!(self.alias_hashes, alias_hashes.get(key)); + + Ok(opt) + } + + #[tracing::instrument(skip_all)] + async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + let v = b!(self.hash_aliases, { + Ok(hash_aliases + .scan_prefix(hash) + .values() + .filter_map(Result::ok) + .filter_map(|ivec| Alias::from_slice(&ivec)) + .collect::>()) as Result<_, sled::Error> + }); + + Ok(v) + } + + #[tracing::instrument(skip(self))] + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { + let alias: IVec = alias.to_bytes().into(); + + let aliases = self.aliases.clone(); + let alias_hashes = self.alias_hashes.clone(); + let hash_aliases = self.hash_aliases.clone(); + let alias_delete_tokens = self.alias_delete_tokens.clone(); + + let res = actix_web::web::block(move || { + (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction( + |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| { + aliases.remove(&alias)?; + if let Some(hash) = alias_hashes.remove(&alias)? { + hash_aliases.remove(hash_alias_key(&hash, &alias))?; + } + alias_delete_tokens.remove(&alias)?; + Ok(()) + }, + ) + }) + .await + .map_err(|_| RepoError::Canceled)?; + + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(e)) | Err(TransactionError::Storage(e)) => { + Err(SledError::from(e).into()) + } + } + } +} + +impl std::fmt::Debug for SledRepo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SledRepo").finish() + } +} + +impl From for SledError { + fn from(_: actix_rt::task::JoinError) -> Self { + SledError::Panic + } +} + +#[cfg(test)] +mod tests { + #[test] + fn round_trip() { + let hash = sled::IVec::from(b"some hash value"); + let variant = String::from("some string value"); + + let key = super::variant_access_key(&hash, &variant); + + let (out_hash, out_variant) = + super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes"); + + assert_eq!(out_hash, hash); + assert_eq!(out_variant, variant); + } +} diff --git a/src/store.rs b/src/store.rs index 6316ff8..988535b 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,4 +1,5 @@ use actix_web::web::Bytes; +use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::stream::Stream; use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncWrite}; @@ -17,6 +18,9 @@ pub(crate) enum StoreError { #[error("Error in DB")] Repo(#[from] crate::repo::RepoError), + #[error("Error in 0.4 DB")] + Repo04(#[from] crate::repo_04::RepoError), + #[error("Requested file is not found")] FileNotFound(#[source] std::io::Error), @@ -265,3 +269,20 @@ where T::remove(self, identifier).await } } + +impl Identifier for Vec { + fn from_bytes(bytes: Vec) -> Result + where + Self: Sized, + { + Ok(bytes) + } + + fn to_bytes(&self) -> Result, StoreError> { + Ok(self.clone()) + } + + fn string_repr(&self) -> String { + BASE64_STANDARD.encode(self.as_slice()) + } +}