Enable thumbnailing animations, use dynamic dispatch for a number of async readers

This commit is contained in:
asonix 2023-09-24 11:54:16 -05:00
parent cde4a72203
commit 6c921817e1
17 changed files with 351 additions and 239 deletions

View file

@ -79,10 +79,6 @@ impl Details {
} }
} }
pub(crate) fn is_video(&self) -> bool {
self.inner.content_type.type_() == "video"
}
pub(crate) fn created_at(&self) -> time::OffsetDateTime { pub(crate) fn created_at(&self) -> time::OffsetDateTime {
self.inner.created_at.timestamp self.inner.created_at.timestamp
} }

View file

@ -1,18 +1,4 @@
use std::sync::Arc; use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError};
use crate::{
error_code::ErrorCode,
formats::InternalVideoFormat,
process::{Process, ProcessError},
store::{Store, StoreError},
};
use tokio::io::AsyncRead;
#[derive(Clone, Copy, Debug)]
pub(crate) enum ThumbnailFormat {
Jpeg,
// Webp,
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum FfMpegError { pub(crate) enum FfMpegError {
@ -100,103 +86,3 @@ impl FfMpegError {
false false
} }
} }
impl ThumbnailFormat {
const fn as_ffmpeg_codec(self) -> &'static str {
match self {
Self::Jpeg => "mjpeg",
// Self::Webp => "webp",
}
}
const fn to_file_extension(self) -> &'static str {
match self {
Self::Jpeg => ".jpeg",
// Self::Webp => ".webp",
}
}
const fn as_ffmpeg_format(self) -> &'static str {
match self {
Self::Jpeg => "image2",
// Self::Webp => "webp",
}
}
pub(crate) fn media_type(self) -> mime::Mime {
match self {
Self::Jpeg => mime::IMAGE_JPEG,
// Self::Webp => crate::formats::mimes::image_webp(),
}
}
}
#[tracing::instrument(skip(store))]
pub(crate) async fn thumbnail<S: Store>(
store: S,
from: Arc<str>,
input_format: InternalVideoFormat,
format: ThumbnailFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension()));
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(FfMpegError::CreateDir)?;
let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension()));
let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&output_file)
.await
.map_err(FfMpegError::CreateDir)?;
let mut tmp_one = crate::file::File::create(&input_file)
.await
.map_err(FfMpegError::CreateFile)?;
let stream = store
.to_stream(&from, None, None)
.await
.map_err(FfMpegError::Store)?;
tmp_one
.write_from_stream(stream)
.await
.map_err(FfMpegError::Write)?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let process = Process::run(
"ffmpeg",
&[
"-hide_banner",
"-v",
"warning",
"-i",
input_file_str,
"-frames:v",
"1",
"-codec",
format.as_ffmpeg_codec(),
"-f",
format.as_ffmpeg_format(),
output_file_str,
],
timeout,
)?;
process.wait().await?;
tokio::fs::remove_file(input_file)
.await
.map_err(FfMpegError::RemoveFile)?;
let tmp_two = crate::file::File::open(&output_file)
.await
.map_err(FfMpegError::OpenFile)?;
let stream = tmp_two
.read_to_stream(None, None)
.await
.map_err(FfMpegError::ReadFile)?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);
Ok(Box::pin(clean_reader))
}

View file

@ -166,12 +166,18 @@ impl ProcessableFormat {
} }
} }
pub(crate) fn process_to(self, output: InputProcessableFormat) -> Option<Self> { pub(crate) const fn process_to(self, output: InputProcessableFormat) -> Option<Self> {
match (self, output) { match (self, output) {
(Self::Image(_), InputProcessableFormat::Avif) => Some(Self::Image(ImageFormat::Avif)), (Self::Image(_), InputProcessableFormat::Avif) => Some(Self::Image(ImageFormat::Avif)),
(Self::Image(_), InputProcessableFormat::Jpeg) => Some(Self::Image(ImageFormat::Jpeg)), (Self::Image(_) | Self::Animation(_), InputProcessableFormat::Jpeg) => {
(Self::Image(_), InputProcessableFormat::Jxl) => Some(Self::Image(ImageFormat::Jxl)), Some(Self::Image(ImageFormat::Jpeg))
(Self::Image(_), InputProcessableFormat::Png) => Some(Self::Image(ImageFormat::Png)), }
(Self::Image(_) | Self::Animation(_), InputProcessableFormat::Jxl) => {
Some(Self::Image(ImageFormat::Jxl))
}
(Self::Image(_) | Self::Animation(_), InputProcessableFormat::Png) => {
Some(Self::Image(ImageFormat::Png))
}
(Self::Image(_), InputProcessableFormat::Webp) => Some(Self::Image(ImageFormat::Webp)), (Self::Image(_), InputProcessableFormat::Webp) => Some(Self::Image(ImageFormat::Webp)),
(Self::Animation(_), InputProcessableFormat::Apng) => { (Self::Animation(_), InputProcessableFormat::Apng) => {
Some(Self::Animation(AnimationFormat::Apng)) Some(Self::Animation(AnimationFormat::Apng))
@ -187,11 +193,12 @@ impl ProcessableFormat {
} }
(Self::Image(_), InputProcessableFormat::Apng) => None, (Self::Image(_), InputProcessableFormat::Apng) => None,
(Self::Image(_), InputProcessableFormat::Gif) => None, (Self::Image(_), InputProcessableFormat::Gif) => None,
(Self::Animation(_), InputProcessableFormat::Jpeg) => None,
(Self::Animation(_), InputProcessableFormat::Jxl) => None,
(Self::Animation(_), InputProcessableFormat::Png) => None,
} }
} }
pub(crate) const fn should_thumbnail(self, output: Self) -> bool {
matches!((self, output), (Self::Animation(_), Self::Image(_)))
}
} }
impl FromStr for InputProcessableFormat { impl FromStr for InputProcessableFormat {

View file

@ -90,7 +90,7 @@ impl ImageFormat {
} }
} }
pub(super) fn media_type(self) -> mime::Mime { pub(crate) fn media_type(self) -> mime::Mime {
match self { match self {
Self::Avif => super::mimes::image_avif(), Self::Avif => super::mimes::image_avif(),
Self::Jpeg => mime::IMAGE_JPEG, Self::Jpeg => mime::IMAGE_JPEG,

View file

@ -1,14 +1,16 @@
mod ffmpeg;
mod magick;
use crate::{ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
details::Details, details::Details,
error::{Error, UploadError}, error::{Error, UploadError},
ffmpeg::ThumbnailFormat, formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
formats::{InputProcessableFormat, InternalVideoFormat}, repo::{ArcRepo, Hash, VariantAlreadyExists},
repo::{Alias, ArcRepo, Hash, VariantAlreadyExists},
store::Store, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use std::{path::PathBuf, time::Instant}; use std::{path::PathBuf, sync::Arc, time::Instant};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tracing::Instrument; use tracing::Instrument;
@ -45,11 +47,9 @@ pub(crate) async fn generate<S: Store + 'static>(
store: &S, store: &S,
process_map: &ProcessMap, process_map: &ProcessMap,
format: InputProcessableFormat, format: InputProcessableFormat,
alias: Alias,
thumbnail_path: PathBuf, thumbnail_path: PathBuf,
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<InternalVideoFormat>, original_details: &Details,
thumbnail_format: Option<ThumbnailFormat>,
media: &crate::config::Media, media: &crate::config::Media,
hash: Hash, hash: Hash,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
@ -57,11 +57,9 @@ pub(crate) async fn generate<S: Store + 'static>(
repo, repo,
store, store,
format, format,
alias,
thumbnail_path.clone(), thumbnail_path.clone(),
thumbnail_args, thumbnail_args,
input_format, original_details,
thumbnail_format,
media, media,
hash.clone(), hash.clone(),
); );
@ -79,44 +77,24 @@ async fn process<S: Store + 'static>(
repo: &ArcRepo, repo: &ArcRepo,
store: &S, store: &S,
output_format: InputProcessableFormat, output_format: InputProcessableFormat,
alias: Alias,
thumbnail_path: PathBuf, thumbnail_path: PathBuf,
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<InternalVideoFormat>, original_details: &Details,
thumbnail_format: Option<ThumbnailFormat>,
media: &crate::config::Media, media: &crate::config::Media,
hash: Hash, hash: Hash,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let guard = MetricsGuard::guard(); let guard = MetricsGuard::guard();
let permit = crate::PROCESS_SEMAPHORE.acquire().await; let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let identifier = if let Some(identifier) = repo.still_identifier_from_alias(&alias).await? { let identifier = input_identifier(
identifier repo,
} else { store,
let Some(identifier) = repo.identifier(hash.clone()).await? else { output_format,
return Err(UploadError::MissingIdentifier.into()); hash.clone(),
}; original_details,
media,
let thumbnail_format = thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg); )
.await?;
let reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,
input_format.unwrap_or(InternalVideoFormat::Mp4),
thumbnail_format,
media.process_timeout,
)
.await?;
let motion_identifier = store
.save_async_read(reader, thumbnail_format.media_type())
.await?;
repo.relate_motion_identifier(hash.clone(), &motion_identifier)
.await?;
motion_identifier
};
let input_details = if let Some(details) = repo.details(&identifier).await? { let input_details = if let Some(details) = repo.details(&identifier).await? {
details details
@ -133,13 +111,13 @@ async fn process<S: Store + 'static>(
.processable_format() .processable_format()
.expect("Already verified format is processable"); .expect("Already verified format is processable");
let Some(format) = input_format.process_to(output_format) else { let format = input_format
return Err(UploadError::InvalidProcessExtension.into()); .process_to(output_format)
}; .ok_or(UploadError::InvalidProcessExtension)?;
let quality = match format { let quality = match format {
crate::formats::ProcessableFormat::Image(format) => media.image.quality_for(format), ProcessableFormat::Image(format) => media.image.quality_for(format),
crate::formats::ProcessableFormat::Animation(format) => media.animation.quality_for(format), ProcessableFormat::Animation(format) => media.animation.quality_for(format),
}; };
let mut processed_reader = crate::magick::process_image_store_read( let mut processed_reader = crate::magick::process_image_store_read(
@ -185,3 +163,84 @@ async fn process<S: Store + 'static>(
Ok((details, bytes)) as Result<(Details, Bytes), Error> Ok((details, bytes)) as Result<(Details, Bytes), Error>
} }
#[tracing::instrument(skip_all)]
async fn input_identifier<S>(
repo: &ArcRepo,
store: &S,
output_format: InputProcessableFormat,
hash: Hash,
original_details: &Details,
media: &crate::config::Media,
) -> Result<Arc<str>, Error>
where
S: Store + 'static,
{
let should_thumbnail =
if let Some(input_format) = original_details.internal_format().processable_format() {
let output_format = input_format
.process_to(output_format)
.ok_or(UploadError::InvalidProcessExtension)?;
input_format.should_thumbnail(output_format)
} else {
// video case
true
};
if should_thumbnail {
if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
return Ok(identifier);
};
let identifier = repo
.identifier(hash.clone())
.await?
.ok_or(UploadError::MissingIdentifier)?;
let (reader, media_type) = if let Some(processable_format) =
original_details.internal_format().processable_format()
{
let thumbnail_format = ImageFormat::Jpeg;
let reader = magick::thumbnail(
store,
&identifier,
processable_format,
ProcessableFormat::Image(thumbnail_format),
media.image.quality_for(thumbnail_format),
media.process_timeout,
)
.await?;
(reader, thumbnail_format.media_type())
} else {
let thumbnail_format = ffmpeg::ThumbnailFormat::Jpeg;
let reader = ffmpeg::thumbnail(
store.clone(),
identifier,
original_details
.video_format()
.unwrap_or(InternalVideoFormat::Mp4),
thumbnail_format,
media.process_timeout,
)
.await?;
(reader, thumbnail_format.media_type())
};
let motion_identifier = store.save_async_read(reader, media_type).await?;
repo.relate_motion_identifier(hash, &motion_identifier)
.await?;
return Ok(motion_identifier);
}
repo.identifier(hash)
.await?
.ok_or(UploadError::MissingIdentifier)
.map_err(From::from)
}

112
src/generate/ffmpeg.rs Normal file
View file

@ -0,0 +1,112 @@
use std::sync::Arc;
use crate::{
ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead,
store::Store,
};
#[derive(Clone, Copy, Debug)]
pub(super) enum ThumbnailFormat {
Jpeg,
// Webp,
}
impl ThumbnailFormat {
const fn as_ffmpeg_codec(self) -> &'static str {
match self {
Self::Jpeg => "mjpeg",
// Self::Webp => "webp",
}
}
const fn to_file_extension(self) -> &'static str {
match self {
Self::Jpeg => ".jpeg",
// Self::Webp => ".webp",
}
}
const fn as_ffmpeg_format(self) -> &'static str {
match self {
Self::Jpeg => "image2",
// Self::Webp => "webp",
}
}
pub(crate) fn media_type(self) -> mime::Mime {
match self {
Self::Jpeg => mime::IMAGE_JPEG,
// Self::Webp => crate::formats::mimes::image_webp(),
}
}
}
#[tracing::instrument(skip(store))]
pub(super) async fn thumbnail<S: Store>(
store: S,
from: Arc<str>,
input_format: InternalVideoFormat,
format: ThumbnailFormat,
timeout: u64,
) -> Result<BoxRead<'static>, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension()));
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(FfMpegError::CreateDir)?;
let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension()));
let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&output_file)
.await
.map_err(FfMpegError::CreateDir)?;
let mut tmp_one = crate::file::File::create(&input_file)
.await
.map_err(FfMpegError::CreateFile)?;
let stream = store
.to_stream(&from, None, None)
.await
.map_err(FfMpegError::Store)?;
tmp_one
.write_from_stream(stream)
.await
.map_err(FfMpegError::Write)?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let process = Process::run(
"ffmpeg",
&[
"-hide_banner",
"-v",
"warning",
"-i",
input_file_str,
"-frames:v",
"1",
"-codec",
format.as_ffmpeg_codec(),
"-f",
format.as_ffmpeg_format(),
output_file_str,
],
timeout,
)?;
process.wait().await?;
tokio::fs::remove_file(input_file)
.await
.map_err(FfMpegError::RemoveFile)?;
let tmp_two = crate::file::File::open(&output_file)
.await
.map_err(FfMpegError::OpenFile)?;
let stream = tmp_two
.read_to_stream(None, None)
.await
.map_err(FfMpegError::ReadFile)?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);
Ok(Box::pin(clean_reader))
}

81
src/generate/magick.rs Normal file
View file

@ -0,0 +1,81 @@
use std::sync::Arc;
use crate::{
formats::ProcessableFormat, magick::MagickError, process::Process, read::BoxRead, store::Store,
};
async fn thumbnail_animation<F, Fut>(
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
write_file: F,
) -> Result<BoxRead<'static>, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
{
let input_file = crate::tmp_file::tmp_file(None);
let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(MagickError::CreateDir)?;
let tmp_one = crate::file::File::create(&input_file)
.await
.map_err(MagickError::CreateFile)?;
let tmp_one = (write_file)(tmp_one).await?;
tmp_one.close().await.map_err(MagickError::CloseFile)?;
let input_arg = format!("{}:{input_file_str}[0]", input_format.magick_format());
let output_arg = format!("{}:-", format.magick_format());
let quality = quality.map(|q| q.to_string());
let len = format.coalesce().then(|| 4).unwrap_or(3) + quality.is_some().then(|| 1).unwrap_or(0);
let mut args: Vec<&str> = Vec::with_capacity(len);
args.push("convert");
args.push(&input_arg);
if format.coalesce() {
args.push("-coalesce");
}
if let Some(quality) = &quality {
args.extend(["-quality", quality]);
}
args.push(&output_arg);
let reader = Process::run("magick", &args, timeout)?.read();
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file);
Ok(Box::pin(clean_reader))
}
pub(super) async fn thumbnail<S: Store + 'static>(
store: &S,
identifier: &Arc<str>,
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
) -> Result<BoxRead<'static>, MagickError> {
let stream = store
.to_stream(identifier, None, None)
.await
.map_err(MagickError::Store)?;
thumbnail_animation(
input_format,
format,
quality,
timeout,
|mut tmp_file| async move {
tmp_file
.write_from_stream(stream)
.await
.map_err(MagickError::Write)?;
Ok(tmp_file)
},
)
.await
}

View file

@ -2,7 +2,6 @@ use std::{sync::Arc, time::Duration};
use crate::{ use crate::{
bytes_stream::BytesStream, bytes_stream::BytesStream,
either::Either,
error::{Error, UploadError}, error::{Error, UploadError},
formats::{InternalFormat, Validations}, formats::{InternalFormat, Validations},
future::WithMetrics, future::WithMetrics,
@ -93,12 +92,12 @@ where
) )
.await?; .await?;
Either::left(processed_reader) processed_reader
} else { } else {
Either::right(validated_reader) validated_reader
} }
} else { } else {
Either::right(validated_reader) validated_reader
}; };
let hasher_reader = Hasher::new(processed_reader); let hasher_reader = Hasher::new(processed_reader);

View file

@ -22,6 +22,7 @@ mod process;
mod processor; mod processor;
mod queue; mod queue;
mod range; mod range;
mod read;
mod repo; mod repo;
mod repo_04; mod repo_04;
mod serde_str; mod serde_str;
@ -917,11 +918,9 @@ async fn process<S: Store + 'static>(
&store, &store,
&process_map, &process_map,
format, format,
alias,
thumbnail_path, thumbnail_path,
thumbnail_args, thumbnail_args,
original_details.video_format(), &original_details,
None,
&config.media, &config.media,
hash, hash,
) )
@ -1087,7 +1086,7 @@ async fn details_query<S: Store + 'static>(
let Some(alias) = repo.related(proxy).await? else { let Some(alias) = repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().json(&serde_json::json!({ return Ok(HttpResponse::NotFound().json(&serde_json::json!({
"msg": "Provided proxy URL has not been cached", "msg": "Provided proxy URL has not been cached",
}))) })));
}; };
alias alias
} }
@ -1193,9 +1192,7 @@ async fn do_serve<S: Store + 'static>(
}; };
let Some(identifier) = repo.identifier(hash.clone()).await? else { let Some(identifier) = repo.identifier(hash.clone()).await? else {
tracing::warn!( tracing::warn!("Original File identifier for hash {hash:?} is missing, queue cleanup task",);
"Original File identifier for hash {hash:?} is missing, queue cleanup task",
);
crate::queue::cleanup_hash(&repo, hash).await?; crate::queue::cleanup_hash(&repo, hash).await?;
return Ok(HttpResponse::NotFound().finish()); return Ok(HttpResponse::NotFound().finish());
}; };

View file

@ -4,8 +4,10 @@ use crate::{
error_code::ErrorCode, error_code::ErrorCode,
formats::ProcessableFormat, formats::ProcessableFormat,
process::{Process, ProcessError}, process::{Process, ProcessError},
read::BoxRead,
store::Store, store::Store,
}; };
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -94,7 +96,7 @@ async fn process_image<F, Fut>(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
write_file: F, write_file: F,
) -> Result<impl AsyncRead + Unpin, MagickError> ) -> Result<BoxRead<'static>, MagickError>
where where
F: FnOnce(crate::file::File) -> Fut, F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>, Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
@ -115,11 +117,9 @@ where
let output_arg = format!("{}:-", format.magick_format()); let output_arg = format!("{}:-", format.magick_format());
let quality = quality.map(|q| q.to_string()); let quality = quality.map(|q| q.to_string());
let len = if format.coalesce() { let len = format.coalesce().then(|| 4).unwrap_or(3)
process_args.len() + 4 + quality.is_some().then(|| 1).unwrap_or(0)
} else { + process_args.len();
process_args.len() + 3
};
let mut args: Vec<&str> = Vec::with_capacity(len); let mut args: Vec<&str> = Vec::with_capacity(len);
args.push("convert"); args.push("convert");
@ -148,7 +148,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
let stream = store let stream = store
.to_stream(identifier, None, None) .to_stream(identifier, None, None)
.await .await
@ -178,7 +178,7 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
process_image( process_image(
args, args,
input_format, input_format,

View file

@ -199,11 +199,9 @@ async fn generate<S: Store + 'static>(
store, store,
process_map, process_map,
target_format, target_format,
source,
process_path, process_path,
process_args, process_args,
original_details.video_format(), &original_details,
None,
&config.media, &config.media,
hash, hash,
) )

1
src/read.rs Normal file
View file

@ -0,0 +1 @@
pub(crate) type BoxRead<'a> = std::pin::Pin<Box<dyn tokio::io::AsyncRead + 'a>>;

View file

@ -130,28 +130,6 @@ pub(crate) trait FullRepo:
self.aliases_for_hash(hash).await self.aliases_for_hash(hash).await
} }
#[tracing::instrument(skip(self))]
async fn still_identifier_from_alias(
&self,
alias: &Alias,
) -> Result<Option<Arc<str>>, RepoError> {
let Some(hash) = self.hash(alias).await? else {
return Ok(None);
};
let Some(identifier) = self.identifier(hash.clone()).await? else {
return Ok(None);
};
match self.details(&identifier).await? {
Some(details) if details.is_video() => {
self.motion_identifier(hash).await.map_err(From::from)
}
Some(_) => Ok(Some(identifier)),
None => Ok(None),
}
}
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]

View file

@ -4,16 +4,15 @@ mod magick;
use crate::{ use crate::{
discover::Discovery, discover::Discovery,
either::Either,
error::Error, error::Error,
error_code::ErrorCode, error_code::ErrorCode,
formats::{ formats::{
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
InternalFormat, Validations, InternalFormat, Validations,
}, },
read::BoxRead,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use tokio::io::AsyncRead;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ValidationError { pub(crate) enum ValidationError {
@ -60,7 +59,7 @@ pub(crate) async fn validate_bytes(
bytes: Bytes, bytes: Bytes,
validations: Validations<'_>, validations: Validations<'_>,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { ) -> Result<(InternalFormat, BoxRead<'static>), Error> {
if bytes.is_empty() { if bytes.is_empty() {
return Err(ValidationError::Empty.into()); return Err(ValidationError::Empty.into());
} }
@ -77,7 +76,7 @@ pub(crate) async fn validate_bytes(
let (format, read) = let (format, read) =
process_image(bytes, *input, width, height, validations.image, timeout).await?; process_image(bytes, *input, width, height, validations.image, timeout).await?;
Ok((format, Either::left(read))) Ok((format, read))
} }
InputFile::Animation(input) => { InputFile::Animation(input) => {
let (format, read) = process_animation( let (format, read) = process_animation(
@ -91,7 +90,7 @@ pub(crate) async fn validate_bytes(
) )
.await?; .await?;
Ok((format, Either::right(Either::left(read)))) Ok((format, read))
} }
InputFile::Video(input) => { InputFile::Video(input) => {
let (format, read) = process_video( let (format, read) = process_video(
@ -105,7 +104,7 @@ pub(crate) async fn validate_bytes(
) )
.await?; .await?;
Ok((format, Either::right(Either::right(read)))) Ok((format, read))
} }
} }
} }
@ -118,7 +117,7 @@ async fn process_image(
height: u16, height: u16,
validations: &crate::config::Image, validations: &crate::config::Image,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { ) -> Result<(InternalFormat, BoxRead<'static>), Error> {
if width > validations.max_width { if width > validations.max_width {
return Err(ValidationError::Width.into()); return Err(ValidationError::Width.into());
} }
@ -140,9 +139,9 @@ async fn process_image(
let read = if needs_transcode { let read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
Either::left(magick::convert_image(input.format, format, quality, timeout, bytes).await?) magick::convert_image(input.format, format, quality, timeout, bytes).await?
} else { } else {
Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?) exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
Ok((InternalFormat::Image(format), read)) Ok((InternalFormat::Image(format), read))
@ -183,7 +182,7 @@ async fn process_animation(
frames: u32, frames: u32,
validations: &crate::config::Animation, validations: &crate::config::Animation,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { ) -> Result<(InternalFormat, BoxRead<'static>), Error> {
validate_animation(bytes.len(), width, height, frames, validations)?; validate_animation(bytes.len(), width, height, frames, validations)?;
let AnimationOutput { let AnimationOutput {
@ -194,9 +193,9 @@ async fn process_animation(
let read = if needs_transcode { let read = if needs_transcode {
let quality = validations.quality_for(format); let quality = validations.quality_for(format);
Either::left(magick::convert_animation(input, format, quality, timeout, bytes).await?) magick::convert_animation(input, format, quality, timeout, bytes).await?
} else { } else {
Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?) exiftool::clear_metadata_bytes_read(bytes, timeout)?
}; };
Ok((InternalFormat::Animation(format), read)) Ok((InternalFormat::Animation(format), read))
@ -240,7 +239,7 @@ async fn process_video(
frames: u32, frames: u32,
validations: &crate::config::Video, validations: &crate::config::Video,
timeout: u64, timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { ) -> Result<(InternalFormat, BoxRead<'static>), Error> {
validate_video(bytes.len(), width, height, frames, validations)?; validate_video(bytes.len(), width, height, frames, validations)?;
let output = input.build_output( let output = input.build_output(

View file

@ -1,14 +1,13 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use crate::{exiftool::ExifError, process::Process}; use crate::{exiftool::ExifError, process::Process, read::BoxRead};
#[tracing::instrument(level = "trace", skip(input))] #[tracing::instrument(level = "trace", skip(input))]
pub(crate) fn clear_metadata_bytes_read( pub(crate) fn clear_metadata_bytes_read(
input: Bytes, input: Bytes,
timeout: u64, timeout: u64,
) -> Result<impl AsyncRead + Unpin, ExifError> { ) -> Result<BoxRead<'static>, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?; let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?;
Ok(process.bytes_read(input)) Ok(Box::pin(process.bytes_read(input)))
} }

View file

@ -1,10 +1,10 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use crate::{ use crate::{
ffmpeg::FfMpegError, ffmpeg::FfMpegError,
formats::{InputVideoFormat, OutputVideo}, formats::{InputVideoFormat, OutputVideo},
process::Process, process::Process,
read::BoxRead,
}; };
pub(super) async fn transcode_bytes( pub(super) async fn transcode_bytes(
@ -13,7 +13,7 @@ pub(super) async fn transcode_bytes(
crf: u8, crf: u8,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, FfMpegError> { ) -> Result<BoxRead<'static>, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(None); let input_file = crate::tmp_file::tmp_file(None);
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
crate::store::file_store::safe_create_parent(&input_file) crate::store::file_store::safe_create_parent(&input_file)

View file

@ -1,10 +1,10 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use crate::{ use crate::{
formats::{AnimationFormat, ImageFormat}, formats::{AnimationFormat, ImageFormat},
magick::MagickError, magick::MagickError,
process::Process, process::Process,
read::BoxRead,
}; };
pub(super) async fn convert_image( pub(super) async fn convert_image(
@ -13,7 +13,7 @@ pub(super) async fn convert_image(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
convert( convert(
input.magick_format(), input.magick_format(),
output.magick_format(), output.magick_format(),
@ -31,7 +31,7 @@ pub(super) async fn convert_animation(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
convert( convert(
input.magick_format(), input.magick_format(),
output.magick_format(), output.magick_format(),
@ -50,7 +50,7 @@ async fn convert(
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
bytes: Bytes, bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> { ) -> Result<BoxRead<'static>, MagickError> {
let input_file = crate::tmp_file::tmp_file(None); let input_file = crate::tmp_file::tmp_file(None);
let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;