diff --git a/src/file.rs b/src/file.rs index dd784f1..83504fb 100644 --- a/src/file.rs +++ b/src/file.rs @@ -37,7 +37,7 @@ mod tokio_file { use futures_core::Stream; use std::{io::SeekFrom, path::Path}; use streem::IntoStreamer; - use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; + use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio_util::codec::{BytesCodec, FramedRead}; pub(crate) struct File { @@ -92,14 +92,6 @@ mod tokio_file { Ok(()) } - pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> - where - W: AsyncWrite + Unpin + ?Sized, - { - tokio::io::copy(&mut self.inner, writer).await?; - Ok(()) - } - pub(crate) async fn read_to_stream( mut self, from_start: Option, @@ -137,7 +129,7 @@ mod io_uring { path::{Path, PathBuf}, }; use streem::IntoStreamer; - use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_uring::{ buf::{IoBuf, IoBufMut}, BufResult, @@ -277,41 +269,6 @@ mod io_uring { self.inner.close().await } - pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> - where - W: AsyncWrite + Unpin + ?Sized, - { - let metadata = self.metadata().await?; - let size = metadata.len(); - - let mut cursor: u64 = 0; - - loop { - tracing::trace!("read_to_async_write: looping"); - - if cursor == size { - break; - } - - let max_size = (size - cursor).min(65_536); - let buf = BytesMut::with_capacity(max_size.try_into().unwrap()); - - let (res, buf): (_, BytesMut) = self.read_at(buf, cursor).await; - let n: usize = res?; - - if n == 0 { - return Err(std::io::ErrorKind::UnexpectedEof.into()); - } - - writer.write_all(&buf[0..n]).await?; - - let n: u64 = n.try_into().unwrap(); - cursor += n; - } - - Ok(()) - } - pub(crate) async fn read_to_stream( self, from_start: Option, @@ -380,6 +337,8 @@ mod io_uring { #[cfg(test)] mod tests { use std::io::Read; + use streem::IntoStreamer; + use tokio::io::AsyncWriteExt; macro_rules! test_async { ($fut:expr) => { @@ -395,9 +354,16 @@ mod io_uring { let tmp = "/tmp/read-test"; test_async!(async move { - let mut file = super::File::open(EARTH_GIF).await.unwrap(); + let file = super::File::open(EARTH_GIF).await.unwrap(); let mut tmp_file = tokio::fs::File::create(tmp).await.unwrap(); - file.read_to_async_write(&mut tmp_file).await.unwrap(); + + let stream = file.read_to_stream(None, None).await.unwrap(); + let stream = std::pin::pin!(stream); + let mut stream = stream.into_streamer(); + + while let Some(mut bytes) = stream.try_next().await.unwrap() { + tmp_file.write_all_buf(&mut bytes).await.unwrap(); + } }); let mut source = std::fs::File::open(EARTH_GIF).unwrap(); diff --git a/src/store.rs b/src/store.rs index b6b669d..88cb9f8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,7 +1,7 @@ use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; @@ -125,14 +125,6 @@ pub(crate) trait Store: Clone + Debug { .map_err(StoreError::ReadStream) } - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin; - async fn len(&self, identifier: &Arc) -> Result; async fn remove(&self, identifier: &Arc) -> Result<(), StoreError>; @@ -183,17 +175,6 @@ where T::to_stream(self, identifier, from_start, len).await } - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin, - { - T::read_into(self, identifier, writer).await - } - async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } @@ -248,17 +229,6 @@ where T::to_stream(self, identifier, from_start, len).await } - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin, - { - T::read_into(self, identifier, writer).await - } - async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } @@ -313,17 +283,6 @@ where T::to_stream(self, identifier, from_start, len).await } - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin, - { - T::read_into(self, identifier, writer).await - } - async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 77143ce..501df56 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -5,7 +5,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; use tokio_util::io::StreamReader; use tracing::Instrument; @@ -114,22 +114,6 @@ impl Store for FileStore { Ok(Box::pin(stream)) } - #[tracing::instrument(skip(self, writer))] - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin, - { - let path = self.path_from_file_id(identifier); - - File::open(&path).await?.read_to_async_write(writer).await?; - - Ok(()) - } - #[tracing::instrument(skip(self))] async fn len(&self, identifier: &Arc) -> Result { let path = self.path_from_file_id(identifier); diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 3d6ed83..e272be5 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -21,7 +21,7 @@ use rusty_s3::{ }; use std::{string::FromUtf8Error, sync::Arc, time::Duration}; use streem::IntoStreamer; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::io::AsyncRead; use tokio_util::io::ReaderStream; use tracing::Instrument; use url::Url; @@ -331,46 +331,6 @@ impl Store for ObjectStore { ))) } - #[tracing::instrument(skip(self, writer))] - async fn read_into( - &self, - identifier: &Arc, - writer: &mut Writer, - ) -> Result<(), std::io::Error> - where - Writer: AsyncWrite + Unpin, - { - let response = self - .get_object_request(identifier, None, None) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST) - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?; - - if !response.status().is_success() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - status_error(response, Some(identifier.clone())).await, - )); - } - - let stream = std::pin::pin!(crate::stream::metrics( - crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST_STREAM, - response.bytes_stream() - )); - let mut stream = stream.into_streamer(); - - while let Some(res) = stream.next().await { - tracing::trace!("read_into: looping"); - - let mut bytes = res.map_err(payload_to_io_error)?; - writer.write_all_buf(&mut bytes).await?; - } - writer.flush().await?; - - Ok(()) - } - #[tracing::instrument(skip(self))] async fn len(&self, identifier: &Arc) -> Result { let response = self