From 6fa79b918884046988995df491ebbccc2f9f92cf Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 23 Dec 2023 11:58:20 -0600 Subject: [PATCH] Do more cleanup inline --- src/discover/exiftool.rs | 2 +- src/discover/ffmpeg.rs | 6 +-- src/discover/magick.rs | 18 ++++++--- src/exiftool.rs | 2 +- src/ffmpeg.rs | 4 ++ src/generate.rs | 14 ++++--- src/generate/ffmpeg.rs | 2 +- src/generate/magick.rs | 16 +++----- src/magick.rs | 26 +++++-------- src/process.rs | 50 +++++++++++++++++++----- src/tmp_file.rs | 82 +++++++++++++++++++++++++++------------- src/validate/ffmpeg.rs | 2 + 12 files changed, 145 insertions(+), 79 deletions(-) diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index b8bba46..253810f 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -43,7 +43,7 @@ pub(super) async fn check_reorient( async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? .bytes_read(input) - .to_string() + .into_string() .await?; Ok(!buf.is_empty()) diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 7a930e3..d1bc06f 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -229,10 +229,10 @@ where timeout, )? .read() - .to_vec() + .into_vec() .await?; - drop(input_file); + input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; @@ -273,7 +273,7 @@ async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegErro timeout, )? .read() - .to_vec() + .into_vec() .await?; let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; diff --git a/src/discover/magick.rs b/src/discover/magick.rs index ac679b1..72051a1 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -129,11 +129,14 @@ where timeout, )? .read() - .to_string() + .into_string() .await?; - drop(input_file); - drop(temporary_path); + input_file.cleanup().await.map_err(MagickError::Cleanup)?; + temporary_path + .cleanup() + .await + .map_err(MagickError::Cleanup)?; if output.is_empty() { return Err(MagickError::Empty); @@ -192,11 +195,14 @@ where timeout, )? .read() - .to_vec() + .into_vec() .await?; - drop(input_file); - drop(temporary_path); + input_file.cleanup().await.map_err(MagickError::Cleanup)?; + temporary_path + .cleanup() + .await + .map_err(MagickError::Cleanup)?; if output.is_empty() { return Err(MagickError::Empty); diff --git a/src/exiftool.rs b/src/exiftool.rs index cc23813..831ef9b 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -42,7 +42,7 @@ impl ExifError { pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? .bytes_read(input) - .to_string() + .into_string() .await?; Ok(!buf.is_empty()) diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index b55476e..cd11b8f 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -26,6 +26,9 @@ pub(crate) enum FfMpegError { #[error("Error closing file")] CloseFile(#[source] std::io::Error), + #[error("Error cleaning up after command")] + Cleanup(#[source] std::io::Error), + #[error("Error in store")] Store(#[source] StoreError), @@ -53,6 +56,7 @@ impl FfMpegError { | Self::CreateDir(_) | Self::ReadFile(_) | Self::OpenFile(_) + | Self::Cleanup(_) | Self::CreateFile(_) | Self::CloseFile(_) => ErrorCode::COMMAND_ERROR, } diff --git a/src/generate.rs b/src/generate.rs index bebf2e2..f1d2d0f 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -135,10 +135,11 @@ async fn process( ProcessableFormat::Animation(format) => config.media.animation.quality_for(format), }; - let vec = crate::magick::process_image_store_read( + let stream = store.to_stream(&identifier, None, None).await?; + + let vec = crate::magick::process_image_stream_read( tmp_dir, - store, - &identifier, + stream, thumbnail_args, input_format, format, @@ -146,7 +147,7 @@ async fn process( config.media.process_timeout, ) .await? - .to_vec() + .into_vec() .instrument(tracing::info_span!("Reading processed image to vec")) .await?; @@ -216,10 +217,11 @@ where { let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp); + let stream = store.to_stream(&identifier, None, None).await?; + let reader = magick::thumbnail( tmp_dir, - store, - &identifier, + stream, processable_format, ProcessableFormat::Image(thumbnail_format), media.image.quality_for(thumbnail_format), diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs index c90e029..0311347 100644 --- a/src/generate/ffmpeg.rs +++ b/src/generate/ffmpeg.rs @@ -103,7 +103,7 @@ pub(super) async fn thumbnail( )?; process.wait().await?; - drop(input_file); + input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; let tmp_two = crate::file::File::open(&output_file) .await diff --git a/src/generate/magick.rs b/src/generate/magick.rs index e745767..5e5aec1 100644 --- a/src/generate/magick.rs +++ b/src/generate/magick.rs @@ -1,10 +1,12 @@ -use std::{ffi::OsStr, sync::Arc}; +use std::ffi::OsStr; + +use actix_web::web::Bytes; use crate::{ formats::ProcessableFormat, magick::{MagickError, MAGICK_TEMPORARY_PATH}, process::{Process, ProcessRead}, - store::Store, + stream::LocalBoxStream, tmp_file::TmpDir, }; @@ -67,20 +69,14 @@ where Ok(reader) } -pub(super) async fn thumbnail( +pub(super) async fn thumbnail( tmp_dir: &TmpDir, - store: &S, - identifier: &Arc, + stream: LocalBoxStream<'static, std::io::Result>, input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, timeout: u64, ) -> Result { - let stream = store - .to_stream(identifier, None, None) - .await - .map_err(MagickError::Store)?; - thumbnail_animation( tmp_dir, input_format, diff --git a/src/magick.rs b/src/magick.rs index 95fb164..d109987 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -1,15 +1,15 @@ -use std::{ffi::OsStr, sync::Arc}; +use std::ffi::OsStr; + +use actix_web::web::Bytes; use crate::{ error_code::ErrorCode, formats::ProcessableFormat, process::{Process, ProcessError, ProcessRead}, - store::Store, + stream::LocalBoxStream, tmp_file::TmpDir, }; - - pub(crate) const MAGICK_TEMPORARY_PATH: &str = "MAGICK_TEMPORARY_PATH"; #[derive(Debug, thiserror::Error)] @@ -17,9 +17,6 @@ pub(crate) enum MagickError { #[error("Error in imagemagick process")] Process(#[source] ProcessError), - #[error("Error in store")] - Store(#[source] crate::store::StoreError), - #[error("Invalid output format")] Json(#[source] serde_json::Error), @@ -44,6 +41,9 @@ pub(crate) enum MagickError { #[error("Invalid media file provided")] CommandFailed(ProcessError), + #[error("Error cleaning up after command")] + Cleanup(#[source] std::io::Error), + #[error("Command output is empty")] Empty, } @@ -61,7 +61,6 @@ impl MagickError { pub(crate) const fn error_code(&self) -> ErrorCode { match self { Self::CommandFailed(_) => ErrorCode::COMMAND_FAILURE, - Self::Store(e) => e.error_code(), Self::Process(e) => e.error_code(), Self::Json(_) | Self::Write(_) @@ -70,6 +69,7 @@ impl MagickError { | Self::CreateTemporaryDirectory(_) | Self::CloseFile(_) | Self::Discover(_) + | Self::Cleanup(_) | Self::Empty => ErrorCode::COMMAND_ERROR, } } @@ -148,21 +148,15 @@ where } #[allow(clippy::too_many_arguments)] -pub(crate) async fn process_image_store_read( +pub(crate) async fn process_image_stream_read( tmp_dir: &TmpDir, - store: &S, - identifier: &Arc, + stream: LocalBoxStream<'static, std::io::Result>, args: Vec, input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, timeout: u64, ) -> Result { - let stream = store - .to_stream(identifier, None, None) - .await - .map_err(MagickError::Store)?; - process_image( tmp_dir, args, diff --git a/src/process.rs b/src/process.rs index 53ae82b..04dfc96 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,6 +1,5 @@ use actix_web::web::Bytes; use std::{ - any::Any, ffi::OsStr, future::Future, process::{ExitStatus, Stdio}, @@ -72,12 +71,36 @@ impl std::fmt::Debug for Process { } } +#[async_trait::async_trait(?Send)] +pub(crate) trait Extras { + async fn consume(&mut self) -> std::io::Result<()>; +} + +#[async_trait::async_trait(?Send)] +impl Extras for () { + async fn consume(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl Extras for (Box, T) +where + T: Extras, +{ + async fn consume(&mut self) -> std::io::Result<()> { + let (res1, res2) = tokio::join!(self.0.consume(), self.1.consume()); + res1?; + res2 + } +} + pub(crate) struct ProcessRead { reader: BoxRead<'static>, handle: LocalBoxFuture<'static, Result<(), ProcessError>>, command: Arc, id: Uuid, - extras: Box, + extras: Box, } #[derive(Debug, thiserror::Error)] @@ -100,6 +123,9 @@ pub(crate) enum ProcessError { #[error("Failed to read stdout for {0}")] Read(Arc, #[source] std::io::Error), + #[error("Failed to cleanup for command {0}")] + Cleanup(Arc, #[source] std::io::Error), + #[error("Unknown process error")] Other(#[source] std::io::Error), } @@ -109,7 +135,9 @@ impl ProcessError { match self { Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, - Self::LimitReached | Self::Read(_, _) | Self::Other(_) => ErrorCode::COMMAND_ERROR, + Self::LimitReached | Self::Read(_, _) | Self::Cleanup(_, _) | Self::Other(_) => { + ErrorCode::COMMAND_ERROR + } Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, Self::Status(_, _) => ErrorCode::COMMAND_FAILURE, } @@ -276,7 +304,7 @@ impl ProcessRead { } } - pub(crate) async fn to_vec(self) -> Result, ProcessError> { + pub(crate) async fn into_vec(self) -> Result, ProcessError> { let cmd = self.command.clone(); self.with_stdout(move |mut stdout| async move { @@ -291,7 +319,7 @@ impl ProcessRead { .await? } - pub(crate) async fn to_string(self) -> Result { + pub(crate) async fn into_string(self) -> Result { let cmd = self.command.clone(); self.with_stdout(move |mut stdout| async move { @@ -318,21 +346,25 @@ impl ProcessRead { handle, command, id, - extras, + mut extras, } = self; let (out, res) = tokio::join!( (f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)), handle.instrument(tracing::info_span!("cmd-handle", %command, %id)) ); - res?; - drop(extras); + extras + .consume() + .await + .map_err(|e| ProcessError::Cleanup(command, e))?; + + res?; Ok(out) } - pub(crate) fn add_extras(self, more_extras: Extras) -> ProcessRead { + pub(crate) fn add_extras(self, more_extras: E) -> ProcessRead { let Self { reader, handle, diff --git a/src/tmp_file.rs b/src/tmp_file.rs index 2616448..81eda3c 100644 --- a/src/tmp_file.rs +++ b/src/tmp_file.rs @@ -6,6 +6,8 @@ use std::{ use uuid::Uuid; +use crate::process::Extras; + pub(crate) type ArcTmpDir = Arc; #[derive(Debug)] @@ -15,36 +17,33 @@ pub(crate) struct TmpDir { impl TmpDir { pub(crate) async fn init>(path: P) -> std::io::Result> { - let path = path.as_ref().join(Uuid::new_v4().to_string()); + let path = path.as_ref().join(Uuid::now_v7().to_string()); tokio::fs::create_dir(&path).await?; Ok(Arc::new(TmpDir { path: Some(path) })) } - fn build_tmp_file(&self, ext: Option<&str>) -> Arc { + fn build_tmp_file(&self, ext: Option<&str>) -> PathBuf { if let Some(ext) = ext { - Arc::from(self.path.as_ref().expect("tmp path exists").join(format!( - "{}{}", - Uuid::new_v4(), - ext - ))) + self.path + .as_ref() + .expect("tmp path exists") + .join(format!("{}{}", Uuid::now_v7(), ext)) } else { - Arc::from( - self.path - .as_ref() - .expect("tmp path exists") - .join(Uuid::new_v4().to_string()), - ) + self.path + .as_ref() + .expect("tmp path exists") + .join(Uuid::now_v7().to_string()) } } pub(crate) fn tmp_file(&self, ext: Option<&str>) -> TmpFile { - TmpFile(self.build_tmp_file(ext)) + TmpFile(Some(self.build_tmp_file(ext))) } pub(crate) async fn tmp_folder(&self) -> std::io::Result { let path = self.build_tmp_file(None); tokio::fs::create_dir(&path).await?; - Ok(TmpFolder(path)) + Ok(TmpFolder(Some(path))) } pub(crate) async fn cleanup(self: Arc) -> std::io::Result<()> { @@ -65,11 +64,26 @@ impl Drop for TmpDir { } #[must_use] -pub(crate) struct TmpFolder(Arc); +pub(crate) struct TmpFolder(Option); + +impl TmpFolder { + pub(crate) async fn cleanup(mut self) -> std::io::Result<()> { + self.consume().await + } +} + +#[async_trait::async_trait(?Send)] +impl Extras for TmpFolder { + async fn consume(&mut self) -> std::io::Result<()> { + tokio::fs::remove_dir_all(&self).await?; + self.0.take(); + Ok(()) + } +} impl AsRef for TmpFolder { fn as_ref(&self) -> &Path { - &self.0 + self.0.as_deref().unwrap() } } @@ -77,25 +91,39 @@ impl Deref for TmpFolder { type Target = Path; fn deref(&self) -> &Self::Target { - &self.0 + self.0.as_deref().unwrap() } } impl Drop for TmpFolder { fn drop(&mut self) { - crate::sync::spawn( - "remove-tmpfolder", - tokio::fs::remove_dir_all(self.0.clone()), - ); + if let Some(path) = self.0.take() { + let _ = std::fs::remove_dir_all(path); + } } } #[must_use] -pub(crate) struct TmpFile(Arc); +pub(crate) struct TmpFile(Option); + +impl TmpFile { + pub(crate) async fn cleanup(mut self) -> std::io::Result<()> { + self.consume().await + } +} + +#[async_trait::async_trait(?Send)] +impl Extras for TmpFile { + async fn consume(&mut self) -> std::io::Result<()> { + tokio::fs::remove_file(&self).await?; + self.0.take(); + Ok(()) + } +} impl AsRef for TmpFile { fn as_ref(&self) -> &Path { - &self.0 + self.0.as_deref().unwrap() } } @@ -103,12 +131,14 @@ impl Deref for TmpFile { type Target = Path; fn deref(&self) -> &Self::Target { - &self.0 + self.0.as_deref().unwrap() } } impl Drop for TmpFile { fn drop(&mut self) { - crate::sync::spawn("remove-tmpfile", tokio::fs::remove_file(self.0.clone())); + if let Some(path) = self.0.take() { + let _ = std::fs::remove_file(path); + } } } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index eafa6a5..ae828ce 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -44,6 +44,8 @@ pub(super) async fn transcode_bytes( ) .await?; + input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; + let tmp_two = crate::file::File::open(&output_file) .await .map_err(FfMpegError::OpenFile)?;