ffprobe: read from file, simplify input-file access

This commit is contained in:
asonix 2024-02-25 13:05:47 -06:00
parent 277b47af46
commit 7c6112e631
9 changed files with 176 additions and 272 deletions

View file

@ -176,28 +176,34 @@ pub(super) async fn discover_bytes_stream<S>(
state: &State<S>, state: &State<S>,
bytes: BytesStream, bytes: BytesStream,
) -> Result<Option<Discovery>, FfMpegError> { ) -> Result<Option<Discovery>, FfMpegError> {
let res = Process::run( let output = crate::ffmpeg::with_file(&state.tmp_dir, None, |path| async move {
"ffprobe", crate::file::write_from_async_read(&path, bytes.into_reader())
&[ .await
"-v", .map_err(FfMpegError::Write)?;
"quiet",
"-count_frames",
"-show_entries",
"stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
"-print_format",
"json",
"-",
],
&[],
state.config.media.process_timeout,
)?
.drive_with_async_read(bytes.into_reader())
.into_vec()
.await;
let output = res?; Process::run(
"ffprobe",
&[
"-v".as_ref(),
"quiet".as_ref(),
"-count_frames".as_ref(),
"-show_entries".as_ref(),
"stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name".as_ref(),
"-of".as_ref(),
"default=noprint_wrappers=1:nokey=1".as_ref(),
"-print_format".as_ref(),
"json".as_ref(),
path.as_os_str(),
],
&[],
state.config.media.process_timeout,
)?
.read()
.into_vec()
.await
.map_err(FfMpegError::Process)
})
.await??;
let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?;

View file

@ -1,4 +1,8 @@
use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError}; use std::ffi::OsString;
use futures_core::Future;
use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError, tmp_file::TmpDir};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum FfMpegError { pub(crate) enum FfMpegError {
@ -20,12 +24,6 @@ pub(crate) enum FfMpegError {
#[error("Error opening file")] #[error("Error opening file")]
OpenFile(#[source] std::io::Error), OpenFile(#[source] std::io::Error),
#[error("Error creating file")]
CreateFile(#[source] std::io::Error),
#[error("Error closing file")]
CloseFile(#[source] std::io::Error),
#[error("Error cleaning up after command")] #[error("Error cleaning up after command")]
Cleanup(#[source] std::io::Error), Cleanup(#[source] std::io::Error),
@ -56,9 +54,7 @@ impl FfMpegError {
| Self::CreateDir(_) | Self::CreateDir(_)
| Self::ReadFile(_) | Self::ReadFile(_)
| Self::OpenFile(_) | Self::OpenFile(_)
| Self::Cleanup(_) | Self::Cleanup(_) => ErrorCode::COMMAND_ERROR,
| Self::CreateFile(_)
| Self::CloseFile(_) => ErrorCode::COMMAND_ERROR,
} }
} }
@ -78,3 +74,25 @@ impl FfMpegError {
false false
} }
} }
pub(crate) async fn with_file<F, Fut>(
tmp: &TmpDir,
ext: Option<&str>,
f: F,
) -> Result<Fut::Output, FfMpegError>
where
F: FnOnce(OsString) -> Fut,
Fut: Future,
{
let file = tmp.tmp_file(ext);
crate::store::file_store::safe_create_parent(&file)
.await
.map_err(FfMpegError::CreateDir)?;
let res = (f)(file.as_os_str().to_os_string()).await;
file.cleanup().await.map_err(FfMpegError::Cleanup)?;
Ok(res)
}

View file

@ -1,9 +1,35 @@
use std::path::Path;
use futures_core::Stream;
use tokio::io::AsyncRead;
use tokio_util::bytes::Bytes;
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
pub(crate) use io_uring::File; pub(crate) use io_uring::File;
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
pub(crate) use tokio_file::File; pub(crate) use tokio_file::File;
pub(crate) async fn write_from_stream(
path: impl AsRef<Path>,
stream: impl Stream<Item = std::io::Result<Bytes>>,
) -> std::io::Result<()> {
let mut file = File::create(path).await?;
file.write_from_stream(stream).await?;
file.close().await?;
Ok(())
}
pub(crate) async fn write_from_async_read(
path: impl AsRef<Path>,
reader: impl AsyncRead,
) -> std::io::Result<()> {
let mut file = File::create(path).await?;
file.write_from_async_read(reader).await?;
file.close().await?;
Ok(())
}
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
mod tokio_file { mod tokio_file {
use crate::{store::file_store::FileError, Either}; use crate::{store::file_store::FileError, Either};
@ -35,11 +61,6 @@ mod tokio_file {
}) })
} }
pub(crate) async fn write_from_bytes(&mut self, mut bytes: Bytes) -> std::io::Result<()> {
self.inner.write_all_buf(&mut bytes).await?;
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()> pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()>
where where
S: Stream<Item = std::io::Result<Bytes>>, S: Stream<Item = std::io::Result<Bytes>>,
@ -58,13 +79,11 @@ mod tokio_file {
Ok(()) Ok(())
} }
pub(crate) async fn write_from_async_read<R>( pub(crate) async fn write_from_async_read<R>(&mut self, reader: R) -> std::io::Result<()>
&mut self,
mut reader: R,
) -> std::io::Result<()>
where where
R: AsyncRead + Unpin, R: AsyncRead,
{ {
let mut reader = std::pin::pin!(reader);
tokio::io::copy(&mut reader, &mut self.inner).await?; tokio::io::copy(&mut reader, &mut self.inner).await?;
Ok(()) Ok(())
} }
@ -154,36 +173,6 @@ mod io_uring {
tokio::fs::metadata(&self.path).await tokio::fs::metadata(&self.path).await
} }
pub(crate) async fn write_from_bytes(&mut self, mut buf: Bytes) -> std::io::Result<()> {
let len: u64 = buf.len().try_into().unwrap();
let mut cursor: u64 = 0;
loop {
tracing::trace!("write_from_bytes: looping");
if cursor == len {
break;
}
let cursor_usize: usize = cursor.try_into().unwrap();
let (res, slice) = self.inner.write_at(buf.slice(cursor_usize..), cursor).await;
let n: usize = res?;
if n == 0 {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
buf = slice.into_inner();
let n: u64 = n.try_into().unwrap();
cursor += n;
}
self.inner.sync_all().await?;
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()> pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()>
where where
S: Stream<Item = std::io::Result<Bytes>>, S: Stream<Item = std::io::Result<Bytes>>,
@ -232,13 +221,11 @@ mod io_uring {
} }
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn write_from_async_read<R>( pub(crate) async fn write_from_async_read<R>(&mut self, reader: R) -> std::io::Result<()>
&mut self,
mut reader: R,
) -> std::io::Result<()>
where where
R: AsyncRead + Unpin, R: AsyncRead,
{ {
let mut reader = std::pin::pin!(reader);
let mut cursor: u64 = 0; let mut cursor: u64 = 0;
loop { loop {

View file

@ -57,69 +57,72 @@ pub(super) async fn thumbnail<S: Store>(
input_format: InternalVideoFormat, input_format: InternalVideoFormat,
format: ThumbnailFormat, format: ThumbnailFormat,
) -> Result<ProcessRead, FfMpegError> { ) -> Result<ProcessRead, FfMpegError> {
let input_file = state.tmp_dir.tmp_file(Some(input_format.file_extension()));
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(FfMpegError::CreateDir)?;
let output_file = state.tmp_dir.tmp_file(Some(format.to_file_extension())); let output_file = state.tmp_dir.tmp_file(Some(format.to_file_extension()));
crate::store::file_store::safe_create_parent(&output_file) crate::store::file_store::safe_create_parent(&output_file)
.await .await
.map_err(FfMpegError::CreateDir)?; .map_err(FfMpegError::CreateDir)?;
let mut tmp_one = crate::file::File::create(&input_file) let output_path = output_file.as_os_str();
.await
.map_err(FfMpegError::CreateFile)?;
let stream = state
.store
.to_stream(&from, None, None)
.await
.map_err(FfMpegError::Store)?;
tmp_one
.write_from_stream(stream)
.await
.map_err(FfMpegError::Write)?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let process = Process::run( let res = crate::ffmpeg::with_file(
"ffmpeg", &state.tmp_dir,
&[ Some(input_format.file_extension()),
"-hide_banner".as_ref(), |input_file| async move {
"-v".as_ref(), let stream = state
"warning".as_ref(), .store
"-i".as_ref(), .to_stream(&from, None, None)
input_file.as_os_str(), .await
"-frames:v".as_ref(), .map_err(FfMpegError::Store)?;
"1".as_ref(),
"-codec".as_ref(),
format.as_ffmpeg_codec().as_ref(),
"-f".as_ref(),
format.as_ffmpeg_format().as_ref(),
output_file.as_os_str(),
],
&[],
state.config.media.process_timeout,
)?;
let res = process.wait().await; crate::file::write_from_stream(&input_file, stream)
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; .await
res?; .map_err(FfMpegError::Write)?;
let tmp_two = crate::file::File::open(&output_file) Process::run(
.await "ffmpeg",
.map_err(FfMpegError::OpenFile)?; &[
let stream = tmp_two "-hide_banner".as_ref(),
.read_to_stream(None, None) "-v".as_ref(),
.await "warning".as_ref(),
.map_err(FfMpegError::ReadFile)?; "-i".as_ref(),
let reader = tokio_util::io::StreamReader::new(stream); input_file.as_os_str(),
"-frames:v".as_ref(),
"1".as_ref(),
"-codec".as_ref(),
format.as_ffmpeg_codec().as_ref(),
"-f".as_ref(),
format.as_ffmpeg_format().as_ref(),
output_path,
],
&[],
state.config.media.process_timeout,
)?
.wait()
.await
.map_err(FfMpegError::Process)?;
let reader = ProcessRead::new( let out_file = crate::file::File::open(output_path)
Box::pin(reader), .await
Arc::from(String::from("ffmpeg")), .map_err(FfMpegError::OpenFile)?;
Uuid::now_v7(), out_file
.read_to_stream(None, None)
.await
.map_err(FfMpegError::ReadFile)
},
) )
.add_extras(output_file); .await;
Ok(reader) match res {
Ok(Ok(stream)) => Ok(ProcessRead::new(
Box::pin(tokio_util::io::StreamReader::new(stream)),
Arc::from(String::from("ffmpeg")),
Uuid::now_v7(),
)
.add_extras(output_file)),
Ok(Err(e)) | Err(e) => {
output_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
Err(e)
}
}
} }

View file

@ -129,7 +129,6 @@ async fn ensure_details_identifier<S: Store + 'static>(
let details = state.repo.details(identifier).await?; let details = state.repo.details(identifier).await?;
if let Some(details) = details { if let Some(details) = details {
tracing::debug!("details exist");
Ok(details) Ok(details)
} else { } else {
if state.config.server.read_only { if state.config.server.read_only {
@ -140,12 +139,9 @@ async fn ensure_details_identifier<S: Store + 'static>(
))); )));
} }
tracing::debug!("generating new details from {:?}", identifier);
let bytes_stream = state.store.to_bytes(identifier, None, None).await?; let bytes_stream = state.store.to_bytes(identifier, None, None).await?;
let new_details = Details::from_bytes_stream(state, bytes_stream).await?; let new_details = Details::from_bytes_stream(state, bytes_stream).await?;
tracing::debug!("storing details for {:?}", identifier);
state.repo.relate_details(identifier, &new_details).await?; state.repo.relate_details(identifier, &new_details).await?;
tracing::debug!("stored");
Ok(new_details) Ok(new_details)
} }
} }

View file

@ -101,12 +101,6 @@ pub(crate) trait Store: Clone + Debug {
where where
S: Stream<Item = std::io::Result<Bytes>>; S: Stream<Item = std::io::Result<Bytes>>;
async fn save_bytes(
&self,
bytes: Bytes,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError>;
fn public_url(&self, _: &Arc<str>) -> Option<url::Url>; fn public_url(&self, _: &Arc<str>) -> Option<url::Url>;
async fn to_stream( async fn to_stream(
@ -172,14 +166,6 @@ where
T::save_stream(self, stream, content_type).await T::save_stream(self, stream, content_type).await
} }
async fn save_bytes(
&self,
bytes: Bytes,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
T::save_bytes(self, bytes, content_type).await
}
fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> {
T::public_url(self, identifier) T::public_url(self, identifier)
} }
@ -243,14 +229,6 @@ where
T::save_stream(self, stream, content_type).await T::save_stream(self, stream, content_type).await
} }
async fn save_bytes(
&self,
bytes: Bytes,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
T::save_bytes(self, bytes, content_type).await
}
fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> {
T::public_url(self, identifier) T::public_url(self, identifier)
} }
@ -314,14 +292,6 @@ where
T::save_stream(self, stream, content_type).await T::save_stream(self, stream, content_type).await
} }
async fn save_bytes(
&self,
bytes: Bytes,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
T::save_bytes(self, bytes, content_type).await
}
fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> {
T::public_url(self, identifier) T::public_url(self, identifier)
} }

View file

@ -98,22 +98,6 @@ impl Store for FileStore {
.await .await
} }
#[tracing::instrument(skip(self, bytes))]
async fn save_bytes(
&self,
bytes: Bytes,
_content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
let path = self.next_file().await?;
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
self.safe_remove_file(&path).await?;
return Err(e.into());
}
Ok(self.file_id_from_path(path)?)
}
fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> {
None None
} }
@ -251,36 +235,6 @@ impl FileStore {
} }
} }
// Try writing to a file
async fn safe_save_bytes<P: AsRef<Path>>(
&self,
path: P,
bytes: Bytes,
) -> Result<(), FileError> {
safe_create_parent(&path).await?;
// Only write the file if it doesn't already exist
if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Ok(());
}
// Open the file for writing
let mut file = File::create(&path).await?;
// try writing
if let Err(e) = file.write_from_bytes(bytes).await {
// remove file if writing failed before completion
self.safe_remove_file(path).await?;
return Err(e.into());
}
Ok(())
}
async fn safe_save_reader<P: AsRef<Path>>( async fn safe_save_reader<P: AsRef<Path>>(
&self, &self,
to: P, to: P,

View file

@ -303,28 +303,6 @@ impl Store for ObjectStore {
} }
} }
#[tracing::instrument(skip_all)]
async fn save_bytes(
&self,
bytes: Bytes,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
let (req, object_id) = self.put_object_request(bytes.len(), content_type).await?;
let response = req
.body(bytes)
.send()
.with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST)
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response, None).await);
}
Ok(object_id)
}
fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> { fn public_url(&self, identifier: &Arc<str>) -> Option<url::Url> {
self.public_endpoint.clone().and_then(|mut endpoint| { self.public_endpoint.clone().and_then(|mut endpoint| {
endpoint endpoint

View file

@ -18,63 +18,47 @@ pub(super) async fn transcode_bytes(
timeout: u64, timeout: u64,
bytes: BytesStream, bytes: BytesStream,
) -> Result<ProcessRead, FfMpegError> { ) -> Result<ProcessRead, FfMpegError> {
let input_file = tmp_dir.tmp_file(None);
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(FfMpegError::CreateDir)?;
let mut tmp_one = crate::file::File::create(&input_file)
.await
.map_err(FfMpegError::CreateFile)?;
tmp_one
.write_from_stream(bytes.into_io_stream())
.await
.map_err(FfMpegError::Write)?;
tmp_one.close().await.map_err(FfMpegError::CloseFile)?;
let output_file = tmp_dir.tmp_file(None); let output_file = tmp_dir.tmp_file(None);
let output_path = output_file.as_os_str();
let res = async { let res = crate::ffmpeg::with_file(tmp_dir, None, |input_file| async move {
let res = transcode_files( crate::file::write_from_async_read(&input_file, bytes.into_reader())
.await
.map_err(FfMpegError::Write)?;
transcode_files(
input_file.as_os_str(), input_file.as_os_str(),
input_format, input_format,
output_file.as_os_str(), output_path,
output_format, output_format,
crf, crf,
timeout, timeout,
) )
.await; .await?;
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; let tmp_file = crate::file::File::open(output_path)
res?;
let tmp_two = crate::file::File::open(&output_file)
.await .await
.map_err(FfMpegError::OpenFile)?; .map_err(FfMpegError::OpenFile)?;
let stream = tmp_two
tmp_file
.read_to_stream(None, None) .read_to_stream(None, None)
.await .await
.map_err(FfMpegError::ReadFile)?; .map_err(FfMpegError::ReadFile)
Ok(tokio_util::io::StreamReader::new(stream)) })
}
.await; .await;
let reader = match res { match res {
Ok(reader) => reader, Ok(Ok(stream)) => Ok(ProcessRead::new(
Err(e) => { Box::pin(tokio_util::io::StreamReader::new(stream)),
Arc::from(String::from("ffmpeg")),
Uuid::now_v7(),
)
.add_extras(output_file)),
Ok(Err(e)) | Err(e) => {
output_file.cleanup().await.map_err(FfMpegError::Cleanup)?; output_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
return Err(e); Err(e)
} }
}; }
let process_read = ProcessRead::new(
Box::pin(reader),
Arc::from(String::from("ffmpeg")),
Uuid::now_v7(),
)
.add_extras(output_file);
Ok(process_read)
} }
async fn transcode_files( async fn transcode_files(
@ -135,6 +119,14 @@ async fn transcode_files(
args.extend([ args.extend([
"-map_metadata".as_ref(), "-map_metadata".as_ref(),
"-1".as_ref(), "-1".as_ref(),
"-map_metadata:g".as_ref(),
"-1".as_ref(),
"-map_metadata:s".as_ref(),
"-1".as_ref(),
"-map_metadata:c".as_ref(),
"-1".as_ref(),
"-map_metadata:p".as_ref(),
"-1".as_ref(),
"-f".as_ref(), "-f".as_ref(),
output_format.ffmpeg_format().as_ref(), output_format.ffmpeg_format().as_ref(),
output_path, output_path,