diff --git a/Cargo.lock b/Cargo.lock
index cfdb278..7d2439d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1743,6 +1743,7 @@ dependencies = [
  "console-subscriber",
  "dashmap",
  "flume",
+ "futures-core",
  "futures-util",
  "hex",
  "md-5",
diff --git a/Cargo.toml b/Cargo.toml
index 2772d3e..635827c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,8 @@ config = "0.13.0"
 console-subscriber = "0.1"
 dashmap = "5.1.0"
 flume = "0.11.0"
-futures-util = "0.3.17"
+futures-core = "0.3"
+futures-util = { version = "0.3.17", default-features = false }
 hex = "0.4.3"
 md-5 = "0.10.5"
 metrics = "0.21.1"
diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index 9d2158b..dd673d0 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -4,7 +4,8 @@ use crate::{
     store::Store,
 };
 use actix_web::web::Bytes;
-use futures_util::{Stream, TryStreamExt};
+use futures_core::Stream;
+use futures_util::TryStreamExt;
 use mime::APPLICATION_OCTET_STREAM;
 use tracing::{Instrument, Span};
diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs
index 8f13b4f..ae07007 100644
--- a/src/bytes_stream.rs
+++ b/src/bytes_stream.rs
@@ -2,7 +2,7 @@ use actix_web::{
     body::MessageBody,
     web::{Bytes, BytesMut},
 };
-use futures_util::Stream;
+use futures_core::Stream;
 use std::{
     collections::{vec_deque::IntoIter, VecDeque},
     convert::Infallible,
diff --git a/src/discover/ffmpeg.rs b/src/discover/ffmpeg.rs
index 10fd88f..50d3624 100644
--- a/src/discover/ffmpeg.rs
+++ b/src/discover/ffmpeg.rs
@@ -12,7 +12,7 @@ use crate::{
     process::Process,
 };
 use actix_web::web::Bytes;
-use futures_util::Stream;
+use futures_core::Stream;
 use tokio::io::AsyncReadExt;
 
 use super::{Discovery, DiscoveryLite};
diff --git a/src/discover/magick.rs b/src/discover/magick.rs
index ffadd3e..88df362 100644
--- a/src/discover/magick.rs
+++ b/src/discover/magick.rs
@@ -2,7 +2,7 @@
 mod tests;
 
 use actix_web::web::Bytes;
-use futures_util::Stream;
+use futures_core::Stream;
 use tokio::io::AsyncReadExt;
 
 use crate::{
diff --git a/src/either.rs b/src/either.rs
index 97bef7a..0bb6d3b 100644
--- a/src/either.rs
+++ b/src/either.rs
@@ -1,4 +1,4 @@
-use futures_util::stream::Stream;
+use futures_core::Stream;
 use std::{
     pin::Pin,
     task::{Context, Poll},
diff --git a/src/file.rs b/src/file.rs
index 6ac8b1e..50df3f2 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -6,9 +6,10 @@ pub(crate) use tokio_file::File;
 
 #[cfg(not(feature = "io-uring"))]
 mod tokio_file {
-    use crate::{store::file_store::FileError, Either};
+    use crate::{store::file_store::FileError, stream::IntoStreamer, Either};
     use actix_web::web::{Bytes, BytesMut};
-    use futures_util::{Stream, StreamExt, TryStreamExt};
+    use futures_core::Stream;
+    use futures_util::TryStreamExt;
     use std::{io::SeekFrom, path::Path};
     use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
     use tokio_util::codec::{BytesCodec, FramedRead};
@@ -43,7 +44,8 @@
     where
         S: Stream<Item = std::io::Result<Bytes>>,
     {
-        futures_util::pin_mut!(stream);
+        let stream = std::pin::pin!(stream);
+        let mut stream = stream.into_streamer();
 
         while let Some(res) = stream.next().await {
             let mut bytes = res?;
@@ -102,9 +104,9 @@
 
 #[cfg(feature = "io-uring")]
 mod io_uring {
-    use crate::store::file_store::FileError;
+    use crate::{store::file_store::FileError, stream::IntoStreamer};
     use actix_web::web::{Bytes, BytesMut};
-    use futures_util::stream::{Stream, StreamExt};
+    use futures_core::Stream;
     use std::{
         convert::TryInto,
         fs::Metadata,
@@ -181,7 +183,8 @@
     where
        S: Stream<Item = std::io::Result<Bytes>>,
     {
-        futures_util::pin_mut!(stream);
+        let stream = std::pin::pin!(stream);
+        let mut stream = stream.into_streamer();
         let mut cursor: u64 = 0;
 
         while let Some(res) = stream.next().await {
diff --git a/src/ingest.rs b/src/ingest.rs
index 7228de2..938e277 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -5,9 +5,10 @@ use crate::{
     formats::{InternalFormat, Validations},
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     store::Store,
+    stream::IntoStreamer,
 };
 use actix_web::web::Bytes;
-use futures_util::{Stream, StreamExt};
+use futures_core::Stream;
 use tracing::{Instrument, Span};
 
 mod hasher;
@@ -26,12 +27,14 @@ where
 }
 
 #[tracing::instrument(skip(stream))]
-async fn aggregate<S>(mut stream: S) -> Result<Bytes, Error>
+async fn aggregate<S>(stream: S) -> Result<Bytes, Error>
 where
     S: Stream<Item = Result<Bytes, Error>> + Unpin,
 {
     let mut buf = BytesStream::new();
 
+    let mut stream = stream.into_streamer();
+
     while let Some(res) = stream.next().await {
         buf.add_bytes(res?);
     }
diff --git a/src/lib.rs b/src/lib.rs
index 35723bf..d473542 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -34,10 +34,8 @@ use actix_web::{
     http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
     web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
 };
-use futures_util::{
-    stream::{empty, once},
-    Stream, StreamExt, TryStreamExt,
-};
+use futures_core::Stream;
+use futures_util::{StreamExt, TryStreamExt};
 use metrics_exporter_prometheus::PrometheusBuilder;
 use middleware::Metrics;
 use once_cell::sync::Lazy;
@@ -46,7 +44,6 @@ use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
 use reqwest_tracing::TracingMiddleware;
 use rusty_s3::UrlStyle;
 use std::{
-    future::ready,
     path::Path,
     path::PathBuf,
     sync::Arc,
@@ -72,7 +69,7 @@ use self::{
     repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
     serde_str::Serde,
     store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
-    stream::{StreamLimit, StreamTimeout},
+    stream::{empty, once, StreamLimit, StreamTimeout},
 };
 
 pub use self::config::{ConfigSource, PictRsConfiguration};
@@ -845,12 +842,9 @@ async fn process(
             return Err(UploadError::Range.into());
         }
     } else if not_found {
-        (
-            HttpResponse::NotFound(),
-            Either::right(once(ready(Ok(bytes)))),
-        )
+        (HttpResponse::NotFound(), Either::right(once(Ok(bytes))))
     } else {
-        (HttpResponse::Ok(), Either::right(once(ready(Ok(bytes)))))
+        (HttpResponse::Ok(), Either::right(once(Ok(bytes))))
     };
 
     Ok(srv_response(
diff --git a/src/migrate_store.rs b/src/migrate_store.rs
index ab6978d..5be16b9 100644
--- a/src/migrate_store.rs
+++ b/src/migrate_store.rs
@@ -1,4 +1,3 @@
-use futures_util::StreamExt;
 use std::{
     rc::Rc,
     sync::atomic::{AtomicU64, Ordering},
@@ -10,6 +9,7 @@ use crate::{
     error::{Error, UploadError},
     repo::{ArcRepo, Hash},
     store::{Identifier, Store},
+    stream::IntoStreamer,
 };
 
 pub(super) async fn migrate_store(
@@ -103,8 +103,7 @@ where
     }
 
     // Hashes are read in a consistent order
-    let stream = repo.hashes().await;
-    let mut stream = Box::pin(stream);
+    let mut stream = repo.hashes().await.into_streamer();
 
     let state = Rc::new(MigrateState {
         repo: repo.clone(),
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index ea9723f..eaf6e69 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -5,8 +5,8 @@ use crate::{
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     serde_str::Serde,
     store::{Identifier, Store},
+    stream::IntoStreamer,
 };
-use futures_util::StreamExt;
 
 pub(super) fn perform<'a, S>(
     repo: &'a ArcRepo,
@@ -136,7 +136,7 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E
 
 #[tracing::instrument(skip_all)]
 async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
-    let mut hash_stream = Box::pin(repo.hashes().await);
+    let mut hash_stream = repo.hashes().await.into_streamer();
 
     while let Some(res) = hash_stream.next().await {
         let hash = res?;
@@ -151,7 +151,7 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(),
     let now = time::OffsetDateTime::now_utc();
     let since = now.saturating_sub(config.media.retention.variants.to_duration());
 
-    let mut variant_stream = Box::pin(repo.older_variants(since).await?);
+    let mut variant_stream = repo.older_variants(since).await?.into_streamer();
 
     while let Some(res) = variant_stream.next().await {
         let (hash, variant) = res?;
@@ -166,7 +166,7 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(),
     let now = time::OffsetDateTime::now_utc();
     let since = now.saturating_sub(config.media.retention.proxy.to_duration());
 
-    let mut alias_stream = Box::pin(repo.older_aliases(since).await?);
+    let mut alias_stream = repo.older_aliases(since).await?.into_streamer();
 
     while let Some(res) = alias_stream.next().await {
         let alias = res?;
diff --git a/src/range.rs b/src/range.rs
index dfcfc35..976f384 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -1,13 +1,13 @@
 use crate::{
     error::{Error, UploadError},
     store::Store,
+    stream::once,
 };
 use actix_web::{
     http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
     web::Bytes,
 };
-use futures_util::stream::{once, Stream};
-use std::future::ready;
+use futures_core::Stream;
 
 pub(crate) fn chop_bytes(
     byte_range: &ByteRangeSpec,
@@ -17,7 +17,7 @@
     if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
         // END IS INCLUSIVE
         let end = end as usize + 1;
-        return Ok(once(ready(Ok(bytes.slice(start as usize..end)))));
+        return Ok(once(Ok(bytes.slice(start as usize..end))));
     }
 
     Err(UploadError::Range.into())
diff --git a/src/repo_04.rs b/src/repo_04.rs
index 03160af..3e0f9e3 100644
--- a/src/repo_04.rs
+++ b/src/repo_04.rs
@@ -4,7 +4,7 @@ use crate::{
     repo::{Alias, DeleteToken},
     store::{Identifier, StoreError},
 };
-use futures_util::Stream;
+use futures_core::Stream;
 use std::fmt::Debug;
 
 pub(crate) use self::sled::SledRepo;
diff --git a/src/store.rs b/src/store.rs
index e1704d2..248138e 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,6 +1,6 @@
 use actix_web::web::Bytes;
 use base64::{prelude::BASE64_STANDARD, Engine};
-use futures_util::stream::Stream;
+use futures_core::Stream;
 use std::{fmt::Debug, sync::Arc};
 use tokio::io::{AsyncRead, AsyncWrite};
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index 67a3578..860fb52 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -4,7 +4,7 @@ use crate::{
     store::Store,
 };
 use actix_web::web::Bytes;
-use futures_util::stream::Stream;
+use futures_core::Stream;
 use std::{
     path::{Path, PathBuf},
     pin::Pin,
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index 15d453f..1847eed 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -2,6 +2,7 @@ use crate::{
     bytes_stream::BytesStream,
     repo::{Repo, SettingsRepo},
     store::Store,
+    stream::IntoStreamer,
 };
 use actix_rt::task::JoinError;
 use actix_web::{
@@ -13,7 +14,8 @@
     web::Bytes,
 };
 use base64::{prelude::BASE64_STANDARD, Engine};
-use futures_util::{Stream, StreamExt, TryStreamExt};
+use futures_core::Stream;
+use futures_util::TryStreamExt;
 use reqwest::{header::RANGE, Body, Response};
 use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
 use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
@@ -143,6 +145,8 @@ where
 {
     let mut buf = BytesStream::new();
 
+    let mut stream = stream.into_streamer();
+
     while buf.len() < CHUNK_SIZE {
         if let Some(res) = stream.next().await {
             buf.add_bytes(res?)
@@ -404,7 +408,7 @@ impl Store for ObjectStore {
             ));
         }
 
-        let mut stream = response.bytes_stream();
+        let mut stream = response.bytes_stream().into_streamer();
 
         while let Some(res) = stream.next().await {
             let mut bytes = res.map_err(payload_to_io_error)?;
diff --git a/src/stream.rs b/src/stream.rs
index a1f1024..3ebe13d 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,8 +1,9 @@
 use actix_rt::{task::JoinHandle, time::Sleep};
 use actix_web::web::Bytes;
-use futures_util::Stream;
+use futures_core::Stream;
 use std::{
     future::Future,
+    marker::PhantomData,
     pin::Pin,
     sync::{
         atomic::{AtomicBool, Ordering},
@@ -12,6 +13,37 @@
     time::Duration,
 };
 
+pub(crate) struct Empty<T>(PhantomData<T>);
+
+impl<T> Stream for Empty<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
+
+pub(crate) fn empty<T>() -> Empty<T> {
+    Empty(PhantomData)
+}
+
+pub(crate) struct Once<T>(Option<T>);
+
+impl<T> Stream for Once<T>
+where
+    T: Unpin,
+{
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(self.0.take())
+    }
+}
+
+pub(crate) fn once<T>(value: T) -> Once<T> {
+    Once(Some(value))
+}
+
 pub(crate) type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
 
 pub(crate) trait StreamLimit {