From 4976fcb2eb723a76acbd75dd8a7012b4a760e54a Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 9 Mar 2024 22:53:46 -0600 Subject: [PATCH] Remove unneeded code --- src/bytes_stream.rs | 85 ++++------------------------- src/discover.rs | 17 +++--- src/discover/exiftool.rs | 2 +- src/discover/ffmpeg.rs | 5 +- src/discover/magick.rs | 2 +- src/file.rs | 109 +++++++++----------------------------- src/generate.rs | 6 ++- src/ingest.rs | 19 ++++--- src/process.rs | 21 +------- src/queue.rs | 38 +++++++++---- src/repo/sled.rs | 4 +- src/store.rs | 46 ---------------- src/store/file_store.rs | 50 +++++++---------- src/store/object_store.rs | 23 +------- src/stream.rs | 12 +---- src/sync.rs | 9 ++++ src/validate.rs | 10 ++-- src/validate/ffmpeg.rs | 5 +- 18 files changed, 142 insertions(+), 321 deletions(-) diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index df1ba8c..fdb534c 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -1,14 +1,7 @@ use actix_web::web::Bytes; use futures_core::Stream; -use std::{ - collections::{vec_deque::IntoIter, VecDeque}, - convert::Infallible, - pin::Pin, - task::{Context, Poll}, -}; +use std::collections::{vec_deque::IntoIter, VecDeque}; use streem::IntoStreamer; -use tokio::io::AsyncRead; -use tokio_util::bytes::Buf; #[derive(Clone, Debug)] pub(crate) struct BytesStream { @@ -36,6 +29,7 @@ impl BytesStream { while let Some(bytes) = stream.try_next().await? { tracing::trace!("try_from_stream: looping"); bs.add_bytes(bytes); + crate::sync::cooperate().await; } tracing::debug!( @@ -64,21 +58,14 @@ impl BytesStream { self.total_len == 0 } - pub(crate) fn into_reader(self) -> BytesReader { - BytesReader { inner: self.inner } + pub(crate) fn into_io_stream(self) -> impl Stream> { + streem::from_fn(move |yielder| async move { + for bytes in self { + crate::sync::cooperate().await; + yielder.yield_ok(bytes).await; + } + }) } - - pub(crate) fn into_io_stream(self) -> IoStream { - IoStream { inner: self.inner } - } -} - -pub(crate) struct IoStream { - inner: VecDeque, -} - -pub(crate) struct BytesReader { - inner: VecDeque, } impl IntoIterator for BytesStream { @@ -89,57 +76,3 @@ impl IntoIterator for BytesStream { self.inner.into_iter() } } - -impl Stream for BytesStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) - } - - fn size_hint(&self) -> (usize, Option) { - (self.inner.len(), Some(self.inner.len())) - } -} - -impl Stream for IoStream { - type Item = std::io::Result; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) - } - - fn size_hint(&self) -> (usize, Option) { - (self.inner.len(), Some(self.inner.len())) - } -} - -impl AsyncRead for BytesReader { - fn poll_read( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - while buf.remaining() > 0 { - tracing::trace!("bytes reader: looping"); - - if let Some(bytes) = self.inner.front_mut() { - if bytes.is_empty() { - self.inner.pop_front(); - continue; - } - - let upper_bound = buf.remaining().min(bytes.len()); - - let slice = &bytes[..upper_bound]; - - buf.put_slice(slice); - bytes.advance(upper_bound); - } else { - break; - } - } - - Poll::Ready(Ok(())) - } -} diff --git a/src/discover.rs b/src/discover.rs index 362a4cf..af35251 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -2,9 +2,7 @@ mod exiftool; mod ffmpeg; mod magick; - - -use crate::{bytes_stream::BytesStream, formats::InputFile, state::State}; +use crate::{bytes_stream::BytesStream, formats::InputFile, future::WithPollTimer, state::State}; #[derive(Debug, PartialEq, Eq)] pub(crate) struct Discovery { @@ -31,12 +29,17 @@ pub(crate) async fn discover_bytes_stream( state: &State, bytes: BytesStream, ) -> Result { - let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?; + let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()) + .with_poll_timer("discover-ffmpeg") + .await?; - let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?; + let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()) + .with_poll_timer("confirm-imagemagick") + .await?; - let discovery = - exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?; + let discovery = exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout) + .with_poll_timer("reorient-exiftool") + .await?; Ok(discovery) } diff --git a/src/discover/exiftool.rs b/src/discover/exiftool.rs index 06ee1da..aa80a6e 100644 --- a/src/discover/exiftool.rs +++ b/src/discover/exiftool.rs @@ -42,7 +42,7 @@ pub(super) async fn check_reorient( async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result { let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout) .await? - .drive_with_async_read(input.into_reader()) + .drive_with_stream(input.into_io_stream()) .into_string() .await?; diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index 7aca056..705007c 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -10,6 +10,7 @@ use crate::{ AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec, }, + future::WithPollTimer, process::Process, state::State, }; @@ -177,7 +178,8 @@ pub(super) async fn discover_bytes_stream( bytes: BytesStream, ) -> Result, FfMpegError> { let output = crate::ffmpeg::with_file(&state.tmp_dir, None, |path| async move { - crate::file::write_from_async_read(&path, bytes.into_reader()) + crate::file::write_from_stream(&path, bytes.into_io_stream()) + .with_poll_timer("discover-ffmpeg-write-file") .await .map_err(FfMpegError::Write)?; @@ -201,6 +203,7 @@ pub(super) async fn discover_bytes_stream( .await? .read() .into_vec() + .with_poll_timer("discover-ffmpeg-into-vec") .await .map_err(FfMpegError::Process) }) diff --git a/src/discover/magick.rs b/src/discover/magick.rs index d44a0dc..d971022 100644 --- a/src/discover/magick.rs +++ b/src/discover/magick.rs @@ -74,7 +74,7 @@ async fn discover(state: &State, stream: BytesStream) -> Result, stream: impl Stream>, ) -> std::io::Result<()> { - let mut file = File::create(path).await?; - file.write_from_stream(stream).await?; - file.close().await?; - Ok(()) -} - -pub(crate) async fn write_from_async_read( - path: impl AsRef, - reader: impl AsyncRead, -) -> std::io::Result<()> { - let mut file = File::create(path).await?; - file.write_from_async_read(reader).await?; + let mut file = File::create(path).with_poll_timer("create-file").await?; + file.write_from_stream(stream) + .with_poll_timer("write-from-stream") + .await?; file.close().await?; Ok(()) } #[cfg(not(feature = "io-uring"))] mod tokio_file { - use crate::{store::file_store::FileError, Either}; + use crate::{future::WithPollTimer, store::file_store::FileError, Either}; use actix_web::web::{Bytes, BytesMut}; use futures_core::Stream; use std::{io::SeekFrom, path::Path}; use streem::IntoStreamer; - use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; - use tokio_util::codec::{BytesCodec, FramedRead}; + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + use tokio_util::{ + bytes::Buf, + codec::{BytesCodec, FramedRead}, + }; pub(crate) struct File { inner: tokio::fs::File, @@ -68,26 +64,22 @@ mod tokio_file { let stream = std::pin::pin!(stream); let mut stream = stream.into_streamer(); - while let Some(res) = stream.next().await { + while let Some(mut bytes) = stream.try_next().with_poll_timer("try-next").await? { tracing::trace!("write_from_stream: looping"); - let mut bytes = res?; + while bytes.has_remaining() { + self.inner + .write_buf(&mut bytes) + .with_poll_timer("write-buf") + .await?; - self.inner.write_all_buf(&mut bytes).await?; + crate::sync::cooperate().await; + } } Ok(()) } - pub(crate) async fn write_from_async_read(&mut self, reader: R) -> std::io::Result<()> - where - R: AsyncRead, - { - let mut reader = std::pin::pin!(reader); - tokio::io::copy(&mut reader, &mut self.inner).await?; - Ok(()) - } - pub(crate) async fn close(self) -> std::io::Result<()> { Ok(()) } @@ -129,7 +121,6 @@ mod io_uring { path::{Path, PathBuf}, }; use streem::IntoStreamer; - use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_uring::{ buf::{IoBuf, IoBufMut}, BufResult, @@ -212,59 +203,6 @@ mod io_uring { Ok(()) } - #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn write_from_async_read(&mut self, reader: R) -> std::io::Result<()> - where - R: AsyncRead, - { - let mut reader = std::pin::pin!(reader); - let mut cursor: u64 = 0; - - loop { - tracing::trace!("write_from_async_read: looping"); - - let max_size = 65_536; - let mut buf = Vec::with_capacity(max_size.try_into().unwrap()); - - let n = (&mut reader).take(max_size).read_buf(&mut buf).await?; - - if n == 0 { - break; - } - - let mut position = 0; - - loop { - tracing::trace!("write_from_async_read: looping inner"); - - if position == n { - break; - } - - let position_u64: u64 = position.try_into().unwrap(); - let (res, slice) = self - .write_at(buf.slice(position..n), cursor + position_u64) - .await; - - let n = res?; - if n == 0 { - return Err(std::io::ErrorKind::UnexpectedEof.into()); - } - - position += n; - - buf = slice.into_inner(); - } - - let position: u64 = position.try_into().unwrap(); - cursor += position; - } - - self.inner.sync_all().await?; - - Ok(()) - } - pub(crate) async fn close(self) -> std::io::Result<()> { self.inner.close().await } @@ -387,9 +325,12 @@ mod io_uring { let tmp = "/tmp/write-test"; test_async!(async move { - let mut file = tokio::fs::File::open(EARTH_GIF).await.unwrap(); + let file = tokio::fs::File::open(EARTH_GIF).await.unwrap(); let mut tmp_file = super::File::create(tmp).await.unwrap(); - tmp_file.write_from_async_read(&mut file).await.unwrap(); + tmp_file + .write_from_stream(tokio_util::io::ReaderStream::new(file)) + .await + .unwrap(); }); let mut source = std::fs::File::open(EARTH_GIF).unwrap(); diff --git a/src/generate.rs b/src/generate.rs index 636b3cc..18d03bd 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -223,7 +223,11 @@ where .with_stdout(|stdout| async { state .store - .save_async_read(stdout, media_type, Some(file_extension)) + .save_stream( + tokio_util::io::ReaderStream::with_capacity(stdout, 1024 * 64), + media_type, + Some(file_extension), + ) .await }) .await??; diff --git a/src/ingest.rs b/src/ingest.rs index a074f84..3d5004b 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -47,13 +47,16 @@ where Duration::from_secs(60), BytesStream::try_from_stream(stream), ) + .with_poll_timer("try-from-stream") .await .map_err(|_| UploadError::AggregateTimeout)??; let permit = crate::process_semaphore().acquire().await?; tracing::trace!("Validating bytes"); - let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?; + let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes) + .with_poll_timer("validate-bytes-stream") + .await?; let process_read = if let Some(operations) = state.config.media.preprocess_steps() { if let Some(format) = input_type.processable_format() { @@ -88,18 +91,22 @@ where state .store - .save_async_read( - hasher_reader, + .save_stream( + tokio_util::io::ReaderStream::with_capacity(hasher_reader, 1024 * 64), input_type.media_type(), Some(input_type.file_extension()), ) + .with_poll_timer("save-hasher-reader") .await .map(move |identifier| (hash_state, identifier)) }) + .with_poll_timer("save-process-stdout") .await??; let bytes_stream = state.store.to_bytes(&identifier, None, None).await?; - let details = Details::from_bytes_stream(state, bytes_stream).await?; + let details = Details::from_bytes_stream(state, bytes_stream) + .with_poll_timer("details-from-bytes-stream") + .await?; drop(permit); @@ -135,8 +142,8 @@ where let identifier = state .store - .save_async_read( - hasher_reader, + .save_stream( + tokio_util::io::ReaderStream::with_capacity(hasher_reader, 1024 * 64), input_type.media_type(), Some(input_type.file_extension()), ) diff --git a/src/process.rs b/src/process.rs index 525d4b7..e31f15b 100644 --- a/src/process.rs +++ b/src/process.rs @@ -9,7 +9,7 @@ use std::{ use futures_core::Stream; use streem::IntoStreamer; use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; use tokio_util::{bytes::Bytes, io::ReaderStream}; @@ -246,23 +246,6 @@ impl Process { } } - pub(crate) fn drive_with_async_read(self, input: impl AsyncRead + 'static) -> ProcessRead { - self.drive(move |mut stdin| { - async move { - let mut input = std::pin::pin!(input); - - match tokio::io::copy(&mut input, &mut stdin).await { - Ok(_) => Ok(()), - // BrokenPipe means we finished reading from Stdout, so we don't need to write - // to stdin. We'll still error out if the command failed so treat this as a - // success - Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()), - Err(e) => Err(e), - } - } - }) - } - pub(crate) fn drive_with_stream(self, input: S) -> ProcessRead where S: Stream> + 'static, @@ -277,7 +260,7 @@ impl Process { Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => return Err(e), } - tokio::task::yield_now().await; + crate::sync::cooperate().await; } Ok(()) diff --git a/src/queue.rs b/src/queue.rs index 0c78463..0556348 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -2,7 +2,7 @@ use crate::{ concurrent_processor::ProcessMap, error::{Error, UploadError}, formats::InputProcessableFormat, - future::LocalBoxFuture, + future::{LocalBoxFuture, WithPollTimer}, repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId}, serde_str::Serde, state::State, @@ -312,9 +312,11 @@ where loop { tracing::trace!("process_jobs: looping"); - tokio::task::yield_now().await; + crate::sync::cooperate().await; - let res = job_loop(&state, worker_id, queue, callback).await; + let res = job_loop(&state, worker_id, queue, callback) + .with_poll_timer("job-loop") + .await; if let Err(e) = res { tracing::warn!("Error processing jobs: {}", format!("{e}")); @@ -344,10 +346,14 @@ where loop { tracing::trace!("job_loop: looping"); - tokio::task::yield_now().await; + crate::sync::cooperate().await; async { - let (job_id, job) = state.repo.pop(queue, worker_id).await?; + let (job_id, job) = state + .repo + .pop(queue, worker_id) + .with_poll_timer("pop-cleanup") + .await?; let guard = MetricsGuard::guard(worker_id, queue); @@ -358,6 +364,7 @@ where job_id, (callback)(state, job), ) + .with_poll_timer("cleanup-job-and-heartbeat") .await; state @@ -390,9 +397,11 @@ async fn process_image_jobs( loop { tracing::trace!("process_image_jobs: looping"); - tokio::task::yield_now().await; + crate::sync::cooperate().await; - let res = image_job_loop(&state, &process_map, worker_id, queue, callback).await; + let res = image_job_loop(&state, &process_map, worker_id, queue, callback) + .with_poll_timer("image-job-loop") + .await; if let Err(e) = res { tracing::warn!("Error processing jobs: {}", format!("{e}")); @@ -423,10 +432,14 @@ where loop { tracing::trace!("image_job_loop: looping"); - tokio::task::yield_now().await; + crate::sync::cooperate().await; async { - let (job_id, job) = state.repo.pop(queue, worker_id).await?; + let (job_id, job) = state + .repo + .pop(queue, worker_id) + .with_poll_timer("pop-process") + .await?; let guard = MetricsGuard::guard(worker_id, queue); @@ -437,6 +450,7 @@ where job_id, (callback)(state, process_map, job), ) + .with_poll_timer("process-job-and-heartbeat") .await; state @@ -466,7 +480,9 @@ async fn heartbeat( where Fut: std::future::Future, { - let mut fut = std::pin::pin!(fut.instrument(tracing::info_span!("job-future"))); + let mut fut = std::pin::pin!(fut + .with_poll_timer("job-future") + .instrument(tracing::info_span!("job-future"))); let mut interval = tokio::time::interval(Duration::from_secs(5)); @@ -475,7 +491,7 @@ where loop { tracing::trace!("heartbeat: looping"); - tokio::task::yield_now().await; + crate::sync::cooperate().await; tokio::select! { biased; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index d3da1a2..ff67e8b 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,7 +1,7 @@ use crate::{ details::HumanDate, error_code::{ErrorCode, OwnedErrorCode}, - future::WithTimeout, + future::{WithPollTimer, WithTimeout}, serde_str::Serde, stream::{from_iterator, LocalBoxStream}, }; @@ -784,6 +784,7 @@ impl QueueRepo for SledRepo { Ok(None) }) + .with_poll_timer("sled-pop-spawn-blocking") .await .map_err(|_| RepoError::Canceled)??; @@ -813,6 +814,7 @@ impl QueueRepo for SledRepo { match notify .notified() .with_timeout(Duration::from_secs(30)) + .with_poll_timer("sled-pop-notify") .await { Ok(()) => tracing::debug!("Notified"), diff --git a/src/store.rs b/src/store.rs index 88cb9f8..15cdffe 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,7 +1,6 @@ use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; -use tokio::io::AsyncRead; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; @@ -85,15 +84,6 @@ impl From for StoreError { pub(crate) trait Store: Clone + Debug { async fn health_check(&self) -> Result<(), StoreError>; - async fn save_async_read( - &self, - reader: Reader, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead; - async fn save_stream( &self, stream: S, @@ -138,18 +128,6 @@ where T::health_check(self).await } - async fn save_async_read( - &self, - reader: Reader, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead, - { - T::save_async_read(self, reader, content_type, extension).await - } - async fn save_stream( &self, stream: S, @@ -192,18 +170,6 @@ where T::health_check(self).await } - async fn save_async_read( - &self, - reader: Reader, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead, - { - T::save_async_read(self, reader, content_type, extension).await - } - async fn save_stream( &self, stream: S, @@ -246,18 +212,6 @@ where T::health_check(self).await } - async fn save_async_read( - &self, - reader: Reader, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead, - { - T::save_async_read(self, reader, content_type, extension).await - } - async fn save_stream( &self, stream: S, diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 501df56..1f04fd5 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,12 +1,12 @@ -use crate::{error_code::ErrorCode, file::File, store::Store, stream::LocalBoxStream}; +use crate::{ + error_code::ErrorCode, file::File, future::WithPollTimer, store::Store, stream::LocalBoxStream, +}; use actix_web::web::Bytes; use futures_core::Stream; use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::AsyncRead; -use tokio_util::io::StreamReader; use tracing::Instrument; use super::StoreError; @@ -51,39 +51,23 @@ impl Store for FileStore { Ok(()) } - #[tracing::instrument(skip(self, reader))] - async fn save_async_read( - &self, - reader: Reader, - _content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead, - { - let mut reader = std::pin::pin!(reader); - - let path = self.next_file(extension); - - if let Err(e) = self.safe_save_reader(&path, &mut reader).await { - self.safe_remove_file(&path).await?; - return Err(e.into()); - } - - Ok(self.file_id_from_path(path)?) - } - async fn save_stream( &self, stream: S, - content_type: mime::Mime, + _content_type: mime::Mime, extension: Option<&str>, ) -> Result, StoreError> where S: Stream>, { - self.save_async_read(StreamReader::new(stream), content_type, extension) - .await + let path = self.next_file(extension); + + if let Err(e) = self.safe_save_stream(&path, stream).await { + self.safe_remove_file(&path).await?; + return Err(e.into()); + } + + Ok(self.file_id_from_path(path)?) } fn public_url(&self, _identifier: &Arc) -> Option { @@ -182,10 +166,10 @@ impl FileStore { } } - async fn safe_save_reader>( + async fn safe_save_stream>( &self, to: P, - input: &mut (impl AsyncRead + Unpin + ?Sized), + input: impl Stream>, ) -> Result<(), FileError> { safe_create_parent(&to).await?; @@ -199,7 +183,11 @@ impl FileStore { let mut file = File::create(to).await?; - file.write_from_async_read(input).await?; + file.write_from_stream(input) + .with_poll_timer("write-from-stream") + .await?; + + file.close().await?; Ok(()) } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index e272be5..e152bd5 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -21,8 +21,6 @@ use rusty_s3::{ }; use std::{string::FromUtf8Error, sync::Arc, time::Duration}; use streem::IntoStreamer; -use tokio::io::AsyncRead; -use tokio_util::io::ReaderStream; use tracing::Instrument; use url::Url; @@ -207,23 +205,6 @@ impl Store for ObjectStore { Ok(()) } - async fn save_async_read( - &self, - reader: Reader, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result, StoreError> - where - Reader: AsyncRead, - { - self.save_stream( - ReaderStream::with_capacity(reader, 1024 * 64), - content_type, - extension, - ) - .await - } - #[tracing::instrument(skip_all)] async fn save_stream( &self, @@ -244,7 +225,7 @@ impl Store for ObjectStore { .await?; let response = req - .body(Body::wrap_stream(first_chunk)) + .body(Body::wrap_stream(first_chunk.into_io_stream())) .send() .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) .await @@ -484,7 +465,7 @@ impl ObjectStore { &upload_id2, ) .await? - .body(Body::wrap_stream(buf)) + .body(Body::wrap_stream(buf.into_io_stream())) .send() .with_metrics( crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, diff --git a/src/stream.rs b/src/stream.rs index 66a502b..9482de3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -105,22 +105,12 @@ where streem::from_fn(|yielder| async move { let mut stream = rx.into_stream().into_streamer(); - let yield_count = buffer.max(8); - let mut count = 0; - while let Some(res) = stream.next().await { tracing::trace!("from_iterator: looping"); - count += 1; - count %= yield_count; - yielder.yield_(res).await; - // every 8 (or buffer-size) items, yield to executor before looping - // improves cooperation - if count == 0 { - tokio::task::yield_now().await; - } + crate::sync::cooperate().await; } let _ = handle.await; diff --git a/src/sync.rs b/src/sync.rs index 20a5468..d94d460 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -76,6 +76,15 @@ pub(crate) fn bare_semaphore(permits: usize) -> Semaphore { semaphore } +// best effort cooperation mechanism +pub(crate) async fn cooperate() { + #[cfg(tokio_unstable)] + tokio::task::consume_budget().await; + + #[cfg(not(tokio_unstable))] + tokio::task::yield_now().await; +} + #[track_caller] pub(crate) fn spawn(name: &'static str, future: F) -> tokio::task::JoinHandle where diff --git a/src/validate.rs b/src/validate.rs index 541b7f0..5460553 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -11,6 +11,7 @@ use crate::{ AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, InternalFormat, }, + future::WithPollTimer, process::{Process, ProcessRead}, state::State, }; @@ -69,14 +70,16 @@ pub(crate) async fn validate_bytes_stream( width, height, frames, - } = crate::discover::discover_bytes_stream(state, bytes.clone()).await?; + } = crate::discover::discover_bytes_stream(state, bytes.clone()) + .with_poll_timer("discover-bytes-stream") + .await?; match &input { InputFile::Image(input) => { let (format, process) = process_image_command(state, *input, bytes.len(), width, height).await?; - Ok((format, process.drive_with_async_read(bytes.into_reader()))) + Ok((format, process.drive_with_stream(bytes.into_io_stream()))) } InputFile::Animation(input) => { let (format, process) = process_animation_command( @@ -89,7 +92,7 @@ pub(crate) async fn validate_bytes_stream( ) .await?; - Ok((format, process.drive_with_async_read(bytes.into_reader()))) + Ok((format, process.drive_with_stream(bytes.into_io_stream()))) } InputFile::Video(input) => { let (format, process_read) = @@ -252,6 +255,7 @@ async fn process_video( state.config.media.process_timeout, bytes, ) + .with_poll_timer("transcode-bytes") .await?; Ok(( diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 84e0c22..d80300e 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -6,6 +6,7 @@ use crate::{ bytes_stream::BytesStream, ffmpeg::FfMpegError, formats::{InputVideoFormat, OutputVideo}, + future::WithPollTimer, process::{Process, ProcessRead}, tmp_file::TmpDir, }; @@ -22,7 +23,8 @@ pub(super) async fn transcode_bytes( let output_path = output_file.as_os_str(); let res = crate::ffmpeg::with_file(tmp_dir, None, |input_file| async move { - crate::file::write_from_async_read(&input_file, bytes.into_reader()) + crate::file::write_from_stream(&input_file, bytes.into_io_stream()) + .with_poll_timer("write-from-stream") .await .map_err(FfMpegError::Write)?; @@ -34,6 +36,7 @@ pub(super) async fn transcode_bytes( crf, timeout, ) + .with_poll_timer("transcode-files") .await?; let tmp_file = crate::file::File::open(output_path)