From 783b75fef8afcd2a061ec32baeb6179458f4d401 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 24 Apr 2025 18:38:23 +0200 Subject: [PATCH] webrtc/signalling: don't error out on messages for unknown sessions An error message is to be considered fatal by its receiver, but sending it to webrtcsink when it sends a message for an unknown session leaves it vulnerable to a race condition where the server has received an endSession message from a consumer, but hasn't notified webrtcsink yet. This commit simply removes the sending of an error message for unknown sessions, it could instead send a new UnknownSession message, or revamp the error message to include a code, but that isn't critically needed as webrtcsink will anyway receive the endSession message eventually. Part-of: --- net/webrtc/signalling/src/handlers/mod.rs | 99 ++++------------------- 1 file changed, 15 insertions(+), 84 deletions(-) diff --git a/net/webrtc/signalling/src/handlers/mod.rs b/net/webrtc/signalling/src/handlers/mod.rs index 7e3f211b3..f27db2a50 100644 --- a/net/webrtc/signalling/src/handlers/mod.rs +++ b/net/webrtc/signalling/src/handlers/mod.rs @@ -93,11 +93,14 @@ impl Handler { fn handle_peer_message(&mut self, peer_id: &str, peermsg: p::PeerMessage) -> Result<(), Error> { let session_id = &peermsg.session_id; - let session = self - .sessions - .get(session_id) - .context(format!("Session {session_id} doesn't exist"))? - .clone(); + + let Some(session) = self.sessions.get(session_id) else { + warn!( + peer_id, + "Received peer message for unknown session {session_id}" + ); + return Ok(()); + }; if matches!( peermsg.peer_message, @@ -170,10 +173,13 @@ impl Handler { #[instrument(level = "debug", skip(self))] /// End a session between two peers fn end_session(&mut self, peer_id: &str, session_id: &str) -> Result<(), Error> { - let session = self - .sessions - .remove(session_id) - .with_context(|| format!("Session {session_id} doesn't exist"))?; + let Some(session) = self.sessions.remove(session_id) else { + warn!( + peer_id, + "Received end session message for unknown session {session_id}" + ); + return Ok(()); + }; self.consumer_sessions .entry(session.consumer.clone()) @@ -792,81 +798,6 @@ mod tests { ); } - #[tokio::test] - async fn test_end_session_twice() { - let (mut tx, rx) = mpsc::unbounded(); - let mut handler = Handler::new(Box::pin(rx)); - - new_peer(&mut tx, &mut handler, "producer").await; - - let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { - roles: vec![p::PeerRole::Producer], - meta: None, - peer_id: None, - }); - tx.send(("producer".to_string(), Some(message))) - .await - .unwrap(); - - new_peer(&mut tx, &mut handler, "consumer").await; - - let message = p::IncomingMessage::StartSession(p::StartSessionMessage { - peer_id: "producer".to_string(), - offer: None, - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let (peer_id, sent_message) = handler.next().await.unwrap(); - assert_eq!(peer_id, "consumer"); - let session_id = match sent_message { - p::OutgoingMessage::SessionStarted { - ref peer_id, - ref session_id, - } => { - assert_eq!(peer_id, "producer"); - session_id.to_string() - } - _ => panic!("SessionStarted message missing"), - }; - - let _ = handler.next().await.unwrap(); - - // The consumer ends the session - let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - session_id: session_id.clone(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - - let (peer_id, sent_message) = handler.next().await.unwrap(); - - assert_eq!(peer_id, "producer"); - assert_eq!( - sent_message, - p::OutgoingMessage::EndSession(p::EndSessionMessage { - session_id: session_id.clone() - }) - ); - - let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - session_id: session_id.clone(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let (peer_id, sent_message) = handler.next().await.unwrap(); - - assert_eq!(peer_id, "consumer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Error { - details: format!("Session {session_id} doesn't exist") - } - ); - } - #[tokio::test] async fn test_sdp_exchange() { let (mut tx, rx) = mpsc::unbounded();