From c1e651c01a2b15cf5b8cc0d7efd8eebe1cc38117 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 22 Feb 2024 16:02:33 -0600 Subject: [PATCH] Use BytesStream in more places --- src/bytes_stream.rs | 91 ++++++++++++++++++++++++++++++++++++- src/concurrent_processor.rs | 14 +++--- src/details.rs | 8 +++- src/discover.rs | 10 ++-- src/discover/exiftool.rs | 7 +-- src/discover/ffmpeg.rs | 7 +-- src/discover/magick.rs | 7 +-- src/exiftool.rs | 9 ++-- src/generate.rs | 29 +++++------- src/ingest.rs | 8 ++-- src/lib.rs | 85 +++++++++++----------------------- src/migrate_store.rs | 2 +- src/process.rs | 20 ++++++-- src/range.rs | 15 ------ src/repo/migrate.rs | 3 +- src/stream.rs | 7 --- src/validate.rs | 13 +++--- src/validate/exiftool.rs | 8 +++- src/validate/ffmpeg.rs | 5 +- src/validate/magick.rs | 9 ++-- 20 files changed, 206 insertions(+), 151 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index ae07007..3594be5 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -9,6 +9,8 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use streem::IntoStreamer; +use tokio::io::AsyncRead; #[derive(Clone, Debug)] pub(crate) struct BytesStream { @@ -24,6 +26,21 @@ impl BytesStream { } } + pub(crate) async fn try_from_stream(stream: S) -> Result + where + S: Stream>, + { + let stream = std::pin::pin!(stream); + let mut stream = stream.into_streamer(); + let mut bs = Self::new(); + + while let Some(bytes) = stream.try_next().await? { + bs.add_bytes(bytes); + } + + Ok(bs) + } + pub(crate) fn add_bytes(&mut self, bytes: Bytes) { self.total_len += bytes.len(); self.inner.push_back(bytes); @@ -33,7 +50,15 @@ impl BytesStream { self.total_len } - pub(crate) fn into_bytes(self) -> Bytes { + pub(crate) fn is_empty(&self) -> bool { + self.total_len > 0 + } + + fn into_bytes(mut self) -> Bytes { + if self.inner.len() == 1 { + return self.inner.pop_front().expect("Exactly one"); + } + let mut buf = BytesMut::with_capacity(self.total_len); for bytes in self.inner { @@ -42,6 +67,26 @@ impl BytesStream { buf.freeze() } + + pub(crate) fn into_reader(self) -> BytesReader { + BytesReader { + index: 0, + inner: self.inner, + } + } + + pub(crate) fn into_io_stream(self) -> IoStream { + IoStream { stream: self } + } +} + +pub(crate) struct IoStream { + stream: BytesStream, +} + +pub(crate) struct BytesReader { + index: usize, + inner: VecDeque, } impl IntoIterator for BytesStream { @@ -86,3 +131,47 @@ impl Stream for BytesStream { Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) } } + +impl Stream for IoStream { + type Item = std::io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + MessageBody::poll_next(Pin::new(&mut self.get_mut().stream), cx) + } +} + +impl AsyncRead for BytesReader { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + while buf.remaining() > 0 { + if self.index == self.inner[0].len() { + self.inner.pop_front(); + self.index = 0; + } + + if self.inner.is_empty() { + break; + } + + let upper_bound = (self.index + buf.remaining()).min(self.inner[0].len()); + + let slice = &self.inner[0][self.index..upper_bound]; + + buf.put_slice(slice); + self.index += slice.len(); + } + + Poll::Ready(Ok(())) + } +} + +impl From for BytesStream { + fn from(value: Bytes) -> Self { + let mut bs = BytesStream::new(); + bs.add_bytes(value); + bs + } +} diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 4a0f9cd..e7d0fc3 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -15,7 +15,7 @@ use std::{ }; use tracing::Span; -type OutcomeReceiver = Receiver<(Details, web::Bytes)>; +type OutcomeReceiver = Receiver<(Details, Arc)>; type ProcessMapKey = (Hash, PathBuf); @@ -36,9 +36,9 @@ impl ProcessMap { hash: Hash, path: PathBuf, fut: Fut, - ) -> Result<(Details, web::Bytes), Error> + ) -> Result<(Details, Arc), Error> where - Fut: Future>, + Fut: Future), Error>>, { let key = (hash.clone(), path.clone()); @@ -100,10 +100,10 @@ struct CancelToken { enum CancelState { Sender { - sender: Sender<(Details, web::Bytes)>, + sender: Sender<(Details, Arc)>, }, Receiver { - receiver: RecvFut<'static, (Details, web::Bytes)>, + receiver: RecvFut<'static, (Details, Arc)>, }, } @@ -124,9 +124,9 @@ pin_project_lite::pin_project! { impl Future for CancelSafeProcessor where - F: Future>, + F: Future), Error>>, { - type Output = Result<(Details, web::Bytes), Error>; + type Output = Result<(Details, Arc), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().project(); diff --git a/src/details.rs b/src/details.rs index f95c828..d4fd6c0 100644 --- a/src/details.rs +++ b/src/details.rs @@ -1,4 +1,5 @@ use crate::{ + bytes_stream::BytesStream, discover::Discovery, error::Error, formats::{InternalFormat, InternalVideoFormat}, @@ -80,13 +81,16 @@ impl Details { } #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn from_bytes(state: &State, input: web::Bytes) -> Result { + pub(crate) async fn from_bytes_stream( + state: &State, + input: BytesStream, + ) -> Result { let Discovery { input, width, height, frames, - } = crate::discover::discover_bytes(state, input).await?; + } = crate::discover::discover_bytes_stream(state, input).await?; Ok(Details::from_parts( input.internal_format(), diff --git a/src/discover.rs b/src/discover.rs index f394c52..a6adc7f 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -4,7 +4,7 @@ mod magick; use actix_web::web::Bytes; -use crate::{formats::InputFile, state::State}; +use crate::{bytes_stream::BytesStream, formats::InputFile, state::State}; #[derive(Debug, PartialEq, Eq)] pub(crate) struct Discovery { @@ -27,13 +27,13 @@ pub(crate) enum DiscoverError { } #[tracing::instrument(level = "trace", skip_all)] -pub(crate) async fn discover_bytes( +pub(crate) async fn discover_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result { - let discovery = ffmpeg::discover_bytes(state, bytes.clone()).await?; + let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?; - let discovery = magick::confirm_bytes(state, discovery, bytes.clone()).await?; + let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?; let discovery = exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?; diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index d8ea421..98534d2 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -1,6 +1,7 @@ use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, exiftool::ExifError, formats::{ImageInput, InputFile}, process::Process, @@ -16,7 +17,7 @@ pub(super) async fn check_reorient( height, frames, }: Discovery, - bytes: Bytes, + bytes: BytesStream, timeout: u64, ) -> Result { let input = match input { @@ -40,9 +41,9 @@ pub(super) async fn check_reorient( } #[tracing::instrument(level = "trace", skip_all)] -async fn needs_reorienting(input: Bytes, timeout: u64) -> Result { +async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? - .bytes_read(input) + .bytes_stream_read(input) .into_string() .await?; diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 1597b05..1162489 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -4,6 +4,7 @@ mod tests; use std::{collections::HashSet, sync::OnceLock}; use crate::{ + bytes_stream::BytesStream, ffmpeg::FfMpegError, formats::{ AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, @@ -158,15 +159,15 @@ struct Flags { } #[tracing::instrument(skip_all)] -pub(super) async fn discover_bytes( +pub(super) async fn discover_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result, FfMpegError> { discover_file(state, move |mut file| { let bytes = bytes.clone(); async move { - file.write_from_bytes(bytes) + file.write_from_stream(bytes.into_io_stream()) .await .map_err(FfMpegError::Write)?; Ok(file) diff --git a/src/discover/magick.rs b/src/discover/magick.rs index a280736..349af37 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -4,6 +4,7 @@ mod tests; use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, discover::DiscoverError, formats::{AnimationFormat, ImageFormat, ImageInput, InputFile}, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, @@ -31,10 +32,10 @@ struct Geometry { } #[tracing::instrument(skip_all)] -pub(super) async fn confirm_bytes( +pub(super) async fn confirm_bytes_stream( state: &State, discovery: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { match discovery { Some(Discovery { @@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes( } discover_file(state, move |mut file| async move { - file.write_from_bytes(bytes) + file.write_from_stream(bytes.into_io_stream()) .await .map_err(MagickError::Write)?; diff --git a/src/exiftool.rs b/src/exiftool.rs index 831ef9b..5086ce6 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -1,4 +1,5 @@ use crate::{ + bytes_stream::BytesStream, error_code::ErrorCode, process::{Process, ProcessError, ProcessRead}, }; @@ -39,9 +40,9 @@ impl ExifError { } #[tracing::instrument(level = "trace", skip(input))] -pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result { +pub(crate) async fn needs_reorienting(timeout: u64, input: BytesStream) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? - .bytes_read(input) + .bytes_stream_read(input) .into_string() .await?; @@ -51,9 +52,9 @@ pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result Result { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?; - Ok(process.bytes_read(input)) + Ok(process.bytes_stream_read(input)) } diff --git a/src/generate.rs b/src/generate.rs index ca151c8..9373d83 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -2,6 +2,7 @@ mod ffmpeg; mod magick; use crate::{ + bytes_stream::BytesStream, concurrent_processor::ProcessMap, details::Details, error::{Error, UploadError}, @@ -57,7 +58,7 @@ pub(crate) async fn generate( thumbnail_args: Vec, original_details: &Details, hash: Hash, -) -> Result<(Details, Bytes), Error> { +) -> Result<(Details, Arc), Error> { if state.config.server.danger_dummy_mode { let identifier = state .repo @@ -65,13 +66,7 @@ pub(crate) async fn generate( .await? .ok_or(UploadError::MissingIdentifier)?; - let bytes = state - .store - .to_bytes(&identifier, None, None) - .await? - .into_bytes(); - - Ok((original_details.clone(), bytes)) + Ok((original_details.clone(), identifier)) } else { let process_fut = process( state, @@ -82,14 +77,14 @@ pub(crate) async fn generate( hash.clone(), ); - let (details, bytes) = process_map + let (details, identifier) = process_map .process(hash, thumbnail_path, process_fut) .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) .with_metrics(crate::init_metrics::GENERATE_PROCESS) .await .map_err(|_| UploadError::ProcessTimeout)??; - Ok((details, bytes)) + Ok((details, identifier)) } } @@ -101,7 +96,7 @@ async fn process( thumbnail_args: Vec, original_details: &Details, hash: Hash, -) -> Result<(Details, Bytes), Error> { +) -> Result<(Details, Arc), Error> { let guard = MetricsGuard::guard(); let permit = crate::process_semaphore().acquire().await?; @@ -123,7 +118,7 @@ async fn process( let stream = state.store.to_stream(&identifier, None, None).await?; - let vec = crate::magick::process_image_stream_read( + let bytes = crate::magick::process_image_stream_read( state, stream, thumbnail_args, @@ -132,19 +127,17 @@ async fn process( quality, ) .await? - .into_vec() + .into_bytes_stream() .instrument(tracing::info_span!("Reading processed image to vec")) .await?; - let bytes = Bytes::from(vec); - drop(permit); - let details = Details::from_bytes(state, bytes.clone()).await?; + let details = Details::from_bytes_stream(state, bytes.clone()).await?; let identifier = state .store - .save_bytes(bytes.clone(), details.media_type()) + .save_stream(bytes.into_io_stream(), details.media_type()) .await?; if let Err(VariantAlreadyExists) = state @@ -163,7 +156,7 @@ async fn process( guard.disarm(); - Ok((details, bytes)) as Result<(Details, Bytes), Error> + Ok((details, identifier)) as Result<(Details, Arc), Error> } #[tracing::instrument(skip_all)] diff --git a/src/ingest.rs b/src/ingest.rs index 69802ae..b3ddd16 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -30,7 +30,7 @@ pub(crate) struct Session { } #[tracing::instrument(skip(stream))] -async fn aggregate(stream: S) -> Result +async fn aggregate(stream: S) -> Result where S: Stream>, { @@ -45,7 +45,7 @@ where buf.add_bytes(res?); } - Ok(buf.into_bytes()) + Ok(buf) } async fn process_ingest( @@ -70,7 +70,7 @@ where let permit = crate::process_semaphore().acquire().await?; tracing::trace!("Validating bytes"); - let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?; + let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?; let process_read = if let Some(operations) = state.config.media.preprocess_steps() { if let Some(format) = input_type.processable_format() { @@ -116,7 +116,7 @@ where .await??; let bytes_stream = state.store.to_bytes(&identifier, None, None).await?; - let details = Details::from_bytes(state, bytes_stream.into_bytes()).await?; + let details = Details::from_bytes_stream(state, bytes_stream).await?; drop(permit); diff --git a/src/lib.rs b/src/lib.rs index 57cd0ef..44bf807 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,7 +83,7 @@ use self::{ repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, store::{file_store::FileStore, object_store::ObjectStore, Store}, - stream::{empty, once}, + stream::empty, tls::Tls, }; @@ -141,7 +141,7 @@ async fn ensure_details_identifier( tracing::debug!("generating new details from {:?}", identifier); let bytes_stream = state.store.to_bytes(identifier, None, None).await?; - let new_details = Details::from_bytes(state, bytes_stream.into_bytes()).await?; + let new_details = Details::from_bytes_stream(state, bytes_stream).await?; tracing::debug!("storing details for {:?}", identifier); state.repo.relate_details(identifier, &new_details).await?; tracing::debug!("stored"); @@ -841,67 +841,36 @@ async fn process( .variant_identifier(hash.clone(), path_string) .await?; - if let Some(identifier) = identifier_opt { + let (details, identifier) = if let Some(identifier) = identifier_opt { let details = ensure_details_identifier(&state, &identifier).await?; - if let Some(public_url) = state.store.public_url(&identifier) { - return Ok(HttpResponse::SeeOther() - .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) - .finish()); - } - - return ranged_file_resp(&state.store, identifier, range, details, not_found).await; - } - - if state.config.server.read_only { - return Err(UploadError::ReadOnly.into()); - } - - let original_details = ensure_details(&state, &alias).await?; - - let (details, bytes) = generate::generate( - &state, - &process_map, - format, - thumbnail_path, - thumbnail_args, - &original_details, - hash, - ) - .await?; - - let (builder, stream) = if let Some(web::Header(range_header)) = range { - if let Some(range) = range::single_bytes_range(&range_header) { - let len = bytes.len() as u64; - - if let Some(content_range) = range::to_content_range(range, len) { - let mut builder = HttpResponse::PartialContent(); - builder.insert_header(content_range); - let stream = range::chop_bytes(range, bytes, len)?; - - (builder, Either::left(Either::left(stream))) - } else { - ( - HttpResponse::RangeNotSatisfiable(), - Either::left(Either::right(empty())), - ) - } - } else { - return Err(UploadError::Range.into()); - } - } else if not_found { - (HttpResponse::NotFound(), Either::right(once(Ok(bytes)))) + (details, identifier) } else { - (HttpResponse::Ok(), Either::right(once(Ok(bytes)))) + if state.config.server.read_only { + return Err(UploadError::ReadOnly.into()); + } + + let original_details = ensure_details(&state, &alias).await?; + + generate::generate( + &state, + &process_map, + format, + thumbnail_path, + thumbnail_args, + &original_details, + hash, + ) + .await? }; - Ok(srv_response( - builder, - stream, - details.media_type(), - 7 * DAYS, - details.system_time(), - )) + if let Some(public_url) = state.store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + + ranged_file_resp(&state.store, identifier, range, details, not_found).await } #[tracing::instrument(name = "Serving processed image headers", skip(state))] diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 0578bbe..9423621 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -396,7 +396,7 @@ where .await .map_err(From::from) .map_err(MigrateError::Details)?; - let new_details = Details::from_bytes(to, bytes_stream.into_bytes()) + let new_details = Details::from_bytes_stream(to, bytes_stream) .await .map_err(MigrateError::Details)?; to.repo diff --git a/src/process.rs b/src/process.rs index 916e1ed..89239bc 100644 --- a/src/process.rs +++ b/src/process.rs @@ -6,14 +6,17 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use streem::IntoStreamer; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; +use tokio_util::io::ReaderStream; use tracing::Instrument; use uuid::Uuid; use crate::{ + bytes_stream::BytesStream, error_code::ErrorCode, future::{LocalBoxFuture, WithTimeout}, read::BoxRead, @@ -232,12 +235,11 @@ impl Process { } } - pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead { + pub(crate) fn bytes_stream_read(self, input: BytesStream) -> ProcessRead { self.spawn_fn(move |mut stdin| { - let mut input = input; async move { - match stdin.write_all_buf(&mut input).await { - Ok(()) => Ok(()), + match tokio::io::copy(&mut input.into_reader(), &mut stdin).await { + Ok(_) => Ok(()), // BrokenPipe means we finished reading from Stdout, so we don't need to write // to stdin. We'll still error out if the command failed so treat this as a // success @@ -317,6 +319,16 @@ impl ProcessRead { } } + pub(crate) async fn into_bytes_stream(self) -> Result { + let cmd = self.command.clone(); + + self.with_stdout(move |stdout| { + BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16)) + }) + .await? + .map_err(move |e| ProcessError::Read(cmd, e)) + } + pub(crate) async fn into_vec(self) -> Result, ProcessError> { let cmd = self.command.clone(); diff --git a/src/range.rs b/src/range.rs index 7a2694a..f992f38 100644 --- a/src/range.rs +++ b/src/range.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use crate::{ error::{Error, UploadError}, store::Store, - stream::once, }; use actix_web::{ http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range}, @@ -11,20 +10,6 @@ use actix_web::{ }; use futures_core::Stream; -pub(crate) fn chop_bytes( - byte_range: &ByteRangeSpec, - bytes: Bytes, - length: u64, -) -> Result>, Error> { - if let Some((start, end)) = byte_range.to_satisfiable_range(length) { - // END IS INCLUSIVE - let end = end as usize + 1; - return Ok(once(Ok(bytes.slice(start as usize..end)))); - } - - Err(UploadError::Range.into()) -} - pub(crate) async fn chop_store( byte_range: &ByteRangeSpec, store: &S, diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 9cc803f..0a67d4d 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -387,10 +387,9 @@ async fn fetch_or_generate_details( Ok(details) } else { let bytes_stream = state.store.to_bytes(identifier, None, None).await?; - let bytes = bytes_stream.into_bytes(); let guard = details_semaphore().acquire().await?; - let details = Details::from_bytes(state, bytes).await?; + let details = Details::from_bytes_stream(state, bytes_stream).await?; drop(guard); Ok(details) diff --git a/src/stream.rs b/src/stream.rs index cae35da..66a502b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -183,13 +183,6 @@ where streem::from_fn(|_| std::future::ready(())) } -pub(crate) fn once(value: T) -> impl Stream -where - T: 'static, -{ - streem::from_fn(|yielder| yielder.yield_(value)) -} - pub(crate) fn timeout( duration: Duration, stream: S, diff --git a/src/validate.rs b/src/validate.rs index ee857bd..751877e 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -3,6 +3,7 @@ mod ffmpeg; mod magick; use crate::{ + bytes_stream::BytesStream, discover::Discovery, error::Error, error_code::ErrorCode, @@ -56,9 +57,9 @@ impl ValidationError { const MEGABYTES: usize = 1024 * 1024; #[tracing::instrument(skip_all)] -pub(crate) async fn validate_bytes( +pub(crate) async fn validate_bytes_stream( state: &State, - bytes: Bytes, + bytes: BytesStream, ) -> Result<(InternalFormat, ProcessRead), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); @@ -69,7 +70,7 @@ pub(crate) async fn validate_bytes( width, height, frames, - } = crate::discover::discover_bytes(state, bytes.clone()).await?; + } = crate::discover::discover_bytes_stream(state, bytes.clone()).await?; match &input { InputFile::Image(input) => { @@ -95,7 +96,7 @@ pub(crate) async fn validate_bytes( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_image( state: &State, - bytes: Bytes, + bytes: BytesStream, input: ImageInput, width: u16, height: u16, @@ -160,7 +161,7 @@ fn validate_animation( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_animation( state: &State, - bytes: Bytes, + bytes: BytesStream, input: AnimationFormat, width: u16, height: u16, @@ -218,7 +219,7 @@ fn validate_video( #[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_video( state: &State, - bytes: Bytes, + bytes: BytesStream, input: InputVideoFormat, width: u16, height: u16, diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index b97d5f0..fe757ae 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,14 +1,18 @@ use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, exiftool::ExifError, process::{Process, ProcessRead}, }; #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn clear_metadata_bytes_read( - input: Bytes, + input: BytesStream, timeout: u64, ) -> Result { - Ok(Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)?.bytes_read(input)) + Ok( + Process::run("exiftool", &["-all=", "-", "-out", "-"], &[], timeout)? + .bytes_stream_read(input), + ) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 1ab7758..a5039c2 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -4,6 +4,7 @@ use actix_web::web::Bytes; use uuid::Uuid; use crate::{ + bytes_stream::BytesStream, ffmpeg::FfMpegError, formats::{InputVideoFormat, OutputVideo}, process::{Process, ProcessRead}, @@ -16,7 +17,7 @@ pub(super) async fn transcode_bytes( output_format: OutputVideo, crf: u8, timeout: u64, - bytes: Bytes, + bytes: BytesStream, ) -> Result { let input_file = tmp_dir.tmp_file(None); crate::store::file_store::safe_create_parent(&input_file) @@ -27,7 +28,7 @@ pub(super) async fn transcode_bytes( .await .map_err(FfMpegError::CreateFile)?; tmp_one - .write_from_bytes(bytes) + .write_from_stream(bytes.into_io_stream()) .await .map_err(FfMpegError::Write)?; tmp_one.close().await.map_err(FfMpegError::CloseFile)?; diff --git a/src/validate/magick.rs b/src/validate/magick.rs index ac8b720..e90b2b3 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -3,6 +3,7 @@ use std::ffi::OsStr; use actix_web::web::Bytes; use crate::{ + bytes_stream::BytesStream, formats::{AnimationFormat, ImageFormat}, magick::{MagickError, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH}, process::{Process, ProcessRead}, @@ -14,7 +15,7 @@ pub(super) async fn convert_image( input: ImageFormat, output: ImageFormat, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { convert( state, @@ -32,7 +33,7 @@ pub(super) async fn convert_animation( input: AnimationFormat, output: AnimationFormat, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { convert( state, @@ -51,7 +52,7 @@ async fn convert( output: &'static str, coalesce: bool, quality: Option, - bytes: Bytes, + bytes: BytesStream, ) -> Result { let temporary_path = state .tmp_dir @@ -69,7 +70,7 @@ async fn convert( .await .map_err(MagickError::CreateFile)?; tmp_one - .write_from_bytes(bytes) + .write_from_stream(bytes.into_io_stream()) .await .map_err(MagickError::Write)?; tmp_one.close().await.map_err(MagickError::CloseFile)?;