diff --git a/.gitignore b/.gitignore index fb74feb51..2c1955fc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ -/target/ - +target/ Cargo.lock /gh-pages __pycache__ @@ -10,7 +9,6 @@ __pycache__ *.pid *.sock *~ -target/ *.egg-info/ # These are backup files generated by rustfmt diff --git a/Cargo.toml b/Cargo.toml index 579b74575..a143b7d7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,6 @@ codecov = { repository = "fafhrd91/actix-web", branch = "master", service = "git name = "actix_web" path = "src/lib.rs" -[[bin]] -name = "test" -path = "src/main.rs" - [features] default = ["nightly"] @@ -32,17 +28,18 @@ default = ["nightly"] nightly = [] [dependencies] +log = "0.3" time = "0.1" http = "0.1" httparse = "0.1" http-range = "0.1" +mime = "0.3" mime_guess = "1.8" cookie = { version="0.10", features=["percent-encode"] } regex = "0.2" slab = "0.4" sha1 = "0.2" url = "1.5" -multipart-async = { version = "0.*", features=["server"]} # tokio bytes = "0.4" @@ -51,10 +48,6 @@ tokio-core = "0.1" tokio-io = "0.1" tokio-proto = "0.1" -# other -log = "0.3" -env_logger = "*" - [dependencies.actix] #path = "../actix" #git = "https://github.com/fafhrd91/actix.git" diff --git a/examples/multipart/Cargo.toml b/examples/multipart/Cargo.toml new file mode 100644 index 000000000..596e42043 --- /dev/null +++ b/examples/multipart/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "multipart-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "multipart" +path = "src/main.rs" + +[dependencies] +env_logger = "*" +actix = "0.2" +actix-web = { path = "../../" } diff --git a/examples/multipart/client.py b/examples/multipart/client.py new file mode 100644 index 000000000..698f291bd --- /dev/null +++ b/examples/multipart/client.py @@ -0,0 +1,18 @@ +import asyncio +import aiohttp + + +def client(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + + resp = yield from aiohttp.request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + +loop = asyncio.get_event_loop() +loop.run_until_complete(client()) diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs new file mode 100644 index 000000000..c2041b1ac --- /dev/null +++ b/examples/multipart/src/main.rs @@ -0,0 +1,70 @@ +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix::*; +use actix_web::*; + +struct MyRoute; + +impl Actor for MyRoute { + type Context = HttpContext; +} + +impl Route for MyRoute { + type State = (); + + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { + println!("{:?}", req); + match req.multipart(payload) { + Ok(multipart) => { + ctx.add_stream(multipart); + Reply::async(MyRoute) + }, + // can not read multipart + Err(_) => { + Reply::reply(httpcodes::HTTPBadRequest) + } + } + } +} + +impl ResponseType for MyRoute { + type Item = (); + type Error = (); +} + +impl StreamHandler for MyRoute { + fn finished(&mut self, ctx: &mut Self::Context) { + println!("FINISHED"); + ctx.start(httpcodes::HTTPOk); + ctx.write_eof(); + } +} + +impl Handler for MyRoute { + fn handle(&mut self, msg: multipart::MultipartItem, ctx: &mut HttpContext) + -> Response + { + println!("==== FIELD ==== {:?}", msg); + //if let Some(req) = self.req.take() { + Self::empty() + } +} + +fn main() { + let _ = env_logger::init(); + let sys = actix::System::new("multipart-example"); + + HttpServer::new( + RoutingMap::default() + .app("/", Application::default() + .resource("/multipart", |r| { + r.post::(); + }) + .finish()) + .finish()) + .serve::<_, ()>("127.0.0.1:8080").unwrap(); + + let _ = sys.run(); +} diff --git a/src/main.rs b/examples/websocket/src/main.rs similarity index 100% rename from src/main.rs rename to examples/websocket/src/main.rs diff --git a/src/decode.rs b/src/decode.rs index eb0caef02..e8115f47f 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -87,6 +87,9 @@ impl Decoder { if *remaining == 0 { Ok(Async::Ready(None)) } else { + if body.is_empty() { + return Ok(Async::NotReady) + } let len = body.len() as u64; let buf; if *remaining > len { @@ -106,7 +109,7 @@ impl Decoder { // advances the chunked state *state = try_ready!(state.step(body, size, &mut buf)); if *state == ChunkedState::End { - trace!("end of chunked"); + trace!("End of chunked stream"); return Ok(Async::Ready(None)); } if let Some(buf) = buf { diff --git a/src/httprequest.rs b/src/httprequest.rs index 5b5410228..95551b6b5 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -1,21 +1,19 @@ //! HTTP Request message related code. -use std::{io, str}; +use std::{str, fmt}; use std::collections::HashMap; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::{Async, Future, Stream, Poll}; use url::form_urlencoded; -use multipart_async::server::BodyChunk; use http::{header, Method, Version, Uri, HeaderMap}; use {Cookie, CookieParseError}; use {HttpRange, HttpRangeParseError}; use error::ParseError; use recognizer::Params; -use multipart::Multipart; use payload::{Payload, PayloadError}; +use multipart::{Multipart, MultipartError}; -#[derive(Debug)] /// An HTTP Request pub struct HttpRequest { version: Version, @@ -179,26 +177,13 @@ impl HttpRequest { /// Return stream to process BODY as multipart. /// /// Content-type: multipart/form-data; - pub fn multipart(&self, payload: Payload) -> Result, Payload> { - const BOUNDARY: &'static str = "boundary="; - - if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) { - if let Ok(content_type) = content_type.to_str() { - if let Some(start) = content_type.find(BOUNDARY) { - let start = start + BOUNDARY.len(); - let end = content_type[start..].find(';') - .map_or(content_type.len(), |end| start + end); - let boundary = &content_type[start .. end]; - - return Ok(Multipart::with_body(Req{pl: payload}, boundary)) - } - } - } - Err(payload) + pub fn multipart(&self, payload: Payload) -> Result { + Multipart::new(self, payload) } /// Parse `application/x-www-form-urlencoded` encoded body. - /// Return `UrlEncoded` future. It resolves to a `HashMap`. + /// Return `UrlEncoded` future. It resolves to a `HashMap` which + /// contains decoded parameters. /// /// Returns error: /// @@ -238,51 +223,25 @@ impl HttpRequest { } } - -#[doc(hidden)] -pub struct Req { - pl: Payload, -} - -#[doc(hidden)] -pub struct Chunk(Bytes); - -impl BodyChunk for Chunk { - #[inline] - fn split_at(mut self, idx: usize) -> (Self, Self) { - (Chunk(self.0.split_to(idx)), self) - } - - #[inline] - fn as_slice(&self) -> &[u8] { - self.0.as_ref() - } -} - -impl Stream for Req { - type Item = Chunk; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - match self.pl.poll() { - Err(_) => - Err(io::Error::new(io::ErrorKind::InvalidData, "incomplete")), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::Ready(Some(item))) => match item { - Ok(bytes) => Ok(Async::Ready(Some(Chunk(bytes)))), - Err(err) => match err { - PayloadError::Incomplete => - Err(io::Error::new(io::ErrorKind::InvalidData, "incomplete")), - PayloadError::ParseError(err) => - Err(err.into()) - } +impl fmt::Debug for HttpRequest { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let res = write!(f, "\nHttpRequest {:?} {}:{}\n", self.version, self.method, self.uri); + if !self.params.is_empty() { + let _ = write!(f, " params: {:?}\n", self.params); + } + let _ = write!(f, " headers:\n"); + for key in self.headers.keys() { + let vals: Vec<_> = self.headers.get_all(key).iter().collect(); + if vals.len() > 1 { + let _ = write!(f, " {:?}: {:?}\n", key, vals); + } else { + let _ = write!(f, " {:?}: {:?}\n", key, vals[0]); } } + res } } - /// Future that resolves to a parsed urlencoded values. pub struct UrlEncoded { pl: Payload, diff --git a/src/lib.rs b/src/lib.rs index 952d453be..2b766c410 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,8 @@ extern crate cookie; extern crate http; extern crate httparse; extern crate http_range; +extern crate mime; extern crate mime_guess; -extern crate multipart_async; extern crate url; extern crate actix; @@ -47,6 +47,7 @@ mod wsproto; pub mod ws; pub mod dev; pub mod httpcodes; +pub mod multipart; pub use error::ParseError; pub use application::{Application, ApplicationBuilder}; pub use httprequest::{HttpRequest, UrlEncoded}; @@ -65,9 +66,3 @@ pub use http::{Method, StatusCode}; pub use cookie::{Cookie, CookieBuilder}; pub use cookie::{ParseError as CookieParseError}; pub use http_range::{HttpRange, HttpRangeParseError}; - -/// Multipart support -pub mod multipart { - pub use multipart_async::server::{ - Field, FieldData, FieldHeaders, Multipart, ReadTextField, TextField}; -} diff --git a/src/multipart.rs b/src/multipart.rs new file mode 100644 index 000000000..82ed25716 --- /dev/null +++ b/src/multipart.rs @@ -0,0 +1,537 @@ +//! Multipart requests support. +use std::{cmp, fmt}; +use std::rc::Rc; +use std::cell::RefCell; +use std::marker::PhantomData; + +use mime; +use httparse; +use bytes::Bytes; +use http::HttpTryFrom; +use http::header::{self, HeaderMap, HeaderName, HeaderValue}; +use futures::{Async, Stream, Poll}; +use futures::task::{Task, current as current_task}; + +use payload::{Payload, PayloadError}; +use httprequest::HttpRequest; + +const MAX_HEADERS: usize = 32; + +#[derive(Debug)] +pub struct MultipartError { + pub payload: Payload, +} + +/// The server-side implementation of `multipart/form-data` requests. +/// +/// This will parse the incoming stream into `MultipartItem` instances via its +/// Stream implementation. +/// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart` +/// is used for nested multipart streams. +#[derive(Debug)] +pub struct Multipart { + safety: Safety, + payload: PayloadRef, + boundary: String, + eof: bool, + bof: bool, + item: InnerMultipartItem, +} + +#[derive(Debug)] +pub enum MultipartItem { + // Multipart field + Field(Field), + // Nested multipart item + Multipart(Multipart), +} + +#[derive(Debug)] +enum InnerMultipartItem { + None, + Field(Rc>), + // Nested multipart item + // Multipart(Multipart), +} + +impl Multipart { + pub fn new(req: &HttpRequest, payload: Payload) -> Result { + if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if let Ok(ct) = content_type.parse::() { + if let Some(boundary) = ct.get_param(mime::BOUNDARY) { + return Ok(Multipart { + safety: Safety::new(), + payload: PayloadRef::new(payload), + boundary: boundary.as_str().to_owned(), + eof: false, + bof: true, + item: InnerMultipartItem::None, + }) + } + } + } + } + Err(MultipartError{payload: payload}) + } + + fn read_headers(payload: &mut Payload) -> Poll + { + match payload.readuntil(b"\r\n\r\n")? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(bytes) => { + let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; + match httparse::parse_headers(&bytes, &mut hdrs) { + Ok(httparse::Status::Complete((_, hdrs))) => { + // convert headers + let mut headers = HeaderMap::with_capacity(hdrs.len()); + for h in hdrs { + if let Ok(name) = HeaderName::try_from(h.name) { + if let Ok(value) = HeaderValue::try_from(h.value) { + headers.append(name, value); + } else { + return Err(PayloadError::Incomplete) + } + } else { + return Err(PayloadError::Incomplete) + } + } + Ok(Async::Ready(headers)) + } + Ok(httparse::Status::Partial) | Err(_) => Err(PayloadError::Incomplete), + } + } + } + } + + fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll + { + // TODO: need to read epilogue + match payload.readline()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(chunk) => { + if chunk.len() == boundary.len() + 4 && + &chunk[..2] == b"--" && + &chunk[2..boundary.len()+2] == boundary.as_bytes() + { + Ok(Async::Ready(false)) + } else if chunk.len() == boundary.len() + 6 && + &chunk[..2] == b"--" && + &chunk[2..boundary.len()+2] == boundary.as_bytes() && + &chunk[boundary.len()+2..boundary.len()+4] == b"--" + { + Ok(Async::Ready(true)) + } else { + Err(PayloadError::Incomplete) + } + } + } + } + + fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll + { + let mut eof = false; + loop { + if let Async::Ready(chunk) = payload.readline()? { + if chunk.is_empty() { + //ValueError("Could not find starting boundary %r" + //% (self._boundary)) + } + if &chunk[..2] == b"--" && &chunk[2..chunk.len()-2] == boundary.as_bytes() { + break; + } else { + let b: &[u8] = boundary.as_ref(); + if chunk.len() <= boundary.len() + 2 && + &chunk[..boundary.len()] == b && + &chunk[boundary.len()..boundary.len()+2] == b"--" { + eof = true; + break; + } + } + } else { + return Ok(Async::NotReady) + } + } + Ok(Async::Ready(eof)) + } +} + +impl Drop for Multipart { + fn drop(&mut self) { + // InnerMultipartItem::Field has to be dropped first because of Safety. + self.item = InnerMultipartItem::None; + } +} + +impl Stream for Multipart { + type Item = MultipartItem; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.eof { + Ok(Async::Ready(None)) + } else { + // release field + loop { + let stop = match self.item { + InnerMultipartItem::Field(ref mut field) => { + match field.borrow_mut().poll(&self.safety)? { + Async::NotReady => + return Ok(Async::NotReady), + Async::Ready(Some(_)) => + continue, + Async::Ready(None) => + true, + } + } + _ => false, + }; + if stop { + self.item = InnerMultipartItem::None; + } + if let InnerMultipartItem::None = self.item { + break; + } + } + + let headers = if let Some(payload) = self.payload.get_mut(&self.safety) { + // read until first boundary + if self.bof { + if let Async::Ready(eof) = + Multipart::skip_until_boundary(payload, &self.boundary)? + { + self.eof = eof; + } else { + return Ok(Async::NotReady) + } + self.bof = false; + } else { + // read boundary + match Multipart::read_boundary(payload, &self.boundary)? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(eof) => self.eof = eof, + } + } + + if self.eof { + return Ok(Async::Ready(None)) + } + + // read field headers + if let Async::Ready(headers) = Multipart::read_headers(payload)? { + headers + } else { + return Ok(Async::NotReady) + } + } else { + debug!("NotReady: field is in flight"); + return Ok(Async::NotReady) + }; + + // + let field = Rc::new(RefCell::new(InnerField::new( + self.payload.clone(), self.boundary.clone(), &headers)?)); + self.item = InnerMultipartItem::Field(Rc::clone(&field)); + + Ok(Async::Ready(Some( + MultipartItem::Field(Field::new(self.safety.clone(), headers, field))))) + } + } +} + +/// A single field in a multipart stream +pub struct Field { + ct: mime::Mime, + headers: HeaderMap, + inner: Rc>, + safety: Safety, +} + +impl Field { + + fn new(safety: Safety, headers: HeaderMap, inner: Rc>) -> Self { + let mut mt = mime::APPLICATION_OCTET_STREAM; + if let Some(content_type) = headers.get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if let Ok(ct) = content_type.parse::() { + mt = ct; + } + } + } + Field { + ct: mt, + headers: headers, + inner: inner, + safety: safety, + } + } + + pub fn headers(&self) -> &HeaderMap { + &self.headers + } + + pub fn content_type(&self) -> &mime::Mime { + &self.ct + } +} + +impl Stream for Field { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.safety.current() { + self.inner.borrow_mut().poll(&self.safety) + } else { + Ok(Async::NotReady) + } + } +} + +impl fmt::Debug for Field { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let res = write!(f, "\nMultipartField: {}\n", self.ct); + let _ = write!(f, " boundary: {}\n", self.inner.borrow().boundary); + let _ = write!(f, " headers:\n"); + for key in self.headers.keys() { + let vals: Vec<_> = self.headers.get_all(key).iter().collect(); + if vals.len() > 1 { + let _ = write!(f, " {:?}: {:?}\n", key, vals); + } else { + let _ = write!(f, " {:?}: {:?}\n", key, vals[0]); + } + } + res + } +} + +#[derive(Debug)] +struct InnerField { + payload: Option, + boundary: String, + eof: bool, + length: Option, +} + +impl InnerField { + + fn new(payload: PayloadRef, boundary: String, headers: &HeaderMap) + -> Result + { + let len = if let Some(len) = headers.get(header::CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + Some(len) + } else { + return Err(PayloadError::Incomplete) + } + } else { + return Err(PayloadError::Incomplete) + } + } else { + None + }; + + Ok(InnerField { + payload: Some(payload), + boundary: boundary, + eof: false, + length: len }) + } + + /// Reads body part content chunk of the specified size. + /// The body part must has `Content-Length` header with proper value. + fn read_len(payload: &mut Payload, size: &mut u64) -> Poll, PayloadError> + { + if *size == 0 { + Ok(Async::Ready(None)) + } else { + match payload.readany() { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::Ready(Some(Ok(mut chunk))) => { + let len = cmp::min(chunk.len() as u64, *size); + *size -= len; + let ch = chunk.split_to(len as usize); + if !chunk.is_empty() { + payload.unread_data(chunk); + } + Ok(Async::Ready(Some(ch))) + }, + Async::Ready(Some(Err(err))) => Err(err) + } + } + } + + /// Reads content chunk of body part with unknown length. + /// The `Content-Length` header for body part is not necessary. + fn read_stream(payload: &mut Payload, boundary: &str) -> Poll, PayloadError> + { + match payload.readuntil(b"\r")? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(mut chunk) => { + if chunk.len() == 1 { + payload.unread_data(chunk); + match payload.readexactly(boundary.len() + 4)? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(chunk) => { + if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && + &chunk[4..] == boundary.as_bytes() + { + payload.unread_data(chunk); + Ok(Async::Ready(None)) + } else { + Ok(Async::Ready(Some(chunk))) + } + } + } + } else { + let to = chunk.len() - 1; + let ch = chunk.split_to(to); + payload.unread_data(chunk); + Ok(Async::Ready(Some(ch))) + } + } + } + } + + fn poll(&mut self, s: &Safety) -> Poll, PayloadError> { + if self.payload.is_none() { + return Ok(Async::Ready(None)) + } + if self.eof { + if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { + match payload.readline()? { + Async::NotReady => + return Ok(Async::NotReady), + Async::Ready(chunk) => { + assert_eq!( + chunk.as_ref(), b"\r\n", + "reader did not read all the data or it is malformed"); + } + } + } else { + return Ok(Async::NotReady); + } + + self.payload.take(); + return Ok(Async::Ready(None)) + } + + let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { + let res = if let Some(ref mut len) = self.length { + InnerField::read_len(payload, len)? + } else { + InnerField::read_stream(payload, &self.boundary)? + }; + + match res { + Async::NotReady => Async::NotReady, + Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), + Async::Ready(None) => { + self.eof = true; + match payload.readline()? { + Async::NotReady => Async::NotReady, + Async::Ready(chunk) => { + assert_eq!( + chunk.as_ref(), b"\r\n", + "reader did not read all the data or it is malformed"); + Async::Ready(None) + } + } + } + } + } else { + Async::NotReady + }; + + if Async::Ready(None) == result { + self.payload.take(); + } + Ok(result) + } +} + +#[derive(Debug)] +struct PayloadRef { + task: Option, + payload: Rc, +} + +impl PayloadRef { + fn new(payload: Payload) -> PayloadRef { + PayloadRef { + task: None, + payload: Rc::new(payload), + } + } + + fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut Payload> + where 'a: 'b + { + if s.current() { + let payload: &mut Payload = unsafe { + &mut *(self.payload.as_ref() as *const _ as *mut _)}; + Some(payload) + } else { + None + } + } +} + +impl Clone for PayloadRef { + fn clone(&self) -> PayloadRef { + PayloadRef { + task: Some(current_task()), + payload: Rc::clone(&self.payload), + } + } +} + +/// Counter. It tracks of number of clones of payloads and give access to payload only +/// to top most task panics if Safety get destroyed and it not top most task. +#[derive(Debug)] +struct Safety { + task: Option, + level: usize, + payload: Rc>, +} + +impl Safety { + fn new() -> Safety { + let payload = Rc::new(PhantomData); + Safety { + task: None, + level: Rc::strong_count(&payload), + payload: payload, + } + } + + fn current(&self) -> bool { + Rc::strong_count(&self.payload) == self.level + } + +} + +impl Clone for Safety { + fn clone(&self) -> Safety { + let payload = Rc::clone(&self.payload); + Safety { + task: Some(current_task()), + level: Rc::strong_count(&payload), + payload: payload, + } + } +} + +impl Drop for Safety { + fn drop(&mut self) { + // parent task is dead + if Rc::strong_count(&self.payload) != self.level { + panic!("Safety get dropped but it is not from top-most task"); + } + if let Some(task) = self.task.take() { + task.notify() + } + } +} diff --git a/src/payload.rs b/src/payload.rs index 6eb02df30..2ea9bddb2 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -30,6 +30,7 @@ impl From for PayloadError { /// Stream of byte chunks /// /// Payload stores chunks in vector. First chunk can be received with `.readany()` method. +#[derive(Debug)] pub struct Payload { inner: Rc>, } @@ -65,11 +66,29 @@ impl Payload { } /// Get first available chunk of data. - /// Chunk get returned as Some(PayloadItem), `None` indicates eof. + /// Returns Some(PayloadItem) as chunk, `None` indicates eof. pub fn readany(&mut self) -> Async> { self.inner.borrow_mut().readany() } + /// Get exactly number of bytes + /// Returns Some(PayloadItem) as chunk, `None` indicates eof. + pub fn readexactly(&mut self, size: usize) -> Result, PayloadError> { + self.inner.borrow_mut().readexactly(size) + } + + /// Read until `\n` + /// Returns Some(PayloadItem) as line, `None` indicates eof. + pub fn readline(&mut self) -> Result, PayloadError> { + self.inner.borrow_mut().readline() + } + + /// Read until match line + /// Returns Some(PayloadItem) as line, `None` indicates eof. + pub fn readuntil(&mut self, line: &[u8]) -> Result, PayloadError> { + self.inner.borrow_mut().readuntil(line) + } + #[doc(hidden)] pub fn readall(&mut self) -> Option { self.inner.borrow_mut().readall() @@ -135,6 +154,7 @@ impl PayloadSender { } } +#[derive(Debug)] struct Inner { len: usize, eof: bool, @@ -213,6 +233,87 @@ impl Inner { } } + fn readexactly(&mut self, size: usize) -> Result, PayloadError> { + if size <= self.len { + let mut buf = BytesMut::with_capacity(size); + while buf.len() < size { + let mut chunk = self.items.pop_front().unwrap(); + let rem = size - buf.len(); + buf.extend(&chunk.split_to(rem)); + if !chunk.is_empty() { + self.items.push_front(chunk); + return Ok(Async::Ready(buf.freeze())) + } + } + } + + if let Some(err) = self.err.take() { + Err(err) + } else { + self.task = Some(current_task()); + Ok(Async::NotReady) + } + } + + fn readuntil(&mut self, line: &[u8]) -> Result, PayloadError> { + let mut idx = 0; + let mut num = 0; + let mut offset = 0; + let mut found = false; + let mut length = 0; + + for no in 0..self.items.len() { + { + let chunk = &self.items[no]; + for (pos, ch) in chunk.iter().enumerate() { + if *ch == line[idx] { + idx += 1; + if idx == line.len() { + num = no; + offset = pos+1; + length += pos; + found = true; + break; + } + } else { + idx = 0 + } + } + if !found { + length += chunk.len() + } + } + + if found { + let mut buf = BytesMut::with_capacity(length); + if num > 0 { + for _ in 0..num { + buf.extend(self.items.pop_front().unwrap()); + } + } + if offset > 0 { + let mut chunk = self.items.pop_front().unwrap(); + buf.extend(chunk.split_to(offset)); + if !chunk.is_empty() { + self.items.push_front(chunk) + } + } + self.len -= length; + return Ok(Async::Ready(buf.freeze())) + } + } + if let Some(err) = self.err.take() { + Err(err) + } else { + self.task = Some(current_task()); + Ok(Async::NotReady) + } + } + + fn readline(&mut self) -> Result, PayloadError> { + self.readuntil(b"\n") + } + #[doc(hidden)] pub fn readall(&mut self) -> Option { let len = self.items.iter().fold(0, |cur, item| cur + item.len()); @@ -222,6 +323,7 @@ impl Inner { buf.extend(item); } self.items = VecDeque::new(); + self.len = 0; Some(buf.take().freeze()) } else { None diff --git a/src/reader.rs b/src/reader.rs index 10be74131..85af482cb 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -168,7 +168,7 @@ impl Reader { } match self.read_from_io(io) { Ok(Async::Ready(0)) => { - trace!("parse eof"); + trace!("Eof during parse"); return Err(ReaderError::Error(ParseError::Incomplete)); }, Ok(Async::Ready(_)) => (), diff --git a/src/recognizer.rs b/src/recognizer.rs index 6c01141c3..c3296d74a 100644 --- a/src/recognizer.rs +++ b/src/recognizer.rs @@ -134,7 +134,9 @@ pub struct Params { } impl Params { - pub(crate) fn new(names: Rc>, text: &str, captures: Captures) -> Self + pub(crate) fn new(names: Rc>, + text: &str, + captures: Captures) -> Self { Params { names, @@ -155,6 +157,10 @@ impl Params { } } + pub fn is_empty(&self) -> bool { + self.names.is_empty() + } + fn by_idx(&self, index: usize) -> Option<&str> { self.matches .get(index + 1) diff --git a/src/resource.rs b/src/resource.rs index 762a41179..d1a8aba3e 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -141,7 +141,7 @@ pub struct Reply (ReplyItem); impl Reply where A: Actor + Route { /// Create async response - pub fn stream(act: A) -> Self { + pub fn async(act: A) -> Self { Reply(ReplyItem::Actor(act)) } diff --git a/src/ws.rs b/src/ws.rs index 774ff62a3..d141ce336 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -32,7 +32,7 @@ //! // Map Payload into WsStream //! ctx.add_stream(ws::WsStream::new(payload)); //! // Start ws messages processing -//! Reply::stream(WsRoute) +//! Reply::async(WsRoute) //! }, //! Err(err) => //! Reply::reply(err)