From cf803ea9f4e3fde92c1da86ecc47444035f7c0a7 Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Tue, 8 Aug 2017 17:39:43 -0400 Subject: [PATCH] tee: Implement allocation query aggregation This will aggregate allocation params, pool and will keep all meta that has no parameters. https://bugzilla.gnome.org/show_bug.cgi?id=730758 --- plugins/elements/gsttee.c | 226 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) diff --git a/plugins/elements/gsttee.c b/plugins/elements/gsttee.c index 8694b95eeb..ce5449a34c 100644 --- a/plugins/elements/gsttee.c +++ b/plugins/elements/gsttee.c @@ -565,12 +565,238 @@ gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) return res; } +struct AllocQueryCtx +{ + GstTee *tee; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; +}; + +/* This function will aggregate some of the allocation query information with + * the strategy to force upstream allocation. Depending on downstream + * allocation would otherwise make dynamic pipelines much more complicated as + * application would need to now drain buffer in certain cases before getting + * rid of a tee branch. */ +static gboolean +gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data) +{ + struct AllocQueryCtx *ctx = user_data; + GstPad *src_pad = g_value_get_object (item); + GstPad *peer_pad; + GstCaps *caps; + GstQuery *query; + guint count, i, size, min; + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s", + GST_DEBUG_PAD_NAME (src_pad)); + + peer_pad = gst_pad_get_peer (src_pad); + if (!peer_pad) { + if (ctx->tee->allow_not_linked) { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.", + GST_DEBUG_PAD_NAME (src_pad)); + return TRUE; + } else { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.", + GST_DEBUG_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + return FALSE; + } + } + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_pad_query (peer_pad, query)) { + GST_DEBUG_OBJECT (ctx->tee, + "Allocation query failed on pad %s, ignoring allocation", + GST_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + gst_query_unref (query); + gst_object_unref (peer_pad); + return FALSE; + } + + gst_object_unref (peer_pad); + + /* Allocation Params: + * store the maximum alignment, prefix and pading, but ignore the + * allocators and the flags which are tied to downstream allocation */ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (ctx->tee, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * pad. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + gst_query_unref (query); + + return TRUE; +} + + +static void +gst_tee_clear_query_allocation_meta (GstQuery * query) +{ + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) + gst_query_remove_nth_allocation_meta (query, count - i); +} + static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { + GstTee *tee = GST_TEE (parent); gboolean res; switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + { + GstIterator *iter; + GValue ret = G_VALUE_INIT; + struct AllocQueryCtx ctx = { tee, query, }; + + g_value_init (&ret, G_TYPE_BOOLEAN); + g_value_set_boolean (&ret, TRUE); + + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); + while (GST_ITERATOR_RESYNC == + gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) { + gst_iterator_resync (iter); + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + ctx.size = 0; + ctx.min_buffers = 0; + gst_tee_clear_query_allocation_meta (query); + } + + gst_iterator_free (iter); + res = g_value_get_boolean (&ret); + g_value_unset (&ret); + + if (res) { + GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix, + ctx.params.padding); + + GST_DEBUG_OBJECT (tee, + "Aggregated allocation pools size=%u min_buffers=%u", ctx.size, + ctx.min_buffers); + +#ifndef GST_DISABLE_GST_DEBUG + { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count); + + for (i = 0; i < count; i++) + GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } +#endif + + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } else { + gst_tee_clear_query_allocation_meta (query); + } + break; + } default: res = gst_pad_query_default (pad, parent, query); break;