Wrap BytesFreezer in span

This commit is contained in:
Aode (lion) 2021-09-25 16:38:44 -05:00
parent d67478843b
commit f390c62cbf

View file

@ -33,7 +33,7 @@ pub(crate) struct ProcessRead<I> {
handle: JoinHandle<()>, handle: JoinHandle<()>,
} }
struct BytesFreezer<S>(S); struct BytesFreezer<S>(S, Span);
impl Process { impl Process {
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> { pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
@ -147,7 +147,10 @@ impl Process {
pub(crate) fn bytes_stream( pub(crate) fn bytes_stream(
input: impl AsyncRead + Unpin, input: impl AsyncRead + Unpin,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin { ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
BytesFreezer(FramedRead::new(input, BytesCodec::new())) BytesFreezer(
FramedRead::new(input, BytesCodec::new()),
tracing::info_span!("Serving bytes from AsyncRead"),
)
} }
impl<I> AsyncRead for ProcessRead<I> impl<I> AsyncRead for ProcessRead<I>
@ -202,10 +205,13 @@ where
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let span = self.1.clone();
span.in_scope(|| {
Pin::new(&mut self.0) Pin::new(&mut self.0)
.poll_next(cx) .poll_next(cx)
.map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze()))) .map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze())))
.map_err(Error::from) .map_err(Error::from)
})
} }
} }