mirror of
https://github.com/actix/actix-web.git
synced 2024-11-25 19:11:10 +00:00
integrate with updated actix-{codec, utils} (#1634)
This commit is contained in:
parent
75d86a6beb
commit
8497b5f490
23 changed files with 127 additions and 86 deletions
11
CHANGES.md
11
CHANGES.md
|
@ -1,14 +1,19 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased
|
## Unreleased - 2020-xx-xx
|
||||||
### Added
|
### Added
|
||||||
* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing slash,
|
* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing
|
||||||
or as the new addition, always trimming trailing slashes.
|
slash, or as the new addition, always trimming trailing slashes.
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
* Update actix-codec and actix-utils dependencies.
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.3 - 2020-08-17
|
## 3.0.0-beta.3 - 2020-08-17
|
||||||
### Changed
|
### Changed
|
||||||
* Update `rustls` to 0.18
|
* Update `rustls` to 0.18
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.2 - 2020-08-17
|
## 3.0.0-beta.2 - 2020-08-17
|
||||||
### Changed
|
### Changed
|
||||||
* `PayloadConfig` is now also considered in `Bytes` and `String` extractors when set
|
* `PayloadConfig` is now also considered in `Bytes` and `String` extractors when set
|
||||||
|
|
|
@ -65,9 +65,9 @@ name = "test_server"
|
||||||
required-features = ["compress"]
|
required-features = ["compress"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.3.0"
|
||||||
actix-service = "1.0.2"
|
actix-service = "1.0.6"
|
||||||
actix-utils = "1.0.6"
|
actix-utils = "2.0.0"
|
||||||
actix-router = "0.2.4"
|
actix-router = "0.2.4"
|
||||||
actix-rt = "1.1.1"
|
actix-rt = "1.1.1"
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
### Changed
|
||||||
|
* Update actix-codec and actix-utils dependencies.
|
||||||
|
|
||||||
## [2.0.0-beta.3] - 2020-08-14
|
## [2.0.0-beta.3] - 2020-08-14
|
||||||
|
|
||||||
|
|
|
@ -41,9 +41,9 @@ actors = ["actix"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-service = "1.0.5"
|
actix-service = "1.0.5"
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.3.0"
|
||||||
actix-connect = "2.0.0-alpha.4"
|
actix-connect = "2.0.0-alpha.4"
|
||||||
actix-utils = "1.0.6"
|
actix-utils = "2.0.0"
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-threadpool = "0.3.1"
|
actix-threadpool = "0.3.1"
|
||||||
actix-tls = { version = "2.0.0-alpha.2", optional = true }
|
actix-tls = { version = "2.0.0-alpha.2", optional = true }
|
||||||
|
|
|
@ -46,10 +46,10 @@ pub trait Connection {
|
||||||
|
|
||||||
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
|
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
|
||||||
/// Close connection
|
/// Close connection
|
||||||
fn close(&mut self);
|
fn close(self: Pin<&mut Self>);
|
||||||
|
|
||||||
/// Release connection to the connection pool
|
/// Release connection to the connection pool
|
||||||
fn release(&mut self);
|
fn release(self: Pin<&mut Self>);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -195,11 +195,15 @@ where
|
||||||
match self {
|
match self {
|
||||||
EitherConnection::A(con) => con
|
EitherConnection::A(con) => con
|
||||||
.open_tunnel(head)
|
.open_tunnel(head)
|
||||||
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
|
.map(|res| {
|
||||||
|
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A)))
|
||||||
|
})
|
||||||
.boxed_local(),
|
.boxed_local(),
|
||||||
EitherConnection::B(con) => con
|
EitherConnection::B(con) => con
|
||||||
.open_tunnel(head)
|
.open_tunnel(head)
|
||||||
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
|
.map(|res| {
|
||||||
|
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B)))
|
||||||
|
})
|
||||||
.boxed_local(),
|
.boxed_local(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,17 +67,17 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
// create Framed and send request
|
// create Framed and send request
|
||||||
let mut framed = Framed::new(io, h1::ClientCodec::default());
|
let mut framed_inner = Framed::new(io, h1::ClientCodec::default());
|
||||||
framed.send((head, body.size()).into()).await?;
|
framed_inner.send((head, body.size()).into()).await?;
|
||||||
|
|
||||||
// send request body
|
// send request body
|
||||||
match body.size() {
|
match body.size() {
|
||||||
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
|
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
|
||||||
_ => send_body(body, &mut framed).await?,
|
_ => send_body(body, Pin::new(&mut framed_inner)).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
// read response and init read body
|
// read response and init read body
|
||||||
let res = framed.into_future().await;
|
let res = Pin::new(&mut framed_inner).into_future().await;
|
||||||
let (head, framed) = if let (Some(result), framed) = res {
|
let (head, framed) = if let (Some(result), framed) = res {
|
||||||
let item = result.map_err(SendRequestError::from)?;
|
let item = result.map_err(SendRequestError::from)?;
|
||||||
(item, framed)
|
(item, framed)
|
||||||
|
@ -85,14 +85,14 @@ where
|
||||||
return Err(SendRequestError::from(ConnectError::Disconnected));
|
return Err(SendRequestError::from(ConnectError::Disconnected));
|
||||||
};
|
};
|
||||||
|
|
||||||
match framed.get_codec().message_type() {
|
match framed.codec_ref().message_type() {
|
||||||
h1::MessageType::None => {
|
h1::MessageType::None => {
|
||||||
let force_close = !framed.get_codec().keepalive();
|
let force_close = !framed.codec_ref().keepalive();
|
||||||
release_connection(framed, force_close);
|
release_connection(framed, force_close);
|
||||||
Ok((head, Payload::None))
|
Ok((head, Payload::None))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
let pl: PayloadStream = PlStream::new(framed).boxed_local();
|
let pl: PayloadStream = PlStream::new(framed_inner).boxed_local();
|
||||||
Ok((head, pl.into()))
|
Ok((head, pl.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -119,35 +119,36 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// send request body to the peer
|
/// send request body to the peer
|
||||||
pub(crate) async fn send_body<I, B>(
|
pub(crate) async fn send_body<T, B>(
|
||||||
body: B,
|
body: B,
|
||||||
framed: &mut Framed<I, h1::ClientCodec>,
|
mut framed: Pin<&mut Framed<T, h1::ClientCodec>>,
|
||||||
) -> Result<(), SendRequestError>
|
) -> Result<(), SendRequestError>
|
||||||
where
|
where
|
||||||
I: ConnectionLifetime,
|
T: ConnectionLifetime + Unpin,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
let mut eof = false;
|
|
||||||
pin_mut!(body);
|
pin_mut!(body);
|
||||||
|
|
||||||
|
let mut eof = false;
|
||||||
while !eof {
|
while !eof {
|
||||||
while !eof && !framed.is_write_buf_full() {
|
while !eof && !framed.as_ref().is_write_buf_full() {
|
||||||
match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
|
match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
|
||||||
Some(result) => {
|
Some(result) => {
|
||||||
framed.write(h1::Message::Chunk(Some(result?)))?;
|
framed.as_mut().write(h1::Message::Chunk(Some(result?)))?;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
eof = true;
|
eof = true;
|
||||||
framed.write(h1::Message::Chunk(None))?;
|
framed.as_mut().write(h1::Message::Chunk(None))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !framed.is_write_buf_empty() {
|
if !framed.as_ref().is_write_buf_empty() {
|
||||||
poll_fn(|cx| match framed.flush(cx) {
|
poll_fn(|cx| match framed.as_mut().flush(cx) {
|
||||||
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
||||||
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
|
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
if !framed.is_write_buf_full() {
|
if !framed.as_ref().is_write_buf_full() {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
@ -158,13 +159,14 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SinkExt::flush(framed).await?;
|
SinkExt::flush(Pin::into_inner(framed)).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
/// HTTP client connection
|
/// HTTP client connection
|
||||||
pub struct H1Connection<T> {
|
pub struct H1Connection<T> {
|
||||||
|
/// T should be `Unpin`
|
||||||
io: Option<T>,
|
io: Option<T>,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
|
@ -175,7 +177,7 @@ where
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
/// Close connection
|
/// Close connection
|
||||||
fn close(&mut self) {
|
fn close(mut self: Pin<&mut Self>) {
|
||||||
if let Some(mut pool) = self.pool.take() {
|
if let Some(mut pool) = self.pool.take() {
|
||||||
if let Some(io) = self.io.take() {
|
if let Some(io) = self.io.take() {
|
||||||
pool.close(IoConnection::new(
|
pool.close(IoConnection::new(
|
||||||
|
@ -188,7 +190,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Release this connection to the connection pool
|
/// Release this connection to the connection pool
|
||||||
fn release(&mut self) {
|
fn release(mut self: Pin<&mut Self>) {
|
||||||
if let Some(mut pool) = self.pool.take() {
|
if let Some(mut pool) = self.pool.take() {
|
||||||
if let Some(io) = self.io.take() {
|
if let Some(io) = self.io.take() {
|
||||||
pool.release(IoConnection::new(
|
pool.release(IoConnection::new(
|
||||||
|
@ -242,14 +244,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
pub(crate) struct PlStream<Io> {
|
pub(crate) struct PlStream<Io> {
|
||||||
|
#[pin]
|
||||||
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
|
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io: ConnectionLifetime> PlStream<Io> {
|
impl<Io: ConnectionLifetime> PlStream<Io> {
|
||||||
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
|
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
|
||||||
|
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
|
||||||
|
|
||||||
PlStream {
|
PlStream {
|
||||||
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
|
framed: Some(framed),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,16 +267,16 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Self::Item>> {
|
) -> Poll<Option<Self::Item>> {
|
||||||
let this = self.get_mut();
|
let mut this = self.project();
|
||||||
|
|
||||||
match this.framed.as_mut().unwrap().next_item(cx)? {
|
match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
Poll::Ready(Some(chunk)) => {
|
Poll::Ready(Some(chunk)) => {
|
||||||
if let Some(chunk) = chunk {
|
if let Some(chunk) = chunk {
|
||||||
Poll::Ready(Some(Ok(chunk)))
|
Poll::Ready(Some(Ok(chunk)))
|
||||||
} else {
|
} else {
|
||||||
let framed = this.framed.take().unwrap();
|
let framed = this.framed.as_mut().as_pin_mut().unwrap();
|
||||||
let force_close = !framed.get_codec().keepalive();
|
let force_close = !framed.codec_ref().keepalive();
|
||||||
release_connection(framed, force_close);
|
release_connection(framed, force_close);
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
|
@ -280,14 +286,13 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release_connection<T, U>(framed: Framed<T, U>, force_close: bool)
|
fn release_connection<T, U>(framed: Pin<&mut Framed<T, U>>, force_close: bool)
|
||||||
where
|
where
|
||||||
T: ConnectionLifetime,
|
T: ConnectionLifetime,
|
||||||
{
|
{
|
||||||
let mut parts = framed.into_parts();
|
if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() {
|
||||||
if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() {
|
framed.io_pin().release()
|
||||||
parts.io.release()
|
|
||||||
} else {
|
} else {
|
||||||
parts.io.close()
|
framed.io_pin().close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
//! Error and Result module
|
//! Error and Result module
|
||||||
|
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::str::Utf8Error;
|
use std::str::Utf8Error;
|
||||||
|
@ -7,7 +8,7 @@ use std::{fmt, io, result};
|
||||||
|
|
||||||
use actix_codec::{Decoder, Encoder};
|
use actix_codec::{Decoder, Encoder};
|
||||||
pub use actix_threadpool::BlockingError;
|
pub use actix_threadpool::BlockingError;
|
||||||
use actix_utils::framed::DispatcherError as FramedDispatcherError;
|
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
|
||||||
use actix_utils::timeout::TimeoutError;
|
use actix_utils::timeout::TimeoutError;
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use derive_more::{Display, From};
|
use derive_more::{Display, From};
|
||||||
|
@ -452,10 +453,10 @@ impl ResponseError for ContentTypeError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E, U: Encoder + Decoder> ResponseError for FramedDispatcherError<E, U>
|
impl<E, U: Encoder<I> + Decoder, I> ResponseError for FramedDispatcherError<E, U, I>
|
||||||
where
|
where
|
||||||
E: fmt::Debug + fmt::Display,
|
E: fmt::Debug + fmt::Display,
|
||||||
<U as Encoder>::Error: fmt::Debug,
|
<U as Encoder<I>>::Error: fmt::Debug,
|
||||||
<U as Decoder>::Error: fmt::Debug,
|
<U as Decoder>::Error: fmt::Debug,
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
|
@ -173,13 +173,12 @@ impl Decoder for ClientPayloadCodec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Encoder for ClientCodec {
|
impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
|
||||||
type Item = Message<(RequestHeadType, BodySize)>;
|
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn encode(
|
fn encode(
|
||||||
&mut self,
|
&mut self,
|
||||||
item: Self::Item,
|
item: Message<(RequestHeadType, BodySize)>,
|
||||||
dst: &mut BytesMut,
|
dst: &mut BytesMut,
|
||||||
) -> Result<(), Self::Error> {
|
) -> Result<(), Self::Error> {
|
||||||
match item {
|
match item {
|
||||||
|
|
|
@ -144,13 +144,12 @@ impl Decoder for Codec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Encoder for Codec {
|
impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
|
||||||
type Item = Message<(Response<()>, BodySize)>;
|
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn encode(
|
fn encode(
|
||||||
&mut self,
|
&mut self,
|
||||||
item: Self::Item,
|
item: Message<(Response<()>, BodySize)>,
|
||||||
dst: &mut BytesMut,
|
dst: &mut BytesMut,
|
||||||
) -> Result<(), Self::Error> {
|
) -> Result<(), Self::Error> {
|
||||||
match item {
|
match item {
|
||||||
|
|
|
@ -548,10 +548,12 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
#[pin_project::pin_project]
|
||||||
pub struct OneRequestServiceResponse<T>
|
pub struct OneRequestServiceResponse<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
|
#[pin]
|
||||||
framed: Option<Framed<T, Codec>>,
|
framed: Option<Framed<T, Codec>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -562,16 +564,18 @@ where
|
||||||
type Output = Result<(Request, Framed<T, Codec>), ParseError>;
|
type Output = Result<(Request, Framed<T, Codec>), ParseError>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.framed.as_mut().unwrap().next_item(cx) {
|
let this = self.as_mut().project();
|
||||||
Poll::Ready(Some(Ok(req))) => match req {
|
|
||||||
|
match ready!(this.framed.as_pin_mut().unwrap().next_item(cx)) {
|
||||||
|
Some(Ok(req)) => match req {
|
||||||
Message::Item(req) => {
|
Message::Item(req) => {
|
||||||
Poll::Ready(Ok((req, self.framed.take().unwrap())))
|
let mut this = self.as_mut().project();
|
||||||
|
Poll::Ready(Ok((req, this.framed.take().unwrap())))
|
||||||
}
|
}
|
||||||
Message::Chunk(_) => unreachable!("Something is wrong"),
|
Message::Chunk(_) => unreachable!("Something is wrong"),
|
||||||
},
|
},
|
||||||
Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
|
Some(Err(err)) => Poll::Ready(Err(err)),
|
||||||
Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
|
None => Poll::Ready(Err(ParseError::Incomplete)),
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,12 +9,13 @@ use crate::error::Error;
|
||||||
use crate::h1::{Codec, Message};
|
use crate::h1::{Codec, Message};
|
||||||
use crate::response::Response;
|
use crate::response::Response;
|
||||||
|
|
||||||
/// Send http/1 response
|
/// Send HTTP/1 response
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct SendResponse<T, B> {
|
pub struct SendResponse<T, B> {
|
||||||
res: Option<Message<(Response<()>, BodySize)>>,
|
res: Option<Message<(Response<()>, BodySize)>>,
|
||||||
#[pin]
|
#[pin]
|
||||||
body: Option<ResponseBody<B>>,
|
body: Option<ResponseBody<B>>,
|
||||||
|
#[pin]
|
||||||
framed: Option<Framed<T, Codec>>,
|
framed: Option<Framed<T, Codec>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,23 +36,30 @@ where
|
||||||
|
|
||||||
impl<T, B> Future for SendResponse<T, B>
|
impl<T, B> Future for SendResponse<T, B>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
B: MessageBody + Unpin,
|
B: MessageBody + Unpin,
|
||||||
{
|
{
|
||||||
type Output = Result<Framed<T, Codec>, Error>;
|
type Output = Result<Framed<T, Codec>, Error>;
|
||||||
|
|
||||||
// TODO: rethink if we need loops in polls
|
// TODO: rethink if we need loops in polls
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let mut this = self.project();
|
let mut this = self.as_mut().project();
|
||||||
|
|
||||||
let mut body_done = this.body.is_none();
|
let mut body_done = this.body.is_none();
|
||||||
loop {
|
loop {
|
||||||
let mut body_ready = !body_done;
|
let mut body_ready = !body_done;
|
||||||
let framed = this.framed.as_mut().unwrap();
|
|
||||||
|
|
||||||
// send body
|
// send body
|
||||||
if this.res.is_none() && body_ready {
|
if this.res.is_none() && body_ready {
|
||||||
while body_ready && !body_done && !framed.is_write_buf_full() {
|
while body_ready
|
||||||
|
&& !body_done
|
||||||
|
&& !this
|
||||||
|
.framed
|
||||||
|
.as_ref()
|
||||||
|
.as_pin_ref()
|
||||||
|
.unwrap()
|
||||||
|
.is_write_buf_full()
|
||||||
|
{
|
||||||
match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? {
|
match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? {
|
||||||
Poll::Ready(item) => {
|
Poll::Ready(item) => {
|
||||||
// body is done when item is None
|
// body is done when item is None
|
||||||
|
@ -59,6 +67,7 @@ where
|
||||||
if body_done {
|
if body_done {
|
||||||
let _ = this.body.take();
|
let _ = this.body.take();
|
||||||
}
|
}
|
||||||
|
let framed = this.framed.as_mut().as_pin_mut().unwrap();
|
||||||
framed.write(Message::Chunk(item))?;
|
framed.write(Message::Chunk(item))?;
|
||||||
}
|
}
|
||||||
Poll::Pending => body_ready = false,
|
Poll::Pending => body_ready = false,
|
||||||
|
@ -66,6 +75,8 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let framed = this.framed.as_mut().as_pin_mut().unwrap();
|
||||||
|
|
||||||
// flush write buffer
|
// flush write buffer
|
||||||
if !framed.is_write_buf_empty() {
|
if !framed.is_write_buf_empty() {
|
||||||
match framed.flush(cx)? {
|
match framed.flush(cx)? {
|
||||||
|
@ -96,6 +107,9 @@ where
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(Ok(this.framed.take().unwrap()))
|
|
||||||
|
let framed = this.framed.take().unwrap();
|
||||||
|
|
||||||
|
Poll::Ready(Ok(framed))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,8 +91,7 @@ impl Codec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Encoder for Codec {
|
impl Encoder<Message> for Codec {
|
||||||
type Item = Message;
|
|
||||||
type Error = ProtocolError;
|
type Error = ProtocolError;
|
||||||
|
|
||||||
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
|
|
@ -4,16 +4,18 @@ use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use actix_service::{IntoService, Service};
|
use actix_service::{IntoService, Service};
|
||||||
use actix_utils::framed;
|
use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError};
|
||||||
|
|
||||||
use super::{Codec, Frame, Message};
|
use super::{Codec, Frame, Message};
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
pub struct Dispatcher<S, T>
|
pub struct Dispatcher<S, T>
|
||||||
where
|
where
|
||||||
S: Service<Request = Frame, Response = Message> + 'static,
|
S: Service<Request = Frame, Response = Message> + 'static,
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
inner: framed::Dispatcher<S, T, Codec>,
|
#[pin]
|
||||||
|
inner: InnerDispatcher<S, T, Codec, Message>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, T> Dispatcher<S, T>
|
impl<S, T> Dispatcher<S, T>
|
||||||
|
@ -25,13 +27,13 @@ where
|
||||||
{
|
{
|
||||||
pub fn new<F: IntoService<S>>(io: T, service: F) -> Self {
|
pub fn new<F: IntoService<S>>(io: T, service: F) -> Self {
|
||||||
Dispatcher {
|
Dispatcher {
|
||||||
inner: framed::Dispatcher::new(Framed::new(io, Codec::new()), service),
|
inner: InnerDispatcher::new(Framed::new(io, Codec::new()), service),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with<F: IntoService<S>>(framed: Framed<T, Codec>, service: F) -> Self {
|
pub fn with<F: IntoService<S>>(framed: Framed<T, Codec>, service: F) -> Self {
|
||||||
Dispatcher {
|
Dispatcher {
|
||||||
inner: framed::Dispatcher::new(framed, service),
|
inner: InnerDispatcher::new(framed, service),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,9 +45,9 @@ where
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
{
|
{
|
||||||
type Output = Result<(), framed::DispatcherError<S::Error, Codec>>;
|
type Output = Result<(), DispatcherError<S::Error, Codec, Message>>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
Pin::new(&mut self.inner).poll(cx)
|
self.project().inner.poll(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
|
use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
|
||||||
use actix_http_test::test_server;
|
use actix_http_test::test_server;
|
||||||
use actix_service::{fn_factory, Service};
|
use actix_service::{fn_factory, Service};
|
||||||
use actix_utils::framed::Dispatcher;
|
use actix_utils::dispatcher::Dispatcher;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::future;
|
use futures_util::future;
|
||||||
use futures_util::task::{Context, Poll};
|
use futures_util::task::{Context, Poll};
|
||||||
|
@ -59,7 +59,7 @@ where
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Dispatcher::new(framed.into_framed(ws::Codec::new()), service)
|
Dispatcher::new(framed.replace_codec(ws::Codec::new()), service)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| panic!())
|
.map_err(|_| panic!())
|
||||||
};
|
};
|
||||||
|
|
|
@ -18,7 +18,7 @@ path = "src/lib.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "3.0.0-beta.3", default-features = false }
|
actix-web = { version = "3.0.0-beta.3", default-features = false }
|
||||||
actix-service = "1.0.1"
|
actix-service = "1.0.1"
|
||||||
actix-utils = "1.0.3"
|
actix-utils = "2.0.0"
|
||||||
bytes = "0.5.3"
|
bytes = "0.5.3"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
httparse = "1.3"
|
httparse = "1.3"
|
||||||
|
|
|
@ -19,7 +19,7 @@ path = "src/lib.rs"
|
||||||
actix = "0.10.0-alpha.2"
|
actix = "0.10.0-alpha.2"
|
||||||
actix-web = { version = "3.0.0-beta.3", default-features = false }
|
actix-web = { version = "3.0.0-beta.3", default-features = false }
|
||||||
actix-http = "2.0.0-beta.3"
|
actix-http = "2.0.0-beta.3"
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.3.0"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.2"
|
||||||
futures-channel = { version = "0.3.5", default-features = false }
|
futures-channel = { version = "0.3.5", default-features = false }
|
||||||
futures-core = { version = "0.3.5", default-features = false }
|
futures-core = { version = "0.3.5", default-features = false }
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## Unreleased - 2020-xx-xx
|
||||||
|
### Changed
|
||||||
|
* Update actix-codec dependency.
|
||||||
|
|
||||||
|
|
||||||
## 2.0.0-beta.3 - 2020-08-17
|
## 2.0.0-beta.3 - 2020-08-17
|
||||||
### Changed
|
### Changed
|
||||||
* Update `rustls` to 0.18
|
* Update `rustls` to 0.18
|
||||||
|
|
|
@ -37,7 +37,7 @@ rustls = ["rust-tls", "actix-http/rustls"]
|
||||||
compress = ["actix-http/compress"]
|
compress = ["actix-http/compress"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.3.0"
|
||||||
actix-service = "1.0.1"
|
actix-service = "1.0.1"
|
||||||
actix-http = "2.0.0-beta.3"
|
actix-http = "2.0.0-beta.3"
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
|
@ -61,7 +61,7 @@ actix-connect = { version = "2.0.0-alpha.4", features = ["openssl"] }
|
||||||
actix-web = { version = "3.0.0-beta.2", features = ["openssl"] }
|
actix-web = { version = "3.0.0-beta.2", features = ["openssl"] }
|
||||||
actix-http = { version = "2.0.0-beta.3", features = ["openssl"] }
|
actix-http = { version = "2.0.0-beta.3", features = ["openssl"] }
|
||||||
actix-http-test = { version = "2.0.0-alpha.1", features = ["openssl"] }
|
actix-http-test = { version = "2.0.0-alpha.1", features = ["openssl"] }
|
||||||
actix-utils = "1.0.3"
|
actix-utils = "2.0.0"
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
actix-tls = { version = "2.0.0-alpha.2", features = ["openssl", "rustls"] }
|
actix-tls = { version = "2.0.0-alpha.2", features = ["openssl", "rustls"] }
|
||||||
brotli2 = "0.3.2"
|
brotli2 = "0.3.2"
|
||||||
|
|
|
@ -152,7 +152,7 @@ where
|
||||||
let (head, framed) =
|
let (head, framed) =
|
||||||
connection.open_tunnel(RequestHeadType::from(head)).await?;
|
connection.open_tunnel(RequestHeadType::from(head)).await?;
|
||||||
|
|
||||||
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
|
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
|
||||||
Ok((head, framed))
|
Ok((head, framed))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ where
|
||||||
.open_tunnel(RequestHeadType::Rc(head, extra_headers))
|
.open_tunnel(RequestHeadType::Rc(head, extra_headers))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
|
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
|
||||||
Ok((head, framed))
|
Ok((head, framed))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -366,7 +366,7 @@ impl WebsocketsRequest {
|
||||||
// response and ws framed
|
// response and ws framed
|
||||||
Ok((
|
Ok((
|
||||||
ClientResponse::new(head, Payload::None),
|
ClientResponse::new(head, Payload::None),
|
||||||
framed.map_codec(|_| {
|
framed.into_map_codec(|_| {
|
||||||
if server_mode {
|
if server_mode {
|
||||||
ws::Codec::new().max_size(max_size)
|
ws::Codec::new().max_size(max_size)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -32,7 +32,7 @@ async fn test_simple() {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// start websocket service
|
// start websocket service
|
||||||
let framed = framed.into_framed(ws::Codec::new());
|
let framed = framed.replace_codec(ws::Codec::new());
|
||||||
ws::Dispatcher::with(framed, ws_service).await
|
ws::Dispatcher::with(framed, ws_service).await
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## Unreleased - 2020-xx-xx
|
||||||
|
* Update actix-codec and actix-utils dependencies.
|
||||||
|
|
||||||
## [2.0.0-alpha.1] - 2020-05-23
|
## [2.0.0-alpha.1] - 2020-05-23
|
||||||
|
|
||||||
* Update the `time` dependency to 0.2.7
|
* Update the `time` dependency to 0.2.7
|
||||||
|
|
|
@ -30,9 +30,9 @@ openssl = ["open-ssl", "awc/openssl"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-service = "1.0.1"
|
actix-service = "1.0.1"
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.3.0"
|
||||||
actix-connect = "2.0.0-alpha.4"
|
actix-connect = "2.0.0-alpha.4"
|
||||||
actix-utils = "1.0.3"
|
actix-utils = "2.0.0"
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
actix-testing = "1.0.0"
|
actix-testing = "1.0.0"
|
||||||
|
|
Loading…
Reference in a new issue