diff --git a/src/discover.rs b/src/discover.rs index c86e549..5142eae 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -25,6 +25,18 @@ pub(crate) struct DiscoveryLite { pub(crate) frames: Option, } +#[derive(Debug, thiserror::Error)] +pub(crate) enum DiscoverError { + #[error("No frames in uploaded media")] + NoFrames, + + #[error("Not all frames have same image format")] + FormatMismatch, + + #[error("Input file type {0} is unsupported")] + UnsupportedFileType(String), +} + pub(crate) async fn discover_bytes_lite( bytes: Bytes, ) -> Result { diff --git a/src/discover/magick.rs b/src/discover/magick.rs index 3a3bc59..88b7562 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -6,6 +6,7 @@ use futures_util::Stream; use tokio::io::AsyncReadExt; use crate::{ + discover::DiscoverError, formats::{AnimationFormat, ImageFormat, ImageInput, InputFile, VideoFormat}, magick::MagickError, process::Process, @@ -77,8 +78,7 @@ pub(super) async fn confirm_bytes( ) -> Result { match discovery { Some(Discovery { - input: - InputFile::Animation( AnimationFormat::Avif,), + input: InputFile::Animation(AnimationFormat::Avif), width, height, .. @@ -92,15 +92,14 @@ pub(super) async fn confirm_bytes( .await?; return Ok(Discovery { - input: InputFile::Animation( AnimationFormat::Avif,), + input: InputFile::Animation(AnimationFormat::Avif), width, height, frames: Some(frames), }); } Some(Discovery { - input: - InputFile::Animation( AnimationFormat::Webp,), + input: InputFile::Animation(AnimationFormat::Webp), .. }) => { // continue @@ -151,6 +150,10 @@ where .await .map_err(MagickError::RemoveFile)?; + if output.is_empty() { + return Err(MagickError::Empty); + } + let lines: u32 = output .lines() .count() @@ -158,7 +161,7 @@ where .expect("Reasonable frame count"); if lines == 0 { - todo!("Error"); + return Err(MagickError::Empty); } Ok(lines) @@ -202,17 +205,21 @@ where .await .map_err(MagickError::RemoveFile)?; + if output.is_empty() { + return Err(MagickError::Empty); + } + let output: Vec = serde_json::from_slice(&output).map_err(MagickError::Json)?; - parse_discovery(output) + parse_discovery(output).map_err(MagickError::Discover) } -fn parse_discovery(output: Vec) -> Result { +fn parse_discovery(output: Vec) -> Result { let frames = output.len(); if frames == 0 { - todo!("Error") + return Err(DiscoverError::NoFrames); } let width = output @@ -250,7 +257,7 @@ fn parse_discovery(output: Vec) -> Result) -> Result { if frames > 1 { Ok(Discovery { - input: InputFile::Animation( AnimationFormat::Avif,), + input: InputFile::Animation(AnimationFormat::Avif), width, height, frames: Some(frames), @@ -277,13 +284,13 @@ fn parse_discovery(output: Vec) -> Result Ok(Discovery { - input: InputFile::Animation( AnimationFormat::Apng,), + input: InputFile::Animation(AnimationFormat::Apng), width, height, frames: Some(frames), }), "GIF" => Ok(Discovery { - input: InputFile::Animation( AnimationFormat::Gif,), + input: InputFile::Animation(AnimationFormat::Gif), width, height, frames: Some(frames), @@ -324,7 +331,7 @@ fn parse_discovery(output: Vec) -> Result { if frames > 1 { Ok(Discovery { - input: InputFile::Animation( AnimationFormat::Webp,), + input: InputFile::Animation(AnimationFormat::Webp), width, height, frames: Some(frames), @@ -347,6 +354,6 @@ fn parse_discovery(output: Vec) -> Result todo!("Error {otherwise}"), + otherwise => Err(DiscoverError::UnsupportedFileType(String::from(otherwise))), } } diff --git a/src/error.rs b/src/error.rs index 4ae479b..417ddfc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -78,6 +78,9 @@ pub(crate) enum UploadError { #[error("Error in exiftool")] Exiftool(#[from] crate::exiftool::ExifError), + #[error("Requested file extension cannot be served by source file")] + InvalidProcessExtension, + #[error("Provided process path is invalid")] ParsePath, @@ -170,7 +173,8 @@ impl ResponseError for Error { )) | UploadError::Repo(crate::repo::RepoError::AlreadyClaimed) | UploadError::Validation(_) - | UploadError::UnsupportedProcessExtension, + | UploadError::UnsupportedProcessExtension + | UploadError::InvalidProcessExtension, ) => StatusCode::BAD_REQUEST, Some(UploadError::Magick(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, diff --git a/src/generate.rs b/src/generate.rs index 765e327..0a27ff0 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -93,16 +93,23 @@ async fn process( details }; - let Some(format) = input_details.internal_format().and_then(|format| format.processable_format()) else { - todo!("Error") + let input_format = input_details + .internal_format() + .and_then(|format| format.processable_format()) + .expect("Valid details should always have internal format"); + + let Some(format) = input_format.process_to(output_format) else { + return Err(UploadError::InvalidProcessExtension.into()); }; - let Some(format) = format.process_to(output_format) else { - todo!("Error") - }; - - let mut processed_reader = - crate::magick::process_image_store_read(store.clone(), identifier, thumbnail_args, format)?; + let mut processed_reader = crate::magick::process_image_store_read( + store, + &identifier, + thumbnail_args, + input_format, + format, + ) + .await?; let mut vec = Vec::new(); processed_reader diff --git a/src/ingest.rs b/src/ingest.rs index 4f8c6d0..88c5fdb 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -40,7 +40,7 @@ where Ok(buf.into_bytes()) } -#[tracing::instrument(skip(repo, store, stream))] +#[tracing::instrument(skip(repo, store, stream, media))] pub(crate) async fn ingest( repo: &R, store: &S, @@ -56,7 +56,6 @@ where let bytes = aggregate(stream).await?; - // TODO: load from config let prescribed = Validations { image: &media.image, animation: &media.animation, @@ -71,8 +70,13 @@ where let (_, magick_args) = crate::processor::build_chain(operations, format.file_extension())?; - let processed_reader = - crate::magick::process_image_async_read(validated_reader, magick_args, format)?; + let processed_reader = crate::magick::process_image_async_read( + validated_reader, + magick_args, + format, + format, + ) + .await?; Either::left(processed_reader) } else { diff --git a/src/magick.rs b/src/magick.rs index 139a0e5..8a432d6 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -10,6 +10,9 @@ pub(crate) enum MagickError { #[error("Error in imagemagick process")] Process(#[source] ProcessError), + #[error("Error in store")] + Store(#[source] crate::store::StoreError), + #[error("Invalid output format")] Json(#[source] serde_json::Error), @@ -31,6 +34,12 @@ pub(crate) enum MagickError { #[error("Error removing file")] RemoveFile(#[source] std::io::Error), + #[error("Error in metadata discovery")] + Discover(#[source] crate::discover::DiscoverError), + + #[error("Command output is empty")] + Empty, + #[error("Invalid file path")] Path, } @@ -42,13 +51,30 @@ impl MagickError { } } -fn process_image( +async fn process_image( process_args: Vec, + input_format: ProcessableFormat, format: ProcessableFormat, -) -> Result { - let command = "magick"; - let convert_args = ["convert", "-"]; - let last_arg = format!("{}:-", format.magick_format()); + write_file: F, +) -> Result +where + F: FnOnce(crate::file::File) -> Fut, + Fut: std::future::Future>, +{ + let input_file = crate::tmp_file::tmp_file(None); + let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(MagickError::CreateDir)?; + + let tmp_one = crate::file::File::create(&input_file) + .await + .map_err(MagickError::CreateFile)?; + let tmp_one = (write_file)(tmp_one).await?; + tmp_one.close().await.map_err(MagickError::CloseFile)?; + + let input_arg = format!("{}:{input_file_str}", input_format.magick_format()); + let output_arg = format!("{}:-", format.magick_format()); let len = if format.coalesce() { process_args.len() + 4 @@ -56,34 +82,58 @@ fn process_image( process_args.len() + 3 }; - let mut args = Vec::with_capacity(len); - args.extend_from_slice(&convert_args[..]); + let mut args: Vec<&str> = Vec::with_capacity(len); + args.push("convert"); + args.push(&input_arg); args.extend(process_args.iter().map(|s| s.as_str())); if format.coalesce() { args.push("-coalesce"); } - args.push(&last_arg); + args.push(&output_arg); - Process::run(command, &args) + let reader = Process::run("magick", &args) + .map_err(MagickError::Process)? + .read(); + + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file); + + Ok(Box::pin(clean_reader)) } -pub(crate) fn process_image_store_read( - store: S, - identifier: S::Identifier, +pub(crate) async fn process_image_store_read( + store: &S, + identifier: &S::Identifier, args: Vec, + input_format: ProcessableFormat, format: ProcessableFormat, ) -> Result { - Ok(process_image(args, format) - .map_err(MagickError::Process)? - .store_read(store, identifier)) + let stream = store + .to_stream(identifier, None, None) + .await + .map_err(MagickError::Store)?; + + process_image(args, input_format, format, |mut tmp_file| async move { + tmp_file + .write_from_stream(stream) + .await + .map_err(MagickError::Write)?; + Ok(tmp_file) + }) + .await } -pub(crate) fn process_image_async_read( +pub(crate) async fn process_image_async_read( async_read: A, args: Vec, + input_format: ProcessableFormat, format: ProcessableFormat, ) -> Result { - Ok(process_image(args, format) - .map_err(MagickError::Process)? - .pipe_async_read(async_read)) + process_image(args, input_format, format, |mut tmp_file| async move { + tmp_file + .write_from_async_read(async_read) + .await + .map_err(MagickError::Write)?; + Ok(tmp_file) + }) + .await } diff --git a/src/process.rs b/src/process.rs index e2812da..3246b86 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,4 +1,3 @@ -use crate::store::Store; use actix_rt::task::JoinHandle; use actix_web::web::Bytes; use std::{ @@ -18,6 +17,7 @@ use tracing::{Instrument, Span}; struct StatusError(ExitStatus); pub(crate) struct Process { + command: String, child: Child, } @@ -59,8 +59,8 @@ pub(crate) enum ProcessError { impl Process { pub(crate) fn run(command: &str, args: &[&str]) -> Result { - let res = tracing::trace_span!(parent: None, "Create command") - .in_scope(|| Self::spawn(Command::new(command).args(args))); + let res = tracing::trace_span!(parent: None, "Create command", %command) + .in_scope(|| Self::spawn(command, Command::new(command).args(args))); match res { Ok(this) => Ok(this), @@ -72,14 +72,17 @@ impl Process { } } - fn spawn(cmd: &mut Command) -> std::io::Result { - tracing::trace_span!(parent: None, "Spawn command").in_scope(|| { + fn spawn(command: &str, cmd: &mut Command) -> std::io::Result { + tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { let cmd = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .kill_on_drop(true); - cmd.spawn().map(|child| Process { child }) + cmd.spawn().map(|child| Process { + child, + command: String::from(command), + }) }) } @@ -105,30 +108,6 @@ impl Process { self.spawn_fn(|_| async { Ok(()) }) } - pub(crate) fn pipe_async_read( - self, - mut async_read: A, - ) -> impl AsyncRead + Unpin { - self.spawn_fn(move |mut stdin| async move { - tokio::io::copy(&mut async_read, &mut stdin) - .await - .map(|_| ()) - }) - } - - pub(crate) fn store_read( - self, - store: S, - identifier: S::Identifier, - ) -> impl AsyncRead + Unpin { - self.spawn_fn(move |mut stdin| { - let store = store; - let identifier = identifier; - - async move { store.read_into(&identifier, &mut stdin).await } - }) - } - #[allow(unknown_lints)] #[allow(clippy::let_with_type_underscore)] #[tracing::instrument(level = "trace", skip_all)] @@ -140,14 +119,15 @@ impl Process { let stdin = self.child.stdin.take().expect("stdin exists"); let stdout = self.child.stdout.take().expect("stdout exists"); - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %self.command) .in_scope(channel::); - let span = tracing::info_span!(parent: None, "Background process task"); + let span = tracing::info_span!(parent: None, "Background process task", %self.command); span.follows_from(Span::current()); let mut child = self.child; - let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + let command = self.command; + let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { actix_rt::spawn( async move { if let Err(e) = (f)(stdin).await { diff --git a/src/validate.rs b/src/validate.rs index 4028a7d..9a95d28 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -28,6 +28,9 @@ pub(crate) enum ValidationError { #[error("Too many frames")] Frames, + #[error("Uploaded file is empty")] + Empty, + #[error("Filesize too large")] Filesize, @@ -42,6 +45,10 @@ pub(crate) async fn validate_bytes( bytes: Bytes, validations: Validations<'_>, ) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { + if bytes.is_empty() { + return Err(ValidationError::Empty.into()); + } + let Discovery { input, width, @@ -112,7 +119,7 @@ async fn process_image( } = input.build_output(validations.format); let read = if needs_transcode { - Either::left(magick::convert_image(input.format, format, bytes)?) + Either::left(magick::convert_image(input.format, format, bytes).await?) } else { Either::right(exiftool::clear_metadata_bytes_read(bytes)?) }; @@ -163,7 +170,7 @@ async fn process_animation( } = input.build_output(validations.animation.format); let read = if needs_transcode { - Either::left(magick::convert_animation(input, format, bytes)?) + Either::left(magick::convert_animation(input, format, bytes).await?) } else { Either::right(Either::left(exiftool::clear_metadata_bytes_read(bytes)?)) }; @@ -177,7 +184,9 @@ async fn process_animation( validations.video.allow_audio, ); - let read = Either::right(Either::right(magick::convert_video(input, output, bytes)?)); + let read = Either::right(Either::right( + magick::convert_video(input, output, bytes).await?, + )); Ok((InternalFormat::Video(output.internal_format()), read)) } diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 483038e..942ebee 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -7,47 +7,79 @@ use crate::{ process::Process, }; -pub(super) fn convert_image( +pub(super) async fn convert_image( input: ImageFormat, output: ImageFormat, bytes: Bytes, ) -> Result { - let input_arg = format!("{}:-", input.magick_format()); - let output_arg = format!("{}:-", output.magick_format()); - - let process = Process::run( - "magick", - &["-strip", "-auto-orient", &input_arg, &output_arg], - ) - .map_err(MagickError::Process)?; - - Ok(process.bytes_read(bytes)) + convert(input.magick_format(), output.magick_format(), false, bytes).await } -pub(super) fn convert_animation( +pub(super) async fn convert_animation( input: AnimationFormat, output: AnimationFormat, bytes: Bytes, ) -> Result { - let input_arg = format!("{}:-", input.magick_format()); - let output_arg = format!("{}:-", output.magick_format()); - - let process = Process::run("magick", &["-strip", &input_arg, "-coalesce", &output_arg]) - .map_err(MagickError::Process)?; - - Ok(process.bytes_read(bytes)) + convert(input.magick_format(), output.magick_format(), true, bytes).await } -pub(super) fn convert_video( +pub(super) async fn convert_video( input: AnimationFormat, output: OutputVideoFormat, bytes: Bytes, ) -> Result { - let input_arg = format!("{}:-", input.magick_format()); - let output_arg = format!("{}:-", output.magick_format()); - - let process = Process::run("magick", &["-strip", &input_arg, "-coalesce", &output_arg]) - .map_err(MagickError::Process)?; - - Ok(process.bytes_read(bytes)) + convert(input.magick_format(), output.magick_format(), true, bytes).await +} + +async fn convert( + input: &'static str, + output: &'static str, + coalesce: bool, + bytes: Bytes, +) -> Result { + let input_file = crate::tmp_file::tmp_file(None); + let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; + + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(MagickError::CreateDir)?; + + let mut tmp_one = crate::file::File::create(&input_file) + .await + .map_err(MagickError::CreateFile)?; + tmp_one + .write_from_bytes(bytes) + .await + .map_err(MagickError::Write)?; + tmp_one.close().await.map_err(MagickError::CloseFile)?; + + let input_arg = format!("{input}:{input_file_str}"); + let output_arg = format!("{output}:-"); + + let process = if coalesce { + Process::run( + "magick", + &[ + "convert", + "-strip", + "-auto-orient", + &input_arg, + "-coalesce", + &output_arg, + ], + ) + .map_err(MagickError::Process)? + } else { + Process::run( + "magick", + &["convert", "-strip", "-auto-orient", &input_arg, &output_arg], + ) + .map_err(MagickError::Process)? + }; + + let reader = process.read(); + + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file); + + Ok(Box::pin(clean_reader)) }