diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3432f022..d6d2a725 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -2179,13 +2179,19 @@ impl BaseWebRTCSink { drop(settings); let mut state = self.state.lock().unwrap(); + let mut join_handles = vec![]; + #[cfg(feature = "web_server")] if let Some(web_shutdown_tx) = state.web_shutdown_tx.take() { let _ = web_shutdown_tx.send(()); let web_join_handle = state.web_join_handle.take().expect("no web join handle"); - RUNTIME.block_on(async { - let _ = web_join_handle.await; - }); + // wait for this later + join_handles.push( + async { + let _ = web_join_handle.await; + } + .boxed_local(), + ); } let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); @@ -2208,9 +2214,12 @@ impl BaseWebRTCSink { gst::debug!(CAT, imp = self, "Waiting for codec discoveries to finish"); let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers); codecs_done_receiver.into_iter().for_each(|receiver| { - RUNTIME.block_on(async { - let _ = receiver.await; - }); + join_handles.push( + async { + let _ = receiver.await; + } + .boxed_local(), + ); }); gst::debug!(CAT, imp = self, "No codec discovery is running anymore"); @@ -2223,6 +2232,14 @@ impl BaseWebRTCSink { } drop(state); + + // only wait for all handles after the state lock has been dropped. Some of the futures may + // be waiting on the state lock to make forward progress before being able to be cancelled + // from calls above. + for handle in join_handles { + RUNTIME.block_on(handle); + } + gst::debug!(CAT, imp = self, "Ending sessions"); for session in sessions { signaller.end_session(&session.0.lock().unwrap().id);