diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 33179c73..b0cb68b2 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -505,16 +505,16 @@ struct State { discoveries: HashMap>, navigation_handler: Option, control_events_handler: Option, - mids: HashMap, signaller_signals: Option, finalizing_sessions: Arc<(Mutex>, Condvar)>, #[cfg(feature = "web_server")] web_shutdown_tx: Option>, #[cfg(feature = "web_server")] web_join_handle: Option>, + session_mids: HashMap>, } -fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { +fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) { let event: Result = serde_json::from_str(msg); if let Ok(event) = event { @@ -524,14 +524,18 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { let this = sink.imp(); let state = this.state.lock().unwrap(); - if let Some(stream_name) = state.mids.get(&mid) { - if let Some(stream) = state.streams.get(stream_name) { - let event = gst::event::Navigation::new(event.event.structure()); + let Some(stream) = state + .session_mids + .get(session_id) + .and_then(|mids| mids.get(&mid)) + .and_then(|name| state.streams.get(name)) + else { + return; + }; + let event = gst::event::Navigation::new(event.event.structure()); - if !stream.sink_pad.push_event(event.clone()) { - gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); - } - } + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); } } else { let this = sink.imp(); @@ -600,6 +604,7 @@ fn deserialize_serde_object(obj: &serde_json::Value, name: &str) -> Result Result { let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?; @@ -623,14 +628,18 @@ fn handle_control_event( let this = sink.imp(); let state = this.state.lock().unwrap(); - if let Some(stream_name) = state.mids.get(&mid) { - if let Some(stream) = state.streams.get(stream_name) { - if !stream.sink_pad.push_event(event.clone()) { - gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); - } else { - ret = true; - } - } + let Some(stream) = state + .session_mids + .get(session_id) + .and_then(|mids| mids.get(&mid)) + .and_then(|name| state.streams.get(name)) + else { + return Err(anyhow!("No relevant stream to forward event to")); + }; + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); + } else { + ret = true; } } else { let this = sink.imp(); @@ -736,13 +745,13 @@ impl Default for State { discoveries: HashMap::new(), navigation_handler: None, control_events_handler: None, - mids: HashMap::new(), signaller_signals: Default::default(), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), #[cfg(feature = "web_server")] web_shutdown_tx: None, #[cfg(feature = "web_server")] web_join_handle: None, + session_mids: HashMap::new(), } } } @@ -1728,7 +1737,7 @@ impl InputStream { } impl NavigationEventHandler { - fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { + fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, session_id: &str) -> Self { gst::info!(CAT, obj = element, "Creating navigation data channel"); let channel = webrtcbin.emit_by_name::( "create-data-channel", @@ -1740,6 +1749,8 @@ impl NavigationEventHandler { ], ); + let session_id = session_id.to_string(); + Self(( channel.connect_closure( "on-message-string", @@ -1747,8 +1758,10 @@ impl NavigationEventHandler { glib::closure!( #[watch] element, + #[strong] + session_id, move |_channel: &WebRTCDataChannel, msg: &str| { - create_navigation_event(element, msg); + create_navigation_event(element, msg, &session_id); } ), ), @@ -1758,7 +1771,7 @@ impl NavigationEventHandler { } impl ControlRequestHandler { - fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { + fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, session_id: &str) -> Self { let channel = webrtcbin.emit_by_name::( "create-data-channel", &[ @@ -1769,6 +1782,8 @@ impl ControlRequestHandler { ], ); + let session_id = session_id.to_string(); + Self(( channel.connect_closure( "on-message-string", @@ -1776,8 +1791,10 @@ impl ControlRequestHandler { glib::closure!( #[watch] element, + #[strong] + session_id, move |channel: &WebRTCDataChannel, msg: &str| { - match handle_control_event(element, msg) { + match handle_control_event(element, msg, &session_id) { Err(err) => { gst::error!(CAT, "Failed to handle control event: {err:?}"); } @@ -3259,35 +3276,34 @@ impl BaseWebRTCSink { let _ = this.remove_session(&session_id_clone, true); } gst::MessageView::StateChanged(state_changed) => { - if state_changed.src() == Some(pipeline.upcast_ref()) { - if state_changed.old() == gst::State::Ready - && state_changed.current() == gst::State::Paused - { - gst::info!( - CAT, - obj = pipeline, - "{peer_id_clone} pipeline reached PAUSED, negotiating" - ); - // We don't connect to on-negotiation-needed, this in order to call the above - // signal without holding the state lock: - // - // Going to Ready triggers synchronous emission of the on-negotiation-needed - // signal, during which time the application may add a data channel, causing - // renegotiation, which we do not support at this time. - // - // This is completely safe, as we know that by now all conditions are gathered: - // webrtcbin is in the Paused state, and all its transceivers have codec_preferences. - this.negotiate(&session_id_clone, offer_clone.as_ref()); + if state_changed.src() == Some(pipeline.upcast_ref()) + && state_changed.old() == gst::State::Ready + && state_changed.current() == gst::State::Paused + { + gst::info!( + CAT, + obj = pipeline, + "{peer_id_clone} pipeline reached PAUSED, negotiating" + ); + // We don't connect to on-negotiation-needed, this in order to call the above + // signal without holding the state lock: + // + // Going to Ready triggers synchronous emission of the on-negotiation-needed + // signal, during which time the application may add a data channel, causing + // renegotiation, which we do not support at this time. + // + // This is completely safe, as we know that by now all conditions are gathered: + // webrtcbin is in the Paused state, and all its transceivers have codec_preferences. + this.negotiate(&session_id_clone, offer_clone.as_ref()); - if let Err(err) = pipeline.set_state(gst::State::Playing) { - gst::warning!( - CAT, - obj = element, - "Failed to bring {peer_id_clone} pipeline to PLAYING: {}", - err - ); - let _ = this.remove_session(&session_id_clone, true); - } + if let Err(err) = pipeline.set_state(gst::State::Playing) { + gst::warning!( + CAT, + obj = element, + "Failed to bring {peer_id_clone} pipeline to PLAYING: {}", + err + ); + let _ = this.remove_session(&session_id_clone, true); } } } @@ -3398,14 +3414,20 @@ impl BaseWebRTCSink { if enable_data_channel_navigation { let mut state = this.state.lock().unwrap(); - state.navigation_handler = - Some(NavigationEventHandler::new(&element, &webrtcbin)); + state.navigation_handler = Some(NavigationEventHandler::new( + &element, + &webrtcbin, + &session_id, + )); } if enable_control_data_channel { let mut state = this.state.lock().unwrap(); - state.control_events_handler = - Some(ControlRequestHandler::new(&element, &webrtcbin)); + state.control_events_handler = Some(ControlRequestHandler::new( + &element, + &webrtcbin, + &session_id, + )); } // This is intentionally emitted with the pipeline in the Ready state, @@ -3564,7 +3586,11 @@ impl BaseWebRTCSink { }; if let Some(mid) = transceiver.mid() { - state.mids.insert(mid.to_string(), stream_name.clone()); + state + .session_mids + .entry(session_id.clone()) + .or_default() + .insert(mid.to_string(), stream_name.clone()); } if let Some(producer) = state