protocol: allow registering with a display name

While the signalling server will allocate unique identifiers for
each peer, users may want to also pass their own identifiers.
This commit is contained in:
Mathieu Duponchelle 2022-03-23 01:33:00 +01:00
parent bd88395859
commit bd560fa9f9
6 changed files with 212 additions and 61 deletions

View file

@ -90,7 +90,9 @@ impl Signaller {
}); });
websocket_sender websocket_sender
.send(p::IncomingMessage::Register(p::RegisterMessage::Producer)) .send(p::IncomingMessage::Register(p::RegisterMessage::Producer {
display_name: element.property("display-name"),
}))
.await?; .await?;
let element_clone = element.downgrade(); let element_clone = element.downgrade();
@ -104,7 +106,7 @@ impl Signaller {
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) { if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg { match msg {
p::OutgoingMessage::Registered( p::OutgoingMessage::Registered(
p::RegisteredMessage::Producer { peer_id }, p::RegisteredMessage::Producer { peer_id, .. },
) => { ) => {
gst_info!( gst_info!(
CAT, CAT,

View file

@ -37,6 +37,7 @@ const RTP_TWCC_URI: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
const DEFAULT_DISPLAY_NAME: Option<&str> = None;
const DEFAULT_MIN_BITRATE: u32 = 1000; const DEFAULT_MIN_BITRATE: u32 = 1000;
/* I have found higher values to cause packet loss *somewhere* in /* I have found higher values to cause packet loss *somewhere* in
@ -61,6 +62,7 @@ struct Settings {
do_fec: bool, do_fec: bool,
do_retransmission: bool, do_retransmission: bool,
enable_data_channel_navigation: bool, enable_data_channel_navigation: bool,
display_name: Option<String>,
} }
/// Represents a codec we can offer /// Represents a codec we can offer
@ -213,8 +215,8 @@ struct PipelineWrapper(gst::Pipeline);
// Structure to generate GstNavigation event from a WebRTCDataChannel // Structure to generate GstNavigation event from a WebRTCDataChannel
#[derive(Debug)] #[derive(Debug)]
struct NavigationEventHandler { struct NavigationEventHandler {
channel: WebRTCDataChannel, _channel: WebRTCDataChannel,
message_sig: glib::SignalHandlerId, _message_sig: glib::SignalHandlerId,
} }
/// Our instance structure /// Our instance structure
@ -243,6 +245,7 @@ impl Default for Settings {
do_fec: DEFAULT_DO_FEC, do_fec: DEFAULT_DO_FEC,
do_retransmission: DEFAULT_DO_RETRANSMISSION, do_retransmission: DEFAULT_DO_RETRANSMISSION,
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
display_name: DEFAULT_DISPLAY_NAME.map(String::from),
} }
} }
} }
@ -994,6 +997,8 @@ impl Consumer {
gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly, gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly,
); );
transceiver.set_property("msid-appdata", stream.sink_pad.name());
transceiver.set_property("codec-preferences", &payloader_caps); transceiver.set_property("codec-preferences", &payloader_caps);
if stream.sink_pad.name().starts_with("video_") { if stream.sink_pad.name().starts_with("video_") {
@ -1214,7 +1219,7 @@ impl NavigationEventHandler {
let weak_element = element.downgrade(); let weak_element = element.downgrade();
Self { Self {
message_sig: channel.connect("on-message-string", false, move |values| { _message_sig: channel.connect("on-message-string", false, move |values| {
if let Some(element) = weak_element.upgrade() { if let Some(element) = weak_element.upgrade() {
let _channel = values[0].get::<WebRTCDataChannel>().unwrap(); let _channel = values[0].get::<WebRTCDataChannel>().unwrap();
let msg = values[1].get::<&str>().unwrap(); let msg = values[1].get::<&str>().unwrap();
@ -1223,7 +1228,7 @@ impl NavigationEventHandler {
None None
}), }),
channel, _channel: channel,
} }
} }
} }
@ -2321,6 +2326,13 @@ impl ObjectImpl for WebRTCSink {
DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY
), ),
glib::ParamSpecString::new(
"display-name",
"Display name",
"The display name of the producer",
DEFAULT_DISPLAY_NAME,
glib::ParamFlags::READWRITE,
),
] ]
}); });
@ -2414,6 +2426,12 @@ impl ObjectImpl for WebRTCSink {
settings.enable_data_channel_navigation = settings.enable_data_channel_navigation =
value.get::<bool>().expect("type checked upstream"); value.get::<bool>().expect("type checked upstream");
} }
"display-name" => {
let mut settings = self.settings.lock().unwrap();
settings.display_name = value
.get::<Option<String>>()
.expect("type checked upstream")
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -2461,6 +2479,10 @@ impl ObjectImpl for WebRTCSink {
settings.enable_data_channel_navigation.to_value() settings.enable_data_channel_navigation.to_value()
} }
"stats" => self.gather_stats().to_value(), "stats" => self.gather_stats().to_value(),
"display-name" => {
let settings = self.settings.lock().unwrap();
settings.display_name.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -8,13 +8,29 @@ use serde::{Deserialize, Serialize};
pub enum RegisteredMessage { pub enum RegisteredMessage {
/// Registered as a producer /// Registered as a producer
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Producer { peer_id: String }, Producer {
peer_id: String,
display_name: Option<String>,
},
/// Registered as a consumer /// Registered as a consumer
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Consumer { peer_id: String }, Consumer {
peer_id: String,
display_name: Option<String>,
},
/// Registered as a listener /// Registered as a listener
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Listener { peer_id: String }, Listener {
peer_id: String,
display_name: Option<String>,
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Peer {
pub id: String,
pub display_name: Option<String>,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
@ -26,10 +42,16 @@ pub enum OutgoingMessage {
Registered(RegisteredMessage), Registered(RegisteredMessage),
/// Notifies listeners that a producer was registered /// Notifies listeners that a producer was registered
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
ProducerAdded { peer_id: String }, ProducerAdded {
peer_id: String,
display_name: Option<String>,
},
/// Notifies listeners that a producer was removed /// Notifies listeners that a producer was removed
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
ProducerRemoved { peer_id: String }, ProducerRemoved {
peer_id: String,
display_name: Option<String>,
},
/// Instructs a peer to generate an offer /// Instructs a peer to generate an offer
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
StartSession { peer_id: String }, StartSession { peer_id: String },
@ -39,7 +61,7 @@ pub enum OutgoingMessage {
/// Messages directly forwarded from one peer to another /// Messages directly forwarded from one peer to another
Peer(PeerMessage), Peer(PeerMessage),
/// Provides the current list of consumer peers /// Provides the current list of consumer peers
List { producers: Vec<String> }, List { producers: Vec<Peer> },
/// Notifies that an error occurred with the peer's current session /// Notifies that an error occurred with the peer's current session
Error { details: String }, Error { details: String },
} }
@ -50,11 +72,23 @@ pub enum OutgoingMessage {
/// Register with a peer type /// Register with a peer type
pub enum RegisterMessage { pub enum RegisterMessage {
/// Register as a producer /// Register as a producer
Producer, #[serde(rename_all = "camelCase")]
Producer {
#[serde(default)]
display_name: Option<String>,
},
/// Register as a consumer /// Register as a consumer
Consumer, #[serde(rename_all = "camelCase")]
Consumer {
#[serde(default)]
display_name: Option<String>,
},
/// Register as a listener /// Register as a listener
Listener, #[serde(rename_all = "camelCase")]
Listener {
#[serde(default)]
display_name: Option<String>,
},
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]

View file

@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tracing::{info, instrument}; use tracing::{info, instrument, warn};
use webrtcsink_protocol as p; use webrtcsink_protocol as p;
type PeerId = String; type PeerId = String;
@ -19,6 +19,7 @@ pin_project! {
producers: HashMap<PeerId, HashSet<PeerId>>, producers: HashMap<PeerId, HashSet<PeerId>>,
consumers: HashMap<PeerId, Option<PeerId>>, consumers: HashMap<PeerId, Option<PeerId>>,
listeners: HashSet<PeerId>, listeners: HashSet<PeerId>,
display_names: HashMap<PeerId, Option<String>>,
} }
} }
@ -34,6 +35,7 @@ impl Handler {
producers: HashMap::new(), producers: HashMap::new(),
consumers: HashMap::new(), consumers: HashMap::new(),
listeners: HashSet::new(), listeners: HashSet::new(),
display_names: HashMap::new(),
} }
} }
@ -45,9 +47,15 @@ impl Handler {
) -> Result<(), Error> { ) -> Result<(), Error> {
match msg { match msg {
p::IncomingMessage::Register(message) => match message { p::IncomingMessage::Register(message) => match message {
p::RegisterMessage::Producer => self.register_producer(peer_id), p::RegisterMessage::Producer { display_name } => {
p::RegisterMessage::Consumer => self.register_consumer(peer_id), self.register_producer(peer_id, display_name)
p::RegisterMessage::Listener => self.register_listener(peer_id), }
p::RegisterMessage::Consumer { display_name } => {
self.register_consumer(peer_id, display_name)
}
p::RegisterMessage::Listener { display_name } => {
self.register_listener(peer_id, display_name)
}
}, },
p::IncomingMessage::StartSession(message) => { p::IncomingMessage::StartSession(message) => {
self.start_session(&message.peer_id, peer_id) self.start_session(&message.peer_id, peer_id)
@ -100,6 +108,10 @@ impl Handler {
listener.to_string(), listener.to_string(),
p::OutgoingMessage::ProducerRemoved { p::OutgoingMessage::ProducerRemoved {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
display_name: match self.display_names.get(peer_id) {
Some(name) => name.clone(),
None => None,
},
}, },
)); ));
} }
@ -120,6 +132,8 @@ impl Handler {
}, },
)); ));
} }
let _ = self.display_names.remove(peer_id);
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
@ -186,7 +200,18 @@ impl Handler {
self.items.push_back(( self.items.push_back((
peer_id.to_string(), peer_id.to_string(),
p::OutgoingMessage::List { p::OutgoingMessage::List {
producers: self.producers.keys().cloned().collect(), producers: self
.producers
.keys()
.cloned()
.map(|peer_id| p::Peer {
id: peer_id.clone(),
display_name: match self.display_names.get(&peer_id) {
Some(name) => name.clone(),
None => None,
},
})
.collect(),
}, },
)); ));
@ -323,7 +348,11 @@ impl Handler {
/// Register peer as a producer /// Register peer as a producer
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
fn register_producer(&mut self, peer_id: &str) -> Result<(), Error> { fn register_producer(
&mut self,
peer_id: &str,
display_name: Option<String>,
) -> Result<(), Error> {
if self.producers.contains_key(peer_id) { if self.producers.contains_key(peer_id) {
Err(anyhow!("{} is already registered as a producer", peer_id)) Err(anyhow!("{} is already registered as a producer", peer_id))
} else { } else {
@ -334,6 +363,7 @@ impl Handler {
listener.to_string(), listener.to_string(),
p::OutgoingMessage::ProducerAdded { p::OutgoingMessage::ProducerAdded {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
display_name: display_name.clone(),
}, },
)); ));
} }
@ -342,9 +372,12 @@ impl Handler {
peer_id.to_string(), peer_id.to_string(),
p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { p::OutgoingMessage::Registered(p::RegisteredMessage::Producer {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
display_name: display_name.clone(),
}), }),
)); ));
self.display_names.insert(peer_id.to_string(), display_name);
info!(peer_id = %peer_id, "registered as a producer"); info!(peer_id = %peer_id, "registered as a producer");
Ok(()) Ok(())
@ -353,7 +386,11 @@ impl Handler {
/// Register peer as a consumer /// Register peer as a consumer
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
fn register_consumer(&mut self, peer_id: &str) -> Result<(), Error> { fn register_consumer(
&mut self,
peer_id: &str,
display_name: Option<String>,
) -> Result<(), Error> {
if self.consumers.contains_key(peer_id) { if self.consumers.contains_key(peer_id) {
Err(anyhow!("{} is already registered as a consumer", peer_id)) Err(anyhow!("{} is already registered as a consumer", peer_id))
} else { } else {
@ -363,9 +400,12 @@ impl Handler {
peer_id.to_string(), peer_id.to_string(),
p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
display_name: display_name.clone(),
}), }),
)); ));
self.display_names.insert(peer_id.to_string(), display_name);
info!(peer_id = %peer_id, "registered as a consumer"); info!(peer_id = %peer_id, "registered as a consumer");
Ok(()) Ok(())
@ -374,7 +414,11 @@ impl Handler {
/// Register peer as a listener /// Register peer as a listener
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
fn register_listener(&mut self, peer_id: &str) -> Result<(), Error> { fn register_listener(
&mut self,
peer_id: &str,
display_name: Option<String>,
) -> Result<(), Error> {
if !self.listeners.insert(peer_id.to_string()) { if !self.listeners.insert(peer_id.to_string()) {
Err(anyhow!("{} is already registered as a listener", peer_id)) Err(anyhow!("{} is already registered as a listener", peer_id))
} else { } else {
@ -382,9 +426,12 @@ impl Handler {
peer_id.to_string(), peer_id.to_string(),
p::OutgoingMessage::Registered(p::RegisteredMessage::Listener { p::OutgoingMessage::Registered(p::RegisteredMessage::Listener {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
display_name: display_name.clone(),
}), }),
)); ));
self.display_names.insert(peer_id.to_string(), display_name);
info!(peer_id = %peer_id, "registered as a listener"); info!(peer_id = %peer_id, "registered as a listener");
Ok(()) Ok(())
@ -480,7 +527,8 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
@ -492,7 +540,8 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { p::OutgoingMessage::Registered(p::RegisteredMessage::Producer {
peer_id: "producer".to_string() peer_id: "producer".to_string(),
display_name: None,
}) })
); );
} }
@ -502,7 +551,9 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message = p::IncomingMessage::Register(p::RegisterMessage::Producer {
display_name: Some("foobar".to_string()),
});
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
@ -522,7 +573,10 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::List { p::OutgoingMessage::List {
producers: vec!["producer".to_string()] producers: vec![p::Peer {
id: "producer".to_string(),
display_name: Some("foobar".to_string())
}]
} }
); );
} }
@ -532,7 +586,8 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
@ -544,7 +599,8 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer {
peer_id: "consumer".to_string() peer_id: "consumer".to_string(),
display_name: None,
}) })
); );
} }
@ -554,13 +610,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -580,13 +638,16 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Listener); let message =
p::IncomingMessage::Register(p::RegisterMessage::Listener { display_name: None });
tx.send(("listener".to_string(), Some(message))) tx.send(("listener".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message = p::IncomingMessage::Register(p::RegisterMessage::Producer {
display_name: Some("foobar".to_string()),
});
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -596,7 +657,8 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::ProducerAdded { p::OutgoingMessage::ProducerAdded {
peer_id: "producer".to_string() peer_id: "producer".to_string(),
display_name: Some("foobar".to_string()),
} }
); );
} }
@ -606,13 +668,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -641,13 +705,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -661,7 +727,8 @@ mod tests {
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Listener); let message =
p::IncomingMessage::Register(p::RegisterMessage::Listener { display_name: None });
tx.send(("listener".to_string(), Some(message))) tx.send(("listener".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -684,7 +751,8 @@ mod tests {
assert_eq!( assert_eq!(
sent_message, sent_message,
p::OutgoingMessage::ProducerRemoved { p::OutgoingMessage::ProducerRemoved {
peer_id: "producer".to_string() peer_id: "producer".to_string(),
display_name: None
} }
); );
} }
@ -694,13 +762,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -736,13 +806,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -778,13 +850,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -828,13 +902,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -876,13 +952,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -950,13 +1028,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -994,7 +1074,8 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -1022,7 +1103,8 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
@ -1050,13 +1132,15 @@ mod tests {
let (mut tx, rx) = mpsc::unbounded(); let (mut tx, rx) = mpsc::unbounded();
let mut handler = Handler::new(Box::pin(rx)); let mut handler = Handler::new(Box::pin(rx));
let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Producer { display_name: None });
tx.send(("producer".to_string(), Some(message))) tx.send(("producer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();
let _ = handler.next().await.unwrap(); let _ = handler.next().await.unwrap();
let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); let message =
p::IncomingMessage::Register(p::RegisterMessage::Consumer { display_name: None });
tx.send(("consumer".to_string(), Some(message))) tx.send(("consumer".to_string(), Some(message)))
.await .await
.unwrap(); .unwrap();

View file

@ -52,7 +52,7 @@ impl Server {
match serde_json::from_str::<I>(&msg) { match serde_json::from_str::<I>(&msg) {
Ok(msg) => Some((peer_id, Some(msg))), Ok(msg) => Some((peer_id, Some(msg))),
Err(err) => { Err(err) => {
warn!("Failed to parse incoming message: {}", err); warn!("Failed to parse incoming message: {} ({})", err, msg);
None None
} }
} }

View file

@ -240,6 +240,8 @@ function Session(our_id, peer_id, closed_callback) {
var videoTracks = event.stream.getVideoTracks(); var videoTracks = event.stream.getVideoTracks();
var audioTracks = event.stream.getAudioTracks(); var audioTracks = event.stream.getAudioTracks();
console.log(videoTracks);
if (videoTracks.length > 0) { if (videoTracks.length > 0) {
console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks'); console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks');
this.getVideoElement().srcObject = event.stream; this.getVideoElement().srcObject = event.stream;
@ -340,9 +342,16 @@ function session_closed(peer_id) {
sessions[peer_id] = null; sessions[peer_id] = null;
} }
function addPeer(peer_id) { function addPeer(peer_id, display_name) {
console.log("Display name: ", display_name);
var nav_ul = document.getElementById("camera-list"); var nav_ul = document.getElementById("camera-list");
var li_str = '<li id="peer-' + peer_id + '"><button class="button button1">' + peer_id + '</button></li>'
if (display_name == null) {
var li_str = '<li id="peer-' + peer_id + '"><button class="button button1">' + peer_id + '</button></li>';
} else {
var li_str = '<li id="peer-' + peer_id + '"><button class="button button1">' + display_name + '</button></li>';
}
nav_ul.insertAdjacentHTML('beforeend', li_str); nav_ul.insertAdjacentHTML('beforeend', li_str);
var li = document.getElementById("peer-" + peer_id); var li = document.getElementById("peer-" + peer_id);
@ -384,10 +393,10 @@ function onServerMessage(event) {
} else if (msg.type == "list") { } else if (msg.type == "list") {
clearPeers(); clearPeers();
for (i = 0; i < msg.producers.length; i++) { for (i = 0; i < msg.producers.length; i++) {
addPeer(msg.producers[i]); addPeer(msg.producers[i].id, msg.producers[i].displayName);
} }
} else if (msg.type == "producerAdded") { } else if (msg.type == "producerAdded") {
addPeer(msg.peerId); addPeer(msg.peerId, msg.displayName);
} else if (msg.type == "producerRemoved") { } else if (msg.type == "producerRemoved") {
var li = document.getElementById("peer-" + msg.peerId); var li = document.getElementById("peer-" + msg.peerId);
li.parentNode.removeChild(li); li.parentNode.removeChild(li);