webrtcsink: fix segment format mismatch with remote offer

webrtcsink was starting the negotiation process on Ready and concurrently
moving the consumer pipeline to Playing, but when answering the remote
description was set so fast that input streams were connected (and the time
format set on appsrc) before the state change to Paused had completed.

This meant gst_base_src_start was happening after that and setting the format
back to bytes, the time segment that was next coming in then caused:

basesrc gstbasesrc.c:4255:gst_base_src_push_segment:<video_0> segment format mismatched, ignore

And the consumer pipeline errored out.

The same issue existed in theory when webrtcsink was creating the offer,
but was much harder to trigger as it required that the remote answer
came in before the state change to Paused had completed.

This commit fixes the issue by simply waiting for the state to have
changed to Paused before negotiating.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1786>
This commit is contained in:
Mathieu Duponchelle 2024-08-20 14:29:59 +02:00 committed by Sebastian Dröge
parent 0b1d5d8682
commit e64546598d

View file

@ -162,6 +162,7 @@ impl CustomBusStream {
); );
} }
} }
let _ = sender.unbounded_send(msg.to_owned());
} }
_ => { _ => {
let _ = sender.unbounded_send(msg.to_owned()); let _ = sender.unbounded_send(msg.to_owned());
@ -2744,6 +2745,8 @@ impl BaseWebRTCSink {
let element_clone = element.downgrade(); let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade(); let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.clone(); let session_id_clone = session_id.clone();
let offer_clone = offer.cloned();
let peer_id_clone = peer_id.clone();
RUNTIME.spawn(async move { RUNTIME.spawn(async move {
while let Some(msg) = bus_stream.next().await { while let Some(msg) = bus_stream.next().await {
@ -2765,6 +2768,38 @@ impl BaseWebRTCSink {
); );
let _ = this.remove_session(&element, &session_id_clone, true); let _ = this.remove_session(&element, &session_id_clone, true);
} }
gst::MessageView::StateChanged(state_changed) => {
if state_changed.src() == Some(pipeline.upcast_ref())
&& state_changed.old() == gst::State::Ready
&& state_changed.current() == gst::State::Paused
{
gst::info!(
CAT,
obj: pipeline,
"{peer_id_clone} pipeline reached PAUSED, negotiating"
);
// We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock:
//
// Going to Ready triggers synchronous emission of the on-negotiation-needed
// signal, during which time the application may add a data channel, causing
// renegotiation, which we do not support at this time.
//
// This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Paused state, and all its transceivers have codec_preferences.
this.negotiate(&element, &session_id_clone, offer_clone.as_ref());
if let Err(err) = pipeline.set_state(gst::State::Playing) {
gst::warning!(
CAT,
obj: element,
"Failed to bring {peer_id_clone} pipeline to PLAYING: {}",
err
);
let _ = this.remove_session(&element, &session_id_clone, true);
}
}
}
gst::MessageView::Latency(..) => { gst::MessageView::Latency(..) => {
gst::info!(CAT, obj: pipeline, "Recalculating latency"); gst::info!(CAT, obj: pipeline, "Recalculating latency");
let _ = pipeline.recalculate_latency(); let _ = pipeline.recalculate_latency();
@ -2897,22 +2932,15 @@ impl BaseWebRTCSink {
signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
signaller.emit_by_name::<()>("webrtcbin-ready", &[&peer_id, &webrtcbin]); signaller.emit_by_name::<()>("webrtcbin-ready", &[&peer_id, &webrtcbin]);
// We don't connect to on-negotiation-needed, this in order to call the above // We now bring the state to PAUSED before negotiating, as connecting
// signal without holding the state lock: // input streams before that can create a race condition with our
// // configuring of the format on the app sources and the start()
// Going to Ready triggers synchronous emission of the on-negotiation-needed // implementation of base source resetting the format to bytes.
// signal, during which time the application may add a data channel, causing if let Err(err) = pipeline.set_state(gst::State::Paused) {
// renegotiation, which we do not support at this time.
//
// This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Ready state, and all its transceivers have codec_preferences.
this.negotiate(&element, &session_id, offer_clone.as_ref());
if let Err(err) = pipeline.set_state(gst::State::Playing) {
gst::warning!( gst::warning!(
CAT, CAT,
obj: element, obj: element,
"Failed to bring {peer_id} pipeline to PLAYING: {}", "Failed to bring {peer_id} pipeline to PAUSED: {}",
err err
); );
let _ = this.remove_session(&element, &session_id, true); let _ = this.remove_session(&element, &session_id, true);