Remove unneeded code

This commit is contained in:
asonix 2024-03-09 22:53:46 -06:00
parent aa4582a3f8
commit 4976fcb2eb
18 changed files with 142 additions and 321 deletions

View file

@ -1,14 +1,7 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{ use std::collections::{vec_deque::IntoIter, VecDeque};
collections::{vec_deque::IntoIter, VecDeque},
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::AsyncRead;
use tokio_util::bytes::Buf;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct BytesStream { pub(crate) struct BytesStream {
@ -36,6 +29,7 @@ impl BytesStream {
while let Some(bytes) = stream.try_next().await? { while let Some(bytes) = stream.try_next().await? {
tracing::trace!("try_from_stream: looping"); tracing::trace!("try_from_stream: looping");
bs.add_bytes(bytes); bs.add_bytes(bytes);
crate::sync::cooperate().await;
} }
tracing::debug!( tracing::debug!(
@ -64,21 +58,14 @@ impl BytesStream {
self.total_len == 0 self.total_len == 0
} }
pub(crate) fn into_reader(self) -> BytesReader { pub(crate) fn into_io_stream(self) -> impl Stream<Item = std::io::Result<Bytes>> {
BytesReader { inner: self.inner } streem::from_fn(move |yielder| async move {
for bytes in self {
crate::sync::cooperate().await;
yielder.yield_ok(bytes).await;
}
})
} }
pub(crate) fn into_io_stream(self) -> IoStream {
IoStream { inner: self.inner }
}
}
pub(crate) struct IoStream {
inner: VecDeque<Bytes>,
}
pub(crate) struct BytesReader {
inner: VecDeque<Bytes>,
} }
impl IntoIterator for BytesStream { impl IntoIterator for BytesStream {
@ -89,57 +76,3 @@ impl IntoIterator for BytesStream {
self.inner.into_iter() self.inner.into_iter()
} }
} }
impl Stream for BytesStream {
type Item = Result<Bytes, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.inner.len(), Some(self.inner.len()))
}
}
impl Stream for IoStream {
type Item = std::io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.inner.len(), Some(self.inner.len()))
}
}
impl AsyncRead for BytesReader {
fn poll_read(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
while buf.remaining() > 0 {
tracing::trace!("bytes reader: looping");
if let Some(bytes) = self.inner.front_mut() {
if bytes.is_empty() {
self.inner.pop_front();
continue;
}
let upper_bound = buf.remaining().min(bytes.len());
let slice = &bytes[..upper_bound];
buf.put_slice(slice);
bytes.advance(upper_bound);
} else {
break;
}
}
Poll::Ready(Ok(()))
}
}

View file

@ -2,9 +2,7 @@ mod exiftool;
mod ffmpeg; mod ffmpeg;
mod magick; mod magick;
use crate::{bytes_stream::BytesStream, formats::InputFile, future::WithPollTimer, state::State};
use crate::{bytes_stream::BytesStream, formats::InputFile, state::State};
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub(crate) struct Discovery { pub(crate) struct Discovery {
@ -31,12 +29,17 @@ pub(crate) async fn discover_bytes_stream<S>(
state: &State<S>, state: &State<S>,
bytes: BytesStream, bytes: BytesStream,
) -> Result<Discovery, crate::error::Error> { ) -> Result<Discovery, crate::error::Error> {
let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone()).await?; let discovery = ffmpeg::discover_bytes_stream(state, bytes.clone())
.with_poll_timer("discover-ffmpeg")
.await?;
let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone()).await?; let discovery = magick::confirm_bytes_stream(state, discovery, bytes.clone())
.with_poll_timer("confirm-imagemagick")
.await?;
let discovery = let discovery = exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout)
exiftool::check_reorient(discovery, bytes, state.config.media.process_timeout).await?; .with_poll_timer("reorient-exiftool")
.await?;
Ok(discovery) Ok(discovery)
} }

View file

@ -42,7 +42,7 @@ pub(super) async fn check_reorient(
async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> { async fn needs_reorienting(input: BytesStream, timeout: u64) -> Result<bool, ExifError> {
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout) let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)
.await? .await?
.drive_with_async_read(input.into_reader()) .drive_with_stream(input.into_io_stream())
.into_string() .into_string()
.await?; .await?;

View file

@ -10,6 +10,7 @@ use crate::{
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec, Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec,
}, },
future::WithPollTimer,
process::Process, process::Process,
state::State, state::State,
}; };
@ -177,7 +178,8 @@ pub(super) async fn discover_bytes_stream<S>(
bytes: BytesStream, bytes: BytesStream,
) -> Result<Option<Discovery>, FfMpegError> { ) -> Result<Option<Discovery>, FfMpegError> {
let output = crate::ffmpeg::with_file(&state.tmp_dir, None, |path| async move { let output = crate::ffmpeg::with_file(&state.tmp_dir, None, |path| async move {
crate::file::write_from_async_read(&path, bytes.into_reader()) crate::file::write_from_stream(&path, bytes.into_io_stream())
.with_poll_timer("discover-ffmpeg-write-file")
.await .await
.map_err(FfMpegError::Write)?; .map_err(FfMpegError::Write)?;
@ -201,6 +203,7 @@ pub(super) async fn discover_bytes_stream<S>(
.await? .await?
.read() .read()
.into_vec() .into_vec()
.with_poll_timer("discover-ffmpeg-into-vec")
.await .await
.map_err(FfMpegError::Process) .map_err(FfMpegError::Process)
}) })

View file

@ -74,7 +74,7 @@ async fn discover<S>(state: &State<S>, stream: BytesStream) -> Result<Discovery,
state.config.media.process_timeout, state.config.media.process_timeout,
) )
.await? .await?
.drive_with_async_read(stream.into_reader()) .drive_with_stream(stream.into_io_stream())
.into_string() .into_string()
.await; .await;

View file

@ -1,7 +1,6 @@
use std::path::Path; use std::path::Path;
use futures_core::Stream; use futures_core::Stream;
use tokio::io::AsyncRead;
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
@ -10,35 +9,32 @@ pub(crate) use io_uring::File;
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
pub(crate) use tokio_file::File; pub(crate) use tokio_file::File;
use crate::future::WithPollTimer;
pub(crate) async fn write_from_stream( pub(crate) async fn write_from_stream(
path: impl AsRef<Path>, path: impl AsRef<Path>,
stream: impl Stream<Item = std::io::Result<Bytes>>, stream: impl Stream<Item = std::io::Result<Bytes>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let mut file = File::create(path).await?; let mut file = File::create(path).with_poll_timer("create-file").await?;
file.write_from_stream(stream).await?; file.write_from_stream(stream)
file.close().await?; .with_poll_timer("write-from-stream")
Ok(()) .await?;
}
pub(crate) async fn write_from_async_read(
path: impl AsRef<Path>,
reader: impl AsyncRead,
) -> std::io::Result<()> {
let mut file = File::create(path).await?;
file.write_from_async_read(reader).await?;
file.close().await?; file.close().await?;
Ok(()) Ok(())
} }
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
mod tokio_file { mod tokio_file {
use crate::{store::file_store::FileError, Either}; use crate::{future::WithPollTimer, store::file_store::FileError, Either};
use actix_web::web::{Bytes, BytesMut}; use actix_web::web::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use std::{io::SeekFrom, path::Path}; use std::{io::SeekFrom, path::Path};
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::{
bytes::Buf,
codec::{BytesCodec, FramedRead},
};
pub(crate) struct File { pub(crate) struct File {
inner: tokio::fs::File, inner: tokio::fs::File,
@ -68,26 +64,22 @@ mod tokio_file {
let stream = std::pin::pin!(stream); let stream = std::pin::pin!(stream);
let mut stream = stream.into_streamer(); let mut stream = stream.into_streamer();
while let Some(res) = stream.next().await { while let Some(mut bytes) = stream.try_next().with_poll_timer("try-next").await? {
tracing::trace!("write_from_stream: looping"); tracing::trace!("write_from_stream: looping");
let mut bytes = res?; while bytes.has_remaining() {
self.inner
.write_buf(&mut bytes)
.with_poll_timer("write-buf")
.await?;
self.inner.write_all_buf(&mut bytes).await?; crate::sync::cooperate().await;
}
} }
Ok(()) Ok(())
} }
pub(crate) async fn write_from_async_read<R>(&mut self, reader: R) -> std::io::Result<()>
where
R: AsyncRead,
{
let mut reader = std::pin::pin!(reader);
tokio::io::copy(&mut reader, &mut self.inner).await?;
Ok(())
}
pub(crate) async fn close(self) -> std::io::Result<()> { pub(crate) async fn close(self) -> std::io::Result<()> {
Ok(()) Ok(())
} }
@ -129,7 +121,6 @@ mod io_uring {
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_uring::{ use tokio_uring::{
buf::{IoBuf, IoBufMut}, buf::{IoBuf, IoBufMut},
BufResult, BufResult,
@ -212,59 +203,6 @@ mod io_uring {
Ok(()) Ok(())
} }
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn write_from_async_read<R>(&mut self, reader: R) -> std::io::Result<()>
where
R: AsyncRead,
{
let mut reader = std::pin::pin!(reader);
let mut cursor: u64 = 0;
loop {
tracing::trace!("write_from_async_read: looping");
let max_size = 65_536;
let mut buf = Vec::with_capacity(max_size.try_into().unwrap());
let n = (&mut reader).take(max_size).read_buf(&mut buf).await?;
if n == 0 {
break;
}
let mut position = 0;
loop {
tracing::trace!("write_from_async_read: looping inner");
if position == n {
break;
}
let position_u64: u64 = position.try_into().unwrap();
let (res, slice) = self
.write_at(buf.slice(position..n), cursor + position_u64)
.await;
let n = res?;
if n == 0 {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
position += n;
buf = slice.into_inner();
}
let position: u64 = position.try_into().unwrap();
cursor += position;
}
self.inner.sync_all().await?;
Ok(())
}
pub(crate) async fn close(self) -> std::io::Result<()> { pub(crate) async fn close(self) -> std::io::Result<()> {
self.inner.close().await self.inner.close().await
} }
@ -387,9 +325,12 @@ mod io_uring {
let tmp = "/tmp/write-test"; let tmp = "/tmp/write-test";
test_async!(async move { test_async!(async move {
let mut file = tokio::fs::File::open(EARTH_GIF).await.unwrap(); let file = tokio::fs::File::open(EARTH_GIF).await.unwrap();
let mut tmp_file = super::File::create(tmp).await.unwrap(); let mut tmp_file = super::File::create(tmp).await.unwrap();
tmp_file.write_from_async_read(&mut file).await.unwrap(); tmp_file
.write_from_stream(tokio_util::io::ReaderStream::new(file))
.await
.unwrap();
}); });
let mut source = std::fs::File::open(EARTH_GIF).unwrap(); let mut source = std::fs::File::open(EARTH_GIF).unwrap();

View file

@ -223,7 +223,11 @@ where
.with_stdout(|stdout| async { .with_stdout(|stdout| async {
state state
.store .store
.save_async_read(stdout, media_type, Some(file_extension)) .save_stream(
tokio_util::io::ReaderStream::with_capacity(stdout, 1024 * 64),
media_type,
Some(file_extension),
)
.await .await
}) })
.await??; .await??;

View file

@ -47,13 +47,16 @@ where
Duration::from_secs(60), Duration::from_secs(60),
BytesStream::try_from_stream(stream), BytesStream::try_from_stream(stream),
) )
.with_poll_timer("try-from-stream")
.await .await
.map_err(|_| UploadError::AggregateTimeout)??; .map_err(|_| UploadError::AggregateTimeout)??;
let permit = crate::process_semaphore().acquire().await?; let permit = crate::process_semaphore().acquire().await?;
tracing::trace!("Validating bytes"); tracing::trace!("Validating bytes");
let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes).await?; let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes)
.with_poll_timer("validate-bytes-stream")
.await?;
let process_read = if let Some(operations) = state.config.media.preprocess_steps() { let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
if let Some(format) = input_type.processable_format() { if let Some(format) = input_type.processable_format() {
@ -88,18 +91,22 @@ where
state state
.store .store
.save_async_read( .save_stream(
hasher_reader, tokio_util::io::ReaderStream::with_capacity(hasher_reader, 1024 * 64),
input_type.media_type(), input_type.media_type(),
Some(input_type.file_extension()), Some(input_type.file_extension()),
) )
.with_poll_timer("save-hasher-reader")
.await .await
.map(move |identifier| (hash_state, identifier)) .map(move |identifier| (hash_state, identifier))
}) })
.with_poll_timer("save-process-stdout")
.await??; .await??;
let bytes_stream = state.store.to_bytes(&identifier, None, None).await?; let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
let details = Details::from_bytes_stream(state, bytes_stream).await?; let details = Details::from_bytes_stream(state, bytes_stream)
.with_poll_timer("details-from-bytes-stream")
.await?;
drop(permit); drop(permit);
@ -135,8 +142,8 @@ where
let identifier = state let identifier = state
.store .store
.save_async_read( .save_stream(
hasher_reader, tokio_util::io::ReaderStream::with_capacity(hasher_reader, 1024 * 64),
input_type.media_type(), input_type.media_type(),
Some(input_type.file_extension()), Some(input_type.file_extension()),
) )

View file

@ -9,7 +9,7 @@ use std::{
use futures_core::Stream; use futures_core::Stream;
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::{ use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
process::{Child, ChildStdin, Command}, process::{Child, ChildStdin, Command},
}; };
use tokio_util::{bytes::Bytes, io::ReaderStream}; use tokio_util::{bytes::Bytes, io::ReaderStream};
@ -246,23 +246,6 @@ impl Process {
} }
} }
pub(crate) fn drive_with_async_read(self, input: impl AsyncRead + 'static) -> ProcessRead {
self.drive(move |mut stdin| {
async move {
let mut input = std::pin::pin!(input);
match tokio::io::copy(&mut input, &mut stdin).await {
Ok(_) => Ok(()),
// BrokenPipe means we finished reading from Stdout, so we don't need to write
// to stdin. We'll still error out if the command failed so treat this as a
// success
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
Err(e) => Err(e),
}
}
})
}
pub(crate) fn drive_with_stream<S>(self, input: S) -> ProcessRead pub(crate) fn drive_with_stream<S>(self, input: S) -> ProcessRead
where where
S: Stream<Item = std::io::Result<Bytes>> + 'static, S: Stream<Item = std::io::Result<Bytes>> + 'static,
@ -277,7 +260,7 @@ impl Process {
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Err(e) => return Err(e), Err(e) => return Err(e),
} }
tokio::task::yield_now().await; crate::sync::cooperate().await;
} }
Ok(()) Ok(())

View file

@ -2,7 +2,7 @@ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
error::{Error, UploadError}, error::{Error, UploadError},
formats::InputProcessableFormat, formats::InputProcessableFormat,
future::LocalBoxFuture, future::{LocalBoxFuture, WithPollTimer},
repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId}, repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
serde_str::Serde, serde_str::Serde,
state::State, state::State,
@ -312,9 +312,11 @@ where
loop { loop {
tracing::trace!("process_jobs: looping"); tracing::trace!("process_jobs: looping");
tokio::task::yield_now().await; crate::sync::cooperate().await;
let res = job_loop(&state, worker_id, queue, callback).await; let res = job_loop(&state, worker_id, queue, callback)
.with_poll_timer("job-loop")
.await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("Error processing jobs: {}", format!("{e}"));
@ -344,10 +346,14 @@ where
loop { loop {
tracing::trace!("job_loop: looping"); tracing::trace!("job_loop: looping");
tokio::task::yield_now().await; crate::sync::cooperate().await;
async { async {
let (job_id, job) = state.repo.pop(queue, worker_id).await?; let (job_id, job) = state
.repo
.pop(queue, worker_id)
.with_poll_timer("pop-cleanup")
.await?;
let guard = MetricsGuard::guard(worker_id, queue); let guard = MetricsGuard::guard(worker_id, queue);
@ -358,6 +364,7 @@ where
job_id, job_id,
(callback)(state, job), (callback)(state, job),
) )
.with_poll_timer("cleanup-job-and-heartbeat")
.await; .await;
state state
@ -390,9 +397,11 @@ async fn process_image_jobs<S, F>(
loop { loop {
tracing::trace!("process_image_jobs: looping"); tracing::trace!("process_image_jobs: looping");
tokio::task::yield_now().await; crate::sync::cooperate().await;
let res = image_job_loop(&state, &process_map, worker_id, queue, callback).await; let res = image_job_loop(&state, &process_map, worker_id, queue, callback)
.with_poll_timer("image-job-loop")
.await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("Error processing jobs: {}", format!("{e}"));
@ -423,10 +432,14 @@ where
loop { loop {
tracing::trace!("image_job_loop: looping"); tracing::trace!("image_job_loop: looping");
tokio::task::yield_now().await; crate::sync::cooperate().await;
async { async {
let (job_id, job) = state.repo.pop(queue, worker_id).await?; let (job_id, job) = state
.repo
.pop(queue, worker_id)
.with_poll_timer("pop-process")
.await?;
let guard = MetricsGuard::guard(worker_id, queue); let guard = MetricsGuard::guard(worker_id, queue);
@ -437,6 +450,7 @@ where
job_id, job_id,
(callback)(state, process_map, job), (callback)(state, process_map, job),
) )
.with_poll_timer("process-job-and-heartbeat")
.await; .await;
state state
@ -466,7 +480,9 @@ async fn heartbeat<Fut>(
where where
Fut: std::future::Future, Fut: std::future::Future,
{ {
let mut fut = std::pin::pin!(fut.instrument(tracing::info_span!("job-future"))); let mut fut = std::pin::pin!(fut
.with_poll_timer("job-future")
.instrument(tracing::info_span!("job-future")));
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
@ -475,7 +491,7 @@ where
loop { loop {
tracing::trace!("heartbeat: looping"); tracing::trace!("heartbeat: looping");
tokio::task::yield_now().await; crate::sync::cooperate().await;
tokio::select! { tokio::select! {
biased; biased;

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
details::HumanDate, details::HumanDate,
error_code::{ErrorCode, OwnedErrorCode}, error_code::{ErrorCode, OwnedErrorCode},
future::WithTimeout, future::{WithPollTimer, WithTimeout},
serde_str::Serde, serde_str::Serde,
stream::{from_iterator, LocalBoxStream}, stream::{from_iterator, LocalBoxStream},
}; };
@ -784,6 +784,7 @@ impl QueueRepo for SledRepo {
Ok(None) Ok(None)
}) })
.with_poll_timer("sled-pop-spawn-blocking")
.await .await
.map_err(|_| RepoError::Canceled)??; .map_err(|_| RepoError::Canceled)??;
@ -813,6 +814,7 @@ impl QueueRepo for SledRepo {
match notify match notify
.notified() .notified()
.with_timeout(Duration::from_secs(30)) .with_timeout(Duration::from_secs(30))
.with_poll_timer("sled-pop-notify")
.await .await
{ {
Ok(()) => tracing::debug!("Notified"), Ok(()) => tracing::debug!("Notified"),

View file

@ -1,7 +1,6 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use tokio::io::AsyncRead;
use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
@ -85,15 +84,6 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
pub(crate) trait Store: Clone + Debug { pub(crate) trait Store: Clone + Debug {
async fn health_check(&self) -> Result<(), StoreError>; async fn health_check(&self) -> Result<(), StoreError>;
async fn save_async_read<Reader>(
&self,
reader: Reader,
content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead;
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
stream: S, stream: S,
@ -138,18 +128,6 @@ where
T::health_check(self).await T::health_check(self).await
} }
async fn save_async_read<Reader>(
&self,
reader: Reader,
content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead,
{
T::save_async_read(self, reader, content_type, extension).await
}
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
stream: S, stream: S,
@ -192,18 +170,6 @@ where
T::health_check(self).await T::health_check(self).await
} }
async fn save_async_read<Reader>(
&self,
reader: Reader,
content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead,
{
T::save_async_read(self, reader, content_type, extension).await
}
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
stream: S, stream: S,
@ -246,18 +212,6 @@ where
T::health_check(self).await T::health_check(self).await
} }
async fn save_async_read<Reader>(
&self,
reader: Reader,
content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead,
{
T::save_async_read(self, reader, content_type, extension).await
}
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
stream: S, stream: S,

View file

@ -1,12 +1,12 @@
use crate::{error_code::ErrorCode, file::File, store::Store, stream::LocalBoxStream}; use crate::{
error_code::ErrorCode, file::File, future::WithPollTimer, store::Store, stream::LocalBoxStream,
};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tracing::Instrument; use tracing::Instrument;
use super::StoreError; use super::StoreError;
@ -51,39 +51,23 @@ impl Store for FileStore {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, reader))]
async fn save_async_read<Reader>(
&self,
reader: Reader,
_content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead,
{
let mut reader = std::pin::pin!(reader);
let path = self.next_file(extension);
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
self.safe_remove_file(&path).await?;
return Err(e.into());
}
Ok(self.file_id_from_path(path)?)
}
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
stream: S, stream: S,
content_type: mime::Mime, _content_type: mime::Mime,
extension: Option<&str>, extension: Option<&str>,
) -> Result<Arc<str>, StoreError> ) -> Result<Arc<str>, StoreError>
where where
S: Stream<Item = std::io::Result<Bytes>>, S: Stream<Item = std::io::Result<Bytes>>,
{ {
self.save_async_read(StreamReader::new(stream), content_type, extension) let path = self.next_file(extension);
.await
if let Err(e) = self.safe_save_stream(&path, stream).await {
self.safe_remove_file(&path).await?;
return Err(e.into());
}
Ok(self.file_id_from_path(path)?)
} }
fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> {
@ -182,10 +166,10 @@ impl FileStore {
} }
} }
async fn safe_save_reader<P: AsRef<Path>>( async fn safe_save_stream<P: AsRef<Path>>(
&self, &self,
to: P, to: P,
input: &mut (impl AsyncRead + Unpin + ?Sized), input: impl Stream<Item = std::io::Result<Bytes>>,
) -> Result<(), FileError> { ) -> Result<(), FileError> {
safe_create_parent(&to).await?; safe_create_parent(&to).await?;
@ -199,7 +183,11 @@ impl FileStore {
let mut file = File::create(to).await?; let mut file = File::create(to).await?;
file.write_from_async_read(input).await?; file.write_from_stream(input)
.with_poll_timer("write-from-stream")
.await?;
file.close().await?;
Ok(()) Ok(())
} }

View file

@ -21,8 +21,6 @@ use rusty_s3::{
}; };
use std::{string::FromUtf8Error, sync::Arc, time::Duration}; use std::{string::FromUtf8Error, sync::Arc, time::Duration};
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use tracing::Instrument; use tracing::Instrument;
use url::Url; use url::Url;
@ -207,23 +205,6 @@ impl Store for ObjectStore {
Ok(()) Ok(())
} }
async fn save_async_read<Reader>(
&self,
reader: Reader,
content_type: mime::Mime,
extension: Option<&str>,
) -> Result<Arc<str>, StoreError>
where
Reader: AsyncRead,
{
self.save_stream(
ReaderStream::with_capacity(reader, 1024 * 64),
content_type,
extension,
)
.await
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn save_stream<S>( async fn save_stream<S>(
&self, &self,
@ -244,7 +225,7 @@ impl Store for ObjectStore {
.await?; .await?;
let response = req let response = req
.body(Body::wrap_stream(first_chunk)) .body(Body::wrap_stream(first_chunk.into_io_stream()))
.send() .send()
.with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST)
.await .await
@ -484,7 +465,7 @@ impl ObjectStore {
&upload_id2, &upload_id2,
) )
.await? .await?
.body(Body::wrap_stream(buf)) .body(Body::wrap_stream(buf.into_io_stream()))
.send() .send()
.with_metrics( .with_metrics(
crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST,

View file

@ -105,22 +105,12 @@ where
streem::from_fn(|yielder| async move { streem::from_fn(|yielder| async move {
let mut stream = rx.into_stream().into_streamer(); let mut stream = rx.into_stream().into_streamer();
let yield_count = buffer.max(8);
let mut count = 0;
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {
tracing::trace!("from_iterator: looping"); tracing::trace!("from_iterator: looping");
count += 1;
count %= yield_count;
yielder.yield_(res).await; yielder.yield_(res).await;
// every 8 (or buffer-size) items, yield to executor before looping crate::sync::cooperate().await;
// improves cooperation
if count == 0 {
tokio::task::yield_now().await;
}
} }
let _ = handle.await; let _ = handle.await;

View file

@ -76,6 +76,15 @@ pub(crate) fn bare_semaphore(permits: usize) -> Semaphore {
semaphore semaphore
} }
// best effort cooperation mechanism
pub(crate) async fn cooperate() {
#[cfg(tokio_unstable)]
tokio::task::consume_budget().await;
#[cfg(not(tokio_unstable))]
tokio::task::yield_now().await;
}
#[track_caller] #[track_caller]
pub(crate) fn spawn<F>(name: &'static str, future: F) -> tokio::task::JoinHandle<F::Output> pub(crate) fn spawn<F>(name: &'static str, future: F) -> tokio::task::JoinHandle<F::Output>
where where

View file

@ -11,6 +11,7 @@ use crate::{
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
InternalFormat, InternalFormat,
}, },
future::WithPollTimer,
process::{Process, ProcessRead}, process::{Process, ProcessRead},
state::State, state::State,
}; };
@ -69,14 +70,16 @@ pub(crate) async fn validate_bytes_stream<S>(
width, width,
height, height,
frames, frames,
} = crate::discover::discover_bytes_stream(state, bytes.clone()).await?; } = crate::discover::discover_bytes_stream(state, bytes.clone())
.with_poll_timer("discover-bytes-stream")
.await?;
match &input { match &input {
InputFile::Image(input) => { InputFile::Image(input) => {
let (format, process) = let (format, process) =
process_image_command(state, *input, bytes.len(), width, height).await?; process_image_command(state, *input, bytes.len(), width, height).await?;
Ok((format, process.drive_with_async_read(bytes.into_reader()))) Ok((format, process.drive_with_stream(bytes.into_io_stream())))
} }
InputFile::Animation(input) => { InputFile::Animation(input) => {
let (format, process) = process_animation_command( let (format, process) = process_animation_command(
@ -89,7 +92,7 @@ pub(crate) async fn validate_bytes_stream<S>(
) )
.await?; .await?;
Ok((format, process.drive_with_async_read(bytes.into_reader()))) Ok((format, process.drive_with_stream(bytes.into_io_stream())))
} }
InputFile::Video(input) => { InputFile::Video(input) => {
let (format, process_read) = let (format, process_read) =
@ -252,6 +255,7 @@ async fn process_video<S>(
state.config.media.process_timeout, state.config.media.process_timeout,
bytes, bytes,
) )
.with_poll_timer("transcode-bytes")
.await?; .await?;
Ok(( Ok((

View file

@ -6,6 +6,7 @@ use crate::{
bytes_stream::BytesStream, bytes_stream::BytesStream,
ffmpeg::FfMpegError, ffmpeg::FfMpegError,
formats::{InputVideoFormat, OutputVideo}, formats::{InputVideoFormat, OutputVideo},
future::WithPollTimer,
process::{Process, ProcessRead}, process::{Process, ProcessRead},
tmp_file::TmpDir, tmp_file::TmpDir,
}; };
@ -22,7 +23,8 @@ pub(super) async fn transcode_bytes(
let output_path = output_file.as_os_str(); let output_path = output_file.as_os_str();
let res = crate::ffmpeg::with_file(tmp_dir, None, |input_file| async move { let res = crate::ffmpeg::with_file(tmp_dir, None, |input_file| async move {
crate::file::write_from_async_read(&input_file, bytes.into_reader()) crate::file::write_from_stream(&input_file, bytes.into_io_stream())
.with_poll_timer("write-from-stream")
.await .await
.map_err(FfMpegError::Write)?; .map_err(FfMpegError::Write)?;
@ -34,6 +36,7 @@ pub(super) async fn transcode_bytes(
crf, crf,
timeout, timeout,
) )
.with_poll_timer("transcode-files")
.await?; .await?;
let tmp_file = crate::file::File::open(output_path) let tmp_file = crate::file::File::open(output_path)