From 16ee51621e22aa841947815a6190059a0f5f157f Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 20 Aug 2024 14:29:59 +0200 Subject: [PATCH] webrtcsink: fix segment format mismatch with remote offer webrtcsink was starting the negotiation process on Ready and concurrently moving the consumer pipeline to Playing, but when answering the remote description was set so fast that input streams were connected (and the time format set on appsrc) before the state change to Paused had completed. This meant gst_base_src_start was happening after that and setting the format back to bytes, the time segment that was next coming in then caused: basesrc gstbasesrc.c:4255:gst_base_src_push_segment: segment format mismatched, ignore And the consumer pipeline errored out. The same issue existed in theory when webrtcsink was creating the offer, but was much harder to trigger as it required that the remote answer came in before the state change to Paused had completed. This commit fixes the issue by simply waiting for the state to have changed to Paused before negotiating. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 55 ++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 2327aed8..33179c73 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -197,6 +197,7 @@ impl CustomBusStream { ); } } + let _ = sender.unbounded_send(msg.clone()); } _ => { let _ = sender.unbounded_send(msg.clone()); @@ -3222,6 +3223,8 @@ impl BaseWebRTCSink { let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.clone(); + let offer_clone = offer.cloned(); + let peer_id_clone = peer_id.clone(); RUNTIME.spawn(async move { while let Some(msg) = bus_stream.next().await { @@ -3255,6 +3258,39 @@ impl BaseWebRTCSink { ); let _ = this.remove_session(&session_id_clone, true); } + gst::MessageView::StateChanged(state_changed) => { + if state_changed.src() == Some(pipeline.upcast_ref()) { + if state_changed.old() == gst::State::Ready + && state_changed.current() == gst::State::Paused + { + gst::info!( + CAT, + obj = pipeline, + "{peer_id_clone} pipeline reached PAUSED, negotiating" + ); + // We don't connect to on-negotiation-needed, this in order to call the above + // signal without holding the state lock: + // + // Going to Ready triggers synchronous emission of the on-negotiation-needed + // signal, during which time the application may add a data channel, causing + // renegotiation, which we do not support at this time. + // + // This is completely safe, as we know that by now all conditions are gathered: + // webrtcbin is in the Paused state, and all its transceivers have codec_preferences. + this.negotiate(&session_id_clone, offer_clone.as_ref()); + + if let Err(err) = pipeline.set_state(gst::State::Playing) { + gst::warning!( + CAT, + obj = element, + "Failed to bring {peer_id_clone} pipeline to PLAYING: {}", + err + ); + let _ = this.remove_session(&session_id_clone, true); + } + } + } + } _ => (), } } @@ -3379,22 +3415,15 @@ impl BaseWebRTCSink { signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); signaller.emit_by_name::<()>("webrtcbin-ready", &[&peer_id, &webrtcbin]); - // We don't connect to on-negotiation-needed, this in order to call the above - // signal without holding the state lock: - // - // Going to Ready triggers synchronous emission of the on-negotiation-needed - // signal, during which time the application may add a data channel, causing - // renegotiation, which we do not support at this time. - // - // This is completely safe, as we know that by now all conditions are gathered: - // webrtcbin is in the Ready state, and all its transceivers have codec_preferences. - this.negotiate(&session_id, offer.as_ref()); - - if let Err(err) = pipeline.set_state(gst::State::Playing) { + // We now bring the state to PAUSED before negotiating, as connecting + // input streams before that can create a race condition with our + // configuring of the format on the app sources and the start() + // implementation of base source resetting the format to bytes. + if let Err(err) = pipeline.set_state(gst::State::Paused) { gst::warning!( CAT, obj = element, - "Failed to bring {peer_id} pipeline to PLAYING: {}", + "Failed to bring {peer_id} pipeline to PAUSED: {}", err ); let _ = this.remove_session(&session_id, true);