diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index b7394f5c..d119d687 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -173,6 +173,8 @@ struct Session { // When not None, constructed from offer SDP codecs: Option>, + + stats_collection_handle: Option>, } #[derive(Debug, PartialEq, Eq, Copy, Clone)] @@ -218,7 +220,6 @@ struct State { navigation_handler: Option, mids: HashMap, signaller_signals: Option, - stats_collection_handle: Option>, finalizing_sessions: Arc<(Mutex>, Condvar)>, } @@ -334,7 +335,6 @@ impl Default for State { navigation_handler: None, mids: HashMap::new(), signaller_signals: Default::default(), - stats_collection_handle: None, finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), } } @@ -792,18 +792,27 @@ impl State { session.links.remove(ssrc); } + let stats_collection_handle = session.stats_collection_handle.take(); + let finalizing_sessions = self.finalizing_sessions.clone(); let session_id = session.id.clone(); let (sessions, _cvar) = &*finalizing_sessions; sessions.lock().unwrap().insert(session_id.clone()); session.pipeline.call_async(move |pipeline| { + if let Some(stats_collection_handle) = stats_collection_handle { + stats_collection_handle.abort(); + let _ = RUNTIME.block_on(stats_collection_handle); + } + let _ = pipeline.set_state(gst::State::Null); let (sessions, cvar) = &*finalizing_sessions; let mut sessions = sessions.lock().unwrap(); sessions.remove(&session_id); cvar.notify_one(); + + gst::debug!(CAT, "Session {session_id} ended"); }); } @@ -849,6 +858,7 @@ impl Session { links: HashMap::new(), stats_sigid: None, codecs: None, + stats_collection_handle: None, } } @@ -1420,14 +1430,7 @@ impl BaseWebRTCSink { state.signaller_state = SignallerState::Stopped; } - let stats_collection_handle = state.stats_collection_handle.take(); drop(state); - - if let Some(stats_collection_handle) = stats_collection_handle { - stats_collection_handle.abort(); - let _ = RUNTIME.block_on(stats_collection_handle); - } - for session in sessions { signaller.end_session(&session.id); } @@ -2593,7 +2596,7 @@ impl BaseWebRTCSink { let element_clone = element.downgrade(); let webrtcbin = session.webrtcbin.downgrade(); let session_id_clone = session_id.clone(); - state.stats_collection_handle = Some(RUNTIME.spawn(async move { + session.stats_collection_handle = Some(RUNTIME.spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); loop {