From 7a27ccffcbebda03d76f75e10c1e932fb5f70d31 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Tue, 1 Aug 2023 17:17:17 +0200 Subject: [PATCH] Added dedicated test to check web socket close procedure. --- actix-web-actors/tests/test_ws_close.rs | 96 +++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 actix-web-actors/tests/test_ws_close.rs diff --git a/actix-web-actors/tests/test_ws_close.rs b/actix-web-actors/tests/test_ws_close.rs new file mode 100644 index 000000000..2eddff2ef --- /dev/null +++ b/actix-web-actors/tests/test_ws_close.rs @@ -0,0 +1,96 @@ +use actix::prelude::*; +use actix_web::{web, App, HttpRequest}; +use actix_web_actors::ws; +use futures_util::{SinkExt as _, StreamExt as _}; +use tokio::sync::mpsc::Sender; + +struct Ws { + finished: Sender<()>, +} + +impl Actor for Ws { + type Context = ws::WebsocketContext; +} + +impl StreamHandler> for Ws { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Close(reason)) => ctx.close(reason), + _ => ctx.close(Some(ws::CloseCode::Normal.into())), + } + } + + fn finished(&mut self, _ctx: &mut Self::Context) { + _ = self.finished.try_send(()).unwrap(); + } +} + +#[actix_rt::test] +async fn close_initiated_by_client() { + let (tx, mut finished) = tokio::sync::mpsc::channel(1); + let mut srv = actix_test::start(move || { + let tx = tx.clone(); + App::new().service(web::resource("{anything:.*}").to( + move |req: HttpRequest, stream: web::Payload| { + let tx: Sender<()> = tx.clone(); + async move { ws::WsResponseBuilder::new(Ws { finished: tx }, &req, stream).start() } + }, + )) + }); + + let mut framed = srv.ws().await.unwrap(); + + framed + .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + .await + .unwrap(); + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into()))); + + let nothing = actix_rt::time::timeout(std::time::Duration::from_secs(1), framed.next()).await; + assert_eq!(true, nothing.is_ok()); + assert_eq!(true, nothing.unwrap().is_none()); + + let finished = + actix_rt::time::timeout(std::time::Duration::from_secs(1), finished.recv()).await; + assert_eq!(true, finished.is_ok()); + assert_eq!(Some(()), finished.unwrap()); +} + +#[actix_rt::test] +async fn close_initiated_by_server() { + let (tx, mut finished) = tokio::sync::mpsc::channel(1); + let mut srv = actix_test::start(move || { + let tx = tx.clone(); + App::new().service(web::resource("{anything:.*}").to( + move |req: HttpRequest, stream: web::Payload| { + let tx: Sender<()> = tx.clone(); + async move { ws::WsResponseBuilder::new(Ws { finished: tx }, &req, stream).start() } + }, + )) + }); + + let mut framed = srv.ws().await.unwrap(); + + framed + .send(ws::Message::Text("I'll initiate close by server".into())) + .await + .unwrap(); + + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into()))); + + framed + .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + .await + .unwrap(); + + let nothing = actix_rt::time::timeout(std::time::Duration::from_secs(1), framed.next()).await; + assert_eq!(true, nothing.is_ok()); + assert_eq!(true, nothing.unwrap().is_none()); + + let finished = + actix_rt::time::timeout(std::time::Duration::from_secs(1), finished.recv()).await; + assert_eq!(true, finished.is_ok()); + assert_eq!(Some(()), finished.unwrap()); +}