webrtcsink: adapt commit "abort stats collection before stopping the Signaller"

Adapt a commit [1] that was introduced as part of the forward port of the MR
'add signal "request-encoded-filter"' [2].

The deadlock said commit was fixing doesn't happen on main branch due to
changes in the element design: the Sessions are no longer aborted with the
element `State` held. However, we want to ensure the stats collection task
is terminated when the `webrtcbin` element returns from the Ready to Null
transition, meaning that the related resources are released.

[1]: gstreamer/gst-plugins-rs!1176 (0e6b9df9)
[2]: gstreamer/gst-plugins-rs!1176

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1222>
This commit is contained in:
François Laignel 2023-05-24 21:21:14 +02:00
parent e3c46b40a0
commit e62e9f5bd4

View file

@ -173,6 +173,8 @@ struct Session {
// When not None, constructed from offer SDP
codecs: Option<BTreeMap<i32, Codec>>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@ -218,7 +220,6 @@ struct State {
navigation_handler: Option<NavigationEventHandler>,
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
}
@ -334,7 +335,6 @@ impl Default for State {
navigation_handler: None,
mids: HashMap::new(),
signaller_signals: Default::default(),
stats_collection_handle: None,
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
}
}
@ -792,18 +792,27 @@ impl State {
session.links.remove(ssrc);
}
let stats_collection_handle = session.stats_collection_handle.take();
let finalizing_sessions = self.finalizing_sessions.clone();
let session_id = session.id.clone();
let (sessions, _cvar) = &*finalizing_sessions;
sessions.lock().unwrap().insert(session_id.clone());
session.pipeline.call_async(move |pipeline| {
if let Some(stats_collection_handle) = stats_collection_handle {
stats_collection_handle.abort();
let _ = RUNTIME.block_on(stats_collection_handle);
}
let _ = pipeline.set_state(gst::State::Null);
let (sessions, cvar) = &*finalizing_sessions;
let mut sessions = sessions.lock().unwrap();
sessions.remove(&session_id);
cvar.notify_one();
gst::debug!(CAT, "Session {session_id} ended");
});
}
@ -849,6 +858,7 @@ impl Session {
links: HashMap::new(),
stats_sigid: None,
codecs: None,
stats_collection_handle: None,
}
}
@ -1420,14 +1430,7 @@ impl BaseWebRTCSink {
state.signaller_state = SignallerState::Stopped;
}
let stats_collection_handle = state.stats_collection_handle.take();
drop(state);
if let Some(stats_collection_handle) = stats_collection_handle {
stats_collection_handle.abort();
let _ = RUNTIME.block_on(stats_collection_handle);
}
for session in sessions {
signaller.end_session(&session.id);
}
@ -2593,7 +2596,7 @@ impl BaseWebRTCSink {
let element_clone = element.downgrade();
let webrtcbin = session.webrtcbin.downgrade();
let session_id_clone = session_id.clone();
state.stats_collection_handle = Some(RUNTIME.spawn(async move {
session.stats_collection_handle = Some(RUNTIME.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
loop {