urisourcebin: Handle legacy pad replacements from parsebin

When dealing with demuxers which aren't streams-aware, we need to handle the
old-school "stream replacement" dance from `parsebin` and hide that in such a
way that output pads are re-used (if compatible).

By analyzing the collection posted by parsebin, we can:
* Identify whether some output slots are no longer used (because the stream they
  currently handle is not present in the collection)
* Decide if some upcoming streams could re-use the existing slot

This supports both buffering and non-buffering modes.

Fixes #1651

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6201>
This commit is contained in:
Edward Hervey 2024-02-23 11:15:49 +01:00 committed by GStreamer Marge Bot
parent e690b53d05
commit 489f310881

View file

@ -132,7 +132,11 @@ struct _OutputSlotInfo
{ {
ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */ ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
GstStream *stream; /* The current stream */
GstStream *pending_stream; /* The stream this slot should switch to */
GstPad *originating_pad; /* Pad that created this OutputSlotInfo (ref held) */ GstPad *originating_pad; /* Pad that created this OutputSlotInfo (ref held) */
GstPad *pending_pad; /* Pad this slot should use once originating_pad goes away (ref held) */
GstPad *output_pad; /* Output ghost pad */ GstPad *output_pad; /* Output ghost pad */
gboolean is_eos; /* Did EOS get fed into the buffering element */ gboolean is_eos; /* Did EOS get fed into the buffering element */
@ -143,6 +147,7 @@ struct _OutputSlotInfo
gulong bitrate_changed_id; /* queue bitrate changed notification */ gulong bitrate_changed_id; /* queue bitrate changed notification */
guint demuxer_event_probe_id; guint demuxer_event_probe_id;
guint pending_probe_id; /* demuxer_event_probe_id for pending_pad */
}; };
/** /**
@ -796,13 +801,28 @@ new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
return info; return info;
} }
static OutputSlotInfo *
find_replacement_slot (ChildSrcPadInfo * info, GstStream * stream)
{
GList *iter;
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *slot = iter->data;
if (slot->pending_stream == stream)
return slot;
}
return NULL;
}
/* Called by the signal handlers when a demuxer has produced a new stream */ /* Called by the signal handlers when a demuxer has produced a new stream */
static void static void
new_demuxer_pad_added_cb (GstElement * element, GstPad * pad, new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
ChildSrcPadInfo * info) ChildSrcPadInfo * info)
{ {
GstURISourceBin *urisrc = info->urisrc; GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot; OutputSlotInfo *slot = NULL;
GstPad *output_pad; GstPad *output_pad;
GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad); GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
@ -816,6 +836,31 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
("Adaptive demuxer is not streams-aware, check your installation")); ("Adaptive demuxer is not streams-aware, check your installation"));
} }
/* For parsebin source pads we want to check if this is a replacement pad for
* which we want to re-use an existing OutputSlotInfo */
if (info->demuxer_is_parsebin) {
GstStream *stream = gst_pad_get_stream (pad);
if (stream) {
slot = find_replacement_slot (info, stream);
if (slot) {
GST_DEBUG_OBJECT (pad, "Can re-use slot %s:%s",
GST_DEBUG_PAD_NAME (slot->originating_pad));
slot->pending_pad = gst_object_ref (pad);
slot->pending_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) demux_pad_events, slot, NULL);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
GST_DEBUG_OBJECT (pad, "No existing output slot to re-use");
} else {
GST_WARNING_OBJECT (pad, "No GstStream on pad ??");
}
}
/* If the demuxer handles buffering and is streams-aware, we can expose it /* If the demuxer handles buffering and is streams-aware, we can expose it
as-is directly. We still add an event probe to deal with EOS */ as-is directly. We still add an event probe to deal with EOS */
slot = new_output_slot (info, pad); slot = new_output_slot (info, pad);
@ -880,6 +925,11 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad); GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
if (slot->pending_pad && pad != slot->pending_pad) {
GST_DEBUG_OBJECT (pad, "A pending pad is present, ignoring");
break;
}
BUFFERING_LOCK (urisrc); BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */ /* Mark that we fed an EOS to this slot */
slot->is_eos = TRUE; slot->is_eos = TRUE;
@ -1266,6 +1316,8 @@ new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
slot->output_pad = create_output_pad (slot, originating_pad); slot->output_pad = create_output_pad (slot, originating_pad);
} }
slot->originating_pad = gst_object_ref (originating_pad); slot->originating_pad = gst_object_ref (originating_pad);
/* Store stream if present */
slot->stream = gst_pad_get_stream (originating_pad);
/* save output slot so we can remove it later */ /* save output slot so we can remove it later */
info->outputs = g_list_append (info->outputs, slot); info->outputs = g_list_append (info->outputs, slot);
@ -1273,6 +1325,8 @@ new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
GST_DEBUG_OBJECT (urisrc, GST_DEBUG_OBJECT (urisrc,
"New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT, "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
slot->output_pad, originating_pad); slot->output_pad, originating_pad);
if (slot->stream)
GST_DEBUG_OBJECT (urisrc, " and stream %" GST_PTR_FORMAT, slot->stream);
return slot; return slot;
@ -1436,6 +1490,31 @@ demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
gst_pad_remove_probe (pad, slot->demuxer_event_probe_id); gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
slot->demuxer_event_probe_id = 0; slot->demuxer_event_probe_id = 0;
if (slot->pending_pad) {
/* Switch over to pending pad */
GST_DEBUG_OBJECT (urisrc, "Switching to pending pad <%s:%s>",
GST_DEBUG_PAD_NAME (slot->pending_pad));
slot->demuxer_event_probe_id = slot->pending_probe_id;
slot->pending_probe_id = 0;
gst_object_unref (slot->originating_pad);
slot->originating_pad = slot->pending_pad;
slot->pending_pad = NULL;
gst_object_unref (slot->stream);
slot->stream = slot->pending_stream;
slot->pending_stream = NULL;
if (slot->queue_sinkpad) {
gst_pad_link (slot->originating_pad, slot->queue_sinkpad);
} else {
gst_ghost_pad_set_target ((GstGhostPad *) slot->output_pad,
slot->originating_pad);
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
if (slot->queue) { if (slot->queue) {
gboolean was_eos; gboolean was_eos;
@ -2276,6 +2355,15 @@ free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
if (slot->demuxer_event_probe_id) if (slot->demuxer_event_probe_id)
gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id); gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
if (slot->pending_pad) {
if (slot->pending_probe_id)
gst_pad_remove_probe (slot->pending_pad, slot->pending_probe_id);
gst_object_unref (slot->pending_pad);
}
if (slot->stream)
gst_object_unref (slot->stream);
if (slot->pending_stream)
gst_object_unref (slot->pending_stream);
gst_object_unref (slot->originating_pad); gst_object_unref (slot->originating_pad);
/* deactivate and remove the srcpad */ /* deactivate and remove the srcpad */
@ -2725,6 +2813,65 @@ find_adaptive_demuxer_cspi_for_msg (GstURISourceBin * urisrc,
return res; return res;
} }
static GstStream *
find_compatible_stream (GList * streams, GstStream * stream)
{
GList *iter;
GstStreamType stream_type = gst_stream_get_stream_type (stream);
for (iter = streams; iter; iter = iter->next) {
GstStream *candidate = iter->data;
if (gst_stream_get_stream_type (candidate) == stream_type)
return candidate;
}
return NULL;
}
static void
handle_parsebin_collection (ChildSrcPadInfo * info,
GstStreamCollection * collection)
{
GList *unused_slots = NULL, *iter;
GList *streams = NULL;
guint i, nb_streams;
nb_streams = gst_stream_collection_get_size (collection);
for (i = 0; i < nb_streams; i++)
streams =
g_list_append (streams, gst_stream_collection_get_stream (collection,
i));
/* Get list of output info slots not present in the collection */
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *output = iter->data;
if (output->stream && !g_list_find (streams, output->stream)) {
GST_DEBUG_OBJECT (output->originating_pad,
"No longer used in new collection");
unused_slots = g_list_append (unused_slots, output);
}
}
/* For each of those slots, check if there is a compatible stream from the
* collection that could be assigned to it */
for (iter = unused_slots; iter; iter = iter->next) {
OutputSlotInfo *output = iter->data;
GstStream *replacement = find_compatible_stream (streams, output->stream);
if (replacement) {
GST_DEBUG_OBJECT (output->originating_pad, "Assigning stream %s",
gst_stream_get_stream_id (replacement));
output->pending_stream = gst_object_ref (replacement);
streams = g_list_remove (streams, replacement);
}
}
g_list_free (unused_slots);
g_list_free (streams);
}
static void static void
handle_message (GstBin * bin, GstMessage * msg) handle_message (GstBin * bin, GstMessage * msg)
{ {
@ -2755,7 +2902,14 @@ handle_message (GstBin * bin, GstMessage * msg)
if (info) { if (info) {
info->demuxer_streams_aware = TRUE; info->demuxer_streams_aware = TRUE;
if (info->demuxer_is_parsebin) { if (info->demuxer_is_parsebin) {
GstStreamCollection *collection = NULL;
gst_message_parse_stream_collection (msg, &collection);
GST_DEBUG_OBJECT (bin, "Dropping stream-collection from parsebin"); GST_DEBUG_OBJECT (bin, "Dropping stream-collection from parsebin");
/* Check if some output slots can/could be re-used with this new collection */
if (collection) {
handle_parsebin_collection (info, collection);
gst_object_unref (collection);
}
gst_message_unref (msg); gst_message_unref (msg);
msg = NULL; msg = NULL;
} }