1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-01 21:08:43 +00:00

remove SendError and SendResponse services

This commit is contained in:
Nikolay Kim 2019-04-05 11:29:42 -07:00
parent f89321fd01
commit b6dacaa23a
8 changed files with 27 additions and 292 deletions

View file

@ -1,28 +0,0 @@
use std::{env, io};
use actix_codec::Framed;
use actix_http::{h1, Response, SendResponse, ServiceConfig};
use actix_server::{Io, Server};
use actix_service::{fn_service, NewService};
use actix_utils::framed::IntoFramed;
use actix_utils::stream::TakeItem;
use futures::Future;
use tokio_tcp::TcpStream;
fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "framed_hello=info");
env_logger::init();
Server::build()
.bind("framed_hello", "127.0.0.1:8080", || {
fn_service(|io: Io<TcpStream>| Ok(io.into_parts().0))
.and_then(IntoFramed::new(|| h1::Codec::new(ServiceConfig::default())))
.and_then(TakeItem::new().map_err(|_| ()))
.and_then(|(_req, _framed): (_, Framed<_, _>)| {
SendResponse::send(_framed, Response::Ok().body("Hello world!"))
.map_err(|_| ())
.map(|_| ())
})
})?
.run()
}

View file

@ -37,7 +37,7 @@ pub use self::message::{Message, RequestHead, ResponseHead};
pub use self::payload::{Payload, PayloadStream}; pub use self::payload::{Payload, PayloadStream};
pub use self::request::Request; pub use self::request::Request;
pub use self::response::{Response, ResponseBuilder}; pub use self::response::{Response, ResponseBuilder};
pub use self::service::{HttpService, SendError, SendResponse}; pub use self::service::HttpService;
pub mod http { pub mod http {
//! Various HTTP related types //! Various HTTP related types

View file

@ -208,7 +208,7 @@ impl<B> Response<B> {
} }
/// Set a body /// Set a body
pub(crate) fn set_body<B2>(self, body: B2) -> Response<B2> { pub fn set_body<B2>(self, body: B2) -> Response<B2> {
Response { Response {
head: self.head, head: self.head,
body: ResponseBody::Body(body), body: ResponseBody::Body(body),
@ -217,7 +217,7 @@ impl<B> Response<B> {
} }
/// Drop request's body /// Drop request's body
pub(crate) fn drop_body(self) -> Response<()> { pub fn drop_body(self) -> Response<()> {
Response { Response {
head: self.head, head: self.head,
body: ResponseBody::Body(()), body: ResponseBody::Body(()),

View file

@ -1,5 +0,0 @@
mod senderror;
mod service;
pub use self::senderror::{SendError, SendResponse};
pub use self::service::HttpService;

View file

@ -1,241 +0,0 @@
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_service::{NewService, Service};
use futures::future::{ok, Either, FutureResult};
use futures::{Async, Future, Poll, Sink};
use crate::body::{BodySize, MessageBody, ResponseBody};
use crate::error::{Error, ResponseError};
use crate::h1::{Codec, Message};
use crate::response::Response;
pub struct SendError<T, R, E>(PhantomData<(T, R, E)>);
impl<T, R, E> Default for SendError<T, R, E>
where
T: AsyncRead + AsyncWrite,
E: ResponseError,
{
fn default() -> Self {
SendError(PhantomData)
}
}
impl<T, R, E> NewService for SendError<T, R, E>
where
T: AsyncRead + AsyncWrite,
E: ResponseError,
{
type Request = Result<R, (E, Framed<T, Codec>)>;
type Response = R;
type Error = (E, Framed<T, Codec>);
type InitError = ();
type Service = SendError<T, R, E>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(SendError(PhantomData))
}
}
impl<T, R, E> Service for SendError<T, R, E>
where
T: AsyncRead + AsyncWrite,
E: ResponseError,
{
type Request = Result<R, (E, Framed<T, Codec>)>;
type Response = R;
type Error = (E, Framed<T, Codec>);
type Future = Either<FutureResult<R, (E, Framed<T, Codec>)>, SendErrorFut<T, R, E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Result<R, (E, Framed<T, Codec>)>) -> Self::Future {
match req {
Ok(r) => Either::A(ok(r)),
Err((e, framed)) => {
let res = e.error_response().set_body(format!("{}", e));
let (res, _body) = res.replace_body(());
Either::B(SendErrorFut {
framed: Some(framed),
res: Some((res, BodySize::Empty).into()),
err: Some(e),
_t: PhantomData,
})
}
}
}
}
pub struct SendErrorFut<T, R, E> {
res: Option<Message<(Response<()>, BodySize)>>,
framed: Option<Framed<T, Codec>>,
err: Option<E>,
_t: PhantomData<R>,
}
impl<T, R, E> Future for SendErrorFut<T, R, E>
where
E: ResponseError,
T: AsyncRead + AsyncWrite,
{
type Item = R;
type Error = (E, Framed<T, Codec>);
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(res) = self.res.take() {
if self.framed.as_mut().unwrap().force_send(res).is_err() {
return Err((self.err.take().unwrap(), self.framed.take().unwrap()));
}
}
match self.framed.as_mut().unwrap().poll_complete() {
Ok(Async::Ready(_)) => {
Err((self.err.take().unwrap(), self.framed.take().unwrap()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err((self.err.take().unwrap(), self.framed.take().unwrap())),
}
}
}
pub struct SendResponse<T, B>(PhantomData<(T, B)>);
impl<T, B> Default for SendResponse<T, B> {
fn default() -> Self {
SendResponse(PhantomData)
}
}
impl<T, B> SendResponse<T, B>
where
T: AsyncRead + AsyncWrite,
B: MessageBody,
{
pub fn send(
framed: Framed<T, Codec>,
res: Response<B>,
) -> impl Future<Item = Framed<T, Codec>, Error = Error> {
// extract body from response
let (res, body) = res.replace_body(());
// write response
SendResponseFut {
res: Some(Message::Item((res, body.length()))),
body: Some(body),
framed: Some(framed),
}
}
}
impl<T, B> NewService for SendResponse<T, B>
where
T: AsyncRead + AsyncWrite,
B: MessageBody,
{
type Request = (Response<B>, Framed<T, Codec>);
type Response = Framed<T, Codec>;
type Error = Error;
type InitError = ();
type Service = SendResponse<T, B>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(SendResponse(PhantomData))
}
}
impl<T, B> Service for SendResponse<T, B>
where
T: AsyncRead + AsyncWrite,
B: MessageBody,
{
type Request = (Response<B>, Framed<T, Codec>);
type Response = Framed<T, Codec>;
type Error = Error;
type Future = SendResponseFut<T, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, (res, framed): (Response<B>, Framed<T, Codec>)) -> Self::Future {
let (res, body) = res.replace_body(());
SendResponseFut {
res: Some(Message::Item((res, body.length()))),
body: Some(body),
framed: Some(framed),
}
}
}
pub struct SendResponseFut<T, B> {
res: Option<Message<(Response<()>, BodySize)>>,
body: Option<ResponseBody<B>>,
framed: Option<Framed<T, Codec>>,
}
impl<T, B> Future for SendResponseFut<T, B>
where
T: AsyncRead + AsyncWrite,
B: MessageBody,
{
type Item = Framed<T, Codec>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let mut body_ready = self.body.is_some();
let framed = self.framed.as_mut().unwrap();
// send body
if self.res.is_none() && self.body.is_some() {
while body_ready && self.body.is_some() && !framed.is_write_buf_full() {
match self.body.as_mut().unwrap().poll_next()? {
Async::Ready(item) => {
// body is done
if item.is_none() {
let _ = self.body.take();
}
framed.force_send(Message::Chunk(item))?;
}
Async::NotReady => body_ready = false,
}
}
}
// flush write buffer
if !framed.is_write_buf_empty() {
match framed.poll_complete()? {
Async::Ready(_) => {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
}
}
Async::NotReady => return Ok(Async::NotReady),
}
}
// send response
if let Some(res) = self.res.take() {
framed.force_send(res)?;
continue;
}
if self.body.is_some() {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
}
} else {
break;
}
}
Ok(Async::Ready(self.framed.take().unwrap()))
}
}

View file

@ -11,7 +11,7 @@ use futures::future::{ok, Either};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use actix_http::{h1, ws, ResponseError, SendResponse, ServiceConfig}; use actix_http::{body::BodySize, h1, ws, ResponseError, ServiceConfig};
fn ws_service(req: ws::Frame) -> impl Future<Item = ws::Message, Error = io::Error> { fn ws_service(req: ws::Frame) -> impl Future<Item = ws::Message, Error = io::Error> {
match req { match req {
@ -46,26 +46,34 @@ fn test_simple() {
match ws::verify_handshake(&req) { match ws::verify_handshake(&req) {
Err(e) => { Err(e) => {
// validation failed // validation failed
let res = e.error_response();
Either::A( Either::A(
SendResponse::send(framed, e.error_response()) framed
.send(h1::Message::Item((
res.drop_body(),
BodySize::Empty,
)))
.map_err(|_| ()) .map_err(|_| ())
.map(|_| ()), .map(|_| ()),
) )
} }
Ok(_) => { Ok(_) => {
let res = ws::handshake_response(&req).finish();
Either::B( Either::B(
// send handshake response // send handshake response
SendResponse::send( framed
framed, .send(h1::Message::Item((
ws::handshake_response(&req).finish(), res.drop_body(),
) BodySize::None,
.map_err(|_| ()) )))
.and_then(|framed| { .map_err(|_| ())
// start websocket service .and_then(|framed| {
let framed = framed.into_framed(ws::Codec::new()); // start websocket service
ws::Transport::with(framed, ws_service) let framed =
.map_err(|_| ()) framed.into_framed(ws::Codec::new());
}), ws::Transport::with(framed, ws_service)
.map_err(|_| ())
}),
) )
} }
} }

View file

@ -189,6 +189,7 @@ pub mod client {
pub use awc::error::{ pub use awc::error::{
ConnectError, InvalidUrl, PayloadError, SendRequestError, WsClientError, ConnectError, InvalidUrl, PayloadError, SendRequestError, WsClientError,
}; };
pub use awc::{test, Client, ClientBuilder, ClientRequest, ClientResponse, pub use awc::{
Connector}; test, Client, ClientBuilder, ClientRequest, ClientResponse, Connector,
};
} }