diff --git a/plugins/src/signaller/imp.rs b/plugins/src/signaller/imp.rs index 26ef1e90..77273da2 100644 --- a/plugins/src/signaller/imp.rs +++ b/plugins/src/signaller/imp.rs @@ -133,23 +133,30 @@ impl Signaller { ); } p::OutgoingMessage::Registered(_) => unreachable!(), - p::OutgoingMessage::StartSession { peer_id } => { - if let Err(err) = element.add_consumer(&peer_id) { + p::OutgoingMessage::StartSession { + session_id, + peer_id, + } => { + if let Err(err) = + element.start_session(&session_id, &peer_id) + { gst::warning!(CAT, obj: &element, "{}", err); } } - p::OutgoingMessage::EndSession { peer_id } => { - if let Err(err) = element.remove_consumer(&peer_id) { + p::OutgoingMessage::EndSession(session_info) => { + if let Err(err) = + element.end_session(&session_info.session_id) + { gst::warning!(CAT, obj: &element, "{}", err); } } p::OutgoingMessage::Peer(p::PeerMessage { - peer_id, + session_id, peer_message, }) => match peer_message { p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => { if let Err(err) = element.handle_sdp( - &peer_id, + &session_id, &gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Answer, gst_sdp::SDPMessage::parse_buffer( @@ -175,7 +182,7 @@ impl Signaller { sdp_m_line_index, } => { if let Err(err) = element.handle_ice( - &peer_id, + &session_id, Some(sdp_m_line_index), None, &candidate, @@ -254,13 +261,13 @@ impl Signaller { pub fn handle_sdp( &self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) { let state = self.state.lock().unwrap(); let msg = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: sdp.sdp().as_text().unwrap(), }), @@ -281,7 +288,7 @@ impl Signaller { pub fn handle_ice( &self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_m_line_index: Option, _sdp_mid: Option, @@ -289,7 +296,7 @@ impl Signaller { let state = self.state.lock().unwrap(); let msg = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Ice { candidate: candidate.to_string(), sdp_m_line_index: sdp_m_line_index.unwrap(), @@ -331,17 +338,17 @@ impl Signaller { } } - pub fn consumer_removed(&self, element: &WebRTCSink, peer_id: &str) { - gst::debug!(CAT, obj: element, "Signalling consumer {} removed", peer_id); + pub fn end_session(&self, element: &WebRTCSink, session_id: &str) { + gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id); let state = self.state.lock().unwrap(); - let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); let element = element.downgrade(); if let Some(mut sender) = state.websocket_sender.clone() { task::spawn(async move { if let Err(err) = sender .send(p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), })) .await { diff --git a/plugins/src/signaller/mod.rs b/plugins/src/signaller/mod.rs index 8caf2e6b..2959a24a 100644 --- a/plugins/src/signaller/mod.rs +++ b/plugins/src/signaller/mod.rs @@ -34,13 +34,13 @@ impl Signallable for Signaller { fn handle_ice( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_mline_index: Option, sdp_mid: Option, ) -> Result<(), Box> { let signaller = imp::Signaller::from_instance(self); - signaller.handle_ice(element, peer_id, candidate, sdp_mline_index, sdp_mid); + signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid); Ok(()) } @@ -49,9 +49,9 @@ impl Signallable for Signaller { signaller.stop(element); } - fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str) { + fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) { let signaller = imp::Signaller::from_instance(self); - signaller.consumer_removed(element, peer_id); + signaller.end_session(element, session_id); } } diff --git a/plugins/src/webrtcsink/imp.rs b/plugins/src/webrtcsink/imp.rs index 9c60be2a..da86cf29 100644 --- a/plugins/src/webrtcsink/imp.rs +++ b/plugins/src/webrtcsink/imp.rs @@ -125,12 +125,14 @@ pub struct VideoEncoder { filter: gst::Element, halved_framerate: gst::Fraction, video_info: gst_video::VideoInfo, - peer_id: String, + session_id: String, mitigation_mode: WebRTCSinkMitigationMode, pub transceiver: gst_webrtc::WebRTCRTPTransceiver, } -struct Consumer { +struct Session { + id: String, + pipeline: gst::Pipeline, webrtcbin: gst::Element, rtprtxsend: Option, @@ -165,7 +167,7 @@ struct NavigationEvent { struct State { signaller: Box, signaller_state: SignallerState, - consumers: HashMap, + sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery codecs_abort_handle: Option, @@ -275,7 +277,7 @@ impl Default for State { Self { signaller: Box::new(signaller), signaller_state: SignallerState::Stopped, - consumers: HashMap::new(), + sessions: HashMap::new(), codecs: BTreeMap::new(), codecs_abort_handle: None, codecs_done_receiver: None, @@ -541,7 +543,7 @@ impl VideoEncoder { filter, halved_framerate, video_info, - peer_id: peer_id.to_string(), + session_id: peer_id.to_string(), mitigation_mode: WebRTCSinkMitigationMode::NONE, transceiver, } @@ -632,8 +634,8 @@ impl VideoEncoder { gst::log!( CAT, obj: element, - "consumer {}: setting bitrate {} and caps {} on encoder {:?}", - self.peer_id, + "session {}: setting bitrate {} and caps {} on encoder {:?}", + self.session_id, bitrate, caps, self.element @@ -657,39 +659,39 @@ impl VideoEncoder { } impl State { - fn finalize_consumer( + fn finalize_session( &mut self, element: &super::WebRTCSink, - consumer: &mut Consumer, + session: &mut Session, signal: bool, ) { - consumer.pipeline.debug_to_dot_file_with_ts( + session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("removing-peer-{}-", consumer.peer_id,), + format!("removing-peer-{}-", session.peer_id,), ); - for ssrc in consumer.webrtc_pads.keys() { - consumer.links.remove(ssrc); + for ssrc in session.webrtc_pads.keys() { + session.links.remove(ssrc); } - consumer.pipeline.call_async(|pipeline| { + session.pipeline.call_async(|pipeline| { let _ = pipeline.set_state(gst::State::Null); }); if signal { - self.signaller.consumer_removed(element, &consumer.peer_id); + self.signaller.session_ended(element, &session.peer_id); } } - fn remove_consumer( + fn end_session( &mut self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, signal: bool, - ) -> Option { - if let Some(mut consumer) = self.consumers.remove(peer_id) { - self.finalize_consumer(element, &mut consumer, signal); - Some(consumer) + ) -> Option { + if let Some(mut session) = self.sessions.remove(session_id) { + self.finalize_session(element, &mut session, signal); + Some(session) } else { None } @@ -723,8 +725,9 @@ impl State { } } -impl Consumer { +impl Session { fn new( + id: String, pipeline: gst::Pipeline, webrtcbin: gst::Element, peer_id: String, @@ -732,6 +735,7 @@ impl Consumer { cc_info: CCInfo, ) -> Self { Self { + id, pipeline, webrtcbin, peer_id, @@ -1144,10 +1148,10 @@ impl WebRTCSink { let mut state = self.state.lock().unwrap(); - let consumer_ids: Vec<_> = state.consumers.keys().map(|k| k.to_owned()).collect(); + let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); - for id in consumer_ids { - state.remove_consumer(element, &id, true); + for id in session_ids { + state.end_session(element, &id, true); } state @@ -1197,39 +1201,39 @@ impl WebRTCSink { &self, element: &super::WebRTCSink, offer: gst_webrtc::WebRTCSessionDescription, - peer_id: &str, + session_id: &str, ) { let mut state = self.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get(peer_id) { - consumer + if let Some(session) = state.sessions.get(session_id) { + session .webrtcbin .emit_by_name::<()>("set-local-description", &[&offer, &None::]); - if let Err(err) = state.signaller.handle_sdp(element, peer_id, &offer) { + if let Err(err) = state.signaller.handle_sdp(element, session_id, &offer) { gst::warning!( CAT, - "Failed to handle SDP for consumer {}: {}", - peer_id, + "Failed to handle SDP for session {}: {}", + session_id, err ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); } } } - fn negotiate(&self, element: &super::WebRTCSink, peer_id: &str) { + fn negotiate(&self, element: &super::WebRTCSink, session_id: &str) { let state = self.state.lock().unwrap(); - gst::debug!(CAT, obj: element, "Negotiating for peer {}", peer_id); + gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id); - if let Some(consumer) = state.consumers.get(peer_id) { + if let Some(session) = state.sessions.get(session_id) { let element = element.downgrade(); - gst::debug!(CAT, "Creating offer for peer {}", peer_id); - let peer_id = peer_id.to_string(); + gst::debug!(CAT, "Creating offer for session {}", session_id); + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "Created offer for peer {}", peer_id); + gst::debug!(CAT, "Created offer for session {}", session_id); if let Some(element) = element.upgrade() { let this = Self::from_instance(&element); @@ -1240,9 +1244,9 @@ impl WebRTCSink { CAT, obj: &element, "Promise returned without a reply for {}", - peer_id + session_id ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); return; } Err(err) => { @@ -1250,10 +1254,10 @@ impl WebRTCSink { CAT, obj: &element, "Promise returned with an error for {}: {:?}", - peer_id, + session_id, err ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); return; } }; @@ -1262,28 +1266,29 @@ impl WebRTCSink { .value("offer") .map(|offer| offer.get::().unwrap()) { - this.on_offer_created(&element, offer, &peer_id); + this.on_offer_created(&element, offer, &session_id); } else { gst::warning!( CAT, - "Reply without an offer for consumer {}: {:?}", - peer_id, + "Reply without an offer for session {}: {:?}", + session_id, reply ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); } } }); - consumer + session .webrtcbin .emit_by_name::<()>("create-offer", &[&None::, &promise]); } else { gst::debug!( CAT, obj: element, - "consumer for peer {} no longer exists", - peer_id + "consumer for session {} no longer exists (sessions: {:?}", + session_id, + state.sessions.keys().map(|id| id) ); } } @@ -1291,47 +1296,58 @@ impl WebRTCSink { fn on_ice_candidate( &self, element: &super::WebRTCSink, - peer_id: String, + session_id: String, sdp_m_line_index: u32, candidate: String, ) { let mut state = self.state.lock().unwrap(); - if let Err(err) = - state - .signaller - .handle_ice(element, &peer_id, &candidate, Some(sdp_m_line_index), None) - { + if let Err(err) = state.signaller.handle_ice( + element, + &session_id, + &candidate, + Some(sdp_m_line_index), + None, + ) { gst::warning!( CAT, - "Failed to handle ICE for consumer {}: {}", - peer_id, + "Failed to handle ICE in session {}: {}", + session_id, err ); - state.remove_consumer(element, &peer_id, true); + state.end_session(element, &session_id, true); } } - /// Called by the signaller to add a new consumer - pub fn add_consumer( + /// Called by the signaller to add a new session + pub fn start_session( &self, element: &super::WebRTCSink, + session_id: &str, peer_id: &str, ) -> Result<(), WebRTCSinkError> { let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); - if state.consumers.contains_key(&peer_id) { - return Err(WebRTCSinkError::DuplicateConsumerId(peer_id)); + if state.sessions.contains_key(&session_id) { + return Err(WebRTCSinkError::DuplicateSessionId(session_id)); } - gst::info!(CAT, obj: element, "Adding consumer {}", peer_id); + gst::info!( + CAT, + obj: element, + "Adding session: {} for peer: {}", + peer_id, + session_id + ); - let pipeline = gst::Pipeline::new(Some(&format!("consumer-pipeline-{}", peer_id))); + let pipeline = gst::Pipeline::new(Some(&format!("session-pipeline-{}", session_id))); - let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", peer_id))) - .map_err(|err| WebRTCSinkError::ConsumerPipelineError { + let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", session_id))) + .map_err(|err| WebRTCSinkError::SessionPipelineError { + session_id: session_id.clone(), peer_id: peer_id.clone(), details: err.to_string(), })?; @@ -1351,7 +1367,7 @@ impl WebRTCSink { webrtcbin.connect_closure( "request-aux-sender", false, - glib::closure!(@watch element, @strong peer_id + glib::closure!(@watch element, @strong session_id => move |_webrtcbin: gst::Element, _transport: gst::Object| { let settings = element.imp().settings.lock().unwrap(); @@ -1378,9 +1394,9 @@ impl WebRTCSink { }; cc.connect_notify(Some("estimated-bitrate"), - glib::clone!(@weak element, @strong peer_id + glib::clone!(@weak element, @strong session_id => move |bwe, pspec| { - element.imp().set_bitrate(&element, &peer_id, + element.imp().set_bitrate(&element, &session_id, bwe.property::(pspec.name())); } )); @@ -1392,12 +1408,12 @@ impl WebRTCSink { webrtcbin.connect_closure( "deep-element-added", false, - glib::closure!(@watch element, @strong peer_id + glib::closure!(@watch element, @strong session_id => move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { if e.factory().map_or(false, |f| f.name() == "rtprtxsend") { if e.has_property("stuffing-kbps", Some(i32::static_type())) { - element.imp().set_rtptrxsend(&element, &peer_id, e); + element.imp().set_rtptrxsend(&element, &session_id, e); } else { gst::warning!(CAT, "rtprtxsend doesn't have a `stuffing-kbps` \ property, stuffing disabled"); @@ -1412,7 +1428,7 @@ impl WebRTCSink { pipeline.add(&webrtcbin).unwrap(); let element_clone = element.downgrade(); - let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect("on-ice-candidate", false, move |values| { if let Some(element) = element_clone.upgrade() { let this = Self::from_instance(&element); @@ -1420,7 +1436,7 @@ impl WebRTCSink { let candidate = values[2].get::().expect("Invalid argument"); this.on_ice_candidate( &element, - peer_id_clone.to_string(), + session_id_clone.to_string(), sdp_m_line_index, candidate, ); @@ -1430,6 +1446,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("connection-state"), move |webrtcbin, _pspec| { if let Some(element) = element_clone.upgrade() { let state = @@ -1441,16 +1458,18 @@ impl WebRTCSink { gst::warning!( CAT, obj: &element, - "Connection state for consumer {} failed", + "Connection state for in session {} (peer {}) failed", + session_id_clone, peer_id_clone ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => { gst::log!( CAT, obj: &element, - "Connection state for consumer {} changed: {:?}", + "Connection state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); @@ -1461,6 +1480,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("ice-connection-state"), move |webrtcbin, _pspec| { if let Some(element) = element_clone.upgrade() { let state = webrtcbin @@ -1472,16 +1492,18 @@ impl WebRTCSink { gst::warning!( CAT, obj: &element, - "Ice connection state for consumer {} failed", - peer_id_clone + "Ice connection state in session {} (peer {}) failed", + session_id_clone, + peer_id_clone, ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => { gst::log!( CAT, obj: &element, - "Ice connection state for consumer {} changed: {:?}", + "Ice connection state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); @@ -1491,8 +1513,8 @@ impl WebRTCSink { if state == gst_webrtc::WebRTCICEConnectionState::Completed { let state = this.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get(&peer_id_clone) { - for webrtc_pad in consumer.webrtc_pads.values() { + if let Some(session) = state.sessions.get(&session_id_clone) { + for webrtc_pad in session.webrtc_pads.values() { if let Some(srcpad) = webrtc_pad.pad.peer() { srcpad.send_event( gst_video::UpstreamForceKeyUnitEvent::builder() @@ -1508,6 +1530,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { let state = webrtcbin.property::("ice-gathering-state"); @@ -1516,14 +1539,16 @@ impl WebRTCSink { gst::log!( CAT, obj: &element, - "Ice gathering state for consumer {} changed: {:?}", + "Ice gathering state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); } }); - let mut consumer = Consumer::new( + let mut session = Session::new( + session_id.clone(), pipeline.clone(), webrtcbin.clone(), peer_id.clone(), @@ -1549,23 +1574,23 @@ impl WebRTCSink { .child_by_name("rtpbin") .unwrap(); - if consumer.congestion_controller.is_some() { - let peer_id_str = peer_id.to_string(); - if consumer.stats_sigid.is_none() { - consumer.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true, + if session.congestion_controller.is_some() { + let session_id_str = session_id.to_string(); + if session.stats_sigid.is_none() { + session.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true, glib::closure!(@weak-allow-none element, @weak-allow-none webrtcbin => move |rtpbin: gst::Object, session_id: u32, _src: u32| { - let session = rtpbin.emit_by_name::("get-session", &[&session_id]); + let rtp_session = rtpbin.emit_by_name::("get-session", &[&session_id]); let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?"); let webrtcbin = webrtcbin.unwrap(); let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(&peer_id_str) { + if let Some(mut session) = state.sessions.get_mut(&session_id_str) { - consumer.stats_sigid = Some(session.connect_notify(Some("twcc-stats"), - glib::clone!(@strong peer_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| { + session.stats_sigid = Some(rtp_session.connect_notify(Some("twcc-stats"), + glib::clone!(@strong session_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| { // Run the Loss-based control algortithm on new peer TWCC feedbacks - element.imp().process_loss_stats(&element, &peer_id_str, &sess.property::(pspec.name())); + element.imp().process_loss_stats(&element, &session_id_str, &sess.property::(pspec.name())); }) )); } @@ -1577,7 +1602,7 @@ impl WebRTCSink { state .streams .iter() - .for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &settings, stream)); + .for_each(|(_, stream)| session.request_webrtcbin_pad(element, &settings, stream)); let clock = element.clock(); @@ -1588,7 +1613,7 @@ impl WebRTCSink { let mut bus_stream = pipeline.bus().unwrap().stream(); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); - let peer_id_clone = peer_id.to_owned(); + let session_id_clone = session_id.to_owned(); task::spawn(async move { while let Some(msg) = bus_stream.next().await { @@ -1598,12 +1623,12 @@ impl WebRTCSink { gst::MessageView::Error(err) => { gst::error!( CAT, - "Consumer {} error: {}, details: {:?}", - peer_id_clone, + "session {} error: {}, details: {:?}", + session_id_clone, err.error(), err.debug() ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } gst::MessageView::StateChanged(state_changed) => { if let Some(pipeline) = pipeline_clone.upgrade() { @@ -1611,8 +1636,8 @@ impl WebRTCSink { pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), format!( - "webrtcsink-peer-{}-{:?}-to-{:?}", - peer_id_clone, + "webrtcsink-session-{}-{:?}-to-{:?}", + session_id_clone, state_changed.old(), state_changed.current() ), @@ -1629,10 +1654,10 @@ impl WebRTCSink { gst::MessageView::Eos(..) => { gst::error!( CAT, - "Unexpected end of stream for consumer {}", - peer_id_clone + "Unexpected end of stream in session {}", + session_id_clone, ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => (), } @@ -1641,7 +1666,8 @@ impl WebRTCSink { }); pipeline.set_state(gst::State::Ready).map_err(|err| { - WebRTCSinkError::ConsumerPipelineError { + WebRTCSinkError::SessionPipelineError { + session_id: session_id.to_string(), peer_id: peer_id.to_string(), details: err.to_string(), } @@ -1651,7 +1677,7 @@ impl WebRTCSink { state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin)); } - state.consumers.insert(peer_id.to_string(), consumer); + state.sessions.insert(session_id.to_string(), session); drop(state); @@ -1669,10 +1695,11 @@ impl WebRTCSink { // // This is completely safe, as we know that by now all conditions are gathered: // webrtcbin is in the Ready state, and all its transceivers have codec_preferences. - self.negotiate(element, &peer_id); + self.negotiate(element, &session_id); pipeline.set_state(gst::State::Playing).map_err(|err| { - WebRTCSinkError::ConsumerPipelineError { + WebRTCSinkError::SessionPipelineError { + session_id: session_id.to_string(), peer_id: peer_id.to_string(), details: err.to_string(), } @@ -1682,21 +1709,21 @@ impl WebRTCSink { } /// Called by the signaller to remove a consumer - pub fn remove_consumer( + pub fn remove_session( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, signal: bool, ) -> Result<(), WebRTCSinkError> { let mut state = self.state.lock().unwrap(); - if !state.consumers.contains_key(peer_id) { - return Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())); + if !state.sessions.contains_key(session_id) { + return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); } - if let Some(consumer) = state.remove_consumer(element, peer_id, signal) { + if let Some(session) = state.end_session(element, session_id, signal) { drop(state); - element.emit_by_name::<()>("consumer-removed", &[&peer_id, &consumer.webrtcbin]); + element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); } Ok(()) @@ -1705,30 +1732,35 @@ impl WebRTCSink { fn process_loss_stats( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, stats: &gst::Structure, ) { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(peer_id) { - if let Some(congestion_controller) = consumer.congestion_controller.as_mut() { - congestion_controller.loss_control(&element, stats, &mut consumer.encoders); + if let Some(mut session) = state.sessions.get_mut(session_id) { + if let Some(congestion_controller) = session.congestion_controller.as_mut() { + congestion_controller.loss_control(&element, stats, &mut session.encoders); } - consumer.stats = stats.to_owned(); + session.stats = stats.to_owned(); } } - fn process_stats(&self, element: &super::WebRTCSink, webrtcbin: gst::Element, peer_id: &str) { - let peer_id = peer_id.to_string(); + fn process_stats( + &self, + element: &super::WebRTCSink, + webrtcbin: gst::Element, + session_id: &str, + ) { + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func( - glib::clone!(@strong peer_id, @weak element => move |reply| { + glib::clone!(@strong session_id, @weak element => move |reply| { if let Ok(Some(stats)) = reply { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(&peer_id) { - if let Some(congestion_controller) = consumer.congestion_controller.as_mut() { - congestion_controller.delay_control(&element, stats, &mut consumer.encoders,); + if let Some(mut session) = state.sessions.get_mut(&session_id) { + if let Some(congestion_controller) = session.congestion_controller.as_mut() { + congestion_controller.delay_control(&element, stats, &mut session.encoders,); } - consumer.stats = stats.to_owned(); + session.stats = stats.to_owned(); } } }), @@ -1740,34 +1772,34 @@ impl WebRTCSink { fn set_rtptrxsend(&self, element: &super::WebRTCSink, peer_id: &str, rtprtxsend: gst::Element) { let mut state = element.imp().state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { - consumer.rtprtxsend = Some(rtprtxsend); + if let Some(session) = state.sessions.get_mut(peer_id) { + session.rtprtxsend = Some(rtprtxsend); } } fn set_bitrate(&self, element: &super::WebRTCSink, peer_id: &str, bitrate: u32) { let mut state = element.imp().state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { + if let Some(session) = state.sessions.get_mut(peer_id) { let fec_ratio = { // Start adding some FEC when the bitrate > 2Mbps as we found experimentally // that it is not worth it below that threshold - if bitrate <= 2_000_000 || consumer.cc_info.max_bitrate <= 2_000_000 { + if bitrate <= 2_000_000 || session.cc_info.max_bitrate <= 2_000_000 { 0f64 } else { (bitrate as f64 - 2_000_000.) - / (consumer.cc_info.max_bitrate as f64 - 2_000_000.) + / (session.cc_info.max_bitrate as f64 - 2_000_000.) } }; let fec_percentage = fec_ratio * 50f64; let encoders_bitrate = ((bitrate as f64) / (1. + (fec_percentage / 100.))) as i32; - if let Some(ref rtpxsend) = consumer.rtprtxsend.as_ref() { + if let Some(ref rtpxsend) = session.rtprtxsend.as_ref() { rtpxsend.set_property("stuffing-kbps", (bitrate as f64 / 1000.) as i32); } - for encoder in consumer.encoders.iter_mut() { + for encoder in session.encoders.iter_mut() { encoder.set_bitrate(element, encoders_bitrate); encoder .transceiver @@ -1776,12 +1808,12 @@ impl WebRTCSink { } } - fn on_remote_description_set(&self, element: &super::WebRTCSink, peer_id: String) { + fn on_remote_description_set(&self, element: &super::WebRTCSink, session_id: String) { let mut state = self.state.lock().unwrap(); let mut remove = false; - if let Some(mut consumer) = state.consumers.remove(&peer_id) { - for webrtc_pad in consumer.webrtc_pads.clone().values() { + if let Some(mut session) = state.sessions.remove(&session_id) { + for webrtc_pad in session.webrtc_pads.clone().values() { let transceiver = webrtc_pad .pad .property::("transceiver"); @@ -1798,14 +1830,14 @@ impl WebRTCSink { .and_then(|stream| stream.producer.as_ref()) { if let Err(err) = - consumer.connect_input_stream(element, producer, webrtc_pad, &state.codecs) + session.connect_input_stream(element, producer, webrtc_pad, &state.codecs) { gst::error!( CAT, obj: element, - "Failed to connect input stream {} for consumer {}: {}", + "Failed to connect input stream {} for session {}: {}", webrtc_pad.stream_name, - peer_id, + session_id, err ); remove = true; @@ -1815,21 +1847,21 @@ impl WebRTCSink { gst::error!( CAT, obj: element, - "No producer to connect consumer {} to", - peer_id, + "No producer to connect session {} to", + session_id, ); remove = true; break; } } - consumer.pipeline.debug_to_dot_file_with_ts( + session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("webrtcsink-peer-{}-remote-description-set", peer_id,), + format!("webrtcsink-peer-{}-remote-description-set", session_id,), ); let element_clone = element.downgrade(); - let webrtcbin = consumer.webrtcbin.downgrade(); + let webrtcbin = session.webrtcbin.downgrade(); task::spawn(async move { let mut interval = async_std::stream::interval(std::time::Duration::from_millis(100)); @@ -1839,7 +1871,9 @@ impl WebRTCSink { if let (Some(webrtcbin), Some(element)) = (webrtcbin.upgrade(), element_clone.upgrade()) { - element.imp().process_stats(&element, webrtcbin, &peer_id); + element + .imp() + .process_stats(&element, webrtcbin, &session_id); } else { break; } @@ -1847,9 +1881,9 @@ impl WebRTCSink { }); if remove { - state.finalize_consumer(element, &mut consumer, true); + state.finalize_session(element, &mut session, true); } else { - state.consumers.insert(consumer.peer_id.clone(), consumer); + state.sessions.insert(session.id.clone(), session); } } } @@ -1858,7 +1892,7 @@ impl WebRTCSink { pub fn handle_ice( &self, _element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, @@ -1867,14 +1901,14 @@ impl WebRTCSink { let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; - if let Some(consumer) = state.consumers.get(peer_id) { - gst::trace!(CAT, "adding ice candidate for peer {}", peer_id); - consumer + if let Some(session) = state.sessions.get(session_id) { + gst::trace!(CAT, "adding ice candidate for session {}", session_id); + session .webrtcbin .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); Ok(()) } else { - Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())) + Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) } } @@ -1882,17 +1916,17 @@ impl WebRTCSink { pub fn handle_sdp( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), WebRTCSinkError> { let mut state = self.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { + if let Some(session) = state.sessions.get_mut(session_id) { let sdp = desc.sdp(); - consumer.sdp = Some(sdp.to_owned()); + session.sdp = Some(sdp.to_owned()); - for webrtc_pad in consumer.webrtc_pads.values_mut() { + for webrtc_pad in session.webrtc_pads.values_mut() { let media_idx = webrtc_pad.media_idx; /* TODO: support partial answer, webrtcbin doesn't seem * very well equipped to deal with this at the moment */ @@ -1904,15 +1938,15 @@ impl WebRTCSink { gst::warning!( CAT, - "consumer {} refused media {}: {:?}", - peer_id, + "consumer from session {} refused media {}: {:?}", + session_id, media_idx, media_str ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); return Err(WebRTCSinkError::ConsumerRefusedMedia { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), media_idx, }); } @@ -1927,39 +1961,40 @@ impl WebRTCSink { } else { gst::warning!( CAT, - "consumer {} did not provide valid payload for media index {}", - peer_id, - media_idx + "consumer from session {} did not provide valid payload for media index {} for session {}", + session_id, + media_idx, + session_id, ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); return Err(WebRTCSinkError::ConsumerNoValidPayload { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), media_idx, }); } } let element = element.downgrade(); - let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func(move |reply| { gst::debug!(CAT, "received reply {:?}", reply); if let Some(element) = element.upgrade() { let this = Self::from_instance(&element); - this.on_remote_description_set(&element, peer_id); + this.on_remote_description_set(&element, session_id); } }); - consumer + session .webrtcbin .emit_by_name::<()>("set-remote-description", &[desc, &promise]); Ok(()) } else { - Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())) + Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) } } @@ -2135,7 +2170,7 @@ impl WebRTCSink { self.state .lock() .unwrap() - .consumers + .sessions .iter() .map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())), ) @@ -2493,11 +2528,11 @@ impl ObjectImpl for WebRTCSink { .param_types([String::static_type(), gst::Element::static_type()]) .build(), /* - * RsWebRTCSink::get_consumers: + * RsWebRTCSink::get_sessions: * - * List all consumers (by ID). + * List all sessions (by ID). */ - glib::subclass::Signal::builder("get-consumers") + glib::subclass::Signal::builder("get-sessions") .action() .class_handler(|_, args| { let element = args[0].get::().expect("signal arg"); @@ -2507,7 +2542,7 @@ impl ObjectImpl for WebRTCSink { this.state .lock() .unwrap() - .consumers + .sessions .keys() .cloned() .collect::>() diff --git a/plugins/src/webrtcsink/mod.rs b/plugins/src/webrtcsink/mod.rs index acb7a176..da124e13 100644 --- a/plugins/src/webrtcsink/mod.rs +++ b/plugins/src/webrtcsink/mod.rs @@ -15,18 +15,22 @@ unsafe impl Sync for WebRTCSink {} #[derive(thiserror::Error, Debug)] pub enum WebRTCSinkError { - #[error("no consumer with id")] - NoConsumerWithId(String), + #[error("no session with id")] + NoSessionWithId(String), #[error("consumer refused media")] - ConsumerRefusedMedia { peer_id: String, media_idx: u32 }, + ConsumerRefusedMedia { session_id: String, media_idx: u32 }, #[error("consumer did not provide valid payload for media")] - ConsumerNoValidPayload { peer_id: String, media_idx: u32 }, + ConsumerNoValidPayload { session_id: String, media_idx: u32 }, #[error("SDP mline index is currently mandatory")] MandatorySdpMlineIndex, - #[error("duplicate consumer id")] - DuplicateConsumerId(String), + #[error("duplicate session id")] + DuplicateSessionId(String), #[error("error setting up consumer pipeline")] - ConsumerPipelineError { peer_id: String, details: String }, + SessionPipelineError { + session_id: String, + peer_id: String, + details: String, + }, } pub trait Signallable: Sync + Send + 'static { @@ -35,7 +39,7 @@ pub trait Signallable: Sync + Send + 'static { fn handle_sdp( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), Box>; @@ -46,13 +50,13 @@ pub trait Signallable: Sync + Send + 'static { fn handle_ice( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_m_line_index: Option, sdp_mid: Option, ) -> Result<(), Box>; - fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str); + fn session_ended(&mut self, element: &WebRTCSink, session_id: &str); fn stop(&mut self, element: &WebRTCSink); } @@ -86,12 +90,12 @@ impl WebRTCSink { pub fn handle_sdp( &self, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.handle_sdp(self, peer_id, sdp) + ws.handle_sdp(self, session_id, sdp) } /// sdp_mid is exposed for future proofing, see @@ -99,14 +103,14 @@ impl WebRTCSink { /// at the moment sdp_m_line_index must be Some pub fn handle_ice( &self, - peer_id: &str, + session_id: &str, sdp_m_line_index: Option, sdp_mid: Option, candidate: &str, ) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.handle_ice(self, peer_id, sdp_m_line_index, sdp_mid, candidate) + ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate) } pub fn handle_signalling_error(&self, error: Box) { @@ -115,16 +119,16 @@ impl WebRTCSink { ws.handle_signalling_error(self, anyhow::anyhow!(error)); } - pub fn add_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> { + pub fn start_session(&self, session_id: &str, peer_id: &str) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.add_consumer(self, peer_id) + ws.start_session(self, session_id, peer_id) } - pub fn remove_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> { + pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.remove_consumer(self, peer_id, false) + ws.remove_session(self, session_id, false) } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 74c62be8..4f362e74 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -88,12 +88,17 @@ pub enum OutgoingMessage { #[serde(default)] meta: Option, }, - /// Instructs a peer to generate an offer + /// Instructs a peer to generate an offer and inform about the session ID #[serde(rename_all = "camelCase")] - StartSession { peer_id: String }, + StartSession { peer_id: String, session_id: String }, + + /// Let consumer know that the requested session is starting with the specified identifier + #[serde(rename_all = "camelCase")] + SessionStarted { peer_id: String, session_id: String }, + /// Signals that the session the peer was in was ended #[serde(rename_all = "camelCase")] - EndSession { peer_id: String }, + EndSession(EndSessionMessage), /// Messages directly forwarded from one peer to another Peer(PeerMessage), /// Provides the current list of consumer peers @@ -151,7 +156,7 @@ pub struct StartSessionMessage { pub peer_id: String, } -#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] #[serde(tag = "type")] #[serde(rename_all = "camelCase")] /// Conveys a SDP @@ -168,7 +173,7 @@ pub enum SdpMessage { }, } -#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] #[serde(rename_all = "camelCase")] /// Contents of the peer message pub enum PeerMessageInner { @@ -187,19 +192,17 @@ pub enum PeerMessageInner { #[serde(rename_all = "camelCase")] /// Messages directly forwarded from one peer to another pub struct PeerMessage { - /// The identifier of the peer, which must be in a session with the sender - pub peer_id: String, - /// The contents of the message + pub session_id: String, #[serde(flatten)] pub peer_message: PeerMessageInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] /// End a session pub struct EndSessionMessage { - /// The identifier of the peer to end the session with - pub peer_id: String, + /// The identifier of the session to end + pub session_id: String, } #[derive(Serialize, Deserialize, Debug)] diff --git a/signalling/src/handlers/mod.rs b/signalling/src/handlers/mod.rs index c17b96e4..7fc834a1 100644 --- a/signalling/src/handlers/mod.rs +++ b/signalling/src/handlers/mod.rs @@ -1,25 +1,46 @@ use anyhow::{anyhow, Error}; +use anyhow::{bail, Context}; use futures::prelude::*; use futures::ready; use pin_project_lite::pin_project; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context as TaskContext, Poll}; +use tracing::log::error; use tracing::{info, instrument, warn}; use webrtcsink_protocol as p; type PeerId = String; +#[derive(Clone)] +struct Session { + id: String, + producer: PeerId, + consumer: PeerId, +} + +impl Session { + fn other_peer_id(&self, id: &str) -> Result<&str, Error> { + if self.producer == id { + Ok(&self.consumer) + } else if self.consumer == id { + Ok(&self.producer) + } else { + bail!("Peer {id} is not part of {}", self.id) + } + } +} + pin_project! { #[must_use = "streams do nothing unless polled"] pub struct Handler { #[pin] stream: Pin)> + Send>>, items: VecDeque<(String, p::OutgoingMessage)>, - producers: HashMap>, - consumers: HashMap>, - listeners: HashSet, - meta: HashMap>, + producers: HashMap>, + consumers: HashMap>, + listeners: HashMap>, + sessions: HashMap, } } @@ -32,10 +53,10 @@ impl Handler { Self { stream, items: VecDeque::new(), - producers: HashMap::new(), - consumers: HashMap::new(), - listeners: HashSet::new(), - meta: HashMap::new(), + producers: Default::default(), + consumers: Default::default(), + listeners: Default::default(), + sessions: Default::default(), } } @@ -52,29 +73,19 @@ impl Handler { p::RegisterMessage::Listener { meta } => self.register_listener(peer_id, meta), }, p::IncomingMessage::Unregister(message) => { - let meta = self.meta.get(peer_id).unwrap_or_else(|| &None).clone(); let answer = match message { - p::UnregisterMessage::Producer => { - self.remove_producer_peer(peer_id); - p::UnregisteredMessage::Producer { - peer_id: peer_id.into(), - meta, - } - } - p::UnregisterMessage::Consumer => { - self.remove_consumer_peer(peer_id); - p::UnregisteredMessage::Consumer { - peer_id: peer_id.into(), - meta, - } - } - p::UnregisterMessage::Listener => { - self.remove_listener_peer(peer_id); - p::UnregisteredMessage::Listener { - peer_id: peer_id.into(), - meta, - } - } + p::UnregisterMessage::Producer => p::UnregisteredMessage::Producer { + peer_id: peer_id.into(), + meta: self.remove_producer_peer(peer_id), + }, + p::UnregisterMessage::Consumer => p::UnregisteredMessage::Consumer { + peer_id: peer_id.into(), + meta: self.remove_consumer_peer(peer_id), + }, + p::UnregisterMessage::Listener => p::UnregisteredMessage::Listener { + peer_id: peer_id.into(), + meta: self.remove_listener_peer(peer_id), + }, }; self.items.push_back(( @@ -87,10 +98,10 @@ impl Handler { p::UnregisterMessage::Producer | p::UnregisterMessage::Consumer => { let mut messages = self .listeners - .iter() - .map(|listener| { + .keys() + .map(|listener_id| { ( - listener.to_string(), + listener_id.to_string(), p::OutgoingMessage::Unregistered(answer.clone()), ) }) @@ -106,34 +117,46 @@ impl Handler { p::IncomingMessage::StartSession(message) => { self.start_session(&message.peer_id, peer_id) } - p::IncomingMessage::Peer(p::PeerMessage { - peer_id: other_peer_id, - peer_message, - }) => match peer_message { - p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - } => self.handle_ice(candidate, sdp_m_line_index, peer_id, &other_peer_id), - p::PeerMessageInner::Sdp(sdp_message) => match sdp_message { - p::SdpMessage::Offer { sdp } => { - self.handle_sdp_offer(sdp, peer_id, &other_peer_id) - } - p::SdpMessage::Answer { sdp } => { - self.handle_sdp_answer(sdp, peer_id, &other_peer_id) - } - }, - }, + p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), p::IncomingMessage::List => self.list_producers(peer_id), - p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: other_peer_id, - }) => self.end_session(peer_id, &other_peer_id), + p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id), } } + fn handle_peer_message(&mut self, peer_id: &str, peermsg: p::PeerMessage) -> Result<(), Error> { + let session_id = &peermsg.session_id; + let session = self + .sessions + .get(session_id) + .context(format!("Session {} doesn't exist", session_id))? + .clone(); + + if matches!( + peermsg.peer_message, + p::PeerMessageInner::Sdp(p::SdpMessage::Offer { .. }) + ) { + if peer_id == session.consumer { + bail!( + r#"cannot forward offer from "{peer_id}" to "{}" as "{peer_id}" is not the producer"#, + session.producer, + ); + } + } + + self.items.push_back(( + session.other_peer_id(peer_id)?.to_owned(), + p::OutgoingMessage::Peer(p::PeerMessage { + session_id: session_id.to_string(), + peer_message: peermsg.peer_message.clone(), + }), + )); + + Ok(()) + } + #[instrument(level = "debug", skip(self))] - /// Remove a peer, this can cause sessions to be ended - fn remove_listener_peer(&mut self, peer_id: &str) { - self.listeners.remove(peer_id); + fn remove_listener_peer(&mut self, peer_id: &str) -> Option { + self.listeners.remove(peer_id).unwrap_or(None) } #[instrument(level = "debug", skip(self))] @@ -146,110 +169,82 @@ impl Handler { self.remove_consumer_peer(peer_id); } - fn remove_producer_peer(&mut self, peer_id: &str) { - if let Some(consumers) = self.producers.remove(peer_id) { - for consumer_id in &consumers { - info!(producer_id=%peer_id, consumer_id=%consumer_id, "ended session"); - self.consumers.insert(consumer_id.clone(), None); - self.items.push_back(( - consumer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); + #[instrument(level = "debug", skip(self))] + fn remove_producer_peer(&mut self, peer_id: &str) -> Option { + let sessions_to_end = self + .sessions + .iter() + .filter_map(|(session_id, session)| { + if session.producer == peer_id { + Some(session_id.clone()) + } else { + None + } + }) + .collect::>(); + + sessions_to_end.iter().for_each(|session_id| { + if let Err(e) = self.end_session(peer_id, session_id) { + error!("Could not end session {session_id}: {e:?}"); } + }); - for listener in &self.listeners { - self.items.push_back(( - listener.to_string(), - p::OutgoingMessage::ProducerRemoved { - peer_id: peer_id.to_string(), - meta: match self.meta.get(peer_id) { - Some(meta) => meta.clone(), - None => Default::default(), - }, - }, - )); - } - } - } - - fn remove_consumer_peer(&mut self, peer_id: &str) { - if let Some(Some(producer_id)) = self.consumers.remove(peer_id) { - info!(producer_id=%producer_id, consumer_id=%peer_id, "ended session"); - - self.producers - .get_mut(&producer_id) - .unwrap() - .remove(peer_id); + let meta = self.producers.remove(peer_id); + for listener in self.listeners.keys() { self.items.push_back(( - producer_id.to_string(), - p::OutgoingMessage::EndSession { + listener.to_string(), + p::OutgoingMessage::ProducerRemoved { peer_id: peer_id.to_string(), + meta: meta + .as_ref() + .map_or_else(Default::default, |meta| meta.clone()), }, )); } - let _ = self.meta.remove(peer_id); + meta.unwrap_or(None) + } + + #[instrument(level = "debug", skip(self))] + fn remove_consumer_peer(&mut self, peer_id: &str) -> Option { + let sessions_to_end = self + .sessions + .iter() + .filter_map(|(session_id, session)| { + if session.consumer == peer_id { + Some(session_id.clone()) + } else { + None + } + }) + .collect::>(); + + sessions_to_end.iter().for_each(|session_id| { + if let Err(e) = self.end_session(peer_id, session_id) { + error!("Could not end session {session_id}: {e:?}"); + } + }); + + self.consumers.remove(peer_id).unwrap_or(None) } #[instrument(level = "debug", skip(self))] /// End a session between two peers - fn end_session(&mut self, peer_id: &str, other_peer_id: &str) -> Result<(), Error> { - info!(peer_id=%peer_id, other_peer_id=%other_peer_id, "endsession request"); - if let Some(ref mut consumers) = self.producers.get_mut(peer_id) { - if consumers.remove(other_peer_id) { - info!(producer_id=%peer_id, consumer_id=%other_peer_id, "ended session"); + fn end_session(&mut self, peer_id: &str, session_id: &str) -> Result<(), Error> { + let session = self + .sessions + .remove(session_id) + .with_context(|| format!("Session {session_id} doesn't exist"))?; - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); + self.items.push_back(( + session.other_peer_id(peer_id)?.to_string(), + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.to_string(), + }), + )); - self.consumers.insert(other_peer_id.to_string(), None); - Ok(()) - } else { - Err(anyhow!( - "Producer {} has no consumer {}", - peer_id, - other_peer_id - )) - } - } else if let Some(Some(producer_id)) = self.consumers.get(peer_id) { - if producer_id == other_peer_id { - info!(producer_id=%other_peer_id, consumer_id=%peer_id, "ended session"); - - self.consumers.insert(peer_id.to_string(), None); - self.producers - .get_mut(other_peer_id) - .unwrap() - .remove(peer_id); - - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); - - Ok(()) - } else { - Err(anyhow!( - "Consumer {} is not in a session with {}", - peer_id, - other_peer_id - )) - } - } else { - Err(anyhow!( - "No session between {} and {}", - peer_id, - other_peer_id - )) - } + Ok(()) } /// List producer peers @@ -260,14 +255,10 @@ impl Handler { p::OutgoingMessage::List { producers: self .producers - .keys() - .cloned() - .map(|peer_id| p::Peer { + .iter() + .map(|(peer_id, meta)| p::Peer { id: peer_id.clone(), - meta: self - .meta - .get(&peer_id) - .map_or_else(|| Default::default(), |m| m.clone()), + meta: meta.clone(), }) .collect(), }, @@ -276,134 +267,6 @@ impl Handler { Ok(()) } - /// Handle ICE candidate sent by one peer to another peer - #[instrument(level = "debug", skip(self))] - fn handle_ice( - &mut self, - candidate: String, - sdp_m_line_index: u32, - peer_id: &str, - other_peer_id: &str, - ) -> Result<(), Error> { - if let Some(consumers) = self.producers.get(peer_id) { - if consumers.contains(other_peer_id) { - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), - peer_message: p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - }, - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id - )) - } - } else if let Some(producer) = self.consumers.get(peer_id) { - if &Some(other_peer_id.to_string()) == producer { - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), - peer_message: p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - }, - }), - )); - - Ok(()) - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id - )) - } - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id, - )) - } - } - - /// Handle SDP offered by one peer to another peer - #[instrument(level = "debug", skip(self))] - fn handle_sdp_offer( - &mut self, - sdp: String, - producer_id: &str, - consumer_id: &str, - ) -> Result<(), Error> { - if let Some(consumers) = self.producers.get(producer_id) { - if consumers.contains(consumer_id) { - self.items.push_back(( - consumer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: producer_id.to_string(), - peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp }), - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward offer from {} to {} as they are not in a session", - producer_id, - consumer_id - )) - } - } else { - Err(anyhow!( - "cannot forward offer from {} to {} as they are not in a session or {} is not the producer", - producer_id, - consumer_id, - producer_id, - )) - } - } - - /// Handle the SDP answer from one peer to another peer - #[instrument(level = "debug", skip(self))] - fn handle_sdp_answer( - &mut self, - sdp: String, - consumer_id: &str, - producer_id: &str, - ) -> Result<(), Error> { - if let Some(producer) = self.consumers.get(consumer_id) { - if &Some(producer_id.to_string()) == producer { - self.items.push_back(( - producer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: consumer_id.to_string(), - peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }), - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward answer from {} to {} as they are not in a session", - consumer_id, - producer_id - )) - } - } else { - Err(anyhow!( - "cannot forward answer from {} to {} as they are not in a session", - consumer_id, - producer_id - )) - } - } - /// Register peer as a producer #[instrument(level = "debug", skip(self))] fn register_producer( @@ -414,9 +277,9 @@ impl Handler { if self.producers.contains_key(peer_id) { Err(anyhow!("{} is already registered as a producer", peer_id)) } else { - self.producers.insert(peer_id.to_string(), HashSet::new()); + self.producers.insert(peer_id.to_string(), meta.clone()); - for listener in &self.listeners { + for listener in self.listeners.keys() { self.items.push_back(( listener.to_string(), p::OutgoingMessage::ProducerAdded { @@ -430,12 +293,10 @@ impl Handler { peer_id.to_string(), p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { peer_id: peer_id.to_string(), - meta: meta.clone(), + meta: meta, }), )); - self.meta.insert(peer_id.to_string(), meta); - info!(peer_id = %peer_id, "registered as a producer"); Ok(()) @@ -452,7 +313,7 @@ impl Handler { if self.consumers.contains_key(peer_id) { Err(anyhow!("{} is already registered as a consumer", peer_id)) } else { - self.consumers.insert(peer_id.to_string(), None); + self.consumers.insert(peer_id.to_string(), meta.clone()); self.items.push_back(( peer_id.to_string(), @@ -462,8 +323,6 @@ impl Handler { }), )); - self.meta.insert(peer_id.to_string(), meta); - info!(peer_id = %peer_id, "registered as a consumer"); Ok(()) @@ -477,23 +336,22 @@ impl Handler { peer_id: &str, meta: Option, ) -> Result<(), Error> { - if !self.listeners.insert(peer_id.to_string()) { - Err(anyhow!("{} is already registered as a listener", peer_id)) - } else { - self.items.push_back(( - peer_id.to_string(), - p::OutgoingMessage::Registered(p::RegisteredMessage::Listener { - peer_id: peer_id.to_string(), - meta: meta.clone(), - }), - )); - - self.meta.insert(peer_id.to_string(), meta); - - info!(peer_id = %peer_id, "registered as a listener"); - - Ok(()) + if self.listeners.contains_key(peer_id) { + bail!("{} is already registered as a listener", peer_id); } + + self.listeners.insert(peer_id.to_string(), meta.clone()); + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::Registered(p::RegisteredMessage::Listener { + peer_id: peer_id.to_string(), + meta: meta.clone(), + }), + )); + + info!(peer_id = %peer_id, "registered as a listener"); + + Ok(()) } /// Start a session between two peers @@ -506,14 +364,6 @@ impl Handler { )); } - if let Some(producer_id) = self.consumers.get(consumer_id).unwrap() { - return Err(anyhow!( - "Consumer with id {} is already in a session with producer {}", - consumer_id, - producer_id, - )); - } - if !self.producers.contains_key(producer_id) { return Err(anyhow!( "Peer with id {} is not registered as a producer", @@ -521,21 +371,31 @@ impl Handler { )); } - self.consumers - .insert(consumer_id.to_string(), Some(producer_id.to_string())); - self.producers - .get_mut(producer_id) - .unwrap() - .insert(consumer_id.to_string()); - + let session_id = uuid::Uuid::new_v4().to_string(); + self.sessions.insert( + session_id.clone(), + Session { + id: session_id.clone(), + consumer: consumer_id.to_string(), + producer: producer_id.to_string(), + }, + ); + self.items.push_back(( + consumer_id.to_string(), + p::OutgoingMessage::SessionStarted { + peer_id: producer_id.to_string(), + session_id: session_id.clone(), + }, + )); self.items.push_back(( producer_id.to_string(), p::OutgoingMessage::StartSession { peer_id: consumer_id.to_string(), + session_id: session_id.clone(), }, )); - info!(producer_id = %producer_id, consumer_id = %consumer_id, "started a session"); + info!(id = %session_id, producer_id = %producer_id, consumer_id = %consumer_id, "started a session"); Ok(()) } @@ -544,7 +404,7 @@ impl Handler { impl Stream for Handler { type Item = (String, p::OutgoingMessage); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { loop { let this = self.as_mut().project(); @@ -633,7 +493,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::List { - producers: vec![p::Peer { + producers: vec![p::Peer::Producer { id: "producer".to_string(), meta: Some(json!( {"display-name": "foobar".to_string() @@ -762,12 +622,25 @@ mod tests { .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "producer"); assert_eq!( sent_message, p::OutgoingMessage::StartSession { - peer_id: "consumer".to_string() + peer_id: "consumer".to_string(), + session_id: session_id.to_string(), } ); } @@ -799,6 +672,19 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Register(p::RegisterMessage::Listener { @@ -815,9 +701,7 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) ); let (peer_id, sent_message) = handler.next().await.unwrap(); @@ -859,11 +743,25 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), }); + tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); @@ -872,9 +770,9 @@ mod tests { assert_eq!(peer_id, "producer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "consumer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id + }) ); } @@ -905,10 +803,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), }); tx.send(("producer".to_string(), Some(message))) .await @@ -918,9 +829,7 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) ); } @@ -951,29 +860,52 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); + // The consumer ends the session let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), }); - tx.send(("producer".to_string(), Some(message))) + tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), - }); - tx.send(("producer".to_string(), Some(message))) - .await - .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone() + }) + ); + + let message = p::IncomingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, p::OutgoingMessage::Error { - details: "Producer producer has no consumer consumer".into() + details: format!("Session {session_id} doesn't exist") } ); } @@ -1005,10 +937,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string(), }), @@ -1022,7 +967,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string() }) @@ -1057,10 +1002,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42, @@ -1075,7 +1033,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42 @@ -1084,7 +1042,7 @@ mod tests { ); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42, @@ -1099,7 +1057,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42 @@ -1135,10 +1093,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id, peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string(), }), @@ -1148,10 +1119,10 @@ mod tests { .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); - assert_eq!(peer_id, "consumer"); + // assert_eq!(peer_id, "consumer"); assert_eq!(sent_message, p::OutgoingMessage::Error { - details: "cannot forward offer from consumer to producer as they are not in a session or consumer is not the producer".into() + details: r#"cannot forward offer from "consumer" to "producer" as "consumer" is not the producer"#.into() } ); } @@ -1213,6 +1184,18 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; let (peer_id, sent_message) = handler.next().await.unwrap(); @@ -1220,7 +1203,8 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::StartSession { - peer_id: "consumer".to_string() + peer_id: "consumer".to_string(), + session_id: session_id.clone(), } ); @@ -1234,9 +1218,9 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone(), + }) ); let (peer_id, sent_message) = handler.next().await.unwrap(); @@ -1300,13 +1284,27 @@ mod tests { .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "producer"); assert_eq!( sent_message, p::OutgoingMessage::StartSession { - peer_id: "consumer".to_string() + peer_id: "consumer".to_string(), + session_id: session_id.clone(), } ); @@ -1320,9 +1318,9 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone(), + }) ); let (peer_id, sent_message) = handler.next().await.unwrap(); @@ -1411,6 +1409,19 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session0_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::StartSession(p::StartSessionMessage { @@ -1421,14 +1432,18 @@ mod tests { .await .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); - assert_eq!(peer_id, "consumer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Error { - details: "Consumer with id consumer is already in a session with producer producer" - .into() + let session1_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() } - ); + _ => panic!("SessionStarted message missing"), + }; + + assert_ne!(session0_id, session1_id); } } diff --git a/www/webrtc.js b/www/webrtc.js index d003ccf8..e9f905d7 100644 --- a/www/webrtc.js +++ b/www/webrtc.js @@ -47,6 +47,7 @@ function Uint8ToString(u8a){ } function Session(our_id, peer_id, closed_callback) { + this.id = null; this.peer_connection = null; this.ws_conn = null; this.peer_id = peer_id; @@ -113,7 +114,7 @@ function Session(our_id, peer_id, closed_callback) { this.setStatus("Sending SDP answer"); var sdp = { 'type': 'peer', - 'peerId': this.peer_id, + 'sessionId': this.id, 'sdp': this.peer_connection.localDescription.toJSON() }; this.ws_conn.send(JSON.stringify(sdp)); @@ -164,6 +165,9 @@ function Session(our_id, peer_id, closed_callback) { if (msg.type == "registered") { this.setStatus("Registered with server"); this.connectPeer(); + } else if (msg.type == "sessionStarted") { + this.setStatus("Registered with server"); + this.id = msg.sessionId; } else if (msg.type == "error") { this.handleIncomingError(msg.details); } else if (msg.type == "endSession") { @@ -315,7 +319,7 @@ function Session(our_id, peer_id, closed_callback) { } this.ws_conn.send(JSON.stringify({ "type": "peer", - "peerId": this.peer_id, + "sessionId": this.id, "ice": event.candidate.toJSON() })); };