rtpbin: separate out the two fec decoder locations

The pipeline flow for receiving looks like this:

rtpsession ! rtpssrcdemux ! session_fec_decoder ! rtpjitterbuffer ! \
  rtpptdemux ! stream_fec_decoder ! ...

There are two places where a fec decoder could be placed.
1. As requested from the 'request-fec-decoder' signal: after rtpptdemux
   for each ssrc/pt produced
2. after rtpssrcdemux but before rtpjitterbuffer: added for the
   rtpst2022-1-fecenc/dec elements,

However, there was some cross-contamination of the elements involved and
the request-fec-decoder signal was also being used to request the fec
decoder for the session_fec_decoder which would then be cached and
re-used for subsequent fec decoder requests.  This would cause the same
element to be attempted to be linked to multiple elements in different
places in the pipeline.  This would fail and cause all kinds of havoc
usually resulting in a not-linked error being returned upstream and an
error message being posted by the source.

Fix by not using the request-fec-decoder signal for requesting the
session_fec_decoder and instead solely rely on the added properties for
that case.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1300>
This commit is contained in:
Matthew Waters 2021-11-09 15:10:06 +11:00 committed by GStreamer Marge Bot
parent bd91286a3b
commit 71dd47516c

View file

@ -528,7 +528,10 @@ struct _GstRtpBinSession
GSList *recv_fec_sinks; GSList *recv_fec_sinks;
GSList *recv_fec_sink_ghosts; GSList *recv_fec_sink_ghosts;
GstElement *fec_decoder; /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
* XXX: This does not yet support multiple ssrc's in the same rtp session
*/
GstElement *early_fec_decoder;
GSList *send_fec_src_ghosts; GSList *send_fec_src_ghosts;
}; };
@ -2488,7 +2491,9 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
* @session: the session index * @session: the session index
* *
* Request a FEC decoder element for the given @session. The element * Request a FEC decoder element for the given @session. The element
* will be added to the bin after the pt demuxer. * will be added to the bin after the pt demuxer. If there are multiple
* ssrc's and pt's in @session, this signal may be called multiple times for
* the same @session each corresponding to a newly discovered ssrc.
* *
* If no handler is connected, no FEC decoder will be used. * If no handler is connected, no FEC decoder will be used.
* *
@ -3558,12 +3563,12 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
} }
static gboolean static gboolean
ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session) ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
{ {
const gchar *factory; const gchar *factory;
gchar *sess_id_str; gchar *sess_id_str;
if (session->fec_decoder) if (session->early_fec_decoder)
goto done; goto done;
sess_id_str = g_strdup_printf ("%u", session->id); sess_id_str = g_strdup_printf ("%u", session->id);
@ -3574,29 +3579,27 @@ ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
if (factory) { if (factory) {
GError *err = NULL; GError *err = NULL;
session->fec_decoder = session->early_fec_decoder =
gst_parse_bin_from_description_full (factory, TRUE, NULL, gst_parse_bin_from_description_full (factory, TRUE, NULL,
GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS, GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
&err); &err);
if (!session->fec_decoder) { if (!session->early_fec_decoder) {
GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s", GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
err->message); err->message);
} }
bin_manage_element (session->bin, session->fec_decoder); bin_manage_element (session->bin, session->early_fec_decoder);
session->elements = session->elements =
g_slist_prepend (session->elements, session->fec_decoder); g_slist_prepend (session->elements, session->early_fec_decoder);
GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
" for session %u", session->fec_decoder, session->id); " for session %u", session->early_fec_decoder, session->id);
} }
/* Fallback to the signal */ /* Do not fallback to the signal as the signal expects a fec decoder to
if (!session->fec_decoder) * be placed at a different place in the pipeline */
session->fec_decoder =
session_request_element (session, SIGNAL_REQUEST_FEC_DECODER);
done: done:
return session->fec_decoder != NULL; return session->early_fec_decoder != NULL;
} }
static void static void
@ -3610,9 +3613,11 @@ expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
gst_object_ref (pad); gst_object_ref (pad);
if (stream->session->storage && !stream->session->fec_decoder) { if (stream->session->storage) {
if (ensure_fec_decoder (rtpbin, stream->session)) { GstElement *fec_decoder =
GstElement *fec_decoder = stream->session->fec_decoder; session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
if (fec_decoder) {
GstPad *sinkpad, *srcpad; GstPad *sinkpad, *srcpad;
GstPadLinkReturn ret; GstPadLinkReturn ret;
@ -3849,12 +3854,13 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
srcpad = gst_element_get_static_pad (element, padname); srcpad = gst_element_get_static_pad (element, padname);
g_free (padname); g_free (padname);
if (session->fec_decoder) { if (session->early_fec_decoder) {
sinkpad = gst_element_get_static_pad (session->fec_decoder, "sink"); GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (sinkpad); gst_object_unref (sinkpad);
gst_object_unref (srcpad); gst_object_unref (srcpad);
srcpad = gst_element_get_static_pad (session->fec_decoder, "src"); srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
} }
sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
@ -4203,12 +4209,12 @@ complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
gchar *padname; gchar *padname;
GstPad *ret; GstPad *ret;
if (!ensure_fec_decoder (rtpbin, session)) if (!ensure_early_fec_decoder (rtpbin, session))
goto no_decoder; goto no_decoder;
GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad"); GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad");
padname = g_strdup_printf ("fec_%u", fec_idx); padname = g_strdup_printf ("fec_%u", fec_idx);
ret = gst_element_request_pad_simple (session->fec_decoder, padname); ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
g_free (padname); g_free (padname);
if (ret == NULL) if (ret == NULL)
@ -4467,7 +4473,7 @@ remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
if (target) { if (target) {
item = g_slist_find (session->recv_fec_sinks, target); item = g_slist_find (session->recv_fec_sinks, target);
if (item) { if (item) {
gst_element_release_request_pad (session->fec_decoder, item->data); gst_element_release_request_pad (session->early_fec_decoder, item->data);
session->recv_fec_sinks = session->recv_fec_sinks =
g_slist_delete_link (session->recv_fec_sinks, item); g_slist_delete_link (session->recv_fec_sinks, item);
} }