webrtcsink: fix segment format mismatch with remote offer

webrtcsink was starting the negotiation process on Ready and concurrently
moving the consumer pipeline to Playing, but when answering the remote
description was set so fast that input streams were connected (and the time
format set on appsrc) before the state change to Paused had completed.

This meant gst_base_src_start was happening after that and setting the format
back to bytes, the time segment that was next coming in then caused:

basesrc gstbasesrc.c:4255:gst_base_src_push_segment:<video_0> segment format mismatched, ignore

And the consumer pipeline errored out.

The same issue existed in theory when webrtcsink was creating the offer,
but was much harder to trigger as it required that the remote answer
came in before the state change to Paused had completed.

This commit fixes the issue by simply waiting for the state to have
changed to Paused before negotiating.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1730>
This commit is contained in:
Mathieu Duponchelle 2024-08-20 14:29:59 +02:00 committed by Arun Raghavan
parent b6406013c5
commit 16ee51621e

View file

@ -197,6 +197,7 @@ impl CustomBusStream {
);
}
}
let _ = sender.unbounded_send(msg.clone());
}
_ => {
let _ = sender.unbounded_send(msg.clone());
@ -3222,6 +3223,8 @@ impl BaseWebRTCSink {
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.clone();
let offer_clone = offer.cloned();
let peer_id_clone = peer_id.clone();
RUNTIME.spawn(async move {
while let Some(msg) = bus_stream.next().await {
@ -3255,6 +3258,39 @@ impl BaseWebRTCSink {
);
let _ = this.remove_session(&session_id_clone, true);
}
gst::MessageView::StateChanged(state_changed) => {
if state_changed.src() == Some(pipeline.upcast_ref()) {
if state_changed.old() == gst::State::Ready
&& state_changed.current() == gst::State::Paused
{
gst::info!(
CAT,
obj = pipeline,
"{peer_id_clone} pipeline reached PAUSED, negotiating"
);
// We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock:
//
// Going to Ready triggers synchronous emission of the on-negotiation-needed
// signal, during which time the application may add a data channel, causing
// renegotiation, which we do not support at this time.
//
// This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Paused state, and all its transceivers have codec_preferences.
this.negotiate(&session_id_clone, offer_clone.as_ref());
if let Err(err) = pipeline.set_state(gst::State::Playing) {
gst::warning!(
CAT,
obj = element,
"Failed to bring {peer_id_clone} pipeline to PLAYING: {}",
err
);
let _ = this.remove_session(&session_id_clone, true);
}
}
}
}
_ => (),
}
}
@ -3379,22 +3415,15 @@ impl BaseWebRTCSink {
signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
signaller.emit_by_name::<()>("webrtcbin-ready", &[&peer_id, &webrtcbin]);
// We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock:
//
// Going to Ready triggers synchronous emission of the on-negotiation-needed
// signal, during which time the application may add a data channel, causing
// renegotiation, which we do not support at this time.
//
// This is completely safe, as we know that by now all conditions are gathered:
// webrtcbin is in the Ready state, and all its transceivers have codec_preferences.
this.negotiate(&session_id, offer.as_ref());
if let Err(err) = pipeline.set_state(gst::State::Playing) {
// We now bring the state to PAUSED before negotiating, as connecting
// input streams before that can create a race condition with our
// configuring of the format on the app sources and the start()
// implementation of base source resetting the format to bytes.
if let Err(err) = pipeline.set_state(gst::State::Paused) {
gst::warning!(
CAT,
obj = element,
"Failed to bring {peer_id} pipeline to PLAYING: {}",
"Failed to bring {peer_id} pipeline to PAUSED: {}",
err
);
let _ = this.remove_session(&session_id, true);