diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 1be780d9..42b36474 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -15,14 +15,14 @@ use anyhow::{anyhow, Error}; use once_cell::sync::Lazy; use std::collections::HashMap; use std::ops::Mul; -use std::sync::Mutex; +use std::sync::{Arc, Condvar, Mutex}; use super::homegrown_cc::CongestionController; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::RUNTIME; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -219,6 +219,7 @@ struct State { mids: HashMap, signaller_signals: Option, stats_collection_handle: Option>, + finalizing_sessions: Arc<(Mutex>, Condvar)>, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { @@ -334,6 +335,7 @@ impl Default for State { mids: HashMap::new(), signaller_signals: Default::default(), stats_collection_handle: None, + finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), } } } @@ -790,8 +792,18 @@ impl State { session.links.remove(ssrc); } - session.pipeline.call_async(|pipeline| { + let finalizing_sessions = self.finalizing_sessions.clone(); + let session_id = session.id.clone(); + let (sessions, _cvar) = &*finalizing_sessions; + sessions.lock().unwrap().insert(session_id.clone()); + + session.pipeline.call_async(move |pipeline| { let _ = pipeline.set_state(gst::State::Null); + + let (sessions, cvar) = &*finalizing_sessions; + let mut sessions = sessions.lock().unwrap(); + sessions.remove(&session_id); + cvar.notify_one(); }); } @@ -1425,6 +1437,14 @@ impl BaseWebRTCSink { gst::info!(CAT, "Stopped signaller"); } + let finalizing_sessions = self.state.lock().unwrap().finalizing_sessions.clone(); + + let (sessions, cvar) = &*finalizing_sessions; + let mut sessions = sessions.lock().unwrap(); + while !sessions.is_empty() { + sessions = cvar.wait(sessions).unwrap(); + } + Ok(()) }