1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-27 01:18:10 +00:00
actix-web/actix-http/tests/test_ws.rs
Rob Ede 6f0a6bd1bb
address clippy lints
For intrepid commit message readers:
The choice to add allows for the inlined format args lint instead of actually
inlining them is not very clear because our actual real world MSRV is not clear.
We currently claim 1.60 is our MSRV but this is mainly due to dependencies. I'm
fairly sure that we could support < 1.58 if those deps are outdated in a user's
lockfile. We'll remove these allows again at some point soon.
2023-01-01 20:56:34 +00:00

197 lines
5.4 KiB
Rust

#![allow(clippy::uninlined_format_args)]
use std::{
cell::Cell,
convert::Infallible,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::{
body::{BodySize, BoxBody},
h1,
ws::{self, CloseCode, Frame, Item, Message},
Error, HttpService, Request, Response,
};
use actix_http_test::test_server;
use actix_service::{fn_factory, Service};
use bytes::Bytes;
use derive_more::{Display, Error, From};
use futures_core::future::LocalBoxFuture;
use futures_util::{SinkExt as _, StreamExt as _};
/// Upgrade service under test.
///
/// Wraps a single boolean flag with interior mutability so it can be flipped
/// through a shared reference. The `Service` impl in this file sets the flag in
/// `poll_ready` and asserts it in `call`.
#[derive(Clone)]
struct WsService(Cell<bool>);

impl WsService {
    /// Creates a service whose "polled" flag starts out unset.
    fn new() -> Self {
        Self(Cell::new(false))
    }

    /// Records that the service has been polled.
    fn set_polled(&self) {
        let Self(flag) = self;
        flag.set(true);
    }

    /// Reports whether `set_polled` has been called on this instance.
    fn was_polled(&self) -> bool {
        let Self(flag) = self;
        flag.get()
    }
}
// Error type returned by the `WsService` upgrade handler in this test.
//
// The `From` derive is what allows the `?` operator inside `WsService::call`
// to convert the wrapped error types into this enum; the `Display` derive
// supplies the fixed messages given in the `#[display(...)]` attributes.
#[derive(Debug, Display, Error, From)]
enum WsServiceError {
// An `actix_http::Error` (presumably from building the handshake response body).
#[display(fmt = "http error")]
Http(actix_http::Error),
// The incoming request failed WebSocket handshake validation.
#[display(fmt = "ws handshake error")]
Ws(actix_http::ws::HandshakeError),
// An I/O error (presumably from writing the handshake response to the
// framed transport).
#[display(fmt = "io error")]
Io(std::io::Error),
// The WebSocket dispatcher finished with an error; the original error is
// discarded when mapping to this variant.
#[display(fmt = "dispatcher error")]
Dispatcher,
}
// Converts a `WsServiceError` into the HTTP response that represents it.
impl From<WsServiceError> for Response<BoxBody> {
    /// Builds the response for `err`.
    ///
    /// * `Http` and `Ws` delegate to the wrapped error's own conversion.
    /// * `Dispatcher` yields a 500 response whose body is the error's
    ///   `Display` text.
    ///
    /// # Panics
    ///
    /// Panics on the `Io` variant, which this test treats as impossible to
    /// reach here.
    fn from(err: WsServiceError) -> Self {
        match err {
            WsServiceError::Http(err) => err.into(),
            WsServiceError::Ws(err) => err.into(),
            WsServiceError::Io(_err) => unreachable!(),
            // No binding is moved out in this arm, so `err` is still the whole
            // enum value. `to_string()` goes through the same `Display` impl
            // that `format!("{}", err)` would, without the formatting macro.
            WsServiceError::Dispatcher => {
                Response::internal_server_error().set_body(BoxBody::new(err.to_string()))
            }
        }
    }
}
// Upgrade handler: performs the WebSocket handshake on an HTTP/1 connection
// and then hands the connection to a WebSocket dispatcher driven by `service`.
impl<T> Service<(Request, Framed<T, h1::Codec>)> for WsService
where
    T: AsyncRead + AsyncWrite + Unpin + 'static,
{
    type Response = ();
    type Error = WsServiceError;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Always ready; also records that readiness was polled so `call` can
    /// verify the ordering.
    fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.set_polled();
        Poll::Ready(Ok(()))
    }

    /// Runs the upgrade for one request/connection pair.
    ///
    /// # Panics
    ///
    /// Panics if `poll_ready` was not invoked on this service beforehand.
    fn call(&self, (req, mut framed): (Request, Framed<T, h1::Codec>)) -> Self::Future {
        assert!(self.was_polled());

        Box::pin(async move {
            // Validate the upgrade request and build the handshake response
            // with an empty body.
            let handshake_response = ws::handshake(req.head())?.message_body(())?;

            // Send the handshake response over the still-HTTP/1 framed transport.
            framed
                .send((handshake_response, BodySize::None).into())
                .await?;

            // Switch the transport over to the WebSocket codec and run the
            // dispatcher until it finishes; any dispatcher failure is collapsed
            // into `WsServiceError::Dispatcher`.
            let ws_framed = framed.replace_codec(ws::Codec::new());
            ws::Dispatcher::with(ws_framed, service)
                .await
                .map_err(|_| WsServiceError::Dispatcher)
        })
    }
}
/// Echo handler used by the WebSocket dispatcher.
///
/// Maps each incoming frame to the message sent back:
/// `Ping` becomes `Pong` with the same payload; `Text`, `Binary`,
/// `Continuation` and `Close` are echoed as the same kind of message (text
/// payloads are converted with lossy UTF-8 decoding).
///
/// # Errors
///
/// Any other frame kind yields `ws::ProtocolError::BadOpCode`.
async fn service(msg: Frame) -> Result<Message, Error> {
    match msg {
        Frame::Ping(payload) => Ok(Message::Pong(payload)),
        Frame::Text(raw) => {
            let text = String::from_utf8_lossy(&raw).into_owned();
            Ok(Message::Text(text.into()))
        }
        Frame::Binary(payload) => Ok(Message::Binary(payload)),
        Frame::Continuation(part) => Ok(Message::Continuation(part)),
        Frame::Close(reason) => Ok(Message::Close(reason)),
        _ => Err(ws::ProtocolError::BadOpCode.into()),
    }
}
#[actix_rt::test]
async fn simple() {
let mut srv = test_server(|| {
HttpService::build()
.upgrade(fn_factory(|| async {
Ok::<_, Infallible>(WsService::new())
}))
.finish(|_| async { Ok::<_, Infallible>(Response::not_found()) })
.tcp()
})
.await;
// client service
let mut framed = srv.ws().await.unwrap();
framed.send(Message::Text("text".into())).await.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, Frame::Text(Bytes::from_static(b"text")));
framed.send(Message::Binary("text".into())).await.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, Frame::Binary(Bytes::from_static(&b"text"[..])));
framed.send(Message::Ping("text".into())).await.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, Frame::Pong("text".to_string().into()));
framed
.send(Message::Continuation(Item::FirstText("text".into())))
.await
.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(
item,
Frame::Continuation(Item::FirstText(Bytes::from_static(b"text")))
);
assert!(framed
.send(Message::Continuation(Item::FirstText("text".into())))
.await
.is_err());
assert!(framed
.send(Message::Continuation(Item::FirstBinary("text".into())))
.await
.is_err());
framed
.send(Message::Continuation(Item::Continue("text".into())))
.await
.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(
item,
Frame::Continuation(Item::Continue(Bytes::from_static(b"text")))
);
framed
.send(Message::Continuation(Item::Last("text".into())))
.await
.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(
item,
Frame::Continuation(Item::Last(Bytes::from_static(b"text")))
);
assert!(framed
.send(Message::Continuation(Item::Continue("text".into())))
.await
.is_err());
assert!(framed
.send(Message::Continuation(Item::Last("text".into())))
.await
.is_err());
framed
.send(Message::Close(Some(CloseCode::Normal.into())))
.await
.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, Frame::Close(Some(CloseCode::Normal.into())));
}