1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-21 07:36:43 +00:00

handle disconnects

This commit is contained in:
Nikolay Kim 2019-05-25 03:16:46 -07:00
parent 7f12b754e9
commit aa626a1e72
3 changed files with 47 additions and 25 deletions

View file

@ -1,12 +1,16 @@
# Changes # Changes
## [0.1.1] - 2019-05-25
* Fix disconnect handling #834
## [0.1.0] - 2019-05-18 ## [0.1.0] - 2019-05-18
* Release * Release
## [0.1.0-beta.4] - 2019-05-12 ## [0.1.0-beta.4] - 2019-05-12
* Handle cancellation of uploads #834 #736 * Handle cancellation of uploads #736
* Upgrade to actix-web 1.0.0-beta.4 * Upgrade to actix-web 1.0.0-beta.4

View file

@ -1,6 +1,6 @@
[package] [package]
name = "actix-multipart" name = "actix-multipart"
version = "0.1.0" version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Multipart support for actix web framework." description = "Multipart support for actix web framework."
readme = "README.md" readme = "README.md"

View file

@ -128,7 +128,7 @@ impl InnerMultipart {
fn read_headers( fn read_headers(
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
) -> Result<Option<HeaderMap>, MultipartError> { ) -> Result<Option<HeaderMap>, MultipartError> {
match payload.read_until(b"\r\n\r\n") { match payload.read_until(b"\r\n\r\n")? {
None => { None => {
if payload.eof { if payload.eof {
Err(MultipartError::Incomplete) Err(MultipartError::Incomplete)
@ -167,7 +167,7 @@ impl InnerMultipart {
boundary: &str, boundary: &str,
) -> Result<Option<bool>, MultipartError> { ) -> Result<Option<bool>, MultipartError> {
// TODO: need to read epilogue // TODO: need to read epilogue
match payload.readline() { match payload.readline()? {
None => { None => {
if payload.eof { if payload.eof {
Ok(Some(true)) Ok(Some(true))
@ -200,7 +200,7 @@ impl InnerMultipart {
) -> Result<Option<bool>, MultipartError> { ) -> Result<Option<bool>, MultipartError> {
let mut eof = false; let mut eof = false;
loop { loop {
match payload.readline() { match payload.readline()? {
Some(chunk) => { Some(chunk) => {
if chunk.is_empty() { if chunk.is_empty() {
return Err(MultipartError::Boundary); return Err(MultipartError::Boundary);
@ -481,7 +481,7 @@ impl InnerField {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.read_max(*size) { match payload.read_max(*size)? {
Some(mut chunk) => { Some(mut chunk) => {
let len = cmp::min(chunk.len() as u64, *size); let len = cmp::min(chunk.len() as u64, *size);
*size -= len; *size -= len;
@ -512,7 +512,11 @@ impl InnerField {
let len = payload.buf.len(); let len = payload.buf.len();
if len == 0 { if len == 0 {
return Ok(Async::NotReady); return if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(Async::NotReady)
};
} }
// check boundary // check boundary
@ -597,7 +601,7 @@ impl InnerField {
} }
} }
match payload.readline() { match payload.readline()? {
None => Async::Ready(None), None => Async::Ready(None),
Some(line) => { Some(line) => {
if line.as_ref() != b"\r\n" { if line.as_ref() != b"\r\n" {
@ -749,23 +753,31 @@ impl PayloadBuffer {
} }
} }
fn read_max(&mut self, size: u64) -> Option<Bytes> { fn read_max(&mut self, size: u64) -> Result<Option<Bytes>, MultipartError> {
if !self.buf.is_empty() { if !self.buf.is_empty() {
let size = std::cmp::min(self.buf.len() as u64, size) as usize; let size = std::cmp::min(self.buf.len() as u64, size) as usize;
Some(self.buf.split_to(size).freeze()) Ok(Some(self.buf.split_to(size).freeze()))
} else if self.eof {
Err(MultipartError::Incomplete)
} else { } else {
None Ok(None)
} }
} }
/// Read until specified ending /// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Option<Bytes> { pub fn read_until(&mut self, line: &[u8]) -> Result<Option<Bytes>, MultipartError> {
twoway::find_bytes(&self.buf, line) let res = twoway::find_bytes(&self.buf, line)
.map(|idx| self.buf.split_to(idx + line.len()).freeze()) .map(|idx| self.buf.split_to(idx + line.len()).freeze());
if res.is_none() && self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(res)
}
} }
/// Read bytes until new line delimiter /// Read bytes until new line delimiter
pub fn readline(&mut self) -> Option<Bytes> { pub fn readline(&mut self) -> Result<Option<Bytes>, MultipartError> {
self.read_until(b"\n") self.read_until(b"\n")
} }
@ -991,7 +1003,7 @@ mod tests {
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
payload.poll_stream().unwrap(); payload.poll_stream().unwrap();
assert_eq!(None, payload.read_max(1)); assert_eq!(None, payload.read_max(1).unwrap());
}) })
} }
@ -1001,14 +1013,14 @@ mod tests {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(4)); assert_eq!(None, payload.read_max(4).unwrap());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
sender.feed_eof(); sender.feed_eof();
payload.poll_stream().unwrap(); payload.poll_stream().unwrap();
assert_eq!(Some(Bytes::from("data")), payload.read_max(4)); assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
assert_eq!(None, payload.read_max(1)); assert!(payload.read_max(1).is_err());
assert!(payload.eof); assert!(payload.eof);
}) })
} }
@ -1018,7 +1030,7 @@ mod tests {
run_on(|| { run_on(|| {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(1)); assert_eq!(None, payload.read_max(1).unwrap());
sender.set_error(PayloadError::Incomplete(None)); sender.set_error(PayloadError::Incomplete(None));
payload.poll_stream().err().unwrap(); payload.poll_stream().err().unwrap();
}) })
@ -1035,10 +1047,10 @@ mod tests {
payload.poll_stream().unwrap(); payload.poll_stream().unwrap();
assert_eq!(payload.buf.len(), 10); assert_eq!(payload.buf.len(), 10);
assert_eq!(Some(Bytes::from("line1")), payload.read_max(5)); assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
assert_eq!(payload.buf.len(), 5); assert_eq!(payload.buf.len(), 5);
assert_eq!(Some(Bytes::from("line2")), payload.read_max(5)); assert_eq!(Some(Bytes::from("line2")), payload.read_max(5).unwrap());
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
}) })
} }
@ -1069,16 +1081,22 @@ mod tests {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_until(b"ne")); assert_eq!(None, payload.read_until(b"ne").unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap(); payload.poll_stream().unwrap();
assert_eq!(Some(Bytes::from("line")), payload.read_until(b"ne")); assert_eq!(
Some(Bytes::from("line")),
payload.read_until(b"ne").unwrap()
);
assert_eq!(payload.buf.len(), 6); assert_eq!(payload.buf.len(), 6);
assert_eq!(Some(Bytes::from("1line2")), payload.read_until(b"2")); assert_eq!(
Some(Bytes::from("1line2")),
payload.read_until(b"2").unwrap()
);
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
}) })
} }