diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 37f8677..379e088 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -54,9 +54,9 @@ impl Backgrounded { { self.upload_id = Some(self.repo.create_upload().await?); - let stream = Box::pin(crate::stream::map_err(stream, |e| { + let stream = crate::stream::map_err(stream, |e| { std::io::Error::new(std::io::ErrorKind::Other, e) - })); + }); // use octet-stream, we don't know the upload's real type yet let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?; diff --git a/src/ingest.rs b/src/ingest.rs index 0698ea8..8ab486d 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -126,7 +126,7 @@ where Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)), }); - let reader = Box::pin(tokio_util::io::StreamReader::new(stream)); + let reader = tokio_util::io::StreamReader::new(stream); let hasher_reader = Hasher::new(reader); let hash_state = hasher_reader.state(); diff --git a/src/store.rs b/src/store.rs index 9d7fcaa..02f4d81 100644 --- a/src/store.rs +++ b/src/store.rs @@ -91,7 +91,7 @@ pub(crate) trait Store: Clone + Debug { content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static; + Reader: AsyncRead; async fn save_stream( &self, @@ -99,7 +99,7 @@ pub(crate) trait Store: Clone + Debug { content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static; + S: Stream>; async fn save_bytes( &self, @@ -156,7 +156,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -167,7 +167,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } @@ -227,7 +227,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -238,7 +238,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } @@ -298,7 +298,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { T::save_async_read(self, reader, content_type).await } @@ -309,7 +309,7 @@ where content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { T::save_stream(self, stream, content_type).await } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 1883706..ddf1819 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -68,12 +68,14 @@ impl Store for FileStore { #[tracing::instrument(skip(self, reader))] async fn save_async_read( &self, - mut reader: Reader, + reader: Reader, _content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { + let mut reader = std::pin::pin!(reader); + let path = self.next_file().await?; if let Err(e) = self.safe_save_reader(&path, &mut reader).await { @@ -90,7 +92,7 @@ impl Store for FileStore { content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { self.save_async_read(StreamReader::new(stream), content_type) .await diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 354c1b5..093d939 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -170,7 +170,7 @@ fn payload_to_io_error(e: reqwest::Error) -> std::io::Error { #[tracing::instrument(level = "debug", skip(stream))] async fn read_chunk(stream: &mut S) -> Result where - S: Stream> + Unpin + 'static, + S: Stream> + Unpin, { let mut buf = BytesStream::new(); @@ -229,7 +229,7 @@ impl Store for ObjectStore { content_type: mime::Mime, ) -> Result, StoreError> where - Reader: AsyncRead + Unpin + 'static, + Reader: AsyncRead, { self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type) .await @@ -238,12 +238,14 @@ impl Store for ObjectStore { #[tracing::instrument(skip_all)] async fn save_stream( &self, - mut stream: S, + stream: S, content_type: mime::Mime, ) -> Result, StoreError> where - S: Stream> + Unpin + 'static, + S: Stream>, { + let mut stream = std::pin::pin!(stream); + let first_chunk = read_chunk(&mut stream).await?; if first_chunk.len() < CHUNK_SIZE {