diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs index e9418be..0b3ccc3 100644 --- a/src/discover/ffmpeg.rs +++ b/src/discover/ffmpeg.rs @@ -176,28 +176,34 @@ pub(super) async fn discover_bytes_stream( state: &State, bytes: BytesStream, ) -> Result, FfMpegError> { - let res = Process::run( - "ffprobe", - &[ - "-v", - "quiet", - "-count_frames", - "-show_entries", - "stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name", - "-of", - "default=noprint_wrappers=1:nokey=1", - "-print_format", - "json", - "-", - ], - &[], - state.config.media.process_timeout, - )? - .drive_with_async_read(bytes.into_reader()) - .into_vec() - .await; + let output = crate::ffmpeg::with_file(&state.tmp_dir, None, |path| async move { + crate::file::write_from_async_read(&path, bytes.into_reader()) + .await + .map_err(FfMpegError::Write)?; - let output = res?; + Process::run( + "ffprobe", + &[ + "-v".as_ref(), + "quiet".as_ref(), + "-count_frames".as_ref(), + "-show_entries".as_ref(), + "stream=width,height,nb_read_frames,codec_name,pix_fmt:format=format_name".as_ref(), + "-of".as_ref(), + "default=noprint_wrappers=1:nokey=1".as_ref(), + "-print_format".as_ref(), + "json".as_ref(), + path.as_os_str(), + ], + &[], + state.config.media.process_timeout, + )? + .read() + .into_vec() + .await + .map_err(FfMpegError::Process) + }) + .await??; let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index cd11b8f..c531774 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,4 +1,8 @@ -use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError}; +use std::ffi::OsString; + +use futures_core::Future; + +use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError, tmp_file::TmpDir}; #[derive(Debug, thiserror::Error)] pub(crate) enum FfMpegError { @@ -20,12 +24,6 @@ pub(crate) enum FfMpegError { #[error("Error opening file")] OpenFile(#[source] std::io::Error), - #[error("Error creating file")] - CreateFile(#[source] std::io::Error), - - #[error("Error closing file")] - CloseFile(#[source] std::io::Error), - #[error("Error cleaning up after command")] Cleanup(#[source] std::io::Error), @@ -56,9 +54,7 @@ impl FfMpegError { | Self::CreateDir(_) | Self::ReadFile(_) | Self::OpenFile(_) - | Self::Cleanup(_) - | Self::CreateFile(_) - | Self::CloseFile(_) => ErrorCode::COMMAND_ERROR, + | Self::Cleanup(_) => ErrorCode::COMMAND_ERROR, } } @@ -78,3 +74,25 @@ impl FfMpegError { false } } + +pub(crate) async fn with_file( + tmp: &TmpDir, + ext: Option<&str>, + f: F, +) -> Result +where + F: FnOnce(OsString) -> Fut, + Fut: Future, +{ + let file = tmp.tmp_file(ext); + + crate::store::file_store::safe_create_parent(&file) + .await + .map_err(FfMpegError::CreateDir)?; + + let res = (f)(file.as_os_str().to_os_string()).await; + + file.cleanup().await.map_err(FfMpegError::Cleanup)?; + + Ok(res) +} diff --git a/src/file.rs b/src/file.rs index 9b958fc..dd784f1 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,9 +1,35 @@ +use std::path::Path; + +use futures_core::Stream; +use tokio::io::AsyncRead; +use tokio_util::bytes::Bytes; + #[cfg(feature = "io-uring")] pub(crate) use io_uring::File; #[cfg(not(feature = "io-uring"))] pub(crate) use tokio_file::File; +pub(crate) async fn write_from_stream( + path: impl AsRef, + stream: impl Stream>, +) -> std::io::Result<()> { + let mut file = File::create(path).await?; + file.write_from_stream(stream).await?; + file.close().await?; + Ok(()) +} + +pub(crate) async fn write_from_async_read( + path: impl AsRef, + reader: impl AsyncRead, +) -> std::io::Result<()> { + let mut file = File::create(path).await?; + file.write_from_async_read(reader).await?; + file.close().await?; + Ok(()) +} + #[cfg(not(feature = "io-uring"))] mod tokio_file { use crate::{store::file_store::FileError, Either}; @@ -35,11 +61,6 @@ mod tokio_file { }) } - pub(crate) async fn write_from_bytes(&mut self, mut bytes: Bytes) -> std::io::Result<()> { - self.inner.write_all_buf(&mut bytes).await?; - Ok(()) - } - pub(crate) async fn write_from_stream(&mut self, stream: S) -> std::io::Result<()> where S: Stream>, @@ -58,13 +79,11 @@ mod tokio_file { Ok(()) } - pub(crate) async fn write_from_async_read( - &mut self, - mut reader: R, - ) -> std::io::Result<()> + pub(crate) async fn write_from_async_read(&mut self, reader: R) -> std::io::Result<()> where - R: AsyncRead + Unpin, + R: AsyncRead, { + let mut reader = std::pin::pin!(reader); tokio::io::copy(&mut reader, &mut self.inner).await?; Ok(()) } @@ -154,36 +173,6 @@ mod io_uring { tokio::fs::metadata(&self.path).await } - pub(crate) async fn write_from_bytes(&mut self, mut buf: Bytes) -> std::io::Result<()> { - let len: u64 = buf.len().try_into().unwrap(); - - let mut cursor: u64 = 0; - - loop { - tracing::trace!("write_from_bytes: looping"); - - if cursor == len { - break; - } - - let cursor_usize: usize = cursor.try_into().unwrap(); - let (res, slice) = self.inner.write_at(buf.slice(cursor_usize..), cursor).await; - let n: usize = res?; - - if n == 0 { - return Err(std::io::ErrorKind::UnexpectedEof.into()); - } - - buf = slice.into_inner(); - let n: u64 = n.try_into().unwrap(); - cursor += n; - } - - self.inner.sync_all().await?; - - Ok(()) - } - pub(crate) async fn write_from_stream(&mut self, stream: S) -> std::io::Result<()> where S: Stream>, @@ -232,13 +221,11 @@ mod io_uring { } #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn write_from_async_read( - &mut self, - mut reader: R, - ) -> std::io::Result<()> + pub(crate) async fn write_from_async_read(&mut self, reader: R) -> std::io::Result<()> where - R: AsyncRead + Unpin, + R: AsyncRead, { + let mut reader = std::pin::pin!(reader); let mut cursor: u64 = 0; loop { diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs index 39976be..a4e247a 100644 --- a/src/generate/ffmpeg.rs +++ b/src/generate/ffmpeg.rs @@ -57,69 +57,72 @@ pub(super) async fn thumbnail( input_format: InternalVideoFormat, format: ThumbnailFormat, ) -> Result { - let input_file = state.tmp_dir.tmp_file(Some(input_format.file_extension())); - crate::store::file_store::safe_create_parent(&input_file) - .await - .map_err(FfMpegError::CreateDir)?; - let output_file = state.tmp_dir.tmp_file(Some(format.to_file_extension())); + crate::store::file_store::safe_create_parent(&output_file) .await .map_err(FfMpegError::CreateDir)?; - let mut tmp_one = crate::file::File::create(&input_file) - .await - .map_err(FfMpegError::CreateFile)?; - let stream = state - .store - .to_stream(&from, None, None) - .await - .map_err(FfMpegError::Store)?; - tmp_one - .write_from_stream(stream) - .await - .map_err(FfMpegError::Write)?; - tmp_one.close().await.map_err(FfMpegError::CloseFile)?; + let output_path = output_file.as_os_str(); - let process = Process::run( - "ffmpeg", - &[ - "-hide_banner".as_ref(), - "-v".as_ref(), - "warning".as_ref(), - "-i".as_ref(), - input_file.as_os_str(), - "-frames:v".as_ref(), - "1".as_ref(), - "-codec".as_ref(), - format.as_ffmpeg_codec().as_ref(), - "-f".as_ref(), - format.as_ffmpeg_format().as_ref(), - output_file.as_os_str(), - ], - &[], - state.config.media.process_timeout, - )?; + let res = crate::ffmpeg::with_file( + &state.tmp_dir, + Some(input_format.file_extension()), + |input_file| async move { + let stream = state + .store + .to_stream(&from, None, None) + .await + .map_err(FfMpegError::Store)?; - let res = process.wait().await; - input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; - res?; + crate::file::write_from_stream(&input_file, stream) + .await + .map_err(FfMpegError::Write)?; - let tmp_two = crate::file::File::open(&output_file) - .await - .map_err(FfMpegError::OpenFile)?; - let stream = tmp_two - .read_to_stream(None, None) - .await - .map_err(FfMpegError::ReadFile)?; - let reader = tokio_util::io::StreamReader::new(stream); + Process::run( + "ffmpeg", + &[ + "-hide_banner".as_ref(), + "-v".as_ref(), + "warning".as_ref(), + "-i".as_ref(), + input_file.as_os_str(), + "-frames:v".as_ref(), + "1".as_ref(), + "-codec".as_ref(), + format.as_ffmpeg_codec().as_ref(), + "-f".as_ref(), + format.as_ffmpeg_format().as_ref(), + output_path, + ], + &[], + state.config.media.process_timeout, + )? + .wait() + .await + .map_err(FfMpegError::Process)?; - let reader = ProcessRead::new( - Box::pin(reader), - Arc::from(String::from("ffmpeg")), - Uuid::now_v7(), + let out_file = crate::file::File::open(output_path) + .await + .map_err(FfMpegError::OpenFile)?; + out_file + .read_to_stream(None, None) + .await + .map_err(FfMpegError::ReadFile) + }, ) - .add_extras(output_file); + .await; - Ok(reader) + match res { + Ok(Ok(stream)) => Ok(ProcessRead::new( + Box::pin(tokio_util::io::StreamReader::new(stream)), + Arc::from(String::from("ffmpeg")), + Uuid::now_v7(), + ) + .add_extras(output_file)), + Ok(Err(e)) | Err(e) => { + output_file.cleanup().await.map_err(FfMpegError::Cleanup)?; + Err(e) + } + } } diff --git a/src/lib.rs b/src/lib.rs index c9d9a26..ef281e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,7 +129,6 @@ async fn ensure_details_identifier( let details = state.repo.details(identifier).await?; if let Some(details) = details { - tracing::debug!("details exist"); Ok(details) } else { if state.config.server.read_only { @@ -140,12 +139,9 @@ async fn ensure_details_identifier( ))); } - tracing::debug!("generating new details from {:?}", identifier); let bytes_stream = state.store.to_bytes(identifier, None, None).await?; let new_details = Details::from_bytes_stream(state, bytes_stream).await?; - tracing::debug!("storing details for {:?}", identifier); state.repo.relate_details(identifier, &new_details).await?; - tracing::debug!("stored"); Ok(new_details) } } diff --git a/src/store.rs b/src/store.rs index 02f4d81..e904216 100644 --- a/src/store.rs +++ b/src/store.rs @@ -101,12 +101,6 @@ pub(crate) trait Store: Clone + Debug { where S: Stream>; - async fn save_bytes( - &self, - bytes: Bytes, - content_type: mime::Mime, - ) -> Result, StoreError>; - fn public_url(&self, _: &Arc) -> Option; async fn to_stream( @@ -172,14 +166,6 @@ where T::save_stream(self, stream, content_type).await } - async fn save_bytes( - &self, - bytes: Bytes, - content_type: mime::Mime, - ) -> Result, StoreError> { - T::save_bytes(self, bytes, content_type).await - } - fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } @@ -243,14 +229,6 @@ where T::save_stream(self, stream, content_type).await } - async fn save_bytes( - &self, - bytes: Bytes, - content_type: mime::Mime, - ) -> Result, StoreError> { - T::save_bytes(self, bytes, content_type).await - } - fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } @@ -314,14 +292,6 @@ where T::save_stream(self, stream, content_type).await } - async fn save_bytes( - &self, - bytes: Bytes, - content_type: mime::Mime, - ) -> Result, StoreError> { - T::save_bytes(self, bytes, content_type).await - } - fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index ddf1819..c53a987 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -98,22 +98,6 @@ impl Store for FileStore { .await } - #[tracing::instrument(skip(self, bytes))] - async fn save_bytes( - &self, - bytes: Bytes, - _content_type: mime::Mime, - ) -> Result, StoreError> { - let path = self.next_file().await?; - - if let Err(e) = self.safe_save_bytes(&path, bytes).await { - self.safe_remove_file(&path).await?; - return Err(e.into()); - } - - Ok(self.file_id_from_path(path)?) - } - fn public_url(&self, _identifier: &Arc) -> Option { None } @@ -251,36 +235,6 @@ impl FileStore { } } - // Try writing to a file - async fn safe_save_bytes>( - &self, - path: P, - bytes: Bytes, - ) -> Result<(), FileError> { - safe_create_parent(&path).await?; - - // Only write the file if it doesn't already exist - if let Err(e) = tokio::fs::metadata(&path).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } - } else { - return Ok(()); - } - - // Open the file for writing - let mut file = File::create(&path).await?; - - // try writing - if let Err(e) = file.write_from_bytes(bytes).await { - // remove file if writing failed before completion - self.safe_remove_file(path).await?; - return Err(e.into()); - } - - Ok(()) - } - async fn safe_save_reader>( &self, to: P, diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a119d97..a145eab 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -303,28 +303,6 @@ impl Store for ObjectStore { } } - #[tracing::instrument(skip_all)] - async fn save_bytes( - &self, - bytes: Bytes, - content_type: mime::Mime, - ) -> Result, StoreError> { - let (req, object_id) = self.put_object_request(bytes.len(), content_type).await?; - - let response = req - .body(bytes) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - Ok(object_id) - } - fn public_url(&self, identifier: &Arc) -> Option { self.public_endpoint.clone().and_then(|mut endpoint| { endpoint diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 53bb1cb..90e6327 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -18,63 +18,47 @@ pub(super) async fn transcode_bytes( timeout: u64, bytes: BytesStream, ) -> Result { - let input_file = tmp_dir.tmp_file(None); - crate::store::file_store::safe_create_parent(&input_file) - .await - .map_err(FfMpegError::CreateDir)?; - - let mut tmp_one = crate::file::File::create(&input_file) - .await - .map_err(FfMpegError::CreateFile)?; - tmp_one - .write_from_stream(bytes.into_io_stream()) - .await - .map_err(FfMpegError::Write)?; - tmp_one.close().await.map_err(FfMpegError::CloseFile)?; - let output_file = tmp_dir.tmp_file(None); + let output_path = output_file.as_os_str(); - let res = async { - let res = transcode_files( + let res = crate::ffmpeg::with_file(tmp_dir, None, |input_file| async move { + crate::file::write_from_async_read(&input_file, bytes.into_reader()) + .await + .map_err(FfMpegError::Write)?; + + transcode_files( input_file.as_os_str(), input_format, - output_file.as_os_str(), + output_path, output_format, crf, timeout, ) - .await; + .await?; - input_file.cleanup().await.map_err(FfMpegError::Cleanup)?; - res?; - - let tmp_two = crate::file::File::open(&output_file) + let tmp_file = crate::file::File::open(output_path) .await .map_err(FfMpegError::OpenFile)?; - let stream = tmp_two + + tmp_file .read_to_stream(None, None) .await - .map_err(FfMpegError::ReadFile)?; - Ok(tokio_util::io::StreamReader::new(stream)) - } + .map_err(FfMpegError::ReadFile) + }) .await; - let reader = match res { - Ok(reader) => reader, - Err(e) => { + match res { + Ok(Ok(stream)) => Ok(ProcessRead::new( + Box::pin(tokio_util::io::StreamReader::new(stream)), + Arc::from(String::from("ffmpeg")), + Uuid::now_v7(), + ) + .add_extras(output_file)), + Ok(Err(e)) | Err(e) => { output_file.cleanup().await.map_err(FfMpegError::Cleanup)?; - return Err(e); + Err(e) } - }; - - let process_read = ProcessRead::new( - Box::pin(reader), - Arc::from(String::from("ffmpeg")), - Uuid::now_v7(), - ) - .add_extras(output_file); - - Ok(process_read) + } } async fn transcode_files( @@ -135,6 +119,14 @@ async fn transcode_files( args.extend([ "-map_metadata".as_ref(), "-1".as_ref(), + "-map_metadata:g".as_ref(), + "-1".as_ref(), + "-map_metadata:s".as_ref(), + "-1".as_ref(), + "-map_metadata:c".as_ref(), + "-1".as_ref(), + "-map_metadata:p".as_ref(), + "-1".as_ref(), "-f".as_ref(), output_format.ffmpeg_format().as_ref(), output_path,