1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-01 12:58:46 +00:00
actix-web/tests/test_ws.rs

395 lines
11 KiB
Rust
Raw Normal View History

2018-01-30 23:13:33 +00:00
extern crate actix;
extern crate actix_web;
2018-04-13 23:02:01 +00:00
extern crate bytes;
2018-01-30 23:13:33 +00:00
extern crate futures;
extern crate http;
2018-03-19 16:30:58 +00:00
extern crate rand;
2018-01-30 23:13:33 +00:00
2018-10-02 03:15:26 +00:00
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
2018-10-02 17:43:23 +00:00
use std::{thread, time};
2018-10-02 03:15:26 +00:00
2018-01-30 23:13:33 +00:00
use bytes::Bytes;
use futures::Stream;
2018-05-29 17:31:37 +00:00
use rand::distributions::Alphanumeric;
2018-03-19 16:30:58 +00:00
use rand::Rng;
2018-01-30 23:13:33 +00:00
2018-10-02 18:18:59 +00:00
#[cfg(feature = "ssl")]
2018-03-20 18:23:35 +00:00
extern crate openssl;
2018-07-29 06:43:04 +00:00
#[cfg(feature = "rust-tls")]
extern crate rustls;
2018-03-20 18:23:35 +00:00
2018-01-30 23:13:33 +00:00
use actix::prelude::*;
2018-04-13 23:02:01 +00:00
use actix_web::*;
2018-01-30 23:13:33 +00:00
struct Ws;
impl Actor for Ws {
type Context = ws::WebsocketContext<Self>;
}
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
2018-06-21 07:34:36 +00:00
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
2018-01-30 23:13:33 +00:00
match msg {
2018-06-21 07:34:36 +00:00
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
2018-01-30 23:13:33 +00:00
}
}
}
#[test]
fn test_simple() {
2018-04-13 23:02:01 +00:00
let mut srv = test::TestServer::new(|app| app.handler(|req| ws::start(req, Ws)));
2018-01-30 23:13:33 +00:00
let (reader, mut writer) = srv.ws().unwrap();
writer.text("text");
let (item, reader) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Text("text".to_owned())));
writer.binary(b"text".as_ref());
let (item, reader) = srv.execute(reader.into_future()).unwrap();
2018-04-29 16:09:08 +00:00
assert_eq!(
item,
2018-05-17 19:20:20 +00:00
Some(ws::Message::Binary(Bytes::from_static(b"text").into()))
2018-04-29 16:09:08 +00:00
);
2018-01-30 23:13:33 +00:00
writer.ping("ping");
2018-01-31 00:04:04 +00:00
let (item, reader) = srv.execute(reader.into_future()).unwrap();
2018-01-30 23:13:33 +00:00
assert_eq!(item, Some(ws::Message::Pong("ping".to_owned())));
2018-01-31 00:04:04 +00:00
writer.close(Some(ws::CloseCode::Normal.into()));
2018-01-31 00:04:04 +00:00
let (item, _) = srv.execute(reader.into_future()).unwrap();
2018-04-29 16:09:08 +00:00
assert_eq!(
item,
Some(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
);
2018-01-30 23:13:33 +00:00
}
2018-03-19 16:30:58 +00:00
// websocket resource helper function
fn start_ws_resource(req: &HttpRequest) -> Result<HttpResponse, Error> {
ws::start(req, Ws)
}
#[test]
fn test_simple_path() {
2018-08-23 16:48:01 +00:00
const PATH: &str = "/v1/ws/";
// Create a websocket at a specific path.
let mut srv = test::TestServer::new(|app| {
app.resource(PATH, |r| r.route().f(start_ws_resource));
});
// fetch the sockets for the resource at a given path.
let (reader, mut writer) = srv.ws_at(PATH).unwrap();
writer.text("text");
let (item, reader) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Text("text".to_owned())));
writer.binary(b"text".as_ref());
let (item, reader) = srv.execute(reader.into_future()).unwrap();
assert_eq!(
item,
Some(ws::Message::Binary(Bytes::from_static(b"text").into()))
);
writer.ping("ping");
let (item, reader) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Pong("ping".to_owned())));
writer.close(Some(ws::CloseCode::Normal.into()));
let (item, _) = srv.execute(reader.into_future()).unwrap();
assert_eq!(
item,
Some(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
);
}
2018-04-21 01:55:07 +00:00
#[test]
fn test_empty_close_code() {
let mut srv = test::TestServer::new(|app| app.handler(|req| ws::start(req, Ws)));
let (reader, mut writer) = srv.ws().unwrap();
writer.close(None);
2018-04-21 01:55:07 +00:00
let (item, _) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Close(None)));
2018-04-21 01:55:07 +00:00
}
2018-04-21 21:20:23 +00:00
#[test]
fn test_close_description() {
let mut srv = test::TestServer::new(|app| app.handler(|req| ws::start(req, Ws)));
let (reader, mut writer) = srv.ws().unwrap();
2018-04-29 05:55:47 +00:00
let close_reason: ws::CloseReason =
(ws::CloseCode::Normal, "close description").into();
2018-04-21 21:20:23 +00:00
writer.close(Some(close_reason.clone()));
let (item, _) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Close(Some(close_reason))));
}
2018-03-19 16:30:58 +00:00
#[test]
fn test_large_text() {
2018-04-29 16:09:08 +00:00
let data = rand::thread_rng()
2018-05-29 17:31:37 +00:00
.sample_iter(&Alphanumeric)
2018-04-29 16:09:08 +00:00
.take(65_536)
.collect::<String>();
2018-03-19 16:30:58 +00:00
2018-04-13 23:02:01 +00:00
let mut srv = test::TestServer::new(|app| app.handler(|req| ws::start(req, Ws)));
2018-03-19 16:30:58 +00:00
let (mut reader, mut writer) = srv.ws().unwrap();
for _ in 0..100 {
writer.text(data.clone());
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, Some(ws::Message::Text(data.clone())));
}
}
#[test]
fn test_large_bin() {
2018-04-29 16:09:08 +00:00
let data = rand::thread_rng()
2018-05-29 17:31:37 +00:00
.sample_iter(&Alphanumeric)
2018-04-29 16:09:08 +00:00
.take(65_536)
.collect::<String>();
2018-03-19 16:30:58 +00:00
2018-04-13 23:02:01 +00:00
let mut srv = test::TestServer::new(|app| app.handler(|req| ws::start(req, Ws)));
2018-03-19 16:30:58 +00:00
let (mut reader, mut writer) = srv.ws().unwrap();
for _ in 0..100 {
writer.binary(data.clone());
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
2018-05-17 19:20:20 +00:00
assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone()))));
2018-03-19 16:30:58 +00:00
}
}
2018-03-19 20:12:36 +00:00
2018-07-05 06:50:54 +00:00
#[test]
fn test_client_frame_size() {
let data = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(131_072)
.collect::<String>();
let mut srv = test::TestServer::new(|app| {
app.handler(|req| -> Result<HttpResponse> {
let mut resp = ws::handshake(req)?;
let stream = ws::WsStream::new(req.payload()).max_size(131_072);
let body = ws::WebsocketContext::create(req.clone(), Ws, stream);
Ok(resp.body(body))
})
});
let (reader, mut writer) = srv.ws().unwrap();
writer.binary(data.clone());
match srv.execute(reader.into_future()).err().unwrap().0 {
ws::ProtocolError::Overflow => (),
_ => panic!(),
}
}
2018-03-19 20:12:36 +00:00
struct Ws2 {
count: usize,
bin: bool,
2018-03-19 20:12:36 +00:00
}
impl Actor for Ws2 {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.send(ctx);
}
}
impl Ws2 {
fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
if self.bin {
ctx.binary(Vec::from("0".repeat(65_536)));
} else {
ctx.text("0".repeat(65_536));
}
2018-04-13 23:02:01 +00:00
ctx.drain()
.and_then(|_, act, ctx| {
act.count += 1;
2018-08-08 16:12:32 +00:00
if act.count != 10_000 {
2018-04-13 23:02:01 +00:00
act.send(ctx);
}
actix::fut::ok(())
2018-08-23 16:48:01 +00:00
}).wait(ctx);
2018-03-19 20:12:36 +00:00
}
}
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
2018-06-21 07:34:36 +00:00
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
2018-03-19 20:12:36 +00:00
match msg {
2018-06-21 07:34:36 +00:00
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
2018-03-19 20:12:36 +00:00
}
}
}
#[test]
fn test_server_send_text() {
let data = Some(ws::Message::Text("0".repeat(65_536)));
2018-04-13 23:02:01 +00:00
let mut srv = test::TestServer::new(|app| {
app.handler(|req| {
ws::start(
req,
Ws2 {
count: 0,
bin: false,
},
)
})
});
2018-03-19 20:12:36 +00:00
let (mut reader, _writer) = srv.ws().unwrap();
2018-08-08 16:12:32 +00:00
for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
#[test]
fn test_server_send_bin() {
let data = Some(ws::Message::Binary(Binary::from("0".repeat(65_536))));
2018-04-13 23:02:01 +00:00
let mut srv = test::TestServer::new(|app| {
app.handler(|req| {
ws::start(
req,
Ws2 {
count: 0,
bin: true,
},
)
})
});
let (mut reader, _writer) = srv.ws().unwrap();
2018-08-08 16:12:32 +00:00
for _ in 0..10_000 {
2018-03-19 20:12:36 +00:00
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
2018-03-20 18:23:35 +00:00
#[test]
2018-10-02 18:18:59 +00:00
#[cfg(feature = "ssl")]
2018-03-20 18:23:35 +00:00
fn test_ws_server_ssl() {
2018-04-13 23:02:01 +00:00
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
2018-03-20 18:23:35 +00:00
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
2018-04-29 16:09:08 +00:00
builder
.set_private_key_file("tests/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("tests/cert.pem")
.unwrap();
2018-05-29 17:59:24 +00:00
let mut srv = test::TestServer::build().ssl(builder).start(|app| {
2018-05-17 19:20:20 +00:00
app.handler(|req| {
ws::start(
req,
Ws2 {
count: 0,
bin: false,
},
)
})
});
2018-03-20 18:23:35 +00:00
let (mut reader, _writer) = srv.ws().unwrap();
let data = Some(ws::Message::Text("0".repeat(65_536)));
2018-08-08 16:12:32 +00:00
for _ in 0..10_000 {
2018-03-20 18:23:35 +00:00
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
2018-07-29 06:43:04 +00:00
#[test]
#[cfg(feature = "rust-tls")]
fn test_ws_server_rust_tls() {
2018-07-29 06:43:04 +00:00
use rustls::internal::pemfile::{certs, rsa_private_keys};
use rustls::{NoClientAuth, ServerConfig};
2018-07-29 06:43:04 +00:00
use std::fs::File;
use std::io::BufReader;
2018-07-29 06:43:04 +00:00
// load ssl keys
let mut config = ServerConfig::new(NoClientAuth::new());
let cert_file = &mut BufReader::new(File::open("tests/cert.pem").unwrap());
let key_file = &mut BufReader::new(File::open("tests/key.pem").unwrap());
let cert_chain = certs(cert_file).unwrap();
let mut keys = rsa_private_keys(key_file).unwrap();
config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
let mut srv = test::TestServer::build().rustls(config).start(|app| {
2018-07-29 06:43:04 +00:00
app.handler(|req| {
ws::start(
req,
Ws2 {
count: 0,
bin: false,
},
)
})
});
let (mut reader, _writer) = srv.ws().unwrap();
let data = Some(ws::Message::Text("0".repeat(65_536)));
2018-08-08 16:12:32 +00:00
for _ in 0..10_000 {
2018-07-29 06:43:04 +00:00
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
2018-10-02 03:15:26 +00:00
struct WsStopped(Arc<AtomicUsize>);
impl Actor for WsStopped {
type Context = ws::WebsocketContext<Self>;
2018-10-02 04:30:00 +00:00
fn stopped(&mut self, _: &mut Self::Context) {
2018-10-02 03:15:26 +00:00
self.0.fetch_add(1, Ordering::Relaxed);
}
}
impl StreamHandler<ws::Message, ws::ProtocolError> for WsStopped {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Text(text) => ctx.text(text),
_ => (),
}
}
}
#[test]
fn test_ws_stopped() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
2018-10-02 17:43:23 +00:00
let mut srv = test::TestServer::new(move |app| {
2018-10-02 03:15:26 +00:00
let num3 = num2.clone();
2018-10-02 17:43:23 +00:00
app.handler(move |req| ws::start(req, WsStopped(num3.clone())))
});
{
2018-10-02 03:15:26 +00:00
let (reader, mut writer) = srv.ws().unwrap();
writer.text("text");
2018-10-02 04:30:00 +00:00
let (item, _) = srv.execute(reader.into_future()).unwrap();
2018-10-02 03:15:26 +00:00
assert_eq!(item, Some(ws::Message::Text("text".to_owned())));
2018-10-02 17:43:23 +00:00
}
thread::sleep(time::Duration::from_millis(1000));
2018-10-02 03:15:26 +00:00
assert_eq!(num.load(Ordering::Relaxed), 1);
}