Add more poll timers, spawn process from background threads

This commit is contained in:
asonix 2024-03-09 15:19:13 -06:00
parent 9fe586b9dd
commit e302df7e39
15 changed files with 62 additions and 41 deletions

View file

@ -98,7 +98,8 @@ async fn read_rgba_command<S>(
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
];
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)
.await?
.add_extras(temporary_path);
Ok(process)

View file

@ -40,7 +40,8 @@ pub(super) async fn check_reorient(
#[tracing::instrument(level = "trace", skip_all)]
async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)
.await?
.drive_with_async_read(input.into_reader())
.into_string()
.await?;

View file

@ -197,7 +197,8 @@ pub(super) async fn discover_bytes_stream<S>(
],
&[],
state.config.media.process_timeout,
)?
)
.await?
.read()
.into_vec()
.await
@ -242,7 +243,8 @@ async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegErro
],
&[],
timeout,
)?
)
.await?
.read()
.into_vec()
.await?;

View file

@ -72,7 +72,8 @@ async fn discover<S>(state: &State<S>, stream: BytesStream) -> Result<Discovery,
],
&envs,
state.config.media.process_timeout,
)?
)
.await?
.drive_with_async_read(stream.into_reader())
.into_string()
.await;

View file

@ -142,7 +142,8 @@ where
let elapsed = start.elapsed();
if elapsed > Duration::from_micros(10) {
metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string());
metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()).increment(1);
metrics::histogram!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED_SECONDS, "timer" => this.name.to_string()).record(elapsed.as_secs_f64());
}
if elapsed > Duration::from_secs(1) {

View file

@ -74,11 +74,12 @@ pub(crate) async fn generate<S: Store + 'static>(
thumbnail_args,
original_details,
hash.clone(),
);
)
.with_poll_timer("process-future");
let (details, identifier) = process_map
.process(hash, thumbnail_path, process_fut)
.with_poll_timer("process-future")
.with_poll_timer("process-map-future")
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
.await

View file

@ -97,7 +97,8 @@ pub(super) async fn thumbnail<S: Store>(
],
&[],
state.config.media.process_timeout,
)?
)
.await?
.wait()
.await
.map_err(FfMpegError::Process)?;

View file

@ -43,7 +43,8 @@ pub(super) async fn thumbnail_command<S>(
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
];
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)
.await?
.add_extras(temporary_path);
Ok(process)

View file

@ -32,9 +32,15 @@ fn describe_future() {
FUTURE_POLL_TIMER_EXCEEDED,
"How many times a given poll operation has lasted longer than 10 microseconds"
);
metrics::describe_histogram!(
FUTURE_POLL_TIMER_EXCEEDED_SECONDS,
"Durations for polls lasting longer than 10 microseconds"
);
}
pub(crate) const FUTURE_POLL_TIMER_EXCEEDED: &str = "pict-rs.future.poll-timer.exceeded";
pub(crate) const FUTURE_POLL_TIMER_EXCEEDED_SECONDS: &str =
"pict-rs.future.poll-timer.exceeded.seconds";
fn describe_queue_cleanup() {
metrics::describe_counter!(

View file

@ -106,7 +106,8 @@ pub(crate) async fn process_image_command<S>(
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
];
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)
.await?
.add_extras(temporary_path);
Ok(process)

View file

@ -155,7 +155,7 @@ impl ProcessError {
}
impl Process {
pub(crate) fn run<T>(
pub(crate) async fn run<T>(
command: &str,
args: &[T],
envs: &[(&str, &OsStr)],
@ -168,15 +168,10 @@ impl Process {
tracing::debug!("{envs:?} {command} {args:?}");
let res = tracing::trace_span!(parent: None, "Create command", %command).in_scope(|| {
Self::spawn(
command.clone(),
Command::new(command.as_ref())
.args(args)
.envs(envs.iter().copied()),
timeout,
)
});
let mut cmd = Command::new(command.as_ref());
cmd.args(args).envs(envs.iter().copied());
let res = Self::spawn(command.clone(), cmd, timeout).await;
match res {
Ok(this) => Ok(this),
@ -191,16 +186,17 @@ impl Process {
}
}
fn spawn(command: Arc<str>, cmd: &mut Command, timeout: u64) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| {
async fn spawn(command: Arc<str>, mut cmd: Command, timeout: u64) -> std::io::Result<Self> {
let guard = MetricsGuard::guard(command.clone());
let cmd = cmd
.stdin(Stdio::piped())
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true);
cmd.spawn().map(|child| Process {
crate::sync::spawn_blocking("spawn-command", move || cmd.spawn())
.await
.expect("spawn panicked")
.map(|child| Process {
child,
command,
guard,
@ -208,7 +204,6 @@ impl Process {
extras: Box::new(()),
id: Uuid::now_v7(),
})
})
}
pub(crate) fn add_extras(self, extra: impl Extras + 'static) -> Self {
@ -282,6 +277,7 @@ impl Process {
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Err(e) => return Err(e),
}
tokio::task::yield_now().await;
}
Ok(())
@ -454,6 +450,16 @@ impl ProcessRead {
self,
f: impl FnOnce(BoxRead<'static>) -> Fut,
) -> Result<Fut::Output, ProcessError>
where
Fut: Future,
{
self.with_stdout_inner(f).await
}
async fn with_stdout_inner<Fut>(
self,
f: impl FnOnce(BoxRead<'static>) -> Fut,
) -> Result<Fut::Output, ProcessError>
where
Fut: Future,
{

View file

@ -133,7 +133,7 @@ async fn process_image_command<S>(
magick::convert_image_command(state, input.format, format, quality).await?
} else {
exiftool::clear_metadata_command(state.config.media.process_timeout)?
exiftool::clear_metadata_command(state.config.media.process_timeout).await?
};
Ok((InternalFormat::Image(format), process))
@ -188,7 +188,7 @@ async fn process_animation_command<S>(
magick::convert_animation_command(state, input, format, quality).await?
} else {
exiftool::clear_metadata_command(state.config.media.process_timeout)?
exiftool::clear_metadata_command(state.config.media.process_timeout).await?
};
Ok((InternalFormat::Animation(format), process))

View file

@ -1,11 +1,6 @@
use crate::{exiftool::ExifError, process::Process};
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn clear_metadata_command(timeout: u64) -> Result<Process, ExifError> {
Ok(Process::run(
"exiftool",
&["-all=", "-", "-out", "-"],
&[],
timeout,
)?)
pub(super) async fn clear_metadata_command(timeout: u64) -> Result<Process, ExifError> {
Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout).await?)
}

View file

@ -132,7 +132,10 @@ async fn transcode_files(
output_path,
]);
Process::run("ffmpeg", &args, &[], timeout)?.wait().await?;
Process::run("ffmpeg", &args, &[], timeout)
.await?
.wait()
.await?;
Ok(())
}

View file

@ -80,7 +80,8 @@ async fn convert<S>(
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
];
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)
.await?
.add_extras(temporary_path);
Ok(process)