mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-09-03 10:13:47 +00:00
webrtc/signalling: don't error out on messages for unknown sessions
An error message is to be considered fatal by its receiver, but sending it to webrtcsink when it sends a message for an unknown session leaves it vulnerable to a race condition where the server has received an endSession message from a consumer, but hasn't notified webrtcsink yet. This commit simply removes the sending of an error message for unknown sessions, it could instead send a new UnknownSession message, or revamp the error message to include a code, but that isn't critically needed as webrtcsink will anyway receive the endSession message eventually. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2215>
This commit is contained in:
parent
655566e9e3
commit
783b75fef8
1 changed files with 15 additions and 84 deletions
|
@ -93,11 +93,14 @@ impl Handler {
|
|||
|
||||
fn handle_peer_message(&mut self, peer_id: &str, peermsg: p::PeerMessage) -> Result<(), Error> {
|
||||
let session_id = &peermsg.session_id;
|
||||
let session = self
|
||||
.sessions
|
||||
.get(session_id)
|
||||
.context(format!("Session {session_id} doesn't exist"))?
|
||||
.clone();
|
||||
|
||||
let Some(session) = self.sessions.get(session_id) else {
|
||||
warn!(
|
||||
peer_id,
|
||||
"Received peer message for unknown session {session_id}"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if matches!(
|
||||
peermsg.peer_message,
|
||||
|
@ -170,10 +173,13 @@ impl Handler {
|
|||
#[instrument(level = "debug", skip(self))]
|
||||
/// End a session between two peers
|
||||
fn end_session(&mut self, peer_id: &str, session_id: &str) -> Result<(), Error> {
|
||||
let session = self
|
||||
.sessions
|
||||
.remove(session_id)
|
||||
.with_context(|| format!("Session {session_id} doesn't exist"))?;
|
||||
let Some(session) = self.sessions.remove(session_id) else {
|
||||
warn!(
|
||||
peer_id,
|
||||
"Received end session message for unknown session {session_id}"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.consumer_sessions
|
||||
.entry(session.consumer.clone())
|
||||
|
@ -792,81 +798,6 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_end_session_twice() {
|
||||
let (mut tx, rx) = mpsc::unbounded();
|
||||
let mut handler = Handler::new(Box::pin(rx));
|
||||
|
||||
new_peer(&mut tx, &mut handler, "producer").await;
|
||||
|
||||
let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus {
|
||||
roles: vec![p::PeerRole::Producer],
|
||||
meta: None,
|
||||
peer_id: None,
|
||||
});
|
||||
tx.send(("producer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
new_peer(&mut tx, &mut handler, "consumer").await;
|
||||
|
||||
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
|
||||
peer_id: "producer".to_string(),
|
||||
offer: None,
|
||||
});
|
||||
tx.send(("consumer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||
assert_eq!(peer_id, "consumer");
|
||||
let session_id = match sent_message {
|
||||
p::OutgoingMessage::SessionStarted {
|
||||
ref peer_id,
|
||||
ref session_id,
|
||||
} => {
|
||||
assert_eq!(peer_id, "producer");
|
||||
session_id.to_string()
|
||||
}
|
||||
_ => panic!("SessionStarted message missing"),
|
||||
};
|
||||
|
||||
let _ = handler.next().await.unwrap();
|
||||
|
||||
// The consumer ends the session
|
||||
let message = p::IncomingMessage::EndSession(p::EndSessionMessage {
|
||||
session_id: session_id.clone(),
|
||||
});
|
||||
tx.send(("consumer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||
|
||||
assert_eq!(peer_id, "producer");
|
||||
assert_eq!(
|
||||
sent_message,
|
||||
p::OutgoingMessage::EndSession(p::EndSessionMessage {
|
||||
session_id: session_id.clone()
|
||||
})
|
||||
);
|
||||
|
||||
let message = p::IncomingMessage::EndSession(p::EndSessionMessage {
|
||||
session_id: session_id.clone(),
|
||||
});
|
||||
tx.send(("consumer".to_string(), Some(message)))
|
||||
.await
|
||||
.unwrap();
|
||||
let (peer_id, sent_message) = handler.next().await.unwrap();
|
||||
|
||||
assert_eq!(peer_id, "consumer");
|
||||
assert_eq!(
|
||||
sent_message,
|
||||
p::OutgoingMessage::Error {
|
||||
details: format!("Session {session_id} doesn't exist")
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sdp_exchange() {
|
||||
let (mut tx, rx) = mpsc::unbounded();
|
||||
|
|
Loading…
Reference in a new issue