diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index dd365a3b..67590b92 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -15,6 +15,7 @@ use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription}; use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; use reqwest::StatusCode; +use std::collections::HashMap; use std::sync::LazyLock; use std::sync::Mutex; @@ -631,7 +632,7 @@ struct WhipServerSettings { timeout: u32, shutdown_signal: Option>, server_handle: Option>, - sdp_answer: Option>>, + sdp_answer: HashMap>>, } impl Default for WhipServerSettings { @@ -643,7 +644,7 @@ impl Default for WhipServerSettings { timeout: DEFAULT_TIMEOUT, shutdown_signal: None, server_handle: None, - sdp_answer: None, + sdp_answer: HashMap::new(), } } } @@ -651,19 +652,20 @@ impl Default for WhipServerSettings { #[derive(Default)] pub struct WhipServer { settings: Mutex, - canceller: Mutex>, } impl WhipServer { pub fn on_webrtcbin_ready(&self) -> RustClosure { glib::closure!(|signaller: &super::WhipServerSignaller, - _producer_identifier: &str, + session_id: &str, webrtcbin: &gst::Element| { webrtcbin.connect_notify( Some("ice-gathering-state"), glib::clone!( #[weak] signaller, + #[to_owned] + session_id, move |webrtcbin, _pspec| { let state = webrtcbin.property::("ice-gathering-state"); @@ -673,7 +675,11 @@ impl WhipServer { gst::info!(CAT, obj = signaller, "ICE gathering started"); } WebRTCICEGatheringState::Complete => { - gst::info!(CAT, obj = signaller, "ICE gathering complete"); + gst::info!( + CAT, + obj = signaller, + "ICE gathering complete for {session_id}" + ); let ans: Option; let mut settings = signaller.imp().settings.lock().unwrap(); if let Some(answer_desc) = webrtcbin @@ -687,7 +693,7 @@ impl WhipServer { } let tx = settings .sdp_answer - .take() + .remove(&session_id) .expect("SDP answer Sender needs to be valid"); RUNTIME.spawn(glib::clone!( @@ -796,7 +802,7 @@ impl WhipServer { let wait_timeout = { let mut settings = self.settings.lock().unwrap(); let wait_timeout = settings.timeout; - settings.sdp_answer = Some(tx); + settings.sdp_answer.insert(session_id.clone(), tx); drop(settings); wait_timeout }; @@ -821,7 +827,10 @@ impl WhipServer { } } - let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await; + // We don't support cancellation, instead we depend on the server shutdown oneshot and wait + // timeout here. FIXME: Maybe we can do better? + let canceller = Mutex::new(None); + let result = wait_async(&canceller, rx.recv(), wait_timeout).await; let answer = match result { Ok(ans) => match ans {