Abort process writer task if reader is dropped

This commit is contained in:
Aode (lion) 2021-09-11 16:35:38 -05:00
parent 0ffd66c74e
commit bf1a16d7d3
3 changed files with 13 additions and 6 deletions

View file

@ -1,3 +1,4 @@
use actix_rt::time::Timeout;
use actix_web::{ use actix_web::{
dev::{Service, ServiceRequest, Transform}, dev::{Service, ServiceRequest, Transform},
http::StatusCode, http::StatusCode,
@ -9,7 +10,6 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_rt::time::Timeout;
use tracing_futures::{Instrument, Instrumented}; use tracing_futures::{Instrument, Instrumented};
use uuid::Uuid; use uuid::Uuid;

View file

@ -49,9 +49,7 @@ impl Range {
) -> impl Stream<Item = Result<Bytes, UploadError>> + Unpin { ) -> impl Stream<Item = Result<Bytes, UploadError>> + Unpin {
match self { match self {
Range::RangeStart(start) => once(ready(Ok(bytes.slice(*start as usize..)))), Range::RangeStart(start) => once(ready(Ok(bytes.slice(*start as usize..)))),
Range::SuffixLength(from_start) => { Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))),
once(ready(Ok(bytes.slice(..*from_start as usize))))
}
Range::Segment(start, end) => { Range::Segment(start, end) => {
once(ready(Ok(bytes.slice(*start as usize..*end as usize)))) once(ready(Ok(bytes.slice(*start as usize..*end as usize))))
} }

View file

@ -19,6 +19,7 @@ pub(crate) struct ProcessRead<I> {
inner: I, inner: I,
err_recv: tokio::sync::oneshot::Receiver<std::io::Error>, err_recv: tokio::sync::oneshot::Receiver<std::io::Error>,
err_closed: bool, err_closed: bool,
handle: actix_rt::task::JoinHandle<()>,
} }
struct BytesFreezer<S>(S); struct BytesFreezer<S>(S);
@ -43,7 +44,7 @@ impl Process {
let mut child = self.child; let mut child = self.child;
actix_rt::spawn(async move { let handle = actix_rt::spawn(async move {
if let Err(e) = stdin.write_all_buf(&mut input).await { if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e); let _ = tx.send(e);
return; return;
@ -67,6 +68,7 @@ impl Process {
inner: stdout, inner: stdout,
err_recv: rx, err_recv: rx,
err_closed: false, err_closed: false,
handle,
})) }))
} }
@ -81,7 +83,7 @@ impl Process {
let mut child = self.child; let mut child = self.child;
actix_rt::spawn(async move { let handle = actix_rt::spawn(async move {
if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await {
let _ = tx.send(e); let _ = tx.send(e);
return; return;
@ -105,6 +107,7 @@ impl Process {
inner: stdout, inner: stdout,
err_recv: rx, err_recv: rx,
err_closed: false, err_closed: false,
handle,
})) }))
} }
} }
@ -144,6 +147,12 @@ where
} }
} }
impl<I> Drop for ProcessRead<I> {
fn drop(&mut self) {
self.handle.abort();
}
}
impl<S> Stream for BytesFreezer<S> impl<S> Stream for BytesFreezer<S>
where where
S: Stream<Item = std::io::Result<BytesMut>> + Unpin, S: Stream<Item = std::io::Result<BytesMut>> + Unpin,