From 0fda4266561dc70dfd025042a6269a6b21446808 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Wed, 23 Oct 2024 15:17:22 +0200 Subject: [PATCH] urisourcebin: Aggregate collections from multiple parsebin In the case where multiple parsebin are present (ex: from rtsp sources), we want to aggregate the collections provided by the different parsebin and expose a single "unified" collection. Part-of: --- .../gst/playback/gsturisourcebin.c | 97 +++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c index 5841bc31f6..0d24b00b74 100644 --- a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c +++ b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c @@ -122,6 +122,9 @@ struct _ChildSrcPadInfo /* use_queue2: TRUE if the contents should be buffered through a queue2 * element */ gboolean use_queue2; + + /* Current StreamCollection */ + GstStreamCollection *collection; }; /* Output Slot: @@ -194,6 +197,9 @@ struct _GstURISourceBin gint last_buffering_pct; /* Avoid sending buffering over and over */ GMutex buffering_lock; GMutex buffering_post_lock; + + /* Current output collection (if provided by internal elements) */ + GstStreamCollection *collection; }; struct _GstURISourceBinClass @@ -565,6 +571,7 @@ gst_uri_source_bin_finalize (GObject * obj) g_mutex_clear (&urisrc->buffering_post_lock); g_free (urisrc->uri); g_free (urisrc->download_dir); + gst_object_replace ((GstObject **) & urisrc->collection, NULL); G_OBJECT_CLASS (parent_class)->finalize (obj); } @@ -786,6 +793,8 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc) gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue); } + gst_object_replace ((GstObject **) & info->collection, NULL); + g_free (info); } @@ -2909,8 +2918,80 @@ handle_parsebin_collection (ChildSrcPadInfo * info, g_list_free (unused_slots); g_list_free (streams); + + /* Store the collection */ + gst_object_replace ((GstObject **) & info->collection, + (GstObject *) collection); } +/* uri_source_bin_aggregate_collection: + * + * Go over the collections provided by the various parsebin (via + * ChildSrcPadInfo) and provide a unified collection. + * + * If there are more than one collection and they are different, a new + * aggregated collection will be returned. + * + * In all cases, the collection will be stored as the uri source bin main + * collection. + * + * Return: (transfer full): The collection, or %NULL. + */ +static GstStreamCollection * +uri_source_bin_aggregate_collection (GstURISourceBin * urisrc) +{ + GList *iter; + GList *streams = NULL; + gboolean collections_need_merging = FALSE; + GstStreamCollection *res = NULL; + + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + + GST_DEBUG_OBJECT (info->demuxer, "collection %" GST_PTR_FORMAT, + info->collection); + if (res && info->collection && info->collection != res) { + collections_need_merging = TRUE; + break; + } + + if (info->collection) + res = info->collection; + } + + if (!collections_need_merging) { + GST_DEBUG_OBJECT (urisrc, "No need to aggregate"); + goto store_exit; + } + + res = gst_stream_collection_new ("urisourcebin"); + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + if (info->collection) { + guint i, len; + len = gst_stream_collection_get_size (info->collection); + for (i = 0; i < len; i++) { + GstStream *stream = + gst_stream_collection_get_stream (info->collection, i); + if (!g_list_find (streams, stream)) { + streams = g_list_append (streams, stream); + } + } + } + } + + for (iter = streams; iter; iter = iter->next) { + GstStream *stream = iter->data; + gst_stream_collection_add_stream (res, gst_object_ref (stream)); + } + if (streams) + g_list_free (streams); + +store_exit: + gst_object_replace ((GstObject **) & urisrc->collection, (GstObject *) res); + GST_DEBUG_OBJECT (urisrc, "Aggregated collection %" GST_PTR_FORMAT, res); + return res ? gst_object_ref (res) : NULL; +} static void handle_message (GstBin * bin, GstMessage * msg) @@ -2948,17 +3029,21 @@ handle_message (GstBin * bin, GstMessage * msg) if (info->demuxer_is_parsebin) { GstStreamCollection *collection = NULL; gst_message_parse_stream_collection (msg, &collection); + GST_DEBUG_OBJECT (bin, "Seen collection %" GST_PTR_FORMAT, + collection); /* Check if some output slots can/could be re-used with this new collection */ if (collection) { + GstStreamCollection *aggregated = NULL; handle_parsebin_collection (info, collection); + aggregated = uri_source_bin_aggregate_collection (urisrc); + if (aggregated != collection) { + gst_message_unref (msg); + msg = + gst_message_new_stream_collection ((GstObject *) urisrc, + aggregated); + } gst_object_unref (collection); } - if (g_list_length (urisrc->src_infos) > 1) { - GST_DEBUG_OBJECT (bin, - "Dropping stream-collection, multiple parsebins present"); - gst_message_unref (msg); - msg = NULL; - } } } else if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source) { GST_LOG_OBJECT (bin, "Collection %" GST_PTR_FORMAT, msg);