From dd347e0bd069f00c67992fdcf805a21a8c2274cc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 22 Nov 2021 09:19:09 +0800 Subject: [PATCH] implement io-uring for actix-files (#2408) Co-authored-by: Rob Ede --- .cargo/config.toml | 22 ++- .github/workflows/ci.yml | 53 ++++--- Cargo.toml | 5 +- actix-files/CHANGES.md | 6 + actix-files/Cargo.toml | 7 +- actix-files/src/chunked.rs | 294 +++++++++++++++++++++++++++++------- actix-files/src/files.rs | 22 ++- actix-files/src/lib.rs | 105 +++++++------ actix-files/src/named.rs | 170 +++++++++++++++------ actix-files/src/path_buf.rs | 2 +- actix-files/src/service.rs | 220 +++++++++++++++------------ actix-http-test/CHANGES.md | 3 + actix-http-test/src/lib.rs | 31 ++-- actix-test/CHANGES.md | 3 + actix-test/src/lib.rs | 281 ++++++++++++++++++---------------- src/middleware/compress.rs | 3 +- 16 files changed, 794 insertions(+), 433 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 40a513efd..606c30de7 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,14 +1,12 @@ [alias] -chk = "check --workspace --all-features --tests --examples --bins" -lint = "clippy --workspace --all-features --tests --examples --bins" -ci-min = "hack check --workspace --no-default-features" -ci-min-test = "hack check --workspace --no-default-features --tests --examples" -ci-default = "check --workspace --bins --tests --examples" -ci-full = "check --workspace --all-features --bins --tests --examples" -ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture" -ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" +lint = "clippy --workspace --tests --examples --bins -- -Dclippy::todo" +lint-all = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" -ci-feature-powerset-check-no-tls="hack --workspace --feature-powerset --skip=__compress,rustls,openssl check" -ci-feature-powerset-check-rustls="hack --workspace --feature-powerset --features=rustls --skip=__compress,openssl check" -ci-feature-powerset-check-openssl="hack --workspace --feature-powerset --features=openssl --skip=__compress,rustls check" -ci-feature-powerset-check-all="hack --workspace --feature-powerset --skip=__compress check" +# lib checking +ci-check-min = "hack --workspace check --no-default-features" +ci-check-default = "hack --workspace check" +ci-check-all-feature-powerset="hack --workspace --feature-powerset --skip=__compress,io-uring check" +ci-check-all-feature-powerset-linux="hack --workspace --feature-powerset --skip=__compress check" + +# testing +ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8f586d8d8..38c066d6c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,26 +62,34 @@ jobs: - name: check minimal uses: actions-rs/cargo@v1 - with: { command: ci-min } - - - name: check minimal + tests - uses: actions-rs/cargo@v1 - with: { command: ci-min-test } + with: { command: ci-check-min } - name: check default uses: actions-rs/cargo@v1 - with: { command: ci-default } - - - name: check full - uses: actions-rs/cargo@v1 - with: { command: ci-full } + with: { command: ci-check-default } - name: tests - uses: actions-rs/cargo@v1 timeout-minutes: 60 - with: - command: ci-test - args: --skip=test_reading_deflate_encoding_large_random_rustls + run: | + cargo test --lib --tests -p=actix-router --all-features + cargo test --lib --tests -p=actix-http --all-features + cargo test --lib --tests -p=actix-web --features=rustls,openssl -- --skip=test_reading_deflate_encoding_large_random_rustls + cargo test --lib --tests -p=actix-web-codegen --all-features + cargo test --lib --tests -p=awc --all-features + cargo test --lib --tests -p=actix-http-test --all-features + cargo test --lib --tests -p=actix-test --all-features + cargo test --lib --tests -p=actix-files + cargo test --lib --tests -p=actix-multipart --all-features + cargo test --lib --tests -p=actix-web-actors --all-features + + - name: tests (io-uring) + if: matrix.target.os == 'ubuntu-latest' + timeout-minutes: 60 + run: > + sudo bash -c "ulimit -Sl 512 + && ulimit -Hl 512 + && PATH=$PATH:/usr/share/rust/.cargo/bin + && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo test --lib --tests -p=actix-files --all-features" - name: Clear the cargo caches run: | @@ -114,9 +122,12 @@ jobs: args: cargo-hack - name: check feature combinations - # if: github.ref == 'refs/heads/master' uses: actions-rs/cargo@v1 - with: { command: ci-feature-powerset-check-all } + with: { command: ci-check-all-feature-powerset } + + - name: check feature combinations + uses: actions-rs/cargo@v1 + with: { command: ci-check-all-feature-powerset-linux } coverage: name: coverage @@ -166,11 +177,11 @@ jobs: - name: Cache Dependencies uses: Swatinem/rust-cache@v1.3.0 - - name: Install cargo-hack - uses: actions-rs/cargo@v1 - with: - command: install - args: cargo-hack + # - name: Install cargo-hack + # uses: actions-rs/cargo@v1 + # with: + # command: install + # args: cargo-hack - name: doc tests uses: actions-rs/cargo@v1 diff --git a/Cargo.toml b/Cargo.toml index 3f1f54fcc..537d1b5fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,10 +65,13 @@ rustls = ["actix-http/rustls", "actix-tls/accept", "actix-tls/rustls"] # Don't rely on these whatsoever. They may disappear at anytime. __compress = [] +# io-uring feature only avaiable for Linux OSes. +experimental-io-uring = ["actix-server/io-uring"] + [dependencies] actix-codec = "0.4.1" actix-macros = "0.2.3" -actix-rt = "2.2" +actix-rt = "2.3" actix-server = "2.0.0-beta.9" actix-service = "2.0.0" actix-utils = "3.0.0" diff --git a/actix-files/CHANGES.md b/actix-files/CHANGES.md index 41336c21c..7da775607 100644 --- a/actix-files/CHANGES.md +++ b/actix-files/CHANGES.md @@ -1,8 +1,14 @@ # Changes ## Unreleased - 2021-xx-xx +* Add crate feature `experimental-io-uring`, enabling async file I/O to be utilized. This feature is only available on Linux OSes with recent kernel versions. This feature is semver-exempt. [#2408] +* Add `NamedFile::open_async`. [#2408] * Fix 304 Not Modified responses to omit the Content-Length header, as per the spec. [#2453] +* The `Responder` impl for `NamedFile` now has a boxed future associated type. [#2408] +* The `Service` impl for `NamedFileService` now has a boxed future associated type. [#2408] +* Add `impl Clone` for `FilesService`. [#2408] +[#2408]: https://github.com/actix/actix-web/pull/2408 [#2453]: https://github.com/actix/actix-web/pull/2453 diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index bbb9f551a..c0ff18678 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -14,11 +14,13 @@ edition = "2018" name = "actix_files" path = "src/lib.rs" +[features] +experimental-io-uring = ["actix-web/experimental-io-uring", "tokio-uring"] + [dependencies] actix-web = { version = "4.0.0-beta.11", default-features = false } actix-http = "3.0.0-beta.12" actix-service = "2.0.0" -actix-utils = "3.0.0" askama_escape = "0.10" bitflags = "1" @@ -30,6 +32,9 @@ log = "0.4" mime = "0.3" mime_guess = "2.0.1" percent-encoding = "2.1" +pin-project-lite = "0.2.7" + +tokio-uring = { version = "0.1", optional = true } [dev-dependencies] actix-rt = "2.2" diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index f639848c9..fbb46e417 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -1,98 +1,278 @@ use std::{ cmp, fmt, - fs::File, future::Future, - io::{self, Read, Seek}, + io, pin::Pin, task::{Context, Poll}, }; -use actix_web::{ - error::{BlockingError, Error}, - rt::task::{spawn_blocking, JoinHandle}, -}; +use actix_web::error::Error; use bytes::Bytes; use futures_core::{ready, Stream}; +use pin_project_lite::pin_project; -#[doc(hidden)] -/// A helper created from a `std::fs::File` which reads the file -/// chunk-by-chunk on a `ThreadPool`. -pub struct ChunkedReadFile { - size: u64, - offset: u64, - state: ChunkedReadFileState, - counter: u64, -} +use super::named::File; -enum ChunkedReadFileState { - File(Option), - Future(JoinHandle>), -} - -impl ChunkedReadFile { - pub(crate) fn new(size: u64, offset: u64, file: File) -> Self { - Self { - size, - offset, - state: ChunkedReadFileState::File(Some(file)), - counter: 0, - } +pin_project! { + /// Adapter to read a `std::file::File` in chunks. + #[doc(hidden)] + pub struct ChunkedReadFile { + size: u64, + offset: u64, + #[pin] + state: ChunkedReadFileState, + counter: u64, + callback: F, } } -impl fmt::Debug for ChunkedReadFile { +#[cfg(not(feature = "experimental-io-uring"))] +pin_project! { + #[project = ChunkedReadFileStateProj] + #[project_replace = ChunkedReadFileStateProjReplace] + enum ChunkedReadFileState { + File { file: Option, }, + Future { #[pin] fut: Fut }, + } +} + +#[cfg(feature = "experimental-io-uring")] +pin_project! { + #[project = ChunkedReadFileStateProj] + #[project_replace = ChunkedReadFileStateProjReplace] + enum ChunkedReadFileState { + File { file: Option<(File, BytesMut)> }, + Future { #[pin] fut: Fut }, + } +} + +impl fmt::Debug for ChunkedReadFile { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("ChunkedReadFile") } } -impl Stream for ChunkedReadFile { +pub(crate) fn new_chunked_read( + size: u64, + offset: u64, + file: File, +) -> impl Stream> { + ChunkedReadFile { + size, + offset, + #[cfg(not(feature = "experimental-io-uring"))] + state: ChunkedReadFileState::File { file: Some(file) }, + #[cfg(feature = "experimental-io-uring")] + state: ChunkedReadFileState::File { + file: Some((file, BytesMut::new())), + }, + counter: 0, + callback: chunked_read_file_callback, + } +} + +#[cfg(not(feature = "experimental-io-uring"))] +async fn chunked_read_file_callback( + mut file: File, + offset: u64, + max_bytes: usize, +) -> Result<(File, Bytes), Error> { + use io::{Read as _, Seek as _}; + + let res = actix_web::rt::task::spawn_blocking(move || { + let mut buf = Vec::with_capacity(max_bytes); + + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + + if n_bytes == 0 { + Err(io::Error::from(io::ErrorKind::UnexpectedEof)) + } else { + Ok((file, Bytes::from(buf))) + } + }) + .await + .map_err(|_| actix_web::error::BlockingError)??; + + Ok(res) +} + +#[cfg(feature = "experimental-io-uring")] +async fn chunked_read_file_callback( + file: File, + offset: u64, + max_bytes: usize, + mut bytes_mut: BytesMut, +) -> io::Result<(File, Bytes, BytesMut)> { + bytes_mut.reserve(max_bytes); + + let (res, mut bytes_mut) = file.read_at(bytes_mut, offset).await; + let n_bytes = res?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + let bytes = bytes_mut.split_to(n_bytes).freeze(); + + Ok((file, bytes, bytes_mut)) +} + +#[cfg(feature = "experimental-io-uring")] +impl Stream for ChunkedReadFile +where + F: Fn(File, u64, usize, BytesMut) -> Fut, + Fut: Future>, +{ type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().get_mut(); - match this.state { - ChunkedReadFileState::File(ref mut file) => { - let size = this.size; - let offset = this.offset; - let counter = this.counter; + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + ChunkedReadFileStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; if size == counter { Poll::Ready(None) } else { - let mut file = file + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let (file, bytes_mut) = file .take() .expect("ChunkedReadFile polled after completion"); - let fut = spawn_blocking(move || { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + let fut = (this.callback)(file, offset, max_bytes, bytes_mut); - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; + this.state + .project_replace(ChunkedReadFileState::Future { fut }); - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok((file, Bytes::from(buf))) - }); - this.state = ChunkedReadFileState::Future(fut); self.poll_next(cx) } } - ChunkedReadFileState::Future(ref mut fut) => { - let (file, bytes) = - ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; - this.state = ChunkedReadFileState::File(Some(file)); + ChunkedReadFileStateProj::Future { fut } => { + let (file, bytes, bytes_mut) = ready!(fut.poll(cx))?; - this.offset += bytes.len() as u64; - this.counter += bytes.len() as u64; + this.state.project_replace(ChunkedReadFileState::File { + file: Some((file, bytes_mut)), + }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; Poll::Ready(Some(Ok(bytes))) } } } } + +#[cfg(not(feature = "experimental-io-uring"))] +impl Stream for ChunkedReadFile +where + F: Fn(File, u64, usize) -> Fut, + Fut: Future>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + ChunkedReadFileStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; + + if size == counter { + Poll::Ready(None) + } else { + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let file = file + .take() + .expect("ChunkedReadFile polled after completion"); + + let fut = (this.callback)(file, offset, max_bytes); + + this.state + .project_replace(ChunkedReadFileState::Future { fut }); + + self.poll_next(cx) + } + } + ChunkedReadFileStateProj::Future { fut } => { + let (file, bytes) = ready!(fut.poll(cx))?; + + this.state + .project_replace(ChunkedReadFileState::File { file: Some(file) }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; + + Poll::Ready(Some(Ok(bytes))) + } + } + } +} + +#[cfg(feature = "experimental-io-uring")] +use bytes_mut::BytesMut; + +// TODO: remove new type and use bytes::BytesMut directly +#[doc(hidden)] +#[cfg(feature = "experimental-io-uring")] +mod bytes_mut { + use std::ops::{Deref, DerefMut}; + + use tokio_uring::buf::{IoBuf, IoBufMut}; + + #[derive(Debug)] + pub struct BytesMut(bytes::BytesMut); + + impl BytesMut { + pub(super) fn new() -> Self { + Self(bytes::BytesMut::new()) + } + } + + impl Deref for BytesMut { + type Target = bytes::BytesMut; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl DerefMut for BytesMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } + + unsafe impl IoBuf for BytesMut { + fn stable_ptr(&self) -> *const u8 { + self.0.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.0.len() + } + + fn bytes_total(&self) -> usize { + self.0.capacity() + } + } + + unsafe impl IoBufMut for BytesMut { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.0.as_mut_ptr() + } + + unsafe fn set_init(&mut self, init_len: usize) { + if self.len() < init_len { + self.0.set_len(init_len); + } + } + } +} diff --git a/actix-files/src/files.rs b/actix-files/src/files.rs index 68879822a..06909bf08 100644 --- a/actix-files/src/files.rs +++ b/actix-files/src/files.rs @@ -6,7 +6,6 @@ use std::{ }; use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt}; -use actix_utils::future::ok; use actix_web::{ dev::{ AppService, HttpServiceFactory, RequestHead, ResourceDef, ServiceRequest, @@ -20,8 +19,9 @@ use actix_web::{ use futures_core::future::LocalBoxFuture; use crate::{ - directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService, - MimeOverride, PathFilter, + directory_listing, named, + service::{FilesService, FilesServiceInner}, + Directory, DirectoryRenderer, HttpNewService, MimeOverride, PathFilter, }; /// Static files handling service. @@ -283,11 +283,17 @@ impl Files { /// Setting a fallback static file handler: /// ``` /// use actix_files::{Files, NamedFile}; + /// use actix_web::dev::{ServiceRequest, ServiceResponse, fn_service}; /// /// # fn run() -> Result<(), actix_web::Error> { /// let files = Files::new("/", "./static") /// .index_file("index.html") - /// .default_handler(NamedFile::open("./static/404.html")?); + /// .default_handler(fn_service(|req: ServiceRequest| async { + /// let (req, _) = req.into_parts(); + /// let file = NamedFile::open_async("./static/404.html").await?; + /// let res = file.into_response(&req); + /// Ok(ServiceResponse::new(req, res)) + /// })); /// # Ok(()) /// # } /// ``` @@ -353,7 +359,7 @@ impl ServiceFactory for Files { type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - let mut srv = FilesService { + let mut inner = FilesServiceInner { directory: self.directory.clone(), index: self.index.clone(), show_index: self.show_index, @@ -372,14 +378,14 @@ impl ServiceFactory for Files { Box::pin(async { match fut.await { Ok(default) => { - srv.default = Some(default); - Ok(srv) + inner.default = Some(default); + Ok(FilesService(Rc::new(inner))) } Err(_) => Err(()), } }) } else { - Box::pin(ok(srv)) + Box::pin(async move { Ok(FilesService(Rc::new(inner))) }) } } } diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 175c6eaee..3af5282f1 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -33,12 +33,12 @@ mod path_buf; mod range; mod service; -pub use crate::chunked::ChunkedReadFile; -pub use crate::directory::Directory; -pub use crate::files::Files; -pub use crate::named::NamedFile; -pub use crate::range::HttpRange; -pub use crate::service::FilesService; +pub use self::chunked::ChunkedReadFile; +pub use self::directory::Directory; +pub use self::files::Files; +pub use self::named::NamedFile; +pub use self::range::HttpRange; +pub use self::service::FilesService; use self::directory::{directory_listing, DirectoryRenderer}; use self::error::FilesError; @@ -62,13 +62,12 @@ type PathFilter = dyn Fn(&Path, &RequestHead) -> bool; #[cfg(test)] mod tests { use std::{ - fs::{self, File}, + fs::{self}, ops::Add, time::{Duration, SystemTime}, }; use actix_service::ServiceFactory; - use actix_utils::future::ok; use actix_web::{ guard, http::{ @@ -82,6 +81,7 @@ mod tests { }; use super::*; + use crate::named::File; #[actix_web::test] async fn test_file_extension_to_mime() { @@ -100,7 +100,7 @@ mod tests { #[actix_rt::test] async fn test_if_modified_since_without_if_none_match() { - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let since = header::HttpDate::from(SystemTime::now().add(Duration::from_secs(60))); let req = TestRequest::default() @@ -112,7 +112,7 @@ mod tests { #[actix_rt::test] async fn test_if_modified_since_without_if_none_match_same() { - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let since = file.last_modified().unwrap(); let req = TestRequest::default() @@ -124,7 +124,7 @@ mod tests { #[actix_rt::test] async fn test_if_modified_since_with_if_none_match() { - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let since = header::HttpDate::from(SystemTime::now().add(Duration::from_secs(60))); let req = TestRequest::default() @@ -137,7 +137,7 @@ mod tests { #[actix_rt::test] async fn test_if_unmodified_since() { - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let since = file.last_modified().unwrap(); let req = TestRequest::default() @@ -149,7 +149,7 @@ mod tests { #[actix_rt::test] async fn test_if_unmodified_since_failed() { - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let since = header::HttpDate::from(SystemTime::UNIX_EPOCH); let req = TestRequest::default() @@ -161,8 +161,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_text() { - assert!(NamedFile::open("test--").is_err()); - let mut file = NamedFile::open("Cargo.toml").unwrap(); + assert!(NamedFile::open_async("test--").await.is_err()); + let mut file = NamedFile::open_async("Cargo.toml").await.unwrap(); { file.file(); let _f: &File = &file; @@ -185,8 +185,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_content_disposition() { - assert!(NamedFile::open("test--").is_err()); - let mut file = NamedFile::open("Cargo.toml").unwrap(); + assert!(NamedFile::open_async("test--").await.is_err()); + let mut file = NamedFile::open_async("Cargo.toml").await.unwrap(); { file.file(); let _f: &File = &file; @@ -202,7 +202,8 @@ mod tests { "inline; filename=\"Cargo.toml\"" ); - let file = NamedFile::open("Cargo.toml") + let file = NamedFile::open_async("Cargo.toml") + .await .unwrap() .disable_content_disposition(); let req = TestRequest::default().to_http_request(); @@ -212,8 +213,19 @@ mod tests { #[actix_rt::test] async fn test_named_file_non_ascii_file_name() { - let mut file = - NamedFile::from_file(File::open("Cargo.toml").unwrap(), "貨物.toml").unwrap(); + let file = { + #[cfg(feature = "experimental-io-uring")] + { + crate::named::File::open("Cargo.toml").await.unwrap() + } + + #[cfg(not(feature = "experimental-io-uring"))] + { + crate::named::File::open("Cargo.toml").unwrap() + } + }; + + let mut file = NamedFile::from_file(file, "貨物.toml").unwrap(); { file.file(); let _f: &File = &file; @@ -236,7 +248,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_set_content_type() { - let mut file = NamedFile::open("Cargo.toml") + let mut file = NamedFile::open_async("Cargo.toml") + .await .unwrap() .set_content_type(mime::TEXT_XML); { @@ -261,7 +274,7 @@ mod tests { #[actix_rt::test] async fn test_named_file_image() { - let mut file = NamedFile::open("tests/test.png").unwrap(); + let mut file = NamedFile::open_async("tests/test.png").await.unwrap(); { file.file(); let _f: &File = &file; @@ -284,7 +297,7 @@ mod tests { #[actix_rt::test] async fn test_named_file_javascript() { - let file = NamedFile::open("tests/test.js").unwrap(); + let file = NamedFile::open_async("tests/test.js").await.unwrap(); let req = TestRequest::default().to_http_request(); let resp = file.respond_to(&req).await.unwrap(); @@ -304,7 +317,8 @@ mod tests { disposition: DispositionType::Attachment, parameters: vec![DispositionParam::Filename(String::from("test.png"))], }; - let mut file = NamedFile::open("tests/test.png") + let mut file = NamedFile::open_async("tests/test.png") + .await .unwrap() .set_content_disposition(cd); { @@ -329,7 +343,7 @@ mod tests { #[actix_rt::test] async fn test_named_file_binary() { - let mut file = NamedFile::open("tests/test.binary").unwrap(); + let mut file = NamedFile::open_async("tests/test.binary").await.unwrap(); { file.file(); let _f: &File = &file; @@ -352,7 +366,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_status_code_text() { - let mut file = NamedFile::open("Cargo.toml") + let mut file = NamedFile::open_async("Cargo.toml") + .await .unwrap() .set_status_code(StatusCode::NOT_FOUND); { @@ -568,7 +583,8 @@ mod tests { async fn test_named_file_content_encoding() { let srv = test::init_service(App::new().wrap(Compress::default()).service( web::resource("/").to(|| async { - NamedFile::open("Cargo.toml") + NamedFile::open_async("Cargo.toml") + .await .unwrap() .set_content_encoding(header::ContentEncoding::Identity) }), @@ -588,7 +604,8 @@ mod tests { async fn test_named_file_content_encoding_gzip() { let srv = test::init_service(App::new().wrap(Compress::default()).service( web::resource("/").to(|| async { - NamedFile::open("Cargo.toml") + NamedFile::open_async("Cargo.toml") + .await .unwrap() .set_content_encoding(header::ContentEncoding::Gzip) }), @@ -614,7 +631,7 @@ mod tests { #[actix_rt::test] async fn test_named_file_allowed_method() { let req = TestRequest::default().method(Method::GET).to_http_request(); - let file = NamedFile::open("Cargo.toml").unwrap(); + let file = NamedFile::open_async("Cargo.toml").await.unwrap(); let resp = file.respond_to(&req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); } @@ -705,8 +722,8 @@ mod tests { #[actix_rt::test] async fn test_default_handler_file_missing() { let st = Files::new("/", ".") - .default_handler(|req: ServiceRequest| { - ok(req.into_response(HttpResponse::Ok().body("default content"))) + .default_handler(|req: ServiceRequest| async { + Ok(req.into_response(HttpResponse::Ok().body("default content"))) }) .new_service(()) .await @@ -789,9 +806,8 @@ mod tests { #[actix_rt::test] async fn test_serve_named_file() { - let srv = - test::init_service(App::new().service(NamedFile::open("Cargo.toml").unwrap())) - .await; + let factory = NamedFile::open_async("Cargo.toml").await.unwrap(); + let srv = test::init_service(App::new().service(factory)).await; let req = TestRequest::get().uri("/Cargo.toml").to_request(); let res = test::call_service(&srv, req).await; @@ -808,11 +824,9 @@ mod tests { #[actix_rt::test] async fn test_serve_named_file_prefix() { - let srv = test::init_service( - App::new() - .service(web::scope("/test").service(NamedFile::open("Cargo.toml").unwrap())), - ) - .await; + let factory = NamedFile::open_async("Cargo.toml").await.unwrap(); + let srv = + test::init_service(App::new().service(web::scope("/test").service(factory))).await; let req = TestRequest::get().uri("/test/Cargo.toml").to_request(); let res = test::call_service(&srv, req).await; @@ -829,10 +843,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_default_service() { - let srv = test::init_service( - App::new().default_service(NamedFile::open("Cargo.toml").unwrap()), - ) - .await; + let factory = NamedFile::open_async("Cargo.toml").await.unwrap(); + let srv = test::init_service(App::new().default_service(factory)).await; for route in ["/foobar", "/baz", "/"].iter() { let req = TestRequest::get().uri(route).to_request(); @@ -847,8 +859,9 @@ mod tests { #[actix_rt::test] async fn test_default_handler_named_file() { + let factory = NamedFile::open_async("Cargo.toml").await.unwrap(); let st = Files::new("/", ".") - .default_handler(NamedFile::open("Cargo.toml").unwrap()) + .default_handler(factory) .new_service(()) .await .unwrap(); @@ -926,8 +939,8 @@ mod tests { #[actix_rt::test] async fn test_default_handler_filter() { let st = Files::new("/", ".") - .default_handler(|req: ServiceRequest| { - ok(req.into_response(HttpResponse::Ok().body("default content"))) + .default_handler(|req: ServiceRequest| async { + Ok(req.into_response(HttpResponse::Ok().body("default content"))) }) .path_filter(|path, _| path.extension() == Some("png".as_ref())) .new_service(()) diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index dac548708..547048bbd 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -1,5 +1,6 @@ use std::{ - fs::{File, Metadata}, + fmt, + fs::Metadata, io, ops::{Deref, DerefMut}, path::{Path, PathBuf}, @@ -11,7 +12,6 @@ use std::os::unix::fs::MetadataExt; use actix_http::body::AnyBody; use actix_service::{Service, ServiceFactory}; -use actix_utils::future::{ok, ready, Ready}; use actix_web::{ dev::{ AppService, BodyEncoding, HttpServiceFactory, ResourceDef, ServiceRequest, @@ -26,9 +26,9 @@ use actix_web::{ Error, HttpMessage, HttpRequest, HttpResponse, Responder, }; use bitflags::bitflags; +use futures_core::future::LocalBoxFuture; use mime_guess::from_path; -use crate::ChunkedReadFile; use crate::{encoding::equiv_utf8_text, range::HttpRange}; bitflags! { @@ -53,9 +53,9 @@ impl Default for Flags { /// use actix_web::App; /// use actix_files::NamedFile; /// -/// # fn run() -> Result<(), Box> { -/// let app = App::new() -/// .service(NamedFile::open("./static/index.html")?); +/// # async fn run() -> Result<(), Box> { +/// let file = NamedFile::open_async("./static/index.html").await?; +/// let app = App::new().service(file); /// # Ok(()) /// # } /// ``` @@ -67,10 +67,9 @@ impl Default for Flags { /// /// #[get("/")] /// async fn index() -> impl Responder { -/// NamedFile::open("./static/index.html") +/// NamedFile::open_async("./static/index.html").await /// } /// ``` -#[derive(Debug)] pub struct NamedFile { path: PathBuf, file: File, @@ -83,6 +82,37 @@ pub struct NamedFile { pub(crate) encoding: Option, } +impl fmt::Debug for NamedFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NamedFile") + .field("path", &self.path) + .field( + "file", + #[cfg(feature = "experimental-io-uring")] + { + &"tokio_uring::File" + }, + #[cfg(not(feature = "experimental-io-uring"))] + { + &self.file + }, + ) + .field("modified", &self.modified) + .field("md", &self.md) + .field("flags", &self.flags) + .field("status_code", &self.status_code) + .field("content_type", &self.content_type) + .field("content_disposition", &self.content_disposition) + .field("encoding", &self.encoding) + .finish() + } +} + +#[cfg(not(feature = "experimental-io-uring"))] +pub(crate) use std::fs::File; +#[cfg(feature = "experimental-io-uring")] +pub(crate) use tokio_uring::fs::File; + impl NamedFile { /// Creates an instance from a previously opened file. /// @@ -90,8 +120,7 @@ impl NamedFile { /// `ContentDisposition` headers. /// /// # Examples - /// - /// ``` + /// ```ignore /// use actix_files::NamedFile; /// use std::io::{self, Write}; /// use std::env; @@ -152,7 +181,30 @@ impl NamedFile { (ct, cd) }; - let md = file.metadata()?; + let md = { + #[cfg(not(feature = "experimental-io-uring"))] + { + file.metadata()? + } + + #[cfg(feature = "experimental-io-uring")] + { + use std::os::unix::prelude::{AsRawFd, FromRawFd}; + + let fd = file.as_raw_fd(); + + // SAFETY: fd is borrowed and lives longer than the unsafe block + unsafe { + let file = std::fs::File::from_raw_fd(fd); + let md = file.metadata(); + // SAFETY: forget the fd before exiting block in success or error case but don't + // run destructor (that would close file handle) + std::mem::forget(file); + md? + } + } + }; + let modified = md.modified().ok(); let encoding = None; @@ -169,17 +221,45 @@ impl NamedFile { }) } + #[cfg(not(feature = "experimental-io-uring"))] /// Attempts to open a file in read-only mode. /// /// # Examples - /// /// ``` /// use actix_files::NamedFile; - /// /// let file = NamedFile::open("foo.txt"); /// ``` pub fn open>(path: P) -> io::Result { - Self::from_file(File::open(&path)?, path) + let file = File::open(&path)?; + Self::from_file(file, path) + } + + /// Attempts to open a file asynchronously in read-only mode. + /// + /// When the `experimental-io-uring` crate feature is enabled, this will be async. + /// Otherwise, it will be just like [`open`][Self::open]. + /// + /// # Examples + /// ``` + /// use actix_files::NamedFile; + /// # async fn open() { + /// let file = NamedFile::open_async("foo.txt").await.unwrap(); + /// # } + /// ``` + pub async fn open_async>(path: P) -> io::Result { + let file = { + #[cfg(not(feature = "experimental-io-uring"))] + { + File::open(&path)? + } + + #[cfg(feature = "experimental-io-uring")] + { + File::open(&path).await? + } + }; + + Self::from_file(file, path) } /// Returns reference to the underlying `File` object. @@ -191,13 +271,12 @@ impl NamedFile { /// Retrieve the path of this file. /// /// # Examples - /// /// ``` /// # use std::io; /// use actix_files::NamedFile; /// - /// # fn path() -> io::Result<()> { - /// let file = NamedFile::open("test.txt")?; + /// # async fn path() -> io::Result<()> { + /// let file = NamedFile::open_async("test.txt").await?; /// assert_eq!(file.path().as_os_str(), "foo.txt"); /// # Ok(()) /// # } @@ -337,7 +416,7 @@ impl NamedFile { res.encoding(current_encoding); } - let reader = ChunkedReadFile::new(self.md.len(), 0, self.file); + let reader = super::chunked::new_chunked_read(self.md.len(), 0, self.file); return res.streaming(reader); } @@ -451,7 +530,7 @@ impl NamedFile { return resp.status(StatusCode::NOT_MODIFIED).body(AnyBody::None); } - let reader = ChunkedReadFile::new(length, offset, self.file); + let reader = super::chunked::new_chunked_read(length, offset, self.file); if offset != 0 || length != self.md.len() { resp.status(StatusCode::PARTIAL_CONTENT); @@ -461,20 +540,6 @@ impl NamedFile { } } -impl Deref for NamedFile { - type Target = File; - - fn deref(&self) -> &File { - &self.file - } -} - -impl DerefMut for NamedFile { - fn deref_mut(&mut self) -> &mut File { - &mut self.file - } -} - /// Returns true if `req` has no `If-Match` header or one which matches `etag`. fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { match req.get_header::() { @@ -515,6 +580,20 @@ fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { } } +impl Deref for NamedFile { + type Target = File; + + fn deref(&self) -> &Self::Target { + &self.file + } +} + +impl DerefMut for NamedFile { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.file + } +} + impl Responder for NamedFile { fn respond_to(self, req: &HttpRequest) -> HttpResponse { self.into_response(req) @@ -525,14 +604,16 @@ impl ServiceFactory for NamedFile { type Response = ServiceResponse; type Error = Error; type Config = (); - type InitError = (); type Service = NamedFileService; - type Future = Ready>; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ok(NamedFileService { + let service = NamedFileService { path: self.path.clone(), - }) + }; + + Box::pin(async move { Ok(service) }) } } @@ -545,18 +626,19 @@ pub struct NamedFileService { impl Service for NamedFileService { type Response = ServiceResponse; type Error = Error; - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); fn call(&self, req: ServiceRequest) -> Self::Future { let (req, _) = req.into_parts(); - ready( - NamedFile::open(&self.path) - .map_err(|e| e.into()) - .map(|f| f.into_response(&req)) - .map(|res| ServiceResponse::new(req, res)), - ) + + let path = self.path.clone(); + Box::pin(async move { + let file = NamedFile::open_async(path).await?; + let res = file.into_response(&req); + Ok(ServiceResponse::new(req, res)) + }) } } diff --git a/actix-files/src/path_buf.rs b/actix-files/src/path_buf.rs index 0e0d4f51d..8c8bca6ce 100644 --- a/actix-files/src/path_buf.rs +++ b/actix-files/src/path_buf.rs @@ -1,9 +1,9 @@ use std::{ + future::{ready, Ready}, path::{Path, PathBuf}, str::FromStr, }; -use actix_utils::future::{ready, Ready}; use actix_web::{dev::Payload, FromRequest, HttpRequest}; use crate::error::UriSegmentError; diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index 09122c63e..f6e1c2e11 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -1,7 +1,6 @@ -use std::{fmt, io, path::PathBuf, rc::Rc}; +use std::{fmt, io, ops::Deref, path::PathBuf, rc::Rc}; use actix_service::Service; -use actix_utils::future::ok; use actix_web::{ dev::{ServiceRequest, ServiceResponse}, error::Error, @@ -17,7 +16,18 @@ use crate::{ }; /// Assembled file serving service. -pub struct FilesService { +#[derive(Clone)] +pub struct FilesService(pub(crate) Rc); + +impl Deref for FilesService { + type Target = FilesServiceInner; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +pub struct FilesServiceInner { pub(crate) directory: PathBuf, pub(crate) index: Option, pub(crate) show_index: bool, @@ -31,20 +41,50 @@ pub struct FilesService { pub(crate) hidden_files: bool, } +impl fmt::Debug for FilesServiceInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("FilesServiceInner") + } +} + impl FilesService { - fn handle_err( + async fn handle_err( &self, err: io::Error, req: ServiceRequest, - ) -> LocalBoxFuture<'static, Result> { + ) -> Result { log::debug!("error handling {}: {}", req.path(), err); if let Some(ref default) = self.default { - Box::pin(default.call(req)) + default.call(req).await } else { - Box::pin(ok(req.error_response(err))) + Ok(req.error_response(err)) } } + + fn serve_named_file( + &self, + req: ServiceRequest, + mut named_file: NamedFile, + ) -> ServiceResponse { + if let Some(ref mime_override) = self.mime_override { + let new_disposition = mime_override(&named_file.content_type.type_()); + named_file.content_disposition.disposition = new_disposition; + } + named_file.flags = self.file_flags; + + let (req, _) = req.into_parts(); + let res = named_file.into_response(&req); + ServiceResponse::new(req, res) + } + + fn show_index(&self, req: ServiceRequest, path: PathBuf) -> ServiceResponse { + let dir = Directory::new(self.directory.clone(), path); + + let (req, _) = req.into_parts(); + + (self.renderer)(&dir, &req).unwrap_or_else(|e| ServiceResponse::from_err(e, req)) + } } impl fmt::Debug for FilesService { @@ -56,7 +96,7 @@ impl fmt::Debug for FilesService { impl Service for FilesService { type Response = ServiceResponse; type Error = Error; - type Future = LocalBoxFuture<'static, Result>; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); @@ -69,103 +109,87 @@ impl Service for FilesService { matches!(*req.method(), Method::HEAD | Method::GET) }; - if !is_method_valid { - return Box::pin(ok(req.into_response( - actix_web::HttpResponse::MethodNotAllowed() - .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) - .body("Request did not meet this resource's requirements."), - ))); - } + let this = self.clone(); - let real_path = - match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) { - Ok(item) => item, - Err(e) => return Box::pin(ok(req.error_response(e))), - }; + Box::pin(async move { + if !is_method_valid { + return Ok(req.into_response( + actix_web::HttpResponse::MethodNotAllowed() + .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) + .body("Request did not meet this resource's requirements."), + )); + } - if let Some(filter) = &self.path_filter { - if !filter(real_path.as_ref(), req.head()) { - if let Some(ref default) = self.default { - return Box::pin(default.call(req)); - } else { - return Box::pin(ok( - req.into_response(actix_web::HttpResponse::NotFound().finish()) + let real_path = + match PathBufWrap::parse_path(req.match_info().path(), this.hidden_files) { + Ok(item) => item, + Err(e) => return Ok(req.error_response(e)), + }; + + if let Some(filter) = &this.path_filter { + if !filter(real_path.as_ref(), req.head()) { + if let Some(ref default) = this.default { + return default.call(req).await; + } else { + return Ok( + req.into_response(actix_web::HttpResponse::NotFound().finish()) + ); + } + } + } + + // full file path + let path = this.directory.join(&real_path); + if let Err(err) = path.canonicalize() { + return this.handle_err(err, req).await; + } + + if path.is_dir() { + if this.redirect_to_slash + && !req.path().ends_with('/') + && (this.index.is_some() || this.show_index) + { + let redirect_to = format!("{}/", req.path()); + + return Ok(req.into_response( + HttpResponse::Found() + .insert_header((header::LOCATION, redirect_to)) + .finish(), )); } - } - } - // full file path - let path = self.directory.join(&real_path); - if let Err(err) = path.canonicalize() { - return Box::pin(self.handle_err(err, req)); - } - - if path.is_dir() { - if self.redirect_to_slash - && !req.path().ends_with('/') - && (self.index.is_some() || self.show_index) - { - let redirect_to = format!("{}/", req.path()); - - return Box::pin(ok(req.into_response( - HttpResponse::Found() - .insert_header((header::LOCATION, redirect_to)) - .finish(), - ))); - } - - let serve_named_file = |req: ServiceRequest, mut named_file: NamedFile| { - if let Some(ref mime_override) = self.mime_override { - let new_disposition = mime_override(&named_file.content_type.type_()); - named_file.content_disposition.disposition = new_disposition; - } - named_file.flags = self.file_flags; - - let (req, _) = req.into_parts(); - let res = named_file.into_response(&req); - Box::pin(ok(ServiceResponse::new(req, res))) - }; - - let show_index = |req: ServiceRequest| { - let dir = Directory::new(self.directory.clone(), path.clone()); - - let (req, _) = req.into_parts(); - let x = (self.renderer)(&dir, &req); - - Box::pin(match x { - Ok(resp) => ok(resp), - Err(err) => ok(ServiceResponse::from_err(err, req)), - }) - }; - - match self.index { - Some(ref index) => match NamedFile::open(path.join(index)) { - Ok(named_file) => serve_named_file(req, named_file), - Err(_) if self.show_index => show_index(req), - Err(err) => self.handle_err(err, req), - }, - None if self.show_index => show_index(req), - _ => Box::pin(ok(ServiceResponse::from_err( - FilesError::IsDirectory, - req.into_parts().0, - ))), - } - } else { - match NamedFile::open(path) { - Ok(mut named_file) => { - if let Some(ref mime_override) = self.mime_override { - let new_disposition = mime_override(&named_file.content_type.type_()); - named_file.content_disposition.disposition = new_disposition; + match this.index { + Some(ref index) => { + let named_path = path.join(index); + match NamedFile::open_async(named_path).await { + Ok(named_file) => Ok(this.serve_named_file(req, named_file)), + Err(_) if this.show_index => Ok(this.show_index(req, path)), + Err(err) => this.handle_err(err, req).await, + } } - named_file.flags = self.file_flags; - - let (req, _) = req.into_parts(); - let res = named_file.into_response(&req); - Box::pin(ok(ServiceResponse::new(req, res))) + None if this.show_index => Ok(this.show_index(req, path)), + _ => Ok(ServiceResponse::from_err( + FilesError::IsDirectory, + req.into_parts().0, + )), + } + } else { + match NamedFile::open_async(&path).await { + Ok(mut named_file) => { + if let Some(ref mime_override) = this.mime_override { + let new_disposition = + mime_override(&named_file.content_type.type_()); + named_file.content_disposition.disposition = new_disposition; + } + named_file.flags = this.file_flags; + + let (req, _) = req.into_parts(); + let res = named_file.into_response(&req); + Ok(ServiceResponse::new(req, res)) + } + Err(err) => this.handle_err(err, req).await, } - Err(err) => self.handle_err(err, req), } - } + }) } } diff --git a/actix-http-test/CHANGES.md b/actix-http-test/CHANGES.md index ea00acb0c..3356f5334 100644 --- a/actix-http-test/CHANGES.md +++ b/actix-http-test/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Fix compatibility with experimental `io-uring` feature of `actix-rt`. [#2408] + +[#2408]: https://github.com/actix/actix-web/pull/2408 ## 3.0.0-beta.6 - 2021-11-15 diff --git a/actix-http-test/src/lib.rs b/actix-http-test/src/lib.rs index 699bb2660..a4bc6b2bb 100644 --- a/actix-http-test/src/lib.rs +++ b/actix-http-test/src/lib.rs @@ -66,25 +66,24 @@ pub async fn test_server_with_addr>( // run server in separate thread thread::spawn(move || { - let sys = System::new(); - let local_addr = tcp.local_addr().unwrap(); + System::new().block_on(async move { + let local_addr = tcp.local_addr().unwrap(); - let srv = Server::build() - .workers(1) - .disable_signals() - .listen("test", tcp, factory) - .expect("test server could not be created"); + let srv = Server::build() + .workers(1) + .disable_signals() + .system_exit() + .listen("test", tcp, factory) + .expect("test server could not be created"); - let srv = srv.run(); - started_tx - .send((System::current(), srv.handle(), local_addr)) - .unwrap(); + let srv = srv.run(); + started_tx + .send((System::current(), srv.handle(), local_addr)) + .unwrap(); - // drive server loop - sys.block_on(srv).unwrap(); - - // start system event loop - sys.run().unwrap(); + // drive server loop + srv.await.unwrap(); + }); // notify TestServer that server and system have shut down // all thread managed resources should be dropped at this point diff --git a/actix-test/CHANGES.md b/actix-test/CHANGES.md index 5c22139ae..78fd4e4ca 100644 --- a/actix-test/CHANGES.md +++ b/actix-test/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Fix compatibility with experimental `io-uring` feature of `actix-rt`. [#2408] + +[#2408]: https://github.com/actix/actix-web/pull/2408 ## 0.1.0-beta.6 - 2021-11-15 diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index 6c776a871..b80918ec0 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -146,156 +146,183 @@ where // run server in separate orphaned thread thread::spawn(move || { - let sys = rt::System::new(); - let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); - let local_addr = tcp.local_addr().unwrap(); - let factory = factory.clone(); - let srv_cfg = cfg.clone(); - let timeout = cfg.client_timeout; - let builder = Server::build().workers(1).disable_signals().system_exit(); + rt::System::new().block_on(async move { + let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = tcp.local_addr().unwrap(); + let factory = factory.clone(); + let srv_cfg = cfg.clone(); + let timeout = cfg.client_timeout; - let srv = match srv_cfg.stream { - StreamType::Tcp => match srv_cfg.tp { - HttpVer::Http1 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let builder = Server::build().workers(1).disable_signals().system_exit(); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let srv = match srv_cfg.stream { + StreamType::Tcp => match srv_cfg.tp { + HttpVer::Http1 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h1(map_config(fac, move |_| app_cfg.clone())) - .tcp() - }), - HttpVer::Http2 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h1(map_config(fac, move |_| app_cfg.clone())) + .tcp() + }), + HttpVer::Http2 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h2(map_config(fac, move |_| app_cfg.clone())) - .tcp() - }), - HttpVer::Both => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h2(map_config(fac, move |_| app_cfg.clone())) + .tcp() + }), + HttpVer::Both => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .finish(map_config(fac, move |_| app_cfg.clone())) - .tcp() - }), - }, - #[cfg(feature = "openssl")] - StreamType::Openssl(acceptor) => match cfg.tp { - HttpVer::Http1 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .finish(map_config(fac, move |_| app_cfg.clone())) + .tcp() + }), + }, + #[cfg(feature = "openssl")] + StreamType::Openssl(acceptor) => match cfg.tp { + HttpVer::Http1 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h1(map_config(fac, move |_| app_cfg.clone())) - .openssl(acceptor.clone()) - }), - HttpVer::Http2 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h1(map_config(fac, move |_| app_cfg.clone())) + .openssl(acceptor.clone()) + }), + HttpVer::Http2 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h2(map_config(fac, move |_| app_cfg.clone())) - .openssl(acceptor.clone()) - }), - HttpVer::Both => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h2(map_config(fac, move |_| app_cfg.clone())) + .openssl(acceptor.clone()) + }), + HttpVer::Both => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .finish(map_config(fac, move |_| app_cfg.clone())) - .openssl(acceptor.clone()) - }), - }, - #[cfg(feature = "rustls")] - StreamType::Rustls(config) => match cfg.tp { - HttpVer::Http1 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .finish(map_config(fac, move |_| app_cfg.clone())) + .openssl(acceptor.clone()) + }), + }, + #[cfg(feature = "rustls")] + StreamType::Rustls(config) => match cfg.tp { + HttpVer::Http1 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h1(map_config(fac, move |_| app_cfg.clone())) - .rustls(config.clone()) - }), - HttpVer::Http2 => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h1(map_config(fac, move |_| app_cfg.clone())) + .rustls(config.clone()) + }), + HttpVer::Http2 => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .h2(map_config(fac, move |_| app_cfg.clone())) - .rustls(config.clone()) - }), - HttpVer::Both => builder.listen("test", tcp, move || { - let app_cfg = - AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + HttpService::build() + .client_timeout(timeout) + .h2(map_config(fac, move |_| app_cfg.clone())) + .rustls(config.clone()) + }), + HttpVer::Both => builder.listen("test", tcp, move || { + let app_cfg = AppConfig::__priv_test_new( + false, + local_addr.to_string(), + local_addr, + ); - HttpService::build() - .client_timeout(timeout) - .finish(map_config(fac, move |_| app_cfg.clone())) - .rustls(config.clone()) - }), - }, - } - .expect("test server could not be created"); + let fac = factory() + .into_factory() + .map_err(|err| err.into().error_response()); - let srv = srv.run(); - started_tx - .send((System::current(), srv.handle(), local_addr)) - .unwrap(); + HttpService::build() + .client_timeout(timeout) + .finish(map_config(fac, move |_| app_cfg.clone())) + .rustls(config.clone()) + }), + }, + } + .expect("test server could not be created"); - // drive server loop - sys.block_on(srv).unwrap(); + let srv = srv.run(); + started_tx + .send((System::current(), srv.handle(), local_addr)) + .unwrap(); - // start system event loop - sys.run().unwrap(); + // drive server loop + srv.await.unwrap(); + + // notify TestServer that server and system have shut down + // all thread managed resources should be dropped at this point + }); - // notify TestServer that server and system have shut down - // all thread managed resources should be dropped at this point let _ = thread_stop_tx.send(()); }); diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 3e85cb846..d8878a82a 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -82,7 +82,8 @@ pub struct CompressMiddleware { } static SUPPORTED_ALGORITHM_NAMES: Lazy = Lazy::new(|| { - let mut encoding = vec![]; + #[allow(unused_mut)] // only unused when no compress features enabled + let mut encoding: Vec<&str> = vec![]; #[cfg(feature = "compress-brotli")] {