webrtc: Make older peers less likely to crash when webrtcsrc is used

Commit abf97ba2 introduced an API break by adding a new Consumer peer
role and adding a new field to the existing List message.
This caused older clients to crash as soon as a 'new' webrtcsrc
registered due to their inability to parse the new role.

This fix consists of two parts:
- makes webrtcsrc only register as a Consumer when it's actually waiting
for a producer session request, instead of doing that in all cases
- Reverts the List message to the old version, and instead adds a
ListConsumers message for the purpose of retrieving the list of
'awaiting' consumers

This is still technically an API break, and older clients will *still*
crash if a webrtcsrc registers as a Consumer. However, if ran the 'old'
way - either with a peer ID set or with connect-to-first-producer=true -
it won't do that anymore.

It's a band-aid fix to not break too many things while GStreamer 1.26
still follows the main branch here. After gst-plugins-rs 0.14 branches
away we'll be able to do a proper API break, aimed at making similar
changes possible in the future without causing old clients to crash.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2259>
This commit is contained in:
Piotr Brzeziński 2025-05-26 15:37:39 +02:00 committed by GStreamer Marge Bot
parent 3c830e7ea8
commit 1e243add0c
5 changed files with 75 additions and 38 deletions

View file

@ -666,7 +666,7 @@ async fn listen(
.unwrap_or_else(|| Err(anyhow!("Signaller ended session")))
.context("List response")?
{
FromSignaller::List { producers, .. } => {
FromSignaller::List { producers } => {
for peer in producers {
spawn_consumer(&signaller_url, &pipeline, args.clone(), peer.id, peer.meta)
.context("Spawning consumer")?;

View file

@ -16,6 +16,7 @@ const SignallingServerMessageType = Object.freeze({
welcome: "welcome",
peerStatusChanged: "peerStatusChanged",
list: "list",
listConsumers: "listConsumers",
sessionStarted: "sessionStarted",
peer: "peer",
startSession: "startSession",
@ -119,6 +120,7 @@ class ComChannel extends EventTarget {
this._ready = true;
this.dispatchEvent(new Event("ready"));
this.send({ type: "list" });
this.send({ type: "listConsumers"});
}
if (this._producerSession && msg.roles.includes("producer")) {
@ -146,20 +148,14 @@ class ComChannel extends EventTarget {
}
case SignallingServerMessageType.list: {
const addPeers = (items, role) => {
items.forEach(item => {
const peer = normalizePeer(item, this._channelId);
if (peer) {
this._peers[peer.id] = [role];
this.dispatchEvent(new CustomEvent("peerAdded", { detail: { peer, role } }));
}
});
};
this._peers = {};
addPeers(msg.producers, "producer");
addPeers(msg.consumers, "consumer");
this.clearPeers("producer");
this.addPeers(msg.producers, "producer");
break;
}
case SignallingServerMessageType.listConsumers: {
this.clearPeers("consumer");
this.addPeers(msg.consumers, "consumer");
break;
}
@ -354,6 +350,25 @@ class ComChannel extends EventTarget {
}
}
}
clearPeers(role) {
for (const peerId in this._peers) {
if (this._peers[peerId].includes(role)) {
delete this._peers[peerId];
this.dispatchEvent(new CustomEvent("peerRemoved", { detail: { peerId, role } }));
}
}
}
addPeers(items, role) {
items.forEach(item => {
const peer = normalizePeer(item, this._channelId);
if (peer) {
this._peers[peer.id] = [role];
this.dispatchEvent(new CustomEvent("peerAdded", { detail: { peer, role } }));
}
});
};
}
export default ComChannel;

View file

@ -35,12 +35,12 @@ pub enum OutgoingMessage {
EndSession(EndSessionMessage),
/// Messages directly forwarded from one peer to another
Peer(PeerMessage),
/// Provides the current list of producers and passive consumers (awaiting session initiation)
/// Provides the current list of producers
#[serde(rename_all = "camelCase")]
List {
producers: Vec<Peer>,
consumers: Vec<Peer>,
},
List { producers: Vec<Peer> },
/// Provides the current list of consumers (awaiting a session request)
#[serde(rename_all = "camelCase")]
ListConsumers { consumers: Vec<Peer> },
/// Notifies that an error occurred with the peer's current session
#[serde(rename_all = "camelCase")]
Error { details: String },
@ -160,4 +160,6 @@ pub enum IncomingMessage {
Peer(PeerMessage),
/// Retrieve the current list of producers
List,
/// Retrieve the current list of consumers
ListConsumers,
}

View file

@ -86,7 +86,8 @@ impl Handler {
self.start_session(peer_id, &message.peer_id, message.offer.as_deref())
}
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
p::IncomingMessage::List => self.list_producers_and_consumers(peer_id),
p::IncomingMessage::List => self.list_producers(peer_id),
p::IncomingMessage::ListConsumers => self.list_consumers(peer_id),
p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id),
}
}
@ -203,26 +204,41 @@ impl Handler {
Ok(())
}
/// List producer and consumer peers
#[instrument(level = "debug", skip(self))]
fn list_producers_and_consumers(&mut self, peer_id: &str) -> Result<(), Error> {
let filter_peers = |f: fn(&PeerStatus) -> bool| {
self.peers
.iter()
.filter_map(|(peer_id, peer)| {
f(peer).then_some(p::Peer {
id: peer_id.clone(),
meta: peer.meta.clone(),
})
fn filter_peers<F>(&self, filter: F) -> Vec<p::Peer>
where
F: Fn(&PeerStatus) -> bool,
{
self.peers
.iter()
.filter_map(move |(peer_id, peer)| {
filter(peer).then_some(p::Peer {
id: peer_id.clone(),
meta: peer.meta.clone(),
})
.collect()
};
})
.collect()
}
/// List producer peers
#[instrument(level = "debug", skip(self))]
fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> {
self.items.push_back((
peer_id.to_string(),
p::OutgoingMessage::List {
producers: filter_peers(PeerStatus::producing),
consumers: filter_peers(PeerStatus::consuming),
producers: self.filter_peers(PeerStatus::producing),
},
));
Ok(())
}
/// List consumer peers
#[instrument(level = "debug", skip(self))]
fn list_consumers(&mut self, peer_id: &str) -> Result<(), Error> {
self.items.push_back((
peer_id.to_string(),
p::OutgoingMessage::ListConsumers {
consumers: self.filter_peers(PeerStatus::consuming),
},
));
@ -463,7 +479,6 @@ mod tests {
{"display-name": "foobar".to_string()
})),
}],
consumers: vec!()
}
);
}

View file

@ -276,7 +276,11 @@ impl Signaller {
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Consumer],
roles: if self.connect_to_first_producer() || self.producer_peer_id().is_some() {
vec![]
} else {
vec![p::PeerRole::Consumer]
},
},
super::WebRTCSignallerRole::Producer => p::PeerStatus {
meta: meta.clone(),
@ -513,7 +517,7 @@ impl Signaller {
);
}
},
p::OutgoingMessage::List { producers, .. } => {
p::OutgoingMessage::List { producers } => {
let mut settings = self.settings.lock().unwrap();
let role = settings.role;
if matches!(role, super::WebRTCSignallerRole::Consumer)
@ -557,6 +561,7 @@ impl Signaller {
}
}
}
p::OutgoingMessage::ListConsumers { .. } => todo!(),
p::OutgoingMessage::Error { details } => {
self.obj().emit_by_name::<()>(
"error",