1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 11:31:09 +00:00

clean up body type (#1872)

This commit is contained in:
fakeshadow 2021-01-05 07:47:38 +08:00 committed by GitHub
parent e567873326
commit 93161df141
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, mem}; use std::{fmt, mem};
@ -68,7 +67,7 @@ impl<T: MessageBody + Unpin> MessageBody for Box<T> {
#[pin_project(project = ResponseBodyProj)] #[pin_project(project = ResponseBodyProj)]
pub enum ResponseBody<B> { pub enum ResponseBody<B> {
Body(#[pin] B), Body(#[pin] B),
Other(#[pin] Body), Other(Body),
} }
impl ResponseBody<Body> { impl ResponseBody<Body> {
@ -110,7 +109,7 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
) -> Poll<Option<Result<Bytes, Error>>> { ) -> Poll<Option<Result<Bytes, Error>>> {
match self.project() { match self.project() {
ResponseBodyProj::Body(body) => body.poll_next(cx), ResponseBodyProj::Body(body) => body.poll_next(cx),
ResponseBodyProj::Other(body) => body.poll_next(cx), ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
} }
} }
} }
@ -124,12 +123,11 @@ impl<B: MessageBody> Stream for ResponseBody<B> {
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
match self.project() { match self.project() {
ResponseBodyProj::Body(body) => body.poll_next(cx), ResponseBodyProj::Body(body) => body.poll_next(cx),
ResponseBodyProj::Other(body) => body.poll_next(cx), ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
} }
} }
} }
#[pin_project(project = BodyProj)]
/// Represents various types of http message body. /// Represents various types of http message body.
pub enum Body { pub enum Body {
/// Empty response. `Content-Length` header is not set. /// Empty response. `Content-Length` header is not set.
@ -168,10 +166,10 @@ impl MessageBody for Body {
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> { ) -> Poll<Option<Result<Bytes, Error>>> {
match self.project() { match self.get_mut() {
BodyProj::None => Poll::Ready(None), Body::None => Poll::Ready(None),
BodyProj::Empty => Poll::Ready(None), Body::Empty => Poll::Ready(None),
BodyProj::Bytes(ref mut bin) => { Body::Bytes(ref mut bin) => {
let len = bin.len(); let len = bin.len();
if len == 0 { if len == 0 {
Poll::Ready(None) Poll::Ready(None)
@ -179,7 +177,7 @@ impl MessageBody for Body {
Poll::Ready(Some(Ok(mem::take(bin)))) Poll::Ready(Some(Ok(mem::take(bin))))
} }
} }
BodyProj::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx), Body::Message(body) => Pin::new(&mut **body).poll_next(cx),
} }
} }
} }
@ -266,12 +264,12 @@ where
} }
} }
impl<S, E> From<BodyStream<S, E>> for Body impl<S, E> From<BodyStream<S>> for Body
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
fn from(s: BodyStream<S, E>) -> Body { fn from(s: BodyStream<S>) -> Body {
Body::from_message(s) Body::from_message(s)
} }
} }
@ -367,27 +365,21 @@ impl MessageBody for String {
/// Type represent streaming body. /// Type represent streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used. /// Response does not contain `content-length` header and appropriate transfer encoding is used.
#[pin_project] pub struct BodyStream<S: Unpin> {
pub struct BodyStream<S: Unpin, E> {
#[pin]
stream: S, stream: S,
_phantom: PhantomData<E>,
} }
impl<S, E> BodyStream<S, E> impl<S, E> BodyStream<S>
where where
S: Stream<Item = Result<Bytes, E>> + Unpin, S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>, E: Into<Error>,
{ {
pub fn new(stream: S) -> Self { pub fn new(stream: S) -> Self {
BodyStream { BodyStream { stream }
stream,
_phantom: PhantomData,
}
} }
} }
impl<S, E> MessageBody for BodyStream<S, E> impl<S, E> MessageBody for BodyStream<S>
where where
S: Stream<Item = Result<Bytes, E>> + Unpin, S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>, E: Into<Error>,
@ -402,13 +394,12 @@ where
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> { ) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = self.project().stream;
loop { loop {
let stream = stream.as_mut(); let stream = &mut self.as_mut().stream;
return Poll::Ready(match ready!(stream.poll_next(cx)) { return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue, Some(Ok(ref bytes)) if bytes.is_empty() => continue,
opt => opt.map(|res| res.map_err(Into::into)), opt => opt.map(|res| res.map_err(Into::into)),
}); });
@ -418,10 +409,8 @@ where
/// Type represent streaming body. This body implementation should be used /// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding. /// if total size of stream is known. Data get sent as is without using transfer encoding.
#[pin_project]
pub struct SizedStream<S: Unpin> { pub struct SizedStream<S: Unpin> {
size: u64, size: u64,
#[pin]
stream: S, stream: S,
} }
@ -448,13 +437,12 @@ where
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> { ) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream: Pin<&mut S> = self.project().stream;
loop { loop {
let stream = stream.as_mut(); let stream = &mut self.as_mut().stream;
return Poll::Ready(match ready!(stream.poll_next(cx)) { return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue, Some(Ok(ref bytes)) if bytes.is_empty() => continue,
val => val, val => val,
}); });