mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-24 18:08:15 +00:00
net/webrtcsink: don't miss ice candidates
During `on_remote_description_set()` processing, current session is removed from the sessions `HashMap`. If an ice candidate is submitted to `handle_ice()` by that time, the session can't be found and the candidate is ignored. This commit wraps the Session in the sessions `HashMap` so an entry is kept while `on_remote_description_set()` is running. Incoming candidates received by `handle_ice()` will be processed immediately or enqueued and handled when the session is restored by `on_remote_description_set()`. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1387>
This commit is contained in:
parent
19597b3737
commit
3627e52673
1 changed files with 170 additions and 19 deletions
|
@ -238,11 +238,144 @@ struct NavigationEvent {
|
||||||
event: gst_video::NavigationEvent,
|
event: gst_video::NavigationEvent,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct IceCandidate {
|
||||||
|
sdp_m_line_index: u32,
|
||||||
|
candidate: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper around `Session`.
|
||||||
|
///
|
||||||
|
/// This makes it possible for the `Session` to be taken out of the `State`,
|
||||||
|
/// without removing the entry in the `sessions` `HashMap`, thus allowing
|
||||||
|
/// the `State` lock to be released, e.g. before calling a `Signal`.
|
||||||
|
///
|
||||||
|
/// Taking the `Session`, replaces it with a placeholder which can enqueue
|
||||||
|
/// items (currently ICE candidates) received while the `Session` is taken.
|
||||||
|
/// In which case, the enqueued items will be processed when the `Session` is
|
||||||
|
/// restored.
|
||||||
|
enum SessionWrapper {
|
||||||
|
/// The `Session` is available in the `SessionWrapper`.
|
||||||
|
InPlace(Session),
|
||||||
|
/// The `Session` was taken out the `SessionWrapper`.
|
||||||
|
Taken(Vec<IceCandidate>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionWrapper {
|
||||||
|
/// Unwraps a reference to the `Session` of this `SessionWrapper`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics is the `Session` was taken.
|
||||||
|
fn unwrap(&self) -> &Session {
|
||||||
|
match self {
|
||||||
|
SessionWrapper::InPlace(session) => session,
|
||||||
|
_ => panic!("Session is not In Place"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unwraps a mutable reference to the `Session` of this `SessionWrapper`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics is the `Session` was taken.
|
||||||
|
fn unwrap_mut(&mut self) -> &mut Session {
|
||||||
|
match self {
|
||||||
|
SessionWrapper::InPlace(session) => session,
|
||||||
|
_ => panic!("Session is not In Place"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consumes the `SessionWrapper`, returning the wrapped `Session`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics is the `Session` was taken.
|
||||||
|
fn into_inner(self) -> Session {
|
||||||
|
match self {
|
||||||
|
SessionWrapper::InPlace(session) => session,
|
||||||
|
_ => panic!("Session is not In Place"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Takes the `Session` out of this `SessionWrapper`, leaving it in the `Taken` state.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics is the `Session` was taken.
|
||||||
|
fn take(&mut self) -> Session {
|
||||||
|
use SessionWrapper::*;
|
||||||
|
match std::mem::replace(self, Taken(Vec::new())) {
|
||||||
|
InPlace(session) => session,
|
||||||
|
_ => panic!("Session is not In Place"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Restores a `Session` to this `SessionWrapper`.
|
||||||
|
///
|
||||||
|
/// Processes any pending items enqueued while the `Session` was taken.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics is the `Session` is already in place.
|
||||||
|
fn restore(&mut self, session: Session) {
|
||||||
|
let cands = if let SessionWrapper::Taken(ref cands) = self {
|
||||||
|
cands
|
||||||
|
} else {
|
||||||
|
panic!("Session is already in place");
|
||||||
|
};
|
||||||
|
|
||||||
|
if !cands.is_empty() {
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
"handling {} pending ice candidates for session {}",
|
||||||
|
cands.len(),
|
||||||
|
session.id,
|
||||||
|
);
|
||||||
|
for cand in cands {
|
||||||
|
session.webrtcbin.emit_by_name::<()>(
|
||||||
|
"add-ice-candidate",
|
||||||
|
&[&cand.sdp_m_line_index, &cand.candidate],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*self = SessionWrapper::InPlace(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an ICE candidate to this `SessionWrapper`.
|
||||||
|
///
|
||||||
|
/// If the `Session` is in place, the ICE candidate is added immediately,
|
||||||
|
/// otherwise, it will be added when the `Session` is restored.
|
||||||
|
fn add_ice_candidate(&mut self, session_id: &str, sdp_m_line_index: u32, candidate: &str) {
|
||||||
|
match self {
|
||||||
|
SessionWrapper::InPlace(session) => {
|
||||||
|
gst::trace!(CAT, "adding ice candidate for session {session_id}");
|
||||||
|
session
|
||||||
|
.webrtcbin
|
||||||
|
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
|
||||||
|
}
|
||||||
|
SessionWrapper::Taken(cands) => {
|
||||||
|
gst::trace!(CAT, "queuing ice candidate for session {session_id}");
|
||||||
|
cands.push(IceCandidate {
|
||||||
|
sdp_m_line_index,
|
||||||
|
candidate: candidate.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Session> for SessionWrapper {
|
||||||
|
fn from(session: Session) -> Self {
|
||||||
|
SessionWrapper::InPlace(session)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Our internal state */
|
/* Our internal state */
|
||||||
struct State {
|
struct State {
|
||||||
signaller: Box<dyn super::SignallableObject>,
|
signaller: Box<dyn super::SignallableObject>,
|
||||||
signaller_state: SignallerState,
|
signaller_state: SignallerState,
|
||||||
sessions: HashMap<String, Session>,
|
sessions: HashMap<String, SessionWrapper>,
|
||||||
codecs: BTreeMap<i32, Codec>,
|
codecs: BTreeMap<i32, Codec>,
|
||||||
/// Used to abort codec discovery
|
/// Used to abort codec discovery
|
||||||
codecs_abort_handle: Option<futures::future::AbortHandle>,
|
codecs_abort_handle: Option<futures::future::AbortHandle>,
|
||||||
|
@ -890,7 +1023,8 @@ impl State {
|
||||||
session_id: &str,
|
session_id: &str,
|
||||||
signal: bool,
|
signal: bool,
|
||||||
) -> Option<Session> {
|
) -> Option<Session> {
|
||||||
if let Some(mut session) = self.sessions.remove(session_id) {
|
if let Some(session) = self.sessions.remove(session_id) {
|
||||||
|
let mut session = session.into_inner();
|
||||||
self.finalize_session(element, &mut session, signal);
|
self.finalize_session(element, &mut session, signal);
|
||||||
Some(session)
|
Some(session)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1418,6 +1552,7 @@ impl WebRTCSink {
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get(session_id) {
|
if let Some(session) = state.sessions.get(session_id) {
|
||||||
session
|
session
|
||||||
|
.unwrap()
|
||||||
.webrtcbin
|
.webrtcbin
|
||||||
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
|
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
|
||||||
|
|
||||||
|
@ -1491,6 +1626,7 @@ impl WebRTCSink {
|
||||||
});
|
});
|
||||||
|
|
||||||
session
|
session
|
||||||
|
.unwrap()
|
||||||
.webrtcbin
|
.webrtcbin
|
||||||
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
|
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1730,7 +1866,7 @@ impl WebRTCSink {
|
||||||
let state = this.state.lock().unwrap();
|
let state = this.state.lock().unwrap();
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get(&session_id_clone) {
|
if let Some(session) = state.sessions.get(&session_id_clone) {
|
||||||
for webrtc_pad in session.webrtc_pads.values() {
|
for webrtc_pad in session.unwrap().webrtc_pads.values() {
|
||||||
if let Some(srcpad) = webrtc_pad.pad.peer() {
|
if let Some(srcpad) = webrtc_pad.pad.peer() {
|
||||||
srcpad.send_event(
|
srcpad.send_event(
|
||||||
gst_video::UpstreamForceKeyUnitEvent::builder()
|
gst_video::UpstreamForceKeyUnitEvent::builder()
|
||||||
|
@ -1795,7 +1931,8 @@ impl WebRTCSink {
|
||||||
|
|
||||||
let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?");
|
let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?");
|
||||||
let mut state = element.imp().state.lock().unwrap();
|
let mut state = element.imp().state.lock().unwrap();
|
||||||
if let Some(mut session) = state.sessions.get_mut(&session_id_str) {
|
if let Some(session) = state.sessions.get_mut(&session_id_str) {
|
||||||
|
let session = session.unwrap_mut();
|
||||||
|
|
||||||
if session.stats_sigid.is_none() {
|
if session.stats_sigid.is_none() {
|
||||||
let session_id_str = session_id_str.clone();
|
let session_id_str = session_id_str.clone();
|
||||||
|
@ -1893,7 +2030,9 @@ impl WebRTCSink {
|
||||||
state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin));
|
state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin));
|
||||||
}
|
}
|
||||||
|
|
||||||
state.sessions.insert(session_id.to_string(), session);
|
state
|
||||||
|
.sessions
|
||||||
|
.insert(session_id.to_string(), session.into());
|
||||||
|
|
||||||
drop(state);
|
drop(state);
|
||||||
drop(settings);
|
drop(settings);
|
||||||
|
@ -1953,7 +2092,8 @@ impl WebRTCSink {
|
||||||
stats: &gst::Structure,
|
stats: &gst::Structure,
|
||||||
) {
|
) {
|
||||||
let mut state = element.imp().state.lock().unwrap();
|
let mut state = element.imp().state.lock().unwrap();
|
||||||
if let Some(mut session) = state.sessions.get_mut(session_id) {
|
if let Some(session) = state.sessions.get_mut(session_id) {
|
||||||
|
let session = session.unwrap_mut();
|
||||||
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
|
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
|
||||||
congestion_controller.loss_control(element, stats, &mut session.encoders);
|
congestion_controller.loss_control(element, stats, &mut session.encoders);
|
||||||
}
|
}
|
||||||
|
@ -1973,7 +2113,8 @@ impl WebRTCSink {
|
||||||
if let Ok(Some(stats)) = reply {
|
if let Ok(Some(stats)) = reply {
|
||||||
|
|
||||||
let mut state = element.imp().state.lock().unwrap();
|
let mut state = element.imp().state.lock().unwrap();
|
||||||
if let Some(mut session) = state.sessions.get_mut(&session_id) {
|
if let Some(session) = state.sessions.get_mut(&session_id) {
|
||||||
|
let session = session.unwrap_mut();
|
||||||
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
|
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
|
||||||
congestion_controller.delay_control(&element, stats, &mut session.encoders,);
|
congestion_controller.delay_control(&element, stats, &mut session.encoders,);
|
||||||
}
|
}
|
||||||
|
@ -1990,7 +2131,7 @@ impl WebRTCSink {
|
||||||
let mut state = element.imp().state.lock().unwrap();
|
let mut state = element.imp().state.lock().unwrap();
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get_mut(peer_id) {
|
if let Some(session) = state.sessions.get_mut(peer_id) {
|
||||||
session.rtprtxsend = Some(rtprtxsend);
|
session.unwrap_mut().rtprtxsend = Some(rtprtxsend);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1999,6 +2140,8 @@ impl WebRTCSink {
|
||||||
let mut state = element.imp().state.lock().unwrap();
|
let mut state = element.imp().state.lock().unwrap();
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get_mut(peer_id) {
|
if let Some(session) = state.sessions.get_mut(peer_id) {
|
||||||
|
let session = session.unwrap_mut();
|
||||||
|
|
||||||
let n_encoders = session.encoders.len();
|
let n_encoders = session.encoders.len();
|
||||||
|
|
||||||
let fec_ratio = {
|
let fec_ratio = {
|
||||||
|
@ -2033,7 +2176,9 @@ impl WebRTCSink {
|
||||||
let mut remove = false;
|
let mut remove = false;
|
||||||
let codecs = state.codecs.clone();
|
let codecs = state.codecs.clone();
|
||||||
|
|
||||||
if let Some(mut session) = state.sessions.remove(&session_id) {
|
if let Some(session) = state.sessions.get_mut(&session_id) {
|
||||||
|
let mut session = session.take();
|
||||||
|
|
||||||
for webrtc_pad in session.webrtc_pads.clone().values() {
|
for webrtc_pad in session.webrtc_pads.clone().values() {
|
||||||
let transceiver = webrtc_pad
|
let transceiver = webrtc_pad
|
||||||
.pad
|
.pad
|
||||||
|
@ -2106,8 +2251,10 @@ impl WebRTCSink {
|
||||||
|
|
||||||
if remove {
|
if remove {
|
||||||
state.finalize_session(element, &mut session, true);
|
state.finalize_session(element, &mut session, true);
|
||||||
|
} else if let Some(session_wrapper) = state.sessions.get_mut(&session.id) {
|
||||||
|
session_wrapper.restore(session);
|
||||||
} else {
|
} else {
|
||||||
state.sessions.insert(session.id.clone(), session);
|
gst::warning!(CAT, "Session {} was removed", session.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2121,15 +2268,12 @@ impl WebRTCSink {
|
||||||
_sdp_mid: Option<String>,
|
_sdp_mid: Option<String>,
|
||||||
candidate: &str,
|
candidate: &str,
|
||||||
) -> Result<(), WebRTCSinkError> {
|
) -> Result<(), WebRTCSinkError> {
|
||||||
let state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?;
|
let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?;
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get(session_id) {
|
if let Some(session_wrapper) = state.sessions.get_mut(session_id) {
|
||||||
gst::trace!(CAT, "adding ice candidate for session {}", session_id);
|
session_wrapper.add_ice_candidate(session_id, sdp_m_line_index, candidate);
|
||||||
session
|
|
||||||
.webrtcbin
|
|
||||||
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()))
|
Err(WebRTCSinkError::NoSessionWithId(session_id.to_string()))
|
||||||
|
@ -2146,6 +2290,8 @@ impl WebRTCSink {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
if let Some(session) = state.sessions.get_mut(session_id) {
|
if let Some(session) = state.sessions.get_mut(session_id) {
|
||||||
|
let session = session.unwrap_mut();
|
||||||
|
|
||||||
let sdp = desc.sdp();
|
let sdp = desc.sdp();
|
||||||
|
|
||||||
session.sdp = Some(sdp.to_owned());
|
session.sdp = Some(sdp.to_owned());
|
||||||
|
@ -2403,7 +2549,7 @@ impl WebRTCSink {
|
||||||
return Err(anyhow!("No caps found for stream {}", name));
|
return Err(anyhow!("No caps found for stream {}", name));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(mut stream) = state.streams.get_mut(&name) {
|
if let Some(stream) = state.streams.get_mut(&name) {
|
||||||
stream.out_caps = Some(caps);
|
stream.out_caps = Some(caps);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2421,7 +2567,12 @@ impl WebRTCSink {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.sessions
|
.sessions
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())),
|
.map(|(name, consumer)| {
|
||||||
|
(
|
||||||
|
name.as_str(),
|
||||||
|
consumer.unwrap().gather_stats().to_send_value(),
|
||||||
|
)
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2448,7 +2599,7 @@ impl WebRTCSink {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.streams
|
.streams
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.for_each(|(_, mut stream)| {
|
.for_each(|(_, stream)| {
|
||||||
if stream.sink_pad.upcast_ref::<gst::Pad>() == pad {
|
if stream.sink_pad.upcast_ref::<gst::Pad>() == pad {
|
||||||
// We do not want VideoInfo to consider max-framerate
|
// We do not want VideoInfo to consider max-framerate
|
||||||
// when computing fps, so we strip it away here
|
// when computing fps, so we strip it away here
|
||||||
|
|
Loading…
Reference in a new issue