From 20cf0094e591f53c31aa87987c0875bcf3595a05 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 08:23:59 -0800 Subject: [PATCH] fix master branch build. change web::block output type. (#1957) --- CHANGES.md | 2 + Cargo.toml | 6 +- actix-files/Cargo.toml | 2 +- actix-files/src/chunked.rs | 106 +++++++++++++++++------------ actix-files/src/named.rs | 16 +---- actix-http-test/Cargo.toml | 2 +- actix-http/CHANGES.md | 2 + actix-http/Cargo.toml | 2 +- actix-http/src/encoding/decoder.rs | 11 +-- actix-http/src/encoding/encoder.rs | 13 +--- actix-http/src/error.rs | 27 +++----- actix-multipart/Cargo.toml | 2 +- actix-web-actors/Cargo.toml | 2 +- actix-web-codegen/Cargo.toml | 2 +- awc/Cargo.toml | 2 +- src/test.rs | 2 +- src/web.rs | 13 ++-- 17 files changed, 98 insertions(+), 114 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a6bcd56cc..3f6b09d7a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ * `ServiceRequest::into_parts` and `ServiceRequest::from_parts` would not fail. `ServiceRequest::from_request` would not fail and no payload would be generated [#1893] * Our `Either` type now uses `Left`/`Right` variants (instead of `A`/`B`) [#1894] +* `web::block` accept any closure that has an output bound to `Send` and `'static`. [#1957] ### Fixed * Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906] @@ -28,6 +29,7 @@ [#1894]: https://github.com/actix/actix-web/pull/1894 [#1869]: https://github.com/actix/actix-web/pull/1869 [#1906]: https://github.com/actix/actix-web/pull/1906 +[#1957]: https://github.com/actix/actix-web/pull/1957 ## 4.0.0-beta.1 - 2021-01-07 diff --git a/Cargo.toml b/Cargo.toml index bae6cb6cb..28cee0dad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,11 +74,11 @@ required-features = ["rustls"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-macros = "0.1.0" +actix-macros = "=0.2.0-beta.1" actix-router = "0.2.4" -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-server = "2.0.0-beta.2" -actix-service = "2.0.0-beta.3" +actix-service = "=2.0.0-beta.3" actix-utils = "3.0.0-beta.1" actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true } diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index bde2cb717..b72631eb1 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -31,5 +31,5 @@ percent-encoding = "2.1" v_htmlescape = "0.12" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-web = "4.0.0-beta.1" diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index 5b7b17dc4..2a62b1d26 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_web::{ - error::{Error, ErrorInternalServerError}, + error::{BlockingError, Error}, rt::task::{spawn_blocking, JoinHandle}, }; use bytes::Bytes; @@ -18,11 +18,26 @@ use futures_core::{ready, Stream}; /// A helper created from a `std::fs::File` which reads the file /// chunk-by-chunk on a `ThreadPool`. pub struct ChunkedReadFile { - pub(crate) size: u64, - pub(crate) offset: u64, - pub(crate) file: Option, - pub(crate) fut: Option>>, - pub(crate) counter: u64, + size: u64, + offset: u64, + state: ChunkedReadFileState, + counter: u64, +} + +enum ChunkedReadFileState { + File(Option), + Future(JoinHandle>), +} + +impl ChunkedReadFile { + pub(crate) fn new(size: u64, offset: u64, file: File) -> Self { + Self { + size, + offset, + state: ChunkedReadFileState::File(Some(file)), + counter: 0, + } + } } impl fmt::Debug for ChunkedReadFile { @@ -38,49 +53,52 @@ impl Stream for ChunkedReadFile { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Some(ref mut fut) = self.fut { - let res = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok((file, bytes))) => { - self.fut.take(); - self.file = Some(file); + let this = self.as_mut().get_mut(); + match this.state { + ChunkedReadFileState::File(ref mut file) => { + let size = this.size; + let offset = this.offset; + let counter = this.counter; - self.offset += bytes.len() as u64; - self.counter += bytes.len() as u64; + if size == counter { + Poll::Ready(None) + } else { + let mut file = file + .take() + .expect("ChunkedReadFile polled after completion"); - Ok(bytes) + let fut = spawn_blocking(move || { + let max_bytes = + cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let mut buf = Vec::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = file + .by_ref() + .take(max_bytes as u64) + .read_to_end(&mut buf)?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok((file, Bytes::from(buf))) + }); + this.state = ChunkedReadFileState::Future(fut); + self.poll_next(cx) } - Ok(Err(e)) => Err(e.into()), - Err(_) => Err(ErrorInternalServerError("Unexpected error")), - }; - return Poll::Ready(Some(res)); - } + } + ChunkedReadFileState::Future(ref mut fut) => { + let (file, bytes) = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; + this.state = ChunkedReadFileState::File(Some(file)); - let size = self.size; - let offset = self.offset; - let counter = self.counter; + this.offset += bytes.len() as u64; + this.counter += bytes.len() as u64; - if size == counter { - Poll::Ready(None) - } else { - let mut file = self.file.take().expect("Use after completion"); - - self.fut = Some(spawn_blocking(move || { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; - - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok((file, Bytes::from(buf))) - })); - - self.poll_next(cx) + Poll::Ready(Some(Ok(bytes))) + } } } } diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index 8cd2a23f9..6fa3f7c6c 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -298,13 +298,7 @@ impl NamedFile { res.encoding(current_encoding); } - let reader = ChunkedReadFile { - size: self.md.len(), - offset: 0, - file: Some(self.file), - fut: None, - counter: 0, - }; + let reader = ChunkedReadFile::new(self.md.len(), 0, self.file); return res.streaming(reader); } @@ -426,13 +420,7 @@ impl NamedFile { return resp.status(StatusCode::NOT_MODIFIED).finish(); } - let reader = ChunkedReadFile { - offset, - size: length, - file: Some(self.file), - fut: None, - counter: 0, - }; + let reader = ChunkedReadFile::new(length, offset, self.file); if offset != 0 || length != self.md.len() { resp.status(StatusCode::PARTIAL_CONTENT); diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index 772b60f76..bc71f967a 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -33,7 +33,7 @@ actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-tls = "3.0.0-beta.2" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-server = "2.0.0-beta.2" awc = "3.0.0-beta.1" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 34de7727a..73339974d 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -16,6 +16,7 @@ * `Extensions::insert` returns Option of replaced item. [#1904] * Remove `HttpResponseBuilder::json2()` and make `HttpResponseBuilder::json()` take a value by reference. [#1903] +* Simplify `BlockingError` type to a struct. It's only triggered with blocking thread pool is dead. [#1957] ### Removed * `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869] @@ -29,6 +30,7 @@ [#1903]: https://github.com/actix/actix-web/pull/1903 [#1904]: https://github.com/actix/actix-web/pull/1904 [#1912]: https://github.com/actix/actix-web/pull/1912 +[#1957]: https://github.com/actix/actix-web/pull/1957 ## 3.0.0-beta.1 - 2021-01-07 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index af6209248..8f638c1d7 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -43,7 +43,7 @@ actors = ["actix"] actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-tls = "3.0.0-beta.2" actix = { version = "0.11.0-beta.1", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index b26609911..2cf2f6e03 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -79,15 +79,8 @@ where ) -> Poll> { loop { if let Some(ref mut fut) = self.fut { - let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => { - return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) - } - Err(_) => { - return Poll::Ready(Some(Err(BlockingError::Canceled.into()))) - } - }; + let (chunk, decoder) = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; self.decoder = Some(decoder); self.fut.take(); if let Some(chunk) = chunk { diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 28c757076..1d4a8e933 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -136,17 +136,8 @@ impl MessageBody for Encoder { } if let Some(ref mut fut) = this.fut { - let mut encoder = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => { - return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) - } - Err(_) => { - return Poll::Ready(Some(Err( - BlockingError::::Canceled.into(), - ))) - } - }; + let mut encoder = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; let chunk = encoder.take(); *this.encoder = Some(encoder); this.fut.take(); diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 9ff154240..28697cbbf 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -297,17 +297,13 @@ impl From for ParseError { /// A set of errors that can occur running blocking tasks in thread pool. #[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} +#[display(fmt = "Blocking thread pool is gone")] +pub struct BlockingError; -impl std::error::Error for BlockingError {} +impl std::error::Error for BlockingError {} /// `InternalServerError` for `BlockingError` -impl ResponseError for BlockingError {} +impl ResponseError for BlockingError {} #[derive(Display, Debug)] /// A set of errors that can occur during payload parsing @@ -372,15 +368,12 @@ impl From for PayloadError { } } -impl From> for PayloadError { - fn from(err: BlockingError) -> Self { - match err { - BlockingError::Error(e) => PayloadError::Io(e), - BlockingError::Canceled => PayloadError::Io(io::Error::new( - io::ErrorKind::Other, - "Operation is canceled", - )), - } +impl From for PayloadError { + fn from(_: BlockingError) -> Self { + PayloadError::Io(io::Error::new( + io::ErrorKind::Other, + "Operation is canceled", + )) } } diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 44a7e8d16..60620bce7 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -28,5 +28,5 @@ mime = "0.3" twoway = "0.2" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-http = "3.0.0-beta.1" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 0f90edb07..34bf713ae 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -28,6 +28,6 @@ pin-project = "1.0.0" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" env_logger = "0.7" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 00875cf1b..70ab55681 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] } proc-macro2 = "1" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" actix-web = "4.0.0-beta.1" futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 90f33c9ba..675110e5a 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -40,7 +40,7 @@ compress = ["actix-http/compress"] actix-codec = "0.4.0-beta.1" actix-service = "2.0.0-beta.3" actix-http = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = "=2.0.0-beta.2" base64 = "0.13" bytes = "1" diff --git a/src/test.rs b/src/test.rs index 769cece55..f3347bbbf 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1213,7 +1213,7 @@ mod tests { match res { Ok(value) => Ok(HttpResponse::Ok() .content_type("text/plain") - .body(format!("Async with block value: {}", value))), + .body(format!("Async with block value: {:?}", value))), Err(_) => panic!("Unexpected"), } } diff --git a/src/web.rs b/src/web.rs index 88071f551..3b4475b63 100644 --- a/src/web.rs +++ b/src/web.rs @@ -274,14 +274,11 @@ pub fn service(path: T) -> WebService { /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub async fn block(f: F) -> Result> +pub fn block(f: F) -> impl Future> where - F: FnOnce() -> Result + Send + 'static, - I: Send + 'static, - E: Send + std::fmt::Debug + 'static, + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { - match actix_rt::task::spawn_blocking(f).await { - Ok(res) => res.map_err(BlockingError::Error), - Err(_) => Err(BlockingError::Canceled), - } + let fut = actix_rt::task::spawn_blocking(f); + async { fut.await.map_err(|_| BlockingError) } }