protocol: Rework the way peers set their status

- Remove registration messages
- Add a setPeerStatus method which lets other peers know about their roles
This commit is contained in:
Thibault Saunier 2022-08-18 18:15:09 -04:00 committed by Mathieu Duponchelle
parent 7e59bb519e
commit a4f036499e
5 changed files with 337 additions and 595 deletions

View file

@ -105,10 +105,11 @@ impl Signaller {
} else { } else {
None None
}; };
websocket_sender websocket_sender
.send(p::IncomingMessage::Register(p::RegisterMessage::Producer { .send(p::IncomingMessage::SetPeerStatus(p::PeerStatus {
roles: vec![p::PeerRole::Producer],
meta, meta,
peer_id: None,
})) }))
.await?; .await?;
@ -122,10 +123,7 @@ impl Signaller {
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) { if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg { match msg {
p::OutgoingMessage::Welcome { .. } => (), p::OutgoingMessage::Welcome { peer_id } => {
p::OutgoingMessage::Registered(
p::RegisteredMessage::Producer { peer_id, .. },
) => {
gst::info!( gst::info!(
CAT, CAT,
obj: &element, obj: &element,
@ -133,7 +131,6 @@ impl Signaller {
peer_id peer_id
); );
} }
p::OutgoingMessage::Registered(_) => unreachable!(),
p::OutgoingMessage::StartSession { p::OutgoingMessage::StartSession {
session_id, session_id,
peer_id, peer_id,

View file

@ -1,62 +1,6 @@
/// The default protocol used by the signalling server /// The default protocol used by the signalling server
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")]
/// Confirms registration
pub enum RegisteredMessage {
/// Registered as a producer
#[serde(rename_all = "camelCase")]
Producer {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Registered as a consumer
#[serde(rename_all = "camelCase")]
Consumer {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Registered as a listener
#[serde(rename_all = "camelCase")]
Listener {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")]
/// Confirms registration
pub enum UnregisteredMessage {
/// Unregistered as a producer
#[serde(rename_all = "camelCase")]
Producer {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Unregistered as a consumer
#[serde(rename_all = "camelCase")]
Consumer {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Unregistered as a listener
#[serde(rename_all = "camelCase")]
Listener {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Peer { pub struct Peer {
@ -72,32 +16,14 @@ pub struct Peer {
pub enum OutgoingMessage { pub enum OutgoingMessage {
/// Welcoming message, sets the Peer ID linked to a new connection /// Welcoming message, sets the Peer ID linked to a new connection
Welcome { peer_id: String }, Welcome { peer_id: String },
/// Confirms registration /// Notifies listeners that a peer status has changed
Registered(RegisteredMessage), PeerStatusChanged(PeerStatus),
/// Confirms registration
Unregistered(UnregisteredMessage),
/// Notifies listeners that a producer was registered
#[serde(rename_all = "camelCase")]
ProducerAdded {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Notifies listeners that a producer was removed
#[serde(rename_all = "camelCase")]
ProducerRemoved {
peer_id: String,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Instructs a peer to generate an offer and inform about the session ID /// Instructs a peer to generate an offer and inform about the session ID
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
StartSession { peer_id: String, session_id: String }, StartSession { peer_id: String, session_id: String },
/// Let consumer know that the requested session is starting with the specified identifier /// Let consumer know that the requested session is starting with the specified identifier
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
SessionStarted { peer_id: String, session_id: String }, SessionStarted { peer_id: String, session_id: String },
/// Signals that the session the peer was in was ended /// Signals that the session the peer was in was ended
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
EndSession(EndSessionMessage), EndSession(EndSessionMessage),
@ -109,45 +35,36 @@ pub enum OutgoingMessage {
Error { details: String }, Error { details: String },
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// Register with a peer type /// Register with a peer type
pub enum RegisterMessage { pub enum PeerRole {
/// Register as a producer /// Register as a producer
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Producer { Producer,
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Register as a consumer
#[serde(rename_all = "camelCase")]
Consumer {
#[serde(default)]
meta: Option<serde_json::Value>,
},
/// Register as a listener /// Register as a listener
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Listener { Listener,
#[serde(default)]
meta: Option<serde_json::Value>,
},
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// Register with a peer type pub struct PeerStatus {
pub enum UnregisterMessage { pub roles: Vec<PeerRole>,
/// Unregister a producer pub meta: Option<serde_json::Value>,
#[serde(rename_all = "camelCase")] #[serde(skip_serializing_if = "Option::is_none")]
Producer, #[serde(default)]
/// Unregister a consumer pub peer_id: Option<String>,
#[serde(rename_all = "camelCase")] }
Consumer,
/// Unregister a listener impl PeerStatus {
#[serde(rename_all = "camelCase")] pub fn producing(&self) -> bool {
Listener, self.roles.iter().any(|t| matches!(t, PeerRole::Producer))
}
pub fn listening(&self) -> bool {
self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
}
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -212,12 +129,10 @@ pub struct EndSessionMessage {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
/// Messages received by the server from peers /// Messages received by the server from peers
pub enum IncomingMessage { pub enum IncomingMessage {
/// Register as a peer type /// Internal message to let know about new peers
NewPeer, NewPeer,
/// Register as a peer type /// Set current peer status
Register(RegisterMessage), SetPeerStatus(PeerStatus),
/// Unregister as a peer type
Unregister(UnregisterMessage),
/// Start a session with a producer peer /// Start a session with a producer peer
StartSession(StartSessionMessage), StartSession(StartSessionMessage),
/// End an existing session /// End an existing session

File diff suppressed because it is too large Load diff

View file

@ -172,7 +172,6 @@ impl Server {
warn!(this = %this_id_clone, "Error handling message: {:?}", err); warn!(this = %this_id_clone, "Error handling message: {:?}", err);
} }
} }
while let Some(msg) = ws_stream.next().await { while let Some(msg) = ws_stream.next().await {
info!("Received message {msg:?}"); info!("Received message {msg:?}");
match msg { match msg {

View file

@ -385,16 +385,15 @@ function onServerMessage(event) {
msg = JSON.parse(event.data); msg = JSON.parse(event.data);
} catch (e) { } catch (e) {
if (e instanceof SyntaxError) { if (e instanceof SyntaxError) {
this.handleIncomingError("Error parsing incoming JSON: " + event.data); console.error("Error parsing incoming JSON: " + event.data);
} else { } else {
this.handleIncomingError("Unknown error parsing response: " + event.data); console.error("Unknown error parsing response: " + event.data);
} }
return; return;
} }
if (msg.type == "welcome") { if (msg.type == "welcome") {
console.info(`Got welcomed with ID ${msg.peer_id}`); console.info(`Got welcomed with ID ${msg.peer_id}`);
} else if (msg.type == "registered") {
ws_conn.send(JSON.stringify({ ws_conn.send(JSON.stringify({
"type": "list" "type": "list"
})); }));
@ -403,13 +402,18 @@ function onServerMessage(event) {
for (i = 0; i < msg.producers.length; i++) { for (i = 0; i < msg.producers.length; i++) {
addPeer(msg.producers[i].id, msg.producers[i].meta); addPeer(msg.producers[i].id, msg.producers[i].meta);
} }
} else if (msg.type == "producerAdded") { } else if (msg.type == "peerStatusChanged") {
addPeer(msg.peerId, msg.meta);
} else if (msg.type == "producerRemoved") {
var li = document.getElementById("peer-" + msg.peerId); var li = document.getElementById("peer-" + msg.peerId);
if (msg.roles.includes("producer")) {
if (li == null) {
console.error('Adding peer');
addPeer(msg.peerId, msg.meta);
}
} else if (li != null) {
li.parentNode.removeChild(li); li.parentNode.removeChild(li);
}
} else { } else {
this.handleIncomingError("Unsupported message: ", msg); console.error("Unsupported message: ", msg);
} }
}; };
@ -448,8 +452,8 @@ function connect() {
ws_conn = new WebSocket(ws_url); ws_conn = new WebSocket(ws_url);
ws_conn.addEventListener('open', (event) => { ws_conn.addEventListener('open', (event) => {
ws_conn.send(JSON.stringify({ ws_conn.send(JSON.stringify({
"type": "register", "type": "setPeerStatus",
"peerType": "listener" "roles": ["listener"]
})); }));
}); });
ws_conn.addEventListener('error', onServerError); ws_conn.addEventListener('error', onServerError);