1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-09-29 06:42:06 +00:00

remove actix-threadpool.use actix_rt::task::spawn_blocking (#1878)

This commit is contained in:
fakeshadow 2021-01-10 00:04:19 +08:00 committed by GitHub
parent f6cc829758
commit fe392abeb4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 83 additions and 71 deletions

View file

@ -38,13 +38,15 @@
### Removed ### Removed
* Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now * Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now
exposed directly by the `middleware` module. exposed directly by the `middleware` module.
* Remove `actix-threadpool` as dependency. `actix_threadpool::BlockingError` error type can be imported
from `actix_web::error` module. [#1878]
[#1812]: https://github.com/actix/actix-web/pull/1812 [#1812]: https://github.com/actix/actix-web/pull/1812
[#1813]: https://github.com/actix/actix-web/pull/1813 [#1813]: https://github.com/actix/actix-web/pull/1813
[#1852]: https://github.com/actix/actix-web/pull/1852 [#1852]: https://github.com/actix/actix-web/pull/1852
[#1865]: https://github.com/actix/actix-web/pull/1865 [#1865]: https://github.com/actix/actix-web/pull/1865
[#1875]: https://github.com/actix/actix-web/pull/1875 [#1875]: https://github.com/actix/actix-web/pull/1875
[#1878]: https://github.com/actix/actix-web/pull/1878
## 3.3.2 - 2020-12-01 ## 3.3.2 - 2020-12-01
### Fixed ### Fixed

View file

@ -76,11 +76,10 @@ required-features = ["rustls"]
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-macros = "0.1.0" actix-macros = "0.1.0"
actix-router = "0.2.4" actix-router = "0.2.4"
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-server = "2.0.0-beta.2" actix-server = "2.0.0-beta.2"
actix-service = "2.0.0-beta.2" actix-service = "2.0.0-beta.3"
actix-utils = "3.0.0-beta.1" actix-utils = "3.0.0-beta.1"
actix-threadpool = "0.3.1"
actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true } actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true }
actix-web-codegen = "0.4.0" actix-web-codegen = "0.4.0"

View file

@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "4.0.0-beta.1", default-features = false } actix-web = { version = "4.0.0-beta.1", default-features = false }
actix-service = "2.0.0-beta.2" actix-service = "2.0.0-beta.3"
bitflags = "1" bitflags = "1"
bytes = "1" bytes = "1"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
@ -31,5 +31,5 @@ percent-encoding = "2.1"
v_htmlescape = "0.12" v_htmlescape = "0.12"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-web = "4.0.0-beta.1" actix-web = "4.0.0-beta.1"

View file

@ -8,17 +8,11 @@ use std::{
}; };
use actix_web::{ use actix_web::{
error::{BlockingError, Error}, error::{Error, ErrorInternalServerError},
web, rt::task::{spawn_blocking, JoinHandle},
}; };
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use futures_util::future::{FutureExt, LocalBoxFuture};
use crate::handle_error;
type ChunkedBoxFuture =
LocalBoxFuture<'static, Result<(File, Bytes), BlockingError<io::Error>>>;
#[doc(hidden)] #[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file /// A helper created from a `std::fs::File` which reads the file
@ -27,7 +21,7 @@ pub struct ChunkedReadFile {
pub(crate) size: u64, pub(crate) size: u64,
pub(crate) offset: u64, pub(crate) offset: u64,
pub(crate) file: Option<File>, pub(crate) file: Option<File>,
pub(crate) fut: Option<ChunkedBoxFuture>, pub(crate) fut: Option<JoinHandle<Result<(File, Bytes), io::Error>>>,
pub(crate) counter: u64, pub(crate) counter: u64,
} }
@ -45,18 +39,20 @@ impl Stream for ChunkedReadFile {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = self.fut {
return match ready!(Pin::new(fut).poll(cx)) { let res = match ready!(Pin::new(fut).poll(cx)) {
Ok((file, bytes)) => { Ok(Ok((file, bytes))) => {
self.fut.take(); self.fut.take();
self.file = Some(file); self.file = Some(file);
self.offset += bytes.len() as u64; self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64; self.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes))) Ok(bytes)
} }
Err(e) => Poll::Ready(Some(Err(handle_error(e)))), Ok(Err(e)) => Err(e.into()),
Err(_) => Err(ErrorInternalServerError("Unexpected error")),
}; };
return Poll::Ready(Some(res));
} }
let size = self.size; let size = self.size;
@ -68,10 +64,8 @@ impl Stream for ChunkedReadFile {
} else { } else {
let mut file = self.file.take().expect("Use after completion"); let mut file = self.file.take().expect("Use after completion");
self.fut = Some( self.fut = Some(spawn_blocking(move || {
web::block(move || { let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let max_bytes =
cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes); let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?; file.seek(io::SeekFrom::Start(offset))?;
@ -84,9 +78,7 @@ impl Stream for ChunkedReadFile {
} }
Ok((file, Bytes::from(buf))) Ok((file, Bytes::from(buf)))
}) }));
.boxed_local(),
);
self.poll_next(cx) self.poll_next(cx)
} }

View file

@ -14,12 +14,10 @@
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![warn(missing_docs, missing_debug_implementations)] #![warn(missing_docs, missing_debug_implementations)]
use std::io;
use actix_service::boxed::{BoxService, BoxServiceFactory}; use actix_service::boxed::{BoxService, BoxServiceFactory};
use actix_web::{ use actix_web::{
dev::{ServiceRequest, ServiceResponse}, dev::{ServiceRequest, ServiceResponse},
error::{BlockingError, Error, ErrorInternalServerError}, error::Error,
http::header::DispositionType, http::header::DispositionType,
}; };
use mime_guess::from_ext; use mime_guess::from_ext;
@ -56,13 +54,6 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime {
from_ext(ext).first_or_octet_stream() from_ext(ext).first_or_octet_stream()
} }
pub(crate) fn handle_error(err: BlockingError<io::Error>) -> Error {
match err {
BlockingError::Error(err) => err.into(),
BlockingError::Canceled => ErrorInternalServerError("Unexpected error"),
}
}
type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType;
#[cfg(test)] #[cfg(test)]

View file

@ -29,11 +29,11 @@ default = []
openssl = ["open-ssl", "awc/openssl"] openssl = ["open-ssl", "awc/openssl"]
[dependencies] [dependencies]
actix-service = "2.0.0-beta.2" actix-service = "2.0.0-beta.3"
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-tls = "3.0.0-beta.2" actix-tls = "3.0.0-beta.2"
actix-utils = "3.0.0-beta.1" actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-server = "2.0.0-beta.2" actix-server = "2.0.0-beta.2"
awc = "3.0.0-beta.1" awc = "3.0.0-beta.1"

View file

@ -25,11 +25,14 @@
* Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`. * Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`.
due to the removal of this type from `tokio-openssl` crate. openssl handshake due to the removal of this type from `tokio-openssl` crate. openssl handshake
error would return as `ConnectError::SslError`. [#1813] error would return as `ConnectError::SslError`. [#1813]
* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`.
Due to this change `actix_threadpool::BlockingError` type is moved into
`actix_http::error` module. [#1878]
[#1813]: https://github.com/actix/actix-web/pull/1813 [#1813]: https://github.com/actix/actix-web/pull/1813
[#1857]: https://github.com/actix/actix-web/pull/1857 [#1857]: https://github.com/actix/actix-web/pull/1857
[#1864]: https://github.com/actix/actix-web/pull/1864 [#1864]: https://github.com/actix/actix-web/pull/1864
[#1878]: https://github.com/actix/actix-web/pull/1878
## 2.2.0 - 2020-11-25 ## 2.2.0 - 2020-11-25
### Added ### Added

View file

@ -40,11 +40,10 @@ secure-cookies = ["cookie/secure"]
actors = ["actix"] actors = ["actix"]
[dependencies] [dependencies]
actix-service = "2.0.0-beta.2" actix-service = "2.0.0-beta.3"
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.1" actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-threadpool = "0.3.1"
actix-tls = "3.0.0-beta.2" actix-tls = "3.0.0-beta.2"
actix = { version = "0.11.0-beta.1", optional = true } actix = { version = "0.11.0-beta.1", optional = true }

View file

@ -3,14 +3,14 @@ use std::io::{self, Write};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_threadpool::{run, CpuFuture}; use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliDecoder; use brotli2::write::BrotliDecoder;
use bytes::Bytes; use bytes::Bytes;
use flate2::write::{GzDecoder, ZlibDecoder}; use flate2::write::{GzDecoder, ZlibDecoder};
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use super::Writer; use super::Writer;
use crate::error::PayloadError; use crate::error::{BlockingError, PayloadError};
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}; use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
const INPLACE: usize = 2049; const INPLACE: usize = 2049;
@ -19,7 +19,7 @@ pub struct Decoder<S> {
decoder: Option<ContentDecoder>, decoder: Option<ContentDecoder>,
stream: S, stream: S,
eof: bool, eof: bool,
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>, fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
} }
impl<S> Decoder<S> impl<S> Decoder<S>
@ -80,8 +80,13 @@ where
loop { loop {
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item, Ok(Ok(item)) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))), Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
}
}; };
self.decoder = Some(decoder); self.decoder = Some(decoder);
self.fut.take(); self.fut.take();
@ -105,7 +110,7 @@ where
return Poll::Ready(Some(Ok(chunk))); return Poll::Ready(Some(Ok(chunk)));
} }
} else { } else {
self.fut = Some(run(move || { self.fut = Some(spawn_blocking(move || {
let chunk = decoder.feed_data(chunk)?; let chunk = decoder.feed_data(chunk)?;
Ok((chunk, decoder)) Ok((chunk, decoder))
})); }));

View file

@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_threadpool::{run, CpuFuture}; use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliEncoder; use brotli2::write::BrotliEncoder;
use bytes::Bytes; use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::write::{GzEncoder, ZlibEncoder};
@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode};
use crate::{Error, ResponseHead}; use crate::{Error, ResponseHead};
use super::Writer; use super::Writer;
use crate::error::BlockingError;
const INPLACE: usize = 1024; const INPLACE: usize = 1024;
@ -26,7 +27,7 @@ pub struct Encoder<B> {
#[pin] #[pin]
body: EncoderBody<B>, body: EncoderBody<B>,
encoder: Option<ContentEncoder>, encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>, fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
} }
impl<B: MessageBody> Encoder<B> { impl<B: MessageBody> Encoder<B> {
@ -136,8 +137,15 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
if let Some(ref mut fut) = this.fut { if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) { let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item, Ok(Ok(item)) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))), Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(
BlockingError::<io::Error>::Canceled.into(),
)))
}
}; };
let chunk = encoder.take(); let chunk = encoder.take();
*this.encoder = Some(encoder); *this.encoder = Some(encoder);
@ -160,7 +168,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
return Poll::Ready(Some(Ok(chunk))); return Poll::Ready(Some(Ok(chunk)));
} }
} else { } else {
*this.fut = Some(run(move || { *this.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?; encoder.write(&chunk)?;
Ok(encoder) Ok(encoder)
})); }));

View file

@ -7,7 +7,6 @@ use std::string::FromUtf8Error;
use std::{fmt, io, result}; use std::{fmt, io, result};
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
pub use actix_threadpool::BlockingError;
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError; use actix_utils::timeout::TimeoutError;
use bytes::BytesMut; use bytes::BytesMut;
@ -186,9 +185,6 @@ impl ResponseError for DeError {
/// `InternalServerError` for `Canceled` /// `InternalServerError` for `Canceled`
impl ResponseError for Canceled {} impl ResponseError for Canceled {}
/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
/// Return `BAD_REQUEST` for `Utf8Error` /// Return `BAD_REQUEST` for `Utf8Error`
impl ResponseError for Utf8Error { impl ResponseError for Utf8Error {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
@ -300,6 +296,20 @@ impl From<httparse::Error> for ParseError {
} }
} }
/// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
#[derive(Display, Debug)] #[derive(Display, Debug)]
/// A set of errors that can occur during payload parsing /// A set of errors that can occur during payload parsing
pub enum PayloadError { pub enum PayloadError {

View file

@ -28,5 +28,5 @@ mime = "0.3"
twoway = "0.2" twoway = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-http = "3.0.0-beta.1" actix-http = "3.0.0-beta.1"

View file

@ -28,6 +28,6 @@ pin-project = "1.0.0"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
env_logger = "0.7" env_logger = "0.7"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }

View file

@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] }
proc-macro2 = "1" proc-macro2 = "1"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
actix-web = "4.0.0-beta.1" actix-web = "4.0.0-beta.1"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }
trybuild = "1" trybuild = "1"

View file

@ -38,9 +38,9 @@ compress = ["actix-http/compress"]
[dependencies] [dependencies]
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-service = "2.0.0-beta.2" actix-service = "2.0.0-beta.3"
actix-http = "3.0.0-beta.1" actix-http = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.1" actix-rt = "2.0.0-beta.2"
base64 = "0.13" base64 = "0.13"
bytes = "1" bytes = "1"

View file

@ -280,5 +280,8 @@ where
I: Send + 'static, I: Send + 'static,
E: Send + std::fmt::Debug + 'static, E: Send + std::fmt::Debug + 'static,
{ {
actix_threadpool::run(f).await match actix_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}
} }