Switch more commands to be driven via STDIN

This commit is contained in:
asonix 2024-02-24 14:02:41 -06:00
parent eabd7ea228
commit 2074334131
6 changed files with 108 additions and 113 deletions

View file

@ -1,15 +1,13 @@
use std::ffi::{OsStr, OsString};
use futures_core::Stream;
use tokio::io::AsyncReadExt;
use tokio_util::bytes::Bytes;
use crate::{
details::Details,
error::{Error, UploadError},
formats::ProcessableFormat,
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
process::{Process, ProcessRead},
process::Process,
repo::Alias,
state::State,
store::Store,
@ -43,47 +41,44 @@ where
let stream = state.store.to_stream(&identifier, None, None).await?;
let process = read_rgba(
let blurhash = read_rgba_command(
state,
input_details
.internal_format()
.processable_format()
.expect("not a video"),
stream,
)
.await?;
.await?
.drive_with_stream(stream)
.with_stdout(|mut stdout| async move {
let mut encoder = blurhash_update::Encoder::auto(blurhash_update::ImageBounds {
width: input_details.width() as _,
height: input_details.height() as _,
});
let blurhash = process
.with_stdout(|mut stdout| async move {
let mut encoder = blurhash_update::Encoder::auto(blurhash_update::ImageBounds {
width: input_details.width() as _,
height: input_details.height() as _,
});
let mut buf = [0u8; 1024 * 8];
let mut buf = [0u8; 1024 * 8];
loop {
let n = stdout.read(&mut buf).await?;
loop {
let n = stdout.read(&mut buf).await?;
if n == 0 {
break;
}
encoder.update(&buf[..n]);
if n == 0 {
break;
}
Ok(encoder.finalize()) as std::io::Result<String>
})
.await??;
encoder.update(&buf[..n]);
}
Ok(encoder.finalize()) as std::io::Result<String>
})
.await??;
Ok(blurhash)
}
async fn read_rgba<S>(
async fn read_rgba_command<S>(
state: &State<S>,
input_format: ProcessableFormat,
stream: impl Stream<Item = std::io::Result<Bytes>> + 'static,
) -> Result<ProcessRead, MagickError> {
) -> Result<Process, MagickError> {
let temporary_path = state
.tmp_dir
.tmp_folder()
@ -104,7 +99,6 @@ async fn read_rgba<S>(
];
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
.stream_read(stream)
.add_extras(temporary_path);
Ok(process)

View file

@ -41,7 +41,7 @@ pub(super) async fn check_reorient(
#[tracing::instrument(level = "trace", skip_all)]
async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
.bytes_stream_read(input)
.drive_with_async_read(input.into_reader())
.into_string()
.await?;

View file

@ -9,7 +9,7 @@ use std::{
use futures_core::Stream;
use streem::IntoStreamer;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
process::{Child, ChildStdin, Command},
};
use tokio_util::{bytes::Bytes, io::ReaderStream};
@ -67,6 +67,7 @@ pub(crate) struct Process {
child: Child,
guard: MetricsGuard,
timeout: Duration,
extras: Box<dyn Extras>,
id: Uuid,
}
@ -204,11 +205,19 @@ impl Process {
command,
guard,
timeout: Duration::from_secs(timeout),
extras: Box::new(()),
id: Uuid::now_v7(),
})
})
}
pub(crate) fn add_extras(self, extra: impl Extras + 'static) -> Self {
Self {
extras: Box::new((self.extras, extra)),
..self
}
}
#[tracing::instrument(skip(self), fields(command = %self.command, id = %self.id))]
pub(crate) async fn wait(self) -> Result<(), ProcessError> {
let Process {
@ -216,11 +225,17 @@ impl Process {
mut child,
guard,
timeout,
mut extras,
id: _,
} = self;
let res = child.wait().with_timeout(timeout).await;
extras
.consume()
.await
.map_err(|e| ProcessError::Cleanup(command.clone(), e))?;
match res {
Ok(Ok(status)) if status.success() => {
guard.disarm();
@ -236,10 +251,12 @@ impl Process {
}
}
pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead {
self.spawn_fn(move |mut stdin| {
pub(crate) fn drive_with_async_read(self, input: impl AsyncRead + 'static) -> ProcessRead {
self.drive(move |mut stdin| {
async move {
match tokio::io::copy(&mut input.into_reader(), &mut stdin).await {
let mut input = std::pin::pin!(input);
match tokio::io::copy(&mut input, &mut stdin).await {
Ok(_) => Ok(()),
// BrokenPipe means we finished reading from Stdout, so we don't need to write
// to stdin. We'll still error out if the command failed so treat this as a
@ -251,11 +268,11 @@ impl Process {
})
}
pub(crate) fn stream_read<S>(self, input: S) -> ProcessRead
pub(crate) fn drive_with_stream<S>(self, input: S) -> ProcessRead
where
S: Stream<Item = std::io::Result<Bytes>> + 'static,
{
self.spawn_fn(move |mut stdin| async move {
self.drive(move |mut stdin| async move {
let stream = std::pin::pin!(input);
let mut stream = stream.into_streamer();
@ -272,13 +289,13 @@ impl Process {
}
pub(crate) fn read(self) -> ProcessRead {
self.spawn_fn(|_| async { Ok(()) })
self.drive(|_| async { Ok(()) })
}
#[allow(unknown_lints)]
#[allow(clippy::let_with_type_underscore)]
#[tracing::instrument(level = "trace", skip_all)]
fn spawn_fn<F, Fut>(self, f: F) -> ProcessRead
fn drive<F, Fut>(self, f: F) -> ProcessRead
where
F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>,
@ -288,6 +305,7 @@ impl Process {
mut child,
guard,
timeout,
extras,
id,
} = self;
@ -324,7 +342,7 @@ impl Process {
handle,
command,
id,
extras: Box::new(()),
extras,
}
}
}

View file

@ -11,11 +11,10 @@ use crate::{
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
InternalFormat,
},
process::ProcessRead,
process::{Process, ProcessRead},
state::State,
};
#[derive(Debug, thiserror::Error)]
pub(crate) enum ValidationError {
#[error("Too wide")]
@ -74,15 +73,23 @@ pub(crate) async fn validate_bytes_stream<S>(
match &input {
InputFile::Image(input) => {
let (format, process_read) = process_image(state, bytes, *input, width, height).await?;
let (format, process) =
process_image_command(state, *input, bytes.len(), width, height).await?;
Ok((format, process_read))
Ok((format, process.drive_with_async_read(bytes.into_reader())))
}
InputFile::Animation(input) => {
let (format, process_read) =
process_animation(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
let (format, process) = process_animation_command(
state,
*input,
bytes.len(),
width,
height,
frames.unwrap_or(1),
)
.await?;
Ok((format, process_read))
Ok((format, process.drive_with_async_read(bytes.into_reader())))
}
InputFile::Video(input) => {
let (format, process_read) =
@ -93,14 +100,14 @@ pub(crate) async fn validate_bytes_stream<S>(
}
}
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
async fn process_image<S>(
#[tracing::instrument(skip(state))]
async fn process_image_command<S>(
state: &State<S>,
bytes: BytesStream,
input: ImageInput,
length: usize,
width: u16,
height: u16,
) -> Result<(InternalFormat, ProcessRead), Error> {
) -> Result<(InternalFormat, Process), Error> {
let validations = &state.config.media.image;
if width > validations.max_width {
@ -112,7 +119,7 @@ async fn process_image<S>(
if u32::from(width) * u32::from(height) > validations.max_area {
return Err(ValidationError::Area.into());
}
if bytes.len() > validations.max_file_size * MEGABYTES {
if length > validations.max_file_size * MEGABYTES {
return Err(ValidationError::Filesize.into());
}
@ -121,15 +128,15 @@ async fn process_image<S>(
needs_transcode,
} = input.build_output(validations.format);
let process_read = if needs_transcode {
let process = if needs_transcode {
let quality = validations.quality_for(format);
magick::convert_image(state, input.format, format, quality, bytes).await?
magick::convert_image_command(state, input.format, format, quality).await?
} else {
exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)?
exiftool::clear_metadata_command(state.config.media.process_timeout)?
};
Ok((InternalFormat::Image(format), process_read))
Ok((InternalFormat::Image(format), process))
}
fn validate_animation(
@ -158,33 +165,33 @@ fn validate_animation(
Ok(())
}
#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))]
async fn process_animation<S>(
#[tracing::instrument(skip(state))]
async fn process_animation_command<S>(
state: &State<S>,
bytes: BytesStream,
input: AnimationFormat,
length: usize,
width: u16,
height: u16,
frames: u32,
) -> Result<(InternalFormat, ProcessRead), Error> {
) -> Result<(InternalFormat, Process), Error> {
let validations = &state.config.media.animation;
validate_animation(bytes.len(), width, height, frames, validations)?;
validate_animation(length, width, height, frames, validations)?;
let AnimationOutput {
format,
needs_transcode,
} = input.build_output(validations.format);
let process_read = if needs_transcode {
let process = if needs_transcode {
let quality = validations.quality_for(format);
magick::convert_animation(state, input, format, quality, bytes).await?
magick::convert_animation_command(state, input, format, quality).await?
} else {
exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)?
exiftool::clear_metadata_command(state.config.media.process_timeout)?
};
Ok((InternalFormat::Animation(format), process_read))
Ok((InternalFormat::Animation(format), process))
}
fn validate_video(

View file

@ -1,16 +1,11 @@
use crate::{
bytes_stream::BytesStream,
exiftool::ExifError,
process::{Process, ProcessRead},
};
use crate::{exiftool::ExifError, process::Process};
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn clear_metadata_bytes_read(
input: BytesStream,
timeout: u64,
) -> Result<ProcessRead, ExifError> {
Ok(
Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?
.bytes_stream_read(input),
)
pub(super) fn clear_metadata_command(timeout: u64) -> Result<Process, ExifError> {
Ok(Process::run(
"exiftool",
&["-all=", "-", "-out", "-"],
&[],
timeout,
)?)
}

View file

@ -1,82 +1,60 @@
use std::ffi::OsStr;
use crate::{
bytes_stream::BytesStream,
formats::{AnimationFormat, ImageFormat},
magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
process::{Process, ProcessRead},
process::Process,
state::State,
};
pub(super) async fn convert_image<S>(
pub(super) async fn convert_image_command<S>(
state: &State<S>,
input: ImageFormat,
output: ImageFormat,
quality: Option<u8>,
bytes: BytesStream,
) -> Result<ProcessRead, MagickError> {
) -> Result<Process, MagickError> {
convert(
state,
input.magick_format(),
output.magick_format(),
false,
quality,
bytes,
)
.await
}
pub(super) async fn convert_animation<S>(
pub(super) async fn convert_animation_command<S>(
state: &State<S>,
input: AnimationFormat,
output: AnimationFormat,
quality: Option<u8>,
bytes: BytesStream,
) -> Result<ProcessRead, MagickError> {
) -> Result<Process, MagickError> {
convert(
state,
input.magick_format(),
output.magick_format(),
true,
quality,
bytes,
)
.await
}
async fn convert<S>(
state: &State<S>,
input: &'static str,
output: &'static str,
input_format: &'static str,
output_format: &'static str,
coalesce: bool,
quality: Option<u8>,
bytes: BytesStream,
) -> Result<ProcessRead, MagickError> {
) -> Result<Process, MagickError> {
let temporary_path = state
.tmp_dir
.tmp_folder()
.await
.map_err(MagickError::CreateTemporaryDirectory)?;
let input_file = state.tmp_dir.tmp_file(None);
let input_arg = format!("{input_format}:-");
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(MagickError::CreateDir)?;
let mut tmp_one = crate::file::File::create(&input_file)
.await
.map_err(MagickError::CreateFile)?;
tmp_one
.write_from_stream(bytes.into_io_stream())
.await
.map_err(MagickError::Write)?;
tmp_one.close().await.map_err(MagickError::CloseFile)?;
let input_arg = [input.as_ref(), input_file.as_os_str()].join(":".as_ref());
let output_arg = format!("{output}:-");
let output_arg = format!("{output_format}:-");
let quality = quality.map(|q| q.to_string());
let mut args: Vec<&OsStr> = vec!["convert".as_ref()];
@ -85,7 +63,11 @@ async fn convert<S>(
args.push("-coalesce".as_ref());
}
args.extend(["-strip".as_ref(), "-auto-orient".as_ref(), &input_arg] as [&OsStr; 3]);
args.extend([
"-strip".as_ref(),
"-auto-orient".as_ref(),
input_arg.as_ref(),
] as [&OsStr; 3]);
if let Some(quality) = &quality {
args.extend(["-quality".as_ref(), quality.as_ref()] as [&OsStr; 2]);
@ -98,9 +80,8 @@ async fn convert<S>(
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
];
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?.read();
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
.add_extras(temporary_path);
let clean_reader = reader.add_extras(input_file).add_extras(temporary_path);
Ok(clean_reader)
Ok(process)
}