diff --git a/src/client/connector.rs b/src/client/connector.rs index 5f27b8265..4e8ac214b 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -12,9 +12,11 @@ use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature="alpn")] -use openssl::ssl::{SslMethod, SslConnector, SslVerifyMode, Error as OpensslError}; +use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError}; #[cfg(feature="alpn")] use tokio_openssl::SslConnectorExt; +#[cfg(feature="alpn")] +use futures::Future; use HAS_OPENSSL; use server::IoStream; @@ -92,7 +94,7 @@ impl Default for ClientConnector { fn default() -> ClientConnector { #[cfg(feature="alpn")] { - let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + let builder = SslConnector::builder(SslMethod::tls()).unwrap(); ClientConnector { connector: builder.build() } @@ -149,9 +151,7 @@ impl ClientConnector { /// } /// ``` pub fn with_connector(connector: SslConnector) -> ClientConnector { - ClientConnector { - connector: connector - } + ClientConnector { connector } } } @@ -196,7 +196,7 @@ impl Handler for ClientConnector { if proto.is_secure() { fut::Either::A( _act.connector.connect_async(&host, stream) - .map_err(|e| ClientConnectorError::SslError(e)) + .map_err(ClientConnectorError::SslError) .map(|stream| Connection{stream: Box::new(stream)}) .into_actor(_act)) } else { diff --git a/src/payload.rs b/src/payload.rs index b193cf646..401832b89 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -4,7 +4,7 @@ use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::VecDeque; use bytes::{Bytes, BytesMut}; -use futures::{Future, Async, Poll, Stream}; +use futures::{Async, Poll, Stream}; use futures::task::{Task, current as current_task}; use error::PayloadError; @@ -62,30 +62,6 @@ impl Payload { self.inner.borrow().len() == 0 } - /// Get exact number of bytes - #[inline] - pub fn readexactly(&self, size: usize) -> ReadExactly { - ReadExactly(Rc::clone(&self.inner), size) - } - - /// Read until `\n` - #[inline] - pub fn readline(&self) -> ReadLine { - ReadLine(Rc::clone(&self.inner)) - } - - /// Read until match line - #[inline] - pub fn readuntil(&self, line: &[u8]) -> ReadUntil { - ReadUntil(Rc::clone(&self.inner), line.to_vec()) - } - - #[doc(hidden)] - #[inline] - pub fn readall(&self) -> Option { - self.inner.borrow_mut().readall() - } - /// Put unused data back to payload #[inline] pub fn unread_data(&mut self, data: Bytes) { @@ -103,6 +79,11 @@ impl Payload { pub fn set_buffer_size(&self, size: usize) { self.inner.borrow_mut().set_buffer_size(size) } + + #[cfg(test)] + pub(crate) fn readall(&self) -> Option { + self.inner.borrow_mut().readall() + } } impl Stream for Payload { @@ -121,51 +102,6 @@ impl Clone for Payload { } } -/// Get exact number of bytes -pub struct ReadExactly(Rc>, usize); - -impl Future for ReadExactly { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll { - match self.0.borrow_mut().readexactly(self.1, false)? { - Async::Ready(chunk) => Ok(Async::Ready(chunk)), - Async::NotReady => Ok(Async::NotReady), - } - } -} - -/// Read until `\n` -pub struct ReadLine(Rc>); - -impl Future for ReadLine { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll { - match self.0.borrow_mut().readline(false)? { - Async::Ready(chunk) => Ok(Async::Ready(chunk)), - Async::NotReady => Ok(Async::NotReady), - } - } -} - -/// Read until match line -pub struct ReadUntil(Rc>, Vec); - -impl Future for ReadUntil { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll { - match self.0.borrow_mut().readuntil(&self.1, false)? { - Async::Ready(chunk) => Ok(Async::Ready(chunk)), - Async::NotReady => Ok(Async::NotReady), - } - } -} - /// Payload writer interface. pub trait PayloadWriter { @@ -271,6 +207,22 @@ impl Inner { self.len } + #[cfg(test)] + pub(crate) fn readall(&mut self) -> Option { + let len = self.items.iter().map(|b| b.len()).sum(); + if len > 0 { + let mut buf = BytesMut::with_capacity(len); + for item in &self.items { + buf.extend_from_slice(item); + } + self.items = VecDeque::new(); + self.len = 0; + Some(buf.take().freeze()) + } else { + None + } + } + fn readany(&mut self, notify: bool) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); @@ -287,107 +239,6 @@ impl Inner { } } - fn readexactly(&mut self, size: usize, notify: bool) -> Result, PayloadError> { - if size <= self.len { - let mut buf = BytesMut::with_capacity(size); - while buf.len() < size { - let mut chunk = self.items.pop_front().unwrap(); - let rem = cmp::min(size - buf.len(), chunk.len()); - self.len -= rem; - buf.extend_from_slice(&chunk.split_to(rem)); - if !chunk.is_empty() { - self.items.push_front(chunk); - } - } - return Ok(Async::Ready(buf.freeze())) - } - - if let Some(err) = self.err.take() { - Err(err) - } else { - if notify { - self.task = Some(current_task()); - } - Ok(Async::NotReady) - } - } - - fn readuntil(&mut self, line: &[u8], notify: bool) -> Result, PayloadError> { - let mut idx = 0; - let mut num = 0; - let mut offset = 0; - let mut found = false; - let mut length = 0; - - for no in 0..self.items.len() { - { - let chunk = &self.items[no]; - for (pos, ch) in chunk.iter().enumerate() { - if *ch == line[idx] { - idx += 1; - if idx == line.len() { - num = no; - offset = pos+1; - length += pos+1; - found = true; - break; - } - } else { - idx = 0 - } - } - if !found { - length += chunk.len() - } - } - - if found { - let mut buf = BytesMut::with_capacity(length); - if num > 0 { - for _ in 0..num { - buf.extend_from_slice(&self.items.pop_front().unwrap()); - } - } - if offset > 0 { - let mut chunk = self.items.pop_front().unwrap(); - buf.extend_from_slice(&chunk.split_to(offset)); - if !chunk.is_empty() { - self.items.push_front(chunk) - } - } - self.len -= length; - return Ok(Async::Ready(buf.freeze())) - } - } - if let Some(err) = self.err.take() { - Err(err) - } else { - if notify { - self.task = Some(current_task()); - } - Ok(Async::NotReady) - } - } - - fn readline(&mut self, notify: bool) -> Result, PayloadError> { - self.readuntil(b"\n", notify) - } - - pub fn readall(&mut self) -> Option { - let len = self.items.iter().map(|b| b.len()).sum(); - if len > 0 { - let mut buf = BytesMut::with_capacity(len); - for item in &self.items { - buf.extend_from_slice(item); - } - self.items = VecDeque::new(); - self.len = 0; - Some(buf.take().freeze()) - } else { - None - } - } - fn unread_data(&mut self, data: Bytes) { self.len += data.len(); self.items.push_front(data); @@ -592,12 +443,11 @@ mod tests { #[test] fn test_basic() { Core::new().unwrap().run(lazy(|| { - let (_, mut payload) = Payload::new(false); + let (_, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); - assert!(!payload.eof()); - assert!(payload.is_empty()); - assert_eq!(payload.len(), 0); - assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); + assert_eq!(payload.len, 0); + assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) @@ -607,23 +457,18 @@ mod tests { #[test] fn test_eof() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); - - assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); - assert!(!payload.eof()); + let (mut sender, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); + assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); sender.feed_data(Bytes::from("data")); sender.feed_eof(); - assert!(!payload.eof()); - assert_eq!(Async::Ready(Some(Bytes::from("data"))), - payload.poll().ok().unwrap()); - assert!(payload.is_empty()); - assert!(payload.eof()); - assert_eq!(payload.len(), 0); + payload.readany().ok().unwrap()); + assert_eq!(payload.len, 0); + assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); - assert_eq!(Async::Ready(None), payload.poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -632,12 +477,13 @@ mod tests { #[test] fn test_err() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); - assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); sender.set_error(PayloadError::Incomplete); - payload.poll().err().unwrap(); + payload.readany().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -646,21 +492,19 @@ mod tests { #[test] fn test_readany() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); sender.feed_data(Bytes::from("line1")); - - assert!(!payload.is_empty()); - assert_eq!(payload.len(), 5); - sender.feed_data(Bytes::from("line2")); - assert!(!payload.is_empty()); - assert_eq!(payload.len(), 10); assert_eq!(Async::Ready(Some(Bytes::from("line1"))), - payload.poll().ok().unwrap()); - assert!(!payload.is_empty()); - assert_eq!(payload.len(), 5); + payload.readany().ok().unwrap()); + assert_eq!(payload.len, 0); + + assert_eq!(Async::Ready(Some(Bytes::from("line2"))), + payload.readany().ok().unwrap()); + assert_eq!(payload.len, 0); let res: Result<(), ()> = Ok(()); result(res) @@ -671,23 +515,23 @@ mod tests { fn test_readexactly() { Core::new().unwrap().run(lazy(|| { let (mut sender, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); - assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); - assert_eq!(payload.len(), 10); - assert_eq!(Async::Ready(Bytes::from("li")), - payload.readexactly(2).poll().ok().unwrap()); - assert_eq!(payload.len(), 8); + assert_eq!(Async::Ready(Some(BytesMut::from("li"))), + payload.readexactly(2).ok().unwrap()); + assert_eq!(payload.len, 3); - assert_eq!(Async::Ready(Bytes::from("ne1l")), - payload.readexactly(4).poll().ok().unwrap()); - assert_eq!(payload.len(), 4); + assert_eq!(Async::Ready(Some(BytesMut::from("ne1l"))), + payload.readexactly(4).ok().unwrap()); + assert_eq!(payload.len, 4); sender.set_error(PayloadError::Incomplete); - payload.readexactly(10).poll().err().unwrap(); + payload.readexactly(10).err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) @@ -698,23 +542,23 @@ mod tests { fn test_readuntil() { Core::new().unwrap().run(lazy(|| { let (mut sender, payload) = Payload::new(false); + let mut payload = PayloadHelper::new(payload); - assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); - assert_eq!(payload.len(), 10); - assert_eq!(Async::Ready(Bytes::from("line")), - payload.readuntil(b"ne").poll().ok().unwrap()); - assert_eq!(payload.len(), 6); + assert_eq!(Async::Ready(Some(Bytes::from("line"))), + payload.readuntil(b"ne").ok().unwrap()); + assert_eq!(payload.len, 1); - assert_eq!(Async::Ready(Bytes::from("1line2")), - payload.readuntil(b"2").poll().ok().unwrap()); - assert_eq!(payload.len(), 0); + assert_eq!(Async::Ready(Some(Bytes::from("1line2"))), + payload.readuntil(b"2").ok().unwrap()); + assert_eq!(payload.len, 0); sender.set_error(PayloadError::Incomplete); - payload.readuntil(b"b").poll().err().unwrap(); + payload.readuntil(b"b").err().unwrap(); let res: Result<(), ()> = Ok(()); result(res)