mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-21 22:58:16 +00:00
urisourcebin: Aggregate collections from multiple parsebin
In the case where multiple parsebin are present (ex: from rtsp sources), we want to aggregate the collections provided by the different parsebin and expose a single "unified" collection. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7764>
This commit is contained in:
parent
f7ab9f91e7
commit
5d6a4565b0
1 changed files with 91 additions and 6 deletions
|
@ -122,6 +122,9 @@ struct _ChildSrcPadInfo
|
|||
/* use_queue2: TRUE if the contents should be buffered through a queue2
|
||||
* element */
|
||||
gboolean use_queue2;
|
||||
|
||||
/* Current StreamCollection */
|
||||
GstStreamCollection *collection;
|
||||
};
|
||||
|
||||
/* Output Slot:
|
||||
|
@ -194,6 +197,9 @@ struct _GstURISourceBin
|
|||
gint last_buffering_pct; /* Avoid sending buffering over and over */
|
||||
GMutex buffering_lock;
|
||||
GMutex buffering_post_lock;
|
||||
|
||||
/* Current output collection (if provided by internal elements) */
|
||||
GstStreamCollection *collection;
|
||||
};
|
||||
|
||||
struct _GstURISourceBinClass
|
||||
|
@ -565,6 +571,7 @@ gst_uri_source_bin_finalize (GObject * obj)
|
|||
g_mutex_clear (&urisrc->buffering_post_lock);
|
||||
g_free (urisrc->uri);
|
||||
g_free (urisrc->download_dir);
|
||||
gst_object_replace ((GstObject **) & urisrc->collection, NULL);
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (obj);
|
||||
}
|
||||
|
@ -786,6 +793,8 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
|
|||
gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
|
||||
}
|
||||
|
||||
gst_object_replace ((GstObject **) & info->collection, NULL);
|
||||
|
||||
g_free (info);
|
||||
}
|
||||
|
||||
|
@ -2909,8 +2918,80 @@ handle_parsebin_collection (ChildSrcPadInfo * info,
|
|||
|
||||
g_list_free (unused_slots);
|
||||
g_list_free (streams);
|
||||
|
||||
/* Store the collection */
|
||||
gst_object_replace ((GstObject **) & info->collection,
|
||||
(GstObject *) collection);
|
||||
}
|
||||
|
||||
/* uri_source_bin_aggregate_collection:
|
||||
*
|
||||
* Go over the collections provided by the various parsebin (via
|
||||
* ChildSrcPadInfo) and provide a unified collection.
|
||||
*
|
||||
* If there are more than one collection and they are different, a new
|
||||
* aggregated collection will be returned.
|
||||
*
|
||||
* In all cases, the collection will be stored as the uri source bin main
|
||||
* collection.
|
||||
*
|
||||
* Return: (transfer full): The collection, or %NULL.
|
||||
*/
|
||||
static GstStreamCollection *
|
||||
uri_source_bin_aggregate_collection (GstURISourceBin * urisrc)
|
||||
{
|
||||
GList *iter;
|
||||
GList *streams = NULL;
|
||||
gboolean collections_need_merging = FALSE;
|
||||
GstStreamCollection *res = NULL;
|
||||
|
||||
for (iter = urisrc->src_infos; iter; iter = iter->next) {
|
||||
ChildSrcPadInfo *info = iter->data;
|
||||
|
||||
GST_DEBUG_OBJECT (info->demuxer, "collection %" GST_PTR_FORMAT,
|
||||
info->collection);
|
||||
if (res && info->collection && info->collection != res) {
|
||||
collections_need_merging = TRUE;
|
||||
break;
|
||||
}
|
||||
|
||||
if (info->collection)
|
||||
res = info->collection;
|
||||
}
|
||||
|
||||
if (!collections_need_merging) {
|
||||
GST_DEBUG_OBJECT (urisrc, "No need to aggregate");
|
||||
goto store_exit;
|
||||
}
|
||||
|
||||
res = gst_stream_collection_new ("urisourcebin");
|
||||
for (iter = urisrc->src_infos; iter; iter = iter->next) {
|
||||
ChildSrcPadInfo *info = iter->data;
|
||||
if (info->collection) {
|
||||
guint i, len;
|
||||
len = gst_stream_collection_get_size (info->collection);
|
||||
for (i = 0; i < len; i++) {
|
||||
GstStream *stream =
|
||||
gst_stream_collection_get_stream (info->collection, i);
|
||||
if (!g_list_find (streams, stream)) {
|
||||
streams = g_list_append (streams, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (iter = streams; iter; iter = iter->next) {
|
||||
GstStream *stream = iter->data;
|
||||
gst_stream_collection_add_stream (res, gst_object_ref (stream));
|
||||
}
|
||||
if (streams)
|
||||
g_list_free (streams);
|
||||
|
||||
store_exit:
|
||||
gst_object_replace ((GstObject **) & urisrc->collection, (GstObject *) res);
|
||||
GST_DEBUG_OBJECT (urisrc, "Aggregated collection %" GST_PTR_FORMAT, res);
|
||||
return res ? gst_object_ref (res) : NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
handle_message (GstBin * bin, GstMessage * msg)
|
||||
|
@ -2948,17 +3029,21 @@ handle_message (GstBin * bin, GstMessage * msg)
|
|||
if (info->demuxer_is_parsebin) {
|
||||
GstStreamCollection *collection = NULL;
|
||||
gst_message_parse_stream_collection (msg, &collection);
|
||||
GST_DEBUG_OBJECT (bin, "Seen collection %" GST_PTR_FORMAT,
|
||||
collection);
|
||||
/* Check if some output slots can/could be re-used with this new collection */
|
||||
if (collection) {
|
||||
GstStreamCollection *aggregated = NULL;
|
||||
handle_parsebin_collection (info, collection);
|
||||
aggregated = uri_source_bin_aggregate_collection (urisrc);
|
||||
if (aggregated != collection) {
|
||||
gst_message_unref (msg);
|
||||
msg =
|
||||
gst_message_new_stream_collection ((GstObject *) urisrc,
|
||||
aggregated);
|
||||
}
|
||||
gst_object_unref (collection);
|
||||
}
|
||||
if (g_list_length (urisrc->src_infos) > 1) {
|
||||
GST_DEBUG_OBJECT (bin,
|
||||
"Dropping stream-collection, multiple parsebins present");
|
||||
gst_message_unref (msg);
|
||||
msg = NULL;
|
||||
}
|
||||
}
|
||||
} else if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source) {
|
||||
GST_LOG_OBJECT (bin, "Collection %" GST_PTR_FORMAT, msg);
|
||||
|
|
Loading…
Reference in a new issue