Make Repo object safe. Create migration

This commit is contained in:
asonix 2023-08-15 19:19:03 -05:00
parent ba535e8b83
commit 1ee77b19d8
24 changed files with 821 additions and 2719 deletions

View file

@ -20,8 +20,10 @@ targets = "info"
[metrics]
[old_db]
path = "/mnt"
[old_repo]
path = "/mnt/sled-repo"
cache_capacity = 67108864
export_path = "/mnt/exports"
[media]
max_file_size = 40

View file

@ -128,12 +128,17 @@ targets = 'info'
prometheus_address = "0.0.0.0:9000"
## Configuration for migrating from pict-rs 0.2
[old_db]
## Optional: path to old pict-rs directory
# environment variable: PICTRS__OLD_DB__PATH
# default: /mnt
path = '/mnt'
## Configuration for migrating from pict-rs 0.4
[old_repo]
## Optional: path to sled repository
# environment variable: PICTRS__OLD_REPO__PATH
# default: /mnt/sled-repo
path = '/mnt/sled-repo'
## Optional: in-memory cache capacity for sled data (in bytes)
# environment variable: PICTRS__OLD_REPO__CACHE_CAPACITY
# default: 67,108,864 (1024 * 1024 * 64, or 64MB)
cache_capacity = 67108864
## Media Processing Configuration

View file

@ -1,6 +1,6 @@
use crate::{
error::Error,
repo::{FullRepo, UploadId, UploadRepo},
repo::{ArcRepo, UploadId, UploadRepo},
store::Store,
};
use actix_web::web::Bytes;
@ -8,19 +8,17 @@ use futures_util::{Stream, TryStreamExt};
use mime::APPLICATION_OCTET_STREAM;
use tracing::{Instrument, Span};
pub(crate) struct Backgrounded<R, S>
pub(crate) struct Backgrounded<S>
where
R: FullRepo + 'static,
S: Store,
{
repo: R,
repo: ArcRepo,
identifier: Option<S::Identifier>,
upload_id: Option<UploadId>,
}
impl<R, S> Backgrounded<R, S>
impl<S> Backgrounded<S>
where
R: FullRepo + 'static,
S: Store,
{
pub(crate) fn disarm(mut self) {
@ -36,7 +34,7 @@ where
self.identifier.as_ref()
}
pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
pub(crate) async fn proxy<P>(repo: ArcRepo, store: S, stream: P) -> Result<Self, Error>
where
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{
@ -55,7 +53,11 @@ where
where
P: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{
UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
UploadRepo::create(
self.repo.as_ref(),
self.upload_id.expect("Upload id exists"),
)
.await?;
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
@ -68,9 +70,8 @@ where
}
}
impl<R, S> Drop for Backgrounded<R, S>
impl<S> Drop for Backgrounded<S>
where
R: FullRepo + 'static,
S: Store,
{
fn drop(&mut self) {

View file

@ -13,7 +13,8 @@ impl Args {
pub(super) fn into_output(self) -> Output {
let Args {
config_file,
old_db_path,
old_repo_path,
old_repo_cache_capacity,
log_format,
log_targets,
console_address,
@ -25,7 +26,11 @@ impl Args {
command,
} = self;
let old_db = OldDb { path: old_db_path };
let old_repo = OldSled {
path: old_repo_path,
cache_capacity: old_repo_cache_capacity,
}
.set();
let tracing = Tracing {
logging: Logging {
@ -193,7 +198,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -211,7 +216,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -227,7 +232,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -255,7 +260,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -275,7 +280,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -299,7 +304,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -322,7 +327,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
old_db,
old_repo,
tracing,
metrics,
media,
@ -368,7 +373,8 @@ pub(crate) enum Operation {
pub(super) struct ConfigFormat {
server: Server,
client: Client,
old_db: OldDb,
#[serde(skip_serializing_if = "Option::is_none")]
old_repo: Option<OldSled>,
tracing: Tracing,
metrics: Metrics,
media: Media,
@ -444,13 +450,6 @@ struct Metrics {
prometheus_address: Option<SocketAddr>,
}
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct OldDb {
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<PathBuf>,
}
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct Media {
@ -706,7 +705,11 @@ pub(super) struct Args {
/// Path to the old pict-rs sled database
#[arg(long)]
old_db_path: Option<PathBuf>,
old_repo_path: Option<PathBuf>,
/// The cache capacity, in bytes, allowed to sled for in-memory operations
#[arg(long)]
old_repo_cache_capacity: Option<u64>,
/// Format of logs printed to stdout
#[arg(long)]
@ -1178,3 +1181,29 @@ struct Sled {
#[serde(skip_serializing_if = "Option::is_none")]
export_path: Option<PathBuf>,
}
#[derive(Debug, Parser, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct OldSled {
/// The path to store the sled database
#[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<PathBuf>,
/// The cache capacity, in bytes, allowed to sled for in-memory operations
#[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
cache_capacity: Option<u64>,
}
impl OldSled {
fn set(self) -> Option<Self> {
let any_set = self.path.is_some() || self.cache_capacity.is_some();
if any_set {
Some(self)
} else {
None
}
}
}

View file

@ -11,7 +11,7 @@ pub(crate) struct Defaults {
server: ServerDefaults,
client: ClientDefaults,
tracing: TracingDefaults,
old_db: OldDbDefaults,
old_repo: SledDefaults,
media: MediaDefaults,
repo: RepoDefaults,
store: StoreDefaults,
@ -62,12 +62,6 @@ struct OpenTelemetryDefaults {
targets: Serde<Targets>,
}
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct OldDbDefaults {
path: PathBuf,
}
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct MediaDefaults {
@ -225,14 +219,6 @@ impl Default for OpenTelemetryDefaults {
}
}
impl Default for OldDbDefaults {
fn default() -> Self {
OldDbDefaults {
path: PathBuf::from(String::from("/mnt")),
}
}
}
impl Default for MediaDefaults {
fn default() -> Self {
MediaDefaults {

View file

@ -18,7 +18,7 @@ pub(crate) struct ConfigFile {
#[serde(default)]
pub(crate) metrics: Metrics,
pub(crate) old_db: OldDb,
pub(crate) old_repo: Sled,
pub(crate) media: Media,

View file

@ -52,8 +52,8 @@ pub(crate) enum UploadError {
#[error("Error in DB")]
Repo(#[from] crate::repo::RepoError),
#[error("Error in old sled DB")]
OldSled(#[from] ::sled::Error),
#[error("Error in old repo")]
OldRepo(#[from] crate::repo_04::RepoError),
#[error("Error parsing string")]
ParseString(#[from] std::string::FromUtf8Error),

View file

@ -4,8 +4,8 @@ use crate::{
error::{Error, UploadError},
ffmpeg::ThumbnailFormat,
formats::{InputProcessableFormat, InternalVideoFormat},
repo::{Alias, FullRepo, Hash},
store::Store,
repo::{Alias, ArcRepo, Hash},
store::{Identifier, Store},
};
use actix_web::web::Bytes;
use std::{path::PathBuf, time::Instant};
@ -40,8 +40,8 @@ impl Drop for MetricsGuard {
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(repo, store, hash, process_map, media))]
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
pub(crate) async fn generate<S: Store + 'static>(
repo: &ArcRepo,
store: &S,
process_map: &ProcessMap,
format: InputProcessableFormat,
@ -75,8 +75,8 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(repo, store, hash, media))]
async fn process<R: FullRepo, S: Store + 'static>(
repo: &R,
async fn process<S: Store + 'static>(
repo: &ArcRepo,
store: &S,
output_format: InputProcessableFormat,
alias: Alias,
@ -90,11 +90,8 @@ async fn process<R: FullRepo, S: Store + 'static>(
let guard = MetricsGuard::guard();
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let identifier = if let Some(identifier) = repo
.still_identifier_from_alias::<S::Identifier>(&alias)
.await?
{
identifier
let identifier = if let Some(identifier) = repo.still_identifier_from_alias(&alias).await? {
S::Identifier::from_arc(identifier)?
} else {
let Some(identifier) = repo.identifier(hash.clone()).await? else {
return Err(UploadError::MissingIdentifier.into());
@ -104,7 +101,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
let reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,
S::Identifier::from_arc(identifier)?,
input_format.unwrap_or(InternalVideoFormat::Mp4),
thumbnail_format,
media.process_timeout,

View file

@ -3,7 +3,7 @@ use crate::{
either::Either,
error::{Error, UploadError},
formats::{InternalFormat, Validations},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo},
repo::{Alias, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo},
store::Store,
};
use actix_web::web::Bytes;
@ -14,12 +14,11 @@ mod hasher;
use hasher::Hasher;
#[derive(Debug)]
pub(crate) struct Session<R, S>
pub(crate) struct Session<S>
where
R: FullRepo + 'static,
S: Store,
{
repo: R,
repo: ArcRepo,
delete_token: DeleteToken,
hash: Option<Hash>,
alias: Option<Alias>,
@ -41,15 +40,14 @@ where
}
#[tracing::instrument(skip(repo, store, stream, media))]
pub(crate) async fn ingest<R, S>(
repo: &R,
pub(crate) async fn ingest<S>(
repo: &ArcRepo,
store: &S,
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>,
media: &crate::config::Media,
) -> Result<Session<R, S>, Error>
) -> Result<Session<S>, Error>
where
R: FullRepo + 'static,
S: Store,
{
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
@ -129,18 +127,17 @@ where
}
#[tracing::instrument(level = "trace", skip_all)]
async fn save_upload<R, S>(
session: &mut Session<R, S>,
repo: &R,
async fn save_upload<S>(
session: &mut Session<S>,
repo: &ArcRepo,
store: &S,
hash: Hash,
identifier: &S::Identifier,
) -> Result<(), Error>
where
S: Store,
R: FullRepo,
{
if HashRepo::create(repo, hash.clone(), identifier)
if HashRepo::create(repo.as_ref(), hash.clone(), identifier)
.await?
.is_err()
{
@ -156,9 +153,8 @@ where
Ok(())
}
impl<R, S> Session<R, S>
impl<S> Session<S>
where
R: FullRepo + 'static,
S: Store,
{
pub(crate) fn disarm(mut self) -> DeleteToken {
@ -179,7 +175,7 @@ where
#[tracing::instrument(skip(self, hash))]
async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> {
AliasRepo::create(&self.repo, &alias, &self.delete_token, hash)
AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash)
.await?
.map_err(|_| UploadError::DuplicateAlias)?;
@ -193,7 +189,7 @@ where
loop {
let alias = Alias::generate(input_type.file_extension().to_string());
if AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone())
if AliasRepo::create(self.repo.as_ref(), &alias, &self.delete_token, hash.clone())
.await?
.is_ok()
{
@ -207,9 +203,8 @@ where
}
}
impl<R, S> Drop for Session<R, S>
impl<S> Drop for Session<S>
where
R: FullRepo + 'static,
S: Store,
{
fn drop(&mut self) {

View file

@ -41,6 +41,7 @@ use futures_util::{
use metrics_exporter_prometheus::PrometheusBuilder;
use middleware::Metrics;
use once_cell::sync::Lazy;
use repo::ArcRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use rusty_s3::UrlStyle;
@ -48,6 +49,7 @@ use std::{
future::ready,
path::Path,
path::PathBuf,
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::Semaphore;
@ -68,15 +70,11 @@ use self::{
migrate_store::migrate_store,
queue::queue_generate,
repo::{
sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, Hash, HashRepo,
IdentifierRepo, Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo,
sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, Hash, Repo, UploadId, UploadResult,
VariantAccessRepo,
},
serde_str::Serde,
store::{
file_store::FileStore,
object_store::{ObjectStore, ObjectStoreConfig},
Identifier, Store,
},
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
stream::{StreamLimit, StreamTimeout},
};
@ -94,13 +92,13 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
.in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
});
async fn ensure_details<R: FullRepo, S: Store + 'static>(
repo: &R,
async fn ensure_details<S: Store + 'static>(
repo: &ArcRepo,
store: &S,
config: &Configuration,
alias: &Alias,
) -> Result<Details, Error> {
let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(alias).await? else {
let Some(identifier) = repo.identifier_from_alias(alias).await?.map(S::Identifier::from_arc).transpose()? else {
return Err(UploadError::MissingAlias.into());
};
@ -130,10 +128,10 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
}
}
struct Upload<R: FullRepo + 'static, S: Store + 'static>(Value<Session<R, S>>);
struct Upload<S: Store + 'static>(Value<Session<S>>);
impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
type Item = Session<R, S>;
impl<S: Store + 'static> FormData for Upload<S> {
type Item = Session<S>;
type Error = Error;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
@ -141,7 +139,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let repo = req
.app_data::<web::Data<R>>()
.app_data::<web::Data<ArcRepo>>()
.expect("No repo in request")
.clone();
let store = req
@ -176,7 +174,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
return Err(UploadError::ReadOnly.into());
}
ingest::ingest(&**repo, &**store, stream, None, &config.media).await
ingest::ingest(&repo, &**store, stream, None, &config.media).await
}
.instrument(span),
)
@ -184,20 +182,20 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
)
}
fn extract(value: Value<Session<R, S>>) -> Result<Self, Self::Error> {
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error> {
Ok(Upload(value))
}
}
struct Import<R: FullRepo + 'static, S: Store + 'static>(Value<Session<R, S>>);
struct Import<S: Store + 'static>(Value<Session<S>>);
impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
type Item = Session<R, S>;
impl<S: Store + 'static> FormData for Import<S> {
type Item = Session<S>;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
let repo = req
.app_data::<web::Data<R>>()
.app_data::<web::Data<ArcRepo>>()
.expect("No repo in request")
.clone();
let store = req
@ -236,7 +234,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
}
ingest::ingest(
&**repo,
&repo,
&**store,
stream,
Some(Alias::from_existing(&filename)),
@ -260,9 +258,9 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
/// Handle responding to successful uploads
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
async fn upload<R: FullRepo, S: Store + 'static>(
Multipart(Upload(value)): Multipart<Upload<R, S>>,
repo: web::Data<R>,
async fn upload<S: Store + 'static>(
Multipart(Upload(value)): Multipart<Upload<S>>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -271,9 +269,9 @@ async fn upload<R: FullRepo, S: Store + 'static>(
/// Handle responding to successful uploads
#[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
async fn import<R: FullRepo, S: Store + 'static>(
Multipart(Import(value)): Multipart<Import<R, S>>,
repo: web::Data<R>,
async fn import<S: Store + 'static>(
Multipart(Import(value)): Multipart<Import<S>>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -282,9 +280,9 @@ async fn import<R: FullRepo, S: Store + 'static>(
/// Handle responding to successful uploads
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
async fn handle_upload<R: FullRepo, S: Store + 'static>(
value: Value<Session<R, S>>,
repo: web::Data<R>,
async fn handle_upload<S: Store + 'static>(
value: Value<Session<S>>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -325,10 +323,10 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
})))
}
struct BackgroundedUpload<R: FullRepo + 'static, S: Store + 'static>(Value<Backgrounded<R, S>>);
struct BackgroundedUpload<S: Store + 'static>(Value<Backgrounded<S>>);
impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
type Item = Backgrounded<R, S>;
impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
type Item = Backgrounded<S>;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
@ -336,7 +334,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let repo = req
.app_data::<web::Data<R>>()
.app_data::<web::Data<ArcRepo>>()
.expect("No repo in request")
.clone();
let store = req
@ -389,9 +387,9 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
}
#[tracing::instrument(name = "Uploaded files", skip(value, repo))]
async fn upload_backgrounded<R: FullRepo, S: Store>(
Multipart(BackgroundedUpload(value)): Multipart<BackgroundedUpload<R, S>>,
repo: web::Data<R>,
async fn upload_backgrounded<S: Store>(
Multipart(BackgroundedUpload(value)): Multipart<BackgroundedUpload<S>>,
repo: web::Data<ArcRepo>,
) -> Result<HttpResponse, Error> {
let images = value
.map()
@ -437,8 +435,8 @@ struct ClaimQuery {
/// Claim a backgrounded upload
#[tracing::instrument(name = "Waiting on upload", skip_all)]
async fn claim_upload<R: FullRepo, S: Store + 'static>(
repo: web::Data<R>,
async fn claim_upload<S: Store + 'static>(
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
query: web::Query<ClaimQuery>,
@ -483,9 +481,9 @@ struct UrlQuery {
backgrounded: bool,
}
async fn ingest_inline<R: FullRepo, S: Store + 'static>(
async fn ingest_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: &R,
repo: &ArcRepo,
store: &S,
config: &Configuration,
) -> Result<(Alias, DeleteToken, Details), Error> {
@ -502,9 +500,9 @@ async fn ingest_inline<R: FullRepo, S: Store + 'static>(
/// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
async fn download<R: FullRepo + 'static, S: Store + 'static>(
async fn download<S: Store + 'static>(
client: web::Data<ClientWithMiddleware>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
query: web::Query<UrlQuery>,
@ -542,9 +540,9 @@ async fn download_stream(
}
#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
async fn do_download_inline<R: FullRepo, S: Store + 'static>(
async fn do_download_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -563,9 +561,9 @@ async fn do_download_inline<R: FullRepo, S: Store + 'static>(
}
#[tracing::instrument(name = "Downloading file in background", skip(stream, repo, store))]
async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
async fn do_download_backgrounded<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
metrics::increment_counter!("pict-rs.files", "download" => "background");
@ -592,8 +590,8 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
/// Delete aliases and files
#[tracing::instrument(name = "Deleting file", skip(repo, config))]
async fn delete<R: FullRepo>(
repo: web::Data<R>,
async fn delete(
repo: web::Data<ArcRepo>,
config: web::Data<Configuration>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
@ -649,10 +647,10 @@ fn prepare_process(
}
#[tracing::instrument(name = "Fetching derived details", skip(repo, config))]
async fn process_details<R: FullRepo, S: Store>(
async fn process_details<S: Store>(
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
let alias = match source {
@ -681,12 +679,15 @@ async fn process_details<R: FullRepo, S: Store>(
let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
if !config.server.read_only {
VariantAccessRepo::accessed(&repo, hash.clone(), thumbnail_string.clone()).await?;
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), thumbnail_string.clone())
.await?;
}
let identifier = repo
.variant_identifier::<S::Identifier>(hash, thumbnail_string)
.variant_identifier(hash, thumbnail_string)
.await?
.map(S::Identifier::from_arc)
.transpose()?
.ok_or(UploadError::MissingAlias)?;
let details = repo.details(&identifier).await?;
@ -696,7 +697,7 @@ async fn process_details<R: FullRepo, S: Store>(
Ok(HttpResponse::Ok().json(&details))
}
async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, Hash)>, Error> {
async fn not_found_hash(repo: &ArcRepo) -> Result<Option<(Alias, Hash)>, Error> {
let Some(not_found) = repo.get(NOT_FOUND_KEY).await? else {
return Ok(None);
};
@ -720,11 +721,11 @@ async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, Hash)>,
name = "Serving processed image",
skip(repo, store, client, config, process_map)
)]
async fn process<R: FullRepo, S: Store + 'static>(
async fn process<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>,
@ -750,7 +751,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
};
if !config.server.read_only {
AliasAccessRepo::accessed(&repo, alias.clone()).await?;
AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?;
}
alias
@ -773,12 +774,14 @@ async fn process<R: FullRepo, S: Store + 'static>(
};
if !config.server.read_only {
VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?;
}
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
.variant_identifier(hash.clone(), path_string)
.await?
.map(S::Identifier::from_arc)
.transpose()?;
if let Some(identifier) = identifier_opt {
let details = repo.details(&identifier).await?.and_then(|details| {
@ -874,11 +877,11 @@ async fn process<R: FullRepo, S: Store + 'static>(
}
#[tracing::instrument(name = "Serving processed image headers", skip(repo, store, config))]
async fn process_head<R: FullRepo, S: Store + 'static>(
async fn process_head<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -903,12 +906,14 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
};
if !config.server.read_only {
VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
VariantAccessRepo::accessed((**repo).as_ref(), hash.clone(), path_string.clone()).await?;
}
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
.variant_identifier(hash.clone(), path_string)
.await?
.map(S::Identifier::from_arc)
.transpose()?;
if let Some(identifier) = identifier_opt {
let details = repo.details(&identifier).await?.and_then(|details| {
@ -950,10 +955,10 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
/// Process files
#[tracing::instrument(name = "Spawning image process", skip(repo))]
async fn process_backgrounded<R: FullRepo, S: Store>(
async fn process_backgrounded<S: Store>(
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
let source = match source {
@ -978,8 +983,10 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
};
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
.variant_identifier(hash.clone(), path_string)
.await?
.map(S::Identifier::from_arc)
.transpose()?;
if identifier_opt.is_some() {
return Ok(HttpResponse::Accepted().finish());
@ -996,9 +1003,9 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
/// Fetch file details
#[tracing::instrument(name = "Fetching query details", skip(repo, store, config))]
async fn details_query<R: FullRepo, S: Store + 'static>(
async fn details_query<S: Store + 'static>(
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1019,18 +1026,18 @@ async fn details_query<R: FullRepo, S: Store + 'static>(
/// Fetch file details
#[tracing::instrument(name = "Fetching details", skip(repo, store, config))]
async fn details<R: FullRepo, S: Store + 'static>(
async fn details<S: Store + 'static>(
alias: web::Path<Serde<Alias>>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
do_details(Serde::into_inner(alias.into_inner()), repo, store, config).await
}
async fn do_details<R: FullRepo, S: Store + 'static>(
async fn do_details<S: Store + 'static>(
alias: Alias,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1041,10 +1048,10 @@ async fn do_details<R: FullRepo, S: Store + 'static>(
/// Serve files based on alias query
#[tracing::instrument(name = "Serving file query", skip(repo, store, client, config))]
async fn serve_query<R: FullRepo, S: Store + 'static>(
async fn serve_query<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>,
@ -1067,7 +1074,7 @@ async fn serve_query<R: FullRepo, S: Store + 'static>(
};
if !config.server.read_only {
AliasAccessRepo::accessed(&repo, alias.clone()).await?;
AliasAccessRepo::accessed((**repo).as_ref(), alias.clone()).await?;
}
alias
@ -1079,10 +1086,10 @@ async fn serve_query<R: FullRepo, S: Store + 'static>(
/// Serve files
#[tracing::instrument(name = "Serving file", skip(repo, store, config))]
async fn serve<R: FullRepo, S: Store + 'static>(
async fn serve<S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1096,10 +1103,10 @@ async fn serve<R: FullRepo, S: Store + 'static>(
.await
}
async fn do_serve<R: FullRepo, S: Store + 'static>(
async fn do_serve<S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: Alias,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1113,7 +1120,7 @@ async fn do_serve<R: FullRepo, S: Store + 'static>(
(hash, alias, true)
};
let Some(identifier) = repo.identifier(hash.clone()).await? else {
let Some(identifier) = repo.identifier(hash.clone()).await?.map(Identifier::from_arc).transpose()? else {
tracing::warn!(
"Original File identifier for hash {hash:?} is missing, queue cleanup task",
);
@ -1133,10 +1140,10 @@ async fn do_serve<R: FullRepo, S: Store + 'static>(
}
#[tracing::instrument(name = "Serving query file headers", skip(repo, store, config))]
async fn serve_query_head<R: FullRepo, S: Store + 'static>(
async fn serve_query_head<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1154,10 +1161,10 @@ async fn serve_query_head<R: FullRepo, S: Store + 'static>(
}
#[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
async fn serve_head<R: FullRepo, S: Store + 'static>(
async fn serve_head<S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1171,14 +1178,14 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
.await
}
async fn do_serve_head<R: FullRepo, S: Store + 'static>(
async fn do_serve_head<S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: Alias,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(&alias).await? else {
let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? else {
// Invalid alias
return Ok(HttpResponse::NotFound().finish());
};
@ -1327,8 +1334,8 @@ fn srv_head(
}
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))]
async fn clean_variants<R: FullRepo>(
repo: web::Data<R>,
async fn clean_variants(
repo: web::Data<ArcRepo>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
if config.server.read_only {
@ -1347,9 +1354,9 @@ enum AliasQuery {
}
#[tracing::instrument(name = "Setting 404 Image", skip(repo, config))]
async fn set_not_found<R: FullRepo>(
async fn set_not_found(
json: web::Json<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
@ -1380,9 +1387,9 @@ async fn set_not_found<R: FullRepo>(
}
#[tracing::instrument(name = "Purging file", skip(repo, config))]
async fn purge<R: FullRepo>(
async fn purge(
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
if config.server.read_only {
@ -1415,9 +1422,9 @@ async fn purge<R: FullRepo>(
}
#[tracing::instrument(name = "Fetching aliases", skip(repo))]
async fn aliases<R: FullRepo>(
async fn aliases(
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
@ -1438,9 +1445,9 @@ async fn aliases<R: FullRepo>(
}
#[tracing::instrument(name = "Fetching identifier", skip(repo))]
async fn identifier<R: FullRepo, S: Store>(
async fn identifier<S: Store>(
web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>,
repo: web::Data<ArcRepo>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
@ -1452,7 +1459,7 @@ async fn identifier<R: FullRepo, S: Store>(
}
};
let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(&alias).await? else {
let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? else {
// Invalid alias
return Ok(HttpResponse::NotFound().json(serde_json::json!({
"msg": "No identifiers associated with provided alias"
@ -1465,8 +1472,8 @@ async fn identifier<R: FullRepo, S: Store>(
})))
}
async fn healthz<R: FullRepo, S: Store>(
repo: web::Data<R>,
async fn healthz<S: Store>(
repo: web::Data<ArcRepo>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
repo.health_check().await?;
@ -1493,13 +1500,9 @@ fn build_client(config: &Configuration) -> Result<ClientWithMiddleware, Error> {
.build())
}
fn configure_endpoints<
R: FullRepo + 'static,
S: Store + 'static,
F: Fn(&mut web::ServiceConfig),
>(
fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
config: &mut web::ServiceConfig,
repo: R,
repo: ArcRepo,
store: S,
configuration: Configuration,
client: ClientWithMiddleware,
@ -1510,68 +1513,63 @@ fn configure_endpoints<
.app_data(web::Data::new(store))
.app_data(web::Data::new(client))
.app_data(web::Data::new(configuration.clone()))
.route("/healthz", web::get().to(healthz::<R, S>))
.route("/healthz", web::get().to(healthz::<S>))
.service(
web::scope("/image")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload::<R, S>)),
.route(web::post().to(upload::<S>)),
)
.service(
web::scope("/backgrounded")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload_backgrounded::<R, S>)),
.route(web::post().to(upload_backgrounded::<S>)),
)
.service(
web::resource("/claim").route(web::get().to(claim_upload::<R, S>)),
),
.service(web::resource("/claim").route(web::get().to(claim_upload::<S>))),
)
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(web::resource("/download").route(web::get().to(download::<S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete::<R>))
.route(web::get().to(delete::<R>)),
.route(web::delete().to(delete))
.route(web::get().to(delete)),
)
.service(
web::scope("/original")
.service(
web::resource("")
.route(web::get().to(serve_query::<R, S>))
.route(web::head().to(serve_query_head::<R, S>)),
.route(web::get().to(serve_query::<S>))
.route(web::head().to(serve_query_head::<S>)),
)
.service(
web::resource("/{filename}")
.route(web::get().to(serve::<R, S>))
.route(web::head().to(serve_head::<R, S>)),
.route(web::get().to(serve::<S>))
.route(web::head().to(serve_head::<S>)),
),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process::<R, S>))
.route(web::head().to(process_head::<R, S>)),
.route(web::get().to(process::<S>))
.route(web::head().to(process_head::<S>)),
)
.service(
web::resource("/process_backgrounded.{ext}")
.route(web::get().to(process_backgrounded::<R, S>)),
.route(web::get().to(process_backgrounded::<S>)),
)
.service(
web::scope("/details")
.service(
web::scope("/original")
.service(web::resource("").route(web::get().to(details_query::<S>)))
.service(
web::resource("").route(web::get().to(details_query::<R, S>)),
)
.service(
web::resource("/{filename}")
.route(web::get().to(details::<R, S>)),
web::resource("/{filename}").route(web::get().to(details::<S>)),
),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details::<R, S>)),
.route(web::get().to(process_details::<S>)),
),
),
)
@ -1580,20 +1578,17 @@ fn configure_endpoints<
.wrap(Internal(
configuration.server.api_key.as_ref().map(|s| s.to_owned()),
))
.service(web::resource("/import").route(web::post().to(import::<R, S>)))
.service(web::resource("/variants").route(web::delete().to(clean_variants::<R>)))
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
.service(web::resource("/identifier").route(web::get().to(identifier::<R, S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found::<R>)))
.service(web::resource("/import").route(web::post().to(import::<S>)))
.service(web::resource("/variants").route(web::delete().to(clean_variants)))
.service(web::resource("/purge").route(web::post().to(purge)))
.service(web::resource("/aliases").route(web::get().to(aliases)))
.service(web::resource("/identifier").route(web::get().to(identifier::<S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found)))
.configure(extra_config),
);
}
fn spawn_cleanup<R>(repo: R, config: &Configuration)
where
R: FullRepo + 'static,
{
fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
if config.server.read_only {
return;
}
@ -1623,9 +1618,8 @@ where
})
}
fn spawn_workers<R, S>(repo: R, store: S, config: Configuration, process_map: ProcessMap)
fn spawn_workers<S>(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap)
where
R: FullRepo + 'static,
S: Store + 'static,
{
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@ -1639,8 +1633,8 @@ where
.in_scope(|| actix_rt::spawn(queue::process_images(repo, store, process_map, config)));
}
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
repo: R,
async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
repo: ArcRepo,
store: FileStore,
client: ClientWithMiddleware,
config: Configuration,
@ -1678,12 +1672,9 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig)
.await
}
async fn launch_object_store<
R: FullRepo + 'static,
F: Fn(&mut web::ServiceConfig) + Send + Clone,
>(
repo: R,
store_config: ObjectStoreConfig,
async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
repo: ArcRepo,
store: ObjectStore,
client: ClientWithMiddleware,
config: Configuration,
extra_config: F,
@ -1696,7 +1687,7 @@ async fn launch_object_store<
HttpServer::new(move || {
let client = client.clone();
let store = store_config.clone().build(client.clone());
let store = store.clone();
let repo = repo.clone();
let config = config.clone();
let extra_config = extra_config.clone();
@ -1737,7 +1728,7 @@ where
match repo {
Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, timeout).await?
migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await?
}
}
}
@ -1775,7 +1766,7 @@ where
match repo {
Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, timeout).await?
migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await?
}
}
}
@ -1833,8 +1824,9 @@ async fn export_handler(repo: web::Data<SledRepo>) -> Result<HttpResponse, Error
})))
}
fn sled_extra_config(sc: &mut web::ServiceConfig) {
sc.service(web::resource("/export").route(web::post().to(export_handler)));
fn sled_extra_config(sc: &mut web::ServiceConfig, repo: SledRepo) {
sc.app_data(web::Data::new(repo))
.service(web::resource("/export").route(web::post().to(export_handler)));
}
impl PictRsConfiguration {
@ -1872,6 +1864,7 @@ impl PictRsConfiguration {
let PictRsConfiguration { config, operation } = self;
let repo = Repo::open(config.repo.clone())?;
let client = build_client(&config)?;
match operation {
@ -1951,14 +1944,28 @@ impl PictRsConfiguration {
match config.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => {
let store = FileStore::build(path, repo.clone()).await?;
let arc_repo = repo.to_arc();
if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, &arc_repo, &store, &config).await?;
arc_repo
.set("migrate-0.4", Arc::from(b"migrated".to_vec()))
.await?;
}
}
match repo {
Repo::Sled(sled_repo) => {
sled_repo
.mark_accessed::<<FileStore as Store>::Identifier>()
.await?;
launch_file_store(sled_repo, store, client, config, sled_extra_config)
.await?;
launch_file_store(
Arc::new(sled_repo.clone()),
store,
client,
config,
move |sc| sled_extra_config(sc, sled_repo.clone()),
)
.await?;
}
}
}
@ -1991,16 +1998,26 @@ impl PictRsConfiguration {
public_endpoint,
repo.clone(),
)
.await?;
.await?
.build(client.clone());
let arc_repo = repo.to_arc();
if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, &arc_repo, &store, &config).await?;
arc_repo
.set("migrate-0.4", Arc::from(b"migrated".to_vec()))
.await?;
}
}
match repo {
Repo::Sled(sled_repo) => {
sled_repo
.mark_accessed::<<ObjectStore as Store>::Identifier>()
.await?;
launch_object_store(sled_repo, store, client, config, sled_extra_config)
.await?;
launch_object_store(arc_repo, store, client, config, move |sc| {
sled_extra_config(sc, sled_repo.clone())
})
.await?;
}
}
}

View file

@ -8,12 +8,12 @@ use std::{
use crate::{
details::Details,
error::{Error, UploadError},
repo::{Hash, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
repo::{ArcRepo, Hash, IdentifierRepo},
store::{Identifier, Store},
};
pub(super) async fn migrate_store<R, S1, S2>(
repo: R,
pub(super) async fn migrate_store<S1, S2>(
repo: ArcRepo,
from: S1,
to: S2,
skip_missing_files: bool,
@ -22,7 +22,6 @@ pub(super) async fn migrate_store<R, S1, S2>(
where
S1: Store + Clone + 'static,
S2: Store + Clone + 'static,
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
{
tracing::warn!("Running checks");
@ -65,8 +64,8 @@ where
Ok(())
}
struct MigrateState<R, S1, S2> {
repo: R,
struct MigrateState<S1, S2> {
repo: ArcRepo,
from: S1,
to: S2,
continuing_migration: bool,
@ -79,8 +78,8 @@ struct MigrateState<R, S1, S2> {
timeout: u64,
}
async fn do_migrate_store<R, S1, S2>(
repo: R,
async fn do_migrate_store<S1, S2>(
repo: ArcRepo,
from: S1,
to: S2,
skip_missing_files: bool,
@ -89,7 +88,6 @@ async fn do_migrate_store<R, S1, S2>(
where
S1: Store + 'static,
S2: Store + 'static,
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
{
let continuing_migration = repo.is_continuing_migration().await?;
let initial_repo_size = repo.size().await?;
@ -150,11 +148,10 @@ where
}
#[tracing::instrument(skip(state))]
async fn migrate_hash<R, S1, S2>(state: &MigrateState<R, S1, S2>, hash: Hash) -> Result<(), Error>
async fn migrate_hash<S1, S2>(state: &MigrateState<S1, S2>, hash: Hash) -> Result<(), Error>
where
S1: Store,
S2: Store,
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
{
let MigrateState {
repo,
@ -173,7 +170,7 @@ where
let current_index = index.fetch_add(1, Ordering::Relaxed);
let original_identifier = match repo.identifier(hash.clone()).await {
Ok(Some(identifier)) => identifier,
Ok(Some(identifier)) => S1::Identifier::from_arc(identifier)?,
Ok(None) => {
tracing::warn!(
"Original File identifier for hash {hash:?} is missing, queue cleanup task",
@ -218,6 +215,8 @@ where
}
if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
let identifier = S1::Identifier::from_arc(identifier)?;
if !repo.is_migrated(&identifier).await? {
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
Ok(new_identifier) => {
@ -247,6 +246,8 @@ where
}
for (variant, identifier) in repo.variants(hash.clone()).await? {
let identifier = S1::Identifier::from_arc(identifier)?;
if !repo.is_migrated(&identifier).await? {
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
Ok(new_identifier) => {
@ -334,8 +335,8 @@ where
Ok(())
}
async fn migrate_file<R, S1, S2>(
repo: &R,
async fn migrate_file<S1, S2>(
repo: &ArcRepo,
from: &S1,
to: &S2,
identifier: &S1::Identifier,
@ -343,7 +344,6 @@ async fn migrate_file<R, S1, S2>(
timeout: u64,
) -> Result<S2::Identifier, MigrateError>
where
R: IdentifierRepo,
S1: Store,
S2: Store,
{
@ -378,15 +378,14 @@ enum MigrateError {
To(crate::store::StoreError),
}
async fn do_migrate_file<R, S1, S2>(
repo: &R,
async fn do_migrate_file<S1, S2>(
repo: &ArcRepo,
from: &S1,
to: &S2,
identifier: &S1::Identifier,
timeout: u64,
) -> Result<S2::Identifier, MigrateError>
where
R: IdentifierRepo,
S1: Store,
S2: Store,
{
@ -429,15 +428,14 @@ where
Ok(new_identifier)
}
async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
async fn migrate_details<I1, I2>(repo: &ArcRepo, from: &I1, to: &I2) -> Result<(), Error>
where
R: IdentifierRepo,
I1: Identifier,
I2: Identifier,
{
if let Some(details) = repo.details(from).await? {
repo.relate_details(to, &details).await?;
repo.cleanup(from).await?;
IdentifierRepo::cleanup(repo.as_ref(), from).await?;
}
Ok(())

View file

@ -3,10 +3,7 @@ use crate::{
config::Configuration,
error::Error,
formats::InputProcessableFormat,
repo::{
Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, JobId, QueueRepo,
UploadId,
},
repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId},
serde_str::Serde,
store::{Identifier, Store},
};
@ -15,6 +12,7 @@ use std::{
future::Future,
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use tracing::Instrument;
@ -88,8 +86,8 @@ enum Process {
},
}
pub(crate) async fn cleanup_alias<R: QueueRepo>(
repo: &R,
pub(crate) async fn cleanup_alias(
repo: &Arc<dyn FullRepo>,
alias: Alias,
token: DeleteToken,
) -> Result<(), Error> {
@ -101,14 +99,14 @@ pub(crate) async fn cleanup_alias<R: QueueRepo>(
Ok(())
}
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: Hash) -> Result<(), Error> {
pub(crate) async fn cleanup_hash(repo: &Arc<dyn FullRepo>, hash: Hash) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Hash { hash })?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
repo: &R,
pub(crate) async fn cleanup_identifier<I: Identifier>(
repo: &Arc<dyn FullRepo>,
identifier: I,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Identifier {
@ -118,8 +116,8 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
Ok(())
}
async fn cleanup_variants<R: QueueRepo>(
repo: &R,
async fn cleanup_variants(
repo: &Arc<dyn FullRepo>,
hash: Hash,
variant: Option<String>,
) -> Result<(), Error> {
@ -128,26 +126,26 @@ async fn cleanup_variants<R: QueueRepo>(
Ok(())
}
pub(crate) async fn cleanup_outdated_proxies<R: QueueRepo>(repo: &R) -> Result<(), Error> {
pub(crate) async fn cleanup_outdated_proxies(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::OutdatedProxies)?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_outdated_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
pub(crate) async fn cleanup_outdated_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_all_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
pub(crate) async fn cleanup_all_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::AllVariants)?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
pub(crate) async fn queue_ingest(
repo: &Arc<dyn FullRepo>,
identifier: Vec<u8>,
upload_id: UploadId,
declared_alias: Option<Alias>,
@ -161,8 +159,8 @@ pub(crate) async fn queue_ingest<R: QueueRepo>(
Ok(())
}
pub(crate) async fn queue_generate<R: QueueRepo>(
repo: &R,
pub(crate) async fn queue_generate(
repo: &Arc<dyn FullRepo>,
target_format: InputProcessableFormat,
source: Alias,
process_path: PathBuf,
@ -178,16 +176,16 @@ pub(crate) async fn queue_generate<R: QueueRepo>(
Ok(())
}
pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(
repo: R,
pub(crate) async fn process_cleanup<S: Store>(
repo: Arc<dyn FullRepo>,
store: S,
config: Configuration,
) {
process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await
}
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
repo: R,
pub(crate) async fn process_images<S: Store + 'static>(
repo: Arc<dyn FullRepo>,
store: S,
process_map: ProcessMap,
config: Configuration,
@ -205,16 +203,20 @@ pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
async fn process_jobs<R, S, F>(
repo: &R,
async fn process_jobs<S, F>(
repo: &Arc<dyn FullRepo>,
store: &S,
config: &Configuration,
queue: &'static str,
callback: F,
) where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
for<'a> F: Fn(
&'a Arc<dyn FullRepo>,
&'a S,
&'a Configuration,
&'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
let worker_id = uuid::Uuid::new_v4();
@ -263,8 +265,8 @@ impl Drop for MetricsGuard {
}
}
async fn job_loop<R, S, F>(
repo: &R,
async fn job_loop<S, F>(
repo: &Arc<dyn FullRepo>,
store: &S,
config: &Configuration,
worker_id: uuid::Uuid,
@ -272,9 +274,13 @@ async fn job_loop<R, S, F>(
callback: F,
) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
for<'a> F: Fn(
&'a Arc<dyn FullRepo>,
&'a S,
&'a Configuration,
&'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{
loop {
@ -312,18 +318,17 @@ where
}
}
async fn process_image_jobs<R, S, F>(
repo: &R,
async fn process_image_jobs<S, F>(
repo: &Arc<dyn FullRepo>,
store: &S,
process_map: &ProcessMap,
config: &Configuration,
queue: &'static str,
callback: F,
) where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
S: Store,
for<'a> F: Fn(
&'a R,
&'a Arc<dyn FullRepo>,
&'a S,
&'a ProcessMap,
&'a Configuration,
@ -347,8 +352,8 @@ async fn process_image_jobs<R, S, F>(
}
}
async fn image_job_loop<R, S, F>(
repo: &R,
async fn image_job_loop<S, F>(
repo: &Arc<dyn FullRepo>,
store: &S,
process_map: &ProcessMap,
config: &Configuration,
@ -357,10 +362,9 @@ async fn image_job_loop<R, S, F>(
callback: F,
) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
S: Store,
for<'a> F: Fn(
&'a R,
&'a Arc<dyn FullRepo>,
&'a S,
&'a ProcessMap,
&'a Configuration,
@ -402,15 +406,14 @@ where
}
}
async fn heartbeat<R, Fut>(
repo: &R,
async fn heartbeat<Fut>(
repo: &Arc<dyn FullRepo>,
queue: &'static str,
worker_id: uuid::Uuid,
job_id: JobId,
fut: Fut,
) -> Fut::Output
where
R: QueueRepo,
Fut: std::future::Future,
{
let mut fut =

View file

@ -3,7 +3,7 @@ use crate::{
error::{Error, UploadError},
queue::{Base64Bytes, Cleanup, LocalBoxFuture},
repo::{
Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo,
Alias, AliasAccessRepo, AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, IdentifierRepo,
VariantAccessRepo,
},
serde_str::Serde,
@ -11,20 +11,19 @@ use crate::{
};
use futures_util::StreamExt;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
pub(super) fn perform<'a, S>(
repo: &'a ArcRepo,
store: &'a S,
configuration: &'a Configuration,
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: FullRepo,
S: Store,
{
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?,
Cleanup::Identifier {
identifier: Base64Bytes(in_identifier),
} => identifier(repo, store, in_identifier).await?,
@ -39,12 +38,10 @@ where
)
.await?
}
Cleanup::Variant { hash, variant } => {
hash_variant::<R, S>(repo, hash, variant).await?
}
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?,
Cleanup::OutdatedProxies => outdated_proxies::<R, S>(repo, configuration).await?,
Cleanup::Variant { hash, variant } => hash_variant(repo, hash, variant).await?,
Cleanup::AllVariants => all_variants(repo).await?,
Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?,
Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
@ -56,9 +53,8 @@ where
}
#[tracing::instrument(skip_all)]
async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
let identifier = S::Identifier::from_bytes(identifier)?;
@ -69,7 +65,7 @@ where
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
if let Err(e) = IdentifierRepo::cleanup(repo.as_ref(), &identifier).await {
errors.push(e);
}
@ -81,11 +77,7 @@ where
}
#[tracing::instrument(skip_all)]
async fn hash<R, S>(repo: &R, hash: Hash) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let aliases = repo.for_hash(hash.clone()).await?;
if !aliases.is_empty() {
@ -102,7 +94,7 @@ where
}
let mut idents = repo
.variants::<S::Identifier>(hash.clone())
.variants(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
@ -114,16 +106,13 @@ where
let _ = super::cleanup_identifier(repo, identifier).await;
}
HashRepo::cleanup(repo, hash).await?;
HashRepo::cleanup(repo.as_ref(), hash).await?;
Ok(())
}
#[tracing::instrument(skip_all)]
async fn alias<R>(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error>
where
R: FullRepo,
{
async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), Error> {
let saved_delete_token = repo.delete_token(&alias).await?;
if saved_delete_token.is_some() && saved_delete_token != Some(token) {
@ -132,9 +121,9 @@ where
let hash = repo.hash(&alias).await?;
AliasRepo::cleanup(repo, &alias).await?;
AliasRepo::cleanup(repo.as_ref(), &alias).await?;
repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo, alias.clone()).await?;
AliasAccessRepo::remove_access(repo.as_ref(), alias.clone()).await?;
let Some(hash) = hash else {
// hash doesn't exist, nothing to do
@ -149,11 +138,7 @@ where
}
#[tracing::instrument(skip_all)]
async fn all_variants<R, S>(repo: &R) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
let mut hash_stream = Box::pin(repo.hashes().await);
while let Some(res) = hash_stream.next().await {
@ -165,11 +150,7 @@ where
}
#[tracing::instrument(skip_all)]
async fn outdated_variants<R, S>(repo: &R, config: &Configuration) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
let now = time::OffsetDateTime::now_utc();
let since = now.saturating_sub(config.media.retention.variants.to_duration());
@ -184,11 +165,7 @@ where
}
#[tracing::instrument(skip_all)]
async fn outdated_proxies<R, S>(repo: &R, config: &Configuration) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
let now = time::OffsetDateTime::now_utc();
let since = now.saturating_sub(config.media.retention.proxy.to_duration());
@ -201,7 +178,7 @@ where
} else {
tracing::warn!("Skipping alias cleanup - no delete token");
repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo, alias).await?;
AliasAccessRepo::remove_access(repo.as_ref(), alias).await?;
}
}
@ -209,18 +186,14 @@ where
}
#[tracing::instrument(skip_all)]
async fn hash_variant<R, S>(
repo: &R,
async fn hash_variant(
repo: &ArcRepo,
hash: Hash,
target_variant: Option<String>,
) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
) -> Result<(), Error> {
if let Some(target_variant) = target_variant {
if let Some(identifier) = repo
.variant_identifier::<S::Identifier>(hash.clone(), target_variant.clone())
.variant_identifier(hash.clone(), target_variant.clone())
.await?
{
super::cleanup_identifier(repo, identifier).await?;
@ -228,11 +201,11 @@ where
repo.remove_variant(hash.clone(), target_variant.clone())
.await?;
VariantAccessRepo::remove_access(repo, hash, target_variant).await?;
VariantAccessRepo::remove_access(repo.as_ref(), hash, target_variant).await?;
} else {
for (variant, identifier) in repo.variants::<S::Identifier>(hash.clone()).await? {
for (variant, identifier) in repo.variants(hash.clone()).await? {
repo.remove_variant(hash.clone(), variant.clone()).await?;
VariantAccessRepo::remove_access(repo, hash.clone(), variant).await?;
VariantAccessRepo::remove_access(repo.as_ref(), hash.clone(), variant).await?;
super::cleanup_identifier(repo, identifier).await?;
}
}

View file

@ -5,22 +5,21 @@ use crate::{
formats::InputProcessableFormat,
ingest::Session,
queue::{Base64Bytes, LocalBoxFuture, Process},
repo::{Alias, FullRepo, UploadId, UploadResult},
repo::{Alias, ArcRepo, UploadId, UploadResult},
serde_str::Serde,
store::{Identifier, Store},
};
use futures_util::TryStreamExt;
use std::path::PathBuf;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
pub(super) fn perform<'a, S>(
repo: &'a ArcRepo,
store: &'a S,
process_map: &'a ProcessMap,
config: &'a Configuration,
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: FullRepo + 'static,
S: Store + 'static,
{
Box::pin(async move {
@ -70,8 +69,8 @@ where
}
#[tracing::instrument(skip_all)]
async fn process_ingest<R, S>(
repo: &R,
async fn process_ingest<S>(
repo: &ArcRepo,
store: &S,
unprocessed_identifier: Vec<u8>,
upload_id: UploadId,
@ -79,7 +78,6 @@ async fn process_ingest<R, S>(
media: &crate::config::Media,
) -> Result<(), Error>
where
R: FullRepo + 'static,
S: Store + 'static,
{
let fut = async {
@ -99,7 +97,7 @@ where
let session =
crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;
Ok(session) as Result<Session<R, S>, Error>
Ok(session) as Result<Session<S>, Error>
})
.await;
@ -130,8 +128,8 @@ where
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all)]
async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
async fn generate<S: Store + 'static>(
repo: &ArcRepo,
store: &S,
process_map: &ProcessMap,
target_format: InputProcessableFormat,
@ -146,9 +144,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
};
let path_string = process_path.to_string_lossy().to_string();
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
if identifier_opt.is_some() {
return Ok(());

View file

@ -2,16 +2,20 @@ use crate::{
config,
details::Details,
store::{Identifier, StoreError},
stream::LocalBoxStream,
};
use futures_util::Stream;
use std::{fmt::Debug, sync::Arc};
use url::Url;
use uuid::Uuid;
mod hash;
mod migrate;
pub(crate) mod sled;
pub(crate) use hash::Hash;
pub(crate) use migrate::migrate_04;
pub(crate) type ArcRepo = Arc<dyn FullRepo>;
#[derive(Clone, Debug)]
pub(crate) enum Repo {
@ -35,7 +39,9 @@ pub(crate) struct DeleteToken {
id: MaybeUuid,
}
#[derive(Debug)]
pub(crate) struct HashAlreadyExists;
#[derive(Debug)]
pub(crate) struct AliasAlreadyExists;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -74,16 +80,12 @@ pub(crate) trait FullRepo:
+ ProxyRepo
+ Send
+ Sync
+ Clone
+ Debug
{
async fn health_check(&self) -> Result<(), RepoError>;
#[tracing::instrument(skip(self))]
async fn identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
) -> Result<Option<I>, StoreError> {
async fn identifier_from_alias(&self, alias: &Alias) -> Result<Option<Arc<[u8]>>, RepoError> {
let Some(hash) = self.hash(alias).await? else {
return Ok(None);
};
@ -101,20 +103,22 @@ pub(crate) trait FullRepo:
}
#[tracing::instrument(skip(self))]
async fn still_identifier_from_alias<I: Identifier + 'static>(
async fn still_identifier_from_alias(
&self,
alias: &Alias,
) -> Result<Option<I>, StoreError> {
) -> Result<Option<Arc<[u8]>>, StoreError> {
let Some(hash) = self.hash(alias).await? else {
return Ok(None);
};
let Some(identifier) = self.identifier::<I>(hash.clone()).await? else {
let Some(identifier) = self.identifier(hash.clone()).await? else {
return Ok(None);
};
match self.details(&identifier).await? {
Some(details) if details.is_video() => self.motion_identifier::<I>(hash).await,
Some(details) if details.is_video() => {
self.motion_identifier(hash).await.map_err(From::from)
}
Some(_) => Ok(Some(identifier)),
None => Ok(None),
}
@ -122,7 +126,7 @@ pub(crate) trait FullRepo:
}
#[async_trait::async_trait(?Send)]
impl<T> FullRepo for actix_web::web::Data<T>
impl<T> FullRepo for Arc<T>
where
T: FullRepo,
{
@ -133,7 +137,7 @@ where
pub(crate) trait BaseRepo {}
impl<T> BaseRepo for actix_web::web::Data<T> where T: BaseRepo {}
impl<T> BaseRepo for Arc<T> where T: BaseRepo {}
#[async_trait::async_trait(?Send)]
pub(crate) trait ProxyRepo: BaseRepo {
@ -145,7 +149,7 @@ pub(crate) trait ProxyRepo: BaseRepo {
}
#[async_trait::async_trait(?Send)]
impl<T> ProxyRepo for actix_web::web::Data<T>
impl<T> ProxyRepo for Arc<T>
where
T: ProxyRepo,
{
@ -164,25 +168,21 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasAccessRepo: BaseRepo {
type AliasAccessStream: Stream<Item = Result<Alias, RepoError>>;
async fn accessed(&self, alias: Alias) -> Result<(), RepoError>;
async fn older_aliases(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::AliasAccessStream, RepoError>;
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError>;
async fn remove_access(&self, alias: Alias) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> AliasAccessRepo for actix_web::web::Data<T>
impl<T> AliasAccessRepo for Arc<T>
where
T: AliasAccessRepo,
{
type AliasAccessStream = T::AliasAccessStream;
async fn accessed(&self, alias: Alias) -> Result<(), RepoError> {
T::accessed(self, alias).await
}
@ -190,7 +190,7 @@ where
async fn older_aliases(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::AliasAccessStream, RepoError> {
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
T::older_aliases(self, timestamp).await
}
@ -201,8 +201,6 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait VariantAccessRepo: BaseRepo {
type VariantAccessStream: Stream<Item = Result<(Hash, String), RepoError>>;
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn contains_variant(&self, hash: Hash, variant: String) -> Result<bool, RepoError>;
@ -210,18 +208,16 @@ pub(crate) trait VariantAccessRepo: BaseRepo {
async fn older_variants(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::VariantAccessStream, RepoError>;
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError>;
async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> VariantAccessRepo for actix_web::web::Data<T>
impl<T> VariantAccessRepo for Arc<T>
where
T: VariantAccessRepo,
{
type VariantAccessStream = T::VariantAccessStream;
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::accessed(self, hash, variant).await
}
@ -233,7 +229,7 @@ where
async fn older_variants(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::VariantAccessStream, RepoError> {
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
T::older_variants(self, timestamp).await
}
@ -254,7 +250,7 @@ pub(crate) trait UploadRepo: BaseRepo {
}
#[async_trait::async_trait(?Send)]
impl<T> UploadRepo for actix_web::web::Data<T>
impl<T> UploadRepo for Arc<T>
where
T: UploadRepo,
{
@ -318,7 +314,7 @@ pub(crate) trait QueueRepo: BaseRepo {
}
#[async_trait::async_trait(?Send)]
impl<T> QueueRepo for actix_web::web::Data<T>
impl<T> QueueRepo for Arc<T>
where
T: QueueRepo,
{
@ -361,7 +357,7 @@ pub(crate) trait SettingsRepo: BaseRepo {
}
#[async_trait::async_trait(?Send)]
impl<T> SettingsRepo for actix_web::web::Data<T>
impl<T> SettingsRepo for Arc<T>
where
T: SettingsRepo,
{
@ -380,34 +376,34 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait IdentifierRepo: BaseRepo {
async fn relate_details<I: Identifier>(
async fn relate_details(
&self,
identifier: &I,
identifier: &dyn Identifier,
details: &Details,
) -> Result<(), StoreError>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError>;
async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError>;
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError>;
}
#[async_trait::async_trait(?Send)]
impl<T> IdentifierRepo for actix_web::web::Data<T>
impl<T> IdentifierRepo for Arc<T>
where
T: IdentifierRepo,
{
async fn relate_details<I: Identifier>(
async fn relate_details(
&self,
identifier: &I,
identifier: &dyn Identifier,
details: &Details,
) -> Result<(), StoreError> {
T::relate_details(self, identifier, details).await
}
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError> {
async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError> {
T::details(self, identifier).await
}
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError> {
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> {
T::cleanup(self, identifier).await
}
}
@ -416,19 +412,19 @@ where
pub(crate) trait MigrationRepo: BaseRepo {
async fn is_continuing_migration(&self) -> Result<bool, RepoError>;
async fn mark_migrated<I1: Identifier, I2: Identifier>(
async fn mark_migrated(
&self,
old_identifier: &I1,
new_identifier: &I2,
old_identifier: &dyn Identifier,
new_identifier: &dyn Identifier,
) -> Result<(), StoreError>;
async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError>;
async fn is_migrated(&self, identifier: &dyn Identifier) -> Result<bool, StoreError>;
async fn clear(&self) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> MigrationRepo for actix_web::web::Data<T>
impl<T> MigrationRepo for Arc<T>
where
T: MigrationRepo,
{
@ -436,15 +432,15 @@ where
T::is_continuing_migration(self).await
}
async fn mark_migrated<I1: Identifier, I2: Identifier>(
async fn mark_migrated(
&self,
old_identifier: &I1,
new_identifier: &I2,
old_identifier: &dyn Identifier,
new_identifier: &dyn Identifier,
) -> Result<(), StoreError> {
T::mark_migrated(self, old_identifier, new_identifier).await
}
async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
async fn is_migrated(&self, identifier: &dyn Identifier) -> Result<bool, StoreError> {
T::is_migrated(self, identifier).await
}
@ -455,118 +451,99 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo: BaseRepo {
type Stream: Stream<Item = Result<Hash, RepoError>>;
async fn size(&self) -> Result<u64, RepoError>;
async fn hashes(&self) -> Self::Stream;
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>>;
async fn create<I: Identifier>(
async fn create(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<Result<(), HashAlreadyExists>, StoreError>;
async fn update_identifier<I: Identifier>(
async fn update_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError>;
async fn identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError>;
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn relate_variant_identifier<I: Identifier>(
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError>;
async fn variant_identifier<I: Identifier + 'static>(
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<I>, StoreError>;
async fn variants<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Vec<(String, I)>, StoreError>;
) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<[u8]>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn relate_motion_identifier<I: Identifier>(
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError>;
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError>;
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> HashRepo for actix_web::web::Data<T>
impl<T> HashRepo for Arc<T>
where
T: HashRepo,
{
type Stream = T::Stream;
async fn size(&self) -> Result<u64, RepoError> {
T::size(self).await
}
async fn hashes(&self) -> Self::Stream {
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>> {
T::hashes(self).await
}
async fn create<I: Identifier>(
async fn create(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<Result<(), HashAlreadyExists>, StoreError> {
T::create(self, hash, identifier).await
}
async fn update_identifier<I: Identifier>(
async fn update_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
T::update_identifier(self, hash, identifier).await
}
async fn identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError> {
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
T::identifier(self, hash).await
}
async fn relate_variant_identifier<I: Identifier>(
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
T::relate_variant_identifier(self, hash, variant, identifier).await
}
async fn variant_identifier<I: Identifier + 'static>(
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<I>, StoreError> {
) -> Result<Option<Arc<[u8]>>, RepoError> {
T::variant_identifier(self, hash, variant).await
}
async fn variants<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Vec<(String, I)>, StoreError> {
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<[u8]>)>, RepoError> {
T::variants(self, hash).await
}
@ -574,18 +551,15 @@ where
T::remove_variant(self, hash, variant).await
}
async fn relate_motion_identifier<I: Identifier>(
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError> {
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
T::motion_identifier(self, hash).await
}
@ -613,7 +587,7 @@ pub(crate) trait AliasRepo: BaseRepo {
}
#[async_trait::async_trait(?Send)]
impl<T> AliasRepo for actix_web::web::Data<T>
impl<T> AliasRepo for Arc<T>
where
T: AliasRepo,
{
@ -658,6 +632,12 @@ impl Repo {
}
}
}
pub(crate) fn to_arc(&self) -> ArcRepo {
match self {
Self::Sled(sled_repo) => Arc::new(sled_repo.clone()),
}
}
}
impl MaybeUuid {
@ -760,11 +740,11 @@ impl DeleteToken {
}
}
fn to_bytes(&self) -> Vec<u8> {
pub(crate) fn to_bytes(&self) -> Vec<u8> {
self.id.as_bytes().to_vec()
}
fn from_slice(bytes: &[u8]) -> Option<Self> {
pub(crate) fn from_slice(bytes: &[u8]) -> Option<Self> {
if let Ok(s) = std::str::from_utf8(bytes) {
Some(DeleteToken::from_existing(s))
} else if bytes.len() == 16 {

View file

@ -70,7 +70,7 @@ impl Hash {
impl std::fmt::Debug for Hash {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Hash")
.field("hash", &hex::encode(&*self.hash))
.field("hash", &hex::encode(*self.hash))
.field("format", &self.format)
.field("size", &self.size)
.finish()

180
src/repo/migrate.rs Normal file
View file

@ -0,0 +1,180 @@
use crate::{
config::Configuration,
details::Details,
error::Error,
repo::{AliasRepo, ArcRepo, DeleteToken, Hash, HashRepo, VariantAccessRepo},
repo_04::{
AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
SledRepo as OldSledRepo,
},
store::Store,
stream::IntoStreamer,
};
pub(crate) async fn migrate_04<S: Store>(
old_repo: OldSledRepo,
new_repo: &ArcRepo,
store: &S,
config: &Configuration,
) -> Result<(), Error> {
tracing::warn!("Running checks");
if let Err(e) = old_repo.health_check().await {
tracing::warn!("Old repo is not configured correctly");
return Err(e.into());
}
if let Err(e) = new_repo.health_check().await {
tracing::warn!("New repo is not configured correctly");
return Err(e.into());
}
if let Err(e) = store.health_check().await {
tracing::warn!("Store is not configured correctly");
return Err(e.into());
}
tracing::warn!("Checks complete, migrating repo");
tracing::warn!("{} hashes will be migrated", old_repo.size().await?);
let mut hash_stream = old_repo.hashes().await.into_streamer();
while let Some(res) = hash_stream.next().await {
if let Ok(hash) = res {
let _ = migrate_hash_04(&old_repo, new_repo, store, config, hash).await;
} else {
tracing::warn!("Failed to read hash, skipping");
}
}
if let Some(generator_state) = old_repo.get("last-path").await? {
new_repo
.set("last-path", generator_state.to_vec().into())
.await?;
}
Ok(())
}
async fn migrate_hash_04<S: Store>(
old_repo: &OldSledRepo,
new_repo: &ArcRepo,
store: &S,
config: &Configuration,
old_hash: sled::IVec,
) -> Result<(), Error> {
let mut hash_failures = 0;
while let Err(e) = do_migrate_hash_04(old_repo, new_repo, store, config, old_hash.clone()).await
{
hash_failures += 1;
if hash_failures > 10 {
tracing::error!(
"Failed to migrate hash {}, skipping\n{}",
hex::encode(&old_hash[..]),
format!("{e:?}")
);
return Err(e);
} else {
tracing::warn!(
"Failed to migrate hash {}, retrying +1",
hex::encode(&old_hash[..])
);
}
}
Ok(())
}
async fn do_migrate_hash_04<S: Store>(
old_repo: &OldSledRepo,
new_repo: &ArcRepo,
store: &S,
config: &Configuration,
old_hash: sled::IVec,
) -> Result<(), Error> {
let Some(identifier) = old_repo.identifier::<S::Identifier>(old_hash.clone()).await? else {
tracing::warn!("Skipping hash {}, no identifier", hex::encode(&old_hash));
return Ok(());
};
let size = store.len(&identifier).await?;
let hash_details = details(old_repo, store, config, &identifier).await?;
let aliases = old_repo.for_hash(old_hash.clone()).await?;
let variants = old_repo.variants::<S::Identifier>(old_hash.clone()).await?;
let motion_identifier = old_repo
.motion_identifier::<S::Identifier>(old_hash.clone())
.await?;
let hash = old_hash[..].try_into().expect("Invalid hash size");
let hash = Hash::new(
hash,
size,
hash_details.internal_format().expect("format exists"),
);
HashRepo::create(new_repo.as_ref(), hash.clone(), &identifier)
.await?
.expect("not duplicate");
new_repo.relate_details(&identifier, &hash_details).await?;
for alias in aliases {
let delete_token = old_repo
.delete_token(&alias)
.await?
.unwrap_or_else(DeleteToken::generate);
AliasRepo::create(new_repo.as_ref(), &alias, &delete_token, hash.clone())
.await?
.expect("not duplicate");
}
if let Some(motion_identifier) = motion_identifier {
let motion_details = details(old_repo, store, config, &motion_identifier).await?;
new_repo
.relate_motion_identifier(hash.clone(), &motion_identifier)
.await?;
new_repo
.relate_details(&motion_identifier, &motion_details)
.await?;
}
for (variant, identifier) in variants {
let variant_details = details(old_repo, store, config, &identifier).await?;
new_repo
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
.await?;
new_repo
.relate_details(&identifier, &variant_details)
.await?;
VariantAccessRepo::accessed(new_repo.as_ref(), hash.clone(), variant).await?;
}
Ok(())
}
async fn details<S: Store>(
old_repo: &OldSledRepo,
store: &S,
config: &Configuration,
identifier: &S::Identifier,
) -> Result<Details, Error> {
let details_opt = old_repo.details(identifier).await?.and_then(|details| {
if details.internal_format().is_some() {
Some(details)
} else {
None
}
});
if let Some(details) = details_opt {
Ok(details)
} else {
Details::from_store(store, identifier, config.media.process_timeout)
.await
.map_err(From::from)
}
}

View file

@ -8,21 +8,19 @@ use crate::{
},
serde_str::Serde,
store::StoreError,
stream::from_iterator,
stream::{from_iterator, LocalBoxStream},
};
use futures_util::{Future, Stream};
use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
use std::{
collections::HashMap,
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::Instant,
};
use tokio::{sync::Notify, task::JoinHandle};
use tokio::sync::Notify;
use url::Url;
use uuid::Uuid;
@ -142,25 +140,6 @@ impl SledRepo {
Ok(db)
}
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn mark_accessed<I: Identifier + 'static>(&self) -> Result<(), StoreError> {
use futures_util::StreamExt;
let mut stream = self.hashes().await;
while let Some(res) = stream.next().await {
let hash = res?;
for (variant, _) in self.variants::<I>(hash.clone()).await? {
if !self.contains_variant(hash.clone(), variant.clone()).await? {
VariantAccessRepo::accessed(self, hash.clone(), variant).await?;
}
}
}
Ok(())
}
#[tracing::instrument(level = "warn", skip_all)]
pub(crate) async fn export(&self) -> Result<(), RepoError> {
let path = self
@ -239,104 +218,8 @@ impl ProxyRepo for SledRepo {
}
}
type IterValue = Option<(sled::Iter, Result<IVec, RepoError>)>;
pub(crate) struct IterStream {
iter: Option<sled::Iter>,
next: Option<JoinHandle<IterValue>>,
}
impl futures_util::Stream for IterStream {
type Item = Result<IVec, RepoError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if let Some(ref mut next) = self.next {
let res = std::task::ready!(Pin::new(next).poll(cx));
self.next.take();
let opt = match res {
Ok(opt) => opt,
Err(_) => return std::task::Poll::Ready(Some(Err(RepoError::Canceled))),
};
if let Some((iter, res)) = opt {
self.iter = Some(iter);
std::task::Poll::Ready(Some(res))
} else {
std::task::Poll::Ready(None)
}
} else if let Some(mut iter) = self.iter.take() {
self.next = Some(actix_rt::task::spawn_blocking(move || {
let opt = iter
.next()
.map(|res| res.map_err(SledError::from).map_err(RepoError::from));
opt.map(|res| (iter, res.map(|(_, value)| value)))
}));
self.poll_next(cx)
} else {
std::task::Poll::Ready(None)
}
}
}
pub(crate) struct AliasAccessStream {
iter: IterStream,
}
pub(crate) struct VariantAccessStream {
iter: IterStream,
}
impl futures_util::Stream for AliasAccessStream {
type Item = Result<Alias, RepoError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) {
Some(Ok(bytes)) => {
if let Some(alias) = Alias::from_slice(&bytes) {
std::task::Poll::Ready(Some(Ok(alias)))
} else {
self.poll_next(cx)
}
}
Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
None => std::task::Poll::Ready(None),
}
}
}
impl futures_util::Stream for VariantAccessStream {
type Item = Result<(Hash, String), RepoError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match std::task::ready!(Pin::new(&mut self.iter).poll_next(cx)) {
Some(Ok(bytes)) => std::task::Poll::Ready(Some(
parse_variant_access_key(bytes)
.map_err(SledError::from)
.map_err(RepoError::from),
)),
Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
None => std::task::Poll::Ready(None),
}
}
}
#[async_trait::async_trait(?Send)]
impl AliasAccessRepo for SledRepo {
type AliasAccessStream = AliasAccessStream;
#[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, alias: Alias) -> Result<(), RepoError> {
let now_string = time::OffsetDateTime::now_utc()
@ -362,24 +245,22 @@ impl AliasAccessRepo for SledRepo {
async fn older_aliases(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::AliasAccessStream, RepoError> {
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
let time_string = timestamp
.format(&time::format_description::well_known::Rfc3339)
.map_err(SledError::Format)?;
let inverse_alias_access = self.inverse_alias_access.clone();
let iterator = self
.inverse_alias_access
.range(..=time_string)
.filter_map(|res| {
res.map_err(SledError::from)
.map_err(RepoError::from)
.map(|(_, value)| Alias::from_slice(&value))
.transpose()
});
let iter =
actix_rt::task::spawn_blocking(move || inverse_alias_access.range(..=time_string))
.await
.map_err(|_| RepoError::Canceled)?;
Ok(AliasAccessStream {
iter: IterStream {
iter: Some(iter),
next: None,
},
})
Ok(Box::pin(from_iterator(iterator, 8)))
}
#[tracing::instrument(level = "debug", skip(self))]
@ -401,8 +282,6 @@ impl AliasAccessRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl VariantAccessRepo for SledRepo {
type VariantAccessStream = VariantAccessStream;
#[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes();
@ -441,24 +320,23 @@ impl VariantAccessRepo for SledRepo {
async fn older_variants(
&self,
timestamp: time::OffsetDateTime,
) -> Result<Self::VariantAccessStream, RepoError> {
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
let time_string = timestamp
.format(&time::format_description::well_known::Rfc3339)
.map_err(SledError::Format)?;
let inverse_variant_access = self.inverse_variant_access.clone();
let iterator = self
.inverse_variant_access
.range(..=time_string)
.map(|res| {
let (_, bytes) = res.map_err(SledError::from)?;
let iter =
actix_rt::task::spawn_blocking(move || inverse_variant_access.range(..=time_string))
.await
.map_err(|_| RepoError::Canceled)?;
parse_variant_access_key(bytes)
.map_err(SledError::from)
.map_err(RepoError::from)
});
Ok(VariantAccessStream {
iter: IterStream {
iter: Some(iter),
next: None,
},
})
Ok(Box::pin(from_iterator(iterator, 8)))
}
#[tracing::instrument(level = "debug", skip(self))]
@ -973,9 +851,9 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
#[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_details<I: Identifier>(
async fn relate_details(
&self,
identifier: &I,
identifier: &dyn Identifier,
details: &Details,
) -> Result<(), StoreError> {
let key = identifier.to_bytes()?;
@ -992,7 +870,7 @@ impl IdentifierRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError> {
async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError> {
let key = identifier.to_bytes()?;
let opt = b!(self.identifier_details, identifier_details.get(key));
@ -1005,7 +883,7 @@ impl IdentifierRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError> {
async fn cleanup(&self, identifier: &dyn Identifier) -> Result<(), StoreError> {
let key = identifier.to_bytes()?;
b!(self.identifier_details, identifier_details.remove(key));
@ -1020,10 +898,10 @@ impl MigrationRepo for SledRepo {
Ok(!self.migration_identifiers.is_empty())
}
async fn mark_migrated<I1: Identifier, I2: Identifier>(
async fn mark_migrated(
&self,
old_identifier: &I1,
new_identifier: &I2,
old_identifier: &dyn Identifier,
new_identifier: &dyn Identifier,
) -> Result<(), StoreError> {
let key = new_identifier.to_bytes()?;
let value = old_identifier.to_bytes()?;
@ -1036,7 +914,7 @@ impl MigrationRepo for SledRepo {
Ok(())
}
async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
async fn is_migrated(&self, identifier: &dyn Identifier) -> Result<bool, StoreError> {
let key = identifier.to_bytes()?;
Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
@ -1049,13 +927,8 @@ impl MigrationRepo for SledRepo {
}
}
type StreamItem = Result<Hash, RepoError>;
type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
type Stream = LocalBoxStream<'static, StreamItem>;
async fn size(&self) -> Result<u64, RepoError> {
Ok(b!(
self.hashes,
@ -1064,7 +937,7 @@ impl HashRepo for SledRepo {
))
}
async fn hashes(&self) -> Self::Stream {
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>> {
let iter = self.hashes.iter().keys().filter_map(|res| {
res.map_err(SledError::from)
.map_err(RepoError::from)
@ -1076,10 +949,10 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn create<I: Identifier>(
async fn create(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<Result<(), HashAlreadyExists>, StoreError> {
let identifier: sled::IVec = identifier.to_bytes()?.into();
@ -1111,10 +984,10 @@ impl HashRepo for SledRepo {
}
}
async fn update_identifier<I: Identifier>(
async fn update_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
let identifier = identifier.to_bytes()?;
@ -1129,25 +1002,22 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError> {
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
let hash = hash.to_ivec();
let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else {
return Ok(None);
};
Ok(Some(I::from_bytes(ivec.to_vec())?))
Ok(Some(Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_variant_identifier<I: Identifier>(
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
let hash = hash.to_bytes();
@ -1163,11 +1033,11 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_identifier<I: Identifier + 'static>(
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<I>, StoreError> {
) -> Result<Option<Arc<[u8]>>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
@ -1177,14 +1047,11 @@ impl HashRepo for SledRepo {
hash_variant_identifiers.get(key)
);
opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Vec<(String, I)>, StoreError> {
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<[u8]>)>, RepoError> {
let hash = hash.to_ivec();
let vec = b!(
@ -1193,20 +1060,14 @@ impl HashRepo for SledRepo {
.scan_prefix(hash.clone())
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = I::from_bytes(ivec.to_vec()).ok();
if identifier.is_none() {
tracing::warn!(
"Skipping an identifier: {}",
String::from_utf8_lossy(&ivec)
);
}
let identifier = Arc::from(ivec.to_vec());
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
Some((variant?, identifier))
})
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
@ -1229,10 +1090,10 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_motion_identifier<I: Identifier>(
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &I,
identifier: &dyn Identifier,
) -> Result<(), StoreError> {
let hash = hash.to_ivec();
let bytes = identifier.to_bytes()?;
@ -1246,10 +1107,7 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Hash,
) -> Result<Option<I>, StoreError> {
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
let hash = hash.to_ivec();
let opt = b!(
@ -1257,7 +1115,7 @@ impl HashRepo for SledRepo {
hash_motion_identifiers.get(hash)
);
opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(skip(self))]

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,7 @@
use actix_web::web::Bytes;
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::stream::Stream;
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) mod file_store;
@ -59,19 +59,23 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
}
}
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
pub(crate) trait Identifier: Send + Sync + Debug {
fn to_bytes(&self) -> Result<Vec<u8>, StoreError>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
where
Self: Sized;
fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
where
Self: Sized;
fn string_repr(&self) -> String;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Clone + Debug {
type Identifier: Identifier + 'static;
type Identifier: Identifier + Clone + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn health_check(&self) -> Result<(), StoreError>;
@ -278,6 +282,13 @@ impl Identifier for Vec<u8> {
Ok(bytes)
}
fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
where
Self: Sized,
{
Ok(Vec::from(&arc[..]))
}
fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
Ok(self.clone())
}
@ -286,3 +297,27 @@ impl Identifier for Vec<u8> {
BASE64_STANDARD.encode(self.as_slice())
}
}
impl Identifier for Arc<[u8]> {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
where
Self: Sized,
{
Ok(Arc::from(bytes))
}
fn from_arc(arc: Arc<[u8]>) -> Result<Self, StoreError>
where
Self: Sized,
{
Ok(arc)
}
fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
Ok(Vec::from(&self[..]))
}
fn string_repr(&self) -> String {
BASE64_STANDARD.encode(&self[..])
}
}

View file

@ -30,6 +30,13 @@ impl Identifier for FileId {
Ok(id)
}
fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
where
Self: Sized,
{
Self::from_bytes(Vec::from(&arc[..]))
}
fn string_repr(&self) -> String {
self.0.to_string_lossy().into_owned()
}

View file

@ -14,6 +14,13 @@ impl Identifier for ObjectId {
))
}
fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
where
Self: Sized,
{
Self::from_bytes(Vec::from(&arc[..]))
}
fn string_repr(&self) -> String {
self.0.clone()
}

View file

@ -12,6 +12,8 @@ use std::{
time::Duration,
};
pub(crate) type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
pub(crate) trait StreamLimit {
fn limit(self, limit: u64) -> Limit<Self>
where
@ -39,6 +41,17 @@ pub(crate) trait StreamTimeout {
}
}
pub(crate) trait IntoStreamer: Stream {
fn into_streamer(self) -> Streamer<Self>
where
Self: Sized,
{
Streamer(Some(self))
}
}
impl<T> IntoStreamer for T where T: Stream + Unpin {}
pub(crate) fn from_iterator<I: IntoIterator + Unpin + Send + 'static>(
iterator: I,
buffer: usize,
@ -51,6 +64,28 @@ pub(crate) fn from_iterator<I: IntoIterator + Unpin + Send + 'static>(
impl<S, E> StreamLimit for S where S: Stream<Item = Result<Bytes, E>> {}
impl<S> StreamTimeout for S where S: Stream {}
pub(crate) struct Streamer<S>(Option<S>);
impl<S> Streamer<S> {
pub(crate) async fn next(&mut self) -> Option<S::Item>
where
S: Stream + Unpin,
{
let opt = match self.0 {
Some(ref mut stream) => {
std::future::poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
None => None,
};
if opt.is_none() {
self.0.take();
}
opt
}
}
pin_project_lite::pin_project! {
pub(crate) struct Limit<S> {
#[pin]