From d53f3d718793aa9f7aed0feca1f6de74b970a4f7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 5 Oct 2018 10:19:15 -0700 Subject: [PATCH] re-enable websockets --- src/lib.rs | 2 +- src/ws/client.rs | 602 ----------------------------------------------- src/ws/mod.rs | 71 +----- 3 files changed, 6 insertions(+), 669 deletions(-) delete mode 100644 src/ws/client.rs diff --git a/src/lib.rs b/src/lib.rs index 0544569af..ae5a9f957 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,7 +142,7 @@ pub mod error; pub mod h1; pub(crate) mod helpers; pub mod test; -//pub mod ws; +pub mod ws; pub use body::{Binary, Body}; pub use error::{Error, ResponseError, Result}; pub use extensions::Extensions; diff --git a/src/ws/client.rs b/src/ws/client.rs deleted file mode 100644 index 18789fef8..000000000 --- a/src/ws/client.rs +++ /dev/null @@ -1,602 +0,0 @@ -//! Http client request -use std::cell::RefCell; -use std::rc::Rc; -use std::time::Duration; -use std::{fmt, io, str}; - -use base64; -use bytes::Bytes; -use cookie::Cookie; -use futures::sync::mpsc::{unbounded, UnboundedSender}; -use futures::{Async, Future, Poll, Stream}; -use http::header::{self, HeaderName, HeaderValue}; -use http::{Error as HttpError, HttpTryFrom, StatusCode}; -use rand; -use sha1::Sha1; - -use actix::{Addr, SystemService}; - -use body::{Binary, Body}; -use error::{Error, UrlParseError}; -use header::IntoHeaderValue; -use httpmessage::HttpMessage; -use payload::PayloadBuffer; - -use client::{ - ClientConnector, ClientRequest, ClientRequestBuilder, HttpResponseParserError, - Pipeline, SendRequest, SendRequestError, -}; - -use super::frame::{Frame, FramedMessage}; -use super::proto::{CloseReason, OpCode}; -use super::{Message, ProtocolError, WsWriter}; - -/// Websocket client error -#[derive(Fail, Debug)] -pub enum ClientError { - /// Invalid url - #[fail(display = "Invalid url")] - InvalidUrl, - /// Invalid response status - #[fail(display = "Invalid response status")] - InvalidResponseStatus(StatusCode), - /// Invalid upgrade header - #[fail(display = "Invalid upgrade header")] - InvalidUpgradeHeader, - /// Invalid connection header - #[fail(display = "Invalid connection header")] - InvalidConnectionHeader(HeaderValue), - /// Missing CONNECTION header - #[fail(display = "Missing CONNECTION header")] - MissingConnectionHeader, - /// Missing SEC-WEBSOCKET-ACCEPT header - #[fail(display = "Missing SEC-WEBSOCKET-ACCEPT header")] - MissingWebSocketAcceptHeader, - /// Invalid challenge response - #[fail(display = "Invalid challenge response")] - InvalidChallengeResponse(String, HeaderValue), - /// Http parsing error - #[fail(display = "Http parsing error")] - Http(Error), - /// Url parsing error - #[fail(display = "Url parsing error")] - Url(UrlParseError), - /// Response parsing error - #[fail(display = "Response parsing error")] - ResponseParseError(HttpResponseParserError), - /// Send request error - #[fail(display = "{}", _0)] - SendRequest(SendRequestError), - /// Protocol error - #[fail(display = "{}", _0)] - Protocol(#[cause] ProtocolError), - /// IO Error - #[fail(display = "{}", _0)] - Io(io::Error), - /// "Disconnected" - #[fail(display = "Disconnected")] - Disconnected, -} - -impl From for ClientError { - fn from(err: Error) -> ClientError { - ClientError::Http(err) - } -} - -impl From for ClientError { - fn from(err: UrlParseError) -> ClientError { - ClientError::Url(err) - } -} - -impl From for ClientError { - fn from(err: SendRequestError) -> ClientError { - ClientError::SendRequest(err) - } -} - -impl From for ClientError { - fn from(err: ProtocolError) -> ClientError { - ClientError::Protocol(err) - } -} - -impl From for ClientError { - fn from(err: io::Error) -> ClientError { - ClientError::Io(err) - } -} - -impl From for ClientError { - fn from(err: HttpResponseParserError) -> ClientError { - ClientError::ResponseParseError(err) - } -} - -/// `WebSocket` client -/// -/// Example of `WebSocket` client usage is available in -/// [websocket example]( -/// https://github.com/actix/examples/blob/master/websocket/src/client.rs#L24) -pub struct Client { - request: ClientRequestBuilder, - err: Option, - http_err: Option, - origin: Option, - protocols: Option, - conn: Addr, - max_size: usize, - no_masking: bool, -} - -impl Client { - /// Create new websocket connection - pub fn new>(uri: S) -> Client { - Client::with_connector(uri, ClientConnector::from_registry()) - } - - /// Create new websocket connection with custom `ClientConnector` - pub fn with_connector>(uri: S, conn: Addr) -> Client { - let mut cl = Client { - request: ClientRequest::build(), - err: None, - http_err: None, - origin: None, - protocols: None, - max_size: 65_536, - no_masking: false, - conn, - }; - cl.request.uri(uri.as_ref()); - cl - } - - /// Set supported websocket protocols - pub fn protocols(mut self, protos: U) -> Self - where - U: IntoIterator + 'static, - V: AsRef, - { - let mut protos = protos - .into_iter() - .fold(String::new(), |acc, s| acc + s.as_ref() + ","); - protos.pop(); - self.protocols = Some(protos); - self - } - - /// Set cookie for handshake request - pub fn cookie(mut self, cookie: Cookie) -> Self { - self.request.cookie(cookie); - self - } - - /// Set request Origin - pub fn origin(mut self, origin: V) -> Self - where - HeaderValue: HttpTryFrom, - { - match HeaderValue::try_from(origin) { - Ok(value) => self.origin = Some(value), - Err(e) => self.http_err = Some(e.into()), - } - self - } - - /// Set max frame size - /// - /// By default max size is set to 64kb - pub fn max_frame_size(mut self, size: usize) -> Self { - self.max_size = size; - self - } - - /// Set write buffer capacity - /// - /// Default buffer capacity is 32kb - pub fn write_buffer_capacity(mut self, cap: usize) -> Self { - self.request.write_buffer_capacity(cap); - self - } - - /// Disable payload masking. By default ws client masks frame payload. - pub fn no_masking(mut self) -> Self { - self.no_masking = true; - self - } - - /// Set request header - pub fn header(mut self, key: K, value: V) -> Self - where - HeaderName: HttpTryFrom, - V: IntoHeaderValue, - { - self.request.header(key, value); - self - } - - /// Set websocket handshake timeout - /// - /// Handshake timeout is a total time for successful handshake. - /// Default value is 5 seconds. - pub fn timeout(mut self, timeout: Duration) -> Self { - self.request.timeout(timeout); - self - } - - /// Connect to websocket server and do ws handshake - pub fn connect(&mut self) -> ClientHandshake { - if let Some(e) = self.err.take() { - ClientHandshake::error(e) - } else if let Some(e) = self.http_err.take() { - ClientHandshake::error(Error::from(e).into()) - } else { - // origin - if let Some(origin) = self.origin.take() { - self.request.set_header(header::ORIGIN, origin); - } - - self.request.upgrade(); - self.request.set_header(header::UPGRADE, "websocket"); - self.request.set_header(header::CONNECTION, "upgrade"); - self.request.set_header(header::SEC_WEBSOCKET_VERSION, "13"); - self.request.with_connector(self.conn.clone()); - - if let Some(protocols) = self.protocols.take() { - self.request - .set_header(header::SEC_WEBSOCKET_PROTOCOL, protocols.as_str()); - } - let request = match self.request.finish() { - Ok(req) => req, - Err(err) => return ClientHandshake::error(err.into()), - }; - - if request.uri().host().is_none() { - return ClientHandshake::error(ClientError::InvalidUrl); - } - if let Some(scheme) = request.uri().scheme_part() { - if scheme != "http" - && scheme != "https" - && scheme != "ws" - && scheme != "wss" - { - return ClientHandshake::error(ClientError::InvalidUrl); - } - } else { - return ClientHandshake::error(ClientError::InvalidUrl); - } - - // start handshake - ClientHandshake::new(request, self.max_size, self.no_masking) - } - } -} - -struct Inner { - tx: UnboundedSender, - rx: PayloadBuffer>, - closed: bool, -} - -/// Future that implementes client websocket handshake process. -/// -/// It resolves to a pair of `ClientReader` and `ClientWriter` that -/// can be used for reading and writing websocket frames. -pub struct ClientHandshake { - request: Option, - tx: Option>, - key: String, - error: Option, - max_size: usize, - no_masking: bool, -} - -impl ClientHandshake { - fn new( - mut request: ClientRequest, max_size: usize, no_masking: bool, - ) -> ClientHandshake { - // Generate a random key for the `Sec-WebSocket-Key` header. - // a base64-encoded (see Section 4 of [RFC4648]) value that, - // when decoded, is 16 bytes in length (RFC 6455) - let sec_key: [u8; 16] = rand::random(); - let key = base64::encode(&sec_key); - - request.headers_mut().insert( - header::SEC_WEBSOCKET_KEY, - HeaderValue::try_from(key.as_str()).unwrap(), - ); - - let (tx, rx) = unbounded(); - request.set_body(Body::Streaming(Box::new(rx.map_err(|_| { - io::Error::new(io::ErrorKind::Other, "disconnected").into() - })))); - - ClientHandshake { - key, - max_size, - no_masking, - request: Some(request.send()), - tx: Some(tx), - error: None, - } - } - - fn error(err: ClientError) -> ClientHandshake { - ClientHandshake { - key: String::new(), - request: None, - tx: None, - error: Some(err), - max_size: 0, - no_masking: false, - } - } - - /// Set handshake timeout - /// - /// Handshake timeout is a total time before handshake should be completed. - /// Default value is 5 seconds. - pub fn timeout(mut self, timeout: Duration) -> Self { - if let Some(request) = self.request.take() { - self.request = Some(request.timeout(timeout)); - } - self - } - - /// Set connection timeout - /// - /// Connection timeout includes resolving hostname and actual connection to - /// the host. - /// Default value is 1 second. - pub fn conn_timeout(mut self, timeout: Duration) -> Self { - if let Some(request) = self.request.take() { - self.request = Some(request.conn_timeout(timeout)); - } - self - } -} - -impl Future for ClientHandshake { - type Item = (ClientReader, ClientWriter); - type Error = ClientError; - - fn poll(&mut self) -> Poll { - if let Some(err) = self.error.take() { - return Err(err); - } - - let resp = match self.request.as_mut().unwrap().poll()? { - Async::Ready(response) => { - self.request.take(); - response - } - Async::NotReady => return Ok(Async::NotReady), - }; - - // verify response - if resp.status() != StatusCode::SWITCHING_PROTOCOLS { - return Err(ClientError::InvalidResponseStatus(resp.status())); - } - // Check for "UPGRADE" to websocket header - let has_hdr = if let Some(hdr) = resp.headers().get(header::UPGRADE) { - if let Ok(s) = hdr.to_str() { - s.to_lowercase().contains("websocket") - } else { - false - } - } else { - false - }; - if !has_hdr { - trace!("Invalid upgrade header"); - return Err(ClientError::InvalidUpgradeHeader); - } - // Check for "CONNECTION" header - if let Some(conn) = resp.headers().get(header::CONNECTION) { - if let Ok(s) = conn.to_str() { - if !s.to_lowercase().contains("upgrade") { - trace!("Invalid connection header: {}", s); - return Err(ClientError::InvalidConnectionHeader(conn.clone())); - } - } else { - trace!("Invalid connection header: {:?}", conn); - return Err(ClientError::InvalidConnectionHeader(conn.clone())); - } - } else { - trace!("Missing connection header"); - return Err(ClientError::MissingConnectionHeader); - } - - if let Some(key) = resp.headers().get(header::SEC_WEBSOCKET_ACCEPT) { - // field is constructed by concatenating /key/ - // with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455) - const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - let mut sha1 = Sha1::new(); - sha1.update(self.key.as_ref()); - sha1.update(WS_GUID); - let encoded = base64::encode(&sha1.digest().bytes()); - if key.as_bytes() != encoded.as_bytes() { - trace!( - "Invalid challenge response: expected: {} received: {:?}", - encoded, - key - ); - return Err(ClientError::InvalidChallengeResponse(encoded, key.clone())); - } - } else { - trace!("Missing SEC-WEBSOCKET-ACCEPT header"); - return Err(ClientError::MissingWebSocketAcceptHeader); - }; - - let inner = Inner { - tx: self.tx.take().unwrap(), - rx: PayloadBuffer::new(resp.payload()), - closed: false, - }; - - let inner = Rc::new(RefCell::new(inner)); - Ok(Async::Ready(( - ClientReader { - inner: Rc::clone(&inner), - max_size: self.max_size, - no_masking: self.no_masking, - }, - ClientWriter { inner }, - ))) - } -} - -/// Websocket reader client -pub struct ClientReader { - inner: Rc>, - max_size: usize, - no_masking: bool, -} - -impl fmt::Debug for ClientReader { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ws::ClientReader()") - } -} - -impl Stream for ClientReader { - type Item = Message; - type Error = ProtocolError; - - fn poll(&mut self) -> Poll, Self::Error> { - let max_size = self.max_size; - let no_masking = self.no_masking; - let mut inner = self.inner.borrow_mut(); - if inner.closed { - return Ok(Async::Ready(None)); - } - - // read - match Frame::parse(&mut inner.rx, no_masking, max_size) { - Ok(Async::Ready(Some(frame))) => { - let (_finished, opcode, payload) = frame.unpack(); - - match opcode { - // continuation is not supported - OpCode::Continue => { - inner.closed = true; - Err(ProtocolError::NoContinuation) - } - OpCode::Bad => { - inner.closed = true; - Err(ProtocolError::BadOpCode) - } - OpCode::Close => { - inner.closed = true; - let close_reason = Frame::parse_close_payload(&payload); - Ok(Async::Ready(Some(Message::Close(close_reason)))) - } - OpCode::Ping => Ok(Async::Ready(Some(Message::Ping( - String::from_utf8_lossy(payload.as_ref()).into(), - )))), - OpCode::Pong => Ok(Async::Ready(Some(Message::Pong( - String::from_utf8_lossy(payload.as_ref()).into(), - )))), - OpCode::Binary => Ok(Async::Ready(Some(Message::Binary(payload)))), - OpCode::Text => { - let tmp = Vec::from(payload.as_ref()); - match String::from_utf8(tmp) { - Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), - Err(_) => { - inner.closed = true; - Err(ProtocolError::BadEncoding) - } - } - } - } - } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => { - inner.closed = true; - Err(e) - } - } - } -} - -/// Websocket writer client -pub struct ClientWriter { - inner: Rc>, -} - -impl ClientWriter { - /// Write payload - #[inline] - fn write(&mut self, mut data: FramedMessage) { - let inner = self.inner.borrow_mut(); - if !inner.closed { - let _ = inner.tx.unbounded_send(data.0.take()); - } else { - warn!("Trying to write to disconnected response"); - } - } - - /// Send text frame - #[inline] - pub fn text>(&mut self, text: T) { - self.write(Frame::message(text.into(), OpCode::Text, true, true)); - } - - /// Send binary frame - #[inline] - pub fn binary>(&mut self, data: B) { - self.write(Frame::message(data, OpCode::Binary, true, true)); - } - - /// Send ping frame - #[inline] - pub fn ping(&mut self, message: &str) { - self.write(Frame::message(Vec::from(message), OpCode::Ping, true, true)); - } - - /// Send pong frame - #[inline] - pub fn pong(&mut self, message: &str) { - self.write(Frame::message(Vec::from(message), OpCode::Pong, true, true)); - } - - /// Send close frame - #[inline] - pub fn close(&mut self, reason: Option) { - self.write(Frame::close(reason, true)); - } -} - -impl WsWriter for ClientWriter { - /// Send text frame - #[inline] - fn send_text>(&mut self, text: T) { - self.text(text) - } - - /// Send binary frame - #[inline] - fn send_binary>(&mut self, data: B) { - self.binary(data) - } - - /// Send ping frame - #[inline] - fn send_ping(&mut self, message: &str) { - self.ping(message) - } - - /// Send pong frame - #[inline] - fn send_pong(&mut self, message: &str) { - self.pong(message) - } - - /// Send close frame - #[inline] - fn send_close(&mut self, reason: Option) { - self.close(reason); - } -} diff --git a/src/ws/mod.rs b/src/ws/mod.rs index c16f8d6d2..6bb84c189 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -1,69 +1,23 @@ -//! `WebSocket` support for Actix +//! `WebSocket` support. //! //! To setup a `WebSocket`, first do web socket handshake then on success //! convert `Payload` into a `WsStream` stream and then use `WsWriter` to //! communicate with the peer. -//! -//! ## Example -//! -//! ```rust -//! # extern crate actix_web; -//! # use actix_web::actix::*; -//! # use actix_web::*; -//! use actix_web::{ws, HttpRequest, HttpResponse}; -//! -//! // do websocket handshake and start actor -//! fn ws_index(req: &HttpRequest) -> Result { -//! ws::start(req, Ws) -//! } -//! -//! struct Ws; -//! -//! impl Actor for Ws { -//! type Context = ws::WebsocketContext; -//! } -//! -//! // Handler for ws::Message messages -//! impl StreamHandler for Ws { -//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { -//! match msg { -//! ws::Message::Ping(msg) => ctx.pong(&msg), -//! ws::Message::Text(text) => ctx.text(text), -//! ws::Message::Binary(bin) => ctx.binary(bin), -//! _ => (), -//! } -//! } -//! } -//! # -//! # fn main() { -//! # App::new() -//! # .resource("/ws/", |r| r.f(ws_index)) // <- register websocket route -//! # .finish(); -//! # } //! ``` use bytes::Bytes; use futures::{Async, Poll, Stream}; use http::{header, Method, StatusCode}; -use super::actix::{Actor, StreamHandler}; - use body::Binary; -use error::{Error, PayloadError, ResponseError}; -use httpmessage::HttpMessage; -use httprequest::HttpRequest; +use error::{PayloadError, ResponseError}; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; use payload::PayloadBuffer; +use request::Request; -mod client; -mod context; mod frame; mod mask; mod proto; -pub use self::client::{ - Client, ClientError, ClientHandshake, ClientReader, ClientWriter, -}; -pub use self::context::WebsocketContext; pub use self::frame::{Frame, FramedMessage}; pub use self::proto::{CloseCode, CloseReason, OpCode}; @@ -156,7 +110,7 @@ impl ResponseError for HandshakeError { } /// `WebSocket` Message -#[derive(Debug, PartialEq, Message)] +#[derive(Debug, PartialEq)] pub enum Message { /// Text message Text(String), @@ -170,19 +124,6 @@ pub enum Message { Close(Option), } -/// Do websocket handshake and start actor -pub fn start(req: &HttpRequest, actor: A) -> Result -where - A: Actor> + StreamHandler, - S: 'static, -{ - let mut resp = handshake(req)?; - let stream = WsStream::new(req.payload()); - - let body = WebsocketContext::create(req.clone(), actor, stream); - Ok(resp.body(body)) -} - /// Prepare `WebSocket` handshake response. /// /// This function returns handshake `HttpResponse`, ready to send to peer. @@ -191,9 +132,7 @@ where // /// `protocols` is a sequence of known protocols. On successful handshake, // /// the returned response headers contain the first protocol in this list // /// which the server also knows. -pub fn handshake( - req: &HttpRequest, -) -> Result { +pub fn handshake(req: &Request) -> Result { // WebSocket accepts only GET if *req.method() != Method::GET { return Err(HandshakeError::GetMethodRequired);