fix websocket heartbeat

This commit is contained in:
Felix Ableitner 2022-12-05 11:47:28 +01:00
parent f1d9853227
commit a70b3d5a0f

View file

@ -148,18 +148,18 @@ pub async fn websocket(
let connection_id = context.chat_server().handle_connect(session.clone())?;
info!("{} joined", &client_ip);
heartbeat(&session);
let alive = Arc::new(Mutex::new(Instant::now()));
heartbeat(session.clone(), alive.clone());
actix_rt::spawn(handle_messages(
stream,
client_ip,
session,
connection_id,
alive,
rate_limiter,
context,
))
.await
.map_err(LemmyError::from)??;
));
Ok(response)
}
@ -169,11 +169,23 @@ async fn handle_messages(
client_ip: IpAddr,
mut session: Session,
connection_id: ConnectionId,
alive: Arc<Mutex<Instant>>,
rate_limiter: web::Data<RateLimitCell>,
context: web::Data<LemmyContext>,
) -> Result<(), LemmyError> {
while let Some(Ok(msg)) = stream.next().await {
match msg {
ws::Message::Ping(bytes) => {
if session.pong(&bytes).await.is_err() {
break;
}
}
ws::Message::Pong(_) => {
let mut lock = alive
.lock()
.expect("Failed to acquire websocket heartbeat alive lock");
*lock = Instant::now();
}
ws::Message::Text(text) => {
let msg = text.trim().to_string();
let executed = parse_json_message(
@ -192,26 +204,23 @@ async fn handle_messages(
});
session.text(res).await?;
}
ws::Message::Binary(_bin) => info!("Unexpected binary"),
ws::Message::Close(_) => {
session.close(None).await?;
context.chat_server().handle_disconnect(&connection_id)?;
break;
}
ws::Message::Binary(_) => info!("Unexpected binary"),
_ => {}
}
}
Ok(())
}
fn heartbeat(session: &Session) {
let alive = Arc::new(Mutex::new(Instant::now()));
let mut session2 = session.clone();
fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
actix_rt::spawn(async move {
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if session2.ping(b"").await.is_err() {
if session.ping(b"").await.is_err() {
break;
}
@ -222,9 +231,10 @@ fn heartbeat(session: &Session) {
Instant::now().duration_since(*alive_lock)
};
if duration_since > Duration::from_secs(10) {
let _ = session2.close(None).await;
let _ = session.close(None).await;
break;
}
interval.tick().await;
}
});
}