Reduce redundancy in process module

This commit is contained in:
asonix 2022-09-25 17:35:52 -05:00
parent 50d118a2a7
commit c9a74a73ca

View file

@ -4,12 +4,12 @@ use actix_web::web::Bytes;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
process::Stdio, process::{Stdio},
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, Command}, process::{Child, Command, ChildStdin},
sync::oneshot::{channel, Receiver}, sync::oneshot::{channel, Receiver},
}; };
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
@ -67,58 +67,50 @@ impl Process {
} }
#[tracing::instrument(skip(input))] #[tracing::instrument(skip(input))]
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> impl AsyncRead + Unpin { pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists"); self.read_fn(move |mut stdin| {
let mut input = input;
async move { stdin.write_all_buf(&mut input).await }
})
}
#[tracing::instrument]
pub(crate) fn read(self) -> impl AsyncRead + Unpin {
self.read_fn(|_| async { Ok(()) })
}
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
self,
mut async_read: A,
) -> impl AsyncRead + Unpin {
self.read_fn(move |mut stdin| async move { tokio::io::copy(&mut async_read, &mut stdin).await.map(|_| ()) })
}
#[tracing::instrument]
pub(crate) fn store_read<S: Store + 'static>(
self,
store: S,
identifier: S::Identifier,
) -> impl AsyncRead + Unpin {
self.read_fn(move |mut stdin| {
let store = store;
let identifier = identifier;
async move { store.read_into(&identifier, &mut stdin).await }
})
}
fn read_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
where
F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>,
{
let stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists"); let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>); .in_scope(channel::<std::io::Error>);
let span = tracing::info_span!(parent: None, "Background process task from bytes");
span.follows_from(Span::current());
let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
}
.instrument(span),
)
});
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
}
}
#[tracing::instrument]
pub(crate) fn read(mut self) -> impl AsyncRead + Unpin {
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let span = tracing::info_span!(parent: None, "Background process task"); let span = tracing::info_span!(parent: None, "Background process task");
span.follows_from(Span::current()); span.follows_from(Span::current());
@ -126,54 +118,10 @@ impl Process {
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn( actix_rt::spawn(
async move { async move {
match child.wait().await { if let Err(e) = (f)(stdin).await {
Ok(status) => {
if !status.success() {
let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
}
.instrument(span),
)
});
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
}
}
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
mut self,
mut async_read: A,
) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>);
let span = tracing::info_span!(parent: None, "Background process task from bytes");
span.follows_from(Span::current());
let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Err(e) = tokio::io::copy(&mut async_read, &mut stdin).await {
let _ = tx.send(e); let _ = tx.send(e);
return; return;
} }
drop(stdin);
match child.wait().await { match child.wait().await {
Ok(status) => { Ok(status) => {
@ -199,61 +147,7 @@ impl Process {
err_closed: false, err_closed: false,
handle: DropHandle { inner: handle }, handle: DropHandle { inner: handle },
} }
}
#[tracing::instrument]
pub(crate) fn store_read<S: Store + 'static>(
mut self,
store: S,
identifier: S::Identifier,
) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel").in_scope(channel);
let span = tracing::info_span!(
parent: None,
"Background processs task from store",
?store,
?identifier
);
span.follows_from(Span::current());
let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Err(e) = store.read_into(&identifier, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
}
.instrument(span),
)
});
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
}
} }
} }