From fee4ed1e3eab6b521fbd7d02aa3f4b29511e35c6 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 5 Aug 2023 12:41:06 -0500 Subject: [PATCH] Add process timeout --- defaults.toml | 1 + pict-rs.toml | 5 ++ src/config/commandline.rs | 8 ++ src/config/defaults.rs | 2 + src/config/file.rs | 2 + src/details.rs | 7 +- src/discover.rs | 23 ++++-- src/discover/exiftool.rs | 7 +- src/discover/ffmpeg.rs | 86 +++++++++++++-------- src/discover/magick.rs | 91 +++++++++++++++-------- src/exiftool.rs | 11 ++- src/ffmpeg.rs | 2 + src/generate.rs | 6 +- src/ingest.rs | 4 +- src/lib.rs | 38 ++++++++-- src/magick.rs | 7 +- src/migrate_store.rs | 36 +++++++-- src/process.rs | 152 ++++++++++++++++++++++---------------- src/validate.rs | 26 +++++-- src/validate/exiftool.rs | 7 +- src/validate/ffmpeg.rs | 5 +- src/validate/magick.rs | 9 ++- 22 files changed, 358 insertions(+), 177 deletions(-) diff --git a/defaults.toml b/defaults.toml index 6ac6616..b6c9371 100644 --- a/defaults.toml +++ b/defaults.toml @@ -26,6 +26,7 @@ path = "/mnt" [media] max_file_size = 40 +process_timeout = 30 filters = [ "blur", "crop", diff --git a/pict-rs.toml b/pict-rs.toml index 06d4293..2bc5a3f 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -151,6 +151,11 @@ path = '/mnt' # default: 40 max_file_size = 40 +## Optional: Timeout (in seconds) for all spawned media processing operations +# environment variable: PICTRS__MEDIA__PROCESS_TIMEOUT +# default: 30 +process_timeout = 30 + ## Optional: preprocessing steps for uploaded images # environment variable: PICTRS__MEDIA__PREPROCESS_STEPS # default: empty diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 282be5b..52cc4d1 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -53,6 +53,7 @@ impl Args { metrics_prometheus_address, media_preprocess_steps, media_max_file_size, + media_process_timeout, media_retention_variants, media_retention_proxy, media_image_max_width, @@ -177,6 +178,7 @@ impl Args { let media = Media { max_file_size: media_max_file_size, + process_timeout: media_process_timeout, preprocess_steps: media_preprocess_steps, filters: media_filters, retention: retention.set(), @@ -459,6 +461,8 @@ struct Media { #[serde(skip_serializing_if = "Option::is_none")] max_file_size: Option, #[serde(skip_serializing_if = "Option::is_none")] + process_timeout: Option, + #[serde(skip_serializing_if = "Option::is_none")] preprocess_steps: Option, #[serde(skip_serializing_if = "Option::is_none")] filters: Option>, @@ -801,6 +805,10 @@ struct Run { #[arg(long)] media_max_file_size: Option, + /// Timeout for any media processing operation + #[arg(long)] + media_process_timeout: Option, + /// How long to keep image "variants" around /// /// A variant is any processed version of an original image diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 98c7a1f..8a626a2 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -73,6 +73,7 @@ struct OldDbDefaults { #[serde(rename_all = "snake_case")] struct MediaDefaults { max_file_size: usize, + process_timeout: u64, filters: Vec, retention: RetentionDefaults, image: ImageDefaults, @@ -238,6 +239,7 @@ impl Default for MediaDefaults { fn default() -> Self { MediaDefaults { max_file_size: 40, + process_timeout: 30, filters: vec![ "blur".into(), "crop".into(), diff --git a/src/config/file.rs b/src/config/file.rs index af4b9dc..7f3e57c 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -168,6 +168,8 @@ pub(crate) struct OldDb { pub(crate) struct Media { pub(crate) max_file_size: usize, + pub(crate) process_timeout: u64, + #[serde(skip_serializing_if = "Option::is_none")] preprocess_steps: Option, diff --git a/src/details.rs b/src/details.rs index 1a2b15b..c12b58a 100644 --- a/src/details.rs +++ b/src/details.rs @@ -31,13 +31,13 @@ impl Details { self.content_type.type_() == "video" } - pub(crate) async fn from_bytes(input: web::Bytes) -> Result { + pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result { let DiscoveryLite { format, width, height, frames, - } = crate::discover::discover_bytes_lite(input).await?; + } = crate::discover::discover_bytes_lite(timeout, input).await?; Ok(Details::from_parts(format, width, height, frames)) } @@ -45,13 +45,14 @@ impl Details { pub(crate) async fn from_store( store: &S, identifier: &S::Identifier, + timeout: u64, ) -> Result { let DiscoveryLite { format, width, height, frames, - } = crate::discover::discover_store_lite(store, identifier).await?; + } = crate::discover::discover_store_lite(store, identifier, timeout).await?; Ok(Details::from_parts(format, width, height, frames)) } diff --git a/src/discover.rs b/src/discover.rs index eb8c557..4234fd0 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -38,13 +38,14 @@ pub(crate) enum DiscoverError { } pub(crate) async fn discover_bytes_lite( + timeout: u64, bytes: Bytes, ) -> Result { - if let Some(discovery) = ffmpeg::discover_bytes_lite(bytes.clone()).await? { + if let Some(discovery) = ffmpeg::discover_bytes_lite(timeout, bytes.clone()).await? { return Ok(discovery); } - let discovery = magick::discover_bytes_lite(bytes).await?; + let discovery = magick::discover_bytes_lite(timeout, bytes).await?; Ok(discovery) } @@ -52,28 +53,34 @@ pub(crate) async fn discover_bytes_lite( pub(crate) async fn discover_store_lite( store: &S, identifier: &S::Identifier, + timeout: u64, ) -> Result where S: Store, { if let Some(discovery) = - ffmpeg::discover_stream_lite(store.to_stream(identifier, None, None).await?).await? + ffmpeg::discover_stream_lite(timeout, store.to_stream(identifier, None, None).await?) + .await? { return Ok(discovery); } let discovery = - magick::discover_stream_lite(store.to_stream(identifier, None, None).await?).await?; + magick::discover_stream_lite(timeout, store.to_stream(identifier, None, None).await?) + .await?; Ok(discovery) } -pub(crate) async fn discover_bytes(bytes: Bytes) -> Result { - let discovery = ffmpeg::discover_bytes(bytes.clone()).await?; +pub(crate) async fn discover_bytes( + timeout: u64, + bytes: Bytes, +) -> Result { + let discovery = ffmpeg::discover_bytes(timeout, bytes.clone()).await?; - let discovery = magick::confirm_bytes(discovery, bytes.clone()).await?; + let discovery = magick::confirm_bytes(discovery, timeout, bytes.clone()).await?; - let discovery = exiftool::check_reorient(discovery, bytes).await?; + let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?; Ok(discovery) } diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index b291454..1530eac 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -16,11 +16,12 @@ pub(super) async fn check_reorient( height, frames, }: Discovery, + timeout: u64, bytes: Bytes, ) -> Result { let input = match input { InputFile::Image(ImageInput { format, .. }) => { - let needs_reorient = needs_reorienting(bytes).await?; + let needs_reorient = needs_reorienting(bytes, timeout).await?; InputFile::Image(ImageInput { format, @@ -39,8 +40,8 @@ pub(super) async fn check_reorient( } #[tracing::instrument(level = "trace", skip(input))] -async fn needs_reorienting(input: Bytes) -> Result { - let process = Process::run("exiftool", &["-n", "-Orientation", "-"])?; +async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { + let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout)?; let mut reader = process.bytes_read(input); let mut buf = String::new(); diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 51d4e75..10fd88f 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -63,46 +63,65 @@ struct Flags { alpha: usize, } -pub(super) async fn discover_bytes(bytes: Bytes) -> Result, FfMpegError> { - discover_file_full(move |mut file| { - let bytes = bytes.clone(); +pub(super) async fn discover_bytes( + timeout: u64, + bytes: Bytes, +) -> Result, FfMpegError> { + discover_file_full( + move |mut file| { + let bytes = bytes.clone(); - async move { - file.write_from_bytes(bytes) - .await - .map_err(FfMpegError::Write)?; - Ok(file) - } - }) + async move { + file.write_from_bytes(bytes) + .await + .map_err(FfMpegError::Write)?; + Ok(file) + } + }, + timeout, + ) .await } pub(super) async fn discover_bytes_lite( + timeout: u64, bytes: Bytes, ) -> Result, FfMpegError> { - discover_file_lite(move |mut file| async move { - file.write_from_bytes(bytes) - .await - .map_err(FfMpegError::Write)?; - Ok(file) - }) + discover_file_lite( + move |mut file| async move { + file.write_from_bytes(bytes) + .await + .map_err(FfMpegError::Write)?; + Ok(file) + }, + timeout, + ) .await } -pub(super) async fn discover_stream_lite(stream: S) -> Result, FfMpegError> +pub(super) async fn discover_stream_lite( + timeout: u64, + stream: S, +) -> Result, FfMpegError> where S: Stream> + Unpin, { - discover_file_lite(move |mut file| async move { - file.write_from_stream(stream) - .await - .map_err(FfMpegError::Write)?; - Ok(file) - }) + discover_file_lite( + move |mut file| async move { + file.write_from_stream(stream) + .await + .map_err(FfMpegError::Write)?; + Ok(file) + }, + timeout, + ) .await } -async fn discover_file_lite(f: F) -> Result, FfMpegError> +async fn discover_file_lite( + f: F, + timeout: u64, +) -> Result, FfMpegError> where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -112,7 +131,7 @@ where width, height, frames, - }) = discover_file(f) + }) = discover_file(f, timeout) .await? else { return Ok(None); }; @@ -130,12 +149,12 @@ where })) } -async fn discover_file_full(f: F) -> Result, FfMpegError> +async fn discover_file_full(f: F, timeout: u64) -> Result, FfMpegError> where F: Fn(crate::file::File) -> Fut + Clone, Fut: std::future::Future>, { - let Some(DiscoveryLite { format, width, height, frames }) = discover_file(f.clone()).await? else { + let Some(DiscoveryLite { format, width, height, frames }) = discover_file(f.clone(), timeout).await? else { return Ok(None); }; @@ -143,12 +162,12 @@ where InternalFormat::Video(InternalVideoFormat::Webm) => { static ALPHA_PIXEL_FORMATS: OnceLock> = OnceLock::new(); - let format = pixel_format(f).await?; + let format = pixel_format(f, timeout).await?; let alpha = match ALPHA_PIXEL_FORMATS.get() { Some(alpha_pixel_formats) => alpha_pixel_formats.contains(&format), None => { - let pixel_formats = alpha_pixel_formats().await?; + let pixel_formats = alpha_pixel_formats(timeout).await?; let alpha = pixel_formats.contains(&format); let _ = ALPHA_PIXEL_FORMATS.set(pixel_formats); alpha @@ -187,7 +206,7 @@ where } #[tracing::instrument(skip(f))] -async fn discover_file(f: F) -> Result, FfMpegError> +async fn discover_file(f: F, timeout: u64) -> Result, FfMpegError> where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -220,6 +239,7 @@ where "json", input_file_str, ], + timeout, )?; let mut output = Vec::new(); @@ -237,7 +257,7 @@ where parse_discovery(output) } -async fn pixel_format(f: F) -> Result +async fn pixel_format(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -267,6 +287,7 @@ where "compact=p=0:nk=1", input_file_str, ], + timeout, )?; let mut output = Vec::new(); @@ -283,7 +304,7 @@ where Ok(String::from_utf8_lossy(&output).trim().to_string()) } -async fn alpha_pixel_formats() -> Result, FfMpegError> { +async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegError> { let process = Process::run( "ffprobe", &[ @@ -296,6 +317,7 @@ async fn alpha_pixel_formats() -> Result, FfMpegError> { "-print_format", "json", ], + timeout, )?; let mut output = Vec::new(); diff --git a/src/discover/magick.rs b/src/discover/magick.rs index 32b0de6..ffadd3e 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -49,31 +49,44 @@ impl Discovery { } } -pub(super) async fn discover_bytes_lite(bytes: Bytes) -> Result { - discover_file_lite(move |mut file| async move { - file.write_from_bytes(bytes) - .await - .map_err(MagickError::Write)?; - Ok(file) - }) +pub(super) async fn discover_bytes_lite( + timeout: u64, + bytes: Bytes, +) -> Result { + discover_file_lite( + move |mut file| async move { + file.write_from_bytes(bytes) + .await + .map_err(MagickError::Write)?; + Ok(file) + }, + timeout, + ) .await } -pub(super) async fn discover_stream_lite(stream: S) -> Result +pub(super) async fn discover_stream_lite( + timeout: u64, + stream: S, +) -> Result where S: Stream> + Unpin + 'static, { - discover_file_lite(move |mut file| async move { - file.write_from_stream(stream) - .await - .map_err(MagickError::Write)?; - Ok(file) - }) + discover_file_lite( + move |mut file| async move { + file.write_from_stream(stream) + .await + .map_err(MagickError::Write)?; + Ok(file) + }, + timeout, + ) .await } pub(super) async fn confirm_bytes( discovery: Option, + timeout: u64, bytes: Bytes, ) -> Result { match discovery { @@ -83,12 +96,15 @@ pub(super) async fn confirm_bytes( height, .. }) => { - let frames = count_avif_frames(move |mut file| async move { - file.write_from_bytes(bytes) - .await - .map_err(MagickError::Write)?; - Ok(file) - }) + let frames = count_avif_frames( + move |mut file| async move { + file.write_from_bytes(bytes) + .await + .map_err(MagickError::Write)?; + Ok(file) + }, + timeout, + ) .await?; return Ok(Discovery { @@ -110,17 +126,20 @@ pub(super) async fn confirm_bytes( } } - discover_file(move |mut file| async move { - file.write_from_bytes(bytes) - .await - .map_err(MagickError::Write)?; + discover_file( + move |mut file| async move { + file.write_from_bytes(bytes) + .await + .map_err(MagickError::Write)?; - Ok(file) - }) + Ok(file) + }, + timeout, + ) .await } -async fn count_avif_frames(f: F) -> Result +async fn count_avif_frames(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -137,7 +156,11 @@ where let tmp_one = (f)(tmp_one).await?; tmp_one.close().await.map_err(MagickError::CloseFile)?; - let process = Process::run("magick", &["convert", "-ping", input_file_str, "INFO:"])?; + let process = Process::run( + "magick", + &["convert", "-ping", input_file_str, "INFO:"], + timeout, + )?; let mut output = String::new(); process @@ -166,15 +189,15 @@ where Ok(lines) } -async fn discover_file_lite(f: F) -> Result +async fn discover_file_lite(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, { - discover_file(f).await.map(Discovery::lite) + discover_file(f, timeout).await.map(Discovery::lite) } -async fn discover_file(f: F) -> Result +async fn discover_file(f: F, timeout: u64) -> Result where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -191,7 +214,11 @@ where let tmp_one = (f)(tmp_one).await?; tmp_one.close().await.map_err(MagickError::CloseFile)?; - let process = Process::run("magick", &["convert", "-ping", input_file_str, "JSON:"])?; + let process = Process::run( + "magick", + &["convert", "-ping", input_file_str, "JSON:"], + timeout, + )?; let mut output = Vec::new(); process diff --git a/src/exiftool.rs b/src/exiftool.rs index 8b107b9..5e3427c 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -31,8 +31,8 @@ impl ExifError { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) async fn needs_reorienting(input: Bytes) -> Result { - let process = Process::run("exiftool", &["-n", "-Orientation", "-"])?; +pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result { + let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout)?; let mut reader = process.bytes_read(input); let mut buf = String::new(); @@ -45,8 +45,11 @@ pub(crate) async fn needs_reorienting(input: Bytes) -> Result { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result { - let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?; +pub(crate) fn clear_metadata_bytes_read( + timeout: u64, + input: Bytes, +) -> Result { + let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?; Ok(process.bytes_read(input)) } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index ffa2067..0cb1a6d 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -113,6 +113,7 @@ pub(crate) async fn thumbnail( from: S::Identifier, input_format: InternalVideoFormat, format: ThumbnailFormat, + timeout: u64, ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension())); let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; @@ -155,6 +156,7 @@ pub(crate) async fn thumbnail( format.as_ffmpeg_format(), output_file_str, ], + timeout, )?; process.wait().await?; diff --git a/src/generate.rs b/src/generate.rs index 8c0ad00..3498c1b 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -107,6 +107,7 @@ async fn process( identifier, input_format.unwrap_or(InternalVideoFormat::Mp4), thumbnail_format, + media.process_timeout, ) .await?; @@ -123,7 +124,7 @@ async fn process( let input_details = if let Some(details) = repo.details(&identifier).await? { details } else { - let details = Details::from_store(store, &identifier).await?; + let details = Details::from_store(store, &identifier, media.process_timeout).await?; repo.relate_details(&identifier, &details).await?; @@ -151,6 +152,7 @@ async fn process( input_format, format, quality, + media.process_timeout, ) .await?; @@ -163,7 +165,7 @@ async fn process( drop(permit); - let details = Details::from_bytes(bytes.clone()).await?; + let details = Details::from_bytes(media.process_timeout, bytes.clone()).await?; let identifier = store .save_bytes(bytes.clone(), details.media_type()) diff --git a/src/ingest.rs b/src/ingest.rs index 3c77ffb..e797791 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -64,7 +64,8 @@ where }; tracing::trace!("Validating bytes"); - let (input_type, validated_reader) = crate::validate::validate_bytes(bytes, prescribed).await?; + let (input_type, validated_reader) = + crate::validate::validate_bytes(bytes, prescribed, media.process_timeout).await?; let processed_reader = if let Some(operations) = media.preprocess_steps() { if let Some(format) = input_type.processable_format() { @@ -84,6 +85,7 @@ where format, format, quality, + media.process_timeout, ) .await?; diff --git a/src/lib.rs b/src/lib.rs index 1070491..311fc3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -122,7 +122,8 @@ async fn ensure_details( } tracing::debug!("generating new details from {:?}", identifier); - let new_details = Details::from_store(store, &identifier).await?; + let new_details = + Details::from_store(store, &identifier, config.media.process_timeout).await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -798,7 +799,8 @@ async fn process( } tracing::debug!("generating new details from {:?}", identifier); - let new_details = Details::from_store(&store, &identifier).await?; + let new_details = + Details::from_store(&store, &identifier, config.media.process_timeout).await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -927,7 +929,8 @@ async fn process_head( } tracing::debug!("generating new details from {:?}", identifier); - let new_details = Details::from_store(&store, &identifier).await?; + let new_details = + Details::from_store(&store, &identifier, config.media.process_timeout).await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -1744,6 +1747,7 @@ async fn migrate_inner( from: S1, to: config::primitives::Store, skip_missing_files: bool, + timeout: u64, ) -> color_eyre::Result<()> where S1: Store + 'static, @@ -1753,7 +1757,9 @@ where let to = FileStore::build(path.clone(), repo.clone()).await?; match repo { - Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, + Repo::Sled(repo) => { + migrate_store(repo, from, to, skip_missing_files, timeout).await? + } } } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { @@ -1789,7 +1795,9 @@ where .build(client); match repo { - Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, + Repo::Sled(repo) => { + migrate_store(repo, from, to, skip_missing_files, timeout).await? + } } } } @@ -1897,7 +1905,15 @@ impl PictRsConfiguration { match from { config::primitives::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; - migrate_inner(repo, client, from, to, skip_missing_files).await?; + migrate_inner( + repo, + client, + from, + to, + skip_missing_files, + config.media.process_timeout, + ) + .await?; } config::primitives::Store::ObjectStorage( config::primitives::ObjectStorage { @@ -1933,7 +1949,15 @@ impl PictRsConfiguration { .await? .build(client.clone()); - migrate_inner(repo, client, from, to, skip_missing_files).await?; + migrate_inner( + repo, + client, + from, + to, + skip_missing_files, + config.media.process_timeout, + ) + .await?; } } diff --git a/src/magick.rs b/src/magick.rs index bc2aaa3..00737ef 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -68,6 +68,7 @@ async fn process_image( input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, + timeout: u64, write_file: F, ) -> Result where @@ -108,7 +109,7 @@ where } args.push(&output_arg); - let reader = Process::run("magick", &args)?.read(); + let reader = Process::run("magick", &args, timeout)?.read(); let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file); @@ -122,6 +123,7 @@ pub(crate) async fn process_image_store_read( input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, + timeout: u64, ) -> Result { let stream = store .to_stream(identifier, None, None) @@ -133,6 +135,7 @@ pub(crate) async fn process_image_store_read( input_format, format, quality, + timeout, |mut tmp_file| async move { tmp_file .write_from_stream(stream) @@ -150,12 +153,14 @@ pub(crate) async fn process_image_async_read( input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, + timeout: u64, ) -> Result { process_image( args, input_format, format, quality, + timeout, |mut tmp_file| async move { tmp_file .write_from_async_read(async_read) diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 922b8a8..a716b12 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -17,6 +17,7 @@ pub(super) async fn migrate_store( from: S1, to: S2, skip_missing_files: bool, + timeout: u64, ) -> Result<(), Error> where S1: Store + Clone + 'static, @@ -38,8 +39,14 @@ where let mut failure_count = 0; - while let Err(e) = - do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await + while let Err(e) = do_migrate_store( + repo.clone(), + from.clone(), + to.clone(), + skip_missing_files, + timeout, + ) + .await { tracing::error!("Migration failed with {}", format!("{e:?}")); @@ -69,6 +76,7 @@ struct MigrateState { pct: AtomicU64, index: AtomicU64, started_at: Instant, + timeout: u64, } async fn do_migrate_store( @@ -76,6 +84,7 @@ async fn do_migrate_store( from: S1, to: S2, skip_missing_files: bool, + timeout: u64, ) -> Result<(), Error> where S1: Store + 'static, @@ -110,6 +119,7 @@ where pct: AtomicU64::new(initial_repo_size / 100), index: AtomicU64::new(0), started_at: Instant::now(), + timeout, }); let mut joinset = tokio::task::JoinSet::new(); @@ -160,6 +170,7 @@ where pct, index, started_at, + timeout, } = state; let current_index = index.fetch_add(1, Ordering::Relaxed); @@ -212,7 +223,7 @@ where if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? { if !repo.is_migrated(&identifier).await? { - match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { migrate_details(repo, &identifier, &new_identifier).await?; repo.relate_motion_identifier(hash.clone().into(), &new_identifier) @@ -244,7 +255,7 @@ where for (variant, identifier) in repo.variants(hash.clone().into()).await? { if !repo.is_migrated(&identifier).await? { - match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { migrate_details(repo, &identifier, &new_identifier).await?; repo.remove_variant(hash.clone().into(), variant.clone()) @@ -280,7 +291,16 @@ where } } - match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await { + match migrate_file( + repo, + from, + to, + &original_identifier, + *skip_missing_files, + *timeout, + ) + .await + { Ok(new_identifier) => { migrate_details(repo, &original_identifier, &new_identifier).await?; repo.update_identifier(hash.clone().into(), &new_identifier) @@ -338,6 +358,7 @@ async fn migrate_file( to: &S2, identifier: &S1::Identifier, skip_missing_files: bool, + timeout: u64, ) -> Result where R: IdentifierRepo, @@ -347,7 +368,7 @@ where let mut failure_count = 0; loop { - match do_migrate_file(repo, from, to, identifier).await { + match do_migrate_file(repo, from, to, identifier, timeout).await { Ok(identifier) => return Ok(identifier), Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { return Err(MigrateError::From(e)); @@ -380,6 +401,7 @@ async fn do_migrate_file( from: &S1, to: &S2, identifier: &S1::Identifier, + timeout: u64, ) -> Result where R: IdentifierRepo, @@ -407,7 +429,7 @@ where let details = if let Some(details) = details_opt { details } else { - let new_details = Details::from_store(from, identifier) + let new_details = Details::from_store(from, identifier, timeout) .await .map_err(MigrateError::Details)?; repo.relate_details(identifier, &new_details) diff --git a/src/process.rs b/src/process.rs index abd7fc3..8a9667b 100644 --- a/src/process.rs +++ b/src/process.rs @@ -5,11 +5,11 @@ use std::{ pin::Pin, process::{ExitStatus, Stdio}, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, - process::{Child, ChildStdin, Command}, + process::{Child, ChildStdin, ChildStdout, Command}, sync::oneshot::{channel, Receiver}, }; use tracing::{Instrument, Span}; @@ -56,6 +56,7 @@ pub(crate) struct Process { command: String, child: Child, guard: MetricsGuard, + timeout: Duration, } impl std::fmt::Debug for Process { @@ -68,15 +69,14 @@ struct DropHandle { inner: JoinHandle<()>, } -pin_project_lite::pin_project! { - struct ProcessRead { - #[pin] - inner: I, - err_recv: Receiver, - err_closed: bool, - handle: DropHandle, - eof: bool, - } +pub(crate) struct ProcessRead { + inner: I, + err_recv: Receiver, + err_closed: bool, + #[allow(dead_code)] + handle: DropHandle, + eof: bool, + sleep: Pin>, } #[derive(Debug, thiserror::Error)] @@ -90,6 +90,9 @@ pub(crate) enum ProcessError { #[error("Reached process spawn limit")] LimitReached, + #[error("{0} timed out")] + Timeout(String), + #[error("{0} Failed with {1}")] Status(String, ExitStatus), @@ -98,9 +101,9 @@ pub(crate) enum ProcessError { } impl Process { - pub(crate) fn run(command: &str, args: &[&str]) -> Result { + pub(crate) fn run(command: &str, args: &[&str], timeout: u64) -> Result { let res = tracing::trace_span!(parent: None, "Create command", %command) - .in_scope(|| Self::spawn(command, Command::new(command).args(args))); + .in_scope(|| Self::spawn(command, Command::new(command).args(args), timeout)); match res { Ok(this) => Ok(this), @@ -115,7 +118,7 @@ impl Process { } } - fn spawn(command: &str, cmd: &mut Command) -> std::io::Result { + fn spawn(command: &str, cmd: &mut Command, timeout: u64) -> std::io::Result { tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { let guard = MetricsGuard::guard(command.into()); @@ -128,133 +131,152 @@ impl Process { child, command: String::from(command), guard, + timeout: Duration::from_secs(timeout), }) }) } #[tracing::instrument(skip(self))] - pub(crate) async fn wait(mut self) -> Result<(), ProcessError> { - let res = self.child.wait().await; + pub(crate) async fn wait(self) -> Result<(), ProcessError> { + let Process { + command, + mut child, + guard, + timeout, + } = self; + + let res = actix_rt::time::timeout(timeout, child.wait()).await; match res { - Ok(status) if status.success() => { - self.guard.disarm(); + Ok(Ok(status)) if status.success() => { + guard.disarm(); Ok(()) } - Ok(status) => Err(ProcessError::Status(self.command, status)), - Err(e) => Err(ProcessError::Other(e)), + Ok(Ok(status)) => Err(ProcessError::Status(command, status)), + Ok(Err(e)) => Err(ProcessError::Other(e)), + Err(_) => { + child.kill().await.map_err(ProcessError::Other)?; + + Err(ProcessError::Timeout(command)) + } } } - pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin { + pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead { self.spawn_fn(move |mut stdin| { let mut input = input; async move { stdin.write_all_buf(&mut input).await } }) } - pub(crate) fn read(self) -> impl AsyncRead + Unpin { + pub(crate) fn read(self) -> ProcessRead { self.spawn_fn(|_| async { Ok(()) }) } #[allow(unknown_lints)] #[allow(clippy::let_with_type_underscore)] #[tracing::instrument(level = "trace", skip_all)] - fn spawn_fn(mut self, f: F) -> impl AsyncRead + Unpin + fn spawn_fn(self, f: F) -> ProcessRead where F: FnOnce(ChildStdin) -> Fut + 'static, Fut: Future>, { - let stdin = self.child.stdin.take().expect("stdin exists"); - let stdout = self.child.stdout.take().expect("stdout exists"); + let Process { + command, + mut child, + guard, + timeout, + } = self; - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %self.command) + let stdin = child.stdin.take().expect("stdin exists"); + let stdout = child.stdout.take().expect("stdout exists"); + + let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %command) .in_scope(channel::); - let span = tracing::info_span!(parent: None, "Background process task", %self.command); + let span = tracing::info_span!(parent: None, "Background process task", %command); span.follows_from(Span::current()); - let mut child = self.child; - let command = self.command; - let guard = self.guard; let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { actix_rt::spawn( async move { - if let Err(e) = (f)(stdin).await { - let _ = tx.send(e); - return; - } + let child_fut = async { + (f)(stdin).await?; - match child.wait().await { - Ok(status) => { - if status.success() { - guard.disarm(); - } else { - let _ = tx.send(std::io::Error::new( - std::io::ErrorKind::Other, - StatusError(status), - )); - } + child.wait().await + }; + + let error = match actix_rt::time::timeout(timeout, child_fut).await { + Ok(Ok(status)) if status.success() => { + guard.disarm(); + return; } - Err(e) => { - let _ = tx.send(e); + Ok(Ok(status)) => { + std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) } - } + Ok(Err(e)) => e, + Err(_) => std::io::ErrorKind::TimedOut.into(), + }; + + let _ = tx.send(error); + let _ = child.kill().await; } .instrument(span), ) }); + let sleep = actix_rt::time::sleep(timeout); + ProcessRead { inner: stdout, err_recv: rx, err_closed: false, handle: DropHandle { inner: handle }, eof: false, + sleep: Box::pin(sleep), } } } impl AsyncRead for ProcessRead where - I: AsyncRead, + I: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let this = self.as_mut().project(); - - let err_recv = this.err_recv; - let err_closed = this.err_closed; - let eof = this.eof; - let inner = this.inner; - - if !*err_closed { - if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) { - *err_closed = true; + if !self.err_closed { + if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { + self.err_closed = true; if let Ok(err) = res { return Poll::Ready(Err(err)); } - if *eof { + if self.eof { return Poll::Ready(Ok(())); } } + + if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) { + self.err_closed = true; + + return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())); + } } - if !*eof { + if !self.eof { let before_size = buf.filled().len(); - return match inner.poll_read(cx, buf) { + return match Pin::new(&mut self.inner).poll_read(cx, buf) { Poll::Ready(Ok(())) => { if buf.filled().len() == before_size { - *eof = true; + self.eof = true; - if !*err_closed { + if !self.err_closed { // reached end of stream & haven't received process signal return Poll::Pending; } @@ -263,7 +285,7 @@ where Poll::Ready(Ok(())) } Poll::Ready(Err(e)) => { - *eof = true; + self.eof = true; Poll::Ready(Err(e)) } @@ -271,7 +293,7 @@ where }; } - if *err_closed && *eof { + if self.err_closed && self.eof { return Poll::Ready(Ok(())); } diff --git a/src/validate.rs b/src/validate.rs index 00e29bb..1b3e58b 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -44,6 +44,7 @@ const MEGABYTES: usize = 1024 * 1024; pub(crate) async fn validate_bytes( bytes: Bytes, validations: Validations<'_>, + timeout: u64, ) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); @@ -54,12 +55,12 @@ pub(crate) async fn validate_bytes( width, height, frames, - } = crate::discover::discover_bytes(bytes.clone()).await?; + } = crate::discover::discover_bytes(timeout, bytes.clone()).await?; match &input { InputFile::Image(input) => { let (format, read) = - process_image(bytes, *input, width, height, validations.image).await?; + process_image(bytes, *input, width, height, validations.image, timeout).await?; Ok((format, Either::left(read))) } @@ -71,6 +72,7 @@ pub(crate) async fn validate_bytes( height, frames.unwrap_or(1), &validations, + timeout, ) .await?; @@ -84,6 +86,7 @@ pub(crate) async fn validate_bytes( height, frames.unwrap_or(1), validations.video, + timeout, ) .await?; @@ -99,6 +102,7 @@ async fn process_image( width: u16, height: u16, validations: &crate::config::Image, + timeout: u64, ) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { if width > validations.max_width { return Err(ValidationError::Width.into()); @@ -121,9 +125,9 @@ async fn process_image( let read = if needs_transcode { let quality = validations.quality_for(format); - Either::left(magick::convert_image(input.format, format, quality, bytes).await?) + Either::left(magick::convert_image(input.format, format, quality, timeout, bytes).await?) } else { - Either::right(exiftool::clear_metadata_bytes_read(bytes)?) + Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?) }; Ok((InternalFormat::Image(format), read)) @@ -163,6 +167,7 @@ async fn process_animation( height: u16, frames: u32, validations: &Validations<'_>, + timeout: u64, ) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { match validate_animation(bytes.len(), width, height, frames, validations.animation) { Ok(()) => { @@ -174,9 +179,13 @@ async fn process_animation( let read = if needs_transcode { let quality = validations.animation.quality_for(format); - Either::left(magick::convert_animation(input, format, quality, bytes).await?) + Either::left( + magick::convert_animation(input, format, quality, timeout, bytes).await?, + ) } else { - Either::right(Either::left(exiftool::clear_metadata_bytes_read(bytes)?)) + Either::right(Either::left(exiftool::clear_metadata_bytes_read( + bytes, timeout, + )?)) }; Ok((InternalFormat::Animation(format), read)) @@ -190,7 +199,7 @@ async fn process_animation( ); let read = Either::right(Either::right( - magick::convert_video(input, output, bytes).await?, + magick::convert_video(input, output, timeout, bytes).await?, )); Ok((InternalFormat::Video(output.internal_format()), read)) @@ -237,6 +246,7 @@ async fn process_video( height: u16, frames: u32, validations: &crate::config::Video, + timeout: u64, ) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { validate_video(bytes.len(), width, height, frames, validations)?; @@ -248,7 +258,7 @@ async fn process_video( let crf = validations.crf_for(width, height); - let read = ffmpeg::transcode_bytes(input, output, crf, bytes).await?; + let read = ffmpeg::transcode_bytes(input, output, crf, timeout, bytes).await?; Ok((InternalFormat::Video(output.internal_format()), read)) } diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index 2c28b10..da557bc 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -4,8 +4,11 @@ use tokio::io::AsyncRead; use crate::{exiftool::ExifError, process::Process}; #[tracing::instrument(level = "trace", skip(input))] -pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result { - let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?; +pub(crate) fn clear_metadata_bytes_read( + input: Bytes, + timeout: u64, +) -> Result { + let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?; Ok(process.bytes_read(input)) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 41cbc8d..7954a61 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -11,6 +11,7 @@ pub(super) async fn transcode_bytes( input_format: VideoFormat, output_format: OutputVideoFormat, crf: u8, + timeout: u64, bytes: Bytes, ) -> Result { let input_file = crate::tmp_file::tmp_file(None); @@ -37,6 +38,7 @@ pub(super) async fn transcode_bytes( output_file_str, output_format, crf, + timeout, ) .await?; @@ -59,6 +61,7 @@ async fn transcode_files( output_path: &str, output_format: OutputVideoFormat, crf: u8, + timeout: u64, ) -> Result<(), FfMpegError> { let mut args = vec![ "-hide_banner", @@ -96,7 +99,7 @@ async fn transcode_files( output_path, ]); - Process::run("ffmpeg", &args)?.wait().await?; + Process::run("ffmpeg", &args, timeout)?.wait().await?; Ok(()) } diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 89c7bad..67ddeb1 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -11,6 +11,7 @@ pub(super) async fn convert_image( input: ImageFormat, output: ImageFormat, quality: Option, + timeout: u64, bytes: Bytes, ) -> Result { convert( @@ -18,6 +19,7 @@ pub(super) async fn convert_image( output.magick_format(), false, quality, + timeout, bytes, ) .await @@ -27,6 +29,7 @@ pub(super) async fn convert_animation( input: AnimationFormat, output: AnimationFormat, quality: Option, + timeout: u64, bytes: Bytes, ) -> Result { convert( @@ -34,6 +37,7 @@ pub(super) async fn convert_animation( output.magick_format(), true, quality, + timeout, bytes, ) .await @@ -42,6 +46,7 @@ pub(super) async fn convert_animation( pub(super) async fn convert_video( input: AnimationFormat, output: OutputVideoFormat, + timeout: u64, bytes: Bytes, ) -> Result { convert( @@ -49,6 +54,7 @@ pub(super) async fn convert_video( output.magick_format(), true, None, + timeout, bytes, ) .await @@ -59,6 +65,7 @@ async fn convert( output: &'static str, coalesce: bool, quality: Option, + timeout: u64, bytes: Bytes, ) -> Result { let input_file = crate::tmp_file::tmp_file(None); @@ -95,7 +102,7 @@ async fn convert( args.push(&output_arg); - let reader = Process::run("magick", &args)?.read(); + let reader = Process::run("magick", &args, timeout)?.read(); let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file);