From 42d2a29b1d49eae5115091d17eb327217f01c75b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 7 Mar 2018 17:40:13 -0800 Subject: [PATCH] non-blocking processing for NamedFile --- CHANGES.md | 2 + src/fs.rs | 108 ++++++++++++++++++++++++++++++++++++-------- src/httpmessage.rs | 8 +++- src/httprequest.rs | 2 +- src/httpresponse.rs | 10 ++++ 5 files changed, 108 insertions(+), 22 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4ab0c5239..542d02516 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,8 @@ * Fix client cookie handling #111 +* Non-blocking processing of a `NamedFile` + * Enable compression support for `NamedFile` * Better support for `NamedFile` type diff --git a/src/fs.rs b/src/fs.rs index 1a41d81a1..ac09f2320 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,8 +1,8 @@ //! Static files support. // //! TODO: needs to re-implement actual files handling, current impl blocks -use std::io; -use std::io::Read; +use std::{io, cmp}; +use std::io::{Read, Seek}; use std::fmt::Write; use std::fs::{File, DirEntry, Metadata}; use std::path::{Path, PathBuf}; @@ -12,10 +12,14 @@ use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(unix)] use std::os::unix::fs::MetadataExt; +use bytes::{Bytes, BytesMut, BufMut}; use http::{Method, StatusCode}; +use futures::{Async, Poll, Future, Stream}; +use futures_cpupool::{CpuPool, CpuFuture}; use mime_guess::get_mime_type; use header; +use error::Error; use param::FromParam; use handler::{Handler, Responder}; use httpmessage::HttpMessage; @@ -31,6 +35,7 @@ pub struct NamedFile { file: File, md: Metadata, modified: Option, + cpu_pool: Option, } impl NamedFile { @@ -48,7 +53,8 @@ impl NamedFile { let md = file.metadata()?; let path = path.as_ref().to_path_buf(); let modified = md.modified().ok(); - Ok(NamedFile{path, file, md, modified}) + let cpu_pool = None; + Ok(NamedFile{path, file, md, modified, cpu_pool}) } /// Returns reference to the underlying `File` object. @@ -76,6 +82,13 @@ impl NamedFile { self.path.as_path() } + /// Returns reference to the underlying `File` object. + #[inline] + pub fn set_cpu_pool(mut self, cpu_pool: CpuPool) -> Self { + self.cpu_pool = Some(cpu_pool); + self + } + fn etag(&self) -> Option { // This etag format is similar to Apache's. self.modified.as_ref().map(|mtime| { @@ -117,8 +130,8 @@ impl DerefMut for NamedFile { /// Returns true if `req` has no `If-Match` header or one which matches `etag`. fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { match req.get_header::() { - Err(_) | Ok(header::IfMatch::Any) => true, - Ok(header::IfMatch::Items(ref items)) => { + None | Some(header::IfMatch::Any) => true, + Some(header::IfMatch::Items(ref items)) => { if let Some(some_etag) = etag { for item in items { if item.strong_eq(some_etag) { @@ -134,8 +147,8 @@ fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { /// Returns true if `req` doesn't have an `If-None-Match` header matching `req`. fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { match req.get_header::() { - Ok(header::IfNoneMatch::Any) => false, - Ok(header::IfNoneMatch::Items(ref items)) => { + Some(header::IfNoneMatch::Any) => false, + Some(header::IfNoneMatch::Items(ref items)) => { if let Some(some_etag) = etag { for item in items { if item.weak_eq(some_etag) { @@ -145,7 +158,7 @@ fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { } true } - Err(_) => true, + None => true, } } @@ -154,7 +167,7 @@ impl Responder for NamedFile { type Item = HttpResponse; type Error = io::Error; - fn respond_to(mut self, req: HttpRequest) -> Result { + fn respond_to(self, req: HttpRequest) -> Result { if *req.method() != Method::GET && *req.method() != Method::HEAD { return Ok(HttpMethodNotAllowed.build() .header(header::http::CONTENT_TYPE, "text/plain") @@ -168,7 +181,7 @@ impl Responder for NamedFile { // check preconditions let precondition_failed = if !any_match(etag.as_ref(), &req) { true - } else if let (Some(ref m), Ok(header::IfUnmodifiedSince(ref since))) = + } else if let (Some(ref m), Some(header::IfUnmodifiedSince(ref since))) = (last_modified, req.get_header()) { m > since @@ -179,7 +192,7 @@ impl Responder for NamedFile { // check last modified let not_modified = if !none_match(etag.as_ref(), &req) { true - } else if let (Some(ref m), Ok(header::IfModifiedSince(ref since))) = + } else if let (Some(ref m), Some(header::IfModifiedSince(ref since))) = (last_modified, req.get_header()) { m <= since @@ -202,18 +215,71 @@ impl Responder for NamedFile { return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap()) } - resp.content_length(self.md.len()); - if *req.method() == Method::GET { - let mut data = Vec::new(); - let _ = self.file.read_to_end(&mut data); - Ok(resp.body(data).unwrap()) + let reader = ChunkedReadFile { + size: self.md.len(), + offset: 0, + cpu_pool: self.cpu_pool.unwrap_or_else(|| req.cpu_pool().clone()), + file: Some(self.file), + fut: None, + }; + Ok(resp.streaming(reader).unwrap()) } else { Ok(resp.finish().unwrap()) } } } +/// A helper created from a `std::fs::File` which reads the file +/// chunk-by-chunk on a `CpuPool`. +pub struct ChunkedReadFile { + size: u64, + offset: u64, + cpu_pool: CpuPool, + file: Option, + fut: Option>, +} + +impl Stream for ChunkedReadFile { + type Item = Bytes; + type Error= Error; + + fn poll(&mut self) -> Poll, Error> { + if self.fut.is_some() { + return match self.fut.as_mut().unwrap().poll()? { + Async::Ready((file, bytes)) => { + self.fut.take(); + self.file = Some(file); + self.offset += bytes.len() as u64; + Ok(Async::Ready(Some(bytes))) + }, + Async::NotReady => Ok(Async::NotReady), + }; + } + + let size = self.size; + let offset = self.offset; + + if size == offset { + Ok(Async::Ready(None)) + } else { + let mut file = self.file.take().expect("Use after completion"); + self.fut = Some(self.cpu_pool.spawn_fn(move || { + let max_bytes = cmp::min(size.saturating_sub(offset), 65_536) as usize; + let mut buf = BytesMut::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; + let nbytes = file.read(unsafe{buf.bytes_mut()})?; + if nbytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()) + } + unsafe{buf.advance_mut(nbytes)}; + Ok((file, buf.freeze())) + })); + self.poll() + } + } +} + /// A directory; responds with the generated directory listing. #[derive(Debug)] pub struct Directory{ @@ -329,6 +395,7 @@ pub struct StaticFiles { accessible: bool, index: Option, show_index: bool, + cpu_pool: CpuPool, _chunk_size: usize, _follow_symlinks: bool, } @@ -362,6 +429,7 @@ impl StaticFiles { accessible: access, index: None, show_index: index, + cpu_pool: CpuPool::new(40), _chunk_size: 0, _follow_symlinks: false, } @@ -409,15 +477,17 @@ impl Handler for StaticFiles { Ok(FilesystemElement::Redirect( HttpFound .build() - .header::<_, &str>("LOCATION", &new_path) + .header(header::http::LOCATION, new_path.as_str()) .finish().unwrap())) } else if self.show_index { - Ok(FilesystemElement::Directory(Directory::new(self.directory.clone(), path))) + Ok(FilesystemElement::Directory( + Directory::new(self.directory.clone(), path))) } else { Err(io::Error::new(io::ErrorKind::NotFound, "not found")) } } else { - Ok(FilesystemElement::File(NamedFile::open(path)?)) + Ok(FilesystemElement::File( + NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone()))) } } } diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 577074942..69065c49c 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -26,8 +26,12 @@ pub trait HttpMessage { #[doc(hidden)] /// Get a header - fn get_header(&self) -> Result where Self: Sized { - H::parse(self) + fn get_header(&self) -> Option where Self: Sized { + if self.headers().contains_key(H::name()) { + H::parse(self).ok() + } else { + None + } } /// Read the request content type. If request does not contain diff --git a/src/httprequest.rs b/src/httprequest.rs index 5203b8cca..5b88ec1f5 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -188,7 +188,7 @@ impl HttpRequest { /// Default `CpuPool` #[inline] #[doc(hidden)] - pub fn cpu_pool(&mut self) -> &CpuPool { + pub fn cpu_pool(&self) -> &CpuPool { self.router().expect("HttpRequest has to have Router instance") .server_settings().cpu_pool() } diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 04538ae74..6cf453db5 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -6,6 +6,7 @@ use std::collections::VecDeque; use cookie::{Cookie, CookieJar}; use bytes::{Bytes, BytesMut, BufMut}; +use futures::Stream; use http::{StatusCode, Version, HeaderMap, HttpTryFrom, Error as HttpError}; use http::header::{self, HeaderName, HeaderValue}; use serde_json; @@ -480,6 +481,15 @@ impl HttpResponseBuilder { Ok(HttpResponse(Some(response))) } + /// Set a streaming body and generate `HttpResponse`. + /// + /// `HttpResponseBuilder` can not be used after this call. + pub fn streaming(&mut self, stream: S) -> Result + where S: Stream + 'static, + { + self.body(Body::Streaming(Box::new(stream))) + } + /// Set a json body and generate `HttpResponse` /// /// `HttpResponseBuilder` can not be used after this call.