mirror of
https://github.com/actix/actix-web.git
synced 2024-11-25 19:11:10 +00:00
move blocking error to web (#2660)
This commit is contained in:
parent
1c1d6477ef
commit
ad38973767
11 changed files with 52 additions and 46 deletions
|
@ -81,7 +81,7 @@ async fn chunked_read_file_callback(
|
||||||
) -> Result<(File, Bytes), Error> {
|
) -> Result<(File, Bytes), Error> {
|
||||||
use io::{Read as _, Seek as _};
|
use io::{Read as _, Seek as _};
|
||||||
|
|
||||||
let res = actix_web::rt::task::spawn_blocking(move || {
|
let res = actix_web::web::block(move || {
|
||||||
let mut buf = Vec::with_capacity(max_bytes);
|
let mut buf = Vec::with_capacity(max_bytes);
|
||||||
|
|
||||||
file.seek(io::SeekFrom::Start(offset))?;
|
file.seek(io::SeekFrom::Start(offset))?;
|
||||||
|
@ -94,8 +94,7 @@ async fn chunked_read_file_callback(
|
||||||
Ok((file, Bytes::from(buf)))
|
Ok((file, Bytes::from(buf)))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await??;
|
||||||
.map_err(|_| actix_web::error::BlockingError)??;
|
|
||||||
|
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Removed
|
||||||
|
- `error::BlockingError` [#2660]
|
||||||
|
|
||||||
|
[#2660]: https://github.com/actix/actix-web/pull/2660
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-rc.4 - 2022-02-22
|
## 3.0.0-rc.4 - 2022-02-22
|
||||||
|
|
|
@ -19,7 +19,7 @@ use zstd::stream::write::Decoder as ZstdDecoder;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
encoding::Writer,
|
encoding::Writer,
|
||||||
error::{BlockingError, PayloadError},
|
error::PayloadError,
|
||||||
header::{ContentEncoding, HeaderMap, CONTENT_ENCODING},
|
header::{ContentEncoding, HeaderMap, CONTENT_ENCODING},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -47,14 +47,17 @@ where
|
||||||
ContentEncoding::Brotli => Some(ContentDecoder::Brotli(Box::new(
|
ContentEncoding::Brotli => Some(ContentDecoder::Brotli(Box::new(
|
||||||
brotli::DecompressorWriter::new(Writer::new(), 8_096),
|
brotli::DecompressorWriter::new(Writer::new(), 8_096),
|
||||||
))),
|
))),
|
||||||
|
|
||||||
#[cfg(feature = "compress-gzip")]
|
#[cfg(feature = "compress-gzip")]
|
||||||
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
|
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
|
||||||
ZlibDecoder::new(Writer::new()),
|
ZlibDecoder::new(Writer::new()),
|
||||||
))),
|
))),
|
||||||
|
|
||||||
#[cfg(feature = "compress-gzip")]
|
#[cfg(feature = "compress-gzip")]
|
||||||
ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(GzDecoder::new(
|
ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(GzDecoder::new(
|
||||||
Writer::new(),
|
Writer::new(),
|
||||||
)))),
|
)))),
|
||||||
|
|
||||||
#[cfg(feature = "compress-zstd")]
|
#[cfg(feature = "compress-zstd")]
|
||||||
ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new(
|
ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new(
|
||||||
ZstdDecoder::new(Writer::new()).expect(
|
ZstdDecoder::new(Writer::new()).expect(
|
||||||
|
@ -98,8 +101,12 @@ where
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(ref mut fut) = this.fut {
|
if let Some(ref mut fut) = this.fut {
|
||||||
let (chunk, decoder) =
|
let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| {
|
||||||
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
|
PayloadError::Io(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
"Blocking task was cancelled unexpectedly",
|
||||||
|
))
|
||||||
|
})??;
|
||||||
|
|
||||||
*this.decoder = Some(decoder);
|
*this.decoder = Some(decoder);
|
||||||
this.fut.take();
|
this.fut.take();
|
||||||
|
@ -159,10 +166,13 @@ where
|
||||||
enum ContentDecoder {
|
enum ContentDecoder {
|
||||||
#[cfg(feature = "compress-gzip")]
|
#[cfg(feature = "compress-gzip")]
|
||||||
Deflate(Box<ZlibDecoder<Writer>>),
|
Deflate(Box<ZlibDecoder<Writer>>),
|
||||||
|
|
||||||
#[cfg(feature = "compress-gzip")]
|
#[cfg(feature = "compress-gzip")]
|
||||||
Gzip(Box<GzDecoder<Writer>>),
|
Gzip(Box<GzDecoder<Writer>>),
|
||||||
|
|
||||||
#[cfg(feature = "compress-brotli")]
|
#[cfg(feature = "compress-brotli")]
|
||||||
Brotli(Box<brotli::DecompressorWriter<Writer>>),
|
Brotli(Box<brotli::DecompressorWriter<Writer>>),
|
||||||
|
|
||||||
// We need explicit 'static lifetime here because ZstdDecoder need lifetime
|
// We need explicit 'static lifetime here because ZstdDecoder need lifetime
|
||||||
// argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static`
|
// argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static`
|
||||||
#[cfg(feature = "compress-zstd")]
|
#[cfg(feature = "compress-zstd")]
|
||||||
|
|
|
@ -23,7 +23,6 @@ use zstd::stream::write::Encoder as ZstdEncoder;
|
||||||
use super::Writer;
|
use super::Writer;
|
||||||
use crate::{
|
use crate::{
|
||||||
body::{self, BodySize, MessageBody},
|
body::{self, BodySize, MessageBody},
|
||||||
error::BlockingError,
|
|
||||||
header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
|
header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
|
||||||
ResponseHead, StatusCode,
|
ResponseHead, StatusCode,
|
||||||
};
|
};
|
||||||
|
@ -173,7 +172,12 @@ where
|
||||||
|
|
||||||
if let Some(ref mut fut) = this.fut {
|
if let Some(ref mut fut) = this.fut {
|
||||||
let mut encoder = ready!(Pin::new(fut).poll(cx))
|
let mut encoder = ready!(Pin::new(fut).poll(cx))
|
||||||
.map_err(|_| EncoderError::Blocking(BlockingError))?
|
.map_err(|_| {
|
||||||
|
EncoderError::Io(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
"Blocking task was cancelled unexpectedly",
|
||||||
|
))
|
||||||
|
})?
|
||||||
.map_err(EncoderError::Io)?;
|
.map_err(EncoderError::Io)?;
|
||||||
|
|
||||||
let chunk = encoder.take();
|
let chunk = encoder.take();
|
||||||
|
@ -400,12 +404,11 @@ fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
|
||||||
#[derive(Debug, Display)]
|
#[derive(Debug, Display)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
pub enum EncoderError {
|
pub enum EncoderError {
|
||||||
|
/// Wrapped body stream error.
|
||||||
#[display(fmt = "body")]
|
#[display(fmt = "body")]
|
||||||
Body(Box<dyn StdError>),
|
Body(Box<dyn StdError>),
|
||||||
|
|
||||||
#[display(fmt = "blocking")]
|
/// Generic I/O error.
|
||||||
Blocking(BlockingError),
|
|
||||||
|
|
||||||
#[display(fmt = "io")]
|
#[display(fmt = "io")]
|
||||||
Io(io::Error),
|
Io(io::Error),
|
||||||
}
|
}
|
||||||
|
@ -414,7 +417,6 @@ impl StdError for EncoderError {
|
||||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
EncoderError::Body(err) => Some(&**err),
|
EncoderError::Body(err) => Some(&**err),
|
||||||
EncoderError::Blocking(err) => Some(err),
|
|
||||||
EncoderError::Io(err) => Some(err),
|
EncoderError::Io(err) => Some(err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ impl Error {
|
||||||
Self::new(Kind::SendResponse)
|
Self::new(Kind::SendResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused)] // reserved for future use (TODO: remove allow when being used)
|
#[allow(unused)] // available for future use
|
||||||
pub(crate) fn new_io() -> Self {
|
pub(crate) fn new_io() -> Self {
|
||||||
Self::new(Kind::Io)
|
Self::new(Kind::Io)
|
||||||
}
|
}
|
||||||
|
@ -252,12 +252,6 @@ impl From<ParseError> for Response<BoxBody> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A set of errors that can occur running blocking tasks in thread pool.
|
|
||||||
#[derive(Debug, Display, Error)]
|
|
||||||
#[display(fmt = "Blocking thread pool is gone")]
|
|
||||||
// TODO: non-exhaustive
|
|
||||||
pub struct BlockingError;
|
|
||||||
|
|
||||||
/// A set of errors that can occur during payload parsing.
|
/// A set of errors that can occur during payload parsing.
|
||||||
#[derive(Debug, Display)]
|
#[derive(Debug, Display)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
|
@ -295,13 +289,13 @@ impl std::error::Error for PayloadError {
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
PayloadError::Incomplete(None) => None,
|
PayloadError::Incomplete(None) => None,
|
||||||
PayloadError::Incomplete(Some(err)) => Some(err as &dyn std::error::Error),
|
PayloadError::Incomplete(Some(err)) => Some(err),
|
||||||
PayloadError::EncodingCorrupted => None,
|
PayloadError::EncodingCorrupted => None,
|
||||||
PayloadError::Overflow => None,
|
PayloadError::Overflow => None,
|
||||||
PayloadError::UnknownLength => None,
|
PayloadError::UnknownLength => None,
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error),
|
PayloadError::Http2Payload(err) => Some(err),
|
||||||
PayloadError::Io(err) => Some(err as &dyn std::error::Error),
|
PayloadError::Io(err) => Some(err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -325,15 +319,6 @@ impl From<io::Error> for PayloadError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<BlockingError> for PayloadError {
|
|
||||||
fn from(_: BlockingError) -> Self {
|
|
||||||
PayloadError::Io(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
|
||||||
"Operation is canceled",
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<PayloadError> for Error {
|
impl From<PayloadError> for Error {
|
||||||
fn from(err: PayloadError) -> Self {
|
fn from(err: PayloadError) -> Self {
|
||||||
Self::new_payload().with_cause(err)
|
Self::new_payload().with_cause(err)
|
||||||
|
|
|
@ -850,7 +850,8 @@ async fn not_modified_spec_h1() {
|
||||||
Some(&header::HeaderValue::from_static("4")),
|
Some(&header::HeaderValue::from_static("4")),
|
||||||
);
|
);
|
||||||
// server does not prevent payload from being sent but clients may choose not to read it
|
// server does not prevent payload from being sent but clients may choose not to read it
|
||||||
// TODO: this is probably a bug, especially since CL header can differ in length from the body
|
// TODO: this is probably a bug in the client, especially since CL header can differ in length
|
||||||
|
// from the body
|
||||||
assert!(!srv.load_body(res).await.unwrap().is_empty());
|
assert!(!srv.load_body(res).await.unwrap().is_empty());
|
||||||
|
|
||||||
// TODO: add stream response tests
|
// TODO: add stream response tests
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Changed
|
||||||
- Rename `test::{simple_service => status_service}`. [#2659]
|
- Rename `test::{simple_service => status_service}`. [#2659]
|
||||||
|
|
||||||
[#2659]: https://github.com/actix/actix-web/pull/2659
|
[#2659]: https://github.com/actix/actix-web/pull/2659
|
||||||
|
|
|
@ -47,7 +47,6 @@ impl fmt::Debug for Error {
|
||||||
|
|
||||||
impl StdError for Error {
|
impl StdError for Error {
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
// TODO: populate if replacement for Box<dyn Error> is found
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
//
|
//
|
||||||
// See <https://github.com/rust-lang/rust/issues/83375>
|
// See <https://github.com/rust-lang/rust/issues/83375>
|
||||||
pub use actix_http::error::{
|
pub use actix_http::error::{
|
||||||
BlockingError, ContentTypeError, DispatchError, HttpError, ParseError, PayloadError,
|
ContentTypeError, DispatchError, HttpError, ParseError, PayloadError,
|
||||||
};
|
};
|
||||||
|
|
||||||
use derive_more::{Display, Error, From};
|
use derive_more::{Display, Error, From};
|
||||||
|
@ -33,6 +33,14 @@ pub(crate) use macros::{downcast_dyn, downcast_get_type_id};
|
||||||
/// This type alias is generally used to avoid writing out `actix_http::Error` directly.
|
/// This type alias is generally used to avoid writing out `actix_http::Error` directly.
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
|
/// An error representing a problem running a blocking task on a thread pool.
|
||||||
|
#[derive(Debug, Display, Error)]
|
||||||
|
#[display(fmt = "Blocking thread pool is shut down unexpectedly")]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub struct BlockingError;
|
||||||
|
|
||||||
|
impl ResponseError for crate::error::BlockingError {}
|
||||||
|
|
||||||
/// Errors which can occur when attempting to generate resource uri.
|
/// Errors which can occur when attempting to generate resource uri.
|
||||||
#[derive(Debug, PartialEq, Display, Error, From)]
|
#[derive(Debug, PartialEq, Display, Error, From)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
|
|
|
@ -6,20 +6,22 @@ use std::{
|
||||||
io::{self, Write as _},
|
io::{self, Write as _},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::Response;
|
||||||
body::BoxBody,
|
|
||||||
header::{self, TryIntoHeaderValue},
|
|
||||||
Response, StatusCode,
|
|
||||||
};
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
body::BoxBody,
|
||||||
error::{downcast_dyn, downcast_get_type_id},
|
error::{downcast_dyn, downcast_get_type_id},
|
||||||
helpers, HttpResponse,
|
helpers,
|
||||||
|
http::{
|
||||||
|
header::{self, TryIntoHeaderValue},
|
||||||
|
StatusCode,
|
||||||
|
},
|
||||||
|
HttpResponse,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Errors that can generate responses.
|
/// Errors that can generate responses.
|
||||||
// TODO: add std::error::Error bound when replacement for Box<dyn Error> is found
|
// TODO: flesh out documentation
|
||||||
pub trait ResponseError: fmt::Debug + fmt::Display {
|
pub trait ResponseError: fmt::Debug + fmt::Display {
|
||||||
/// Returns appropriate status code for error.
|
/// Returns appropriate status code for error.
|
||||||
///
|
///
|
||||||
|
@ -73,7 +75,6 @@ impl ResponseError for std::str::Utf8Error {
|
||||||
|
|
||||||
impl ResponseError for std::io::Error {
|
impl ResponseError for std::io::Error {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
// TODO: decide if these errors should consider not found or permission errors
|
|
||||||
match self.kind() {
|
match self.kind() {
|
||||||
io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
|
io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
|
||||||
io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN,
|
io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN,
|
||||||
|
@ -86,7 +87,6 @@ impl ResponseError for actix_http::error::HttpError {}
|
||||||
|
|
||||||
impl ResponseError for actix_http::Error {
|
impl ResponseError for actix_http::Error {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
// TODO: map error kinds to status code better
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,8 +107,6 @@ impl ResponseError for actix_http::error::ParseError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ResponseError for actix_http::error::BlockingError {}
|
|
||||||
|
|
||||||
impl ResponseError for actix_http::error::PayloadError {
|
impl ResponseError for actix_http::error::PayloadError {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
match *self {
|
match *self {
|
||||||
|
|
|
@ -2,5 +2,4 @@
|
||||||
|
|
||||||
pub mod header;
|
pub mod header;
|
||||||
|
|
||||||
// TODO: figure out how best to expose http::Error vs actix_http::Error
|
|
||||||
pub use actix_http::{uri, ConnectionType, Error, KeepAlive, Method, StatusCode, Uri, Version};
|
pub use actix_http::{uri, ConnectionType, Error, KeepAlive, Method, StatusCode, Uri, Version};
|
||||||
|
|
Loading…
Reference in a new issue