Spawn new task to avoid complexity/bugs

This commit is contained in:
Aode (lion) 2021-08-31 10:17:08 -05:00
parent c1d4e3b87e
commit a6f2082b37

View file

@ -1,8 +1,5 @@
use actix_web::web::Bytes;
use futures::{
future::FutureExt,
stream::{LocalBoxStream, Stream, StreamExt},
};
use futures::stream::{LocalBoxStream, Stream, StreamExt};
use std::{
pin::Pin,
task::{Context, Poll},
@ -49,98 +46,52 @@ impl Process {
self.child.stdout.take().map(ProcessStream::new)
}
pub(crate) fn sink_stream<S, E>(mut self, mut input_stream: S) -> Option<ProcessSinkStream<E>>
pub(crate) fn sink_stream<S, E>(mut self, input_stream: S) -> Option<ProcessSinkStream<E>>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: From<std::io::Error> + 'static,
{
let mut stdin = self.child.stdin.take();
let mut stdin = self.take_sink()?;
let mut stdout = self.take_stream()?;
let s = async_stream::stream! {
let mut wait = Box::pin(self.child.wait().fuse());
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
loop {
tokio::select! {
res = input_stream.next() => {
match res {
Some(Ok(mut bytes)) => {
if let Some(stdin) = stdin.as_mut() {
let mut fut = Box::pin(stdin.write_all_buf(&mut bytes));
actix_rt::spawn(async move {
if let Err(e) = stdin.send(input_stream).await {
let _ = tx.send(e).await;
}
});
loop {
tokio::select! {
res = &mut fut => {
if let Err(e) = res {
yield Err(e.into());
}
break;
}
res = stdout.next() => {
match res {
Some(Ok(bytes)) => yield Ok(bytes),
Some(Err(e)) => {
yield Err(e.into());
break;
}
None => break,
}
}
res = &mut wait => {
match res {
Ok(status) if !status.success() => {
yield Err(std::io::Error::from(std::io::ErrorKind::Other).into());
break;
},
Err(e) => {
yield Err(e.into());
break;
}
_ => (),
}
}
}
}
}
},
Some(Err(e)) => {
Some(ProcessSinkStream {
stream: Box::pin(async_stream::stream! {
loop {
tokio::select! {
opt = rx.recv() => {
if let Some(e) = opt {
yield Err(e);
break;
}
None => {
stdin.take();
},
}
}
res = stdout.next() => {
match res {
Some(Ok(bytes)) => yield Ok(bytes),
Some(Err(e)) => {
yield Err(e.into());
break;
res = stdout.next() => {
match res {
Some(Ok(bytes)) => yield Ok(bytes),
Some(Err(e)) => {
yield Err(e.into());
break;
}
None => break,
}
None => break,
}
}
res = &mut wait => {
match res {
Ok(status) if !status.success() => {
yield Err(std::io::Error::from(std::io::ErrorKind::Other).into());
break;
},
Err(e) => {
yield Err(e.into());
break;
}
_ => (),
}
}
}
}
};
Some(ProcessSinkStream {
stream: Box::pin(s),
drop(stdout);
match self.child.wait().await {
Ok(status) if status.success() => return,
Ok(_) => yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()),
Err(e) => yield Err(e.into()),
}
}),
})
}
}