janusvr_signaller: Do not block in end_session()

Only stop() is allowed to block, wait there.

Fixes #603

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1848>
This commit is contained in:
Xavier Claessens 2024-10-10 11:42:36 -04:00 committed by GStreamer Marge Bot
parent 2cbf83d7e2
commit 372c44655a
2 changed files with 36 additions and 35 deletions

View file

@ -271,6 +271,7 @@ struct State {
transaction_id: Option<String>, transaction_id: Option<String>,
room_id: Option<JanusId>, room_id: Option<JanusId>,
feed_id: Option<JanusId>, feed_id: Option<JanusId>,
leave_room_rx: Option<tokio::sync::oneshot::Receiver<()>>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -568,20 +569,6 @@ impl Signaller {
} }
} }
// Only used at the end when cleaning up the resources.
// So that `SignallableImpl::stop` waits the last message
// to be sent properly.
fn send_blocking(&self, msg: OutgoingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.ws_sender.clone() {
RUNTIME.block_on(async {
if let Err(err) = sender.send(msg).await {
self.raise_error(err.to_string());
}
});
}
}
fn set_transaction_id(&self, transaction: String) { fn set_transaction_id(&self, transaction: String) {
self.state.lock().unwrap().transaction_id = Some(transaction); self.state.lock().unwrap().transaction_id = Some(transaction);
} }
@ -677,8 +664,8 @@ impl Signaller {
} }
fn leave_room(&self) { fn leave_room(&self) {
let mut state = self.state.lock().unwrap();
let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = { let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if settings.room_id.is_none() { if settings.room_id.is_none() {
@ -696,20 +683,34 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
self.send_blocking(OutgoingMessage::RoomRequest(RoomRequestMsg { if let Some(mut sender) = state.ws_sender.clone() {
janus: "message".to_string(), let (tx, rx) = tokio::sync::oneshot::channel::<()>();
transaction, state.leave_room_rx = Some(rx);
session_id, let msg = OutgoingMessage::RoomRequest(RoomRequestMsg {
handle_id, janus: "message".to_string(),
apisecret, transaction,
body: RoomRequestBody { session_id,
request: "leave".to_string(), handle_id,
ptype: "publisher".to_string(), apisecret,
room, body: RoomRequestBody {
id: Some(feed_id), request: "leave".to_string(),
display, ptype: "publisher".to_string(),
}, room,
})); id: Some(feed_id),
display,
},
});
RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
if let Err(err) = sender.send(msg).await {
this.raise_error(err.to_string());
}
let _ = tx.send(());
}
));
}
} }
fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) { fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) {
@ -873,6 +874,12 @@ impl SignallableImpl for Signaller {
}); });
} }
if let Some(rx) = state.leave_room_rx.take() {
RUNTIME.block_on(async move {
let _ = rx.await;
});
}
state.session_id = None; state.session_id = None;
state.handle_id = None; state.handle_id = None;
state.transaction_id = None; state.transaction_id = None;

View file

@ -3707,12 +3707,6 @@ impl BaseWebRTCSink {
drop(settings); drop(settings);
signaller.end_session(session_id); signaller.end_session(session_id);
} }
gst::warning!(
CAT,
imp = self,
"Consumer refused media {session_id}, {media_idx}"
);
return; return;
} }
} }