diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs index 48dde814..8fa159cd 100644 --- a/net/webrtc/src/janusvr_signaller/imp.rs +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -271,6 +271,7 @@ struct State { transaction_id: Option, room_id: Option, feed_id: Option, + leave_room_rx: Option>, } #[derive(Clone)] @@ -568,20 +569,6 @@ impl Signaller { } } - // Only used at the end when cleaning up the resources. - // So that `SignallableImpl::stop` waits the last message - // to be sent properly. - fn send_blocking(&self, msg: OutgoingMessage) { - let state = self.state.lock().unwrap(); - if let Some(mut sender) = state.ws_sender.clone() { - RUNTIME.block_on(async { - if let Err(err) = sender.send(msg).await { - self.raise_error(err.to_string()); - } - }); - } - } - fn set_transaction_id(&self, transaction: String) { self.state.lock().unwrap().transaction_id = Some(transaction); } @@ -677,8 +664,8 @@ impl Signaller { } fn leave_room(&self) { + let mut state = self.state.lock().unwrap(); let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = { - let state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); if settings.room_id.is_none() { @@ -696,20 +683,34 @@ impl Signaller { settings.secret_key.clone(), ) }; - self.send_blocking(OutgoingMessage::RoomRequest(RoomRequestMsg { - janus: "message".to_string(), - transaction, - session_id, - handle_id, - apisecret, - body: RoomRequestBody { - request: "leave".to_string(), - ptype: "publisher".to_string(), - room, - id: Some(feed_id), - display, - }, - })); + if let Some(mut sender) = state.ws_sender.clone() { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + state.leave_room_rx = Some(rx); + let msg = OutgoingMessage::RoomRequest(RoomRequestMsg { + janus: "message".to_string(), + transaction, + session_id, + handle_id, + apisecret, + body: RoomRequestBody { + request: "leave".to_string(), + ptype: "publisher".to_string(), + room, + id: Some(feed_id), + display, + }, + }); + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { + if let Err(err) = sender.send(msg).await { + this.raise_error(err.to_string()); + } + let _ = tx.send(()); + } + )); + } } fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) { @@ -873,6 +874,12 @@ impl SignallableImpl for Signaller { }); } + if let Some(rx) = state.leave_room_rx.take() { + RUNTIME.block_on(async move { + let _ = rx.await; + }); + } + state.session_id = None; state.handle_id = None; state.transaction_id = None; diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 40ceea0a..81c4434b 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -3707,12 +3707,6 @@ impl BaseWebRTCSink { drop(settings); signaller.end_session(session_id); } - - gst::warning!( - CAT, - imp = self, - "Consumer refused media {session_id}, {media_idx}" - ); return; } }