mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2025-01-19 16:05:30 +00:00
More piping
This commit is contained in:
parent
2074334131
commit
13fc0df31a
6 changed files with 118 additions and 178 deletions
|
@ -14,7 +14,6 @@ use crate::{
|
|||
state::State,
|
||||
};
|
||||
|
||||
|
||||
use super::Discovery;
|
||||
|
||||
const MP4: &str = "mp4";
|
||||
|
@ -158,24 +157,6 @@ struct Flags {
|
|||
alpha: usize,
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn discover_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
bytes: BytesStream,
|
||||
) -> Result<Option<Discovery>, FfMpegError> {
|
||||
discover_file(state, move |mut file| {
|
||||
let bytes = bytes.clone();
|
||||
|
||||
async move {
|
||||
file.write_from_stream(bytes.into_io_stream())
|
||||
.await
|
||||
.map_err(FfMpegError::Write)?;
|
||||
Ok(file)
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn allows_alpha(pixel_format: &str, timeout: u64) -> Result<bool, FfMpegError> {
|
||||
static ALPHA_PIXEL_FORMATS: OnceLock<HashSet<String>> = OnceLock::new();
|
||||
|
||||
|
@ -191,45 +172,31 @@ async fn allows_alpha(pixel_format: &str, timeout: u64) -> Result<bool, FfMpegEr
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn discover_file<S, F, Fut>(state: &State<S>, f: F) -> Result<Option<Discovery>, FfMpegError>
|
||||
where
|
||||
F: FnOnce(crate::file::File) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
|
||||
{
|
||||
let input_file = state.tmp_dir.tmp_file(None);
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
.await
|
||||
.map_err(FfMpegError::CreateDir)?;
|
||||
|
||||
let tmp_one = crate::file::File::create(&input_file)
|
||||
.await
|
||||
.map_err(FfMpegError::CreateFile)?;
|
||||
let tmp_one = (f)(tmp_one).await?;
|
||||
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
|
||||
|
||||
pub(super) async fn discover_bytes_stream<S>(
|
||||
state: &State<S>,
|
||||
bytes: BytesStream,
|
||||
) -> Result<Option<Discovery>, FfMpegError> {
|
||||
let res = Process::run(
|
||||
"ffprobe",
|
||||
&[
|
||||
"-v".as_ref(),
|
||||
"quiet".as_ref(),
|
||||
"-count_frames".as_ref(),
|
||||
"-show_entries".as_ref(),
|
||||
"stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name".as_ref(),
|
||||
"-of".as_ref(),
|
||||
"default=noprint_wrappers=1:nokey=1".as_ref(),
|
||||
"-print_format".as_ref(),
|
||||
"json".as_ref(),
|
||||
input_file.as_os_str(),
|
||||
"-v",
|
||||
"quiet",
|
||||
"-count_frames",
|
||||
"-show_entries",
|
||||
"stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name",
|
||||
"-of",
|
||||
"default=noprint_wrappers=1:nokey=1",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-",
|
||||
],
|
||||
&[],
|
||||
state.config.media.process_timeout,
|
||||
)?
|
||||
.read()
|
||||
.drive_with_async_read(bytes.into_reader())
|
||||
.into_vec()
|
||||
.await;
|
||||
|
||||
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
|
||||
|
||||
let output = res?;
|
||||
|
||||
let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?;
|
||||
|
|
|
@ -117,20 +117,15 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let bytes = crate::magick::process_image_stream_read(
|
||||
state,
|
||||
stream,
|
||||
thumbnail_args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
)
|
||||
.await?
|
||||
.into_bytes_stream()
|
||||
.instrument(tracing::info_span!(
|
||||
"Reading processed image to BytesStream"
|
||||
))
|
||||
.await?;
|
||||
let bytes =
|
||||
crate::magick::process_image_command(state, thumbnail_args, input_format, format, quality)
|
||||
.await?
|
||||
.drive_with_stream(stream)
|
||||
.into_bytes_stream()
|
||||
.instrument(tracing::info_span!(
|
||||
"Reading processed image to BytesStream"
|
||||
))
|
||||
.await?;
|
||||
|
||||
drop(permit);
|
||||
|
||||
|
|
|
@ -69,15 +69,11 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
crate::magick::process_image_process_read(
|
||||
state,
|
||||
process_read,
|
||||
magick_args,
|
||||
format,
|
||||
format,
|
||||
quality,
|
||||
)
|
||||
.await?
|
||||
let process =
|
||||
crate::magick::process_image_command(state, magick_args, format, format, quality)
|
||||
.await?;
|
||||
|
||||
process_read.pipe(process)
|
||||
} else {
|
||||
process_read
|
||||
}
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
use std::{ffi::OsStr, ops::Deref, path::Path, sync::Arc};
|
||||
|
||||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
config::Media,
|
||||
error_code::ErrorCode,
|
||||
formats::ProcessableFormat,
|
||||
process::{Process, ProcessError, ProcessRead},
|
||||
process::{Process, ProcessError},
|
||||
state::State,
|
||||
stream::LocalBoxStream,
|
||||
tmp_file::{TmpDir, TmpFolder},
|
||||
};
|
||||
|
||||
|
@ -86,40 +83,20 @@ impl MagickError {
|
|||
}
|
||||
}
|
||||
|
||||
async fn process_image<S, F, Fut>(
|
||||
pub(crate) async fn process_image_command<S>(
|
||||
state: &State<S>,
|
||||
process_args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
write_file: F,
|
||||
) -> Result<ProcessRead, MagickError>
|
||||
where
|
||||
F: FnOnce(crate::file::File) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
|
||||
{
|
||||
) -> Result<Process, MagickError> {
|
||||
let temporary_path = state
|
||||
.tmp_dir
|
||||
.tmp_folder()
|
||||
.await
|
||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||
|
||||
let input_file = state.tmp_dir.tmp_file(None);
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
.await
|
||||
.map_err(MagickError::CreateDir)?;
|
||||
|
||||
let tmp_one = crate::file::File::create(&input_file)
|
||||
.await
|
||||
.map_err(MagickError::CreateFile)?;
|
||||
let tmp_one = (write_file)(tmp_one).await?;
|
||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
||||
|
||||
let input_arg = [
|
||||
input_format.magick_format().as_ref(),
|
||||
input_file.as_os_str(),
|
||||
]
|
||||
.join(":".as_ref());
|
||||
let input_arg = format!("{}:-", input_format.magick_format());
|
||||
let output_arg = format!("{}:-", format.magick_format());
|
||||
let quality = quality.map(|q| q.to_string());
|
||||
|
||||
|
@ -130,7 +107,7 @@ where
|
|||
|
||||
let mut args: Vec<&OsStr> = Vec::with_capacity(len);
|
||||
args.push("convert".as_ref());
|
||||
args.push(&input_arg);
|
||||
args.push(input_arg.as_ref());
|
||||
if input_format.coalesce() {
|
||||
args.push("-coalesce".as_ref());
|
||||
}
|
||||
|
@ -145,67 +122,10 @@ where
|
|||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
||||
.read()
|
||||
.add_extras(input_file)
|
||||
let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
||||
.add_extras(temporary_path);
|
||||
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
pub(crate) async fn process_image_stream_read<S>(
|
||||
state: &State<S>,
|
||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
state,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
|mut tmp_file| async move {
|
||||
tmp_file
|
||||
.write_from_stream(stream)
|
||||
.await
|
||||
.map_err(MagickError::Write)?;
|
||||
Ok(tmp_file)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn process_image_process_read<S>(
|
||||
state: &State<S>,
|
||||
process_read: ProcessRead,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
state,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
|mut tmp_file| async move {
|
||||
process_read
|
||||
.with_stdout(|stdout| async {
|
||||
tmp_file
|
||||
.write_from_async_read(stdout)
|
||||
.await
|
||||
.map_err(MagickError::Write)
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(tmp_file)
|
||||
},
|
||||
)
|
||||
.await
|
||||
Ok(process)
|
||||
}
|
||||
|
||||
pub(crate) type ArcPolicyDir = Arc<PolicyDir>;
|
||||
|
|
|
@ -398,6 +398,58 @@ impl ProcessRead {
|
|||
.await?
|
||||
}
|
||||
|
||||
pub(crate) fn pipe(self, process: Process) -> ProcessRead {
|
||||
let Process {
|
||||
command,
|
||||
mut child,
|
||||
guard,
|
||||
timeout,
|
||||
extras,
|
||||
id,
|
||||
} = process;
|
||||
|
||||
let mut stdin = child.stdin.take().expect("stdin exists");
|
||||
let stdout = child.stdout.take().expect("stdout exists");
|
||||
|
||||
let command2 = command.clone();
|
||||
let handle = Box::pin(async move {
|
||||
self.with_stdout(move |mut stdout| async move {
|
||||
let child_fut = async {
|
||||
let n = tokio::io::copy(&mut stdout, &mut stdin).await?;
|
||||
drop(stdout);
|
||||
drop(stdin);
|
||||
|
||||
child.wait().await
|
||||
};
|
||||
|
||||
match child_fut.with_timeout(timeout).await {
|
||||
Ok(Ok(status)) if status.success() => {
|
||||
guard.disarm();
|
||||
Ok(())
|
||||
}
|
||||
Ok(Ok(status)) => Err(ProcessError::Status(command2, status)),
|
||||
Ok(Err(e)) => Err(ProcessError::Other(command2, e)),
|
||||
Err(_) => {
|
||||
child
|
||||
.kill()
|
||||
.await
|
||||
.map_err(|e| ProcessError::Other(command2.clone(), e))?;
|
||||
Err(ProcessError::Timeout(command2))
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?
|
||||
});
|
||||
|
||||
ProcessRead {
|
||||
reader: Box::pin(stdout),
|
||||
handle,
|
||||
command,
|
||||
id,
|
||||
extras,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn with_stdout<Fut>(
|
||||
self,
|
||||
f: impl FnOnce(BoxRead<'static>) -> Fut,
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use std::{ffi::OsStr, sync::Arc};
|
||||
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
|
@ -35,27 +34,38 @@ pub(super) async fn transcode_bytes(
|
|||
|
||||
let output_file = tmp_dir.tmp_file(None);
|
||||
|
||||
let res = transcode_files(
|
||||
input_file.as_os_str(),
|
||||
input_format,
|
||||
output_file.as_os_str(),
|
||||
output_format,
|
||||
crf,
|
||||
timeout,
|
||||
)
|
||||
let res = async {
|
||||
let res = transcode_files(
|
||||
input_file.as_os_str(),
|
||||
input_format,
|
||||
output_file.as_os_str(),
|
||||
output_format,
|
||||
crf,
|
||||
timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
|
||||
res?;
|
||||
|
||||
let tmp_two = crate::file::File::open(&output_file)
|
||||
.await
|
||||
.map_err(FfMpegError::OpenFile)?;
|
||||
let stream = tmp_two
|
||||
.read_to_stream(None, None)
|
||||
.await
|
||||
.map_err(FfMpegError::ReadFile)?;
|
||||
Ok(tokio_util::io::StreamReader::new(stream))
|
||||
}
|
||||
.await;
|
||||
|
||||
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
|
||||
res?;
|
||||
|
||||
let tmp_two = crate::file::File::open(&output_file)
|
||||
.await
|
||||
.map_err(FfMpegError::OpenFile)?;
|
||||
let stream = tmp_two
|
||||
.read_to_stream(None, None)
|
||||
.await
|
||||
.map_err(FfMpegError::ReadFile)?;
|
||||
let reader = tokio_util::io::StreamReader::new(stream);
|
||||
let reader = match res {
|
||||
Ok(reader) => reader,
|
||||
Err(e) => {
|
||||
output_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let process_read = ProcessRead::new(
|
||||
Box::pin(reader),
|
||||
|
|
Loading…
Reference in a new issue