1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-27 01:18:10 +00:00

provide optimisation path for single-chunk body types (#2497)

This commit is contained in:
Rob Ede 2021-12-09 13:52:35 +00:00 committed by GitHub
parent 69fa17f66f
commit 774ac7fec4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 328 additions and 30 deletions

View file

@ -18,6 +18,7 @@
* `Request::take_conn_data()`. [#2491]
* `Request::take_req_data()`. [#2487]
* `impl Clone` for `RequestHead`. [#2487]
* New methods on `MessageBody` trait, `is_complete_body` and `take_complete_body`, both with default implementations, for optimisations on body types that are done in exactly one poll/chunk. [#2497]
### Changed
* Rename `body::BoxBody::{from_body => new}`. [#2468]
@ -45,6 +46,7 @@
[#2487]: https://github.com/actix/actix-web/pull/2487
[#2488]: https://github.com/actix/actix-web/pull/2488
[#2491]: https://github.com/actix/actix-web/pull/2491
[#2497]: https://github.com/actix/actix-web/pull/2497
## 3.0.0-beta.14 - 2021-11-30

View file

@ -51,6 +51,34 @@ impl MessageBody for BoxBody {
.poll_next(cx)
.map_err(|err| Error::new_body().with_cause(err))
}
fn is_complete_body(&self) -> bool {
self.0.is_complete_body()
}
fn take_complete_body(&mut self) -> Bytes {
debug_assert!(
self.is_complete_body(),
"boxed type does not allow taking complete body; caller should make sure to \
call `is_complete_body` first",
);
// we do not have DerefMut access to call take_complete_body directly but since
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_pin_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(data))) => data,
_ => {
panic!(
"boxed type indicated it allows taking complete body but failed to \
return Bytes when polled",
);
}
}
}
}
#[cfg(test)]

View file

@ -67,6 +67,20 @@ where
.map_err(|err| Error::new_body().with_cause(err)),
}
}
fn is_complete_body(&self) -> bool {
match self {
EitherBody::Left { body } => body.is_complete_body(),
EitherBody::Right { body } => body.is_complete_body(),
}
}
fn take_complete_body(&mut self) -> Bytes {
match self {
EitherBody::Left { body } => body.take_complete_body(),
EitherBody::Right { body } => body.take_complete_body(),
}
}
}
#[cfg(test)]

View file

@ -25,10 +25,58 @@ pub trait MessageBody {
fn size(&self) -> BodySize;
/// Attempt to pull out the next chunk of body bytes.
// TODO: expand documentation
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>>;
/// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`.
///
/// This method's implementation should agree with [`take_complete_body`] and should always be
/// checked before taking the body.
///
/// The default implementation returns `false.
///
/// [`take_complete_body`]: MessageBody::take_complete_body
fn is_complete_body(&self) -> bool {
false
}
/// Returns the complete chunk of body bytes.
///
/// Implementors of this method should note the following:
/// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of
/// performing this check is delegated to the caller.
/// - If the result of [`is_complete_body`] is conditional, that condition should be given
/// equivalent attention here.
/// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic.
/// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless
/// the chunk is guaranteed to be empty.
///
/// The default implementation panics unconditionally, indicating a control flow bug in the
/// calling code.
///
/// # Panics
/// With a correct implementation, panics if called without first checking [`is_complete_body`].
///
/// [`is_complete_body`]: MessageBody::is_complete_body
/// [`take_complete_body`]: MessageBody::take_complete_body
/// [`poll_next`]: MessageBody::poll_next
fn take_complete_body(&mut self) -> Bytes {
assert!(
self.is_complete_body(),
"type ({}) allows taking complete body but did not provide an implementation \
of `take_complete_body`",
std::any::type_name::<Self>()
);
unimplemented!(
"type ({}) does not allow taking complete body; caller should make sure to \
check `is_complete_body` first",
std::any::type_name::<Self>()
);
}
}
mod foreign_impls {
@ -49,6 +97,14 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
match *self {}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
match *self {}
}
}
impl MessageBody for () {
@ -66,6 +122,16 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(None)
}
#[inline]
fn is_complete_body(&self) -> bool {
true
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
}
}
impl<B> MessageBody for Box<B>
@ -86,6 +152,16 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
Pin::new(self.get_mut().as_mut()).poll_next(cx)
}
#[inline]
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
self.as_mut().take_complete_body()
}
}
impl<B> MessageBody for Pin<Box<B>>
@ -106,6 +182,38 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.as_mut().poll_next(cx)
}
#[inline]
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
debug_assert!(
self.is_complete_body(),
"inner type \"{}\" does not allow taking complete body; caller should make sure to \
call `is_complete_body` first",
std::any::type_name::<B>(),
);
// we do not have DerefMut access to call take_complete_body directly but since
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(data))) => data,
_ => {
panic!(
"inner type \"{}\" indicated it allows taking complete body but failed to \
return Bytes when polled",
std::any::type_name::<B>()
);
}
}
}
}
impl MessageBody for &'static [u8] {
@ -116,17 +224,23 @@ mod foreign_impls {
}
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
let bytes = mem::take(self.get_mut());
let bytes = Bytes::from_static(bytes);
Poll::Ready(Some(Ok(bytes)))
Poll::Ready(Some(Ok(self.take_complete_body())))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self))
}
}
impl MessageBody for Bytes {
@ -137,16 +251,23 @@ mod foreign_impls {
}
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
let bytes = mem::take(self.get_mut());
Poll::Ready(Some(Ok(bytes)))
Poll::Ready(Some(Ok(self.take_complete_body())))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self)
}
}
impl MessageBody for BytesMut {
@ -157,16 +278,23 @@ mod foreign_impls {
}
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
let bytes = mem::take(self.get_mut()).freeze();
Poll::Ready(Some(Ok(bytes)))
Poll::Ready(Some(Ok(self.take_complete_body())))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).freeze()
}
}
impl MessageBody for Vec<u8> {
@ -177,16 +305,23 @@ mod foreign_impls {
}
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
let bytes = mem::take(self.get_mut());
Poll::Ready(Some(Ok(Bytes::from(bytes))))
Poll::Ready(Some(Ok(self.take_complete_body())))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
}
}
impl MessageBody for &'static str {
@ -208,6 +343,14 @@ mod foreign_impls {
Poll::Ready(Some(Ok(bytes)))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self).as_bytes())
}
}
impl MessageBody for String {
@ -228,6 +371,14 @@ mod foreign_impls {
Poll::Ready(Some(Ok(Bytes::from(string))))
}
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
}
}
impl MessageBody for bytestring::ByteString {
@ -244,6 +395,14 @@ mod foreign_impls {
let string = mem::take(self.get_mut());
Poll::Ready(Some(Ok(string.into_bytes())))
}
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).into_bytes()
}
}
}
@ -406,6 +565,51 @@ mod tests {
assert_poll_next!(pl, Bytes::from("test"));
}
#[test]
fn take_string() {
let mut data = "test".repeat(2);
let data_bytes = Bytes::from(data.clone());
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), data_bytes);
let mut big_data = "test".repeat(64 * 1024);
let data_bytes = Bytes::from(big_data.clone());
assert!(big_data.is_complete_body());
assert_eq!(big_data.take_complete_body(), data_bytes);
}
#[test]
fn take_boxed_equivalence() {
let mut data = Bytes::from_static(b"test");
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
let mut data = Box::new(Bytes::from_static(b"test"));
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
let mut data = Box::pin(Bytes::from_static(b"test"));
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
}
#[test]
fn take_policy() {
let mut data = Bytes::from_static(b"test");
// first call returns chunk
assert_eq!(data.take_complete_body(), b"test".as_ref());
// second call returns empty
assert_eq!(data.take_complete_body(), b"".as_ref());
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut data = Bytes::from_static(b"test");
// take returns whole chunk
assert_eq!(data.take_complete_body(), b"test".as_ref());
// subsequent poll_next returns None
assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None));
}
// down-casting used to be done with a method on MessageBody trait
// test is kept to demonstrate equivalence of Any trait
#[actix_rt::test]

View file

@ -40,4 +40,14 @@ impl MessageBody for None {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(Option::None)
}
#[inline]
fn is_complete_body(&self) -> bool {
true
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
}
}

View file

@ -53,32 +53,32 @@ impl<B: MessageBody> Encoder<B> {
}
}
pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self {
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|| head.status == StatusCode::SWITCHING_PROTOCOLS
|| head.status == StatusCode::NO_CONTENT
|| encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto);
match body.size() {
// no need to compress an empty body
BodySize::None => return Self::none(),
// we cannot assume that Sized is not a stream
BodySize::Sized(_) | BodySize::Stream => {}
// no need to compress an empty body
if matches!(body.size(), BodySize::None) {
return Self::none();
}
// TODO potentially some optimisation for single-chunk responses here by trying to read the
// payload eagerly, stopping after 2 polls if the first is a chunk and the second is None
let body = if body.is_complete_body() {
let body = body.take_complete_body();
EncoderBody::Full { body }
} else {
EncoderBody::Stream { body }
};
if can_encode {
// Modify response body only if encoder is set
if let Some(enc) = ContentEncoder::encoder(encoding) {
update_head(encoding, head);
head.no_chunking(false);
return Encoder {
body: EncoderBody::Stream { body },
body,
encoder: Some(enc),
fut: None,
eof: false,
@ -87,7 +87,7 @@ impl<B: MessageBody> Encoder<B> {
}
Encoder {
body: EncoderBody::Stream { body },
body,
encoder: None,
fut: None,
eof: false,
@ -99,6 +99,7 @@ pin_project! {
#[project = EncoderBodyProj]
enum EncoderBody<B> {
None,
Full { body: Bytes },
Stream { #[pin] body: B },
}
}
@ -112,6 +113,7 @@ where
fn size(&self) -> BodySize {
match self {
EncoderBody::None => BodySize::None,
EncoderBody::Full { body } => body.size(),
EncoderBody::Stream { body } => body.size(),
}
}
@ -122,12 +124,32 @@ where
) -> Poll<Option<Result<Bytes, Self::Error>>> {
match self.project() {
EncoderBodyProj::None => Poll::Ready(None),
EncoderBodyProj::Full { body } => {
Pin::new(body).poll_next(cx).map_err(|err| match err {})
}
EncoderBodyProj::Stream { body } => body
.poll_next(cx)
.map_err(|err| EncoderError::Body(err.into())),
}
}
fn is_complete_body(&self) -> bool {
match self {
EncoderBody::None => true,
EncoderBody::Full { .. } => true,
EncoderBody::Stream { .. } => false,
}
}
fn take_complete_body(&mut self) -> Bytes {
match self {
EncoderBody::None => Bytes::new(),
EncoderBody::Full { body } => body.take_complete_body(),
EncoderBody::Stream { .. } => {
panic!("EncoderBody::Stream variant cannot be taken")
}
}
}
}
impl<B> MessageBody for Encoder<B>
@ -137,10 +159,10 @@ where
type Error = EncoderError;
fn size(&self) -> BodySize {
if self.encoder.is_none() {
self.body.size()
} else {
if self.encoder.is_some() {
BodySize::Stream
} else {
self.body.size()
}
}
@ -211,6 +233,22 @@ where
}
}
}
fn is_complete_body(&self) -> bool {
if self.encoder.is_some() {
false
} else {
self.body.is_complete_body()
}
}
fn take_complete_body(&mut self) -> Bytes {
if self.encoder.is_some() {
panic!("compressed body stream cannot be taken")
} else {
self.body.take_complete_body()
}
}
}
fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
@ -218,6 +256,8 @@ fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
header::CONTENT_ENCODING,
HeaderValue::from_static(encoding.as_str()),
);
head.no_chunking(false);
}
enum ContentEncoder {

View file

@ -101,7 +101,7 @@ async fn test_h2_1() -> io::Result<()> {
#[actix_rt::test]
async fn test_h2_body() -> io::Result<()> {
let data = "HELLOWORLD".to_owned().repeat(64 * 1024);
let data = "HELLOWORLD".to_owned().repeat(64 * 1024); // 640 KiB
let mut srv = test_server(move || {
HttpService::build()
.h2(|mut req: Request<_>| async move {