1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-02 21:39:26 +00:00

Unuse stream trait.

This commit is contained in:
Yury Yarashevich 2023-10-18 22:35:44 +02:00
parent 292c95c8a4
commit 7cc4210eb3

View file

@ -11,7 +11,7 @@ use bytes::{Buf, Bytes};
use derive_more::Display;
#[cfg(feature = "compress-gzip")]
use flate2::write::{GzEncoder, ZlibEncoder};
use futures_core::{ready, Stream};
use futures_core::ready;
use pin_project_lite::pin_project;
use tracing::trace;
#[cfg(feature = "compress-zstd")]
@ -171,15 +171,12 @@ where
}
if let Some(cooperative_encoder) = this.encoder.as_deref_mut() {
match ready!(Pin::new(cooperative_encoder).poll_next(cx)) {
Some(Ok(Some(chunk))) => return Poll::Ready(Some(Ok(chunk))),
Some(Ok(None)) => {
// Need more data from uncompressed body
}
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
None => {
unreachable!()
match ready!(Pin::new(cooperative_encoder).poll_encoded_chunk(cx)) {
Ok(Some(encoded_chunk)) => return Poll::Ready(Some(Ok(encoded_chunk))),
Ok(None) => {
// Need next chunk from uncompressed body
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
}
@ -191,9 +188,7 @@ where
Some(Ok(chunk)) => match this.encoder.as_deref_mut() {
None => return Poll::Ready(Some(Ok(chunk))),
Some(encoder) => {
debug_assert!(encoder.chunk_ready_to_encode.is_none());
encoder.chunk_ready_to_encode = Some(chunk);
encoder.budget_used = 0;
encoder.push_chunk(chunk);
}
},
@ -327,6 +322,47 @@ impl CooperativeContentEncoder {
_ => None,
}
}
fn push_chunk(&mut self, chunk: Bytes) {
debug_assert!(self.chunk_ready_to_encode.is_none());
self.chunk_ready_to_encode = Some(chunk);
self.budget_used = 0
}
fn poll_encoded_chunk(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<Bytes>, EncoderError>> {
let this = self.get_mut();
loop {
if this.budget_used > 8 {
this.budget_used = 0;
cx.waker().wake_by_ref();
return Poll::Pending;
}
if let Some(mut chunk) = this.chunk_ready_to_encode.take() {
let encode_len = chunk.len().min(this.preferred_chunk_size);
this.content_encoder
.write(&chunk[..encode_len])
.map_err(EncoderError::Io)?;
chunk.advance(encode_len);
if !chunk.is_empty() {
this.chunk_ready_to_encode = Some(chunk);
}
let encoded_chunk = this.content_encoder.take();
if encoded_chunk.is_empty() {
continue;
}
this.budget_used += 1;
return Poll::Ready(Ok(Some(encoded_chunk)));
} else {
return Poll::Ready(Ok(None));
}
}
}
}
impl ContentEncoder {
@ -416,42 +452,6 @@ impl ContentEncoder {
}
}
impl futures_core::Stream for CooperativeContentEncoder {
type Item = Result<Option<Bytes>, EncoderError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if this.budget_used > 8 {
this.budget_used = 0;
cx.waker().wake_by_ref();
return Poll::Pending;
}
if let Some(mut chunk) = this.chunk_ready_to_encode.take() {
let encode_len = chunk.len().min(this.preferred_chunk_size);
this.content_encoder
.write(&chunk[..encode_len])
.map_err(EncoderError::Io)?;
chunk.advance(encode_len);
if !chunk.is_empty() {
this.chunk_ready_to_encode = Some(chunk);
}
let encoded_chunk = this.content_encoder.take();
if encoded_chunk.is_empty() {
continue;
}
this.budget_used += 1;
return Poll::Ready(Some(Ok(Some(encoded_chunk))));
} else {
return Poll::Ready(Some(Ok(None)));
}
}
}
}
#[cfg(feature = "compress-brotli")]
fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
Box::new(brotli::CompressorWriter::new(