diff --git a/defaults.toml b/defaults.toml
index 243ae0e..86004da 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -20,8 +20,10 @@ targets = "info"
 
 [metrics]
 
-[old_db]
-path = "/mnt"
+[old_repo]
+path = "/mnt/sled-repo"
+cache_capacity = 67108864
+export_path = "/mnt/exports"
 
 [media]
 max_file_size = 40
diff --git a/pict-rs.toml b/pict-rs.toml
index 17e7862..12b0682 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -128,12 +128,17 @@ targets = 'info'
 prometheus_address = "0.0.0.0:9000"
 
-## Configuration for migrating from pict-rs 0.2
-[old_db]
-## Optional: path to old pict-rs directory
-# environment variable: PICTRS__OLD_DB__PATH
-# default: /mnt
-path = '/mnt'
+## Configuration for migrating from pict-rs 0.4
+[old_repo]
+## Optional: path to sled repository
+# environment variable: PICTRS__OLD_REPO__PATH
+# default: /mnt/sled-repo
+path = '/mnt/sled-repo'
+
+## Optional: in-memory cache capacity for sled data (in bytes)
+# environment variable: PICTRS__OLD_REPO__CACHE_CAPACITY
+# default: 67,108,864 (1024 * 1024 * 64, or 64MB)
+cache_capacity = 67108864
 
 ## Media Processing Configuration
diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index 7a8b9ec..46577f0 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -1,6 +1,6 @@
 use crate::{
     error::Error,
-    repo::{FullRepo, UploadId, UploadRepo},
+    repo::{ArcRepo, UploadId, UploadRepo},
     store::Store,
 };
 use actix_web::web::Bytes;
@@ -8,19 +8,17 @@ use futures_util::{Stream, TryStreamExt};
 use mime::APPLICATION_OCTET_STREAM;
 use tracing::{Instrument, Span};
 
-pub(crate) struct Backgrounded<R, S>
+pub(crate) struct Backgrounded<S>
 where
-    R: FullRepo + 'static,
     S: Store,
 {
-    repo: R,
+    repo: ArcRepo,
     identifier: Option<S::Identifier>,
     upload_id: Option<UploadId>,
 }
 
-impl<R, S> Backgrounded<R, S>
+impl<S> Backgrounded<S>
 where
-    R: FullRepo + 'static,
     S: Store,
 {
     pub(crate) fn disarm(mut self) {
@@ -36,7 +34,7 @@ where
         self.identifier.as_ref()
     }
 
-    pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
+    pub(crate) async fn proxy<P>
(repo: ArcRepo, store: S, stream: P) -> Result where P: Stream> + Unpin + 'static, { @@ -55,7 +53,11 @@ where where P: Stream> + Unpin + 'static, { - UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?; + UploadRepo::create( + self.repo.as_ref(), + self.upload_id.expect("Upload id exists"), + ) + .await?; let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); @@ -68,9 +70,8 @@ where } } -impl Drop for Backgrounded +impl Drop for Backgrounded where - R: FullRepo + 'static, S: Store, { fn drop(&mut self) { diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 9322fcd..7017060 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -13,7 +13,8 @@ impl Args { pub(super) fn into_output(self) -> Output { let Args { config_file, - old_db_path, + old_repo_path, + old_repo_cache_capacity, log_format, log_targets, console_address, @@ -25,7 +26,11 @@ impl Args { command, } = self; - let old_db = OldDb { path: old_db_path }; + let old_repo = OldSled { + path: old_repo_path, + cache_capacity: old_repo_cache_capacity, + } + .set(); let tracing = Tracing { logging: Logging { @@ -193,7 +198,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -211,7 +216,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -227,7 +232,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -255,7 +260,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -275,7 +280,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -299,7 +304,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -322,7 +327,7 @@ impl Args { config_format: ConfigFormat { server, client, - old_db, + old_repo, tracing, metrics, media, @@ -368,7 +373,8 @@ pub(crate) enum Operation { pub(super) struct ConfigFormat { server: Server, client: Client, - old_db: OldDb, + #[serde(skip_serializing_if = "Option::is_none")] + old_repo: Option, tracing: Tracing, metrics: Metrics, media: Media, @@ -444,13 +450,6 @@ struct Metrics { prometheus_address: Option, } -#[derive(Debug, Default, serde::Serialize)] -#[serde(rename_all = "snake_case")] -struct OldDb { - #[serde(skip_serializing_if = "Option::is_none")] - path: Option, -} - #[derive(Debug, Default, serde::Serialize)] #[serde(rename_all = "snake_case")] struct Media { @@ -706,7 +705,11 @@ pub(super) struct Args { /// Path to the old pict-rs sled database #[arg(long)] - old_db_path: Option, + old_repo_path: Option, + + /// The cache capacity, in bytes, allowed to sled for in-memory operations + #[arg(long)] + old_repo_cache_capacity: Option, /// Format of logs printed to stdout #[arg(long)] @@ -1178,3 +1181,29 @@ struct Sled { #[serde(skip_serializing_if = "Option::is_none")] export_path: Option, } + +#[derive(Debug, Parser, serde::Serialize)] +#[serde(rename_all = "snake_case")] +struct OldSled { + /// The path to store the sled database + #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] + path: Option, + + /// The cache capacity, in bytes, allowed to sled for in-memory operations + #[arg(short, long)] + #[serde(skip_serializing_if = "Option::is_none")] + cache_capacity: Option, +} + +impl OldSled { + fn set(self) -> Option { + let any_set = 
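// Collapsing an untouched OldSled to None via `set` keeps the serialized
// overlay free of an `[old_repo]` table when neither flag was passed, so
// values from the config file or the defaults above are not clobbered by
// empty CLI state. A hedged usage sketch -- the kebab-case flags assume
// clap's default renaming for `old_repo_path` / `old_repo_cache_capacity`,
// and the env var names are the ones documented in pict-rs.toml above:
//
//     pict-rs --old-repo-path /mnt/sled-repo --old-repo-cache-capacity 67108864 run
//     PICTRS__OLD_REPO__PATH=/mnt/sled-repo pict-rs run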
self.path.is_some() || self.cache_capacity.is_some(); + + if any_set { + Some(self) + } else { + None + } + } +} diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 187158c..a2dd2b7 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -11,7 +11,7 @@ pub(crate) struct Defaults { server: ServerDefaults, client: ClientDefaults, tracing: TracingDefaults, - old_db: OldDbDefaults, + old_repo: SledDefaults, media: MediaDefaults, repo: RepoDefaults, store: StoreDefaults, @@ -62,12 +62,6 @@ struct OpenTelemetryDefaults { targets: Serde, } -#[derive(Clone, Debug, serde::Serialize)] -#[serde(rename_all = "snake_case")] -struct OldDbDefaults { - path: PathBuf, -} - #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "snake_case")] struct MediaDefaults { @@ -225,14 +219,6 @@ impl Default for OpenTelemetryDefaults { } } -impl Default for OldDbDefaults { - fn default() -> Self { - OldDbDefaults { - path: PathBuf::from(String::from("/mnt")), - } - } -} - impl Default for MediaDefaults { fn default() -> Self { MediaDefaults { diff --git a/src/config/file.rs b/src/config/file.rs index 7716618..f1e66a5 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -18,7 +18,7 @@ pub(crate) struct ConfigFile { #[serde(default)] pub(crate) metrics: Metrics, - pub(crate) old_db: OldDb, + pub(crate) old_repo: Sled, pub(crate) media: Media, diff --git a/src/error.rs b/src/error.rs index 249c7b6..991eb7b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -52,8 +52,8 @@ pub(crate) enum UploadError { #[error("Error in DB")] Repo(#[from] crate::repo::RepoError), - #[error("Error in old sled DB")] - OldSled(#[from] ::sled::Error), + #[error("Error in old repo")] + OldRepo(#[from] crate::repo_04::RepoError), #[error("Error parsing string")] ParseString(#[from] std::string::FromUtf8Error), diff --git a/src/generate.rs b/src/generate.rs index be12297..f5a73b7 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -4,8 +4,8 @@ use crate::{ error::{Error, UploadError}, ffmpeg::ThumbnailFormat, formats::{InputProcessableFormat, InternalVideoFormat}, - repo::{Alias, FullRepo, Hash}, - store::Store, + repo::{Alias, ArcRepo, Hash}, + store::{Identifier, Store}, }; use actix_web::web::Bytes; use std::{path::PathBuf, time::Instant}; @@ -40,8 +40,8 @@ impl Drop for MetricsGuard { #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(repo, store, hash, process_map, media))] -pub(crate) async fn generate( - repo: &R, +pub(crate) async fn generate( + repo: &ArcRepo, store: &S, process_map: &ProcessMap, format: InputProcessableFormat, @@ -75,8 +75,8 @@ pub(crate) async fn generate( #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(repo, store, hash, media))] -async fn process( - repo: &R, +async fn process( + repo: &ArcRepo, store: &S, output_format: InputProcessableFormat, alias: Alias, @@ -90,11 +90,8 @@ async fn process( let guard = MetricsGuard::guard(); let permit = crate::PROCESS_SEMAPHORE.acquire().await; - let identifier = if let Some(identifier) = repo - .still_identifier_from_alias::(&alias) - .await? - { - identifier + let identifier = if let Some(identifier) = repo.still_identifier_from_alias(&alias).await? { + S::Identifier::from_arc(identifier)? } else { let Some(identifier) = repo.identifier(hash.clone()).await? 
else { return Err(UploadError::MissingIdentifier.into()); @@ -104,7 +101,7 @@ async fn process( let reader = crate::ffmpeg::thumbnail( store.clone(), - identifier, + S::Identifier::from_arc(identifier)?, input_format.unwrap_or(InternalVideoFormat::Mp4), thumbnail_format, media.process_timeout, diff --git a/src/ingest.rs b/src/ingest.rs index 714af5e..7fea8d1 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -3,7 +3,7 @@ use crate::{ either::Either, error::{Error, UploadError}, formats::{InternalFormat, Validations}, - repo::{Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo}, + repo::{Alias, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo}, store::Store, }; use actix_web::web::Bytes; @@ -14,12 +14,11 @@ mod hasher; use hasher::Hasher; #[derive(Debug)] -pub(crate) struct Session +pub(crate) struct Session where - R: FullRepo + 'static, S: Store, { - repo: R, + repo: ArcRepo, delete_token: DeleteToken, hash: Option, alias: Option, @@ -41,15 +40,14 @@ where } #[tracing::instrument(skip(repo, store, stream, media))] -pub(crate) async fn ingest( - repo: &R, +pub(crate) async fn ingest( + repo: &ArcRepo, store: &S, stream: impl Stream> + Unpin + 'static, declared_alias: Option, media: &crate::config::Media, -) -> Result, Error> +) -> Result, Error> where - R: FullRepo + 'static, S: Store, { let permit = crate::PROCESS_SEMAPHORE.acquire().await; @@ -129,18 +127,17 @@ where } #[tracing::instrument(level = "trace", skip_all)] -async fn save_upload( - session: &mut Session, - repo: &R, +async fn save_upload( + session: &mut Session, + repo: &ArcRepo, store: &S, hash: Hash, identifier: &S::Identifier, ) -> Result<(), Error> where S: Store, - R: FullRepo, { - if HashRepo::create(repo, hash.clone(), identifier) + if HashRepo::create(repo.as_ref(), hash.clone(), identifier) .await? .is_err() { @@ -156,9 +153,8 @@ where Ok(()) } -impl Session +impl Session where - R: FullRepo + 'static, S: Store, { pub(crate) fn disarm(mut self) -> DeleteToken { @@ -179,7 +175,7 @@ where #[tracing::instrument(skip(self, hash))] async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> { - AliasRepo::create(&self.repo, &alias, &self.delete_token, hash) + AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash) .await? .map_err(|_| UploadError::DuplicateAlias)?; @@ -193,7 +189,7 @@ where loop { let alias = Alias::generate(input_type.file_extension().to_string()); - if AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone()) + if AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash.clone()) .await? 
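// Session and Backgrounded keep the same RAII shape they had before this
// change: if the value is dropped without `disarm`, Drop queues cleanup for
// whatever was already written. A hedged sketch of the idiom -- the exact
// Drop body is illustrative, but `cleanup_hash` and `actix_rt::spawn` are
// the entry points shown elsewhere in this patch:
//
//     impl<S: Store> Drop for Session<S> {
//         fn drop(&mut self) {
//             if let Some(hash) = self.hash.take() {
//                 let repo = self.repo.clone();
//                 actix_rt::spawn(async move {
//                     let _ = crate::queue::cleanup_hash(&repo, hash).await;
//                 });
//             }
//         }
//     }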
.is_ok() { @@ -207,9 +203,8 @@ where } } -impl Drop for Session +impl Drop for Session where - R: FullRepo + 'static, S: Store, { fn drop(&mut self) { diff --git a/src/lib.rs b/src/lib.rs index d1adaab..f0b1dca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,7 @@ use futures_util::{ use metrics_exporter_prometheus::PrometheusBuilder; use middleware::Metrics; use once_cell::sync::Lazy; +use repo::ArcRepo; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rusty_s3::UrlStyle; @@ -48,6 +49,7 @@ use std::{ future::ready, path::Path, path::PathBuf, + sync::Arc, time::{Duration, SystemTime}, }; use tokio::sync::Semaphore; @@ -68,15 +70,11 @@ use self::{ migrate_store::migrate_store, queue::queue_generate, repo::{ - sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, Hash, HashRepo, - IdentifierRepo, Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo, + sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, Hash, Repo, UploadId, UploadResult, + VariantAccessRepo, }, serde_str::Serde, - store::{ - file_store::FileStore, - object_store::{ObjectStore, ObjectStoreConfig}, - Identifier, Store, - }, + store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, stream::{StreamLimit, StreamTimeout}, }; @@ -94,13 +92,13 @@ static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| { .in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) }); -async fn ensure_details( - repo: &R, +async fn ensure_details( + repo: &ArcRepo, store: &S, config: &Configuration, alias: &Alias, ) -> Result { - let Some(identifier) = repo.identifier_from_alias::(alias).await? else { + let Some(identifier) = repo.identifier_from_alias(alias).await?.map(S::Identifier::from_arc).transpose()? 
else { return Err(UploadError::MissingAlias.into()); }; @@ -130,10 +128,10 @@ async fn ensure_details( } } -struct Upload(Value>); +struct Upload(Value>); -impl FormData for Upload { - type Item = Session; +impl FormData for Upload { + type Item = Session; type Error = Error; fn form(req: &HttpRequest) -> Form { @@ -141,7 +139,7 @@ impl FormData for Upload { // // This form is expecting a single array field, 'images' with at most 10 files in it let repo = req - .app_data::>() + .app_data::>() .expect("No repo in request") .clone(); let store = req @@ -176,7 +174,7 @@ impl FormData for Upload { return Err(UploadError::ReadOnly.into()); } - ingest::ingest(&**repo, &**store, stream, None, &config.media).await + ingest::ingest(&repo, &**store, stream, None, &config.media).await } .instrument(span), ) @@ -184,20 +182,20 @@ impl FormData for Upload { ) } - fn extract(value: Value>) -> Result { + fn extract(value: Value) -> Result { Ok(Upload(value)) } } -struct Import(Value>); +struct Import(Value>); -impl FormData for Import { - type Item = Session; +impl FormData for Import { + type Item = Session; type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { let repo = req - .app_data::>() + .app_data::>() .expect("No repo in request") .clone(); let store = req @@ -236,7 +234,7 @@ impl FormData for Import { } ingest::ingest( - &**repo, + &repo, &**store, stream, Some(Alias::from_existing(&filename)), @@ -260,9 +258,9 @@ impl FormData for Import { /// Handle responding to successful uploads #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))] -async fn upload( - Multipart(Upload(value)): Multipart>, - repo: web::Data, +async fn upload( + Multipart(Upload(value)): Multipart>, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -271,9 +269,9 @@ async fn upload( /// Handle responding to successful uploads #[tracing::instrument(name = "Imported files", skip(value, repo, store, config))] -async fn import( - Multipart(Import(value)): Multipart>, - repo: web::Data, +async fn import( + Multipart(Import(value)): Multipart>, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -282,9 +280,9 @@ async fn import( /// Handle responding to successful uploads #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))] -async fn handle_upload( - value: Value>, - repo: web::Data, +async fn handle_upload( + value: Value>, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -325,10 +323,10 @@ async fn handle_upload( }))) } -struct BackgroundedUpload(Value>); +struct BackgroundedUpload(Value>); -impl FormData for BackgroundedUpload { - type Item = Backgrounded; +impl FormData for BackgroundedUpload { + type Item = Backgrounded; type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { @@ -336,7 +334,7 @@ impl FormData for BackgroundedUpload { // // This form is expecting a single array field, 'images' with at most 10 files in it let repo = req - .app_data::>() + .app_data::>() .expect("No repo in request") .clone(); let store = req @@ -389,9 +387,9 @@ impl FormData for BackgroundedUpload { } #[tracing::instrument(name = "Uploaded files", skip(value, repo))] -async fn upload_backgrounded( - Multipart(BackgroundedUpload(value)): Multipart>, - repo: web::Data, +async fn upload_backgrounded( + Multipart(BackgroundedUpload(value)): Multipart>, + repo: web::Data, ) -> Result { let images = value .map() @@ -437,8 +435,8 @@ struct ClaimQuery { /// Claim a backgrounded upload 
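// A pattern worth calling out, since it recurs at nearly every identifier
// lookup above: the erased ArcRepo hands identifiers back as Arc<[u8]>, and
// each call site rebuilds the store's concrete identifier type. A minimal
// sketch of the assumed conversion -- `from_arc` names the helper used in
// this patch, but this body is illustrative, not the actual impl:
//
//     fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
//     where
//         Self: Sized,
//     {
//         Self::from_bytes(arc.to_vec())
//     }
//
// On an Option, `.map(S::Identifier::from_arc).transpose()?` then turns
// Option<Arc<[u8]>> into Option<S::Identifier>, surfacing conversion errors
// through `?` instead of panicking.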
#[tracing::instrument(name = "Waiting on upload", skip_all)] -async fn claim_upload( - repo: web::Data, +async fn claim_upload( + repo: web::Data, store: web::Data, config: web::Data, query: web::Query, @@ -483,9 +481,9 @@ struct UrlQuery { backgrounded: bool, } -async fn ingest_inline( +async fn ingest_inline( stream: impl Stream> + Unpin + 'static, - repo: &R, + repo: &ArcRepo, store: &S, config: &Configuration, ) -> Result<(Alias, DeleteToken, Details), Error> { @@ -502,9 +500,9 @@ async fn ingest_inline( /// download an image from a URL #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))] -async fn download( +async fn download( client: web::Data, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, query: web::Query, @@ -542,9 +540,9 @@ async fn download_stream( } #[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))] -async fn do_download_inline( +async fn do_download_inline( stream: impl Stream> + Unpin + 'static, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -563,9 +561,9 @@ async fn do_download_inline( } #[tracing::instrument(name = "Downloading file in background", skip(stream, repo, store))] -async fn do_download_backgrounded( +async fn do_download_backgrounded( stream: impl Stream> + Unpin + 'static, - repo: web::Data, + repo: web::Data, store: web::Data, ) -> Result { metrics::increment_counter!("pict-rs.files", "download" => "background"); @@ -592,8 +590,8 @@ async fn do_download_backgrounded( /// Delete aliases and files #[tracing::instrument(name = "Deleting file", skip(repo, config))] -async fn delete( - repo: web::Data, +async fn delete( + repo: web::Data, config: web::Data, path_entries: web::Path<(String, String)>, ) -> Result { @@ -649,10 +647,10 @@ fn prepare_process( } #[tracing::instrument(name = "Fetching derived details", skip(repo, config))] -async fn process_details( +async fn process_details( web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, + repo: web::Data, config: web::Data, ) -> Result { let alias = match source { @@ -681,12 +679,15 @@ async fn process_details( let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); if !config.server.read_only { - VariantAccessRepo::accessed(&repo, hash.clone(), thumbnail_string.clone()).await?; + VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), thumbnail_string.clone()) + .await?; } let identifier = repo - .variant_identifier::(hash, thumbnail_string) + .variant_identifier(hash, thumbnail_string) .await? + .map(S::Identifier::from_arc) + .transpose()? .ok_or(UploadError::MissingAlias)?; let details = repo.details(&identifier).await?; @@ -696,7 +697,7 @@ async fn process_details( Ok(HttpResponse::Ok().json(&details)) } -async fn not_found_hash(repo: &R) -> Result, Error> { +async fn not_found_hash(repo: &ArcRepo) -> Result, Error> { let Some(not_found) = repo.get(NOT_FOUND_KEY).await? 
else { return Ok(None); }; @@ -720,11 +721,11 @@ async fn not_found_hash(repo: &R) -> Result, name = "Serving processed image", skip(repo, store, client, config, process_map) )] -async fn process( +async fn process( range: Option>, web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, + repo: web::Data, store: web::Data, client: web::Data, config: web::Data, @@ -750,7 +751,7 @@ async fn process( }; if !config.server.read_only { - AliasAccessRepo::accessed(&repo, alias.clone()).await?; + AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; } alias @@ -773,12 +774,14 @@ async fn process( }; if !config.server.read_only { - VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?; + VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; } let identifier_opt = repo - .variant_identifier::(hash.clone(), path_string) - .await?; + .variant_identifier(hash.clone(), path_string) + .await? + .map(S::Identifier::from_arc) + .transpose()?; if let Some(identifier) = identifier_opt { let details = repo.details(&identifier).await?.and_then(|details| { @@ -874,11 +877,11 @@ async fn process( } #[tracing::instrument(name = "Serving processed image headers", skip(repo, store, config))] -async fn process_head( +async fn process_head( range: Option>, web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -903,12 +906,14 @@ async fn process_head( }; if !config.server.read_only { - VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?; + VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?; } let identifier_opt = repo - .variant_identifier::(hash.clone(), path_string) - .await?; + .variant_identifier(hash.clone(), path_string) + .await? + .map(S::Identifier::from_arc) + .transpose()?; if let Some(identifier) = identifier_opt { let details = repo.details(&identifier).await?.and_then(|details| { @@ -950,10 +955,10 @@ async fn process_head( /// Process files #[tracing::instrument(name = "Spawning image process", skip(repo))] -async fn process_backgrounded( +async fn process_backgrounded( web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, + repo: web::Data, config: web::Data, ) -> Result { let source = match source { @@ -978,8 +983,10 @@ async fn process_backgrounded( }; let identifier_opt = repo - .variant_identifier::(hash.clone(), path_string) - .await?; + .variant_identifier(hash.clone(), path_string) + .await? 
+ .map(S::Identifier::from_arc) + .transpose()?; if identifier_opt.is_some() { return Ok(HttpResponse::Accepted().finish()); @@ -996,9 +1003,9 @@ async fn process_backgrounded( /// Fetch file details #[tracing::instrument(name = "Fetching query details", skip(repo, store, config))] -async fn details_query( +async fn details_query( web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1019,18 +1026,18 @@ async fn details_query( /// Fetch file details #[tracing::instrument(name = "Fetching details", skip(repo, store, config))] -async fn details( +async fn details( alias: web::Path>, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { do_details(Serde::into_inner(alias.into_inner()), repo, store, config).await } -async fn do_details( +async fn do_details( alias: Alias, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1041,10 +1048,10 @@ async fn do_details( /// Serve files based on alias query #[tracing::instrument(name = "Serving file query", skip(repo, store, client, config))] -async fn serve_query( +async fn serve_query( range: Option>, web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, store: web::Data, client: web::Data, config: web::Data, @@ -1067,7 +1074,7 @@ async fn serve_query( }; if !config.server.read_only { - AliasAccessRepo::accessed(&repo, alias.clone()).await?; + AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?; } alias @@ -1079,10 +1086,10 @@ async fn serve_query( /// Serve files #[tracing::instrument(name = "Serving file", skip(repo, store, config))] -async fn serve( +async fn serve( range: Option>, alias: web::Path>, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1096,10 +1103,10 @@ async fn serve( .await } -async fn do_serve( +async fn do_serve( range: Option>, alias: Alias, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1113,7 +1120,7 @@ async fn do_serve( (hash, alias, true) }; - let Some(identifier) = repo.identifier(hash.clone()).await? else { + let Some(identifier) = repo.identifier(hash.clone()).await?.map(Identifier::from_arc).transpose()? else { tracing::warn!( "Original File identifier for hash {hash:?} is missing, queue cleanup task", ); @@ -1133,10 +1140,10 @@ async fn do_serve( } #[tracing::instrument(name = "Serving query file headers", skip(repo, store, config))] -async fn serve_query_head( +async fn serve_query_head( range: Option>, web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1154,10 +1161,10 @@ async fn serve_query_head( } #[tracing::instrument(name = "Serving file headers", skip(repo, store, config))] -async fn serve_head( +async fn serve_head( range: Option>, alias: web::Path>, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { @@ -1171,14 +1178,14 @@ async fn serve_head( .await } -async fn do_serve_head( +async fn do_serve_head( range: Option>, alias: Alias, - repo: web::Data, + repo: web::Data, store: web::Data, config: web::Data, ) -> Result { - let Some(identifier) = repo.identifier_from_alias::(&alias).await? else { + let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? 
else { // Invalid alias return Ok(HttpResponse::NotFound().finish()); }; @@ -1327,8 +1334,8 @@ fn srv_head( } #[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))] -async fn clean_variants( - repo: web::Data, +async fn clean_variants( + repo: web::Data, config: web::Data, ) -> Result { if config.server.read_only { @@ -1347,9 +1354,9 @@ enum AliasQuery { } #[tracing::instrument(name = "Setting 404 Image", skip(repo, config))] -async fn set_not_found( +async fn set_not_found( json: web::Json, - repo: web::Data, + repo: web::Data, client: web::Data, config: web::Data, ) -> Result { @@ -1380,9 +1387,9 @@ async fn set_not_found( } #[tracing::instrument(name = "Purging file", skip(repo, config))] -async fn purge( +async fn purge( web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, config: web::Data, ) -> Result { if config.server.read_only { @@ -1415,9 +1422,9 @@ async fn purge( } #[tracing::instrument(name = "Fetching aliases", skip(repo))] -async fn aliases( +async fn aliases( web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), @@ -1438,9 +1445,9 @@ async fn aliases( } #[tracing::instrument(name = "Fetching identifier", skip(repo))] -async fn identifier( +async fn identifier( web::Query(alias_query): web::Query, - repo: web::Data, + repo: web::Data, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), @@ -1452,7 +1459,7 @@ async fn identifier( } }; - let Some(identifier) = repo.identifier_from_alias::(&alias).await? else { + let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? else { // Invalid alias return Ok(HttpResponse::NotFound().json(serde_json::json!({ "msg": "No identifiers associated with provided alias" @@ -1465,8 +1472,8 @@ async fn identifier( }))) } -async fn healthz( - repo: web::Data, +async fn healthz( + repo: web::Data, store: web::Data, ) -> Result { repo.health_check().await?; @@ -1493,13 +1500,9 @@ fn build_client(config: &Configuration) -> Result { .build()) } -fn configure_endpoints< - R: FullRepo + 'static, - S: Store + 'static, - F: Fn(&mut web::ServiceConfig), ->( +fn configure_endpoints( config: &mut web::ServiceConfig, - repo: R, + repo: ArcRepo, store: S, configuration: Configuration, client: ClientWithMiddleware, @@ -1510,68 +1513,63 @@ fn configure_endpoints< .app_data(web::Data::new(store)) .app_data(web::Data::new(client)) .app_data(web::Data::new(configuration.clone())) - .route("/healthz", web::get().to(healthz::)) + .route("/healthz", web::get().to(healthz::)) .service( web::scope("/image") .service( web::resource("") .guard(guard::Post()) - .route(web::post().to(upload::)), + .route(web::post().to(upload::)), ) .service( web::scope("/backgrounded") .service( web::resource("") .guard(guard::Post()) - .route(web::post().to(upload_backgrounded::)), + .route(web::post().to(upload_backgrounded::)), ) - .service( - web::resource("/claim").route(web::get().to(claim_upload::)), - ), + .service(web::resource("/claim").route(web::get().to(claim_upload::))), ) - .service(web::resource("/download").route(web::get().to(download::))) + .service(web::resource("/download").route(web::get().to(download::))) .service( web::resource("/delete/{delete_token}/{filename}") - .route(web::delete().to(delete::)) - .route(web::get().to(delete::)), + .route(web::delete().to(delete)) + 
.route(web::get().to(delete)), ) .service( web::scope("/original") .service( web::resource("") - .route(web::get().to(serve_query::)) - .route(web::head().to(serve_query_head::)), + .route(web::get().to(serve_query::)) + .route(web::head().to(serve_query_head::)), ) .service( web::resource("/{filename}") - .route(web::get().to(serve::)) - .route(web::head().to(serve_head::)), + .route(web::get().to(serve::)) + .route(web::head().to(serve_head::)), ), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process::)) - .route(web::head().to(process_head::)), + .route(web::get().to(process::)) + .route(web::head().to(process_head::)), ) .service( web::resource("/process_backgrounded.{ext}") - .route(web::get().to(process_backgrounded::)), + .route(web::get().to(process_backgrounded::)), ) .service( web::scope("/details") .service( web::scope("/original") + .service(web::resource("").route(web::get().to(details_query::))) .service( - web::resource("").route(web::get().to(details_query::)), - ) - .service( - web::resource("/{filename}") - .route(web::get().to(details::)), + web::resource("/{filename}").route(web::get().to(details::)), ), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process_details::)), + .route(web::get().to(process_details::)), ), ), ) @@ -1580,20 +1578,17 @@ fn configure_endpoints< .wrap(Internal( configuration.server.api_key.as_ref().map(|s| s.to_owned()), )) - .service(web::resource("/import").route(web::post().to(import::))) - .service(web::resource("/variants").route(web::delete().to(clean_variants::))) - .service(web::resource("/purge").route(web::post().to(purge::))) - .service(web::resource("/aliases").route(web::get().to(aliases::))) - .service(web::resource("/identifier").route(web::get().to(identifier::))) - .service(web::resource("/set_not_found").route(web::post().to(set_not_found::))) + .service(web::resource("/import").route(web::post().to(import::))) + .service(web::resource("/variants").route(web::delete().to(clean_variants))) + .service(web::resource("/purge").route(web::post().to(purge))) + .service(web::resource("/aliases").route(web::get().to(aliases))) + .service(web::resource("/identifier").route(web::get().to(identifier::))) + .service(web::resource("/set_not_found").route(web::post().to(set_not_found))) .configure(extra_config), ); } -fn spawn_cleanup(repo: R, config: &Configuration) -where - R: FullRepo + 'static, -{ +fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { if config.server.read_only { return; } @@ -1623,9 +1618,8 @@ where }) } -fn spawn_workers(repo: R, store: S, config: Configuration, process_map: ProcessMap) +fn spawn_workers(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap) where - R: FullRepo + 'static, S: Store + 'static, { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1639,8 +1633,8 @@ where .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, process_map, config))); } -async fn launch_file_store( - repo: R, +async fn launch_file_store( + repo: ArcRepo, store: FileStore, client: ClientWithMiddleware, config: Configuration, @@ -1678,12 +1672,9 @@ async fn launch_file_store( - repo: R, - store_config: ObjectStoreConfig, +async fn launch_object_store( + repo: ArcRepo, + store: ObjectStore, client: ClientWithMiddleware, config: Configuration, extra_config: F, @@ -1696,7 +1687,7 @@ async fn launch_object_store< HttpServer::new(move || { let client = client.clone(); - let store = store_config.clone().build(client.clone()); + let store = 
store.clone(); let repo = repo.clone(); let config = config.clone(); let extra_config = extra_config.clone(); @@ -1737,7 +1728,7 @@ where match repo { Repo::Sled(repo) => { - migrate_store(repo, from, to, skip_missing_files, timeout).await? + migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await? } } } @@ -1775,7 +1766,7 @@ where match repo { Repo::Sled(repo) => { - migrate_store(repo, from, to, skip_missing_files, timeout).await? + migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await? } } } @@ -1833,8 +1824,9 @@ async fn export_handler(repo: web::Data) -> Result { let store = FileStore::build(path, repo.clone()).await?; + + let arc_repo = repo.to_arc(); + + if arc_repo.get("migrate-0.4").await?.is_none() { + if let Some(old_repo) = repo_04::open(&config.old_repo)? { + repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; + arc_repo + .set("migrate-0.4", Arc::from(b"migrated".to_vec())) + .await?; + } + } + match repo { Repo::Sled(sled_repo) => { - sled_repo - .mark_accessed::<::Identifier>() - .await?; - - launch_file_store(sled_repo, store, client, config, sled_extra_config) - .await?; + launch_file_store( + Arc::new(sled_repo.clone()), + store, + client, + config, + move |sc| sled_extra_config(sc, sled_repo.clone()), + ) + .await?; } } } @@ -1991,16 +1998,26 @@ impl PictRsConfiguration { public_endpoint, repo.clone(), ) - .await?; + .await? + .build(client.clone()); + + let arc_repo = repo.to_arc(); + + if arc_repo.get("migrate-0.4").await?.is_none() { + if let Some(old_repo) = repo_04::open(&config.old_repo)? { + repo::migrate_04(old_repo, &arc_repo, &store, &config).await?; + arc_repo + .set("migrate-0.4", Arc::from(b"migrated".to_vec())) + .await?; + } + } match repo { Repo::Sled(sled_repo) => { - sled_repo - .mark_accessed::<::Identifier>() - .await?; - - launch_object_store(sled_repo, store, client, config, sled_extra_config) - .await?; + launch_object_store(arc_repo, store, client, config, move |sc| { + sled_extra_config(sc, sled_repo.clone()) + }) + .await?; } } } diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 27846f1..1611320 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -8,12 +8,12 @@ use std::{ use crate::{ details::Details, error::{Error, UploadError}, - repo::{Hash, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo}, + repo::{ArcRepo, Hash, IdentifierRepo}, store::{Identifier, Store}, }; -pub(super) async fn migrate_store( - repo: R, +pub(super) async fn migrate_store( + repo: ArcRepo, from: S1, to: S2, skip_missing_files: bool, @@ -22,7 +22,6 @@ pub(super) async fn migrate_store( where S1: Store + Clone + 'static, S2: Store + Clone + 'static, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, { tracing::warn!("Running checks"); @@ -65,8 +64,8 @@ where Ok(()) } -struct MigrateState { - repo: R, +struct MigrateState { + repo: ArcRepo, from: S1, to: S2, continuing_migration: bool, @@ -79,8 +78,8 @@ struct MigrateState { timeout: u64, } -async fn do_migrate_store( - repo: R, +async fn do_migrate_store( + repo: ArcRepo, from: S1, to: S2, skip_missing_files: bool, @@ -89,7 +88,6 @@ async fn do_migrate_store( where S1: Store + 'static, S2: Store + 'static, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, { let continuing_migration = repo.is_continuing_migration().await?; let initial_repo_size = repo.size().await?; @@ -150,11 +148,10 @@ where } #[tracing::instrument(skip(state))] -async fn migrate_hash(state: &MigrateState, hash: Hash) -> 
Result<(), Error> +async fn migrate_hash(state: &MigrateState, hash: Hash) -> Result<(), Error> where S1: Store, S2: Store, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo, { let MigrateState { repo, @@ -173,7 +170,7 @@ where let current_index = index.fetch_add(1, Ordering::Relaxed); let original_identifier = match repo.identifier(hash.clone()).await { - Ok(Some(identifier)) => identifier, + Ok(Some(identifier)) => S1::Identifier::from_arc(identifier)?, Ok(None) => { tracing::warn!( "Original File identifier for hash {hash:?} is missing, queue cleanup task", @@ -218,6 +215,8 @@ where } if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { + let identifier = S1::Identifier::from_arc(identifier)?; + if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { @@ -247,6 +246,8 @@ where } for (variant, identifier) in repo.variants(hash.clone()).await? { + let identifier = S1::Identifier::from_arc(identifier)?; + if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { @@ -334,8 +335,8 @@ where Ok(()) } -async fn migrate_file( - repo: &R, +async fn migrate_file( + repo: &ArcRepo, from: &S1, to: &S2, identifier: &S1::Identifier, @@ -343,7 +344,6 @@ async fn migrate_file( timeout: u64, ) -> Result where - R: IdentifierRepo, S1: Store, S2: Store, { @@ -378,15 +378,14 @@ enum MigrateError { To(crate::store::StoreError), } -async fn do_migrate_file( - repo: &R, +async fn do_migrate_file( + repo: &ArcRepo, from: &S1, to: &S2, identifier: &S1::Identifier, timeout: u64, ) -> Result where - R: IdentifierRepo, S1: Store, S2: Store, { @@ -429,15 +428,14 @@ where Ok(new_identifier) } -async fn migrate_details(repo: &R, from: &I1, to: &I2) -> Result<(), Error> +async fn migrate_details(repo: &ArcRepo, from: &I1, to: &I2) -> Result<(), Error> where - R: IdentifierRepo, I1: Identifier, I2: Identifier, { if let Some(details) = repo.details(from).await? 
{ repo.relate_details(to, &details).await?; - repo.cleanup(from).await?; + IdentifierRepo::cleanup(repo.as_ref(), from).await?; } Ok(()) diff --git a/src/queue.rs b/src/queue.rs index 8ec3c72..f6f8037 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -3,10 +3,7 @@ use crate::{ config::Configuration, error::Error, formats::InputProcessableFormat, - repo::{ - Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, JobId, QueueRepo, - UploadId, - }, + repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId}, serde_str::Serde, store::{Identifier, Store}, }; @@ -15,6 +12,7 @@ use std::{ future::Future, path::PathBuf, pin::Pin, + sync::Arc, time::{Duration, Instant}, }; use tracing::Instrument; @@ -88,8 +86,8 @@ enum Process { }, } -pub(crate) async fn cleanup_alias( - repo: &R, +pub(crate) async fn cleanup_alias( + repo: &Arc, alias: Alias, token: DeleteToken, ) -> Result<(), Error> { @@ -101,14 +99,14 @@ pub(crate) async fn cleanup_alias( Ok(()) } -pub(crate) async fn cleanup_hash(repo: &R, hash: Hash) -> Result<(), Error> { +pub(crate) async fn cleanup_hash(repo: &Arc, hash: Hash) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Hash { hash })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } -pub(crate) async fn cleanup_identifier( - repo: &R, +pub(crate) async fn cleanup_identifier( + repo: &Arc, identifier: I, ) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Identifier { @@ -118,8 +116,8 @@ pub(crate) async fn cleanup_identifier( Ok(()) } -async fn cleanup_variants( - repo: &R, +async fn cleanup_variants( + repo: &Arc, hash: Hash, variant: Option, ) -> Result<(), Error> { @@ -128,26 +126,26 @@ async fn cleanup_variants( Ok(()) } -pub(crate) async fn cleanup_outdated_proxies(repo: &R) -> Result<(), Error> { +pub(crate) async fn cleanup_outdated_proxies(repo: &Arc) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::OutdatedProxies)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } -pub(crate) async fn cleanup_outdated_variants(repo: &R) -> Result<(), Error> { +pub(crate) async fn cleanup_outdated_variants(repo: &Arc) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } -pub(crate) async fn cleanup_all_variants(repo: &R) -> Result<(), Error> { +pub(crate) async fn cleanup_all_variants(repo: &Arc) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::AllVariants)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } -pub(crate) async fn queue_ingest( - repo: &R, +pub(crate) async fn queue_ingest( + repo: &Arc, identifier: Vec, upload_id: UploadId, declared_alias: Option, @@ -161,8 +159,8 @@ pub(crate) async fn queue_ingest( Ok(()) } -pub(crate) async fn queue_generate( - repo: &R, +pub(crate) async fn queue_generate( + repo: &Arc, target_format: InputProcessableFormat, source: Alias, process_path: PathBuf, @@ -178,16 +176,16 @@ pub(crate) async fn queue_generate( Ok(()) } -pub(crate) async fn process_cleanup( - repo: R, +pub(crate) async fn process_cleanup( + repo: Arc, store: S, config: Configuration, ) { process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await } -pub(crate) async fn process_images( - repo: R, +pub(crate) async fn process_images( + repo: Arc, store: S, process_map: ProcessMap, config: Configuration, @@ -205,16 +203,20 @@ pub(crate) async fn process_images( type LocalBoxFuture<'a, T> = Pin + 'a>>; -async fn process_jobs( - repo: &R, +async fn process_jobs( + repo: &Arc, store: &S, config: 
&Configuration, queue: &'static str, callback: F, ) where - R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, S: Store, - for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + for<'a> F: Fn( + &'a Arc, + &'a S, + &'a Configuration, + &'a [u8], + ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { let worker_id = uuid::Uuid::new_v4(); @@ -263,8 +265,8 @@ impl Drop for MetricsGuard { } } -async fn job_loop( - repo: &R, +async fn job_loop( + repo: &Arc, store: &S, config: &Configuration, worker_id: uuid::Uuid, @@ -272,9 +274,13 @@ async fn job_loop( callback: F, ) -> Result<(), Error> where - R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, S: Store, - for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + for<'a> F: Fn( + &'a Arc, + &'a S, + &'a Configuration, + &'a [u8], + ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { loop { @@ -312,18 +318,17 @@ where } } -async fn process_image_jobs( - repo: &R, +async fn process_image_jobs( + repo: &Arc, store: &S, process_map: &ProcessMap, config: &Configuration, queue: &'static str, callback: F, ) where - R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, S: Store, for<'a> F: Fn( - &'a R, + &'a Arc, &'a S, &'a ProcessMap, &'a Configuration, @@ -347,8 +352,8 @@ async fn process_image_jobs( } } -async fn image_job_loop( - repo: &R, +async fn image_job_loop( + repo: &Arc, store: &S, process_map: &ProcessMap, config: &Configuration, @@ -357,10 +362,9 @@ async fn image_job_loop( callback: F, ) -> Result<(), Error> where - R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, S: Store, for<'a> F: Fn( - &'a R, + &'a Arc, &'a S, &'a ProcessMap, &'a Configuration, @@ -402,15 +406,14 @@ where } } -async fn heartbeat( - repo: &R, +async fn heartbeat( + repo: &Arc, queue: &'static str, worker_id: uuid::Uuid, job_id: JobId, fut: Fut, ) -> Fut::Output where - R: QueueRepo, Fut: std::future::Future, { let mut fut = diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index bb5f9e7..dbe4676 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, UploadError}, queue::{Base64Bytes, Cleanup, LocalBoxFuture}, repo::{ - Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, + Alias, AliasAccessRepo, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, IdentifierRepo, VariantAccessRepo, }, serde_str::Serde, @@ -11,20 +11,19 @@ use crate::{ }; use futures_util::StreamExt; -pub(super) fn perform<'a, R, S>( - repo: &'a R, +pub(super) fn perform<'a, S>( + repo: &'a ArcRepo, store: &'a S, configuration: &'a Configuration, job: &'a [u8], ) -> LocalBoxFuture<'a, Result<(), Error>> where - R: FullRepo, S: Store, { Box::pin(async move { match serde_json::from_slice(job) { Ok(job) => match job { - Cleanup::Hash { hash: in_hash } => hash::(repo, in_hash).await?, + Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?, Cleanup::Identifier { identifier: Base64Bytes(in_identifier), } => identifier(repo, store, in_identifier).await?, @@ -39,12 +38,10 @@ where ) .await? } - Cleanup::Variant { hash, variant } => { - hash_variant::(repo, hash, variant).await? 
- } - Cleanup::AllVariants => all_variants::(repo).await?, - Cleanup::OutdatedVariants => outdated_variants::(repo, configuration).await?, - Cleanup::OutdatedProxies => outdated_proxies::(repo, configuration).await?, + Cleanup::Variant { hash, variant } => hash_variant(repo, hash, variant).await?, + Cleanup::AllVariants => all_variants(repo).await?, + Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?, + Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", format!("{e}")); @@ -56,9 +53,8 @@ where } #[tracing::instrument(skip_all)] -async fn identifier(repo: &R, store: &S, identifier: Vec) -> Result<(), Error> +async fn identifier(repo: &ArcRepo, store: &S, identifier: Vec) -> Result<(), Error> where - R: FullRepo, S: Store, { let identifier = S::Identifier::from_bytes(identifier)?; @@ -69,7 +65,7 @@ where errors.push(e); } - if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await { + if let Err(e) = IdentifierRepo::cleanup(repo.as_ref(), &identifier).await { errors.push(e); } @@ -81,11 +77,7 @@ where } #[tracing::instrument(skip_all)] -async fn hash(repo: &R, hash: Hash) -> Result<(), Error> -where - R: FullRepo, - S: Store, -{ +async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { let aliases = repo.for_hash(hash.clone()).await?; if !aliases.is_empty() { @@ -102,7 +94,7 @@ where } let mut idents = repo - .variants::(hash.clone()) + .variants(hash.clone()) .await? .into_iter() .map(|(_, v)| v) @@ -114,16 +106,13 @@ where let _ = super::cleanup_identifier(repo, identifier).await; } - HashRepo::cleanup(repo, hash).await?; + HashRepo::cleanup(repo.as_ref(), hash).await?; Ok(()) } #[tracing::instrument(skip_all)] -async fn alias(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error> -where - R: FullRepo, -{ +async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), Error> { let saved_delete_token = repo.delete_token(&alias).await?; if saved_delete_token.is_some() && saved_delete_token != Some(token) { @@ -132,9 +121,9 @@ where let hash = repo.hash(&alias).await?; - AliasRepo::cleanup(repo, &alias).await?; + AliasRepo::cleanup(repo.as_ref(), &alias).await?; repo.remove_relation(alias.clone()).await?; - AliasAccessRepo::remove_access(repo, alias.clone()).await?; + AliasAccessRepo::remove_access(repo.as_ref(), alias.clone()).await?; let Some(hash) = hash else { // hash doesn't exist, nothing to do @@ -149,11 +138,7 @@ where } #[tracing::instrument(skip_all)] -async fn all_variants(repo: &R) -> Result<(), Error> -where - R: FullRepo, - S: Store, -{ +async fn all_variants(repo: &ArcRepo) -> Result<(), Error> { let mut hash_stream = Box::pin(repo.hashes().await); while let Some(res) = hash_stream.next().await { @@ -165,11 +150,7 @@ where } #[tracing::instrument(skip_all)] -async fn outdated_variants(repo: &R, config: &Configuration) -> Result<(), Error> -where - R: FullRepo, - S: Store, -{ +async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> { let now = time::OffsetDateTime::now_utc(); let since = now.saturating_sub(config.media.retention.variants.to_duration()); @@ -184,11 +165,7 @@ where } #[tracing::instrument(skip_all)] -async fn outdated_proxies(repo: &R, config: &Configuration) -> Result<(), Error> -where - R: FullRepo, - S: Store, -{ +async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> { let now = time::OffsetDateTime::now_utc(); let since = 
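// For context on the cleanup entry points above: every job is a serde enum
// serialized to bytes, pushed onto a queue, and matched back out in
// `perform`. A condensed sketch of that round-trip, using the `Cleanup`
// variants and `push` calls shown in this patch:
//
//     let job = serde_json::to_vec(&Cleanup::Hash { hash })?;
//     repo.push(CLEANUP_QUEUE, job.into()).await?;
//     // worker side, inside perform():
//     match serde_json::from_slice(job) {
//         Ok(Cleanup::Hash { hash: in_hash }) => hash(repo, in_hash).await?,
//         Ok(_) => { /* remaining variants */ }
//         Err(e) => tracing::warn!("Invalid job: {}", format!("{e}")),
//     }
//
// Keeping jobs self-describing like this is what lets the worker loops stay
// generic over a byte-slice callback.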
now.saturating_sub(config.media.retention.proxy.to_duration()); @@ -201,7 +178,7 @@ where } else { tracing::warn!("Skipping alias cleanup - no delete token"); repo.remove_relation(alias.clone()).await?; - AliasAccessRepo::remove_access(repo, alias).await?; + AliasAccessRepo::remove_access(repo.as_ref(), alias).await?; } } @@ -209,18 +186,14 @@ where } #[tracing::instrument(skip_all)] -async fn hash_variant( - repo: &R, +async fn hash_variant( + repo: &ArcRepo, hash: Hash, target_variant: Option, -) -> Result<(), Error> -where - R: FullRepo, - S: Store, -{ +) -> Result<(), Error> { if let Some(target_variant) = target_variant { if let Some(identifier) = repo - .variant_identifier::(hash.clone(), target_variant.clone()) + .variant_identifier(hash.clone(), target_variant.clone()) .await? { super::cleanup_identifier(repo, identifier).await?; @@ -228,11 +201,11 @@ where repo.remove_variant(hash.clone(), target_variant.clone()) .await?; - VariantAccessRepo::remove_access(repo, hash, target_variant).await?; + VariantAccessRepo::remove_access(repo.as_ref(), hash, target_variant).await?; } else { - for (variant, identifier) in repo.variants::(hash.clone()).await? { + for (variant, identifier) in repo.variants(hash.clone()).await? { repo.remove_variant(hash.clone(), variant.clone()).await?; - VariantAccessRepo::remove_access(repo, hash.clone(), variant).await?; + VariantAccessRepo::remove_access(repo.as_ref(), hash.clone(), variant).await?; super::cleanup_identifier(repo, identifier).await?; } } diff --git a/src/queue/process.rs b/src/queue/process.rs index 75392d4..be01479 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -5,22 +5,21 @@ use crate::{ formats::InputProcessableFormat, ingest::Session, queue::{Base64Bytes, LocalBoxFuture, Process}, - repo::{Alias, FullRepo, UploadId, UploadResult}, + repo::{Alias, ArcRepo, UploadId, UploadResult}, serde_str::Serde, store::{Identifier, Store}, }; use futures_util::TryStreamExt; use std::path::PathBuf; -pub(super) fn perform<'a, R, S>( - repo: &'a R, +pub(super) fn perform<'a, S>( + repo: &'a ArcRepo, store: &'a S, process_map: &'a ProcessMap, config: &'a Configuration, job: &'a [u8], ) -> LocalBoxFuture<'a, Result<(), Error>> where - R: FullRepo + 'static, S: Store + 'static, { Box::pin(async move { @@ -70,8 +69,8 @@ where } #[tracing::instrument(skip_all)] -async fn process_ingest( - repo: &R, +async fn process_ingest( + repo: &ArcRepo, store: &S, unprocessed_identifier: Vec, upload_id: UploadId, @@ -79,7 +78,6 @@ async fn process_ingest( media: &crate::config::Media, ) -> Result<(), Error> where - R: FullRepo + 'static, S: Store + 'static, { let fut = async { @@ -99,7 +97,7 @@ where let session = crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?; - Ok(session) as Result, Error> + Ok(session) as Result, Error> }) .await; @@ -130,8 +128,8 @@ where #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] -async fn generate( - repo: &R, +async fn generate( + repo: &ArcRepo, store: &S, process_map: &ProcessMap, target_format: InputProcessableFormat, @@ -146,9 +144,7 @@ async fn generate( }; let path_string = process_path.to_string_lossy().to_string(); - let identifier_opt = repo - .variant_identifier::(hash.clone(), path_string) - .await?; + let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; if identifier_opt.is_some() { return Ok(()); diff --git a/src/repo.rs b/src/repo.rs index 6b8f9c1..6dd99bc 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -2,16 +2,20 @@ use crate::{ 
config, details::Details, store::{Identifier, StoreError}, + stream::LocalBoxStream, }; -use futures_util::Stream; use std::{fmt::Debug, sync::Arc}; use url::Url; use uuid::Uuid; mod hash; +mod migrate; pub(crate) mod sled; pub(crate) use hash::Hash; +pub(crate) use migrate::migrate_04; + +pub(crate) type ArcRepo = Arc; #[derive(Clone, Debug)] pub(crate) enum Repo { @@ -35,7 +39,9 @@ pub(crate) struct DeleteToken { id: MaybeUuid, } +#[derive(Debug)] pub(crate) struct HashAlreadyExists; +#[derive(Debug)] pub(crate) struct AliasAlreadyExists; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -74,16 +80,12 @@ pub(crate) trait FullRepo: + ProxyRepo + Send + Sync - + Clone + Debug { async fn health_check(&self) -> Result<(), RepoError>; #[tracing::instrument(skip(self))] - async fn identifier_from_alias( - &self, - alias: &Alias, - ) -> Result, StoreError> { + async fn identifier_from_alias(&self, alias: &Alias) -> Result>, RepoError> { let Some(hash) = self.hash(alias).await? else { return Ok(None); }; @@ -101,20 +103,22 @@ pub(crate) trait FullRepo: } #[tracing::instrument(skip(self))] - async fn still_identifier_from_alias( + async fn still_identifier_from_alias( &self, alias: &Alias, - ) -> Result, StoreError> { + ) -> Result>, StoreError> { let Some(hash) = self.hash(alias).await? else { return Ok(None); }; - let Some(identifier) = self.identifier::(hash.clone()).await? else { + let Some(identifier) = self.identifier(hash.clone()).await? else { return Ok(None); }; match self.details(&identifier).await? { - Some(details) if details.is_video() => self.motion_identifier::(hash).await, + Some(details) if details.is_video() => { + self.motion_identifier(hash).await.map_err(From::from) + } Some(_) => Ok(Some(identifier)), None => Ok(None), } @@ -122,7 +126,7 @@ pub(crate) trait FullRepo: } #[async_trait::async_trait(?Send)] -impl FullRepo for actix_web::web::Data +impl FullRepo for Arc where T: FullRepo, { @@ -133,7 +137,7 @@ where pub(crate) trait BaseRepo {} -impl BaseRepo for actix_web::web::Data where T: BaseRepo {} +impl BaseRepo for Arc where T: BaseRepo {} #[async_trait::async_trait(?Send)] pub(crate) trait ProxyRepo: BaseRepo { @@ -145,7 +149,7 @@ pub(crate) trait ProxyRepo: BaseRepo { } #[async_trait::async_trait(?Send)] -impl ProxyRepo for actix_web::web::Data +impl ProxyRepo for Arc where T: ProxyRepo, { @@ -164,25 +168,21 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait AliasAccessRepo: BaseRepo { - type AliasAccessStream: Stream>; - async fn accessed(&self, alias: Alias) -> Result<(), RepoError>; async fn older_aliases( &self, timestamp: time::OffsetDateTime, - ) -> Result; + ) -> Result>, RepoError>; async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] -impl AliasAccessRepo for actix_web::web::Data +impl AliasAccessRepo for Arc where T: AliasAccessRepo, { - type AliasAccessStream = T::AliasAccessStream; - async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { T::accessed(self, alias).await } @@ -190,7 +190,7 @@ where async fn older_aliases( &self, timestamp: time::OffsetDateTime, - ) -> Result { + ) -> Result>, RepoError> { T::older_aliases(self, timestamp).await } @@ -201,8 +201,6 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait VariantAccessRepo: BaseRepo { - type VariantAccessStream: Stream>; - async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn contains_variant(&self, hash: Hash, variant: String) -> Result; @@ -210,18 +208,16 @@ 
pub(crate) trait VariantAccessRepo: BaseRepo { async fn older_variants( &self, timestamp: time::OffsetDateTime, - ) -> Result; + ) -> Result>, RepoError>; async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] -impl VariantAccessRepo for actix_web::web::Data +impl VariantAccessRepo for Arc where T: VariantAccessRepo, { - type VariantAccessStream = T::VariantAccessStream; - async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::accessed(self, hash, variant).await } @@ -233,7 +229,7 @@ where async fn older_variants( &self, timestamp: time::OffsetDateTime, - ) -> Result { + ) -> Result>, RepoError> { T::older_variants(self, timestamp).await } @@ -254,7 +250,7 @@ pub(crate) trait UploadRepo: BaseRepo { } #[async_trait::async_trait(?Send)] -impl UploadRepo for actix_web::web::Data +impl UploadRepo for Arc where T: UploadRepo, { @@ -318,7 +314,7 @@ pub(crate) trait QueueRepo: BaseRepo { } #[async_trait::async_trait(?Send)] -impl QueueRepo for actix_web::web::Data +impl QueueRepo for Arc where T: QueueRepo, { @@ -361,7 +357,7 @@ pub(crate) trait SettingsRepo: BaseRepo { } #[async_trait::async_trait(?Send)] -impl SettingsRepo for actix_web::web::Data +impl SettingsRepo for Arc where T: SettingsRepo, { @@ -380,34 +376,34 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait IdentifierRepo: BaseRepo { - async fn relate_details( + async fn relate_details( &self, - identifier: &I, + identifier: &dyn Identifier, details: &Details, ) -> Result<(), StoreError>; - async fn details(&self, identifier: &I) -> Result, StoreError>; + async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError>; - async fn cleanup(&self, identifier: &I) -> Result<(), StoreError>; + async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] -impl IdentifierRepo for actix_web::web::Data +impl IdentifierRepo for Arc where T: IdentifierRepo, { - async fn relate_details( + async fn relate_details( &self, - identifier: &I, + identifier: &dyn Identifier, details: &Details, ) -> Result<(), StoreError> { T::relate_details(self, identifier, details).await } - async fn details(&self, identifier: &I) -> Result, StoreError> { + async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError> { T::details(self, identifier).await } - async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { + async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { T::cleanup(self, identifier).await } } @@ -416,19 +412,19 @@ where pub(crate) trait MigrationRepo: BaseRepo { async fn is_continuing_migration(&self) -> Result; - async fn mark_migrated( + async fn mark_migrated( &self, - old_identifier: &I1, - new_identifier: &I2, + old_identifier: &dyn Identifier, + new_identifier: &dyn Identifier, ) -> Result<(), StoreError>; - async fn is_migrated(&self, identifier: &I) -> Result; + async fn is_migrated(&self, identifier: &dyn Identifier) -> Result; async fn clear(&self) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] -impl MigrationRepo for actix_web::web::Data +impl MigrationRepo for Arc where T: MigrationRepo, { @@ -436,15 +432,15 @@ where T::is_continuing_migration(self).await } - async fn mark_migrated( + async fn mark_migrated( &self, - old_identifier: &I1, - new_identifier: &I2, + old_identifier: &dyn Identifier, + new_identifier: &dyn Identifier, ) -> Result<(), StoreError> { T::mark_migrated(self, old_identifier, 
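// The mechanical shape repeated through this file: each repo trait gains a
// blanket forwarding impl over Arc<T>, replacing the old
// actix_web::web::Data<T> impls, which is what lets ArcRepo stand in for the
// concrete SledRepo everywhere. Object safety is also why the per-method
// type parameters became &dyn Identifier and the associated stream types
// became boxed streams. A trimmed sketch of the pattern, with the
// SettingsRepo signatures assumed from the get/set call sites in this patch:
//
//     #[async_trait::async_trait(?Send)]
//     impl<T> SettingsRepo for Arc<T>
//     where
//         T: SettingsRepo,
//     {
//         async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
//             T::set(self, key, value).await
//         }
//     }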
new_identifier).await } - async fn is_migrated(&self, identifier: &I) -> Result { + async fn is_migrated(&self, identifier: &dyn Identifier) -> Result { T::is_migrated(self, identifier).await } @@ -455,118 +451,99 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo: BaseRepo { - type Stream: Stream>; - async fn size(&self) -> Result; - async fn hashes(&self) -> Self::Stream; + async fn hashes(&self) -> LocalBoxStream<'static, Result>; - async fn create( + async fn create( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result, StoreError>; - async fn update_identifier( + async fn update_identifier( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError>; - async fn identifier( - &self, - hash: Hash, - ) -> Result, StoreError>; + async fn identifier(&self, hash: Hash) -> Result>, RepoError>; - async fn relate_variant_identifier( + async fn relate_variant_identifier( &self, hash: Hash, variant: String, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError>; - async fn variant_identifier( + async fn variant_identifier( &self, hash: Hash, variant: String, - ) -> Result, StoreError>; - async fn variants( - &self, - hash: Hash, - ) -> Result, StoreError>; + ) -> Result>, RepoError>; + async fn variants(&self, hash: Hash) -> Result)>, RepoError>; async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; - async fn relate_motion_identifier( + async fn relate_motion_identifier( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError>; - async fn motion_identifier( - &self, - hash: Hash, - ) -> Result, StoreError>; + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError>; async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] -impl HashRepo for actix_web::web::Data +impl HashRepo for Arc where T: HashRepo, { - type Stream = T::Stream; - async fn size(&self) -> Result { T::size(self).await } - async fn hashes(&self) -> Self::Stream { + async fn hashes(&self) -> LocalBoxStream<'static, Result> { T::hashes(self).await } - async fn create( + async fn create( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result, StoreError> { T::create(self, hash, identifier).await } - async fn update_identifier( + async fn update_identifier( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError> { T::update_identifier(self, hash, identifier).await } - async fn identifier( - &self, - hash: Hash, - ) -> Result, StoreError> { + async fn identifier(&self, hash: Hash) -> Result>, RepoError> { T::identifier(self, hash).await } - async fn relate_variant_identifier( + async fn relate_variant_identifier( &self, hash: Hash, variant: String, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError> { T::relate_variant_identifier(self, hash, variant, identifier).await } - async fn variant_identifier( + async fn variant_identifier( &self, hash: Hash, variant: String, - ) -> Result, StoreError> { + ) -> Result>, RepoError> { T::variant_identifier(self, hash, variant).await } - async fn variants( - &self, - hash: Hash, - ) -> Result, StoreError> { + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { T::variants(self, hash).await } @@ -574,18 +551,15 @@ where T::remove_variant(self, hash, variant).await } - async fn relate_motion_identifier( + async fn relate_motion_identifier( &self, hash: Hash, - identifier: 
&I,
+        identifier: &dyn Identifier,
     ) -> Result<(), StoreError> {
         T::relate_motion_identifier(self, hash, identifier).await
     }
 
-    async fn motion_identifier<I: Identifier>(
-        &self,
-        hash: Hash,
-    ) -> Result<Option<I>, StoreError> {
+    async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
         T::motion_identifier(self, hash).await
     }
@@ -613,7 +587,7 @@ pub(crate) trait AliasRepo: BaseRepo {
 }
 
 #[async_trait::async_trait(?Send)]
-impl<T> AliasRepo for actix_web::web::Data<T>
+impl<T> AliasRepo for Arc<T>
 where
     T: AliasRepo,
 {
@@ -658,6 +632,12 @@ impl Repo {
             }
         }
     }
+
+    pub(crate) fn to_arc(&self) -> ArcRepo {
+        match self {
+            Self::Sled(sled_repo) => Arc::new(sled_repo.clone()),
+        }
+    }
 }
 
 impl MaybeUuid {
@@ -760,11 +740,11 @@ impl DeleteToken {
         }
     }
 
-    fn to_bytes(&self) -> Vec<u8> {
+    pub(crate) fn to_bytes(&self) -> Vec<u8> {
         self.id.as_bytes().to_vec()
     }
 
-    fn from_slice(bytes: &[u8]) -> Option<Self> {
+    pub(crate) fn from_slice(bytes: &[u8]) -> Option<Self> {
         if let Ok(s) = std::str::from_utf8(bytes) {
             Some(DeleteToken::from_existing(s))
         } else if bytes.len() == 16 {
diff --git a/src/repo/hash.rs b/src/repo/hash.rs
index 12c7550..e4501ef 100644
--- a/src/repo/hash.rs
+++ b/src/repo/hash.rs
@@ -70,7 +70,7 @@ impl Hash {
 impl std::fmt::Debug for Hash {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         f.debug_struct("Hash")
-            .field("hash", &hex::encode(&*self.hash))
+            .field("hash", &hex::encode(*self.hash))
             .field("format", &self.format)
             .field("size", &self.size)
             .finish()
diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs
new file mode 100644
index 0000000..58f21a9
--- /dev/null
+++ b/src/repo/migrate.rs
@@ -0,0 +1,180 @@
+use crate::{
+    config::Configuration,
+    details::Details,
+    error::Error,
+    repo::{AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, VariantAccessRepo},
+    repo_04::{
+        AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
+        SledRepo as OldSledRepo,
+    },
+    store::Store,
+    stream::IntoStreamer,
+};
+
+pub(crate) async fn migrate_04<S: Store>(
+    old_repo: OldSledRepo,
+    new_repo: &ArcRepo,
+    store: &S,
+    config: &Configuration,
+) -> Result<(), Error> {
+    tracing::warn!("Running checks");
+    if let Err(e) = old_repo.health_check().await {
+        tracing::warn!("Old repo is not configured correctly");
+        return Err(e.into());
+    }
+    if let Err(e) = new_repo.health_check().await {
+        tracing::warn!("New repo is not configured correctly");
+        return Err(e.into());
+    }
+    if let Err(e) = store.health_check().await {
+        tracing::warn!("Store is not configured correctly");
+        return Err(e.into());
+    }
+
+    tracing::warn!("Checks complete, migrating repo");
+    tracing::warn!("{} hashes will be migrated", old_repo.size().await?);
+
+    let mut hash_stream = old_repo.hashes().await.into_streamer();
+
+    while let Some(res) = hash_stream.next().await {
+        if let Ok(hash) = res {
+            let _ = migrate_hash_04(&old_repo, new_repo, store, config, hash).await;
+        } else {
+            tracing::warn!("Failed to read hash, skipping");
+        }
+    }
+
+    if let Some(generator_state) = old_repo.get("last-path").await? {
+        new_repo
+            .set("last-path", generator_state.to_vec().into())
+            .await?;
+    }
+
+    Ok(())
+}
+
+async fn migrate_hash_04<S: Store>(
+    old_repo: &OldSledRepo,
+    new_repo: &ArcRepo,
+    store: &S,
+    config: &Configuration,
+    old_hash: sled::IVec,
+) -> Result<(), Error> {
+    let mut hash_failures = 0;
+
+    while let Err(e) = do_migrate_hash_04(old_repo, new_repo, store, config, old_hash.clone()).await
+    {
+        hash_failures += 1;
+
+        if hash_failures > 10 {
+            tracing::error!(
+                "Failed to migrate hash {}, skipping\n{}",
+                hex::encode(&old_hash[..]),
+                format!("{e:?}")
+            );
+            return Err(e);
+        } else {
+            tracing::warn!(
+                "Failed to migrate hash {}, retrying +1",
+                hex::encode(&old_hash[..])
+            );
+        }
+    }
+
+    Ok(())
+}
+
+async fn do_migrate_hash_04<S: Store>(
+    old_repo: &OldSledRepo,
+    new_repo: &ArcRepo,
+    store: &S,
+    config: &Configuration,
+    old_hash: sled::IVec,
+) -> Result<(), Error> {
+    let Some(identifier) = old_repo.identifier::<S::Identifier>(old_hash.clone()).await? else {
+        tracing::warn!("Skipping hash {}, no identifier", hex::encode(&old_hash));
+        return Ok(());
+    };
+
+    let size = store.len(&identifier).await?;
+
+    let hash_details = details(old_repo, store, config, &identifier).await?;
+    let aliases = old_repo.for_hash(old_hash.clone()).await?;
+    let variants = old_repo.variants::<S::Identifier>(old_hash.clone()).await?;
+    let motion_identifier = old_repo
+        .motion_identifier::<S::Identifier>(old_hash.clone())
+        .await?;
+
+    let hash = old_hash[..].try_into().expect("Invalid hash size");
+
+    let hash = Hash::new(
+        hash,
+        size,
+        hash_details.internal_format().expect("format exists"),
+    );
+
+    HashRepo::create(new_repo.as_ref(), hash.clone(), &identifier)
+        .await?
+        .expect("not duplicate");
+    new_repo.relate_details(&identifier, &hash_details).await?;
+
+    for alias in aliases {
+        let delete_token = old_repo
+            .delete_token(&alias)
+            .await?
+            .unwrap_or_else(DeleteToken::generate);
+
+        AliasRepo::create(new_repo.as_ref(), &alias, &delete_token, hash.clone())
+            .await?
+            .expect("not duplicate");
+    }
+
+    if let Some(motion_identifier) = motion_identifier {
+        let motion_details = details(old_repo, store, config, &motion_identifier).await?;
+
+        new_repo
+            .relate_motion_identifier(hash.clone(), &motion_identifier)
+            .await?;
+        new_repo
+            .relate_details(&motion_identifier, &motion_details)
+            .await?;
+    }
+
+    for (variant, identifier) in variants {
+        let variant_details = details(old_repo, store, config, &identifier).await?;
+
+        new_repo
+            .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
+            .await?;
+        new_repo
+            .relate_details(&identifier, &variant_details)
+            .await?;
+
+        VariantAccessRepo::accessed(new_repo.as_ref(), hash.clone(), variant).await?;
+    }
+
+    Ok(())
+}
+
+async fn details<S: Store>(
+    old_repo: &OldSledRepo,
+    store: &S,
+    config: &Configuration,
+    identifier: &S::Identifier,
+) -> Result<Details, Error> {
+    let details_opt = old_repo.details(identifier).await?.and_then(|details| {
+        if details.internal_format().is_some() {
+            Some(details)
+        } else {
+            None
+        }
+    });
+
+    if let Some(details) = details_opt {
+        Ok(details)
+    } else {
+        Details::from_store(store, identifier, config.media.process_timeout)
+            .await
+            .map_err(From::from)
+    }
+}
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 6033d38..99d8630 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -8,21 +8,19 @@ use crate::{
     },
     serde_str::Serde,
     store::StoreError,
-    stream::from_iterator,
+    stream::{from_iterator, LocalBoxStream},
 };
-use futures_util::{Future, Stream};
 use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
 use std::{
     collections::HashMap,
     path::PathBuf,
-    pin::Pin,
     sync::{
         atomic::{AtomicU64, Ordering},
         Arc, RwLock,
     },
     time::Instant,
 };
-use tokio::{sync::Notify, task::JoinHandle};
+use tokio::sync::Notify;
 use url::Url;
 use uuid::Uuid;
 
@@ -142,25 +140,6 @@ impl SledRepo {
         Ok(db)
     }
 
-    #[tracing::instrument(level = "debug", skip_all)]
-    pub(crate) async fn mark_accessed(&self) -> Result<(), StoreError> {
-        use futures_util::StreamExt;
-
-        let mut stream = self.hashes().await;
-
-        while let Some(res) = stream.next().await {
-            let hash = res?;
-
-            for (variant, _) in self.variants::(hash.clone()).await? {
-                if !self.contains_variant(hash.clone(), variant.clone()).await?
{ - VariantAccessRepo::accessed(self, hash.clone(), variant).await?; - } - } - } - - Ok(()) - } - #[tracing::instrument(level = "warn", skip_all)] pub(crate) async fn export(&self) -> Result<(), RepoError> { let path = self @@ -239,104 +218,8 @@ impl ProxyRepo for SledRepo { } } -type IterValue = Option<(sled::Iter, Result)>; - -pub(crate) struct IterStream { - iter: Option, - next: Option>, -} - -impl futures_util::Stream for IterStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - if let Some(ref mut next) = self.next { - let res = std::task::ready!(Pin::new(next).poll(cx)); - - self.next.take(); - - let opt = match res { - Ok(opt) => opt, - Err(_) => return std::task::Poll::Ready(Some(Err(RepoError::Canceled))), - }; - - if let Some((iter, res)) = opt { - self.iter = Some(iter); - - std::task::Poll::Ready(Some(res)) - } else { - std::task::Poll::Ready(None) - } - } else if let Some(mut iter) = self.iter.take() { - self.next = Some(actix_rt::task::spawn_blocking(move || { - let opt = iter - .next() - .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); - - opt.map(|res| (iter, res.map(|(_, value)| value))) - })); - self.poll_next(cx) - } else { - std::task::Poll::Ready(None) - } - } -} - -pub(crate) struct AliasAccessStream { - iter: IterStream, -} - -pub(crate) struct VariantAccessStream { - iter: IterStream, -} - -impl futures_util::Stream for AliasAccessStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { - Some(Ok(bytes)) => { - if let Some(alias) = Alias::from_slice(&bytes) { - std::task::Poll::Ready(Some(Ok(alias))) - } else { - self.poll_next(cx) - } - } - Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), - None => std::task::Poll::Ready(None), - } - } -} - -impl futures_util::Stream for VariantAccessStream { - type Item = Result<(Hash, String), RepoError>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { - Some(Ok(bytes)) => std::task::Poll::Ready(Some( - parse_variant_access_key(bytes) - .map_err(SledError::from) - .map_err(RepoError::from), - )), - Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), - None => std::task::Poll::Ready(None), - } - } -} - #[async_trait::async_trait(?Send)] impl AliasAccessRepo for SledRepo { - type AliasAccessStream = AliasAccessStream; - #[tracing::instrument(level = "debug", skip(self))] async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { let now_string = time::OffsetDateTime::now_utc() @@ -362,24 +245,22 @@ impl AliasAccessRepo for SledRepo { async fn older_aliases( &self, timestamp: time::OffsetDateTime, - ) -> Result { + ) -> Result>, RepoError> { let time_string = timestamp .format(&time::format_description::well_known::Rfc3339) .map_err(SledError::Format)?; - let inverse_alias_access = self.inverse_alias_access.clone(); + let iterator = self + .inverse_alias_access + .range(..=time_string) + .filter_map(|res| { + res.map_err(SledError::from) + .map_err(RepoError::from) + .map(|(_, value)| Alias::from_slice(&value)) + .transpose() + }); - let iter = - actix_rt::task::spawn_blocking(move || inverse_alias_access.range(..=time_string)) - .await - .map_err(|_| RepoError::Canceled)?; - - Ok(AliasAccessStream { - iter: IterStream { - iter: Some(iter), - 
next: None, - }, - }) + Ok(Box::pin(from_iterator(iterator, 8))) } #[tracing::instrument(level = "debug", skip(self))] @@ -401,8 +282,6 @@ impl AliasAccessRepo for SledRepo { #[async_trait::async_trait(?Send)] impl VariantAccessRepo for SledRepo { - type VariantAccessStream = VariantAccessStream; - #[tracing::instrument(level = "debug", skip(self))] async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let hash = hash.to_bytes(); @@ -441,24 +320,23 @@ impl VariantAccessRepo for SledRepo { async fn older_variants( &self, timestamp: time::OffsetDateTime, - ) -> Result { + ) -> Result>, RepoError> { let time_string = timestamp .format(&time::format_description::well_known::Rfc3339) .map_err(SledError::Format)?; - let inverse_variant_access = self.inverse_variant_access.clone(); + let iterator = self + .inverse_variant_access + .range(..=time_string) + .map(|res| { + let (_, bytes) = res.map_err(SledError::from)?; - let iter = - actix_rt::task::spawn_blocking(move || inverse_variant_access.range(..=time_string)) - .await - .map_err(|_| RepoError::Canceled)?; + parse_variant_access_key(bytes) + .map_err(SledError::from) + .map_err(RepoError::from) + }); - Ok(VariantAccessStream { - iter: IterStream { - iter: Some(iter), - next: None, - }, - }) + Ok(Box::pin(from_iterator(iterator, 8))) } #[tracing::instrument(level = "debug", skip(self))] @@ -973,9 +851,9 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl IdentifierRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn relate_details( + async fn relate_details( &self, - identifier: &I, + identifier: &dyn Identifier, details: &Details, ) -> Result<(), StoreError> { let key = identifier.to_bytes()?; @@ -992,7 +870,7 @@ impl IdentifierRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn details(&self, identifier: &I) -> Result, StoreError> { + async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError> { let key = identifier.to_bytes()?; let opt = b!(self.identifier_details, identifier_details.get(key)); @@ -1005,7 +883,7 @@ impl IdentifierRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { + async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { let key = identifier.to_bytes()?; b!(self.identifier_details, identifier_details.remove(key)); @@ -1020,10 +898,10 @@ impl MigrationRepo for SledRepo { Ok(!self.migration_identifiers.is_empty()) } - async fn mark_migrated( + async fn mark_migrated( &self, - old_identifier: &I1, - new_identifier: &I2, + old_identifier: &dyn Identifier, + new_identifier: &dyn Identifier, ) -> Result<(), StoreError> { let key = new_identifier.to_bytes()?; let value = old_identifier.to_bytes()?; @@ -1036,7 +914,7 @@ impl MigrationRepo for SledRepo { Ok(()) } - async fn is_migrated(&self, identifier: &I) -> Result { + async fn is_migrated(&self, identifier: &dyn Identifier) -> Result { let key = identifier.to_bytes()?; Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some()) @@ -1049,13 +927,8 @@ impl MigrationRepo for SledRepo { } } -type StreamItem = Result; -type LocalBoxStream<'a, T> = Pin + 'a>>; - #[async_trait::async_trait(?Send)] impl HashRepo for SledRepo { - 
type Stream = LocalBoxStream<'static, StreamItem>; - async fn size(&self) -> Result { Ok(b!( self.hashes, @@ -1064,7 +937,7 @@ impl HashRepo for SledRepo { )) } - async fn hashes(&self) -> Self::Stream { + async fn hashes(&self) -> LocalBoxStream<'static, Result> { let iter = self.hashes.iter().keys().filter_map(|res| { res.map_err(SledError::from) .map_err(RepoError::from) @@ -1076,10 +949,10 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn create( + async fn create( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result, StoreError> { let identifier: sled::IVec = identifier.to_bytes()?.into(); @@ -1111,10 +984,10 @@ impl HashRepo for SledRepo { } } - async fn update_identifier( + async fn update_identifier( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError> { let identifier = identifier.to_bytes()?; @@ -1129,25 +1002,22 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn identifier( - &self, - hash: Hash, - ) -> Result, StoreError> { + async fn identifier(&self, hash: Hash) -> Result>, RepoError> { let hash = hash.to_ivec(); let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { return Ok(None); }; - Ok(Some(I::from_bytes(ivec.to_vec())?)) + Ok(Some(Arc::from(ivec.to_vec()))) } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn relate_variant_identifier( + async fn relate_variant_identifier( &self, hash: Hash, variant: String, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError> { let hash = hash.to_bytes(); @@ -1163,11 +1033,11 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn variant_identifier( + async fn variant_identifier( &self, hash: Hash, variant: String, - ) -> Result, StoreError> { + ) -> Result>, RepoError> { let hash = hash.to_bytes(); let key = variant_key(&hash, &variant); @@ -1177,14 +1047,11 @@ impl HashRepo for SledRepo { hash_variant_identifiers.get(key) ); - opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() + Ok(opt.map(|ivec| Arc::from(ivec.to_vec()))) } #[tracing::instrument(level = "debug", skip(self))] - async fn variants( - &self, - hash: Hash, - ) -> Result, StoreError> { + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { let hash = hash.to_ivec(); let vec = b!( @@ -1193,20 +1060,14 @@ impl HashRepo for SledRepo { .scan_prefix(hash.clone()) .filter_map(|res| res.ok()) .filter_map(|(key, ivec)| { - let identifier = I::from_bytes(ivec.to_vec()).ok(); - if identifier.is_none() { - tracing::warn!( - "Skipping an identifier: {}", - String::from_utf8_lossy(&ivec) - ); - } + let identifier = Arc::from(ivec.to_vec()); let variant = variant_from_key(&hash, &key); if variant.is_none() { tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); } - Some((variant?, identifier?)) + Some((variant?, identifier)) }) .collect::>()) as Result, SledError> ); @@ -1229,10 +1090,10 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn relate_motion_identifier( + async fn relate_motion_identifier( &self, hash: Hash, - identifier: &I, + identifier: &dyn Identifier, ) -> Result<(), StoreError> { let hash = hash.to_ivec(); let bytes = identifier.to_bytes()?; @@ -1246,10 +1107,7 @@ impl HashRepo for SledRepo { } 
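This hunk is the crux of the whole patch: every repo trait method that was generic over `I: Identifier` now takes `&dyn Identifier` and hands identifiers back as `Arc<[u8]>`. Generic methods have no vtable entry, so the old signatures could never sit behind the new `ArcRepo` alias (an `Arc<dyn FullRepo>`). A minimal sketch of the constraint, using simplified stand-in traits rather than pict-rs's own:

use std::sync::Arc;

// Simplified stand-ins for illustration; not the crate's actual traits.
pub trait Identifier {
    fn to_bytes(&self) -> Vec<u8>;
}

#[async_trait::async_trait(?Send)]
pub trait DynSafeRepo {
    // Object-safe: identifiers cross the trait boundary as `&dyn` and as
    // raw bytes, so `Arc<dyn DynSafeRepo>` is a valid type.
    async fn relate(&self, hash: u64, identifier: &dyn Identifier);
    async fn identifier(&self, hash: u64) -> Option<Arc<[u8]>>;
}

#[async_trait::async_trait(?Send)]
pub trait GenericRepo {
    // Not object-safe: a generic method needs a monomorphized copy per `I`,
    // so no single vtable entry can exist for it.
    async fn identifier<I: Identifier>(&self, hash: u64) -> Option<I>;
}

fn object_safe(_: Arc<dyn DynSafeRepo>) {}
// fn not_object_safe(_: Arc<dyn GenericRepo>) {} // error[E0038]: not object safe

Since store identifiers already round-trip through bytes (`to_bytes` on write, `I::from_bytes`/`Arc::from(ivec.to_vec())` on read, as the hunks here show), returning raw `Arc<[u8]>` loses nothing; decoding simply moves to the store layer.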
#[tracing::instrument(level = "trace", skip(self))] - async fn motion_identifier( - &self, - hash: Hash, - ) -> Result, StoreError> { + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { let hash = hash.to_ivec(); let opt = b!( @@ -1257,7 +1115,7 @@ impl HashRepo for SledRepo { hash_motion_identifiers.get(hash) ); - opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() + Ok(opt.map(|ivec| Arc::from(ivec.to_vec()))) } #[tracing::instrument(skip(self))] diff --git a/src/repo_04.rs b/src/repo_04.rs index 384c2dd..03160af 100644 --- a/src/repo_04.rs +++ b/src/repo_04.rs @@ -1,131 +1,31 @@ use crate::{ config, details::Details, + repo::{Alias, DeleteToken}, store::{Identifier, StoreError}, }; use futures_util::Stream; use std::fmt::Debug; -use url::Url; -use uuid::Uuid; -pub(crate) mod sled; +pub(crate) use self::sled::SledRepo; -#[derive(Clone, Debug)] -pub(crate) enum Repo { - Sled(self::sled::SledRepo), -} +mod sled; -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -enum MaybeUuid { - Uuid(Uuid), - Name(String), -} +#[tracing::instrument] +pub(crate) fn open(config: &config::Sled) -> color_eyre::Result> { + let config::Sled { + path, + cache_capacity, + export_path: _, + } = config; -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct Alias { - id: MaybeUuid, - extension: Option, -} - -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct DeleteToken { - id: MaybeUuid, -} - -pub(crate) struct HashAlreadyExists; -pub(crate) struct AliasAlreadyExists; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct UploadId { - id: Uuid, -} - -pub(crate) enum UploadResult { - Success { alias: Alias, token: DeleteToken }, - Failure { message: String }, + SledRepo::build(path.clone(), *cache_capacity) } #[derive(Debug, thiserror::Error)] pub(crate) enum RepoError { #[error("Error in sled")] SledError(#[from] self::sled::SledError), - - #[error("Upload was already claimed")] - AlreadyClaimed, - - #[error("Panic in blocking operation")] - Canceled, -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait FullRepo: - UploadRepo - + SettingsRepo - + IdentifierRepo - + AliasRepo - + QueueRepo - + HashRepo - + MigrationRepo - + AliasAccessRepo - + VariantAccessRepo - + ProxyRepo - + Send - + Sync - + Clone - + Debug -{ - async fn health_check(&self) -> Result<(), RepoError>; - - #[tracing::instrument(skip(self))] - async fn identifier_from_alias( - &self, - alias: &Alias, - ) -> Result, StoreError> { - let Some(hash) = self.hash(alias).await? else { - return Ok(None); - }; - - self.identifier(hash).await - } - - #[tracing::instrument(skip(self))] - async fn aliases_from_alias(&self, alias: &Alias) -> Result, RepoError> { - let Some(hash) = self.hash(alias).await? else { - return Ok(vec![]); - }; - - self.for_hash(hash).await - } - - #[tracing::instrument(skip(self))] - async fn still_identifier_from_alias( - &self, - alias: &Alias, - ) -> Result, StoreError> { - let Some(hash) = self.hash(alias).await? else { - return Ok(None); - }; - - let Some(identifier) = self.identifier::(hash.clone()).await? else { - return Ok(None); - }; - - match self.details(&identifier).await? 
{ - Some(details) if details.is_video() => self.motion_identifier::(hash).await, - Some(_) => Ok(Some(identifier)), - None => Ok(None), - } - } -} - -#[async_trait::async_trait(?Send)] -impl FullRepo for actix_web::web::Data -where - T: FullRepo, -{ - async fn health_check(&self) -> Result<(), RepoError> { - T::health_check(self).await - } } pub(crate) trait BaseRepo { @@ -139,276 +39,14 @@ where type Bytes = T::Bytes; } -#[async_trait::async_trait(?Send)] -pub(crate) trait ProxyRepo: BaseRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>; - - async fn related(&self, url: Url) -> Result, RepoError>; - - async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl ProxyRepo for actix_web::web::Data -where - T: ProxyRepo, -{ - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { - T::relate_url(self, url, alias).await - } - - async fn related(&self, url: Url) -> Result, RepoError> { - T::related(self, url).await - } - - async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { - T::remove_relation(self, alias).await - } -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait AliasAccessRepo: BaseRepo { - type AliasAccessStream: Stream>; - - async fn accessed(&self, alias: Alias) -> Result<(), RepoError>; - - async fn older_aliases( - &self, - timestamp: time::OffsetDateTime, - ) -> Result; - - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl AliasAccessRepo for actix_web::web::Data -where - T: AliasAccessRepo, -{ - type AliasAccessStream = T::AliasAccessStream; - - async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { - T::accessed(self, alias).await - } - - async fn older_aliases( - &self, - timestamp: time::OffsetDateTime, - ) -> Result { - T::older_aliases(self, timestamp).await - } - - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { - T::remove_access(self, alias).await - } -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait VariantAccessRepo: BaseRepo { - type VariantAccessStream: Stream>; - - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; - - async fn contains_variant(&self, hash: Self::Bytes, variant: String) - -> Result; - - async fn older_variants( - &self, - timestamp: time::OffsetDateTime, - ) -> Result; - - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl VariantAccessRepo for actix_web::web::Data -where - T: VariantAccessRepo, -{ - type VariantAccessStream = T::VariantAccessStream; - - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - T::accessed(self, hash, variant).await - } - - async fn contains_variant( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result { - T::contains_variant(self, hash, variant).await - } - - async fn older_variants( - &self, - timestamp: time::OffsetDateTime, - ) -> Result { - T::older_variants(self, timestamp).await - } - - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - T::remove_access(self, hash, variant).await - } -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait UploadRepo: BaseRepo { - async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>; - - async fn wait(&self, upload_id: UploadId) -> Result; - - async fn claim(&self, upload_id: UploadId) -> Result<(), 
RepoError>; - - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl UploadRepo for actix_web::web::Data -where - T: UploadRepo, -{ - async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { - T::create(self, upload_id).await - } - - async fn wait(&self, upload_id: UploadId) -> Result { - T::wait(self, upload_id).await - } - - async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { - T::claim(self, upload_id).await - } - - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { - T::complete(self, upload_id, result).await - } -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait QueueRepo: BaseRepo { - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError>; - - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>; - - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result; -} - -#[async_trait::async_trait(?Send)] -impl QueueRepo for actix_web::web::Data -where - T: QueueRepo, -{ - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { - T::requeue_in_progress(self, worker_prefix).await - } - - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> { - T::push(self, queue, job).await - } - - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result { - T::pop(self, queue, worker_id).await - } -} - #[async_trait::async_trait(?Send)] pub(crate) trait SettingsRepo: BaseRepo { - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>; async fn get(&self, key: &'static str) -> Result, RepoError>; - async fn remove(&self, key: &'static str) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl SettingsRepo for actix_web::web::Data -where - T: SettingsRepo, -{ - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { - T::set(self, key, value).await - } - - async fn get(&self, key: &'static str) -> Result, RepoError> { - T::get(self, key).await - } - - async fn remove(&self, key: &'static str) -> Result<(), RepoError> { - T::remove(self, key).await - } } #[async_trait::async_trait(?Send)] pub(crate) trait IdentifierRepo: BaseRepo { - async fn relate_details( - &self, - identifier: &I, - details: &Details, - ) -> Result<(), StoreError>; async fn details(&self, identifier: &I) -> Result, StoreError>; - - async fn cleanup(&self, identifier: &I) -> Result<(), StoreError>; -} - -#[async_trait::async_trait(?Send)] -impl IdentifierRepo for actix_web::web::Data -where - T: IdentifierRepo, -{ - async fn relate_details( - &self, - identifier: &I, - details: &Details, - ) -> Result<(), StoreError> { - T::relate_details(self, identifier, details).await - } - - async fn details(&self, identifier: &I) -> Result, StoreError> { - T::details(self, identifier).await - } - - async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { - T::cleanup(self, identifier).await - } -} - -#[async_trait::async_trait(?Send)] -pub(crate) trait MigrationRepo: BaseRepo { - async fn is_continuing_migration(&self) -> Result; - - async fn mark_migrated( - &self, - old_identifier: &I1, - new_identifier: &I2, - ) -> Result<(), StoreError>; - - async fn is_migrated(&self, identifier: &I) -> Result; - - async fn clear(&self) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl MigrationRepo for actix_web::web::Data -where - T: MigrationRepo, 
-{ - async fn is_continuing_migration(&self) -> Result { - T::is_continuing_migration(self).await - } - - async fn mark_migrated( - &self, - old_identifier: &I1, - new_identifier: &I2, - ) -> Result<(), StoreError> { - T::mark_migrated(self, old_identifier, new_identifier).await - } - - async fn is_migrated(&self, identifier: &I) -> Result { - T::is_migrated(self, identifier).await - } - - async fn clear(&self) -> Result<(), RepoError> { - T::clear(self).await - } } #[async_trait::async_trait(?Send)] @@ -419,601 +57,25 @@ pub(crate) trait HashRepo: BaseRepo { async fn hashes(&self) -> Self::Stream; - async fn create( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result, StoreError>; - - async fn update_identifier( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result<(), StoreError>; - async fn identifier( &self, hash: Self::Bytes, ) -> Result, StoreError>; - async fn relate_variant_identifier( - &self, - hash: Self::Bytes, - variant: String, - identifier: &I, - ) -> Result<(), StoreError>; - async fn variant_identifier( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result, StoreError>; async fn variants( &self, hash: Self::Bytes, ) -> Result, StoreError>; - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; - async fn relate_motion_identifier( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result<(), StoreError>; async fn motion_identifier( &self, hash: Self::Bytes, ) -> Result, StoreError>; - - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl HashRepo for actix_web::web::Data -where - T: HashRepo, -{ - type Stream = T::Stream; - - async fn size(&self) -> Result { - T::size(self).await - } - - async fn hashes(&self) -> Self::Stream { - T::hashes(self).await - } - - async fn create( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result, StoreError> { - T::create(self, hash, identifier).await - } - - async fn update_identifier( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result<(), StoreError> { - T::update_identifier(self, hash, identifier).await - } - - async fn identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { - T::identifier(self, hash).await - } - - async fn relate_variant_identifier( - &self, - hash: Self::Bytes, - variant: String, - identifier: &I, - ) -> Result<(), StoreError> { - T::relate_variant_identifier(self, hash, variant, identifier).await - } - - async fn variant_identifier( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result, StoreError> { - T::variant_identifier(self, hash, variant).await - } - - async fn variants( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { - T::variants(self, hash).await - } - - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - T::remove_variant(self, hash, variant).await - } - - async fn relate_motion_identifier( - &self, - hash: Self::Bytes, - identifier: &I, - ) -> Result<(), StoreError> { - T::relate_motion_identifier(self, hash, identifier).await - } - - async fn motion_identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { - T::motion_identifier(self, hash).await - } - - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { - T::cleanup(self, hash).await - } } #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo: BaseRepo { - async fn create( - &self, - alias: &Alias, - delete_token: &DeleteToken, - hash: Self::Bytes, - ) -> Result, RepoError>; - async fn 
delete_token(&self, alias: &Alias) -> Result, RepoError>; - async fn hash(&self, alias: &Alias) -> Result, RepoError>; - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; - - async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; -} - -#[async_trait::async_trait(?Send)] -impl AliasRepo for actix_web::web::Data -where - T: AliasRepo, -{ - async fn create( - &self, - alias: &Alias, - delete_token: &DeleteToken, - hash: Self::Bytes, - ) -> Result, RepoError> { - T::create(self, alias, delete_token, hash).await - } - - async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { - T::delete_token(self, alias).await - } - - async fn hash(&self, alias: &Alias) -> Result, RepoError> { - T::hash(self, alias).await - } - - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { - T::for_hash(self, hash).await - } - - async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { - T::cleanup(self, alias).await - } -} - -impl Repo { - #[tracing::instrument] - pub(crate) fn open(config: config::Repo) -> color_eyre::Result { - match config { - config::Repo::Sled(config::Sled { - path, - cache_capacity, - export_path, - }) => { - let repo = self::sled::SledRepo::build(path, cache_capacity, export_path)?; - - Ok(Self::Sled(repo)) - } - } - } -} - -impl MaybeUuid { - fn from_str(s: &str) -> Self { - if let Ok(uuid) = Uuid::parse_str(s) { - MaybeUuid::Uuid(uuid) - } else { - MaybeUuid::Name(s.into()) - } - } - - fn as_bytes(&self) -> &[u8] { - match self { - Self::Uuid(uuid) => &uuid.as_bytes()[..], - Self::Name(name) => name.as_bytes(), - } - } -} - -fn split_at_dot(s: &str) -> Option<(&str, &str)> { - let index = s.find('.')?; - - Some(s.split_at(index)) -} - -impl Alias { - pub(crate) fn generate(extension: String) -> Self { - Alias { - id: MaybeUuid::Uuid(Uuid::new_v4()), - extension: Some(extension), - } - } - - pub(crate) fn from_existing(alias: &str) -> Self { - if let Some((start, end)) = split_at_dot(alias) { - Alias { - id: MaybeUuid::from_str(start), - extension: Some(end.into()), - } - } else { - Alias { - id: MaybeUuid::from_str(alias), - extension: None, - } - } - } - - pub(crate) fn extension(&self) -> Option<&str> { - self.extension.as_deref() - } - - pub(crate) fn to_bytes(&self) -> Vec { - let mut v = self.id.as_bytes().to_vec(); - - if let Some(ext) = self.extension() { - v.extend_from_slice(ext.as_bytes()); - } - - v - } - - pub(crate) fn from_slice(bytes: &[u8]) -> Option { - if let Ok(s) = std::str::from_utf8(bytes) { - Some(Self::from_existing(s)) - } else if bytes.len() >= 16 { - let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length"); - - let extension = if bytes.len() > 16 { - Some(String::from_utf8_lossy(&bytes[16..]).to_string()) - } else { - None - }; - - Some(Self { - id: MaybeUuid::Uuid(id), - extension, - }) - } else { - None - } - } -} - -impl DeleteToken { - pub(crate) fn from_existing(existing: &str) -> Self { - if let Ok(uuid) = Uuid::parse_str(existing) { - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - } else { - DeleteToken { - id: MaybeUuid::Name(existing.into()), - } - } - } - - pub(crate) fn generate() -> Self { - Self { - id: MaybeUuid::Uuid(Uuid::new_v4()), - } - } - - fn to_bytes(&self) -> Vec { - self.id.as_bytes().to_vec() - } - - fn from_slice(bytes: &[u8]) -> Option { - if let Ok(s) = std::str::from_utf8(bytes) { - Some(DeleteToken::from_existing(s)) - } else if bytes.len() == 16 { - Some(DeleteToken { - id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?), - }) - } else { - None - } - } -} 
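The deletions continue below: `UploadId`, the `Display`/`FromStr` impls, and the alias/delete-token unit tests all leave repo_04.rs, reducing the 0.4 module to essentially a read-only view for `migrate_04` (open the old sled tree, stream hashes, look up identifiers, details, aliases, and delete tokens). A hedged sketch of the intended startup flow; `maybe_migrate` and the `config.old_repo` field are illustrative names, not part of this patch:

use crate::{config::Configuration, repo::ArcRepo, store::Store};

// Illustrative glue (assumed names): open the 0.4 sled repo read-only and,
// if it exists on disk, migrate its contents before serving requests.
async fn maybe_migrate<S: Store>(
    new_repo: &ArcRepo,
    store: &S,
    config: &Configuration,
) -> color_eyre::Result<()> {
    // repo_04::open returns Ok(None) when the v0.4 sled tree was never
    // created, so fresh installs skip the migration entirely.
    if let Some(old_repo) = crate::repo_04::open(&config.old_repo)? {
        crate::repo::migrate_04(old_repo, new_repo, store, config)
            .await
            .map_err(|e| color_eyre::eyre::eyre!("0.4 migration failed: {e:?}"))?;
    }

    Ok(())
}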
- -impl UploadId { - pub(crate) fn generate() -> Self { - Self { id: Uuid::new_v4() } - } - - pub(crate) fn as_bytes(&self) -> &[u8] { - &self.id.as_bytes()[..] - } -} - -impl std::str::FromStr for UploadId { - type Err = ::Err; - - fn from_str(s: &str) -> Result { - Ok(UploadId { id: s.parse()? }) - } -} - -impl std::fmt::Display for UploadId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.id, f) - } -} - -impl std::fmt::Display for MaybeUuid { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Uuid(id) => write!(f, "{id}"), - Self::Name(name) => write!(f, "{name}"), - } - } -} - -impl std::str::FromStr for DeleteToken { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - Ok(DeleteToken::from_existing(s)) - } -} - -impl std::fmt::Display for DeleteToken { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.id) - } -} - -impl std::str::FromStr for Alias { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - Ok(Alias::from_existing(s)) - } -} - -impl std::fmt::Display for Alias { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(ext) = self.extension() { - write!(f, "{}{ext}", self.id) - } else { - write!(f, "{}", self.id) - } - } -} - -#[cfg(test)] -mod tests { - use super::{Alias, DeleteToken, MaybeUuid, Uuid}; - - #[test] - fn string_delete_token() { - let delete_token = DeleteToken::from_existing("blah"); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Name(String::from("blah")) - } - ) - } - - #[test] - fn uuid_string_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_existing(&uuid.to_string()); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn bytes_delete_token() { - let delete_token = DeleteToken::from_slice(b"blah").unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Name(String::from("blah")) - } - ) - } - - #[test] - fn uuid_bytes_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn uuid_bytes_string_delete_token() { - let uuid = Uuid::new_v4(); - - let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap(); - - assert_eq!( - delete_token, - DeleteToken { - id: MaybeUuid::Uuid(uuid), - } - ) - } - - #[test] - fn string_alias() { - let alias = Alias::from_existing("blah"); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: None - } - ); - } - - #[test] - fn string_alias_ext() { - let alias = Alias::from_existing("blah.mp4"); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: Some(String::from(".mp4")), - } - ); - } - - #[test] - fn uuid_string_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_existing(&uuid.to_string()); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_string_alias_ext() { - let uuid = Uuid::new_v4(); - - let alias_str = format!("{uuid}.mp4"); - let alias = Alias::from_existing(&alias_str); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } - - #[test] - fn bytes_alias() { - let alias 
= Alias::from_slice(b"blah").unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: None - } - ); - } - - #[test] - fn bytes_alias_ext() { - let alias = Alias::from_slice(b"blah.mp4").unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Name(String::from("blah")), - extension: Some(String::from(".mp4")), - } - ); - } - - #[test] - fn uuid_bytes_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_bytes_string_alias() { - let uuid = Uuid::new_v4(); - - let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: None, - } - ) - } - - #[test] - fn uuid_bytes_alias_ext() { - let uuid = Uuid::new_v4(); - - let mut alias_bytes = uuid.as_bytes().to_vec(); - alias_bytes.extend_from_slice(b".mp4"); - - let alias = Alias::from_slice(&alias_bytes).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } - - #[test] - fn uuid_bytes_string_alias_ext() { - let uuid = Uuid::new_v4(); - - let alias_str = format!("{uuid}.mp4"); - let alias = Alias::from_slice(alias_str.as_bytes()).unwrap(); - - assert_eq!( - alias, - Alias { - id: MaybeUuid::Uuid(uuid), - extension: Some(String::from(".mp4")), - } - ) - } } diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs index 64cc817..b025b24 100644 --- a/src/repo_04/sled.rs +++ b/src/repo_04/sled.rs @@ -1,30 +1,19 @@ use crate::{ - details::MaybeHumanDate, repo_04::{ - Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, - HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, QueueRepo, - SettingsRepo, UploadId, UploadRepo, UploadResult, + Alias, AliasRepo, BaseRepo, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo, + RepoError, SettingsRepo, }, - serde_str::Serde, store::StoreError, - stream::from_iterator, + stream::{from_iterator, LocalBoxStream}, }; -use futures_util::{Future, Stream}; -use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree}; +use sled::{Db, IVec, Tree}; use std::{ - collections::HashMap, path::PathBuf, - pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, - Arc, RwLock, + Arc, }, - time::Instant, }; -use tokio::{sync::Notify, task::JoinHandle}; -use url::Url; - -use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo}; macro_rules! 
b { ($self:ident.$ident:ident, $expr:expr) => {{ @@ -49,12 +38,6 @@ pub(crate) enum SledError { #[error("Invalid details json")] Details(#[from] serde_json::Error), - #[error("Error formatting timestamp")] - Format(#[source] time::error::Format), - - #[error("Error parsing variant key")] - VariantKey(#[from] VariantKeyError), - #[error("Operation panicked")] Panic, } @@ -70,35 +53,18 @@ pub(crate) struct SledRepo { hash_identifiers: Tree, hash_variant_identifiers: Tree, hash_motion_identifiers: Tree, - aliases: Tree, - alias_hashes: Tree, alias_delete_tokens: Tree, - queue: Tree, - alias_access: Tree, - inverse_alias_access: Tree, - variant_access: Tree, - inverse_variant_access: Tree, - proxy: Tree, - inverse_proxy: Tree, - in_progress_queue: Tree, - queue_notifier: Arc>>>, - uploads: Tree, - migration_identifiers: Tree, - cache_capacity: u64, - export_path: PathBuf, - db: Db, + _db: Db, } impl SledRepo { #[tracing::instrument] - pub(crate) fn build( - path: PathBuf, - cache_capacity: u64, - export_path: PathBuf, - ) -> color_eyre::Result { - let db = Self::open(path, cache_capacity)?; + pub(crate) fn build(path: PathBuf, cache_capacity: u64) -> color_eyre::Result> { + let Some(db) = Self::open(path, cache_capacity)? else { + return Ok(None); + }; - Ok(SledRepo { + Ok(Some(SledRepo { healthz_count: Arc::new(AtomicU64::new(0)), healthz: db.open_tree("pict-rs-healthz-tree")?, settings: db.open_tree("pict-rs-settings-tree")?, @@ -108,84 +74,29 @@ impl SledRepo { hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?, hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?, hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?, - aliases: db.open_tree("pict-rs-aliases-tree")?, - alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?, alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?, - queue: db.open_tree("pict-rs-queue-tree")?, - alias_access: db.open_tree("pict-rs-alias-access-tree")?, - inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, - variant_access: db.open_tree("pict-rs-variant-access-tree")?, - inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?, - proxy: db.open_tree("pict-rs-proxy-tree")?, - inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?, - in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, - queue_notifier: Arc::new(RwLock::new(HashMap::new())), - uploads: db.open_tree("pict-rs-uploads-tree")?, - migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?, - cache_capacity, - export_path, - db, - }) + _db: db, + })) } - fn open(mut path: PathBuf, cache_capacity: u64) -> Result { + fn open(mut path: PathBuf, cache_capacity: u64) -> Result, SledError> { path.push("v0.4.0-alpha.1"); + if let Err(e) = std::fs::metadata(&path) { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(None); + } + } + let db = ::sled::Config::new() .cache_capacity(cache_capacity) .path(path) .open()?; - Ok(db) + Ok(Some(db)) } - #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn mark_accessed(&self) -> Result<(), StoreError> { - use futures_util::StreamExt; - - let mut stream = self.hashes().await; - - while let Some(res) = stream.next().await { - let hash = res?; - - for (variant, _) in self.variants::(hash.clone()).await? { - if !self.contains_variant(hash.clone(), variant.clone()).await? 
{ - VariantAccessRepo::accessed(self, hash.clone(), variant).await?; - } - } - } - - Ok(()) - } - - #[tracing::instrument(level = "warn", skip_all)] - pub(crate) async fn export(&self) -> Result<(), RepoError> { - let path = self - .export_path - .join(MaybeHumanDate::HumanDate(time::OffsetDateTime::now_utc()).to_string()); - - let export_db = Self::open(path, self.cache_capacity)?; - - let this = self.db.clone(); - - actix_rt::task::spawn_blocking(move || { - let export = this.export(); - export_db.import(export); - }) - .await - .map_err(SledError::from)?; - - Ok(()) - } -} - -impl BaseRepo for SledRepo { - type Bytes = IVec; -} - -#[async_trait::async_trait(?Send)] -impl FullRepo for SledRepo { - async fn health_check(&self) -> Result<(), RepoError> { + pub(crate) async fn health_check(&self) -> Result<(), RepoError> { let next = self.healthz_count.fetch_add(1, Ordering::Relaxed); b!(self.healthz, { healthz.insert("healthz", &next.to_be_bytes()[..]) @@ -196,656 +107,18 @@ impl FullRepo for SledRepo { } } -#[async_trait::async_trait(?Send)] -impl ProxyRepo for SledRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { - let proxy = self.proxy.clone(); - let inverse_proxy = self.inverse_proxy.clone(); - - actix_web::web::block(move || { - proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?; - inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; - - Ok(()) as Result<(), SledError> - }) - .await - .map_err(|_| RepoError::Canceled)??; - - Ok(()) - } - - async fn related(&self, url: Url) -> Result, RepoError> { - let opt = b!(self.proxy, proxy.get(url.as_str().as_bytes())); - - Ok(opt.and_then(|ivec| Alias::from_slice(&ivec))) - } - - async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { - let proxy = self.proxy.clone(); - let inverse_proxy = self.inverse_proxy.clone(); - - actix_web::web::block(move || { - if let Some(url) = inverse_proxy.remove(alias.to_bytes())? 
{ - proxy.remove(url)?; - } - - Ok(()) as Result<(), SledError> - }) - .await - .map_err(|_| RepoError::Canceled)??; - - Ok(()) - } -} - -type IterValue = Option<(sled::Iter, Result)>; - -pub(crate) struct IterStream { - iter: Option, - next: Option>, -} - -impl futures_util::Stream for IterStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - if let Some(ref mut next) = self.next { - let res = std::task::ready!(Pin::new(next).poll(cx)); - - self.next.take(); - - let opt = match res { - Ok(opt) => opt, - Err(_) => return std::task::Poll::Ready(Some(Err(RepoError::Canceled))), - }; - - if let Some((iter, res)) = opt { - self.iter = Some(iter); - - std::task::Poll::Ready(Some(res)) - } else { - std::task::Poll::Ready(None) - } - } else if let Some(mut iter) = self.iter.take() { - self.next = Some(tokio::task::spawn_blocking(move || { - let opt = iter - .next() - .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); - - opt.map(|res| (iter, res.map(|(_, value)| value))) - })); - self.poll_next(cx) - } else { - std::task::Poll::Ready(None) - } - } -} - -pub(crate) struct AliasAccessStream { - iter: IterStream, -} - -pub(crate) struct VariantAccessStream { - iter: IterStream, -} - -impl futures_util::Stream for AliasAccessStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { - Some(Ok(bytes)) => { - if let Some(alias) = Alias::from_slice(&bytes) { - std::task::Poll::Ready(Some(Ok(alias))) - } else { - self.poll_next(cx) - } - } - Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), - None => std::task::Poll::Ready(None), - } - } -} - -impl futures_util::Stream for VariantAccessStream { - type Item = Result<(IVec, String), RepoError>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) { - Some(Ok(bytes)) => std::task::Poll::Ready(Some( - parse_variant_access_key(bytes) - .map_err(SledError::from) - .map_err(RepoError::from), - )), - Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), - None => std::task::Poll::Ready(None), - } - } -} - -#[async_trait::async_trait(?Send)] -impl AliasAccessRepo for SledRepo { - type AliasAccessStream = AliasAccessStream; - - #[tracing::instrument(level = "debug", skip(self))] - async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { - let now_string = time::OffsetDateTime::now_utc() - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; - - let alias_access = self.alias_access.clone(); - let inverse_alias_access = self.inverse_alias_access.clone(); - - actix_rt::task::spawn_blocking(move || { - if let Some(old) = alias_access.insert(alias.to_bytes(), now_string.as_bytes())? { - inverse_alias_access.remove(old)?; - } - inverse_alias_access.insert(now_string, alias.to_bytes())?; - Ok(()) as Result<(), SledError> - }) - .await - .map_err(|_| RepoError::Canceled)? 
- .map_err(RepoError::from) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn older_aliases( - &self, - timestamp: time::OffsetDateTime, - ) -> Result { - let time_string = timestamp - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; - - let inverse_alias_access = self.inverse_alias_access.clone(); - - let iter = - actix_rt::task::spawn_blocking(move || inverse_alias_access.range(..=time_string)) - .await - .map_err(|_| RepoError::Canceled)?; - - Ok(AliasAccessStream { - iter: IterStream { - iter: Some(iter), - next: None, - }, - }) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn remove_access(&self, alias: Alias) -> Result<(), RepoError> { - let alias_access = self.alias_access.clone(); - let inverse_alias_access = self.inverse_alias_access.clone(); - - actix_rt::task::spawn_blocking(move || { - if let Some(old) = alias_access.remove(alias.to_bytes())? { - inverse_alias_access.remove(old)?; - } - Ok(()) as Result<(), SledError> - }) - .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) - } -} - -#[async_trait::async_trait(?Send)] -impl VariantAccessRepo for SledRepo { - type VariantAccessStream = VariantAccessStream; - - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - let key = variant_access_key(&hash, &variant); - - let now_string = time::OffsetDateTime::now_utc() - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; - - let variant_access = self.variant_access.clone(); - let inverse_variant_access = self.inverse_variant_access.clone(); - - actix_rt::task::spawn_blocking(move || { - if let Some(old) = variant_access.insert(&key, now_string.as_bytes())? { - inverse_variant_access.remove(old)?; - } - inverse_variant_access.insert(now_string, key)?; - Ok(()) as Result<(), SledError> - }) - .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) - } - - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn contains_variant( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result { - let key = variant_access_key(&hash, &variant); - - let timestamp = b!(self.variant_access, variant_access.get(key)); - - Ok(timestamp.is_some()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn older_variants( - &self, - timestamp: time::OffsetDateTime, - ) -> Result { - let time_string = timestamp - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; - - let inverse_variant_access = self.inverse_variant_access.clone(); - - let iter = - actix_rt::task::spawn_blocking(move || inverse_variant_access.range(..=time_string)) - .await - .map_err(|_| RepoError::Canceled)?; - - Ok(VariantAccessStream { - iter: IterStream { - iter: Some(iter), - next: None, - }, - }) - } - - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { - let key = variant_access_key(&hash, &variant); - - let variant_access = self.variant_access.clone(); - let inverse_variant_access = self.inverse_variant_access.clone(); - - actix_rt::task::spawn_blocking(move || { - if let Some(old) = variant_access.remove(key)? 
-                inverse_variant_access.remove(old)?;
-            }
-            Ok(()) as Result<(), SledError>
-        })
-        .await
-        .map_err(|_| RepoError::Canceled)?
-        .map_err(RepoError::from)
-    }
-}
-
-#[derive(serde::Deserialize, serde::Serialize)]
-enum InnerUploadResult {
-    Success {
-        alias: Serde<Alias>,
-        token: Serde<DeleteToken>,
-    },
-    Failure {
-        message: String,
-    },
-}
-
-impl From<UploadResult> for InnerUploadResult {
-    fn from(u: UploadResult) -> Self {
-        match u {
-            UploadResult::Success { alias, token } => InnerUploadResult::Success {
-                alias: Serde::new(alias),
-                token: Serde::new(token),
-            },
-            UploadResult::Failure { message } => InnerUploadResult::Failure { message },
-        }
-    }
-}
-
-impl From<InnerUploadResult> for UploadResult {
-    fn from(i: InnerUploadResult) -> Self {
-        match i {
-            InnerUploadResult::Success { alias, token } => UploadResult::Success {
-                alias: Serde::into_inner(alias),
-                token: Serde::into_inner(token),
-            },
-            InnerUploadResult::Failure { message } => UploadResult::Failure { message },
-        }
-    }
-}
-
-struct PushMetricsGuard {
-    queue: &'static str,
-    armed: bool,
-}
-
-struct PopMetricsGuard {
-    queue: &'static str,
-    start: Instant,
-    armed: bool,
-}
-
-impl PushMetricsGuard {
-    fn guard(queue: &'static str) -> Self {
-        Self { queue, armed: true }
-    }
-
-    fn disarm(mut self) {
-        self.armed = false;
-    }
-}
-
-impl PopMetricsGuard {
-    fn guard(queue: &'static str) -> Self {
-        Self {
-            queue,
-            start: Instant::now(),
-            armed: true,
-        }
-    }
-
-    fn disarm(mut self) {
-        self.armed = false;
-    }
-}
-
-impl Drop for PushMetricsGuard {
-    fn drop(&mut self) {
-        metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue);
-    }
-}
-
-impl Drop for PopMetricsGuard {
-    fn drop(&mut self) {
-        metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue);
-        metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue);
-    }
-}
-
-#[async_trait::async_trait(?Send)]
-impl UploadRepo for SledRepo {
-    #[tracing::instrument(level = "trace", skip(self))]
-    async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> {
-        b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self))]
-    async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
-        let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
-
-        let bytes = upload_id.as_bytes().to_vec();
-        let opt = b!(self.uploads, uploads.get(bytes));
-
-        if let Some(bytes) = opt {
-            if bytes != b"1" {
-                let result: InnerUploadResult =
-                    serde_json::from_slice(&bytes).map_err(SledError::from)?;
-                return Ok(result.into());
-            }
-        } else {
-            return Err(RepoError::AlreadyClaimed);
-        }
-
-        while let Some(event) = (&mut subscriber).await {
-            match event {
-                sled::Event::Remove { .. } => {
-                    return Err(RepoError::AlreadyClaimed);
-                }
-                sled::Event::Insert { value, .. } => {
-                    if value != b"1" {
-                        let result: InnerUploadResult =
-                            serde_json::from_slice(&value).map_err(SledError::from)?;
-                        return Ok(result.into());
-                    }
-                }
-            }
-        }
-
-        Err(RepoError::Canceled)
-    }
-
-    #[tracing::instrument(level = "trace", skip(self))]
-    async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
-        b!(self.uploads, uploads.remove(upload_id.as_bytes()));
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "trace", skip(self, result))]
-    async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> {
-        let result: InnerUploadResult = result.into();
-        let result = serde_json::to_vec(&result).map_err(SledError::from)?;
-
-        b!(self.uploads, uploads.insert(upload_id.as_bytes(), result));
-
-        Ok(())
-    }
-}
-
-#[async_trait::async_trait(?Send)]
-impl QueueRepo for SledRepo {
-    #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))]
-    async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError> {
-        let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, {
-            let vec = in_progress_queue
-                .scan_prefix(worker_prefix)
-                .values()
-                .filter_map(Result::ok)
-                .filter_map(|ivec| {
-                    let index = ivec.as_ref().iter().enumerate().find_map(|(index, byte)| {
-                        if *byte == 0 {
-                            Some(index)
-                        } else {
-                            None
-                        }
-                    })?;
-
-                    let (queue, job) = ivec.split_at(index);
-                    if queue.is_empty() || job.len() <= 1 {
-                        return None;
-                    }
-                    let job = &job[1..];
-
-                    Some((String::from_utf8_lossy(queue).to_string(), IVec::from(job)))
-                })
-                .collect::<Vec<(String, IVec)>>();
-
-            Ok(vec) as Result<_, SledError>
-        });
-
-        let db = self.db.clone();
-        b!(self.queue, {
-            for (queue_name, job) in vec {
-                let id = db.generate_id()?;
-                let mut key = queue_name.as_bytes().to_vec();
-                key.extend(id.to_be_bytes());
-
-                queue.insert(key, job)?;
-            }
-
-            Ok(()) as Result<(), SledError>
-        });
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
-    async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
-        let metrics_guard = PushMetricsGuard::guard(queue_name);
-
-        let id = self.db.generate_id().map_err(SledError::from)?;
-        let mut key = queue_name.as_bytes().to_vec();
-        key.extend(id.to_be_bytes());
-
-        b!(self.queue, queue.insert(key, job));
-
-        if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
-            notifier.notify_one();
-            metrics_guard.disarm();
-            return Ok(());
-        }
-
-        self.queue_notifier
-            .write()
-            .unwrap()
-            .entry(queue_name)
-            .or_insert_with(|| Arc::new(Notify::new()))
-            .notify_one();
-
-        metrics_guard.disarm();
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self, worker_id), fields(worker_id = %String::from_utf8_lossy(&worker_id)))]
-    async fn pop(
-        &self,
-        queue_name: &'static str,
-        worker_id: Vec<u8>,
-    ) -> Result<Self::Bytes, RepoError> {
-        let metrics_guard = PopMetricsGuard::guard(queue_name);
-
-        loop {
-            let in_progress_queue = self.in_progress_queue.clone();
-
-            let worker_id = worker_id.clone();
-            let job = b!(self.queue, {
-                in_progress_queue.remove(&worker_id)?;
-                in_progress_queue.flush()?;
-
-                while let Some((key, job)) = queue
-                    .scan_prefix(queue_name.as_bytes())
-                    .find_map(Result::ok)
-                {
-                    let mut in_progress_value = queue_name.as_bytes().to_vec();
-                    in_progress_value.push(0);
-                    in_progress_value.extend_from_slice(&job);
-
-                    in_progress_queue.insert(&worker_id, in_progress_value)?;
-
-                    if queue.remove(key)?.is_some() {
-                        return Ok(Some(job));
-                    }
-
-                    in_progress_queue.remove(&worker_id)?;
-                }
-
-                Ok(None) as Result<_, SledError>
-            });
-
-            if let Some(job) = job {
-                metrics_guard.disarm();
-                return Ok(job);
-            }
-
-            let opt = self
-                .queue_notifier
-                .read()
-                .unwrap()
-                .get(&queue_name)
-                .map(Arc::clone);
-
-            let notify = if let Some(notify) = opt {
-                notify
-            } else {
-                let mut guard = self.queue_notifier.write().unwrap();
-                let entry = guard
-                    .entry(queue_name)
-                    .or_insert_with(|| Arc::new(Notify::new()));
-                Arc::clone(entry)
-            };
-
-            notify.notified().await
-        }
-    }
+impl BaseRepo for SledRepo {
+    type Bytes = IVec;
 }
 
 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for SledRepo {
-    #[tracing::instrument(level = "trace", skip(value))]
-    async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
-        b!(self.settings, settings.insert(key, value));
-
-        Ok(())
-    }
-
     #[tracing::instrument(level = "trace", skip(self))]
     async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
         let opt = b!(self.settings, settings.get(key));
 
         Ok(opt)
     }
-
-    #[tracing::instrument(level = "trace", skip(self))]
-    async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
-        b!(self.settings, settings.remove(key));
-
-        Ok(())
-    }
-}
-
-fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {
-    let variant = variant.as_bytes();
-
-    let hash_len: u64 = u64::try_from(hash.len()).expect("Length is reasonable");
-
-    let mut out = Vec::with_capacity(8 + hash.len() + variant.len());
-
-    let hash_length_bytes: [u8; 8] = hash_len.to_be_bytes();
-    out.extend(hash_length_bytes);
-    out.extend(hash);
-    out.extend(variant);
-    out
-}
-
-#[derive(Debug, thiserror::Error)]
-pub(crate) enum VariantKeyError {
-    #[error("Bytes too short to be VariantAccessKey")]
-    TooShort,
-
-    #[error("Prefix Length is longer than backing bytes")]
-    InvalidLength,
-
-    #[error("Invalid utf8 in Variant")]
-    Utf8,
-}
-
-fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyError> {
-    if bytes.len() < 8 {
-        return Err(VariantKeyError::TooShort);
-    }
-
-    let hash_len = u64::from_be_bytes(bytes[..8].try_into().expect("Verified length"));
-    let hash_len: usize = usize::try_from(hash_len).expect("Length is reasonable");
-
-    if (hash_len + 8) > bytes.len() {
-        return Err(VariantKeyError::InvalidLength);
-    }
-
-    let hash = bytes.subslice(8, hash_len);
-
-    let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len);
-
-    if variant_len == 0 {
-        return Ok((hash, String::new()));
-    }
-
-    let variant_start = 8 + hash_len;
-
-    let variant = std::str::from_utf8(&bytes[variant_start..])
-        .map_err(|_| VariantKeyError::Utf8)?
-        .to_string();
-
-    Ok((hash, variant))
-}
-
-fn variant_key(hash: &[u8], variant: &str) -> Vec<u8> {
-    let mut bytes = hash.to_vec();
-    bytes.push(b'/');
-    bytes.extend_from_slice(variant.as_bytes());
-    bytes
 }
 
 fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
@@ -856,25 +129,6 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
 
 #[async_trait::async_trait(?Send)]
 impl IdentifierRepo for SledRepo {
-    #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
-    async fn relate_details<I: Identifier>(
-        &self,
-        identifier: &I,
-        details: &Details,
-    ) -> Result<(), StoreError> {
-        let key = identifier.to_bytes()?;
-        let details = serde_json::to_vec(&details)
-            .map_err(SledError::from)
-            .map_err(RepoError::from)?;
-
-        b!(
-            self.identifier_details,
-            identifier_details.insert(key, details)
-        );
-
-        Ok(())
-    }
-
     #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
     async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError> {
         let key = identifier.to_bytes()?;
@@ -887,54 +141,9 @@ impl IdentifierRepo for SledRepo {
             .map_err(RepoError::from)
             .map_err(StoreError::from)
     }
-
-    #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
-    async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError> {
-        let key = identifier.to_bytes()?;
-
-        b!(self.identifier_details, identifier_details.remove(key));
-
-        Ok(())
-    }
-}
-
-#[async_trait::async_trait(?Send)]
-impl MigrationRepo for SledRepo {
-    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
-        Ok(!self.migration_identifiers.is_empty())
-    }
-
-    async fn mark_migrated<I1: Identifier, I2: Identifier>(
-        &self,
-        old_identifier: &I1,
-        new_identifier: &I2,
-    ) -> Result<(), StoreError> {
-        let key = new_identifier.to_bytes()?;
-        let value = old_identifier.to_bytes()?;
-
-        b!(
-            self.migration_identifiers,
-            migration_identifiers.insert(key, value)
-        );
-
-        Ok(())
-    }
-
-    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
-        let key = identifier.to_bytes()?;
-
-        Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
-    }
-
-    async fn clear(&self) -> Result<(), RepoError> {
-        b!(self.migration_identifiers, migration_identifiers.clear());
-
-        Ok(())
-    }
-}
 
 type StreamItem = Result<IVec, RepoError>;
-type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
 
 #[async_trait::async_trait(?Send)]
 impl HashRepo for SledRepo {
@@ -958,55 +167,6 @@ impl HashRepo for SledRepo {
         Box::pin(from_iterator(iter, 8))
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn create<I: Identifier>(
-        &self,
-        hash: Self::Bytes,
-        identifier: &I,
-    ) -> Result<Result<(), HashAlreadyExists>, StoreError> {
-        let identifier: sled::IVec = identifier.to_bytes()?.into();
-
-        let hashes = self.hashes.clone();
-        let hash_identifiers = self.hash_identifiers.clone();
-
-        let res = actix_web::web::block(move || {
-            (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
-                if hashes.get(&hash)?.is_some() {
-                    return Ok(Err(HashAlreadyExists));
-                }
-
-                hashes.insert(&hash, &hash)?;
-                hash_identifiers.insert(&hash, &identifier)?;
-
-                Ok(Ok(()))
-            })
-        })
-        .await
-        .map_err(|_| RepoError::Canceled)?;
-
-        match res {
-            Ok(res) => Ok(res),
-            Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
-                Err(StoreError::from(RepoError::from(SledError::from(e))))
-            }
-        }
-    }
-
-    async fn update_identifier<I: Identifier>(
-        &self,
-        hash: Self::Bytes,
-        identifier: &I,
-    ) -> Result<(), StoreError> {
-        let identifier = identifier.to_bytes()?;
-
-        b!(
-            self.hash_identifiers,
-            hash_identifiers.insert(hash, identifier)
-        );
-
-        Ok(())
-    }
-
     #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
     async fn identifier<I: Identifier + 'static>(
         &self,
@@ -1019,40 +179,6 @@ impl HashRepo for SledRepo {
         Ok(Some(I::from_bytes(ivec.to_vec())?))
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
-    async fn relate_variant_identifier<I: Identifier>(
-        &self,
-        hash: Self::Bytes,
-        variant: String,
-        identifier: &I,
-    ) -> Result<(), StoreError> {
-        let key = variant_key(&hash, &variant);
-        let value = identifier.to_bytes()?;
-
-        b!(
-            self.hash_variant_identifiers,
-            hash_variant_identifiers.insert(key, value)
-        );
-
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn variant_identifier<I: Identifier + 'static>(
-        &self,
-        hash: Self::Bytes,
-        variant: String,
-    ) -> Result<Option<I>, StoreError> {
-        let key = variant_key(&hash, &variant);
-
-        let opt = b!(
-            self.hash_variant_identifiers,
-            hash_variant_identifiers.get(key)
-        );
-
-        opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
-    }
-
     #[tracing::instrument(level = "debug", skip(self, hash), fields(hash = hex::encode(&hash)))]
     async fn variants<I: Identifier + 'static>(
         &self,
@@ -1085,34 +211,6 @@ impl HashRepo for SledRepo {
         Ok(vec)
     }
 
-    #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
-        let key = variant_key(&hash, &variant);
-
-        b!(
-            self.hash_variant_identifiers,
-            hash_variant_identifiers.remove(key)
-        );
-
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
-    async fn relate_motion_identifier<I: Identifier>(
-        &self,
-        hash: Self::Bytes,
-        identifier: &I,
-    ) -> Result<(), StoreError> {
-        let bytes = identifier.to_bytes()?;
-
-        b!(
-            self.hash_motion_identifiers,
-            hash_motion_identifiers.insert(hash, bytes)
-        );
-
-        Ok(())
-    }
-
     #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
     async fn motion_identifier<I: Identifier + 'static>(
         &self,
@@ -1125,114 +223,10 @@ impl HashRepo for SledRepo {
 
         opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
     }
-
-    #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
-    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
-        let hashes = self.hashes.clone();
-        let hash_identifiers = self.hash_identifiers.clone();
-        let hash_motion_identifiers = self.hash_motion_identifiers.clone();
-        let hash_variant_identifiers = self.hash_variant_identifiers.clone();
-
-        let hash2 = hash.clone();
-        let variant_keys = b!(self.hash_variant_identifiers, {
-            let v = hash_variant_identifiers
-                .scan_prefix(hash2)
-                .keys()
-                .filter_map(Result::ok)
-                .collect::<Vec<_>>();
-
-            Ok(v) as Result<Vec<_>, SledError>
-        });
-
-        let res = actix_web::web::block(move || {
-            (
-                &hashes,
-                &hash_identifiers,
-                &hash_motion_identifiers,
-                &hash_variant_identifiers,
-            )
-                .transaction(
-                    |(
-                        hashes,
-                        hash_identifiers,
-                        hash_motion_identifiers,
-                        hash_variant_identifiers,
-                    )| {
-                        hashes.remove(&hash)?;
-                        hash_identifiers.remove(&hash)?;
-                        hash_motion_identifiers.remove(&hash)?;
-
-                        for key in &variant_keys {
-                            hash_variant_identifiers.remove(key)?;
-                        }
-
-                        Ok(())
-                    },
-                )
-        })
-        .await
-        .map_err(|_| RepoError::Canceled)?;
-
-        match res {
-            Ok(()) => Ok(()),
-            Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
-                Err(SledError::from(e).into())
-            }
-        }
-    }
-}
-
-fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec<u8> {
-    let mut v = hash.to_vec();
-    v.extend_from_slice(alias);
-    v
 }
 
 #[async_trait::async_trait(?Send)]
 impl AliasRepo for SledRepo {
-    #[tracing::instrument(level = "trace", skip(self))]
-    async fn create(
-        &self,
-        alias: &Alias,
-        delete_token: &DeleteToken,
-        hash: Self::Bytes,
-    ) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
-        let alias: sled::IVec = alias.to_bytes().into();
-        let delete_token: sled::IVec = delete_token.to_bytes().into();
-
-        let aliases = self.aliases.clone();
-        let alias_hashes = self.alias_hashes.clone();
-        let hash_aliases = self.hash_aliases.clone();
-        let alias_delete_tokens = self.alias_delete_tokens.clone();
-
-        let res = actix_web::web::block(move || {
-            (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
-                |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
-                    if aliases.get(&alias)?.is_some() {
-                        return Ok(Err(AliasAlreadyExists));
-                    }
-
-                    aliases.insert(&alias, &alias)?;
-                    alias_hashes.insert(&alias, &hash)?;
-
-                    hash_aliases.insert(hash_alias_key(&hash, &alias), &alias)?;
-                    alias_delete_tokens.insert(&alias, &delete_token)?;
-
-                    Ok(Ok(()))
-                },
-            )
-        })
-        .await
-        .map_err(|_| RepoError::Canceled)?;
-
-        match res {
-            Ok(res) => Ok(res),
-            Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
-                Err(SledError::from(e).into())
-            }
-        }
-    }
-
     #[tracing::instrument(level = "trace", skip(self))]
     async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
         let key = alias.to_bytes();
@@ -1248,15 +242,6 @@ impl AliasRepo for SledRepo {
         Ok(Some(token))
     }
 
-    #[tracing::instrument(level = "trace", skip(self))]
-    async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError> {
-        let key = alias.to_bytes();
-
-        let opt = b!(self.alias_hashes, alias_hashes.get(key));
-
-        Ok(opt)
-    }
-
     #[tracing::instrument(skip_all)]
     async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
         let v = b!(self.hash_aliases, {
@@ -1270,38 +255,6 @@ impl AliasRepo for SledRepo {
 
         Ok(v)
     }
-
-    #[tracing::instrument(skip(self))]
-    async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> {
-        let alias: IVec = alias.to_bytes().into();
-
-        let aliases = self.aliases.clone();
-        let alias_hashes = self.alias_hashes.clone();
-        let hash_aliases = self.hash_aliases.clone();
-        let alias_delete_tokens = self.alias_delete_tokens.clone();
-
-        let res = actix_web::web::block(move || {
-            (&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
-                |(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
-                    aliases.remove(&alias)?;
-                    if let Some(hash) = alias_hashes.remove(&alias)? {
-                        hash_aliases.remove(hash_alias_key(&hash, &alias))?;
-                    }
-                    alias_delete_tokens.remove(&alias)?;
-                    Ok(())
-                },
-            )
-        })
-        .await
-        .map_err(|_| RepoError::Canceled)?;
-
-        match res {
-            Ok(()) => Ok(()),
-            Err(TransactionError::Abort(e)) | Err(TransactionError::Storage(e)) => {
-                Err(SledError::from(e).into())
-            }
-        }
-    }
 }
 
 impl std::fmt::Debug for SledRepo {
@@ -1315,20 +268,3 @@ impl From<actix_rt::task::JoinError> for SledError {
         SledError::Panic
     }
 }
-
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn round_trip() {
-        let hash = sled::IVec::from(b"some hash value");
-        let variant = String::from("some string value");
-
-        let key = super::variant_access_key(&hash, &variant);
-
-        let (out_hash, out_variant) =
-            super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes");
-
-        assert_eq!(out_hash, hash);
-        assert_eq!(out_variant, variant);
-    }
-}
diff --git a/src/store.rs b/src/store.rs
index 988535b..e1704d2 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,7 +1,7 @@
 use actix_web::web::Bytes;
 use base64::{prelude::BASE64_STANDARD, Engine};
 use futures_util::stream::Stream;
-use std::fmt::Debug;
+use std::{fmt::Debug, sync::Arc};
 use tokio::io::{AsyncRead, AsyncWrite};
 
 pub(crate) mod file_store;
@@ -59,19 +59,23 @@ impl From for StoreError {
     }
 }
 
-pub(crate) trait Identifier: Send + Sync + Clone + Debug {
+pub(crate) trait Identifier: Send + Sync + Debug {
     fn to_bytes(&self) -> Result<Vec<u8>, StoreError>;
 
     fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
     where
         Self: Sized;
 
+    fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
+    where
+        Self: Sized;
+
     fn string_repr(&self) -> String;
 }
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait Store: Clone + Debug {
-    type Identifier: Identifier + 'static;
+    type Identifier: Identifier + Clone + 'static;
 
     type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
 
     async fn health_check(&self) -> Result<(), StoreError>;
@@ -278,6 +282,13 @@ impl Identifier for Vec<u8> {
         Ok(bytes)
     }
 
+    fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
+    where
+        Self: Sized,
+    {
+        Ok(Vec::from(&arc[..]))
+    }
+
     fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
         Ok(self.clone())
     }
@@ -286,3 +297,27 @@ impl Identifier for Vec<u8> {
         BASE64_STANDARD.encode(self.as_slice())
     }
 }
+
+impl Identifier for Arc<[u8]> {
+    fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
+    where
+        Self: Sized,
+    {
+        Ok(Arc::from(bytes))
+    }
+
+    fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
+    where
+        Self: Sized,
+    {
+        Ok(arc)
+    }
+
+    fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
+        Ok(Vec::from(&self[..]))
+    }
+
+    fn string_repr(&self) -> String {
+        BASE64_STANDARD.encode(&self[..])
+    }
+}
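
Note on the new `from_arc` constructor: it lets repos that hold identifiers as shared buffers hand them to a store without re-encoding. A minimal sketch of how the two impls above differ (illustrative only, not part of the patch; assumes it sits in src/store.rs so the crate-private `Identifier` and `StoreError` are in scope):

    use std::sync::Arc;

    fn round_trip(bytes: Vec<u8>) -> Result<(), StoreError> {
        let arc: Arc<[u8]> = Arc::from(bytes.clone());

        // The Vec<u8> impl copies the bytes out of the shared allocation...
        let owned = <Vec<u8> as Identifier>::from_arc(Arc::clone(&arc))?;
        assert_eq!(owned, bytes);

        // ...while the Arc<[u8]> impl only bumps the reference count.
        let shared = <Arc<[u8]> as Identifier>::from_arc(arc)?;
        assert_eq!(shared.to_bytes()?, bytes);

        Ok(())
    }
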
diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs
index 08b223f..279ddbc 100644
--- a/src/store/file_store/file_id.rs
+++ b/src/store/file_store/file_id.rs
@@ -30,6 +30,13 @@ impl Identifier for FileId {
         Ok(id)
     }
 
+    fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
+    where
+        Self: Sized,
+    {
+        Self::from_bytes(Vec::from(&arc[..]))
+    }
+
     fn string_repr(&self) -> String {
         self.0.to_string_lossy().into_owned()
     }
diff --git a/src/store/object_store/object_id.rs b/src/store/object_store/object_id.rs
index 512f884..f8a7dd9 100644
--- a/src/store/object_store/object_id.rs
+++ b/src/store/object_store/object_id.rs
@@ -14,6 +14,13 @@ impl Identifier for ObjectId {
         ))
     }
 
+    fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
+    where
+        Self: Sized,
+    {
+        Self::from_bytes(Vec::from(&arc[..]))
+    }
+
     fn string_repr(&self) -> String {
         self.0.clone()
     }
diff --git a/src/stream.rs b/src/stream.rs
index 8848ef5..a1f1024 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -12,6 +12,8 @@ use std::{
     time::Duration,
 };
 
+pub(crate) type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
+
 pub(crate) trait StreamLimit {
     fn limit(self, limit: u64) -> Limit<Self>
     where
@@ -39,6 +41,17 @@ pub(crate) trait StreamTimeout {
     }
 }
 
+pub(crate) trait IntoStreamer: Stream {
+    fn into_streamer(self) -> Streamer<Self>
+    where
+        Self: Sized,
+    {
+        Streamer(Some(self))
+    }
+}
+
+impl<T> IntoStreamer for T where T: Stream + Unpin {}
+
 pub(crate) fn from_iterator<I: IntoIterator + Send + 'static>(
     iterator: I,
     buffer: usize,
@@ -51,6 +64,28 @@
 impl<S, E> StreamLimit for S where S: Stream<Item = Result<Bytes, E>> {}
 impl<S> StreamTimeout for S where S: Stream {}
 
+pub(crate) struct Streamer<S>(Option<S>);
+
+impl<S> Streamer<S> {
+    pub(crate) async fn next(&mut self) -> Option<S::Item>
+    where
+        S: Stream + Unpin,
+    {
+        let opt = match self.0 {
+            Some(ref mut stream) => {
+                std::future::poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
+            }
+            None => None,
+        };
+
+        if opt.is_none() {
+            self.0.take();
+        }
+
+        opt
+    }
+}
+
 pin_project_lite::pin_project! {
     pub(crate) struct Limit<S> {
         #[pin]
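
Note on `IntoStreamer`/`Streamer`: the wrapper trades the `Stream` trait's pinned `poll_next` for a plain `async fn next`, so callers can drive any `Unpin` stream with a `while let` loop and no `StreamExt` import. Once `next` yields `None`, the inner stream is dropped, so later calls keep returning `None` (the wrapper is fused). A rough usage sketch (illustrative only, not part of the patch; assumes `futures_util` and the `IntoStreamer` trait from src/stream.rs are in scope):

    use futures_util::stream;

    async fn demo() {
        // Any Stream + Unpin works; a plain iterator stream keeps it simple.
        let mut streamer = stream::iter(vec![1u8, 2, 3]).into_streamer();

        while let Some(item) = streamer.next().await {
            println!("{item}");
        }
    }
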