webrtcsink: store mids per-session instead of globally

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1730>
This commit is contained in:
Mathieu Duponchelle 2024-08-20 19:06:06 +02:00 committed by Arun Raghavan
parent 16ee51621e
commit 5dc2d56c0e

View file

@ -505,16 +505,16 @@ struct State {
discoveries: HashMap<String, Vec<DiscoveryInfo>>, discoveries: HashMap<String, Vec<DiscoveryInfo>>,
navigation_handler: Option<NavigationEventHandler>, navigation_handler: Option<NavigationEventHandler>,
control_events_handler: Option<ControlRequestHandler>, control_events_handler: Option<ControlRequestHandler>,
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>, signaller_signals: Option<SignallerSignals>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>, finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>, web_shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_join_handle: Option<tokio::task::JoinHandle<()>>, web_join_handle: Option<tokio::task::JoinHandle<()>>,
session_mids: HashMap<String, HashMap<String, String>>,
} }
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) {
let event: Result<NavigationEvent, _> = serde_json::from_str(msg); let event: Result<NavigationEvent, _> = serde_json::from_str(msg);
if let Ok(event) = event { if let Ok(event) = event {
@ -524,14 +524,18 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
let this = sink.imp(); let this = sink.imp();
let state = this.state.lock().unwrap(); let state = this.state.lock().unwrap();
if let Some(stream_name) = state.mids.get(&mid) { let Some(stream) = state
if let Some(stream) = state.streams.get(stream_name) { .session_mids
let event = gst::event::Navigation::new(event.event.structure()); .get(session_id)
.and_then(|mids| mids.get(&mid))
.and_then(|name| state.streams.get(name))
else {
return;
};
let event = gst::event::Navigation::new(event.event.structure());
if !stream.sink_pad.push_event(event.clone()) { if !stream.sink_pad.push_event(event.clone()) {
gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); gst::info!(CAT, obj = sink, "Could not send event: {:?}", event);
}
}
} }
} else { } else {
let this = sink.imp(); let this = sink.imp();
@ -600,6 +604,7 @@ fn deserialize_serde_object(obj: &serde_json::Value, name: &str) -> Result<gst::
fn handle_control_event( fn handle_control_event(
sink: &super::BaseWebRTCSink, sink: &super::BaseWebRTCSink,
msg: &str, msg: &str,
session_id: &str,
) -> Result<utils::ControlResponseMessage, Error> { ) -> Result<utils::ControlResponseMessage, Error> {
let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?; let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?;
@ -623,14 +628,18 @@ fn handle_control_event(
let this = sink.imp(); let this = sink.imp();
let state = this.state.lock().unwrap(); let state = this.state.lock().unwrap();
if let Some(stream_name) = state.mids.get(&mid) { let Some(stream) = state
if let Some(stream) = state.streams.get(stream_name) { .session_mids
if !stream.sink_pad.push_event(event.clone()) { .get(session_id)
gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); .and_then(|mids| mids.get(&mid))
} else { .and_then(|name| state.streams.get(name))
ret = true; else {
} return Err(anyhow!("No relevant stream to forward event to"));
} };
if !stream.sink_pad.push_event(event.clone()) {
gst::info!(CAT, obj = sink, "Could not send event: {:?}", event);
} else {
ret = true;
} }
} else { } else {
let this = sink.imp(); let this = sink.imp();
@ -736,13 +745,13 @@ impl Default for State {
discoveries: HashMap::new(), discoveries: HashMap::new(),
navigation_handler: None, navigation_handler: None,
control_events_handler: None, control_events_handler: None,
mids: HashMap::new(),
signaller_signals: Default::default(), signaller_signals: Default::default(),
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_shutdown_tx: None, web_shutdown_tx: None,
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_join_handle: None, web_join_handle: None,
session_mids: HashMap::new(),
} }
} }
} }
@ -1728,7 +1737,7 @@ impl InputStream {
} }
impl NavigationEventHandler { impl NavigationEventHandler {
fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, session_id: &str) -> Self {
gst::info!(CAT, obj = element, "Creating navigation data channel"); gst::info!(CAT, obj = element, "Creating navigation data channel");
let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>( let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>(
"create-data-channel", "create-data-channel",
@ -1740,6 +1749,8 @@ impl NavigationEventHandler {
], ],
); );
let session_id = session_id.to_string();
Self(( Self((
channel.connect_closure( channel.connect_closure(
"on-message-string", "on-message-string",
@ -1747,8 +1758,10 @@ impl NavigationEventHandler {
glib::closure!( glib::closure!(
#[watch] #[watch]
element, element,
#[strong]
session_id,
move |_channel: &WebRTCDataChannel, msg: &str| { move |_channel: &WebRTCDataChannel, msg: &str| {
create_navigation_event(element, msg); create_navigation_event(element, msg, &session_id);
} }
), ),
), ),
@ -1758,7 +1771,7 @@ impl NavigationEventHandler {
} }
impl ControlRequestHandler { impl ControlRequestHandler {
fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, session_id: &str) -> Self {
let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>( let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>(
"create-data-channel", "create-data-channel",
&[ &[
@ -1769,6 +1782,8 @@ impl ControlRequestHandler {
], ],
); );
let session_id = session_id.to_string();
Self(( Self((
channel.connect_closure( channel.connect_closure(
"on-message-string", "on-message-string",
@ -1776,8 +1791,10 @@ impl ControlRequestHandler {
glib::closure!( glib::closure!(
#[watch] #[watch]
element, element,
#[strong]
session_id,
move |channel: &WebRTCDataChannel, msg: &str| { move |channel: &WebRTCDataChannel, msg: &str| {
match handle_control_event(element, msg) { match handle_control_event(element, msg, &session_id) {
Err(err) => { Err(err) => {
gst::error!(CAT, "Failed to handle control event: {err:?}"); gst::error!(CAT, "Failed to handle control event: {err:?}");
} }
@ -3259,35 +3276,34 @@ impl BaseWebRTCSink {
let _ = this.remove_session(&session_id_clone, true); let _ = this.remove_session(&session_id_clone, true);
} }
gst::MessageView::StateChanged(state_changed) => { gst::MessageView::StateChanged(state_changed) => {
if state_changed.src() == Some(pipeline.upcast_ref()) { if state_changed.src() == Some(pipeline.upcast_ref())
if state_changed.old() == gst::State::Ready && state_changed.old() == gst::State::Ready
&& state_changed.current() == gst::State::Paused && state_changed.current() == gst::State::Paused
{ {
gst::info!( gst::info!(
CAT, CAT,
obj = pipeline, obj = pipeline,
"{peer_id_clone} pipeline reached PAUSED, negotiating" "{peer_id_clone} pipeline reached PAUSED, negotiating"
); );
// We don't connect to on-negotiation-needed, this in order to call the above // We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock: // signal without holding the state lock:
// //
// Going to Ready triggers synchronous emission of the on-negotiation-needed // Going to Ready triggers synchronous emission of the on-negotiation-needed
// signal, during which time the application may add a data channel, causing // signal, during which time the application may add a data channel, causing
// renegotiation, which we do not support at this time. // renegotiation, which we do not support at this time.
// //
// This is completely safe, as we know that by now all conditions are gathered: // This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Paused state, and all its transceivers have codec_preferences. // webrtcbin is in the Paused state, and all its transceivers have codec_preferences.
this.negotiate(&session_id_clone, offer_clone.as_ref()); this.negotiate(&session_id_clone, offer_clone.as_ref());
if let Err(err) = pipeline.set_state(gst::State::Playing) { if let Err(err) = pipeline.set_state(gst::State::Playing) {
gst::warning!( gst::warning!(
CAT, CAT,
obj = element, obj = element,
"Failed to bring {peer_id_clone} pipeline to PLAYING: {}", "Failed to bring {peer_id_clone} pipeline to PLAYING: {}",
err err
); );
let _ = this.remove_session(&session_id_clone, true); let _ = this.remove_session(&session_id_clone, true);
}
} }
} }
} }
@ -3398,14 +3414,20 @@ impl BaseWebRTCSink {
if enable_data_channel_navigation { if enable_data_channel_navigation {
let mut state = this.state.lock().unwrap(); let mut state = this.state.lock().unwrap();
state.navigation_handler = state.navigation_handler = Some(NavigationEventHandler::new(
Some(NavigationEventHandler::new(&element, &webrtcbin)); &element,
&webrtcbin,
&session_id,
));
} }
if enable_control_data_channel { if enable_control_data_channel {
let mut state = this.state.lock().unwrap(); let mut state = this.state.lock().unwrap();
state.control_events_handler = state.control_events_handler = Some(ControlRequestHandler::new(
Some(ControlRequestHandler::new(&element, &webrtcbin)); &element,
&webrtcbin,
&session_id,
));
} }
// This is intentionally emitted with the pipeline in the Ready state, // This is intentionally emitted with the pipeline in the Ready state,
@ -3564,7 +3586,11 @@ impl BaseWebRTCSink {
}; };
if let Some(mid) = transceiver.mid() { if let Some(mid) = transceiver.mid() {
state.mids.insert(mid.to_string(), stream_name.clone()); state
.session_mids
.entry(session_id.clone())
.or_default()
.insert(mid.to_string(), stream_name.clone());
} }
if let Some(producer) = state if let Some(producer) = state