webrtc/signalling: fix race condition in message ordering

Spawning one task per message to send out instead of sending them out
sequentially from the one task used to poll the handler sometimes
resulted in peers receiving ICE candidates before SDP offers, triggering
hard to understand errors in the browser.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1254>
This commit is contained in:
Mathieu Duponchelle 2023-06-08 01:51:45 +02:00
parent dfe2442c92
commit 0954af10c7

View file

@ -38,7 +38,7 @@ impl Server {
#[instrument(level = "debug", skip(factory))]
pub fn spawn<
I: for<'a> Deserialize<'a>,
O: Serialize + std::fmt::Debug,
O: Serialize + std::fmt::Debug + Send + Sync,
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St,
St: Stream<Item = (String, O)>,
>(
@ -72,12 +72,19 @@ impl Server {
let _ = task::spawn(async move {
while let Some((peer_id, msg)) = handler.next().await {
match serde_json::to_string(&msg) {
Ok(msg) => {
if let Some(peer) = state_clone.lock().unwrap().peers.get_mut(&peer_id) {
let mut sender = peer.sender.clone();
task::spawn(async move {
let _ = sender.send(msg).await;
});
Ok(msg_str) => {
let sender = {
let mut state = state_clone.lock().unwrap();
if let Some(peer) = state.peers.get_mut(&peer_id) {
Some(peer.sender.clone())
} else {
None
}
};
if let Some(mut sender) = sender {
trace!("Sending {}", msg_str);
let _ = sender.send(msg_str).await;
}
}
Err(err) => {