diff --git a/src/blurhash.rs b/src/blurhash.rs index 3c22cf9..9450923 100644 --- a/src/blurhash.rs +++ b/src/blurhash.rs @@ -98,7 +98,8 @@ async fn read_rgba_command( (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()), ]; - let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? + let process = Process::run("magick", &args, &envs, state.config.media.process_timeout) + .await? .add_extras(temporary_path); Ok(process) diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index de0b7e3..06ee1da 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -40,7 +40,8 @@ pub(super) async fn check_reorient( #[tracing::instrument(level = "trace", skip_all)] async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result { - let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? + let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout) + .await? .drive_with_async_read(input.into_reader()) .into_string() .await?; diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 0b3ccc3..7aca056 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -197,7 +197,8 @@ pub(super) async fn discover_bytes_stream( ], &[], state.config.media.process_timeout, - )? + ) + .await? .read() .into_vec() .await @@ -242,7 +243,8 @@ async fn alpha_pixel_formats(timeout: u64) -> Result, FfMpegErro ], &[], timeout, - )? + ) + .await? .read() .into_vec() .await?; diff --git a/src/discover/magick.rs b/src/discover/magick.rs index e4a7f37..d44a0dc 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -72,7 +72,8 @@ async fn discover(state: &State, stream: BytesStream) -> Result Duration::from_micros(10) { - metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()); + metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()).increment(1); + metrics::histogram!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED_SECONDS, "timer" => this.name.to_string()).record(elapsed.as_secs_f64()); } if elapsed > Duration::from_secs(1) { diff --git a/src/generate.rs b/src/generate.rs index 950aa7f..636b3cc 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -74,11 +74,12 @@ pub(crate) async fn generate( thumbnail_args, original_details, hash.clone(), - ); + ) + .with_poll_timer("process-future"); let (details, identifier) = process_map .process(hash, thumbnail_path, process_fut) - .with_poll_timer("process-future") + .with_poll_timer("process-map-future") .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) .with_metrics(crate::init_metrics::GENERATE_PROCESS) .await diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs index 5dedbab..11e9cf9 100644 --- a/src/generate/ffmpeg.rs +++ b/src/generate/ffmpeg.rs @@ -97,7 +97,8 @@ pub(super) async fn thumbnail( ], &[], state.config.media.process_timeout, - )? + ) + .await? .wait() .await .map_err(FfMpegError::Process)?; diff --git a/src/generate/magick.rs b/src/generate/magick.rs index 2796e0c..9d2b0c3 100644 --- a/src/generate/magick.rs +++ b/src/generate/magick.rs @@ -43,7 +43,8 @@ pub(super) async fn thumbnail_command( (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()), ]; - let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? + let process = Process::run("magick", &args, &envs, state.config.media.process_timeout) + .await? .add_extras(temporary_path); Ok(process) diff --git a/src/init_metrics.rs b/src/init_metrics.rs index 96d42ca..e98bd96 100644 --- a/src/init_metrics.rs +++ b/src/init_metrics.rs @@ -32,9 +32,15 @@ fn describe_future() { FUTURE_POLL_TIMER_EXCEEDED, "How many times a given poll operation has lasted longer than 10 microseconds" ); + metrics::describe_histogram!( + FUTURE_POLL_TIMER_EXCEEDED_SECONDS, + "Durations for polls lasting longer than 10 microseconds" + ); } pub(crate) const FUTURE_POLL_TIMER_EXCEEDED: &str = "pict-rs.future.poll-timer.exceeded"; +pub(crate) const FUTURE_POLL_TIMER_EXCEEDED_SECONDS: &str = + "pict-rs.future.poll-timer.exceeded.seconds"; fn describe_queue_cleanup() { metrics::describe_counter!( diff --git a/src/magick.rs b/src/magick.rs index 4e6f407..f089391 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -106,7 +106,8 @@ pub(crate) async fn process_image_command( (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()), ]; - let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? + let process = Process::run("magick", &args, &envs, state.config.media.process_timeout) + .await? .add_extras(temporary_path); Ok(process) diff --git a/src/process.rs b/src/process.rs index be49cc6..525d4b7 100644 --- a/src/process.rs +++ b/src/process.rs @@ -155,7 +155,7 @@ impl ProcessError { } impl Process { - pub(crate) fn run( + pub(crate) async fn run( command: &str, args: &[T], envs: &[(&str, &OsStr)], @@ -168,15 +168,10 @@ impl Process { tracing::debug!("{envs:?} {command} {args:?}"); - let res = tracing::trace_span!(parent: None, "Create command", %command).in_scope(|| { - Self::spawn( - command.clone(), - Command::new(command.as_ref()) - .args(args) - .envs(envs.iter().copied()), - timeout, - ) - }); + let mut cmd = Command::new(command.as_ref()); + cmd.args(args).envs(envs.iter().copied()); + + let res = Self::spawn(command.clone(), cmd, timeout).await; match res { Ok(this) => Ok(this), @@ -191,16 +186,17 @@ impl Process { } } - fn spawn(command: Arc, cmd: &mut Command, timeout: u64) -> std::io::Result { - tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { - let guard = MetricsGuard::guard(command.clone()); + async fn spawn(command: Arc, mut cmd: Command, timeout: u64) -> std::io::Result { + let guard = MetricsGuard::guard(command.clone()); - let cmd = cmd - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .kill_on_drop(true); + cmd.stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true); - cmd.spawn().map(|child| Process { + crate::sync::spawn_blocking("spawn-command", move || cmd.spawn()) + .await + .expect("spawn panicked") + .map(|child| Process { child, command, guard, @@ -208,7 +204,6 @@ impl Process { extras: Box::new(()), id: Uuid::now_v7(), }) - }) } pub(crate) fn add_extras(self, extra: impl Extras + 'static) -> Self { @@ -282,6 +277,7 @@ impl Process { Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => return Err(e), } + tokio::task::yield_now().await; } Ok(()) @@ -454,6 +450,16 @@ impl ProcessRead { self, f: impl FnOnce(BoxRead<'static>) -> Fut, ) -> Result + where + Fut: Future, + { + self.with_stdout_inner(f).await + } + + async fn with_stdout_inner( + self, + f: impl FnOnce(BoxRead<'static>) -> Fut, + ) -> Result where Fut: Future, { diff --git a/src/validate.rs b/src/validate.rs index ef28c10..541b7f0 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -133,7 +133,7 @@ async fn process_image_command( magick::convert_image_command(state, input.format, format, quality).await? } else { - exiftool::clear_metadata_command(state.config.media.process_timeout)? + exiftool::clear_metadata_command(state.config.media.process_timeout).await? }; Ok((InternalFormat::Image(format), process)) @@ -188,7 +188,7 @@ async fn process_animation_command( magick::convert_animation_command(state, input, format, quality).await? } else { - exiftool::clear_metadata_command(state.config.media.process_timeout)? + exiftool::clear_metadata_command(state.config.media.process_timeout).await? }; Ok((InternalFormat::Animation(format), process)) diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index b9e7199..8296b0e 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,11 +1,6 @@ use crate::{exiftool::ExifError, process::Process}; #[tracing::instrument(level = "trace", skip_all)] -pub(super) fn clear_metadata_command(timeout: u64) -> Result { - Ok(Process::run( - "exiftool", - &["-all=", "-", "-out", "-"], - &[], - timeout, - )?) +pub(super) async fn clear_metadata_command(timeout: u64) -> Result { + Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout).await?) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 90e6327..84e0c22 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -132,7 +132,10 @@ async fn transcode_files( output_path, ]); - Process::run("ffmpeg", &args, &[], timeout)?.wait().await?; + Process::run("ffmpeg", &args, &[], timeout) + .await? + .wait() + .await?; Ok(()) } diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 3e3b8e6..f484a9b 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -80,7 +80,8 @@ async fn convert( (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()), ]; - let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? + let process = Process::run("magick", &args, &envs, state.config.media.process_timeout) + .await? .add_extras(temporary_path); Ok(process)