1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-02 05:18:44 +00:00

simplify payload api; add missing http error helper functions

This commit is contained in:
Nikolay Kim 2019-02-07 11:06:05 -08:00
parent fcace161c7
commit cd83553db7
21 changed files with 442 additions and 175 deletions

View file

@ -75,7 +75,6 @@ openssl = { version="0.10", optional = true }
[dev-dependencies]
actix-rt = "0.1.0"
actix-web = "0.7"
actix-server = { version="0.2", features=["ssl"] }
actix-connector = { version="0.2.0", features=["ssl"] }
actix-utils = "0.2.1"

View file

@ -18,8 +18,8 @@ fn main() {
.client_timeout(1000)
.client_disconnect(1000)
.server_hostname("localhost")
.finish(|_req: Request| {
_req.body().limit(512).and_then(|bytes: Bytes| {
.finish(|mut req: Request| {
req.body().limit(512).and_then(|bytes: Bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));

View file

@ -8,8 +8,8 @@ use futures::Future;
use log::info;
use std::env;
fn handle_request(_req: Request) -> impl Future<Item = Response, Error = Error> {
_req.body().limit(512).from_err().and_then(|bytes: Bytes| {
fn handle_request(mut req: Request) -> impl Future<Item = Response, Error = Error> {
req.body().limit(512).from_err().and_then(|bytes: Bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));

View file

@ -50,14 +50,14 @@ where
.into_future()
.map_err(|(e, _)| SendRequestError::from(e))
.and_then(|(item, framed)| {
if let Some(res) = item {
if let Some(mut res) = item {
match framed.get_codec().message_type() {
h1::MessageType::None => {
let force_close = !framed.get_codec().keepalive();
release_connection(framed, force_close)
}
_ => {
*res.payload.borrow_mut() = Some(Payload::stream(framed))
res.set_payload(Payload::stream(framed));
}
}
ok(res)
@ -199,27 +199,10 @@ where
}
}
struct EmptyPayload;
impl Stream for EmptyPayload {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(Async::Ready(None))
}
}
pub(crate) struct Payload<Io> {
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
}
impl Payload<()> {
pub fn empty() -> PayloadStream {
Box::new(EmptyPayload)
}
}
impl<Io: ConnectionLifetime> Payload<Io> {
pub fn stream(framed: Framed<Io, h1::ClientCodec>) -> PayloadStream {
Box::new(Payload {

View file

@ -1,4 +1,3 @@
use std::cell::RefCell;
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
@ -111,7 +110,7 @@ where
Ok(ClientResponse {
head,
payload: RefCell::new(Some(Box::new(Payload::new(body)))),
payload: Some(Box::new(Payload::new(body))),
})
})
.from_err()

View file

@ -268,10 +268,9 @@ impl ClientRequestBuilder {
///
/// ```rust
/// # extern crate mime;
/// # extern crate actix_web;
/// # use actix_web::client::*;
/// # extern crate actix_http;
/// #
/// use actix_web::{client, http};
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = client::ClientRequest::build()
@ -299,16 +298,14 @@ impl ClientRequestBuilder {
/// To override header use `set_header()` method.
///
/// ```rust
/// # extern crate http;
/// # extern crate actix_web;
/// # use actix_web::client::*;
/// # extern crate actix_http;
/// #
/// use http::header;
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = ClientRequest::build()
/// let req = client::ClientRequest::build()
/// .header("X-TEST", "value")
/// .header(header::CONTENT_TYPE, "application/json")
/// .header(http::header::CONTENT_TYPE, "application/json")
/// .finish()
/// .unwrap();
/// }
@ -427,8 +424,8 @@ impl ClientRequestBuilder {
/// Set a cookie
///
/// ```rust
/// # extern crate actix_web;
/// use actix_web::{client, http};
/// # extern crate actix_http;
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = client::ClientRequest::build()

View file

@ -1,4 +1,3 @@
use std::cell::RefCell;
use std::fmt;
use bytes::Bytes;
@ -10,13 +9,11 @@ use crate::error::PayloadError;
use crate::httpmessage::HttpMessage;
use crate::message::{Head, ResponseHead};
use super::h1proto::Payload;
/// Client Response
#[derive(Default)]
pub struct ClientResponse {
pub(crate) head: ResponseHead,
pub(crate) payload: RefCell<Option<PayloadStream>>,
pub(crate) payload: Option<PayloadStream>,
}
impl HttpMessage for ClientResponse {
@ -27,12 +24,8 @@ impl HttpMessage for ClientResponse {
}
#[inline]
fn payload(self) -> Self::Stream {
if let Some(payload) = self.payload.borrow_mut().take() {
payload
} else {
Payload::empty()
}
fn payload(&mut self) -> Option<Self::Stream> {
self.payload.take()
}
}
@ -41,7 +34,7 @@ impl ClientResponse {
pub fn new() -> ClientResponse {
ClientResponse {
head: ResponseHead::default(),
payload: RefCell::new(None),
payload: None,
}
}
@ -84,6 +77,11 @@ impl ClientResponse {
pub fn keep_alive(&self) -> bool {
self.head().keep_alive()
}
/// Set response payload
pub fn set_payload(&mut self, payload: PayloadStream) {
self.payload = Some(payload);
}
}
impl Stream for ClientResponse {
@ -91,7 +89,7 @@ impl Stream for ClientResponse {
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(ref mut payload) = &mut *self.payload.borrow_mut() {
if let Some(ref mut payload) = self.payload {
payload.poll()
} else {
Ok(Async::Ready(None))

View file

@ -344,7 +344,7 @@ pub enum PayloadError {
UnknownLength,
/// Http2 payload error
#[display(fmt = "{}", _0)]
H2Payload(h2::Error),
Http2Payload(h2::Error),
}
impl From<io::Error> for PayloadError {
@ -642,6 +642,16 @@ where
InternalError::new(err, StatusCode::UNAUTHORIZED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *PAYMENT_REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorPaymentRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::PAYMENT_REQUIRED).into()
}
/// Helper function that creates wrapper of any error and generate *FORBIDDEN*
/// response.
#[allow(non_snake_case)]
@ -672,6 +682,26 @@ where
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED).into()
}
/// Helper function that creates wrapper of any error and generate *NOT
/// ACCEPTABLE* response.
#[allow(non_snake_case)]
pub fn ErrorNotAcceptable<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::NOT_ACCEPTABLE).into()
}
/// Helper function that creates wrapper of any error and generate *PROXY
/// AUTHENTICATION REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorProxyAuthenticationRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::PROXY_AUTHENTICATION_REQUIRED).into()
}
/// Helper function that creates wrapper of any error and generate *REQUEST
/// TIMEOUT* response.
#[allow(non_snake_case)]
@ -702,6 +732,116 @@ where
InternalError::new(err, StatusCode::GONE).into()
}
/// Helper function that creates wrapper of any error and generate *LENGTH
/// REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorLengthRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::LENGTH_REQUIRED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *PAYLOAD TOO LARGE* response.
#[allow(non_snake_case)]
pub fn ErrorPayloadTooLarge<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::PAYLOAD_TOO_LARGE).into()
}
/// Helper function that creates wrapper of any error and generate
/// *URI TOO LONG* response.
#[allow(non_snake_case)]
pub fn ErrorUriTooLong<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::URI_TOO_LONG).into()
}
/// Helper function that creates wrapper of any error and generate
/// *UNSUPPORTED MEDIA TYPE* response.
#[allow(non_snake_case)]
pub fn ErrorUnsupportedMediaType<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::UNSUPPORTED_MEDIA_TYPE).into()
}
/// Helper function that creates wrapper of any error and generate
/// *RANGE NOT SATISFIABLE* response.
#[allow(non_snake_case)]
pub fn ErrorRangeNotSatisfiable<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::RANGE_NOT_SATISFIABLE).into()
}
/// Helper function that creates wrapper of any error and generate
/// *IM A TEAPOT* response.
#[allow(non_snake_case)]
pub fn ErrorImATeapot<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::IM_A_TEAPOT).into()
}
/// Helper function that creates wrapper of any error and generate
/// *MISDIRECTED REQUEST* response.
#[allow(non_snake_case)]
pub fn ErrorMisdirectedRequest<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::MISDIRECTED_REQUEST).into()
}
/// Helper function that creates wrapper of any error and generate
/// *UNPROCESSABLE ENTITY* response.
#[allow(non_snake_case)]
pub fn ErrorUnprocessableEntity<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::UNPROCESSABLE_ENTITY).into()
}
/// Helper function that creates wrapper of any error and generate
/// *LOCKED* response.
#[allow(non_snake_case)]
pub fn ErrorLocked<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::LOCKED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *FAILED DEPENDENCY* response.
#[allow(non_snake_case)]
pub fn ErrorFailedDependency<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::FAILED_DEPENDENCY).into()
}
/// Helper function that creates wrapper of any error and generate
/// *UPGRADE REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorUpgradeRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::UPGRADE_REQUIRED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *PRECONDITION FAILED* response.
#[allow(non_snake_case)]
@ -712,6 +852,46 @@ where
InternalError::new(err, StatusCode::PRECONDITION_FAILED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *PRECONDITION REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorPreconditionRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::PRECONDITION_REQUIRED).into()
}
/// Helper function that creates wrapper of any error and generate
/// *TOO MANY REQUESTS* response.
#[allow(non_snake_case)]
pub fn ErrorTooManyRequests<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::TOO_MANY_REQUESTS).into()
}
/// Helper function that creates wrapper of any error and generate
/// *REQUEST HEADER FIELDS TOO LARGE* response.
#[allow(non_snake_case)]
pub fn ErrorRequestHeaderFieldsTooLarge<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE).into()
}
/// Helper function that creates wrapper of any error and generate
/// *UNAVAILABLE FOR LEGAL REASONS* response.
#[allow(non_snake_case)]
pub fn ErrorUnavailableForLegalReasons<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS).into()
}
/// Helper function that creates wrapper of any error and generate
/// *EXPECTATION FAILED* response.
#[allow(non_snake_case)]
@ -772,6 +952,66 @@ where
InternalError::new(err, StatusCode::GATEWAY_TIMEOUT).into()
}
/// Helper function that creates wrapper of any error and
/// generate *HTTP VERSION NOT SUPPORTED* response.
#[allow(non_snake_case)]
pub fn ErrorHttpVersionNotSupported<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::HTTP_VERSION_NOT_SUPPORTED).into()
}
/// Helper function that creates wrapper of any error and
/// generate *VARIANT ALSO NEGOTIATES* response.
#[allow(non_snake_case)]
pub fn ErrorVariantAlsoNegotiates<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::VARIANT_ALSO_NEGOTIATES).into()
}
/// Helper function that creates wrapper of any error and
/// generate *INSUFFICIENT STORAGE* response.
#[allow(non_snake_case)]
pub fn ErrorInsufficientStorage<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::INSUFFICIENT_STORAGE).into()
}
/// Helper function that creates wrapper of any error and
/// generate *LOOP DETECTED* response.
#[allow(non_snake_case)]
pub fn ErrorLoopDetected<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::LOOP_DETECTED).into()
}
/// Helper function that creates wrapper of any error and
/// generate *NOT EXTENDED* response.
#[allow(non_snake_case)]
pub fn ErrorNotExtended<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::NOT_EXTENDED).into()
}
/// Helper function that creates wrapper of any error and
/// generate *NETWORK AUTHENTICATION REQUIRED* response.
#[allow(non_snake_case)]
pub fn ErrorNetworkAuthenticationRequired<T>(err: T) -> Error
where
T: fmt::Debug + fmt::Display + 'static,
{
InternalError::new(err, StatusCode::NETWORK_AUTHENTICATION_REQUIRED).into()
}
#[cfg(test)]
mod tests {
use super::*;
@ -926,6 +1166,9 @@ mod tests {
let r: Response = ErrorUnauthorized("err").into();
assert_eq!(r.status(), StatusCode::UNAUTHORIZED);
let r: Response = ErrorPaymentRequired("err").into();
assert_eq!(r.status(), StatusCode::PAYMENT_REQUIRED);
let r: Response = ErrorForbidden("err").into();
assert_eq!(r.status(), StatusCode::FORBIDDEN);
@ -935,6 +1178,12 @@ mod tests {
let r: Response = ErrorMethodNotAllowed("err").into();
assert_eq!(r.status(), StatusCode::METHOD_NOT_ALLOWED);
let r: Response = ErrorNotAcceptable("err").into();
assert_eq!(r.status(), StatusCode::NOT_ACCEPTABLE);
let r: Response = ErrorProxyAuthenticationRequired("err").into();
assert_eq!(r.status(), StatusCode::PROXY_AUTHENTICATION_REQUIRED);
let r: Response = ErrorRequestTimeout("err").into();
assert_eq!(r.status(), StatusCode::REQUEST_TIMEOUT);
@ -944,12 +1193,57 @@ mod tests {
let r: Response = ErrorGone("err").into();
assert_eq!(r.status(), StatusCode::GONE);
let r: Response = ErrorLengthRequired("err").into();
assert_eq!(r.status(), StatusCode::LENGTH_REQUIRED);
let r: Response = ErrorPreconditionFailed("err").into();
assert_eq!(r.status(), StatusCode::PRECONDITION_FAILED);
let r: Response = ErrorPayloadTooLarge("err").into();
assert_eq!(r.status(), StatusCode::PAYLOAD_TOO_LARGE);
let r: Response = ErrorUriTooLong("err").into();
assert_eq!(r.status(), StatusCode::URI_TOO_LONG);
let r: Response = ErrorUnsupportedMediaType("err").into();
assert_eq!(r.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
let r: Response = ErrorRangeNotSatisfiable("err").into();
assert_eq!(r.status(), StatusCode::RANGE_NOT_SATISFIABLE);
let r: Response = ErrorExpectationFailed("err").into();
assert_eq!(r.status(), StatusCode::EXPECTATION_FAILED);
let r: Response = ErrorImATeapot("err").into();
assert_eq!(r.status(), StatusCode::IM_A_TEAPOT);
let r: Response = ErrorMisdirectedRequest("err").into();
assert_eq!(r.status(), StatusCode::MISDIRECTED_REQUEST);
let r: Response = ErrorUnprocessableEntity("err").into();
assert_eq!(r.status(), StatusCode::UNPROCESSABLE_ENTITY);
let r: Response = ErrorLocked("err").into();
assert_eq!(r.status(), StatusCode::LOCKED);
let r: Response = ErrorFailedDependency("err").into();
assert_eq!(r.status(), StatusCode::FAILED_DEPENDENCY);
let r: Response = ErrorUpgradeRequired("err").into();
assert_eq!(r.status(), StatusCode::UPGRADE_REQUIRED);
let r: Response = ErrorPreconditionRequired("err").into();
assert_eq!(r.status(), StatusCode::PRECONDITION_REQUIRED);
let r: Response = ErrorTooManyRequests("err").into();
assert_eq!(r.status(), StatusCode::TOO_MANY_REQUESTS);
let r: Response = ErrorRequestHeaderFieldsTooLarge("err").into();
assert_eq!(r.status(), StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE);
let r: Response = ErrorUnavailableForLegalReasons("err").into();
assert_eq!(r.status(), StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS);
let r: Response = ErrorInternalServerError("err").into();
assert_eq!(r.status(), StatusCode::INTERNAL_SERVER_ERROR);
@ -964,5 +1258,23 @@ mod tests {
let r: Response = ErrorGatewayTimeout("err").into();
assert_eq!(r.status(), StatusCode::GATEWAY_TIMEOUT);
let r: Response = ErrorHttpVersionNotSupported("err").into();
assert_eq!(r.status(), StatusCode::HTTP_VERSION_NOT_SUPPORTED);
let r: Response = ErrorVariantAlsoNegotiates("err").into();
assert_eq!(r.status(), StatusCode::VARIANT_ALSO_NEGOTIATES);
let r: Response = ErrorInsufficientStorage("err").into();
assert_eq!(r.status(), StatusCode::INSUFFICIENT_STORAGE);
let r: Response = ErrorLoopDetected("err").into();
assert_eq!(r.status(), StatusCode::LOOP_DETECTED);
let r: Response = ErrorNotExtended("err").into();
assert_eq!(r.status(), StatusCode::NOT_EXTENDED);
let r: Response = ErrorNetworkAuthenticationRequired("err").into();
assert_eq!(r.status(), StatusCode::NETWORK_AUTHENTICATION_REQUIRED);
}
}

View file

@ -14,11 +14,11 @@ use crate::body::{Body, BodyLength, MessageBody, ResponseBody};
use crate::config::ServiceConfig;
use crate::error::DispatchError;
use crate::error::{ParseError, PayloadError};
use crate::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use crate::request::Request;
use crate::response::Response;
use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use super::{H1ServiceResult, Message, MessageType};
const MAX_PIPELINED_MESSAGES: usize = 16;

View file

@ -9,11 +9,13 @@ mod codec;
mod decoder;
mod dispatcher;
mod encoder;
mod payload;
mod service;
pub use self::client::{ClientCodec, ClientPayloadCodec};
pub use self::codec::Codec;
pub use self::dispatcher::Dispatcher;
pub use self::payload::{Payload, PayloadBuffer};
pub use self::service::{H1Service, H1ServiceHandler, OneRequest};
use crate::request::Request;

View file

@ -54,6 +54,7 @@ impl Response {
STATIC_RESP!(Gone, StatusCode::GONE);
STATIC_RESP!(LengthRequired, StatusCode::LENGTH_REQUIRED);
STATIC_RESP!(PreconditionFailed, StatusCode::PRECONDITION_FAILED);
STATIC_RESP!(PreconditionRequired, StatusCode::PRECONDITION_REQUIRED);
STATIC_RESP!(PayloadTooLarge, StatusCode::PAYLOAD_TOO_LARGE);
STATIC_RESP!(UriTooLong, StatusCode::URI_TOO_LONG);
STATIC_RESP!(UnsupportedMediaType, StatusCode::UNSUPPORTED_MEDIA_TYPE);

View file

@ -25,7 +25,7 @@ pub trait HttpMessage: Sized {
fn headers(&self) -> &HeaderMap;
/// Message payload stream
fn payload(self) -> Self::Stream;
fn payload(&mut self) -> Option<Self::Stream>;
#[doc(hidden)]
/// Get a header
@ -128,7 +128,7 @@ pub trait HttpMessage: Sized {
/// }
/// # fn main() {}
/// ```
fn body(self) -> MessageBody<Self> {
fn body(&mut self) -> MessageBody<Self> {
MessageBody::new(self)
}
@ -162,7 +162,7 @@ pub trait HttpMessage: Sized {
/// }
/// # fn main() {}
/// ```
fn urlencoded<T: DeserializeOwned>(self) -> UrlEncoded<Self, T> {
fn urlencoded<T: DeserializeOwned>(&mut self) -> UrlEncoded<Self, T> {
UrlEncoded::new(self)
}
@ -198,19 +198,19 @@ pub trait HttpMessage: Sized {
/// }
/// # fn main() {}
/// ```
fn json<T: DeserializeOwned>(self) -> JsonBody<Self, T> {
fn json<T: DeserializeOwned>(&mut self) -> JsonBody<Self, T> {
JsonBody::new(self)
}
/// Return stream of lines.
fn readlines(self) -> Readlines<Self> {
fn readlines(&mut self) -> Readlines<Self> {
Readlines::new(self)
}
}
/// Stream to read request line by line.
pub struct Readlines<T: HttpMessage> {
stream: T::Stream,
stream: Option<T::Stream>,
buff: BytesMut,
limit: usize,
checked_buff: bool,
@ -220,7 +220,7 @@ pub struct Readlines<T: HttpMessage> {
impl<T: HttpMessage> Readlines<T> {
/// Create a new stream to read request line by line.
fn new(req: T) -> Self {
fn new(req: &mut T) -> Self {
let encoding = match req.encoding() {
Ok(enc) => enc,
Err(err) => return Self::err(req, err.into()),
@ -242,7 +242,7 @@ impl<T: HttpMessage> Readlines<T> {
self
}
fn err(req: T, err: ReadlinesError) -> Self {
fn err(req: &mut T, err: ReadlinesError) -> Self {
Readlines {
stream: req.payload(),
buff: BytesMut::new(),
@ -292,61 +292,65 @@ impl<T: HttpMessage + 'static> Stream for Readlines<T> {
self.checked_buff = true;
}
// poll req for more bytes
match self.stream.poll() {
Ok(Async::Ready(Some(mut bytes))) => {
// check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
if let Some(ref mut stream) = self.stream {
match stream.poll() {
Ok(Async::Ready(Some(mut bytes))) => {
// check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.is_empty() {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind + 1))
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
self.buff.clear();
Ok(Async::Ready(Some(line)))
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
Err(e) => Err(ReadlinesError::from(e)),
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.is_empty() {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
self.buff.clear();
Ok(Async::Ready(Some(line)))
}
Err(e) => Err(ReadlinesError::from(e)),
} else {
Ok(Async::Ready(None))
}
}
}
@ -362,7 +366,7 @@ pub struct MessageBody<T: HttpMessage> {
impl<T: HttpMessage> MessageBody<T> {
/// Create `MessageBody` for request.
pub fn new(req: T) -> MessageBody<T> {
pub fn new(req: &mut T) -> MessageBody<T> {
let mut len = None;
if let Some(l) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
@ -379,7 +383,7 @@ impl<T: HttpMessage> MessageBody<T> {
MessageBody {
limit: 262_144,
length: len,
stream: Some(req.payload()),
stream: req.payload(),
fut: None,
err: None,
}
@ -424,6 +428,10 @@ where
}
}
if self.stream.is_none() {
return Ok(Async::Ready(Bytes::new()));
}
// future
let limit = self.limit;
self.fut = Some(Box::new(
@ -457,7 +465,7 @@ pub struct UrlEncoded<T: HttpMessage, U> {
impl<T: HttpMessage, U> UrlEncoded<T, U> {
/// Create a new future to URL encode a request
pub fn new(req: T) -> UrlEncoded<T, U> {
pub fn new(req: &mut T) -> UrlEncoded<T, U> {
// check content type
if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" {
return Self::err(UrlencodedError::ContentType);
@ -482,7 +490,7 @@ impl<T: HttpMessage, U> UrlEncoded<T, U> {
UrlEncoded {
encoding,
stream: Some(req.payload()),
stream: req.payload(),
limit: 262_144,
length: len,
fut: None,
@ -694,7 +702,7 @@ mod tests {
#[test]
fn test_urlencoded_error() {
let req = TestRequest::with_header(
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
@ -705,7 +713,7 @@ mod tests {
UrlencodedError::UnknownLength
);
let req = TestRequest::with_header(
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
@ -716,7 +724,7 @@ mod tests {
UrlencodedError::Overflow
);
let req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain")
let mut req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain")
.header(header::CONTENT_LENGTH, "10")
.finish();
assert_eq!(
@ -727,7 +735,7 @@ mod tests {
#[test]
fn test_urlencoded() {
let req = TestRequest::with_header(
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
@ -743,7 +751,7 @@ mod tests {
})
);
let req = TestRequest::with_header(
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded; charset=utf-8",
)
@ -762,19 +770,20 @@ mod tests {
#[test]
fn test_message_body() {
let req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish();
let mut req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish();
match req.body().poll().err().unwrap() {
PayloadError::UnknownLength => (),
_ => unreachable!("error"),
}
let req = TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish();
let mut req =
TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish();
match req.body().poll().err().unwrap() {
PayloadError::Overflow => (),
_ => unreachable!("error"),
}
let req = TestRequest::default()
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"test"))
.finish();
match req.body().poll().ok().unwrap() {
@ -782,7 +791,7 @@ mod tests {
_ => unreachable!("error"),
}
let req = TestRequest::default()
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"11111111111111"))
.finish();
match req.body().limit(5).poll().err().unwrap() {
@ -793,14 +802,14 @@ mod tests {
#[test]
fn test_readlines() {
let req = TestRequest::default()
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(
b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
industry. Lorem Ipsum has been the industry's standard dummy\n\
Contrary to popular belief, Lorem Ipsum is not simply random text.",
))
.finish();
let mut r = Readlines::new(req);
let mut r = Readlines::new(&mut req);
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(
s,

View file

@ -50,7 +50,7 @@ pub struct JsonBody<T: HttpMessage, U: DeserializeOwned> {
impl<T: HttpMessage, U: DeserializeOwned> JsonBody<T, U> {
/// Create `JsonBody` for request.
pub fn new(req: T) -> Self {
pub fn new(req: &mut T) -> Self {
// check content-type
let json = if let Ok(Some(mime)) = req.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
@ -79,7 +79,7 @@ impl<T: HttpMessage, U: DeserializeOwned> JsonBody<T, U> {
JsonBody {
limit: 262_144,
length: len,
stream: Some(req.payload()),
stream: req.payload(),
fut: None,
err: None,
}
@ -164,11 +164,11 @@ mod tests {
#[test]
fn test_json_body() {
let req = TestRequest::default().finish();
let mut req = TestRequest::default().finish();
let mut json = req.json::<MyObject>();
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let req = TestRequest::default()
let mut req = TestRequest::default()
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/text"),
@ -177,7 +177,7 @@ mod tests {
let mut json = req.json::<MyObject>();
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let req = TestRequest::default()
let mut req = TestRequest::default()
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
@ -190,7 +190,7 @@ mod tests {
let mut json = req.json::<MyObject>().limit(100);
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow);
let req = TestRequest::default()
let mut req = TestRequest::default()
.header(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),

View file

@ -78,7 +78,6 @@ mod httpcodes;
mod httpmessage;
mod json;
mod message;
mod payload;
mod request;
mod response;
mod service;
@ -111,7 +110,6 @@ pub mod dev {
pub use crate::httpmessage::{MessageBody, Readlines, UrlEncoded};
pub use crate::json::JsonBody;
pub use crate::payload::{Payload, PayloadBuffer};
pub use crate::response::ResponseBuilder;
}

View file

@ -10,7 +10,8 @@ use crate::error::PayloadError;
use crate::extensions::Extensions;
use crate::httpmessage::HttpMessage;
use crate::message::{Message, MessagePool, RequestHead};
use crate::payload::Payload;
use crate::h1::Payload;
/// Request
pub struct Request<P = Payload> {
@ -29,8 +30,8 @@ where
}
#[inline]
fn payload(mut self) -> P {
self.payload.take().unwrap()
fn payload(&mut self) -> Option<P> {
self.payload.take()
}
}
@ -62,9 +63,9 @@ impl<Payload> Request<Payload> {
}
/// Take request's payload
pub fn take_payload(mut self) -> (Payload, Request<()>) {
pub fn take_payload(mut self) -> (Option<Payload>, Request<()>) {
(
self.payload.take().unwrap(),
self.payload.take(),
Request {
payload: Some(()),
inner: self.inner.clone(),

View file

@ -1,3 +1,5 @@
use h2::RecvStream;
mod senderror;
pub use self::senderror::{SendError, SendResponse};

View file

@ -6,8 +6,8 @@ use cookie::Cookie;
use http::header::HeaderName;
use http::{HeaderMap, HttpTryFrom, Method, Uri, Version};
use crate::h1::Payload;
use crate::header::{Header, IntoHeaderValue};
use crate::payload::Payload;
use crate::Request;
/// Test `Request` builder

View file

@ -47,7 +47,7 @@ fn test_h1_v2() {
assert!(repr.contains("ClientRequest"));
assert!(repr.contains("x-test"));
let response = srv.block_on(request.send(&mut connector)).unwrap();
let mut response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success());
// read response
@ -55,7 +55,7 @@ fn test_h1_v2() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
let request = srv.post().finish().unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
let mut response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success());
// read response

View file

@ -89,7 +89,7 @@ fn test_h2_body() -> std::io::Result<()> {
.map_err(|e| println!("Openssl error: {}", e))
.and_then(
h2::H2Service::build()
.finish(|req: Request<_>| {
.finish(|mut req: Request<_>| {
req.body()
.limit(1024 * 1024)
.and_then(|body| Ok(Response::Ok().body(body)))
@ -101,7 +101,7 @@ fn test_h2_body() -> std::io::Result<()> {
let req = client::ClientRequest::get(srv.surl("/"))
.body(data.clone())
.unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap();
@ -350,7 +350,7 @@ fn test_headers() {
let req = srv.get().finish().unwrap();
let response = srv.block_on(req.send(&mut connector)).unwrap();
let mut response = srv.block_on(req.send(&mut connector)).unwrap();
assert!(response.status().is_success());
// read response
@ -387,7 +387,7 @@ fn test_body() {
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
// read response
@ -402,7 +402,7 @@ fn test_head_empty() {
});
let req = client::ClientRequest::head(srv.url("/")).finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
{
@ -428,7 +428,7 @@ fn test_head_binary() {
});
let req = client::ClientRequest::head(srv.url("/")).finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
{
@ -477,7 +477,7 @@ fn test_body_length() {
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
// read response
@ -496,7 +496,7 @@ fn test_body_chunked_explicit() {
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
// read response
@ -517,7 +517,7 @@ fn test_body_chunked_implicit() {
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
// read response
@ -540,7 +540,7 @@ fn test_response_http_error_handling() {
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
let mut response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response

View file

@ -5,7 +5,6 @@ use actix_http_test::TestServer;
use actix_service::NewService;
use actix_utils::framed::IntoFramed;
use actix_utils::stream::TakeItem;
use actix_web::ws as web_ws;
use bytes::{Bytes, BytesMut};
use futures::future::{lazy, ok, Either};
use futures::{Future, Sink, Stream};
@ -105,37 +104,4 @@ fn test_simple() {
item,
Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into())))
);
{
let mut sys = actix_web::actix::System::new("test");
let url = srv.url("/");
let (reader, mut writer) = sys
.block_on(lazy(|| web_ws::Client::new(url).connect()))
.unwrap();
writer.text("text");
let (item, reader) = sys.block_on(reader.into_future()).unwrap();
assert_eq!(item, Some(web_ws::Message::Text("text".to_owned())));
writer.binary(b"text".as_ref());
let (item, reader) = sys.block_on(reader.into_future()).unwrap();
assert_eq!(
item,
Some(web_ws::Message::Binary(Bytes::from_static(b"text").into()))
);
writer.ping("ping");
let (item, reader) = sys.block_on(reader.into_future()).unwrap();
assert_eq!(item, Some(web_ws::Message::Pong("ping".to_owned())));
writer.close(Some(web_ws::CloseCode::Normal.into()));
let (item, _) = sys.block_on(reader.into_future()).unwrap();
assert_eq!(
item,
Some(web_ws::Message::Close(Some(
web_ws::CloseCode::Normal.into()
)))
);
}
}