1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-11 17:59:35 +00:00

migrate to -utils beta 4 (#2127)

This commit is contained in:
Rob Ede 2021-04-01 15:26:13 +01:00 committed by GitHub
parent a807d33600
commit c8ed8dd1a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 612 additions and 210 deletions

View file

@ -1,3 +1,3 @@
[alias] [alias]
chk = "hack check --workspace --tests --examples" chk = "hack check --workspace --all-features --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples" lint = "hack --clean-per-run clippy --workspace --tests --examples"

View file

@ -83,7 +83,7 @@ actix-router = "0.2.7"
actix-rt = "2.2" actix-rt = "2.2"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
actix-service = "2.0.0-beta.4" actix-service = "2.0.0-beta.4"
actix-utils = "3.0.0-beta.2" actix-utils = "3.0.0-beta.4"
actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true } actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true }
actix-web-codegen = "0.5.0-beta.2" actix-web-codegen = "0.5.0-beta.2"

View file

@ -19,12 +19,12 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "4.0.0-beta.4", default-features = false } actix-web = { version = "4.0.0-beta.4", default-features = false }
actix-service = "2.0.0-beta.4" actix-service = "2.0.0-beta.4"
actix-utils = "3.0.0-beta.4"
askama_escape = "0.10" askama_escape = "0.10"
bitflags = "1" bitflags = "1"
bytes = "1" bytes = "1"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false }
http-range = "0.1.4" http-range = "0.1.4"
derive_more = "0.99.5" derive_more = "0.99.5"
log = "0.4" log = "0.4"

View file

@ -1,6 +1,7 @@
use std::{cell::RefCell, fmt, io, path::PathBuf, rc::Rc}; use std::{cell::RefCell, fmt, io, path::PathBuf, rc::Rc};
use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt}; use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt};
use actix_utils::future::ok;
use actix_web::{ use actix_web::{
dev::{AppService, HttpServiceFactory, ResourceDef, ServiceRequest, ServiceResponse}, dev::{AppService, HttpServiceFactory, ResourceDef, ServiceRequest, ServiceResponse},
error::Error, error::Error,
@ -8,7 +9,7 @@ use actix_web::{
http::header::DispositionType, http::header::DispositionType,
HttpRequest, HttpRequest,
}; };
use futures_util::future::{ok, FutureExt, LocalBoxFuture}; use futures_core::future::LocalBoxFuture;
use crate::{ use crate::{
directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService, directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService,
@ -263,18 +264,18 @@ impl ServiceFactory<ServiceRequest> for Files {
}; };
if let Some(ref default) = *self.default.borrow() { if let Some(ref default) = *self.default.borrow() {
default let fut = default.new_service(());
.new_service(()) Box::pin(async {
.map(move |result| match result { match fut.await {
Ok(default) => { Ok(default) => {
srv.default = Some(default); srv.default = Some(default);
Ok(srv) Ok(srv)
} }
Err(_) => Err(()), Err(_) => Err(()),
}) }
.boxed_local() })
} else { } else {
ok(srv).boxed_local() Box::pin(ok(srv))
} }
} }
} }

View file

@ -65,6 +65,7 @@ mod tests {
}; };
use actix_service::ServiceFactory; use actix_service::ServiceFactory;
use actix_utils::future::ok;
use actix_web::{ use actix_web::{
guard, guard,
http::{ http::{
@ -76,7 +77,6 @@ mod tests {
web::{self, Bytes}, web::{self, Bytes},
App, HttpResponse, Responder, App, HttpResponse, Responder,
}; };
use futures_util::future::ok;
use super::*; use super::*;

View file

@ -3,8 +3,8 @@ use std::{
str::FromStr, str::FromStr,
}; };
use actix_utils::future::{ready, Ready};
use actix_web::{dev::Payload, FromRequest, HttpRequest}; use actix_web::{dev::Payload, FromRequest, HttpRequest};
use futures_util::future::{ready, Ready};
use crate::error::UriSegmentError; use crate::error::UriSegmentError;

View file

@ -1,6 +1,7 @@
use std::{fmt, io, path::PathBuf, rc::Rc}; use std::{fmt, io, path::PathBuf, rc::Rc};
use actix_service::Service; use actix_service::Service;
use actix_utils::future::ok;
use actix_web::{ use actix_web::{
dev::{ServiceRequest, ServiceResponse}, dev::{ServiceRequest, ServiceResponse},
error::Error, error::Error,
@ -8,7 +9,7 @@ use actix_web::{
http::{header, Method}, http::{header, Method},
HttpResponse, HttpResponse,
}; };
use futures_util::future::{ok, Either, LocalBoxFuture, Ready}; use futures_core::future::LocalBoxFuture;
use crate::{ use crate::{
named, Directory, DirectoryRenderer, FilesError, HttpService, MimeOverride, NamedFile, named, Directory, DirectoryRenderer, FilesError, HttpService, MimeOverride, NamedFile,
@ -29,19 +30,18 @@ pub struct FilesService {
pub(crate) hidden_files: bool, pub(crate) hidden_files: bool,
} }
type FilesServiceFuture = Either<
Ready<Result<ServiceResponse, Error>>,
LocalBoxFuture<'static, Result<ServiceResponse, Error>>,
>;
impl FilesService { impl FilesService {
fn handle_err(&self, e: io::Error, req: ServiceRequest) -> FilesServiceFuture { fn handle_err(
log::debug!("Failed to handle {}: {}", req.path(), e); &self,
err: io::Error,
req: ServiceRequest,
) -> LocalBoxFuture<'static, Result<ServiceResponse, Error>> {
log::debug!("error handling {}: {}", req.path(), err);
if let Some(ref default) = self.default { if let Some(ref default) = self.default {
Either::Right(default.call(req)) Box::pin(default.call(req))
} else { } else {
Either::Left(ok(req.error_response(e))) Box::pin(ok(req.error_response(err)))
} }
} }
} }
@ -55,7 +55,7 @@ impl fmt::Debug for FilesService {
impl Service<ServiceRequest> for FilesService { impl Service<ServiceRequest> for FilesService {
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = Error; type Error = Error;
type Future = FilesServiceFuture; type Future = LocalBoxFuture<'static, Result<ServiceResponse, Error>>;
actix_service::always_ready!(); actix_service::always_ready!();
@ -69,7 +69,7 @@ impl Service<ServiceRequest> for FilesService {
}; };
if !is_method_valid { if !is_method_valid {
return Either::Left(ok(req.into_response( return Box::pin(ok(req.into_response(
actix_web::HttpResponse::MethodNotAllowed() actix_web::HttpResponse::MethodNotAllowed()
.insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8))
.body("Request did not meet this resource's requirements."), .body("Request did not meet this resource's requirements."),
@ -79,13 +79,13 @@ impl Service<ServiceRequest> for FilesService {
let real_path = let real_path =
match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) { match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) {
Ok(item) => item, Ok(item) => item,
Err(e) => return Either::Left(ok(req.error_response(e))), Err(e) => return Box::pin(ok(req.error_response(e))),
}; };
// full file path // full file path
let path = match self.directory.join(&real_path).canonicalize() { let path = match self.directory.join(&real_path).canonicalize() {
Ok(path) => path, Ok(path) => path,
Err(e) => return self.handle_err(e, req), Err(err) => return Box::pin(self.handle_err(err, req)),
}; };
if path.is_dir() { if path.is_dir() {
@ -93,7 +93,7 @@ impl Service<ServiceRequest> for FilesService {
if self.redirect_to_slash && !req.path().ends_with('/') { if self.redirect_to_slash && !req.path().ends_with('/') {
let redirect_to = format!("{}/", req.path()); let redirect_to = format!("{}/", req.path());
return Either::Left(ok(req.into_response( return Box::pin(ok(req.into_response(
HttpResponse::Found() HttpResponse::Found()
.insert_header((header::LOCATION, redirect_to)) .insert_header((header::LOCATION, redirect_to))
.body("") .body("")
@ -114,9 +114,9 @@ impl Service<ServiceRequest> for FilesService {
let (req, _) = req.into_parts(); let (req, _) = req.into_parts();
let res = named_file.into_response(&req); let res = named_file.into_response(&req);
Either::Left(ok(ServiceResponse::new(req, res))) Box::pin(ok(ServiceResponse::new(req, res)))
} }
Err(e) => self.handle_err(e, req), Err(err) => self.handle_err(err, req),
} }
} else if self.show_index { } else if self.show_index {
let dir = Directory::new(self.directory.clone(), path); let dir = Directory::new(self.directory.clone(), path);
@ -124,12 +124,12 @@ impl Service<ServiceRequest> for FilesService {
let (req, _) = req.into_parts(); let (req, _) = req.into_parts();
let x = (self.renderer)(&dir, &req); let x = (self.renderer)(&dir, &req);
match x { Box::pin(match x {
Ok(resp) => Either::Left(ok(resp)), Ok(resp) => ok(resp),
Err(e) => Either::Left(ok(ServiceResponse::from_err(e, req))), Err(err) => ok(ServiceResponse::from_err(err, req)),
} })
} else { } else {
Either::Left(ok(ServiceResponse::from_err( Box::pin(ok(ServiceResponse::from_err(
FilesError::IsDirectory, FilesError::IsDirectory,
req.into_parts().0, req.into_parts().0,
))) )))
@ -145,9 +145,9 @@ impl Service<ServiceRequest> for FilesService {
let (req, _) = req.into_parts(); let (req, _) = req.into_parts();
let res = named_file.into_response(&req); let res = named_file.into_response(&req);
Either::Left(ok(ServiceResponse::new(req, res))) Box::pin(ok(ServiceResponse::new(req, res)))
} }
Err(e) => self.handle_err(e, req), Err(err) => self.handle_err(err, req),
} }
} }
} }

View file

@ -32,7 +32,7 @@ openssl = ["tls-openssl", "awc/openssl"]
actix-service = "2.0.0-beta.4" actix-service = "2.0.0-beta.4"
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-tls = "3.0.0-beta.5" actix-tls = "3.0.0-beta.5"
actix-utils = "3.0.0-beta.2" actix-utils = "3.0.0-beta.4"
actix-rt = "2.2" actix-rt = "2.2"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
awc = { version = "3.0.0-beta.3", default-features = false } awc = { version = "3.0.0-beta.3", default-features = false }

View file

@ -9,8 +9,12 @@
### Changed ### Changed
* `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063] * `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063]
### Removed
* `ResponseError` impl for `actix_utils::timeout::TimeoutError`. [#2127]
[#2063]: https://github.com/actix/actix-web/pull/2063 [#2063]: https://github.com/actix/actix-web/pull/2063
[#2081]: https://github.com/actix/actix-web/pull/2081 [#2081]: https://github.com/actix/actix-web/pull/2081
[#2127]: https://github.com/actix/actix-web/pull/2127
## 3.0.0-beta.4 - 2021-03-08 ## 3.0.0-beta.4 - 2021-03-08

View file

@ -46,7 +46,7 @@ trust-dns = ["trust-dns-resolver"]
[dependencies] [dependencies]
actix-service = "2.0.0-beta.4" actix-service = "2.0.0-beta.4"
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.2" actix-utils = "3.0.0-beta.4"
actix-rt = "2.2" actix-rt = "2.2"
actix-tls = "3.0.0-beta.5" actix-tls = "3.0.0-beta.5"
@ -65,11 +65,13 @@ http = "0.2.2"
httparse = "1.3" httparse = "1.3"
itoa = "0.4" itoa = "0.4"
language-tags = "0.2" language-tags = "0.2"
local-channel = "0.1"
once_cell = "1.5" once_cell = "1.5"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
percent-encoding = "2.1" percent-encoding = "2.1"
pin-project = "1.0.0" pin-project = "1.0.0"
pin-project-lite = "0.2"
rand = "0.8" rand = "0.8"
regex = "1.3" regex = "1.3"
serde = "1.0" serde = "1.0"

View file

@ -2,7 +2,7 @@ use std::{env, io};
use actix_http::{HttpService, Response}; use actix_http::{HttpService, Response};
use actix_server::Server; use actix_server::Server;
use futures_util::future; use actix_utils::future;
use http::header::HeaderValue; use http::header::HeaderValue;
use log::info; use log::info;

View file

@ -20,8 +20,9 @@ mod tests {
use std::pin::Pin; use std::pin::Pin;
use actix_rt::pin; use actix_rt::pin;
use actix_utils::future::poll_fn;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_util::{future::poll_fn, stream}; use futures_util::stream;
use super::*; use super::*;

View file

@ -5,10 +5,11 @@ use std::{
}; };
use actix_codec::Framed; use actix_codec::Framed;
use actix_utils::future::poll_fn;
use bytes::buf::BufMut; use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use futures_util::{future::poll_fn, SinkExt as _}; use futures_util::SinkExt as _;
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;

View file

@ -1,7 +1,7 @@
use std::future::Future; use std::future::Future;
use actix_utils::future::poll_fn;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::poll_fn;
use h2::{ use h2::{
client::{Builder, Connection, SendRequest}, client::{Builder, Connection, SendRequest},
SendStream, SendStream,

View file

@ -6,9 +6,6 @@ use std::str::Utf8Error;
use std::string::FromUtf8Error; use std::string::FromUtf8Error;
use std::{fmt, io, result}; use std::{fmt, io, result};
use actix_codec::{Decoder, Encoder};
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut; use bytes::BytesMut;
use derive_more::{Display, From}; use derive_more::{Display, From};
use http::uri::InvalidUri; use http::uri::InvalidUri;
@ -148,19 +145,6 @@ impl From<ResponseBuilder> for Error {
} }
} }
/// Inspects the underlying enum and returns an appropriate status code.
///
/// If the variant is [`TimeoutError::Service`], the error code of the service is returned.
/// Otherwise, [`StatusCode::GATEWAY_TIMEOUT`] is returned.
impl<E: ResponseError> ResponseError for TimeoutError<E> {
fn status_code(&self) -> StatusCode {
match self {
TimeoutError::Service(e) => e.status_code(),
TimeoutError::Timeout => StatusCode::GATEWAY_TIMEOUT,
}
}
}
#[derive(Debug, Display)] #[derive(Debug, Display)]
#[display(fmt = "UnknownError")] #[display(fmt = "UnknownError")]
struct UnitError; struct UnitError;
@ -469,14 +453,6 @@ impl ResponseError for ContentTypeError {
} }
} }
impl<E, U: Encoder<I> + Decoder, I> ResponseError for FramedDispatcherError<E, U, I>
where
E: fmt::Debug + fmt::Display,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
}
/// Helper type that can wrap any error and generate custom response. /// Helper type that can wrap any error and generate custom response.
/// ///
/// In following example any `io::Error` will be converted into "BAD REQUEST" /// In following example any `io::Error` will be converted into "BAD REQUEST"

View file

@ -951,7 +951,8 @@ mod tests {
use std::str; use std::str;
use actix_service::fn_service; use actix_service::fn_service;
use futures_util::future::{lazy, ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_util::future::lazy;
use super::*; use super::*;
use crate::{ use crate::{

View file

@ -1,5 +1,5 @@
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use crate::error::Error; use crate::error::Error;
use crate::request::Request; use crate::request::Request;

View file

@ -263,7 +263,7 @@ impl Inner {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures_util::future::poll_fn; use actix_utils::future::poll_fn;
#[actix_rt::test] #[actix_rt::test]
async fn test_unread_data() { async fn test_unread_data() {

View file

@ -6,8 +6,8 @@ use std::{fmt, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use actix_utils::future::ready;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use futures_util::future::ready;
use crate::body::MessageBody; use crate::body::MessageBody;
use crate::config::ServiceConfig; use crate::config::ServiceConfig;

View file

@ -10,9 +10,9 @@ use actix_service::{
fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service, fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service,
ServiceFactory, ServiceFactory,
}; };
use actix_utils::future::ready;
use bytes::Bytes; use bytes::Bytes;
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use futures_util::future::ready;
use h2::server::{handshake, Handshake}; use h2::server::{handshake, Handshake};
use log::error; use log::error;

View file

@ -4,7 +4,6 @@ use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};
use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError};
use super::{Codec, Frame, Message}; use super::{Codec, Frame, Message};
@ -15,7 +14,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
#[pin] #[pin]
inner: InnerDispatcher<S, T, Codec, Message>, inner: inner::Dispatcher<S, T, Codec, Message>,
} }
impl<S, T> Dispatcher<S, T> impl<S, T> Dispatcher<S, T>
@ -27,13 +26,13 @@ where
{ {
pub fn new<F: IntoService<S, Frame>>(io: T, service: F) -> Self { pub fn new<F: IntoService<S, Frame>>(io: T, service: F) -> Self {
Dispatcher { Dispatcher {
inner: InnerDispatcher::new(Framed::new(io, Codec::new()), service), inner: inner::Dispatcher::new(Framed::new(io, Codec::new()), service),
} }
} }
pub fn with<F: IntoService<S, Frame>>(framed: Framed<T, Codec>, service: F) -> Self { pub fn with<F: IntoService<S, Frame>>(framed: Framed<T, Codec>, service: F) -> Self {
Dispatcher { Dispatcher {
inner: InnerDispatcher::new(framed, service), inner: inner::Dispatcher::new(framed, service),
} }
} }
} }
@ -45,9 +44,393 @@ where
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
{ {
type Output = Result<(), DispatcherError<S::Error, Codec, Message>>; type Output = Result<(), inner::DispatcherError<S::Error, Codec, Message>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx) self.project().inner.poll(cx)
} }
} }
/// Framed dispatcher service and related utilities.
mod inner {
// allow dead code since this mod was ripped from actix-utils
#![allow(dead_code)]
use core::{
fmt,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use actix_service::{IntoService, Service};
use futures_core::stream::Stream;
use local_channel::mpsc;
use log::debug;
use pin_project_lite::pin_project;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use crate::ResponseError;
/// Framed transport errors
pub enum DispatcherError<E, U, I>
where
U: Encoder<I> + Decoder,
{
/// Inner service error.
Service(E),
/// Frame encoding error.
Encoder(<U as Encoder<I>>::Error),
/// Frame decoding error.
Decoder(<U as Decoder>::Error),
}
impl<E, U, I> From<E> for DispatcherError<E, U, I>
where
U: Encoder<I> + Decoder,
{
fn from(err: E) -> Self {
DispatcherError::Service(err)
}
}
impl<E, U, I> fmt::Debug for DispatcherError<E, U, I>
where
E: fmt::Debug,
U: Encoder<I> + Decoder,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
DispatcherError::Service(ref e) => {
write!(fmt, "DispatcherError::Service({:?})", e)
}
DispatcherError::Encoder(ref e) => {
write!(fmt, "DispatcherError::Encoder({:?})", e)
}
DispatcherError::Decoder(ref e) => {
write!(fmt, "DispatcherError::Decoder({:?})", e)
}
}
}
}
impl<E, U, I> fmt::Display for DispatcherError<E, U, I>
where
E: fmt::Display,
U: Encoder<I> + Decoder,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
DispatcherError::Service(ref e) => write!(fmt, "{}", e),
DispatcherError::Encoder(ref e) => write!(fmt, "{:?}", e),
DispatcherError::Decoder(ref e) => write!(fmt, "{:?}", e),
}
}
}
impl<E, U, I> ResponseError for DispatcherError<E, U, I>
where
E: fmt::Debug + fmt::Display,
U: Encoder<I> + Decoder,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
}
/// Message type wrapper for signalling end of message stream.
pub enum Message<T> {
/// Message item.
Item(T),
/// Signal from service to flush all messages and stop processing.
Close,
}
pin_project! {
/// A future that reads frames from a [`Framed`] object and passes them to a [`Service`].
pub struct Dispatcher<S, T, U, I>
where
S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead,
T: AsyncWrite,
U: Encoder<I>,
U: Decoder,
I: 'static,
<U as Encoder<I>>::Error: fmt::Debug,
{
service: S,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
}
enum State<S, U, I>
where
S: Service<<U as Decoder>::Item>,
U: Encoder<I> + Decoder,
{
Processing,
Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>),
FlushAndStop,
Stopping,
}
impl<S, U, I> State<S, U, I>
where
S: Service<<U as Decoder>::Item>,
U: Encoder<I> + Decoder,
{
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::FramedError(err) => err,
_ => panic!(),
}
}
}
impl<S, T, U, I> Dispatcher<S, T, U, I>
where
S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Decoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
{
/// Create new `Dispatcher`.
pub fn new<F>(framed: Framed<T, U>, service: F) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let (tx, rx) = mpsc::channel();
Dispatcher {
framed,
rx,
tx,
service: service.into_service(),
state: State::Processing,
}
}
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
pub fn with_rx<F>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let tx = rx.sender();
Dispatcher {
framed,
rx,
tx,
service: service.into_service(),
state: State::Processing,
}
}
/// Get sender handle.
pub fn tx(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
/// Get reference to a service wrapped by `Dispatcher` instance.
pub fn service(&self) -> &S {
&self.service
}
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
pub fn service_mut(&mut self) -> &mut S {
&mut self.service
}
/// Get reference to a framed instance wrapped by `Dispatcher` instance.
pub fn framed(&self) -> &Framed<T, U> {
&self.framed
}
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
&mut self.framed
}
/// Read from framed object.
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: fmt::Debug,
{
loop {
let this = self.as_mut().project();
match this.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match this.framed.next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
*this.state =
State::FramedError(DispatcherError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
*this.state = State::Stopping;
return true;
}
};
let tx = this.tx.clone();
let fut = this.service.call(item);
actix_rt::spawn(async move {
let item = fut.await;
let _ = tx.send(item.map(Message::Item));
});
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
*this.state = State::Error(DispatcherError::Service(err));
return true;
}
}
}
}
/// Write to framed object.
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state =
State::FramedError(DispatcherError::Encoder(err));
return true;
}
}
Poll::Ready(Some(Ok(Message::Close))) => {
*this.state = State::FlushAndStop;
return true;
}
Poll::Ready(Some(Err(err))) => {
*this.state = State::Error(DispatcherError::Service(err));
return true;
}
Poll::Ready(None) | Poll::Pending => break,
}
}
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
*this.state =
State::FramedError(DispatcherError::Encoder(err));
return true;
}
}
} else {
break;
}
}
false
}
}
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where
S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let this = self.as_mut().project();
return match this.state {
State::Processing => {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
continue;
} else {
Poll::Pending
}
}
State::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty()
&& this.framed.flush(cx).is_pending()
{
return Poll::Pending;
}
Poll::Ready(Err(this.state.take_error()))
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
this.framed.flush(cx).map(|res| {
if let Err(err) = res {
debug!("Error sending data: {:?}", err);
}
Ok(())
})
} else {
Poll::Ready(Ok(()))
}
}
State::FramedError(_) => {
Poll::Ready(Err(this.state.take_framed_error()))
}
State::Stopping => Poll::Ready(Ok(())),
};
}
}
}
}

View file

@ -3,11 +3,9 @@ use actix_http::{
}; };
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::ServiceFactoryExt; use actix_service::ServiceFactoryExt;
use actix_utils::future;
use bytes::Bytes; use bytes::Bytes;
use futures_util::{ use futures_util::StreamExt as _;
future::{self, ok},
StreamExt as _,
};
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \
@ -63,7 +61,7 @@ async fn test_h1_v2() {
async fn test_connection_close() { async fn test_connection_close() {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.finish(|_| ok::<_, ()>(Response::Ok().body(STR))) .finish(|_| future::ok::<_, ()>(Response::Ok().body(STR)))
.tcp() .tcp()
.map(|_| ()) .map(|_| ())
}) })
@ -79,9 +77,9 @@ async fn test_with_query_parameter() {
HttpService::build() HttpService::build()
.finish(|req: Request| { .finish(|req: Request| {
if req.uri().query().unwrap().contains("qp=") { if req.uri().query().unwrap().contains("qp=") {
ok::<_, ()>(Response::Ok().finish()) future::ok::<_, ()>(Response::Ok().finish())
} else { } else {
ok::<_, ()>(Response::BadRequest().finish()) future::ok::<_, ()>(Response::BadRequest().finish())
} }
}) })
.tcp() .tcp()

View file

@ -11,12 +11,10 @@ use actix_http::HttpMessage;
use actix_http::{body, Error, HttpService, Request, Response}; use actix_http::{body, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{fn_service, ServiceFactoryExt}; use actix_service::{fn_service, ServiceFactoryExt};
use actix_utils::future::{err, ok, ready};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::{ use futures_util::stream::{once, StreamExt as _};
future::{err, ok, ready},
stream::{once, StreamExt as _},
};
use openssl::{ use openssl::{
pkey::PKey, pkey::PKey,
ssl::{SslAcceptor, SslMethod}, ssl::{SslAcceptor, SslMethod},

View file

@ -8,10 +8,10 @@ use actix_http::http::{Method, StatusCode, Version};
use actix_http::{body, error, Error, HttpService, Request, Response}; use actix_http::{body, error, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{fn_factory_with_config, fn_service}; use actix_service::{fn_factory_with_config, fn_service};
use actix_utils::future::{err, ok};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::{self, err, ok};
use futures_util::stream::{once, StreamExt as _}; use futures_util::stream::{once, StreamExt as _};
use rustls::{ use rustls::{
internal::pemfile::{certs, pkcs8_private_keys}, internal::pemfile::{certs, pkcs8_private_keys},
@ -51,7 +51,7 @@ fn tls_config() -> RustlsServerConfig {
async fn test_h1() -> io::Result<()> { async fn test_h1() -> io::Result<()> {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, Error>(Response::Ok().finish())) .h1(|_| ok::<_, Error>(Response::Ok().finish()))
.rustls(tls_config()) .rustls(tls_config())
}) })
.await; .await;
@ -65,7 +65,7 @@ async fn test_h1() -> io::Result<()> {
async fn test_h2() -> io::Result<()> { async fn test_h2() -> io::Result<()> {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|_| future::ok::<_, Error>(Response::Ok().finish())) .h2(|_| ok::<_, Error>(Response::Ok().finish()))
.rustls(tls_config()) .rustls(tls_config())
}) })
.await; .await;
@ -82,7 +82,7 @@ async fn test_h1_1() -> io::Result<()> {
.h1(|req: Request| { .h1(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
future::ok::<_, Error>(Response::Ok().finish()) ok::<_, Error>(Response::Ok().finish())
}) })
.rustls(tls_config()) .rustls(tls_config())
}) })
@ -100,7 +100,7 @@ async fn test_h2_1() -> io::Result<()> {
.finish(|req: Request| { .finish(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
assert_eq!(req.version(), Version::HTTP_2); assert_eq!(req.version(), Version::HTTP_2);
future::ok::<_, Error>(Response::Ok().finish()) ok::<_, Error>(Response::Ok().finish())
}) })
.rustls(tls_config()) .rustls(tls_config())
}) })
@ -144,7 +144,7 @@ async fn test_h2_content_length() {
StatusCode::OK, StatusCode::OK,
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
]; ];
future::ok::<_, ()>(Response::new(statuses[indx])) ok::<_, ()>(Response::new(statuses[indx]))
}) })
.rustls(tls_config()) .rustls(tls_config())
}) })
@ -213,7 +213,7 @@ async fn test_h2_headers() {
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ", TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ",
)); ));
} }
future::ok::<_, ()>(config.body(data.clone())) ok::<_, ()>(config.body(data.clone()))
}) })
.rustls(tls_config()) .rustls(tls_config())
}).await; }).await;
@ -252,7 +252,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
async fn test_h2_body2() { async fn test_h2_body2() {
let mut srv = test_server(move || { let mut srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|_| future::ok::<_, ()>(Response::Ok().body(STR))) .h2(|_| ok::<_, ()>(Response::Ok().body(STR)))
.rustls(tls_config()) .rustls(tls_config())
}) })
.await; .await;

View file

@ -5,9 +5,10 @@ use std::{net, thread};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_rt::time::sleep; use actix_rt::time::sleep;
use actix_service::fn_service; use actix_service::fn_service;
use actix_utils::future::{err, ok, ready};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{self, err, ok, ready, FutureExt};
use futures_util::stream::{once, StreamExt as _}; use futures_util::stream::{once, StreamExt as _};
use futures_util::FutureExt as _;
use regex::Regex; use regex::Regex;
use actix_http::HttpMessage; use actix_http::HttpMessage;
@ -24,7 +25,7 @@ async fn test_h1() {
.client_disconnect(1000) .client_disconnect(1000)
.h1(|req: Request| { .h1(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
future::ok::<_, ()>(Response::Ok().finish()) ok::<_, ()>(Response::Ok().finish())
}) })
.tcp() .tcp()
}) })
@ -44,7 +45,7 @@ async fn test_h1_2() {
.finish(|req: Request| { .finish(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
assert_eq!(req.version(), http::Version::HTTP_11); assert_eq!(req.version(), http::Version::HTTP_11);
future::ok::<_, ()>(Response::Ok().finish()) ok::<_, ()>(Response::Ok().finish())
}) })
.tcp() .tcp()
}) })
@ -65,7 +66,7 @@ async fn test_expect_continue() {
err(error::ErrorPreconditionFailed("error")) err(error::ErrorPreconditionFailed("error"))
} }
})) }))
.finish(|_| future::ok::<_, ()>(Response::Ok().finish())) .finish(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -96,7 +97,7 @@ async fn test_expect_continue_h1() {
} }
}) })
})) }))
.h1(fn_service(|_| future::ok::<_, ()>(Response::Ok().finish()))) .h1(fn_service(|_| ok::<_, ()>(Response::Ok().finish())))
.tcp() .tcp()
}) })
.await; .await;
@ -175,7 +176,7 @@ async fn test_slow_request() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.client_timeout(100) .client_timeout(100)
.finish(|_| future::ok::<_, ()>(Response::Ok().finish())) .finish(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -191,7 +192,7 @@ async fn test_slow_request() {
async fn test_http1_malformed_request() { async fn test_http1_malformed_request() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -207,7 +208,7 @@ async fn test_http1_malformed_request() {
async fn test_http1_keepalive() { async fn test_http1_keepalive() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -229,7 +230,7 @@ async fn test_http1_keepalive_timeout() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(1) .keep_alive(1)
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -250,7 +251,7 @@ async fn test_http1_keepalive_timeout() {
async fn test_http1_keepalive_close() { async fn test_http1_keepalive_close() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -271,7 +272,7 @@ async fn test_http1_keepalive_close() {
async fn test_http10_keepalive_default_close() { async fn test_http10_keepalive_default_close() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -291,7 +292,7 @@ async fn test_http10_keepalive_default_close() {
async fn test_http10_keepalive() { async fn test_http10_keepalive() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -319,7 +320,7 @@ async fn test_http1_keepalive_disabled() {
let srv = test_server(|| { let srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| ok::<_, ()>(Response::Ok().finish()))
.tcp() .tcp()
}) })
.await; .await;
@ -354,7 +355,7 @@ async fn test_content_length() {
StatusCode::OK, StatusCode::OK,
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
]; ];
future::ok::<_, ()>(Response::new(statuses[indx])) ok::<_, ()>(Response::new(statuses[indx]))
}) })
.tcp() .tcp()
}) })
@ -409,7 +410,7 @@ async fn test_h1_headers() {
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ", TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ",
)); ));
} }
future::ok::<_, ()>(builder.body(data.clone())) ok::<_, ()>(builder.body(data.clone()))
}).tcp() }).tcp()
}).await; }).await;
@ -645,7 +646,7 @@ async fn test_h1_response_http_error_handling() {
async fn test_h1_service_error() { async fn test_h1_service_error() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| future::err::<Response, Error>(error::ErrorBadRequest("error"))) .h1(|_| err::<Response, _>(error::ErrorBadRequest("error")))
.tcp() .tcp()
}) })
.await; .await;
@ -667,7 +668,7 @@ async fn test_h1_on_connect() {
}) })
.h1(|req: Request| { .h1(|req: Request| {
assert!(req.extensions().contains::<isize>()); assert!(req.extensions().contains::<isize>());
future::ok::<_, ()>(Response::Ok().finish()) ok::<_, ()>(Response::Ok().finish())
}) })
.tcp() .tcp()
}) })

View file

@ -9,11 +9,12 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::{body, h1, ws, Error, HttpService, Request, Response}; use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{fn_factory, Service}; use actix_service::{fn_factory, Service};
use actix_utils::dispatcher::Dispatcher; use actix_utils::future;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future;
use futures_util::{SinkExt as _, StreamExt as _}; use futures_util::{SinkExt as _, StreamExt as _};
use crate::ws::Dispatcher;
struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>); struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>);
impl<T> WsService<T> { impl<T> WsService<T> {
@ -58,7 +59,7 @@ where
.await .await
.unwrap(); .unwrap();
Dispatcher::new(framed.replace_codec(ws::Codec::new()), service) Dispatcher::with(framed.replace_codec(ws::Codec::new()), service)
.await .await
.map_err(|_| panic!()) .map_err(|_| panic!())
}; };

View file

@ -17,12 +17,14 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "4.0.0-beta.4", default-features = false } actix-web = { version = "4.0.0-beta.4", default-features = false }
actix-utils = "3.0.0-beta.2" actix-utils = "3.0.0-beta.4"
bytes = "1" bytes = "1"
derive_more = "0.99.5" derive_more = "0.99.5"
httparse = "1.3" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
httparse = "1.3"
local-waker = "0.1"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
twoway = "0.2" twoway = "0.2"

View file

@ -1,21 +1,22 @@
//! Multipart payload support //! Multipart payload support
use actix_utils::future::{ready, Ready};
use actix_web::{dev::Payload, Error, FromRequest, HttpRequest}; use actix_web::{dev::Payload, Error, FromRequest, HttpRequest};
use futures_util::future::{ok, Ready};
use crate::server::Multipart; use crate::server::Multipart;
/// Get request's payload as multipart stream /// Get request's payload as multipart stream.
/// ///
/// Content-type: multipart/form-data; /// Content-type: multipart/form-data;
/// ///
/// ## Server example /// ## Server example
/// ///
/// ``` /// ```
/// use futures_util::stream::{Stream, StreamExt};
/// use actix_web::{web, HttpResponse, Error}; /// use actix_web::{web, HttpResponse, Error};
/// use actix_multipart as mp; /// use actix_multipart::Multipart;
/// use futures_util::stream::StreamExt as _;
/// ///
/// async fn index(mut payload: mp::Multipart) -> Result<HttpResponse, Error> { /// async fn index(mut payload: Multipart) -> Result<HttpResponse, Error> {
/// // iterate over multipart stream /// // iterate over multipart stream
/// while let Some(item) = payload.next().await { /// while let Some(item) = payload.next().await {
/// let mut field = item?; /// let mut field = item?;
@ -25,9 +26,9 @@ use crate::server::Multipart;
/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?)); /// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?));
/// } /// }
/// } /// }
///
/// Ok(HttpResponse::Ok().into()) /// Ok(HttpResponse::Ok().into())
/// } /// }
/// # fn main() {}
/// ``` /// ```
impl FromRequest for Multipart { impl FromRequest for Multipart {
type Error = Error; type Error = Error;
@ -36,9 +37,9 @@ impl FromRequest for Multipart {
#[inline] #[inline]
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
ok(match Multipart::boundary(req.headers()) { ready(Ok(match Multipart::boundary(req.headers()) {
Ok(boundary) => Multipart::from_boundary(boundary, payload.take()), Ok(boundary) => Multipart::from_boundary(boundary, payload.take()),
Err(err) => Multipart::from_error(err), Err(err) => Multipart::from_error(err),
}) }))
} }
} }

View file

@ -1,4 +1,4 @@
//! Multipart payload support //! Multipart response payload support.
use std::cell::{Cell, RefCell, RefMut}; use std::cell::{Cell, RefCell, RefMut};
use std::convert::TryFrom; use std::convert::TryFrom;
@ -8,12 +8,12 @@ use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{cmp, fmt}; use std::{cmp, fmt};
use bytes::{Bytes, BytesMut};
use futures_util::stream::{LocalBoxStream, Stream, StreamExt};
use actix_utils::task::LocalWaker;
use actix_web::error::{ParseError, PayloadError}; use actix_web::error::{ParseError, PayloadError};
use actix_web::http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue}; use actix_web::http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut};
use futures_core::stream::{LocalBoxStream, Stream};
use futures_util::stream::StreamExt as _;
use local_waker::LocalWaker;
use crate::error::MultipartError; use crate::error::MultipartError;

View file

@ -5,7 +5,7 @@ use actix_web::{
}; };
use actix_web_actors::*; use actix_web_actors::*;
use bytes::Bytes; use bytes::Bytes;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt as _, StreamExt as _};
struct Ws; struct Ws;

View file

@ -21,6 +21,7 @@ proc-macro2 = "1"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2" actix-rt = "2.2"
actix-web = "4.0.0-beta.4" actix-web = "4.0.0-beta.4"
futures-util = { version = "0.3.7", default-features = false } actix-utils = "3.0.0-beta.4"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
trybuild = "1" trybuild = "1"
rustversion = "1" rustversion = "1"

View file

@ -1,11 +1,12 @@
use std::future::Future; use std::future::Future;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_utils::future;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::http::header::{HeaderName, HeaderValue}; use actix_web::http::header::{HeaderName, HeaderValue};
use actix_web::{http, test, web::Path, App, Error, HttpResponse, Responder}; use actix_web::{http, test, web::Path, App, Error, HttpResponse, Responder};
use actix_web_codegen::{connect, delete, get, head, options, patch, post, put, route, trace}; use actix_web_codegen::{connect, delete, get, head, options, patch, post, put, route, trace};
use futures_util::future::{self, LocalBoxFuture}; use futures_core::future::LocalBoxFuture;
// Make sure that we can name function as 'config' // Make sure that we can name function as 'config'
#[get("/config")] #[get("/config")]

View file

@ -69,7 +69,7 @@ tls-rustls = { version = "0.19.0", package = "rustls", optional = true, features
actix-web = { version = "4.0.0-beta.4", features = ["openssl"] } actix-web = { version = "4.0.0-beta.4", features = ["openssl"] }
actix-http = { version = "3.0.0-beta.4", features = ["openssl"] } actix-http = { version = "3.0.0-beta.4", features = ["openssl"] }
actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] } actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] }
actix-utils = "3.0.0-beta.1" actix-utils = "3.0.0-beta.4"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
actix-tls = { version = "3.0.0-beta.5", features = ["openssl", "rustls"] } actix-tls = { version = "3.0.0-beta.5", features = ["openssl", "rustls"] }

View file

@ -6,7 +6,7 @@
//! //!
//! ```no_run //! ```no_run
//! use awc::{Client, ws}; //! use awc::{Client, ws};
//! use futures_util::{sink::SinkExt, stream::StreamExt}; //! use futures_util::{sink::SinkExt as _, stream::StreamExt as _};
//! //!
//! #[actix_rt::main] //! #[actix_rt::main]
//! async fn main() { //! async fn main() {

View file

@ -5,12 +5,13 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_utils::future::ok;
use brotli2::write::BrotliEncoder; use brotli2::write::BrotliEncoder;
use bytes::Bytes; use bytes::Bytes;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use flate2::write::GzEncoder; use flate2::write::GzEncoder;
use flate2::Compression; use flate2::Compression;
use futures_util::{future::ok, stream}; use futures_util::stream;
use rand::Rng; use rand::Rng;
use actix_http::{ use actix_http::{
@ -159,7 +160,7 @@ async fn test_timeout_override() {
#[actix_rt::test] #[actix_rt::test]
async fn test_response_timeout() { async fn test_response_timeout() {
use futures_util::stream::{once, StreamExt}; use futures_util::stream::{once, StreamExt as _};
let srv = test::start(|| { let srv = test::start(|| {
App::new().service(web::resource("/").route(web::to(|| async { App::new().service(web::resource("/").route(web::to(|| async {

View file

@ -13,8 +13,8 @@ use std::{
use actix_http::HttpService; use actix_http::HttpService;
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{map_config, pipeline_factory, ServiceFactoryExt}; use actix_service::{map_config, pipeline_factory, ServiceFactoryExt};
use actix_utils::future::ok;
use actix_web::{dev::AppConfig, http::Version, web, App, HttpResponse}; use actix_web::{dev::AppConfig, http::Version, web, App, HttpResponse};
use futures_util::future::ok;
use rustls::internal::pemfile::{certs, pkcs8_private_keys}; use rustls::internal::pemfile::{certs, pkcs8_private_keys};
use rustls::{ClientConfig, NoClientAuth, ServerConfig}; use rustls::{ClientConfig, NoClientAuth, ServerConfig};

View file

@ -8,9 +8,9 @@ use std::sync::Arc;
use actix_http::HttpService; use actix_http::HttpService;
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{map_config, pipeline_factory, ServiceFactoryExt}; use actix_service::{map_config, pipeline_factory, ServiceFactoryExt};
use actix_utils::future::ok;
use actix_web::http::Version; use actix_web::http::Version;
use actix_web::{dev::AppConfig, web, App, HttpResponse}; use actix_web::{dev::AppConfig, web, App, HttpResponse};
use futures_util::future::ok;
use openssl::{ use openssl::{
pkey::PKey, pkey::PKey,
ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode}, ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode},

View file

@ -3,9 +3,9 @@ use std::io;
use actix_codec::Framed; use actix_codec::Framed;
use actix_http::{body::BodySize, h1, ws, Error, HttpService, Request, Response}; use actix_http::{body::BodySize, h1, ws, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_utils::future::ok;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::ok; use futures_util::{SinkExt as _, StreamExt as _};
use futures_util::{SinkExt, StreamExt};
async fn ws_service(req: ws::Frame) -> Result<ws::Message, io::Error> { async fn ws_service(req: ws::Frame) -> Result<ws::Message, io::Error> {
match req { match req {

View file

@ -1,12 +1,12 @@
use std::future::Future; use std::{future::Future, time::Instant};
use std::time::Instant;
use actix_http::Response; use actix_http::Response;
use actix_utils::future::{ready, Ready};
use actix_web::http::StatusCode; use actix_web::http::StatusCode;
use actix_web::test::TestRequest; use actix_web::test::TestRequest;
use actix_web::{error, Error, HttpRequest, HttpResponse, Responder}; use actix_web::{error, Error, HttpRequest, HttpResponse, Responder};
use criterion::{criterion_group, criterion_main, Criterion}; use criterion::{criterion_group, criterion_main, Criterion};
use futures_util::future::{ready, Either, Ready}; use futures_util::future::{join_all, Either};
// responder simulate the old responder trait. // responder simulate the old responder trait.
trait FutureResponder { trait FutureResponder {
@ -79,7 +79,7 @@ fn future_responder(c: &mut Criterion) {
.await .await
}); });
let futs = futures_util::future::join_all(futs); let futs = join_all(futs);
let start = Instant::now(); let start = Instant::now();

View file

@ -1,19 +1,37 @@
digraph { digraph {
rankdir=TB
subgraph cluster_net { subgraph cluster_net {
label="actix/actix-net"; label="actix-net"
"actix-codec" "actix-macros" "actix-rt" "actix-server" "actix-service" "actix-codec" "actix-macros" "actix-rt" "actix-server" "actix-service"
"actix-tls" "actix-tracing" "actix-utils" "actix-router" "actix-tls" "actix-tracing" "actix-utils" "actix-router"
"local-channel" "local-waker" }
subgraph cluster_other {
label="other actix owned crates"
{ rank=same; "local-channel" "local-waker" "bytestring" }
} }
"actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" } subgraph cluster_tokio {
label="tokio"
"tokio" "tokio-util"
}
"actix-codec" -> { "tokio" }
"actix-codec" -> { "tokio-util" }[color=red]
"actix-utils" -> { "local-waker" } "actix-utils" -> { "local-waker" }
"actix-tracing" -> { "actix-service" } "actix-tracing" -> { "actix-service" }
"actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" } "actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" }
"actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" } "actix-tls" -> { "tokio-util" }[color="#009900"]
"actix-server" -> { "actix-service" "actix-rt" "actix-utils" "tokio" }
"actix-rt" -> { "actix-macros" "tokio" } "actix-rt" -> { "actix-macros" "tokio" }
"actix-router" -> { "bytestring" }
"local-channel" -> { "local-waker" } "local-channel" -> { "local-waker" }
"tokio" [fontcolor = darkgreen] // invisible edges to force nicer layout
edge [style=invis]
"actix-macros" -> "tokio"
"actix-service" -> "bytestring"
"actix-macros" -> "bytestring"
} }

View file

@ -10,7 +10,7 @@ use actix_service::boxed::{self, BoxServiceFactory};
use actix_service::{ use actix_service::{
apply, apply_fn_factory, IntoServiceFactory, ServiceFactory, ServiceFactoryExt, Transform, apply, apply_fn_factory, IntoServiceFactory, ServiceFactory, ServiceFactoryExt, Transform,
}; };
use futures_util::future::FutureExt; use futures_util::future::FutureExt as _;
use crate::app_service::{AppEntry, AppInit, AppRoutingFactory}; use crate::app_service::{AppEntry, AppInit, AppRoutingFactory};
use crate::config::ServiceConfig; use crate::config::ServiceConfig;
@ -465,8 +465,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::Service; use actix_service::Service;
use actix_utils::future::{err, ok};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{err, ok};
use super::*; use super::*;
use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::http::{header, HeaderValue, Method, StatusCode};

View file

@ -4,7 +4,8 @@ use std::sync::Arc;
use actix_http::error::{Error, ErrorInternalServerError}; use actix_http::error::{Error, ErrorInternalServerError};
use actix_http::Extensions; use actix_http::Extensions;
use futures_util::future::{err, ok, LocalBoxFuture, Ready}; use actix_utils::future::{err, ok, Ready};
use futures_core::future::LocalBoxFuture;
use serde::Serialize; use serde::Serialize;
use crate::dev::Payload; use crate::dev::Payload;
@ -147,10 +148,10 @@ impl<T: ?Sized + 'static> DataFactory for Data<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::Service;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use super::*; use super::*;
use crate::dev::Service;
use crate::http::StatusCode; use crate::http::StatusCode;
use crate::test::{self, init_service, TestRequest}; use crate::test::{self, init_service, TestRequest};
use crate::{web, App, HttpResponse}; use crate::{web, App, HttpResponse};

View file

@ -6,10 +6,8 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_util::{ use actix_utils::future::{ready, Ready};
future::{ready, Ready}, use futures_core::ready;
ready,
};
use crate::{dev::Payload, Error, HttpRequest}; use crate::{dev::Payload, Error, HttpRequest};

View file

@ -5,8 +5,8 @@ use std::task::{Context, Poll};
use actix_http::{Error, Response}; use actix_http::{Error, Response};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_util::ready; use futures_core::ready;
use pin_project::pin_project; use pin_project::pin_project;
use crate::extract::FromRequest; use crate::extract::FromRequest;

View file

@ -16,8 +16,8 @@ use actix_http::{
Error, Error,
}; };
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use actix_utils::future::{ok, Ready};
use futures_core::ready; use futures_core::ready;
use futures_util::future::{ok, Ready};
use pin_project::pin_project; use pin_project::pin_project;
use crate::{ use crate::{

View file

@ -3,7 +3,9 @@
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use futures_util::future::{Either, FutureExt, LocalBoxFuture}; use actix_utils::future::Either;
use futures_core::future::LocalBoxFuture;
use futures_util::future::FutureExt as _;
/// Middleware for conditionally enabling other middleware. /// Middleware for conditionally enabling other middleware.
/// ///
@ -85,8 +87,8 @@ where
fn call(&self, req: Req) -> Self::Future { fn call(&self, req: Req) -> Self::Future {
match self { match self {
ConditionMiddleware::Enable(service) => Either::Left(service.call(req)), ConditionMiddleware::Enable(service) => Either::left(service.call(req)),
ConditionMiddleware::Disable(service) => Either::Right(service.call(req)), ConditionMiddleware::Disable(service) => Either::right(service.call(req)),
} }
} }
} }
@ -94,7 +96,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::IntoService; use actix_service::IntoService;
use futures_util::future::ok; use actix_utils::future::ok;
use super::*; use super::*;
use crate::{ use crate::{

View file

@ -9,10 +9,8 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_util::{ use actix_utils::future::{ready, Ready};
future::{ready, Ready}, use futures_core::ready;
ready,
};
use crate::{ use crate::{
dev::{Service, Transform}, dev::{Service, Transform},
@ -188,7 +186,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::IntoService; use actix_service::IntoService;
use futures_util::future::ok; use actix_utils::future::ok;
use super::*; use super::*;
use crate::{ use crate::{

View file

@ -175,7 +175,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::IntoService; use actix_service::IntoService;
use futures_util::future::{ok, FutureExt}; use actix_utils::future::ok;
use futures_util::future::FutureExt as _;
use super::*; use super::*;
use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode}; use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode};

View file

@ -13,8 +13,9 @@ use std::{
}; };
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use actix_utils::future::{ok, Ready};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{ok, Ready}; use futures_core::ready;
use log::{debug, warn}; use log::{debug, warn};
use regex::{Regex, RegexSet}; use regex::{Regex, RegexSet};
use time::OffsetDateTime; use time::OffsetDateTime;
@ -269,7 +270,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
let res = match futures_util::ready!(this.fut.poll(cx)) { let res = match ready!(this.fut.poll(cx)) {
Ok(res) => res, Ok(res) => res,
Err(e) => return Poll::Ready(Err(e)), Err(e) => return Poll::Ready(Err(e)),
}; };
@ -588,7 +589,7 @@ impl<'a> fmt::Display for FormatDisplay<'a> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::{IntoService, Service, Transform}; use actix_service::{IntoService, Service, Transform};
use futures_util::future::ok; use actix_utils::future::ok;
use super::*; use super::*;
use crate::http::{header, StatusCode}; use crate::http::{header, StatusCode};

View file

@ -2,8 +2,8 @@
use actix_http::http::{PathAndQuery, Uri}; use actix_http::http::{PathAndQuery, Uri};
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use actix_utils::future::{ready, Ready};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{ready, Ready};
use regex::Regex; use regex::Regex;
use crate::{ use crate::{

View file

@ -5,7 +5,7 @@ use std::{fmt, net};
use actix_http::http::{HeaderMap, Method, Uri, Version}; use actix_http::http::{HeaderMap, Method, Uri, Version};
use actix_http::{Error, Extensions, HttpMessage, Message, Payload, RequestHead}; use actix_http::{Error, Extensions, HttpMessage, Message, Payload, RequestHead};
use actix_router::{Path, Url}; use actix_router::{Path, Url};
use futures_util::future::{ok, Ready}; use actix_utils::future::{ok, Ready};
use smallvec::SmallVec; use smallvec::SmallVec;
use crate::app_service::AppInitServiceState; use crate::app_service::AppInitServiceState;

View file

@ -1,7 +1,7 @@
use std::{any::type_name, ops::Deref}; use std::{any::type_name, ops::Deref};
use actix_http::error::{Error, ErrorInternalServerError}; use actix_http::error::{Error, ErrorInternalServerError};
use futures_util::future; use actix_utils::future::{err, ok, Ready};
use crate::{dev::Payload, FromRequest, HttpRequest}; use crate::{dev::Payload, FromRequest, HttpRequest};
@ -67,11 +67,11 @@ impl<T: Clone + 'static> Deref for ReqData<T> {
impl<T: Clone + 'static> FromRequest for ReqData<T> { impl<T: Clone + 'static> FromRequest for ReqData<T> {
type Config = (); type Config = ();
type Error = Error; type Error = Error;
type Future = future::Ready<Result<Self, Error>>; type Future = Ready<Result<Self, Error>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
if let Some(st) = req.extensions().get::<T>() { if let Some(st) = req.extensions().get::<T>() {
future::ok(ReqData(st.clone())) ok(ReqData(st.clone()))
} else { } else {
log::debug!( log::debug!(
"Failed to construct App-level ReqData extractor. \ "Failed to construct App-level ReqData extractor. \
@ -79,7 +79,7 @@ impl<T: Clone + 'static> FromRequest for ReqData<T> {
req.path(), req.path(),
type_name::<T>(), type_name::<T>(),
); );
future::err(ErrorInternalServerError( err(ErrorInternalServerError(
"Missing expected request extension data", "Missing expected request extension data",
)) ))
} }

View file

@ -519,7 +519,7 @@ mod tests {
use actix_rt::time::sleep; use actix_rt::time::sleep;
use actix_service::Service; use actix_service::Service;
use futures_util::future::ok; use actix_utils::future::ok;
use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::http::{header, HeaderValue, Method, StatusCode};
use crate::middleware::DefaultHeaders; use crate::middleware::DefaultHeaders;

View file

@ -575,8 +575,8 @@ impl ServiceFactory<ServiceRequest> for ScopeEndpoint {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::Service; use actix_service::Service;
use actix_utils::future::ok;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::ok;
use crate::dev::{Body, ResponseBody}; use crate::dev::{Body, ResponseBody};
use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::http::{header, HeaderValue, Method, StatusCode};

View file

@ -602,7 +602,7 @@ mod tests {
use crate::test::{init_service, TestRequest}; use crate::test::{init_service, TestRequest};
use crate::{guard, http, web, App, HttpResponse}; use crate::{guard, http, web, App, HttpResponse};
use actix_service::Service; use actix_service::Service;
use futures_util::future::ok; use actix_utils::future::ok;
#[actix_rt::test] #[actix_rt::test]
async fn test_service() { async fn test_service() {

View file

@ -15,12 +15,12 @@ use actix_http::{ws, Extensions, HttpService, Request};
use actix_router::{Path, ResourceDef, Url}; use actix_router::{Path, ResourceDef, Url};
use actix_rt::{time::sleep, System}; use actix_rt::{time::sleep, System};
use actix_service::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory};
use actix_utils::future::ok;
use awc::error::PayloadError; use awc::error::PayloadError;
use awc::{Client, ClientRequest, ClientResponse, Connector}; use awc::{Client, ClientRequest, ClientResponse, Connector};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::ok; use futures_util::StreamExt as _;
use futures_util::StreamExt;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};

View file

@ -1,7 +1,8 @@
//! For either helper, see [`Either`]. //! For either helper, see [`Either`].
use bytes::Bytes; use bytes::Bytes;
use futures_util::{future::LocalBoxFuture, FutureExt, TryFutureExt}; use futures_core::future::LocalBoxFuture;
use futures_util::{FutureExt as _, TryFutureExt as _};
use crate::{ use crate::{
dev, dev,

View file

@ -12,10 +12,8 @@ use std::{
use actix_http::Payload; use actix_http::Payload;
use bytes::BytesMut; use bytes::BytesMut;
use encoding_rs::{Encoding, UTF_8}; use encoding_rs::{Encoding, UTF_8};
use futures_util::{ use futures_core::future::LocalBoxFuture;
future::{FutureExt, LocalBoxFuture}, use futures_util::{FutureExt as _, StreamExt as _};
StreamExt,
};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "compress")] #[cfg(feature = "compress")]

View file

@ -11,7 +11,7 @@ use std::{
}; };
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::{ready, stream::Stream}; use futures_core::{ready, stream::Stream as _};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use actix_http::Payload; use actix_http::Payload;

View file

@ -4,7 +4,7 @@ use std::{fmt, ops, sync::Arc};
use actix_http::error::{Error, ErrorNotFound}; use actix_http::error::{Error, ErrorNotFound};
use actix_router::PathDeserializer; use actix_router::PathDeserializer;
use futures_util::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use serde::de; use serde::de;
use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest}; use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest};

View file

@ -8,13 +8,10 @@ use std::{
}; };
use actix_http::error::{ErrorBadRequest, PayloadError}; use actix_http::error::{ErrorBadRequest, PayloadError};
use actix_utils::future::{ready, Either, Ready};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use encoding_rs::{Encoding, UTF_8}; use encoding_rs::{Encoding, UTF_8};
use futures_core::stream::Stream; use futures_core::{ready, stream::Stream};
use futures_util::{
future::{ready, Either, ErrInto, Ready, TryFutureExt as _},
ready,
};
use mime::Mime; use mime::Mime;
use crate::{dev, http::header, web, Error, FromRequest, HttpMessage, HttpRequest}; use crate::{dev, http::header, web, Error, FromRequest, HttpMessage, HttpRequest};
@ -26,7 +23,7 @@ use crate::{dev, http::header, web, Error, FromRequest, HttpMessage, HttpRequest
/// # Examples /// # Examples
/// ``` /// ```
/// use std::future::Future; /// use std::future::Future;
/// use futures_util::stream::{Stream, StreamExt}; /// use futures_util::stream::StreamExt as _;
/// use actix_web::{post, web}; /// use actix_web::{post, web};
/// ///
/// // `body: web::Payload` parameter extracts raw payload stream from request /// // `body: web::Payload` parameter extracts raw payload stream from request
@ -91,7 +88,7 @@ impl FromRequest for Payload {
impl FromRequest for Bytes { impl FromRequest for Bytes {
type Config = PayloadConfig; type Config = PayloadConfig;
type Error = Error; type Error = Error;
type Future = Either<ErrInto<HttpMessageBody, Error>, Ready<Result<Bytes, Error>>>; type Future = Either<BytesExtractFut, Ready<Result<Bytes, Error>>>;
#[inline] #[inline]
fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future { fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
@ -99,12 +96,25 @@ impl FromRequest for Bytes {
let cfg = PayloadConfig::from_req(req); let cfg = PayloadConfig::from_req(req);
if let Err(err) = cfg.check_mimetype(req) { if let Err(err) = cfg.check_mimetype(req) {
return Either::Right(ready(Err(err))); return Either::right(ready(Err(err)));
} }
let limit = cfg.limit; Either::left(BytesExtractFut {
let fut = HttpMessageBody::new(req, payload).limit(limit); body_fut: HttpMessageBody::new(req, payload).limit(cfg.limit),
Either::Left(fut.err_into()) })
}
}
/// Future for `Bytes` extractor.
pub struct BytesExtractFut {
body_fut: HttpMessageBody,
}
impl<'a> Future for BytesExtractFut {
type Output = Result<Bytes, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.body_fut).poll(cx).map_err(Into::into)
} }
} }
@ -135,21 +145,22 @@ impl FromRequest for String {
// check content-type // check content-type
if let Err(err) = cfg.check_mimetype(req) { if let Err(err) = cfg.check_mimetype(req) {
return Either::Right(ready(Err(err))); return Either::right(ready(Err(err)));
} }
// check charset // check charset
let encoding = match req.encoding() { let encoding = match req.encoding() {
Ok(enc) => enc, Ok(enc) => enc,
Err(err) => return Either::Right(ready(Err(err.into()))), Err(err) => return Either::right(ready(Err(err.into()))),
}; };
let limit = cfg.limit; let limit = cfg.limit;
let body_fut = HttpMessageBody::new(req, payload).limit(limit); let body_fut = HttpMessageBody::new(req, payload).limit(limit);
Either::Left(StringExtractFut { body_fut, encoding }) Either::left(StringExtractFut { body_fut, encoding })
} }
} }
/// Future for `String` extractor.
pub struct StringExtractFut { pub struct StringExtractFut {
body_fut: HttpMessageBody, body_fut: HttpMessageBody,
encoding: &'static Encoding, encoding: &'static Encoding,

View file

@ -2,7 +2,7 @@
use std::{fmt, ops, sync::Arc}; use std::{fmt, ops, sync::Arc};
use futures_util::future::{err, ok, Ready}; use actix_utils::future::{err, ok, Ready};
use serde::de; use serde::de;
use crate::{dev::Payload, error::QueryPayloadError, Error, FromRequest, HttpRequest}; use crate::{dev::Payload, error::QueryPayloadError, Error, FromRequest, HttpRequest};

View file

@ -177,7 +177,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt as _;
use super::*; use super::*;
use crate::test::TestRequest; use crate::test::TestRequest;

View file

@ -20,7 +20,7 @@ use flate2::{
write::{GzEncoder, ZlibDecoder, ZlibEncoder}, write::{GzEncoder, ZlibDecoder, ZlibEncoder},
Compression, Compression,
}; };
use futures_util::ready; use futures_core::ready;
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
use openssl::{ use openssl::{
pkey::PKey, pkey::PKey,