Rewrite to avoid direct AsyncX impls

This commit is contained in:
Aode (Lion) 2021-10-13 19:06:53 -05:00
parent 2318fd9dca
commit 09cb2a53b0
9 changed files with 399 additions and 491 deletions

26
Cargo.lock generated
View file

@ -1282,9 +1282,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.29" version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" checksum = "edc3358ebc67bc8b7fa0c007f945b0b18226f78437d61bec735a9eb96b61ee70"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
@ -1448,6 +1448,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rio"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e98c25665909853c07874301124482754434520ab572ac6a22e90366de6685b"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.2.3" version = "0.2.3"
@ -1617,9 +1626,9 @@ dependencies = [
[[package]] [[package]]
name = "sharded-slab" name = "sharded-slab"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740223c51853f3145fe7c90360d2d4232f2b62e3449489c207eccde818979982" checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
] ]
@ -1635,9 +1644,9 @@ dependencies = [
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.4" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]] [[package]]
name = "sled" name = "sled"
@ -1653,6 +1662,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"parking_lot", "parking_lot",
"rio",
] ]
[[package]] [[package]]
@ -1923,9 +1933,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.4.1" version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "154794c8f499c2619acd19e839294703e9e32e7630ef5f46ea80d4ef0fbee5eb" checksum = "b2dd85aeaba7b68df939bd357c6afb36c87951be9e80bf9c859f2fc3e9fca0fd"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View file

@ -11,7 +11,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = [] default = []
io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring"] io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring", "sled/io_uring"]
[dependencies] [dependencies]
actix-form-data = "0.6.0-beta.1" actix-form-data = "0.6.0-beta.1"

43
src/either.rs Normal file
View file

@ -0,0 +1,43 @@
use futures_util::stream::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};
pub(crate) enum Either<Left, Right> {
Left(Left),
Right(Right),
}
impl<Left, Right> AsyncRead for Either<Left, Right>
where
Left: AsyncRead + Unpin,
Right: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match *self {
Self::Left(ref mut left) => Pin::new(left).poll_read(cx, buf),
Self::Right(ref mut right) => Pin::new(right).poll_read(cx, buf),
}
}
}
impl<Left, Right> Stream for Either<Left, Right>
where
Left: Stream<Item = <Right as Stream>::Item> + Unpin,
Right: Stream + Unpin,
{
type Item = <Left as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
Self::Left(ref mut left) => Pin::new(left).poll_next(cx),
Self::Right(ref mut right) => Pin::new(right).poll_next(cx),
}
}
}

View file

@ -1,52 +1,159 @@
use futures_util::stream::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
pub(crate) use io_uring::File; pub(crate) use io_uring::File;
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
pub(crate) use tokio::fs::File; pub(crate) use tokio_file::File;
struct CrateError<S>(S);
impl<T, E, S> Stream for CrateError<S>
where
S: Stream<Item = Result<T, E>> + Unpin,
crate::error::Error: From<E>,
{
type Item = Result<T, crate::error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
}
#[cfg(not(feature = "io-uring"))]
mod tokio_file {
use crate::Either;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::Stream;
use std::{fs::Metadata, io::SeekFrom, path::Path};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead};
pub(crate) struct File {
inner: tokio::fs::File,
}
impl File {
pub(crate) async fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
Ok(File {
inner: tokio::fs::File::open(path).await?,
})
}
pub(crate) async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> {
Ok(File {
inner: tokio::fs::File::create(path).await?,
})
}
pub(crate) async fn metadata(&self) -> std::io::Result<Metadata> {
self.inner.metadata().await
}
pub(crate) async fn write_from_bytes<'a>(
&'a mut self,
mut bytes: Bytes,
) -> std::io::Result<()> {
self.inner.write_all_buf(&mut bytes).await?;
Ok(())
}
pub(crate) async fn write_from_async_read<'a, R>(
&'a mut self,
mut reader: R,
) -> std::io::Result<()>
where
R: AsyncRead + Unpin,
{
tokio::io::copy(&mut reader, &mut self.inner).await?;
Ok(())
}
pub(crate) async fn read_to_async_write<'a, W>(
&'a mut self,
writer: &'a mut W,
) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
{
tokio::io::copy(&mut self.inner, writer).await?;
Ok(())
}
pub(crate) async fn read_to_stream(
mut self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<
impl Stream<Item = Result<Bytes, crate::error::Error>> + Unpin,
crate::error::Error,
> {
let obj = match (from_start, len) {
(Some(lower), Some(upper)) => {
self.inner.seek(SeekFrom::Start(lower)).await?;
Either::Left(self.inner.take(upper))
}
(None, Some(upper)) => Either::Left(self.inner.take(upper)),
(Some(lower), None) => {
self.inner.seek(SeekFrom::Start(lower)).await?;
Either::Right(self.inner)
}
(None, None) => Either::Right(self.inner),
};
Ok(super::CrateError(BytesFreezer(FramedRead::new(
obj,
BytesCodec::new(),
))))
}
}
struct BytesFreezer<S>(S);
impl<S, E> Stream for BytesFreezer<S>
where
S: Stream<Item = Result<BytesMut, E>> + Unpin,
{
type Item = Result<Bytes, E>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(BytesMut::freeze)))
}
}
}
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
mod io_uring { mod io_uring {
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::{ use std::{
convert::TryInto, convert::TryInto,
fs::Metadata, fs::Metadata,
future::Future, future::Future,
io::SeekFrom,
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
task::{Context, Poll, Waker}, task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_uring::{
buf::{IoBuf, IoBufMut, Slice},
BufResult,
}; };
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
type IoFuture =
Pin<Box<dyn Future<Output = (tokio_uring::fs::File, std::io::Result<usize>, Vec<u8>)>>>;
type FlushFuture = Pin<Box<dyn Future<Output = (tokio_uring::fs::File, std::io::Result<()>)>>>;
type ShutdownFuture = Pin<Box<dyn Future<Output = std::io::Result<()>>>>;
type SeekFuture = Pin<Box<dyn Future<Output = std::io::Result<u64>>>>;
enum FileState {
Reading { future: IoFuture },
Writing { future: IoFuture },
Syncing { future: FlushFuture },
Seeking { future: SeekFuture },
Shutdown { future: ShutdownFuture },
Pending,
}
impl FileState {
fn take(&mut self) -> Self {
std::mem::replace(self, FileState::Pending)
}
}
pub(crate) struct File { pub(crate) struct File {
path: PathBuf, path: PathBuf,
inner: Option<tokio_uring::fs::File>, inner: tokio_uring::fs::File,
cursor: usize,
wakers: Vec<Waker>,
state: FileState,
} }
impl File { impl File {
@ -54,10 +161,7 @@ mod io_uring {
tracing::info!("Opening io-uring file"); tracing::info!("Opening io-uring file");
Ok(File { Ok(File {
path: path.as_ref().to_owned(), path: path.as_ref().to_owned(),
inner: Some(tokio_uring::fs::File::open(path).await?), inner: tokio_uring::fs::File::open(path).await?,
cursor: 0,
wakers: vec![],
state: FileState::Pending,
}) })
} }
@ -65,10 +169,7 @@ mod io_uring {
tracing::info!("Creating io-uring file"); tracing::info!("Creating io-uring file");
Ok(File { Ok(File {
path: path.as_ref().to_owned(), path: path.as_ref().to_owned(),
inner: Some(tokio_uring::fs::File::create(path).await?), inner: tokio_uring::fs::File::create(path).await?,
cursor: 0,
wakers: vec![],
state: FileState::Pending,
}) })
} }
@ -76,347 +177,194 @@ mod io_uring {
tokio::fs::metadata(&self.path).await tokio::fs::metadata(&self.path).await
} }
fn poll_read( pub(crate) async fn write_from_bytes<'a>(
&mut self, &'a mut self,
cx: &mut Context<'_>, bytes: Bytes,
buf: &mut ReadBuf<'_>, ) -> std::io::Result<()> {
mut future: IoFuture, let mut buf = bytes.to_vec();
) -> Poll<std::io::Result<()>> { let len: u64 = buf.len().try_into().unwrap();
match Pin::new(&mut future).poll(cx) {
Poll::Ready((file, Ok(bytes_read), vec)) => {
self.cursor += bytes_read;
self.inner = Some(file);
buf.put_slice(&vec[0..bytes_read]);
// Wake tasks waiting on read to complete let mut cursor: u64 = 0;
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(Ok(())) loop {
} if cursor == len {
Poll::Ready((file, Err(err), _vec)) => { break;
self.inner = Some(file);
// Wake tasks waiting on read to complete
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(Err(err))
}
Poll::Pending => {
self.state = FileState::Reading { future };
Poll::Pending
}
}
}
fn poll_write(
&mut self,
cx: &mut Context<'_>,
mut future: IoFuture,
) -> Poll<std::io::Result<usize>> {
match Pin::new(&mut future).poll(cx) {
Poll::Ready((file, Ok(bytes_written), _vec)) => {
self.cursor += bytes_written;
self.inner = Some(file);
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(Ok(bytes_written))
}
Poll::Ready((file, Err(err), _vec)) => {
self.inner = Some(file);
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(Err(err))
}
Poll::Pending => {
self.state = FileState::Writing { future };
Poll::Pending
}
}
}
fn poll_flush(
&mut self,
cx: &mut Context<'_>,
mut future: FlushFuture,
) -> Poll<std::io::Result<()>> {
match Pin::new(&mut future).poll(cx) {
Poll::Ready((file, res)) => {
self.inner = Some(file);
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(res)
}
Poll::Pending => {
self.state = FileState::Syncing { future };
Poll::Pending
}
}
}
fn poll_shutdown(
&mut self,
cx: &mut Context<'_>,
mut future: ShutdownFuture,
) -> Poll<std::io::Result<()>> {
match Pin::new(&mut future).poll(cx) {
Poll::Ready(res) => {
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(res)
}
Poll::Pending => {
self.state = FileState::Shutdown { future };
Poll::Pending
}
}
}
fn poll_seek(
&mut self,
cx: &mut Context<'_>,
mut future: SeekFuture,
) -> Poll<std::io::Result<u64>> {
match Pin::new(&mut future).poll(cx) {
Poll::Ready(Ok(new_position)) => {
for waker in self.wakers.drain(..) {
waker.wake();
}
if let Ok(position) = new_position.try_into() {
self.cursor = position;
Poll::Ready(Ok(new_position))
} else {
Poll::Ready(Err(std::io::ErrorKind::Other.into()))
}
}
Poll::Ready(Err(err)) => {
for waker in self.wakers.drain(..) {
waker.wake();
}
Poll::Ready(Err(err))
}
Poll::Pending => {
self.state = FileState::Seeking { future };
Poll::Pending
}
}
}
fn prepare_read(&mut self, buf: &mut ReadBuf<'_>) -> IoFuture {
let bytes_to_read = buf.remaining().min(65_536);
let vec = vec![0u8; bytes_to_read];
let file = self.inner.take().unwrap();
let position: u64 = self.cursor.try_into().unwrap();
Box::pin(async move {
let (res, vec) = file.read_at(vec, position).await;
(file, res, vec)
})
}
fn prepare_write(&mut self, buf: &[u8]) -> IoFuture {
let vec = buf.to_vec();
let file = self.inner.take().unwrap();
let position: u64 = self.cursor.try_into().unwrap();
Box::pin(async move {
let (res, vec) = file.write_at(vec, position).await;
(file, res, vec)
})
}
fn prepare_flush(&mut self) -> FlushFuture {
let file = self.inner.take().unwrap();
Box::pin(async move {
let res = file.sync_all().await;
(file, res)
})
}
fn prepare_shutdown(&mut self) -> ShutdownFuture {
let file = self.inner.take().unwrap();
Box::pin(async move {
file.sync_all().await?;
file.close().await
})
}
fn prepare_seek(&self, from_end: i64) -> SeekFuture {
let path = self.path.clone();
Box::pin(async move {
let meta = tokio::fs::metadata(path).await?;
let end = meta.len();
if from_end < 0 {
let from_end = (-1) * from_end;
let from_end: u64 =
from_end.try_into().map_err(|_| std::io::ErrorKind::Other)?;
return Ok(end + from_end);
} }
let from_end: u64 = from_end.try_into().map_err(|_| std::io::ErrorKind::Other)?; let cursor_usize: usize = cursor.try_into().unwrap();
let (res, slice) = self.inner.write_at(buf.slice(cursor_usize..), cursor).await;
let n: usize = res?;
if from_end > end { if n == 0 {
return Err(std::io::ErrorKind::Other.into()); return Err(std::io::ErrorKind::UnexpectedEof.into());
} }
Ok(end - from_end) buf = slice.into_inner();
}) let n: u64 = n.try_into().unwrap();
} cursor += n;
fn register_waker<T>(&mut self, cx: &mut Context<'_>) -> Poll<T> {
let already_registered = self.wakers.iter().any(|waker| cx.waker().will_wake(waker));
if !already_registered {
self.wakers.push(cx.waker().clone());
} }
Poll::Pending Ok(())
}
pub(crate) async fn write_from_async_read<'a, R>(
&'a mut self,
mut reader: R,
) -> std::io::Result<()>
where
R: AsyncRead + Unpin,
{
let metadata = self.metadata().await?;
let size = metadata.len();
let mut cursor: u64 = 0;
loop {
let max_size = (size - cursor).min(65_536);
let mut buf = Vec::with_capacity(max_size.try_into().unwrap());
let n = (&mut reader).take(max_size).read_to_end(&mut buf).await?;
if n == 0 {
break;
}
let mut buf: Slice<Vec<u8>> = buf.slice(..n);
let mut position = 0;
loop {
if position == buf.len() {
break;
}
let (res, slice) = self.write_at(buf.slice(position..), cursor).await;
position += res?;
buf = slice.into_inner();
}
let position: u64 = position.try_into().unwrap();
cursor += position;
}
Ok(())
}
pub(crate) async fn read_to_async_write<'a, W>(
&'a mut self,
writer: &mut W,
) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
{
let metadata = self.metadata().await?;
let size = metadata.len();
let mut cursor: u64 = 0;
loop {
if cursor == size {
break;
}
let max_size = (size - cursor).min(65_536);
let buf = Vec::with_capacity(max_size.try_into().unwrap());
let (res, mut buf): (_, Vec<u8>) = self.read_at(buf, cursor).await;
let n: usize = res?;
if n == 0 {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
writer.write_all(&mut buf[0..n]).await?;
let n: u64 = n.try_into().unwrap();
cursor += n;
}
Ok(())
}
pub(crate) async fn read_to_stream(
self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<
impl Stream<Item = Result<Bytes, crate::error::Error>> + Unpin,
crate::error::Error,
> {
let size = self.metadata().await?.len();
let cursor = from_start.unwrap_or(0);
let size = len.unwrap_or(size - cursor) + cursor;
Ok(super::CrateError(BytesStream {
file: Some(self),
size,
cursor,
fut: None,
}))
}
async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
self.inner.read_at(buf, pos).await
}
async fn write_at<T: IoBuf>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
self.inner.write_at(buf, pos).await
} }
} }
impl AsyncRead for File { struct BytesStream {
fn poll_read( file: Option<File>,
mut self: Pin<&mut Self>, size: u64,
cx: &mut Context<'_>, cursor: u64,
buf: &mut ReadBuf<'_>, fut: Option<Pin<Box<dyn Future<Output = (File, BufResult<usize, Vec<u8>>)>>>>,
) -> Poll<std::io::Result<()>> {
match self.state.take() {
FileState::Pending => {
let future = (*self).prepare_read(buf);
(*self).poll_read(cx, buf, future)
}
FileState::Reading { future } => (*self).poll_read(cx, buf, future),
_ => (*self).register_waker(cx),
}
}
} }
impl AsyncWrite for File { impl Stream for BytesStream {
fn poll_write( type Item = std::io::Result<Bytes>;
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
match self.state.take() {
FileState::Pending => {
let future = (*self).prepare_write(buf);
(*self).poll_write(cx, future) fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut fut = if let Some(fut) = self.fut.take() {
fut
} else {
let file = self.file.take().unwrap();
if self.cursor == self.size {
return Poll::Ready(None);
} }
FileState::Writing { future } => (*self).poll_write(cx, future),
_ => (*self).register_waker(cx),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { let cursor = self.cursor;
match self.state.take() { let max_size = self.size - self.cursor;
FileState::Pending => {
let future = (*self).prepare_flush();
(*self).poll_flush(cx, future) Box::pin(async move {
let buf = Vec::with_capacity(max_size.try_into().unwrap());
let buf_res = file.read_at(buf, cursor).await;
(file, buf_res)
})
};
match Pin::new(&mut fut).poll(cx) {
Poll::Pending => {
self.fut = Some(fut);
Poll::Pending
} }
FileState::Syncing { future } => (*self).poll_flush(cx, future), Poll::Ready((file, (Ok(n), mut buf))) => {
_ => (*self).register_waker(cx), self.file = Some(file);
} let _ = buf.split_off(n);
} let n: u64 = match n.try_into() {
Ok(n) => n,
Err(_) => return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into()))),
};
self.cursor += n;
fn poll_shutdown( Poll::Ready(Some(Ok(Bytes::from(buf))))
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
match self.state.take() {
FileState::Pending => {
let future = (*self).prepare_shutdown();
(*self).poll_shutdown(cx, future)
} }
FileState::Shutdown { future } => (*self).poll_shutdown(cx, future), Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))),
_ => (*self).register_waker(cx),
}
}
}
impl AsyncSeek for File {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
match position {
SeekFrom::Start(from_start) => {
self.cursor = from_start.try_into().unwrap();
Ok(())
}
SeekFrom::End(from_end) => match self.state.take() {
FileState::Pending => {
let future = self.prepare_seek(from_end);
self.state = FileState::Seeking { future };
Ok(())
}
_ => Err(std::io::ErrorKind::Other.into()),
},
SeekFrom::Current(from_current) => {
if from_current < 0 {
let to_subtract = (-1) * from_current;
let to_subtract: usize = to_subtract
.try_into()
.map_err(|_| std::io::ErrorKind::Other)?;
if to_subtract > self.cursor {
return Err(std::io::ErrorKind::Other.into());
}
self.cursor -= to_subtract;
} else {
let from_current: usize = from_current
.try_into()
.map_err(|_| std::io::ErrorKind::Other)?;
self.cursor += from_current;
}
Ok(())
}
}
}
fn poll_complete(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<u64>> {
match self.state.take() {
FileState::Pending => Poll::Ready(Ok(self
.cursor
.try_into()
.map_err(|_| std::io::ErrorKind::Other)?)),
FileState::Seeking { future } => (*self).poll_seek(cx, future),
_ => Poll::Ready(Err(std::io::ErrorKind::Other.into())),
} }
} }
} }

View file

@ -138,8 +138,8 @@ pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Err
} }
#[instrument(name = "Spawning process command", skip(input))] #[instrument(name = "Spawning process command", skip(input))]
pub(crate) fn process_image_write_read( pub(crate) fn process_image_file_read(
input: impl AsyncRead + Unpin + 'static, input: crate::file::File,
args: Vec<String>, args: Vec<String>,
format: Format, format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> { ) -> std::io::Result<impl AsyncRead + Unpin> {
@ -154,7 +154,7 @@ pub(crate) fn process_image_write_read(
.arg(last_arg), .arg(last_arg),
)?; )?;
Ok(process.write_read(input).unwrap()) Ok(process.file_read(input).unwrap())
} }
impl Details { impl Details {

View file

@ -6,10 +6,7 @@ use actix_web::{
}; };
use awc::Client; use awc::Client;
use dashmap::{mapref::entry::Entry, DashMap}; use dashmap::{mapref::entry::Entry, DashMap};
use futures_util::{ use futures_util::{stream::once, Stream};
stream::{once, LocalBoxStream},
Stream,
};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use opentelemetry::{ use opentelemetry::{
sdk::{propagation::TraceContextPropagator, Resource}, sdk::{propagation::TraceContextPropagator, Resource},
@ -26,7 +23,7 @@ use std::{
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::AsyncReadExt,
sync::{ sync::{
oneshot::{Receiver, Sender}, oneshot::{Receiver, Sender},
Semaphore, Semaphore,
@ -41,6 +38,7 @@ use tracing_log::LogTracer;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry}; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry};
mod config; mod config;
mod either;
mod error; mod error;
mod exiftool; mod exiftool;
mod ffmpeg; mod ffmpeg;
@ -56,6 +54,7 @@ mod validate;
use self::{ use self::{
config::{Config, Format}, config::{Config, Format},
either::Either,
error::{Error, UploadError}, error::{Error, UploadError},
middleware::{Deadline, Internal}, middleware::{Deadline, Internal},
upload_manager::{Details, UploadManager, UploadManagerSession}, upload_manager::{Details, UploadManager, UploadManagerSession},
@ -213,7 +212,7 @@ where
// Try writing to a file // Try writing to a file
#[instrument(name = "Saving file", skip(bytes))] #[instrument(name = "Saving file", skip(bytes))]
async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), Error> { async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), Error> {
if let Some(path) = path.parent() { if let Some(path) = path.parent() {
// create the directory for the file // create the directory for the file
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
@ -236,7 +235,7 @@ async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), Erro
// try writing // try writing
debug!("Writing to {:?}", path); debug!("Writing to {:?}", path);
if let Err(e) = file.write_all_buf(&mut bytes).await { if let Err(e) = file.write_from_bytes(bytes).await {
error!("Error writing {:?}, {}", path, e); error!("Error writing {:?}, {}", path, e);
// remove file if writing failed before completion // remove file if writing failed before completion
tokio::fs::remove_file(path).await?; tokio::fs::remove_file(path).await?;
@ -545,7 +544,7 @@ async fn process(
let file = crate::file::File::open(original_path.clone()).await?; let file = crate::file::File::open(original_path.clone()).await?;
let mut processed_reader = let mut processed_reader =
crate::magick::process_image_write_read(file, thumbnail_args, format)?; crate::magick::process_image_file_read(file, thumbnail_args, format)?;
let mut vec = Vec::new(); let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?; processed_reader.read_to_end(&mut vec).await?;
@ -718,7 +717,7 @@ async fn ranged_file_resp(
let mut builder = HttpResponse::PartialContent(); let mut builder = HttpResponse::PartialContent();
builder.insert_header(range.to_content_range(meta.len())); builder.insert_header(range.to_content_range(meta.len()));
(builder, range.chop_file(file).await?) (builder, Either::Left(range.chop_file(file).await?))
} else { } else {
return Err(UploadError::Range.into()); return Err(UploadError::Range.into());
} }
@ -726,8 +725,8 @@ async fn ranged_file_resp(
//No Range header in the request - return the entire document //No Range header in the request - return the entire document
None => { None => {
let file = crate::file::File::open(path).await?; let file = crate::file::File::open(path).await?;
let stream = Box::pin(crate::stream::bytes_stream(file)) as LocalBoxStream<'_, _>; let stream = file.read_to_stream(None, None).await?;
(HttpResponse::Ok(), stream) (HttpResponse::Ok(), Either::Right(stream))
} }
}; };

View file

@ -1,7 +1,4 @@
use crate::{ use crate::error::{Error, UploadError};
error::{Error, UploadError},
stream::bytes_stream,
};
use actix_web::{ use actix_web::{
dev::Payload, dev::Payload,
http::{ http::{
@ -11,9 +8,8 @@ use actix_web::{
web::Bytes, web::Bytes,
FromRequest, HttpRequest, FromRequest, HttpRequest,
}; };
use futures_util::stream::{once, LocalBoxStream, Stream}; use futures_util::stream::{once, Stream};
use std::{future::ready, io}; use std::future::ready;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Range { pub(crate) enum Range {
@ -61,25 +57,14 @@ impl Range {
pub(crate) async fn chop_file( pub(crate) async fn chop_file(
&self, &self,
mut file: crate::file::File, file: crate::file::File,
) -> Result<LocalBoxStream<'static, Result<Bytes, Error>>, Error> { ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Unpin, Error> {
match self { match self {
Range::Start(start) => { Range::Start(start) => file.read_to_stream(Some(*start), None).await,
file.seek(io::SeekFrom::Start(*start)).await?; Range::SuffixLength(from_start) => file.read_to_stream(None, Some(*from_start)).await,
Ok(Box::pin(bytes_stream(file)))
}
Range::SuffixLength(from_start) => {
file.seek(io::SeekFrom::Start(0)).await?;
let reader = file.take(*from_start);
Ok(Box::pin(bytes_stream(reader)))
}
Range::Segment(start, end) => { Range::Segment(start, end) => {
file.seek(io::SeekFrom::Start(*start)).await?; file.read_to_stream(Some(*start), Some(end.saturating_sub(*start)))
let reader = file.take(end.saturating_sub(*start)); .await
Ok(Box::pin(bytes_stream(reader)))
} }
} }
} }

View file

@ -1,7 +1,5 @@
use crate::error::Error;
use actix_rt::task::JoinHandle; use actix_rt::task::JoinHandle;
use actix_web::web::{Bytes, BytesMut}; use actix_web::web::Bytes;
use futures_util::Stream;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -13,7 +11,6 @@ use tokio::{
process::{Child, Command}, process::{Child, Command},
sync::oneshot::{channel, Receiver}, sync::oneshot::{channel, Receiver},
}; };
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::Instrument; use tracing::Instrument;
use tracing::Span; use tracing::Span;
@ -33,8 +30,6 @@ pub(crate) struct ProcessRead<I> {
handle: JoinHandle<()>, handle: JoinHandle<()>,
} }
struct BytesFreezer<S>(S, Span);
impl Process { impl Process {
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> { pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
Self::spawn(Command::new(command).args(args)) Self::spawn(Command::new(command).args(args))
@ -100,9 +95,9 @@ impl Process {
})) }))
} }
pub(crate) fn write_read( pub(crate) fn file_read(
mut self, mut self,
mut input_reader: impl AsyncRead + Unpin + 'static, mut input_file: crate::file::File,
) -> Option<impl AsyncRead + Unpin> { ) -> Option<impl AsyncRead + Unpin> {
let mut stdin = self.child.stdin.take()?; let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?; let stdout = self.child.stdout.take()?;
@ -113,7 +108,7 @@ impl Process {
let mut child = self.child; let mut child = self.child;
let handle = actix_rt::spawn( let handle = actix_rt::spawn(
async move { async move {
if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { if let Err(e) = input_file.read_to_async_write(&mut stdin).await {
let _ = tx.send(e); let _ = tx.send(e);
return; return;
} }
@ -144,15 +139,6 @@ impl Process {
} }
} }
pub(crate) fn bytes_stream(
input: impl AsyncRead + Unpin,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
BytesFreezer(
FramedRead::new(input, BytesCodec::new()),
tracing::info_span!("Serving bytes from AsyncRead"),
)
}
impl<I> AsyncRead for ProcessRead<I> impl<I> AsyncRead for ProcessRead<I>
where where
I: AsyncRead + Unpin, I: AsyncRead + Unpin,
@ -198,23 +184,6 @@ impl<I> Drop for ProcessRead<I> {
} }
} }
impl<S> Stream for BytesFreezer<S>
where
S: Stream<Item = std::io::Result<BytesMut>> + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let span = self.1.clone();
span.in_scope(|| {
Pin::new(&mut self.0)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze())))
.map_err(Error::from)
})
}
}
impl std::fmt::Display for StatusError { impl std::fmt::Display for StatusError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Command failed with bad status") write!(f, "Command failed with bad status")

View file

@ -13,7 +13,7 @@ use std::{
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, ReadBuf};
use tracing::{debug, error, info, instrument, warn, Span}; use tracing::{debug, error, info, instrument, warn, Span};
use tracing_futures::Instrument; use tracing_futures::Instrument;
@ -993,53 +993,7 @@ pub(crate) async fn safe_save_reader(
let mut file = crate::file::File::create(to).await?; let mut file = crate::file::File::create(to).await?;
tokio::io::copy(input, &mut file).await?; file.write_from_async_read(input).await?;
Ok(())
}
#[instrument(skip(stream))]
pub(crate) async fn safe_save_stream<E>(
to: PathBuf,
mut stream: UploadStream<E>,
) -> Result<(), Error>
where
Error: From<E>,
E: Unpin,
{
if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}
debug!("Checking if {:?} already exists", to);
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(UploadError::FileExists.into());
}
debug!("Writing stream to {:?}", to);
let to1 = to.clone();
let fut = async move {
let mut file = crate::file::File::create(to1).await?;
while let Some(res) = stream.next().await {
let mut bytes = res?;
file.write_all_buf(&mut bytes).await?;
}
Ok(())
};
if let Err(e) = fut.await {
error!("Failed to save file: {}", e);
let _ = tokio::fs::remove_file(to).await;
return Err(e);
}
Ok(()) Ok(())
} }
@ -1107,7 +1061,7 @@ mod test {
#[test] #[test]
fn hasher_works() { fn hasher_works() {
let hash = test_on_arbiter!(async move { let hash = test_on_arbiter!(async move {
let file1 = crate::file::File::open("./client-examples/earth.gif").await?; let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
let mut hasher = Hasher::new(file1, Sha256::new()); let mut hasher = Hasher::new(file1, Sha256::new());