urisourcebin: Aggregate collections from multiple parsebin

In the case where multiple parsebin are present (ex: from rtsp sources), we want
to aggregate the collections provided by the different parsebin and expose a
single "unified" collection.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7716>
This commit is contained in:
Edward Hervey 2024-10-23 15:17:22 +02:00 committed by GStreamer Marge Bot
parent a26984a72b
commit 0fda426656

View file

@ -122,6 +122,9 @@ struct _ChildSrcPadInfo
/* use_queue2: TRUE if the contents should be buffered through a queue2 /* use_queue2: TRUE if the contents should be buffered through a queue2
* element */ * element */
gboolean use_queue2; gboolean use_queue2;
/* Current StreamCollection */
GstStreamCollection *collection;
}; };
/* Output Slot: /* Output Slot:
@ -194,6 +197,9 @@ struct _GstURISourceBin
gint last_buffering_pct; /* Avoid sending buffering over and over */ gint last_buffering_pct; /* Avoid sending buffering over and over */
GMutex buffering_lock; GMutex buffering_lock;
GMutex buffering_post_lock; GMutex buffering_post_lock;
/* Current output collection (if provided by internal elements) */
GstStreamCollection *collection;
}; };
struct _GstURISourceBinClass struct _GstURISourceBinClass
@ -565,6 +571,7 @@ gst_uri_source_bin_finalize (GObject * obj)
g_mutex_clear (&urisrc->buffering_post_lock); g_mutex_clear (&urisrc->buffering_post_lock);
g_free (urisrc->uri); g_free (urisrc->uri);
g_free (urisrc->download_dir); g_free (urisrc->download_dir);
gst_object_replace ((GstObject **) & urisrc->collection, NULL);
G_OBJECT_CLASS (parent_class)->finalize (obj); G_OBJECT_CLASS (parent_class)->finalize (obj);
} }
@ -786,6 +793,8 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue); gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
} }
gst_object_replace ((GstObject **) & info->collection, NULL);
g_free (info); g_free (info);
} }
@ -2909,8 +2918,80 @@ handle_parsebin_collection (ChildSrcPadInfo * info,
g_list_free (unused_slots); g_list_free (unused_slots);
g_list_free (streams); g_list_free (streams);
/* Store the collection */
gst_object_replace ((GstObject **) & info->collection,
(GstObject *) collection);
} }
/* uri_source_bin_aggregate_collection:
*
* Go over the collections provided by the various parsebin (via
* ChildSrcPadInfo) and provide a unified collection.
*
* If there are more than one collection and they are different, a new
* aggregated collection will be returned.
*
* In all cases, the collection will be stored as the uri source bin main
* collection.
*
* Return: (transfer full): The collection, or %NULL.
*/
static GstStreamCollection *
uri_source_bin_aggregate_collection (GstURISourceBin * urisrc)
{
GList *iter;
GList *streams = NULL;
gboolean collections_need_merging = FALSE;
GstStreamCollection *res = NULL;
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
GST_DEBUG_OBJECT (info->demuxer, "collection %" GST_PTR_FORMAT,
info->collection);
if (res && info->collection && info->collection != res) {
collections_need_merging = TRUE;
break;
}
if (info->collection)
res = info->collection;
}
if (!collections_need_merging) {
GST_DEBUG_OBJECT (urisrc, "No need to aggregate");
goto store_exit;
}
res = gst_stream_collection_new ("urisourcebin");
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
if (info->collection) {
guint i, len;
len = gst_stream_collection_get_size (info->collection);
for (i = 0; i < len; i++) {
GstStream *stream =
gst_stream_collection_get_stream (info->collection, i);
if (!g_list_find (streams, stream)) {
streams = g_list_append (streams, stream);
}
}
}
}
for (iter = streams; iter; iter = iter->next) {
GstStream *stream = iter->data;
gst_stream_collection_add_stream (res, gst_object_ref (stream));
}
if (streams)
g_list_free (streams);
store_exit:
gst_object_replace ((GstObject **) & urisrc->collection, (GstObject *) res);
GST_DEBUG_OBJECT (urisrc, "Aggregated collection %" GST_PTR_FORMAT, res);
return res ? gst_object_ref (res) : NULL;
}
static void static void
handle_message (GstBin * bin, GstMessage * msg) handle_message (GstBin * bin, GstMessage * msg)
@ -2948,16 +3029,20 @@ handle_message (GstBin * bin, GstMessage * msg)
if (info->demuxer_is_parsebin) { if (info->demuxer_is_parsebin) {
GstStreamCollection *collection = NULL; GstStreamCollection *collection = NULL;
gst_message_parse_stream_collection (msg, &collection); gst_message_parse_stream_collection (msg, &collection);
GST_DEBUG_OBJECT (bin, "Seen collection %" GST_PTR_FORMAT,
collection);
/* Check if some output slots can/could be re-used with this new collection */ /* Check if some output slots can/could be re-used with this new collection */
if (collection) { if (collection) {
GstStreamCollection *aggregated = NULL;
handle_parsebin_collection (info, collection); handle_parsebin_collection (info, collection);
gst_object_unref (collection); aggregated = uri_source_bin_aggregate_collection (urisrc);
} if (aggregated != collection) {
if (g_list_length (urisrc->src_infos) > 1) {
GST_DEBUG_OBJECT (bin,
"Dropping stream-collection, multiple parsebins present");
gst_message_unref (msg); gst_message_unref (msg);
msg = NULL; msg =
gst_message_new_stream_collection ((GstObject *) urisrc,
aggregated);
}
gst_object_unref (collection);
} }
} }
} else if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source) { } else if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source) {