From 9ddf5a35500b5aa804dae26fad1ad29ae6c804d3 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 09:28:22 -0700 Subject: [PATCH 1/6] better doc string for Either --- guide/src/qs_4.md | 16 ++++++++-------- src/handler.rs | 4 +++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index dd432d85a..be3d6cf33 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -253,23 +253,23 @@ use actix_web::{Either, Error, HttpResponse, httpcodes}; type RegisterResult = Either>>; fn index(req: HttpRequest) -> RegisterResult { - if true { // <- choose variant A + if is_a_variant() { // <- choose variant A Either::A( httpcodes::HttpBadRequest.with_body("Bad data")) } else { - Either::B( // <- variant B + Either::B( // <- variant B result(HttpResponse::Ok() .content_type("text/html") .body(format!("Hello!")) .map_err(|e| e.into())).responder()) } } - -fn main() { - Application::new() - .resource("/register", |r| r.f(index)) - .finish(); -} +# fn is_a_variant() -> bool { true } +# fn main() { +# Application::new() +# .resource("/register", |r| r.f(index)) +# .finish(); +# } ``` ## Tokio core handle diff --git a/src/handler.rs b/src/handler.rs index cd16e164c..498da39de 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -46,8 +46,9 @@ pub trait Responder { /// /// type RegisterResult = Either>>; /// +/// /// fn index(req: HttpRequest) -> RegisterResult { -/// if true { // <- choose variant A +/// if is_a_variant() { // <- choose variant A /// Either::A( /// httpcodes::HttpBadRequest.with_body("Bad data")) /// } else { @@ -58,6 +59,7 @@ pub trait Responder { /// .map_err(|e| e.into())).responder()) /// } /// } +/// # fn is_a_variant() -> bool { true } /// # fn main() {} /// ``` #[derive(Debug)] From a4c933e56e0cb9c10b2857bb289886b3def142c5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 09:36:54 -0700 Subject: [PATCH 2/6] update doc string --- guide/src/qs_4.md | 3 ++- src/handler.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index be3d6cf33..2f96ddd06 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -238,9 +238,10 @@ Both methods could be combined. (i.e Async response with streaming body) ## Different return types (Either) Sometimes you need to return different types of responses. For example -you can do error check and return error, otherwise return async response. +you can do error check and return error and return async response otherwise. Or any result that requires two different types. For this case [*Either*](../actix_web/enum.Either.html) type can be used. +*Either* allows to combine two different responder types into a single type. ```rust # extern crate actix_web; diff --git a/src/handler.rs b/src/handler.rs index 498da39de..035d02fa5 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -34,7 +34,7 @@ pub trait Responder { fn respond_to(self, req: HttpRequest) -> Result; } -/// Combines two different responders types into a single type +/// Combines two different responder types into a single type /// /// ```rust /// # extern crate actix_web; From fee1e255acf74775f25c2aae900e1fee14121e39 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 10:10:30 -0700 Subject: [PATCH 3/6] add comments --- examples/diesel/src/main.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 75c201558..06cb485d6 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -1,8 +1,8 @@ //! Actix web diesel example //! //! Diesel does not support tokio, so we have to run it in separate threads. -//! Actix supports sync actors by default, so we going to create sync actor that will -//! use diesel. Technically sync actors are worker style actors, multiple of them +//! Actix supports sync actors by default, so we going to create sync actor that use diesel. +//! Technically sync actors are worker style actors, multiple of them //! can run in parallel and process messages from same queue. extern crate serde; extern crate serde_json; @@ -38,6 +38,7 @@ struct State { fn index(req: HttpRequest) -> Box> { let name = &req.match_info()["name"]; + // send async `CreateUser` message to a `DbExecutor` req.state().db.send(CreateUser{name: name.to_owned()}) .from_err() .and_then(|res| { @@ -54,7 +55,7 @@ fn main() { let _ = env_logger::init(); let sys = actix::System::new("diesel-example"); - // Start db executor actors + // Start 3 db executor actors let addr = SyncArbiter::start(3, || { DbExecutor(SqliteConnection::establish("test.db").unwrap()) }); From 31fbbd316812eeb1e100e8ea5ab16c69c4e6b6c2 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 14:50:13 -0700 Subject: [PATCH 4/6] Fix panic on unknown content encoding --- CHANGES.md | 4 ++++ src/handler.rs | 2 +- src/header/mod.rs | 3 +-- src/server/encoding.rs | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6c3de1048..f115cc03f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## 0.4.7 (2018-03-xx) + +* Fix panic on unknown content encoding + ## 0.4.6 (2018-03-10) * Fix client cookie handling diff --git a/src/handler.rs b/src/handler.rs index 035d02fa5..fd689699e 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -52,7 +52,7 @@ pub trait Responder { /// Either::A( /// httpcodes::HttpBadRequest.with_body("Bad data")) /// } else { -/// Either::B( // <- variant B +/// Either::B( // <- variant B /// result(HttpResponse::Ok() /// .content_type("text/html") /// .body(format!("Hello!")) diff --git a/src/header/mod.rs b/src/header/mod.rs index 7c3ad7eb1..d02ca9778 100644 --- a/src/header/mod.rs +++ b/src/header/mod.rs @@ -165,8 +165,7 @@ impl<'a> From<&'a str> for ContentEncoding { "br" => ContentEncoding::Br, "gzip" => ContentEncoding::Gzip, "deflate" => ContentEncoding::Deflate, - "identity" => ContentEncoding::Identity, - _ => ContentEncoding::Auto, + _ => ContentEncoding::Identity, } } } diff --git a/src/server/encoding.rs b/src/server/encoding.rs index df901818d..a7eaf1c00 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -466,8 +466,8 @@ impl ContentEncoder { GzEncoder::new(transfer, Compression::default())), ContentEncoding::Br => ContentEncoder::Br( BrotliEncoder::new(transfer, 5)), - ContentEncoding::Identity => ContentEncoder::Identity(transfer), - ContentEncoding::Auto => unreachable!() + ContentEncoding::Identity | ContentEncoding::Auto => + ContentEncoder::Identity(transfer), } } From 051703eb2c721d3a44f210ff29a784dfc6ecca9d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 15:37:33 -0700 Subject: [PATCH 5/6] Fix connection get closed too early --- CHANGES.md | 2 ++ src/server/h1.rs | 21 +++++++-------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f115cc03f..24f01b75c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ * Fix panic on unknown content encoding +* Fix connection get closed too early + ## 0.4.6 (2018-03-10) * Fix client cookie handling diff --git a/src/server/h1.rs b/src/server/h1.rs index 097804ba2..e5cb04699 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -110,18 +110,6 @@ impl Http1 } } - fn poll_completed(&mut self, shutdown: bool) -> Result { - // check stream state - match self.stream.poll_completed(shutdown) { - Ok(Async::Ready(_)) => Ok(true), - Ok(Async::NotReady) => Ok(false), - Err(err) => { - debug!("Error sending data: {}", err); - Err(()) - } - } - } - // TODO: refactor pub fn poll_io(&mut self) -> Poll { // read incoming data @@ -272,8 +260,13 @@ impl Http1 } // check stream state - if !self.poll_completed(true)? { - return Ok(Async::NotReady) + match self.stream.poll_completed(self.tasks.is_empty()) { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => { + debug!("Error sending data: {}", err); + return Err(()) + } + _ => (), } // deal with keep-alive From 4af115a19c63dbcfa7b538e96d9907497e742502 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 16:37:44 -0700 Subject: [PATCH 6/6] Fix steraming response handling for http/2 --- CHANGES.md | 5 ++- src/server/h2.rs | 38 +++++++++++++----- src/server/h2writer.rs | 91 +++++++++++++++++------------------------- 3 files changed, 70 insertions(+), 64 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 24f01b75c..75a29d5d3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,11 +1,14 @@ # Changes -## 0.4.7 (2018-03-xx) +## 0.4.7 (2018-03-11) * Fix panic on unknown content encoding * Fix connection get closed too early +* Fix steraming response handling for http/2 + + ## 0.4.6 (2018-03-10) * Fix client cookie handling diff --git a/src/server/h2.rs b/src/server/h2.rs index 02951593e..6cc682a11 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -26,7 +26,7 @@ use payload::{Payload, PayloadWriter, PayloadStatus}; use super::h2writer::H2Writer; use super::encoding::PayloadType; use super::settings::WorkerSettings; -use super::{HttpHandler, HttpHandlerTask}; +use super::{HttpHandler, HttpHandlerTask, Writer}; bitflags! { struct Flags: u8 { @@ -109,22 +109,27 @@ impl Http2 loop { match item.task.poll_io(&mut item.stream) { Ok(Async::Ready(ready)) => { - item.flags.insert(EntryFlags::EOF); if ready { - item.flags.insert(EntryFlags::FINISHED); + item.flags.insert( + EntryFlags::EOF | EntryFlags::FINISHED); + } else { + item.flags.insert(EntryFlags::EOF); } not_ready = false; }, Ok(Async::NotReady) => { - if item.payload.need_read() == PayloadStatus::Read && !retry + if item.payload.need_read() == PayloadStatus::Read + && !retry { continue } }, Err(err) => { error!("Unhandled error: {}", err); - item.flags.insert(EntryFlags::EOF); - item.flags.insert(EntryFlags::ERROR); + item.flags.insert( + EntryFlags::EOF | + EntryFlags::ERROR | + EntryFlags::WRITE_DONE); item.stream.reset(Reason::INTERNAL_ERROR); } } @@ -138,18 +143,32 @@ impl Http2 item.flags.insert(EntryFlags::FINISHED); }, Err(err) => { - item.flags.insert(EntryFlags::ERROR); - item.flags.insert(EntryFlags::FINISHED); + item.flags.insert( + EntryFlags::ERROR | EntryFlags::WRITE_DONE | + EntryFlags::FINISHED); error!("Unhandled error: {}", err); } } } + + if !item.flags.contains(EntryFlags::WRITE_DONE) { + match item.stream.poll_completed(false) { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + not_ready = false; + item.flags.insert(EntryFlags::WRITE_DONE); + } + Err(_err) => { + item.flags.insert(EntryFlags::ERROR); + } + } + } } // cleanup finished tasks while !self.tasks.is_empty() { if self.tasks[0].flags.contains(EntryFlags::EOF) && - self.tasks[0].flags.contains(EntryFlags::FINISHED) || + self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) || self.tasks[0].flags.contains(EntryFlags::ERROR) { self.tasks.pop_front(); @@ -251,6 +270,7 @@ bitflags! { const REOF = 0b0000_0010; const ERROR = 0b0000_0100; const FINISHED = 0b0000_1000; + const WRITE_DONE = 0b0001_0000; } } diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index d57d92db5..7fccd4ecf 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -24,6 +24,7 @@ bitflags! { const STARTED = 0b0000_0001; const DISCONNECTED = 0b0000_0010; const EOF = 0b0000_0100; + const RESERVED = 0b0000_1000; } } @@ -56,55 +57,6 @@ impl H2Writer { stream.send_reset(reason) } } - - fn write_to_stream(&mut self) -> io::Result { - if !self.flags.contains(Flags::STARTED) { - return Ok(WriterState::Done) - } - - if let Some(ref mut stream) = self.stream { - if self.buffer.is_empty() { - if self.flags.contains(Flags::EOF) { - let _ = stream.send_data(Bytes::new(), true); - } - return Ok(WriterState::Done) - } - - loop { - match stream.poll_capacity() { - Ok(Async::NotReady) => { - if self.buffer.len() > self.buffer_capacity { - return Ok(WriterState::Pause) - } else { - return Ok(WriterState::Done) - } - } - Ok(Async::Ready(None)) => { - return Ok(WriterState::Done) - } - Ok(Async::Ready(Some(cap))) => { - let len = self.buffer.len(); - let bytes = self.buffer.split_to(cmp::min(cap, len)); - let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); - self.written += bytes.len() as u64; - - if let Err(err) = stream.send_data(bytes.freeze(), eof) { - return Err(io::Error::new(io::ErrorKind::Other, err)) - } else if !self.buffer.is_empty() { - let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); - stream.reserve_capacity(cap); - } else { - return Ok(WriterState::Pause) - } - } - Err(_) => { - return Err(io::Error::new(io::ErrorKind::Other, "")) - } - } - } - } - Ok(WriterState::Done) - } } impl Writer for H2Writer { @@ -172,6 +124,7 @@ impl Writer for H2Writer { self.written = bytes.len() as u64; self.encoder.write(bytes)?; if let Some(ref mut stream) = self.stream { + self.flags.insert(Flags::RESERVED); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); } Ok(WriterState::Pause) @@ -195,7 +148,7 @@ impl Writer for H2Writer { } } - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > self.buffer_capacity { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -217,10 +170,40 @@ impl Writer for H2Writer { } fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> { - match self.write_to_stream() { - Ok(WriterState::Done) => Ok(Async::Ready(())), - Ok(WriterState::Pause) => Ok(Async::NotReady), - Err(err) => Err(err) + if !self.flags.contains(Flags::STARTED) { + return Ok(Async::NotReady); } + + if let Some(ref mut stream) = self.stream { + // reserve capacity + if !self.flags.contains(Flags::RESERVED) && !self.buffer.is_empty() { + self.flags.insert(Flags::RESERVED); + stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); + } + + loop { + match stream.poll_capacity() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(cap))) => { + let len = self.buffer.len(); + let bytes = self.buffer.split_to(cmp::min(cap, len)); + let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); + self.written += bytes.len() as u64; + + if let Err(e) = stream.send_data(bytes.freeze(), eof) { + return Err(io::Error::new(io::ErrorKind::Other, e)) + } else if !self.buffer.is_empty() { + let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + return Ok(Async::NotReady) + } + } + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + } + return Ok(Async::NotReady) } }