Update io-uring to use streem for file bytes

This commit is contained in:
asonix 2023-09-10 23:45:24 -04:00
parent ea75ca24b5
commit 2b10c48619
2 changed files with 36 additions and 99 deletions

View file

@ -113,10 +113,7 @@ mod io_uring {
use std::{ use std::{
convert::TryInto, convert::TryInto,
fs::Metadata, fs::Metadata,
future::Future,
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
}; };
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
@ -319,20 +316,7 @@ mod io_uring {
from_start: Option<u64>, from_start: Option<u64>,
len: Option<u64>, len: Option<u64>,
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, FileError> { ) -> Result<impl Stream<Item = std::io::Result<Bytes>>, FileError> {
let size = self.metadata().await?.len(); Ok(bytes_stream(self, from_start, len))
let cursor = from_start.unwrap_or(0);
let size = len.unwrap_or(size - cursor) + cursor;
Ok(BytesStream {
state: ReadFileState::File {
file: Some(self),
bytes: Some(BytesMut::new()),
},
size,
cursor,
callback: read_file,
})
} }
async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> { async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
@ -344,98 +328,50 @@ mod io_uring {
} }
} }
pin_project_lite::pin_project! { fn bytes_stream(
struct BytesStream<F, Fut> {
#[pin]
state: ReadFileState<Fut>,
size: u64,
cursor: u64,
#[pin]
callback: F,
}
}
pin_project_lite::pin_project! {
#[project = ReadFileStateProj]
#[project_replace = ReadFileStateProjReplace]
enum ReadFileState<Fut> {
File {
file: Option<File>,
bytes: Option<BytesMut>,
},
Future {
#[pin]
fut: Fut,
},
}
}
async fn read_file(
file: File, file: File,
buf: BytesMut, from_start: Option<u64>,
cursor: u64, len: Option<u64>,
) -> (File, BufResult<usize, BytesMut>) { ) -> impl Stream<Item = std::io::Result<Bytes>> {
let buf_res = file.read_at(buf, cursor).await; streem::try_from_fn(|yielder| async move {
let file_size = file.metadata().await?.len();
(file, buf_res) let mut cursor = from_start.unwrap_or(0);
} let remaining_size = file_size.saturating_sub(cursor);
let read_until = len.unwrap_or(remaining_size) + cursor;
impl<F, Fut> Stream for BytesStream<F, Fut> let mut bytes = BytesMut::new();
where
F: Fn(File, BytesMut, u64) -> Fut,
Fut: Future<Output = (File, BufResult<usize, BytesMut>)> + 'static,
{
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { loop {
let mut this = self.as_mut().project(); let max_size = read_until.saturating_sub(cursor);
match this.state.as_mut().project() {
ReadFileStateProj::File { file, bytes } => {
let cursor = *this.cursor;
let max_size = *this.size - *this.cursor;
if max_size == 0 { if max_size == 0 {
return Poll::Ready(None); break;
} }
let capacity = max_size.min(65_356) as usize; let capacity = max_size.min(65_356) as usize;
let mut bytes = bytes.take().unwrap();
let file = file.take().unwrap();
if bytes.capacity() < capacity { if bytes.capacity() < capacity {
bytes.reserve(capacity - bytes.capacity()); bytes.reserve(capacity - bytes.capacity());
} }
let fut = (this.callback)(file, bytes, cursor); let (result, mut buf_) = file.read_at(bytes, cursor).await;
this.state.project_replace(ReadFileState::Future { fut }); let n = match result {
self.poll_next(cx)
}
ReadFileStateProj::Future { fut } => match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready((file, (Ok(n), mut buf))) => {
let bytes = buf.split_off(n);
this.state.project_replace(ReadFileState::File {
file: Some(file),
bytes: Some(bytes),
});
let n: u64 = match n.try_into() {
Ok(n) => n, Ok(n) => n,
Err(_) => { Err(e) => return Err(e),
return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into())))
}
}; };
*this.cursor += n;
Poll::Ready(Some(Ok(buf.into()))) bytes = buf_.split_off(n);
}
Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))), let n: u64 = n.try_into().map_err(|_| std::io::ErrorKind::Other)?;
}, cursor += n;
}
yielder.yield_ok(buf_.into()).await;
} }
Ok(())
})
} }
#[cfg(test)] #[cfg(test)]

View file

@ -58,6 +58,7 @@ where
}) })
} }
#[cfg(not(feature = "io-uring"))]
pub(crate) fn map_ok<S, T1, T2, E, F>(stream: S, f: F) -> impl Stream<Item = Result<T2, E>> pub(crate) fn map_ok<S, T1, T2, E, F>(stream: S, f: F) -> impl Stream<Item = Result<T2, E>>
where where
S: Stream<Item = Result<T1, E>>, S: Stream<Item = Result<T1, E>>,