Move away from UploadManager to direct repo & store actions

This commit is contained in:
Aode (Lion) 2022-04-02 16:44:03 -05:00
parent 6ed592c432
commit 09f53b9ce6
10 changed files with 666 additions and 847 deletions

218
src/ingest.rs Normal file
View file

@ -0,0 +1,218 @@
use crate::{
error::{Error, UploadError},
magick::ValidInputType,
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
store::Store,
CONFIG,
};
use actix_web::web::{Bytes, BytesMut};
use futures_util::{Stream, StreamExt};
use once_cell::sync::Lazy;
use sha2::{Digest, Sha256};
use tokio::sync::Semaphore;
use tracing::debug;
mod hasher;
use hasher::Hasher;
static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(num_cpus::get()));
pub(crate) struct Session<R, S>
where
R: FullRepo + 'static,
S: Store,
{
repo: R,
hash: Option<Vec<u8>>,
alias: Option<Alias>,
identifier: Option<S::Identifier>,
}
pub(crate) async fn ingest<R, S>(
repo: &R,
store: &S,
stream: impl Stream<Item = Result<Bytes, Error>>,
declared_alias: Option<Alias>,
should_validate: bool,
) -> Result<Session<R, S>, Error>
where
R: FullRepo + 'static,
S: Store,
{
let permit = PROCESS_SEMAPHORE.acquire().await;
let mut bytes_mut = BytesMut::new();
futures_util::pin_mut!(stream);
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (input_type, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
CONFIG.media.format,
CONFIG.media.enable_silent_video,
should_validate,
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, Sha256::new());
let identifier = store.save_async_read(&mut hasher_reader).await?;
drop(permit);
let mut session = Session {
repo: repo.clone(),
hash: None,
alias: None,
identifier: Some(identifier.clone()),
};
let hash = hasher_reader.finalize_reset().await?;
session.hash = Some(hash.clone());
debug!("Saving upload");
save_upload(repo, store, &hash, &identifier).await?;
debug!("Adding alias");
if let Some(alias) = declared_alias {
session.add_existing_alias(&hash, alias).await?
} else {
session.create_alias(&hash, input_type).await?;
}
Ok(session)
}
async fn save_upload<R, S>(
repo: &R,
store: &S,
hash: &[u8],
identifier: &S::Identifier,
) -> Result<(), Error>
where
S: Store,
R: FullRepo,
{
if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
store.remove(identifier).await?;
return Ok(());
}
repo.relate_identifier(hash.to_vec().into(), identifier)
.await?;
Ok(())
}
impl<R, S> Session<R, S>
where
R: FullRepo + 'static,
S: Store,
{
pub(crate) fn disarm(&mut self) {
let _ = self.alias.take();
let _ = self.identifier.take();
}
pub(crate) fn alias(&self) -> Option<&Alias> {
self.alias.as_ref()
}
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
let delete_token = DeleteToken::generate();
debug!("Saving delete token");
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
if res.is_err() {
let delete_token = self.repo.delete_token(&alias).await?;
debug!("Returning existing delete token, {:?}", delete_token);
return Ok(delete_token);
}
debug!("Returning new delete token, {:?}", delete_token);
Ok(delete_token)
}
async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
AliasRepo::create(&self.repo, &alias)
.await?
.map_err(|_| UploadError::DuplicateAlias)?;
self.alias = Some(alias.clone());
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
Ok(())
}
async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
debug!("Alias gen loop");
loop {
let alias = Alias::generate(input_type.as_ext().to_string());
if AliasRepo::create(&self.repo, &alias).await?.is_ok() {
self.alias = Some(alias.clone());
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
return Ok(());
}
debug!("Alias exists, regenerating");
}
}
}
impl<R, S> Drop for Session<R, S>
where
R: FullRepo + 'static,
S: Store,
{
fn drop(&mut self) {
if let Some(hash) = self.hash.take() {
let repo = self.repo.clone();
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
});
}
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
actix_rt::spawn(async move {
if let Ok(token) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
}
});
}
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
});
}
}
}

View file

@ -16,10 +16,6 @@ pin_project_lite::pin_project! {
}
}
pub(super) struct Hash {
inner: Vec<u8>,
}
impl<I, D> Hasher<I, D>
where
D: Digest + FixedOutputReset + Send + 'static,
@ -31,27 +27,13 @@ where
}
}
pub(super) async fn finalize_reset(self) -> Result<Hash, Error> {
pub(super) async fn finalize_reset(self) -> Result<Vec<u8>, Error> {
let mut hasher = self.hasher;
let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
let hash = web::block(move || hasher.finalize_reset().to_vec()).await?;
Ok(hash)
}
}
impl Hash {
fn new(inner: Vec<u8>) -> Self {
Hash { inner }
}
pub(super) fn as_slice(&self) -> &[u8] {
&self.inner
}
pub(super) fn into_inner(self) -> Vec<u8> {
self.inner
}
}
impl<I, D> AsyncRead for Hasher<I, D>
where
I: AsyncRead,
@ -77,12 +59,6 @@ where
}
}
impl std::fmt::Debug for Hash {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", base64::encode(&self.inner))
}
}
#[cfg(test)]
mod test {
use super::Hasher;
@ -127,6 +103,6 @@ mod test {
hasher.update(vec);
let correct_hash = hasher.finalize_reset().to_vec();
assert_eq!(hash.inner, correct_hash);
assert_eq!(hash, correct_hash);
}
}

View file

@ -30,6 +30,7 @@ mod error;
mod exiftool;
mod ffmpeg;
mod file;
mod ingest;
mod init_tracing;
mod magick;
mod middleware;
@ -43,26 +44,24 @@ mod serde_str;
mod store;
mod stream;
mod tmp_file;
mod upload_manager;
mod validate;
use crate::stream::StreamTimeout;
use self::{
concurrent_processor::CancelSafeProcessor,
config::{Configuration, ImageFormat, Operation},
details::Details,
either::Either,
error::{Error, UploadError},
ffmpeg::{InputFormat, ThumbnailFormat},
ingest::Session,
init_tracing::init_tracing,
magick::details_hint,
middleware::{Deadline, Internal},
migrate::LatestDb,
repo::{Alias, DeleteToken, Repo},
repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo},
serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Store},
stream::StreamLimit,
upload_manager::{UploadManager, UploadManagerSession},
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
stream::{StreamLimit, StreamTimeout},
};
const MEGABYTES: usize = 1024 * 1024;
@ -78,10 +77,10 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> =
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
/// Handle responding to succesful uploads
#[instrument(name = "Uploaded files", skip(value, manager))]
async fn upload<S: Store>(
value: Value<UploadManagerSession<S>>,
manager: web::Data<UploadManager>,
#[instrument(name = "Uploaded files", skip(value))]
async fn upload<R: FullRepo, S: Store + 'static>(
value: Value<Session<R, S>>,
repo: web::Data<R>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let images = value
@ -100,8 +99,8 @@ async fn upload<S: Store>(
info!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = image.result.delete_token().await?;
let identifier = manager.identifier_from_alias::<S>(alias).await?;
let details = manager.details(&identifier).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(alias).await?;
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
debug!("details exist");
@ -112,7 +111,7 @@ async fn upload<S: Store>(
let new_details =
Details::from_store((**store).clone(), identifier.clone(), hint).await?;
debug!("storing details for {:?}", identifier);
manager.store_details(&identifier, &new_details).await?;
repo.relate_details(&identifier, &new_details).await?;
debug!("stored");
new_details
};
@ -125,8 +124,8 @@ async fn upload<S: Store>(
}
}
for image in images {
image.result.succeed();
for mut image in images {
image.result.disarm();
}
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
@ -140,10 +139,10 @@ struct UrlQuery {
}
/// download an image from a URL
#[instrument(name = "Downloading file", skip(client, manager))]
async fn download<S: Store + 'static>(
#[instrument(name = "Downloading file", skip(client, repo))]
async fn download<R: FullRepo + 'static, S: Store + 'static>(
client: web::Data<Client>,
manager: web::Data<UploadManager>,
repo: web::Data<R>,
store: web::Data<S>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> {
@ -157,31 +156,25 @@ async fn download<S: Store + 'static>(
.map_err(Error::from)
.limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
futures_util::pin_mut!(stream);
let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
let permit = PROCESS_SEMAPHORE.acquire().await?;
let session = manager
.session((**store).clone())
.upload(CONFIG.media.enable_silent_video, stream)
.await?;
let alias = session.alias().unwrap().to_owned();
drop(permit);
let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?;
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = manager.details(&identifier).await?;
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&alias);
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager.store_details(&identifier, &new_details).await?;
repo.relate_details(&identifier, &new_details).await?;
new_details
};
session.succeed();
session.disarm();
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
"files": [{
@ -193,9 +186,9 @@ async fn download<S: Store + 'static>(
}
/// Delete aliases and files
#[instrument(name = "Deleting file", skip(manager))]
async fn delete(
manager: web::Data<UploadManager>,
#[instrument(name = "Deleting file", skip(repo))]
async fn delete<R: FullRepo>(
repo: web::Data<R>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
let (token, alias) = path_entries.into_inner();
@ -203,7 +196,7 @@ async fn delete(
let token = DeleteToken::from_existing(&token);
let alias = Alias::from_existing(&alias);
manager.delete(alias, token).await?;
queue::cleanup_alias(&**repo, alias, token).await?;
Ok(HttpResponse::NoContent().finish())
}
@ -249,20 +242,21 @@ fn prepare_process(
Ok((format, alias, thumbnail_path, thumbnail_args))
}
#[instrument(name = "Fetching derived details", skip(manager))]
async fn process_details<S: Store>(
#[instrument(name = "Fetching derived details", skip(repo))]
async fn process_details<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
repo: web::Data<R>,
) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
let identifier = manager
.variant_identifier::<S>(&alias, &thumbnail_path)
let hash = repo.hash(&alias).await?;
let identifier = repo
.variant_identifier::<S::Identifier>(hash, thumbnail_path.to_string_lossy().to_string())
.await?
.ok_or(UploadError::MissingAlias)?;
let details = manager.details(&identifier).await?;
let details = repo.details(&identifier).await?;
let details = details.ok_or(UploadError::NoFiles)?;
@ -270,38 +264,60 @@ async fn process_details<S: Store>(
}
/// Process files
#[instrument(name = "Serving processed image", skip(manager))]
async fn process<S: Store + 'static>(
#[instrument(name = "Serving processed image", skip(repo))]
async fn process<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
repo: web::Data<R>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
let identifier_opt = manager
.variant_identifier::<S>(&alias, &thumbnail_path)
let path_string = thumbnail_path.to_string_lossy().to_string();
let hash = repo.hash(&alias).await?;
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
if let Some(identifier) = identifier_opt {
let details_opt = manager.details(&identifier).await?;
let details_opt = repo.details(&identifier).await?;
let details = if let Some(details) = details_opt {
details
} else {
let hint = details_hint(&alias);
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager.store_details(&identifier, &details).await?;
repo.relate_details(&identifier, &details).await?;
details
};
return ranged_file_resp(&**store, identifier, range, details).await;
}
let identifier = manager
.still_identifier_from_alias((**store).clone(), &alias)
let identifier = if let Some(identifier) = repo
.still_identifier_from_alias::<S::Identifier>(&alias)
.await?
{
identifier
} else {
let identifier = repo.identifier(hash.clone()).await?;
let permit = PROCESS_SEMAPHORE.acquire().await;
let mut reader = crate::ffmpeg::thumbnail(
(**store).clone(),
identifier,
InputFormat::Mp4,
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = store.save_async_read(&mut reader).await?;
drop(permit);
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
.await?;
motion_identifier
};
let thumbnail_path2 = thumbnail_path.clone();
let identifier2 = identifier.clone();
@ -326,9 +342,12 @@ async fn process<S: Store + 'static>(
let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?;
let identifier = store.save_bytes(bytes.clone()).await?;
manager.store_details(&identifier, &details).await?;
manager
.store_variant(&alias, &thumbnail_path, &identifier)
repo.relate_details(&identifier, &details).await?;
repo.relate_variant_identifier(
hash,
thumbnail_path.to_string_lossy().to_string(),
&identifier,
)
.await?;
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
@ -370,25 +389,25 @@ async fn process<S: Store + 'static>(
}
/// Fetch file details
#[instrument(name = "Fetching details", skip(manager))]
async fn details<S: Store + 'static>(
#[instrument(name = "Fetching details", skip(repo))]
async fn details<R: FullRepo, S: Store + 'static>(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
repo: web::Data<R>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = manager.details(&identifier).await?;
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&alias);
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager.store_details(&identifier, &new_details).await?;
repo.relate_details(&identifier, &new_details).await?;
new_details
};
@ -396,25 +415,25 @@ async fn details<S: Store + 'static>(
}
/// Serve files
#[instrument(name = "Serving file", skip(manager))]
async fn serve<S: Store + 'static>(
#[instrument(name = "Serving file", skip(repo))]
async fn serve<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<String>,
manager: web::Data<UploadManager>,
repo: web::Data<R>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = manager.details(&identifier).await?;
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&alias);
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager.store_details(&identifier, &details).await?;
repo.relate_details(&identifier, &details).await?;
details
};
@ -506,18 +525,17 @@ struct AliasQuery {
alias: String,
}
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge(
#[instrument(name = "Purging file", skip(repo))]
async fn purge<R: FullRepo>(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
repo: web::Data<R>,
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
let aliases = repo.aliases_from_alias(&alias).await?;
for alias in aliases.iter() {
upload_manager
.delete_without_token(alias.to_owned())
.await?;
let token = repo.delete_token(alias).await?;
queue::cleanup_alias(&**repo, alias.clone(), token).await?;
}
Ok(HttpResponse::Ok().json(&serde_json::json!({
@ -526,13 +544,13 @@ async fn purge(
})))
}
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases(
#[instrument(name = "Fetching aliases", skip(repo))]
async fn aliases<R: FullRepo>(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
repo: web::Data<R>,
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
let aliases = repo.aliases_from_alias(&alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",
@ -567,14 +585,14 @@ fn next_worker_id() -> String {
format!("{}-{}", CONFIG.server.worker_id, next_id)
}
async fn launch<S: Store + Clone + 'static>(
manager: UploadManager,
async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
repo: R,
store: S,
) -> color_eyre::Result<()> {
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let manager2 = manager.clone();
let repo2 = repo.clone();
let store2 = store.clone();
let form = Form::new()
.max_files(10)
@ -583,33 +601,24 @@ async fn launch<S: Store + Clone + 'static>(
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
let repo = repo2.clone();
let store = store2.clone();
let manager = manager2.clone();
let span = tracing::info_span!("file-upload", ?filename);
async move {
let permit = PROCESS_SEMAPHORE.acquire().await?;
let stream = stream.map_err(Error::from);
let res = manager
.session(store)
.upload(
CONFIG.media.enable_silent_video,
stream.map_err(Error::from),
Box::pin(
async move { ingest::ingest(&repo, &store, stream, None, true).await }
.instrument(span),
)
.await;
drop(permit);
res
}
.instrument(span)
})),
);
// Create a new Multipart Form validator for internal imports
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let manager2 = manager.clone();
let repo2 = repo.clone();
let store2 = store.clone();
let import_form = Form::new()
.max_files(10)
@ -618,42 +627,40 @@ async fn launch<S: Store + Clone + 'static>(
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
let repo = repo2.clone();
let store = store2.clone();
let manager = manager2.clone();
let span = tracing::info_span!("file-import", ?filename);
let stream = stream.map_err(Error::from);
Box::pin(
async move {
let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager
.session(store)
.import(
filename,
ingest::ingest(
&repo,
&store,
stream,
Some(Alias::from_existing(&filename)),
!CONFIG.media.skip_validate_imports,
CONFIG.media.enable_silent_video,
stream.map_err(Error::from),
)
.await;
drop(permit);
res
.await
}
.instrument(span)
.instrument(span),
)
})),
);
HttpServer::new(move || {
let manager = manager.clone();
let store = store.clone();
let repo = repo.clone();
actix_rt::spawn(queue::process_cleanup(
manager.repo().clone(),
repo.clone(),
store.clone(),
next_worker_id(),
));
actix_rt::spawn(queue::process_images(
manager.repo().clone(),
repo.clone(),
store.clone(),
next_worker_id(),
));
@ -661,8 +668,8 @@ async fn launch<S: Store + Clone + 'static>(
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.app_data(web::Data::new(repo))
.app_data(web::Data::new(store))
.app_data(web::Data::new(manager))
.app_data(web::Data::new(build_client()))
.service(
web::scope("/image")
@ -670,25 +677,27 @@ async fn launch<S: Store + Clone + 'static>(
web::resource("")
.guard(guard::Post())
.wrap(form.clone())
.route(web::post().to(upload::<S>)),
.route(web::post().to(upload::<R, S>)),
)
.service(web::resource("/download").route(web::get().to(download::<S>)))
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete))
.route(web::get().to(delete)),
.route(web::delete().to(delete::<R>))
.route(web::get().to(delete::<R>)),
)
.service(web::resource("/original/{filename}").route(web::get().to(serve::<S>)))
.service(web::resource("/process.{ext}").route(web::get().to(process::<S>)))
.service(
web::resource("/original/{filename}").route(web::get().to(serve::<R, S>)),
)
.service(web::resource("/process.{ext}").route(web::get().to(process::<R, S>)))
.service(
web::scope("/details")
.service(
web::resource("/original/{filename}")
.route(web::get().to(details::<S>)),
.route(web::get().to(details::<R, S>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details::<S>)),
.route(web::get().to(process_details::<R, S>)),
),
),
)
@ -700,10 +709,10 @@ async fn launch<S: Store + Clone + 'static>(
.service(
web::resource("/import")
.wrap(import_form.clone())
.route(web::post().to(upload::<S>)),
.route(web::post().to(upload::<R, S>)),
)
.service(web::resource("/purge").route(web::post().to(purge)))
.service(web::resource("/aliases").route(web::get().to(aliases))),
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>))),
)
})
.bind(CONFIG.server.address)?
@ -715,19 +724,16 @@ async fn launch<S: Store + Clone + 'static>(
Ok(())
}
async fn migrate_inner<S1>(
manager: &UploadManager,
repo: &Repo,
from: S1,
to: &config::Store,
) -> color_eyre::Result<()>
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()>
where
S1: Store,
{
match to {
config::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?;
manager.migrate_store::<S1, FileStore>(from, to).await?;
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
bucket_name,
@ -749,7 +755,9 @@ where
)
.await?;
manager.migrate_store::<S1, ObjectStore>(from, to).await?;
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
}
}
}
@ -765,15 +773,13 @@ async fn main() -> color_eyre::Result<()> {
let db = LatestDb::exists(CONFIG.old_db.path.clone()).migrate()?;
repo.from_db(db).await?;
let manager = UploadManager::new(repo.clone(), CONFIG.media.format).await?;
match (*OPERATION).clone() {
Operation::Run => (),
Operation::MigrateStore { from, to } => {
match from {
config::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(&manager, &repo, from, &to).await?;
migrate_inner(&repo, from, &to).await?;
}
config::Store::ObjectStorage(config::ObjectStorage {
bucket_name,
@ -795,7 +801,7 @@ async fn main() -> color_eyre::Result<()> {
)
.await?;
migrate_inner(&manager, &repo, from, &to).await?;
migrate_inner(&repo, from, &to).await?;
}
}
@ -805,8 +811,10 @@ async fn main() -> color_eyre::Result<()> {
match CONFIG.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => {
let store = FileStore::build(path, repo).await?;
launch(manager, store).await
let store = FileStore::build(path, repo.clone()).await?;
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
bucket_name,
@ -823,12 +831,92 @@ async fn main() -> color_eyre::Result<()> {
Some(secret_key),
security_token,
session_token,
repo,
repo.clone(),
build_reqwest_client()?,
)
.await?;
launch(manager, store).await
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
}
}
}
}
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
async fn migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
where
S1: Store,
S2: Store,
R: IdentifierRepo + HashRepo + SettingsRepo,
{
let stream = repo.hashes().await;
let mut stream = Box::pin(stream);
while let Some(hash) = stream.next().await {
let hash = hash?;
if let Some(identifier) = repo
.motion_identifier(hash.as_ref().to_vec().into())
.await?
{
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
}
for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?;
}
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
.await?;
}
// clean up the migration key to avoid interfering with future migrations
repo.remove(STORE_MIGRATION_PROGRESS).await?;
Ok(())
}
async fn migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
where
S1: Store,
S2: Store,
{
let stream = from.to_stream(identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let new_identifier = to.save_async_read(&mut reader).await?;
Ok(new_identifier)
}
async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
where
R: IdentifierRepo,
I1: Identifier,
I2: Identifier,
{
if let Some(details) = repo.details(&from).await? {
repo.relate_details(to, &details).await?;
repo.cleanup(&from).await?;
}
Ok(())
}

View file

@ -1,9 +1,9 @@
use crate::{
config::ImageFormat,
error::Error,
repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo},
serde_str::Serde,
store::Store,
store::{Identifier, Store},
};
use std::{future::Future, path::PathBuf, pin::Pin};
use uuid::Uuid;
@ -16,8 +16,16 @@ const PROCESS_QUEUE: &str = "process";
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
CleanupHash { hash: Vec<u8> },
CleanupIdentifier { identifier: Vec<u8> },
Hash {
hash: Vec<u8>,
},
Identifier {
identifier: Vec<u8>,
},
Alias {
alias: Serde<Alias>,
token: Serde<DeleteToken>,
},
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
@ -36,14 +44,38 @@ enum Process {
},
}
pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::CleanupHash {
pub(crate) async fn cleanup_alias<R: QueueRepo>(
repo: &R,
alias: Alias,
token: DeleteToken,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Alias {
alias: Serde::new(alias),
token: Serde::new(token),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Hash {
hash: hash.as_ref().to_vec(),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
repo: &R,
identifier: I,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Identifier {
identifier: identifier.to_bytes()?,
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
identifier: Vec<u8>,
@ -78,16 +110,16 @@ pub(crate) async fn queue_generate<R: QueueRepo>(
Ok(())
}
pub(crate) async fn process_cleanup<S: Store>(repo: Repo, store: S, worker_id: String) {
match repo {
Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, cleanup::perform).await,
}
pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, worker_id: String) {
process_jobs(&repo, &store, worker_id, cleanup::perform).await
}
pub(crate) async fn process_images<S: Store>(repo: Repo, store: S, worker_id: String) {
match repo {
Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, process::perform).await,
}
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store>(
repo: R,
store: S,
worker_id: String,
) {
process_jobs(&repo, &store, worker_id, process::perform).await
}
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

View file

@ -1,7 +1,8 @@
use crate::{
error::Error,
queue::{Cleanup, LocalBoxFuture, CLEANUP_QUEUE},
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
error::{Error, UploadError},
queue::{Cleanup, LocalBoxFuture},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo},
serde_str::Serde,
store::{Identifier, Store},
};
use tracing::error;
@ -12,17 +13,27 @@ pub(super) fn perform<'a, R, S>(
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
R: FullRepo,
S: Store,
{
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Cleanup::CleanupHash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::CleanupIdentifier {
Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::Identifier {
identifier: in_identifier,
} => identifier(repo, &store, in_identifier).await?,
Cleanup::Alias {
alias: stored_alias,
token,
} => {
alias(
repo,
Serde::into_inner(stored_alias),
Serde::into_inner(token),
)
.await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
@ -36,8 +47,7 @@ where
#[tracing::instrument(skip(repo, store))]
async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
R: FullRepo,
S: Store,
{
let identifier = S::Identifier::from_bytes(identifier)?;
@ -67,8 +77,7 @@ where
#[tracing::instrument(skip(repo))]
async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
R: FullRepo,
S: Store,
{
let hash: R::Bytes = hash.into();
@ -89,13 +98,31 @@ where
idents.extend(repo.motion_identifier(hash.clone()).await?);
for identifier in idents {
if let Ok(identifier) = identifier.to_bytes() {
let job = serde_json::to_vec(&Cleanup::CleanupIdentifier { identifier })?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
}
let _ = crate::queue::cleanup_identifier(repo, identifier).await;
}
HashRepo::cleanup(repo, hash).await?;
Ok(())
}
async fn alias<R>(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error>
where
R: FullRepo,
{
let saved_delete_token = repo.delete_token(&alias).await?;
if saved_delete_token != token {
return Err(UploadError::InvalidToken.into());
}
let hash = repo.hash(&alias).await?;
AliasRepo::cleanup(repo, &alias).await?;
repo.remove_alias(hash.clone(), &alias).await?;
if repo.aliases(hash.clone()).await?.is_empty() {
crate::queue::cleanup_hash(repo, hash).await?;
}
Ok(())
}

View file

@ -1,13 +1,14 @@
use crate::{
config::ImageFormat,
error::Error,
ingest::Session,
queue::{LocalBoxFuture, Process},
repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
serde_str::Serde,
store::Store,
store::{Identifier, Store},
};
use futures_util::TryStreamExt;
use std::path::PathBuf;
use uuid::Uuid;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
@ -15,8 +16,7 @@ pub(super) fn perform<'a, R, S>(
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
R: FullRepo + 'static,
S: Store,
{
Box::pin(async move {
@ -28,11 +28,11 @@ where
declared_alias,
should_validate,
} => {
ingest(
process_ingest(
repo,
store,
identifier,
upload_id,
upload_id.into(),
declared_alias.map(Serde::into_inner),
should_validate,
)
@ -64,15 +64,54 @@ where
})
}
async fn ingest<R, S>(
#[tracing::instrument(skip(repo, store))]
async fn process_ingest<R, S>(
repo: &R,
store: &S,
identifier: Vec<u8>,
upload_id: Uuid,
unprocessed_identifier: Vec<u8>,
upload_id: UploadId,
declared_alias: Option<Alias>,
should_validate: bool,
) -> Result<(), Error> {
unimplemented!("do this")
) -> Result<(), Error>
where
R: FullRepo + 'static,
S: Store,
{
let fut = async {
let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
let stream = store
.to_stream(&unprocessed_identifier, None, None)
.await?
.map_err(Error::from);
let session =
crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
let token = session.delete_token().await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
};
let result = match fut.await {
Ok((mut session, token)) => {
let alias = session.alias().take().expect("Alias should exist").clone();
let result = UploadResult::Success { alias, token };
session.disarm();
result
}
Err(e) => {
tracing::warn!("Failed to ingest {}, {:?}", e, e);
UploadResult::Failure {
message: e.to_string(),
}
}
};
repo.complete(upload_id, result).await?;
Ok(())
}
async fn generate<R, S>(

View file

@ -1,5 +1,6 @@
use crate::{config, details::Details, error::Error, store::Identifier};
use futures_util::Stream;
use std::fmt::Debug;
use tracing::debug;
use uuid::Uuid;
@ -40,8 +41,49 @@ pub(crate) enum UploadResult {
Failure { message: String },
}
#[async_trait::async_trait(?Send)]
pub(crate) trait FullRepo:
UploadRepo
+ SettingsRepo
+ IdentifierRepo
+ AliasRepo
+ QueueRepo
+ HashRepo
+ Send
+ Sync
+ Clone
+ Debug
{
async fn identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
) -> Result<I, Error> {
let hash = self.hash(alias).await?;
self.identifier(hash).await
}
async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let hash = self.hash(alias).await?;
self.aliases(hash).await
}
async fn still_identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
) -> Result<Option<I>, Error> {
let hash = self.hash(alias).await?;
let identifier = self.identifier::<I>(hash.clone()).await?;
match self.details(&identifier).await? {
Some(details) if details.is_motion() => self.motion_identifier::<I>(hash).await,
Some(_) => Ok(Some(identifier)),
None => Ok(None),
}
}
}
pub(crate) trait BaseRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
}
#[async_trait::async_trait(?Send)]
@ -396,6 +438,12 @@ impl UploadId {
}
}
impl From<Uuid> for UploadId {
fn from(id: Uuid) -> Self {
Self { id }
}
}
impl std::fmt::Display for MaybeUuid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -405,6 +453,14 @@ impl std::fmt::Display for MaybeUuid {
}
}
impl std::str::FromStr for DeleteToken {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(DeleteToken::from_existing(s))
}
}
impl std::fmt::Display for DeleteToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id)

View file

@ -1,8 +1,8 @@
use crate::{
error::Error,
repo::{
Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier,
IdentifierRepo, QueueRepo, SettingsRepo,
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
},
stream::from_iterator,
};
@ -15,8 +15,6 @@ use std::{
};
use tokio::sync::Notify;
use super::BaseRepo;
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
@ -85,6 +83,23 @@ impl BaseRepo for SledRepo {
type Bytes = IVec;
}
impl FullRepo for SledRepo {}
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
unimplemented!("DO THIS")
}
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
unimplemented!("DO THIS")
}
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
unimplemented!("DO THIS")
}
}
#[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error> {

View file

@ -1,366 +0,0 @@
use crate::{
config::ImageFormat,
details::Details,
error::{Error, UploadError},
ffmpeg::{InputFormat, ThumbnailFormat},
magick::details_hint,
repo::{
sled::SledRepo, Alias, AliasRepo, BaseRepo, DeleteToken, HashRepo, IdentifierRepo, Repo,
SettingsRepo,
},
store::{Identifier, Store},
};
use futures_util::StreamExt;
use sha2::Digest;
use std::sync::Arc;
use tracing::instrument;
mod hasher;
mod session;
pub(super) use session::UploadManagerSession;
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
#[derive(Clone)]
pub(crate) struct UploadManager {
inner: Arc<UploadManagerInner>,
}
pub(crate) struct UploadManagerInner {
format: Option<ImageFormat>,
hasher: sha2::Sha256,
repo: Repo,
}
impl UploadManager {
pub(crate) fn repo(&self) -> &Repo {
&self.inner.repo
}
/// Create a new UploadManager
pub(crate) async fn new(repo: Repo, format: Option<ImageFormat>) -> Result<Self, Error> {
let manager = UploadManager {
inner: Arc::new(UploadManagerInner {
format,
hasher: sha2::Sha256::new(),
repo,
}),
};
Ok(manager)
}
pub(crate) async fn migrate_store<S1, S2>(&self, from: S1, to: S2) -> Result<(), Error>
where
S1: Store,
S2: Store,
{
match self.inner.repo {
Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await,
}
}
pub(crate) async fn still_identifier_from_alias<S: Store + Clone + 'static>(
&self,
store: S,
alias: &Alias,
) -> Result<S::Identifier, Error> {
let identifier = self.identifier_from_alias::<S>(alias).await?;
let details = if let Some(details) = self.details(&identifier).await? {
details
} else {
let hint = details_hint(alias);
Details::from_store(store.clone(), identifier.clone(), hint).await?
};
if !details.is_motion() {
return Ok(identifier);
}
if let Some(motion_identifier) = self.motion_identifier::<S>(alias).await? {
return Ok(motion_identifier);
}
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let mut reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,
InputFormat::Mp4,
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = store.save_async_read(&mut reader).await?;
drop(permit);
self.store_motion_identifier(alias, &motion_identifier)
.await?;
Ok(motion_identifier)
}
async fn motion_identifier<S: Store>(
&self,
alias: &Alias,
) -> Result<Option<S::Identifier>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.motion_identifier(hash).await?)
}
}
}
async fn store_motion_identifier<I: Identifier + 'static>(
&self,
alias: &Alias,
identifier: &I,
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.relate_motion_identifier(hash, identifier).await?)
}
}
}
#[instrument(skip(self))]
pub(crate) async fn identifier_from_alias<S: Store>(
&self,
alias: &Alias,
) -> Result<S::Identifier, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.identifier(hash).await?)
}
}
}
#[instrument(skip(self))]
async fn store_identifier<I: Identifier>(
&self,
hash: Vec<u8>,
identifier: &I,
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
Ok(sled_repo.relate_identifier(hash.into(), identifier).await?)
}
}
}
#[instrument(skip(self))]
pub(crate) async fn variant_identifier<S: Store>(
&self,
alias: &Alias,
process_path: &std::path::Path,
) -> Result<Option<S::Identifier>, Error> {
let variant = process_path.to_string_lossy().to_string();
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.variant_identifier(hash, variant).await?)
}
}
}
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_full_res<I: Identifier>(
&self,
alias: &Alias,
identifier: &I,
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.relate_identifier(hash, identifier).await?)
}
}
}
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant<I: Identifier>(
&self,
alias: &Alias,
variant_process_path: &std::path::Path,
identifier: &I,
) -> Result<(), Error> {
let variant = variant_process_path.to_string_lossy().to_string();
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo
.relate_variant_identifier(hash, variant, identifier)
.await?)
}
}
}
/// Get the image details for a given variant
#[instrument(skip(self))]
pub(crate) async fn details<I: Identifier>(
&self,
identifier: &I,
) -> Result<Option<Details>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?),
}
}
#[instrument(skip(self))]
pub(crate) async fn store_details<I: Identifier>(
&self,
identifier: &I,
details: &Details,
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => Ok(sled_repo.relate_details(identifier, details).await?),
}
}
/// Get a list of aliases for a given alias
pub(crate) async fn aliases_by_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.aliases(hash).await?)
}
}
}
/// Delete an alias without a delete token
pub(crate) async fn delete_without_token(&self, alias: Alias) -> Result<(), Error> {
let token = match self.inner.repo {
Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
};
self.delete(alias, token).await
}
/// Delete the alias, and the file & variants if no more aliases exist
#[instrument(skip(self, alias, token))]
pub(crate) async fn delete(&self, alias: Alias, token: DeleteToken) -> Result<(), Error> {
let hash = match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let saved_delete_token = sled_repo.delete_token(&alias).await?;
if saved_delete_token != token {
return Err(UploadError::InvalidToken.into());
}
let hash = sled_repo.hash(&alias).await?;
AliasRepo::cleanup(sled_repo, &alias).await?;
sled_repo.remove_alias(hash.clone(), &alias).await?;
hash.to_vec()
}
};
self.check_delete_files(hash).await
}
async fn check_delete_files(&self, hash: Vec<u8>) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash: <SledRepo as BaseRepo>::Bytes = hash.into();
let aliases = sled_repo.aliases(hash.clone()).await?;
if !aliases.is_empty() {
return Ok(());
}
crate::queue::queue_cleanup(sled_repo, hash).await?;
}
}
Ok(())
}
pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S> {
UploadManagerSession::new(self.clone(), store)
}
}
async fn migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
where
S1: Store,
S2: Store,
{
let stream = from.to_stream(identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let new_identifier = to.save_async_read(&mut reader).await?;
Ok(new_identifier)
}
async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
where
R: IdentifierRepo,
I1: Identifier,
I2: Identifier,
{
if let Some(details) = repo.details(&from).await? {
repo.relate_details(to, &details).await?;
repo.cleanup(&from).await?;
}
Ok(())
}
async fn do_migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
where
S1: Store,
S2: Store,
R: IdentifierRepo + HashRepo + SettingsRepo,
{
let stream = repo.hashes().await;
let mut stream = Box::pin(stream);
while let Some(hash) = stream.next().await {
let hash = hash?;
if let Some(identifier) = repo
.motion_identifier(hash.as_ref().to_vec().into())
.await?
{
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
}
for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?;
}
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
.await?;
}
// clean up the migration key to avoid interfering with future migrations
repo.remove(STORE_MIGRATION_PROGRESS).await?;
Ok(())
}
impl std::fmt::Debug for UploadManager {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager").finish()
}
}

View file

@ -1,266 +0,0 @@
use crate::{
error::{Error, UploadError},
magick::ValidInputType,
repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, HashRepo, IdentifierRepo, Repo},
store::Store,
upload_manager::{
hasher::{Hash, Hasher},
UploadManager,
},
};
use actix_web::web;
use futures_util::stream::{Stream, StreamExt};
use tracing::{debug, instrument, Span};
use tracing_futures::Instrument;
pub(crate) struct UploadManagerSession<S: Store + Clone + 'static> {
store: S,
manager: UploadManager,
alias: Option<Alias>,
finished: bool,
}
impl<S: Store + Clone + 'static> UploadManagerSession<S> {
pub(super) fn new(manager: UploadManager, store: S) -> Self {
UploadManagerSession {
store,
manager,
alias: None,
finished: false,
}
}
pub(crate) fn succeed(mut self) {
self.finished = true;
}
pub(crate) fn alias(&self) -> Option<&Alias> {
self.alias.as_ref()
}
}
impl<S: Store + Clone + 'static> Drop for UploadManagerSession<S> {
fn drop(&mut self) {
if self.finished {
return;
}
if let Some(alias) = self.alias.take() {
let store = self.store.clone();
let manager = self.manager.clone();
let cleanup_span = tracing::info_span!(
parent: None,
"Upload cleanup",
alias = &tracing::field::display(&alias),
);
cleanup_span.follows_from(Span::current());
actix_rt::spawn(
async move {
// undo alias -> hash mapping
match manager.inner.repo {
Repo::Sled(ref sled_repo) => {
if let Ok(hash) = sled_repo.hash(&alias).await {
debug!("Clean alias repo");
let _ = AliasRepo::cleanup(sled_repo, &alias).await;
if let Ok(identifier) = sled_repo.identifier(hash.clone()).await {
debug!("Clean identifier repo");
let _ = IdentifierRepo::cleanup(sled_repo, &identifier).await;
debug!("Remove stored files");
let _ = store.remove(&identifier).await;
}
debug!("Clean hash repo");
let _ = HashRepo::cleanup(sled_repo, hash).await;
}
}
}
}
.instrument(cleanup_span),
);
}
}
}
impl<S: Store> UploadManagerSession<S> {
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
let delete_token = DeleteToken::generate();
debug!("Saving delete token");
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
let res = sled_repo.relate_delete_token(&alias, &delete_token).await?;
Ok(if res.is_err() {
let delete_token = sled_repo.delete_token(&alias).await?;
debug!("Returning existing delete token, {:?}", delete_token);
delete_token
} else {
debug!("Returning new delete token, {:?}", delete_token);
delete_token
})
}
}
}
/// Import the file, discarding bytes if it's already present, or saving if it's new
pub(crate) async fn import(
mut self,
alias: String,
validate: bool,
enable_silent_video: bool,
mut stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin,
) -> Result<Self, Error> {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (_, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
self.manager.inner.format,
enable_silent_video,
validate,
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
self.add_existing_alias(&hash, alias).await?;
debug!("Saving file");
self.save_upload(&identifier, hash).await?;
// Return alias to file
Ok(self)
}
/// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))]
pub(crate) async fn upload(
mut self,
enable_silent_video: bool,
mut stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin,
) -> Result<Self, Error> {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (input_type, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
self.manager.inner.format,
enable_silent_video,
true,
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
self.add_alias(&hash, input_type).await?;
debug!("Saving file");
self.save_upload(&identifier, hash).await?;
// Return alias to file
Ok(self)
}
// check duplicates & store image if new
#[instrument(skip(self, hash))]
async fn save_upload(&self, identifier: &S::Identifier, hash: Hash) -> Result<(), Error> {
let res = self.check_duplicate(&hash).await?;
// bail early with alias to existing file if this is a duplicate
if res.is_err() {
debug!("Duplicate exists, removing file");
self.store.remove(identifier).await?;
return Ok(());
}
self.manager
.store_identifier(hash.into_inner(), identifier)
.await?;
Ok(())
}
// check for an already-uploaded image with this hash, returning the path to the target file
#[instrument(skip(self, hash))]
async fn check_duplicate(&self, hash: &Hash) -> Result<Result<(), AlreadyExists>, Error> {
let hash = hash.as_slice().to_vec();
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => Ok(HashRepo::create(sled_repo, hash.into()).await?),
}
}
// Add an alias from an existing filename
async fn add_existing_alias(&mut self, hash: &Hash, filename: String) -> Result<(), Error> {
let alias = Alias::from_existing(&filename);
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
AliasRepo::create(sled_repo, &alias)
.await?
.map_err(|_| UploadError::DuplicateAlias)?;
self.alias = Some(alias.clone());
let hash = hash.as_slice().to_vec();
sled_repo.relate_hash(&alias, hash.clone().into()).await?;
sled_repo.relate_alias(hash.into(), &alias).await?;
}
}
Ok(())
}
// Add an alias to an existing file
//
// This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument(skip(self, hash, input_type))]
async fn add_alias(&mut self, hash: &Hash, input_type: ValidInputType) -> Result<(), Error> {
loop {
debug!("Alias gen loop");
let alias = Alias::generate(input_type.as_ext().to_string());
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
let res = AliasRepo::create(sled_repo, &alias).await?;
if res.is_ok() {
self.alias = Some(alias.clone());
let hash = hash.as_slice().to_vec();
sled_repo.relate_hash(&alias, hash.clone().into()).await?;
sled_repo.relate_alias(hash.into(), &alias).await?;
return Ok(());
}
}
};
debug!("Alias exists, regenning");
}
}
}