webrtc/signalling: fix race condition in message ordering

Spawning one task per message to send out instead of sending them out
sequentially from the one task used to poll the handler sometimes
resulted in peers receiving ICE candidates before SDP offers, triggering
hard to understand errors in the browser.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1253>
This commit is contained in:
Mathieu Duponchelle 2023-06-08 01:51:45 +02:00
parent d3cda3dd3a
commit 12f1f5b097

View file

@ -38,7 +38,7 @@ impl Server {
#[instrument(level = "debug", skip(factory))] #[instrument(level = "debug", skip(factory))]
pub fn spawn< pub fn spawn<
I: for<'a> Deserialize<'a>, I: for<'a> Deserialize<'a>,
O: Serialize + std::fmt::Debug, O: Serialize + std::fmt::Debug + Send + Sync,
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St, Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St,
St: Stream<Item = (String, O)>, St: Stream<Item = (String, O)>,
>( >(
@ -72,12 +72,19 @@ impl Server {
task::spawn(async move { task::spawn(async move {
while let Some((peer_id, msg)) = handler.next().await { while let Some((peer_id, msg)) = handler.next().await {
match serde_json::to_string(&msg) { match serde_json::to_string(&msg) {
Ok(msg) => { Ok(msg_str) => {
if let Some(peer) = state_clone.lock().unwrap().peers.get_mut(&peer_id) { let sender = {
let mut sender = peer.sender.clone(); let mut state = state_clone.lock().unwrap();
task::spawn(async move { if let Some(peer) = state.peers.get_mut(&peer_id) {
let _ = sender.send(msg).await; Some(peer.sender.clone())
}); } else {
None
}
};
if let Some(mut sender) = sender {
trace!("Sending {}", msg_str);
let _ = sender.send(msg_str).await;
} }
} }
Err(err) => { Err(err) => {