mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-06-07 16:08:55 +00:00
protocol: Add a welcoming message, removing registering as consumer requirement
Since clients know from the start their Peer ID it is fine to go ahead and connect as a consumer without prior registration.
This commit is contained in:
parent
dbcbfef8c7
commit
7e59bb519e
6 changed files with 39 additions and 13 deletions
|
@ -122,6 +122,7 @@ impl Signaller {
|
||||||
|
|
||||||
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
|
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
|
||||||
match msg {
|
match msg {
|
||||||
|
p::OutgoingMessage::Welcome { .. } => (),
|
||||||
p::OutgoingMessage::Registered(
|
p::OutgoingMessage::Registered(
|
||||||
p::RegisteredMessage::Producer { peer_id, .. },
|
p::RegisteredMessage::Producer { peer_id, .. },
|
||||||
) => {
|
) => {
|
||||||
|
|
|
@ -665,6 +665,7 @@ impl State {
|
||||||
session: &mut Session,
|
session: &mut Session,
|
||||||
signal: bool,
|
signal: bool,
|
||||||
) {
|
) {
|
||||||
|
gst::info!(CAT, "Ending session {}", session.id);
|
||||||
session.pipeline.debug_to_dot_file_with_ts(
|
session.pipeline.debug_to_dot_file_with_ts(
|
||||||
gst::DebugGraphDetails::all(),
|
gst::DebugGraphDetails::all(),
|
||||||
format!("removing-peer-{}-", session.peer_id,),
|
format!("removing-peer-{}-", session.peer_id,),
|
||||||
|
|
|
@ -70,6 +70,8 @@ pub struct Peer {
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
/// Messages sent from the server to peers
|
/// Messages sent from the server to peers
|
||||||
pub enum OutgoingMessage {
|
pub enum OutgoingMessage {
|
||||||
|
/// Welcoming message, sets the Peer ID linked to a new connection
|
||||||
|
Welcome { peer_id: String },
|
||||||
/// Confirms registration
|
/// Confirms registration
|
||||||
Registered(RegisteredMessage),
|
Registered(RegisteredMessage),
|
||||||
/// Confirms registration
|
/// Confirms registration
|
||||||
|
@ -210,6 +212,8 @@ pub struct EndSessionMessage {
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
/// Messages received by the server from peers
|
/// Messages received by the server from peers
|
||||||
pub enum IncomingMessage {
|
pub enum IncomingMessage {
|
||||||
|
/// Register as a peer type
|
||||||
|
NewPeer,
|
||||||
/// Register as a peer type
|
/// Register as a peer type
|
||||||
Register(RegisterMessage),
|
Register(RegisterMessage),
|
||||||
/// Unregister as a peer type
|
/// Unregister as a peer type
|
||||||
|
|
|
@ -67,6 +67,16 @@ impl Handler {
|
||||||
msg: p::IncomingMessage,
|
msg: p::IncomingMessage,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match msg {
|
match msg {
|
||||||
|
p::IncomingMessage::NewPeer => {
|
||||||
|
self.items.push_back((
|
||||||
|
peer_id.into(),
|
||||||
|
p::OutgoingMessage::Welcome {
|
||||||
|
peer_id: peer_id.to_string(),
|
||||||
|
},
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
p::IncomingMessage::Register(message) => match message {
|
p::IncomingMessage::Register(message) => match message {
|
||||||
p::RegisterMessage::Producer { meta } => self.register_producer(peer_id, meta),
|
p::RegisterMessage::Producer { meta } => self.register_producer(peer_id, meta),
|
||||||
p::RegisterMessage::Consumer { meta } => self.register_consumer(peer_id, meta),
|
p::RegisterMessage::Consumer { meta } => self.register_consumer(peer_id, meta),
|
||||||
|
@ -357,13 +367,6 @@ impl Handler {
|
||||||
/// Start a session between two peers
|
/// Start a session between two peers
|
||||||
#[instrument(level = "debug", skip(self))]
|
#[instrument(level = "debug", skip(self))]
|
||||||
fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> {
|
fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> {
|
||||||
if !self.consumers.contains_key(consumer_id) {
|
|
||||||
return Err(anyhow!(
|
|
||||||
"Peer with id {} is not registered as a consumer",
|
|
||||||
consumer_id
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.producers.contains_key(producer_id) {
|
if !self.producers.contains_key(producer_id) {
|
||||||
return Err(anyhow!(
|
return Err(anyhow!(
|
||||||
"Peer with id {} is not registered as a producer",
|
"Peer with id {} is not registered as a producer",
|
||||||
|
|
|
@ -156,7 +156,25 @@ impl Server {
|
||||||
let this_id_clone = this_id.clone();
|
let this_id_clone = this_id.clone();
|
||||||
let state_clone = self.state.clone();
|
let state_clone = self.state.clone();
|
||||||
let receive_task_handle = task::spawn(async move {
|
let receive_task_handle = task::spawn(async move {
|
||||||
|
if let Some(tx) = tx.as_mut() {
|
||||||
|
if let Err(err) = tx
|
||||||
|
.send((
|
||||||
|
this_id_clone.clone(),
|
||||||
|
Some(
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "newPeer",
|
||||||
|
})
|
||||||
|
.to_string(),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!(this = %this_id_clone, "Error handling message: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while let Some(msg) = ws_stream.next().await {
|
while let Some(msg) = ws_stream.next().await {
|
||||||
|
info!("Received message {msg:?}");
|
||||||
match msg {
|
match msg {
|
||||||
Ok(WsMessage::Text(msg)) => {
|
Ok(WsMessage::Text(msg)) => {
|
||||||
if let Some(tx) = tx.as_mut() {
|
if let Some(tx) = tx.as_mut() {
|
||||||
|
|
|
@ -220,11 +220,8 @@ function Session(our_id, peer_id, closed_callback) {
|
||||||
this.ws_conn = new WebSocket(ws_url);
|
this.ws_conn = new WebSocket(ws_url);
|
||||||
/* When connected, immediately register with the server */
|
/* When connected, immediately register with the server */
|
||||||
this.ws_conn.addEventListener('open', (event) => {
|
this.ws_conn.addEventListener('open', (event) => {
|
||||||
this.ws_conn.send(JSON.stringify({
|
this.setStatus("Connecting to the peer");
|
||||||
"type": "register",
|
this.connectPeer();
|
||||||
"peerType": "consumer"
|
|
||||||
}));
|
|
||||||
this.setStatus("Registering with server");
|
|
||||||
});
|
});
|
||||||
this.ws_conn.addEventListener('error', this.onServerError.bind(this));
|
this.ws_conn.addEventListener('error', this.onServerError.bind(this));
|
||||||
this.ws_conn.addEventListener('message', this.onServerMessage.bind(this));
|
this.ws_conn.addEventListener('message', this.onServerMessage.bind(this));
|
||||||
|
@ -395,7 +392,9 @@ function onServerMessage(event) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg.type == "registered") {
|
if (msg.type == "welcome") {
|
||||||
|
console.info(`Got welcomed with ID ${msg.peer_id}`);
|
||||||
|
} else if (msg.type == "registered") {
|
||||||
ws_conn.send(JSON.stringify({
|
ws_conn.send(JSON.stringify({
|
||||||
"type": "list"
|
"type": "list"
|
||||||
}));
|
}));
|
||||||
|
|
Loading…
Reference in a new issue