Faster chunker

This commit is contained in:
Alex Auvolat 2022-07-25 11:59:55 +02:00
parent 49154a78d8
commit ad35b18bb1
No known key found for this signature in database
GPG key ID: 0E496D15096376BE

View file

@ -387,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S, stream: S,
read_all: bool, read_all: bool,
block_size: usize, block_size: usize,
buf: VecDeque<u8>, buf: VecDeque<Bytes>,
buf_len: usize,
} }
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@ -396,29 +397,50 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream, stream,
read_all: false, read_all: false,
block_size, block_size,
buf: VecDeque::with_capacity(2 * block_size), buf: VecDeque::with_capacity(8),
buf_len: 0,
} }
} }
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
while !self.read_all && self.buf.len() < self.block_size { while !self.read_all && self.buf_len < self.block_size {
if let Some(block) = self.stream.next().await { if let Some(block) = self.stream.next().await {
let bytes = block?; let bytes = block?;
trace!("Body next: {} bytes", bytes.len()); trace!("Body next: {} bytes", bytes.len());
self.buf.extend(bytes); self.buf_len += bytes.len();
self.buf.push_back(bytes);
} else { } else {
self.read_all = true; self.read_all = true;
} }
} }
if self.buf.is_empty() { if self.buf_len == 0 {
Ok(None) Ok(None)
} else if self.buf.len() <= self.block_size {
let block = self.buf.drain(..).collect::<Vec<u8>>();
Ok(Some(block))
} else { } else {
let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); let mut slices = Vec::with_capacity(self.buf.len());
Ok(Some(block)) let mut taken = 0;
while self.buf_len > 0 && taken < self.block_size {
let front = self.buf.pop_front().unwrap();
if taken + front.len() <= self.block_size {
taken += front.len();
self.buf_len -= front.len();
slices.push(front);
} else {
let front_take = self.block_size - taken;
slices.push(front.slice(..front_take));
self.buf.push_front(front.slice(front_take..));
self.buf_len -= front_take;
break;
}
}
Ok(Some(
slices
.iter()
.map(|x| &x[..])
.collect::<Vec<_>>()
.concat()
.into(),
))
} }
} }
} }