video/aggregator: add parallel convert pad class

Each required conversion will be performed concurrently

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1129>
This commit is contained in:
Matthew Waters 2021-04-02 16:40:37 +11:00
parent c30534122e
commit 8a5e5ddeeb
2 changed files with 368 additions and 63 deletions

View file

@ -49,6 +49,32 @@ GST_DEBUG_CATEGORY_STATIC (gst_video_aggregator_debug);
/* Needed prototypes */
static void gst_video_aggregator_reset_qos (GstVideoAggregator * vagg);
struct _GstVideoAggregatorPrivate
{
/* Lock to prevent the state to change while aggregating */
GMutex lock;
/* Current downstream segment */
GstClockTime ts_offset;
guint64 nframes;
/* QoS stuff */
gdouble proportion;
GstClockTime earliest_time;
guint64 qos_processed, qos_dropped;
/* current caps */
GstCaps *current_caps;
gboolean live;
/* The (ordered) list of #GstVideoFormatInfo supported by the aggregation
method (from the srcpad template caps). */
GPtrArray *supported_formats;
GstTaskPool *task_pool;
};
/****************************************
* GstVideoAggregatorPad implementation *
****************************************/
@ -415,8 +441,6 @@ struct _GstVideoAggregatorConvertPadPrivate
* and as such are protected with the object lock */
GstStructure *converter_config;
gboolean converter_config_changed;
GstTaskPool *task_pool;
};
G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad,
@ -435,11 +459,6 @@ gst_video_aggregator_convert_pad_finalize (GObject * o)
gst_structure_free (vaggpad->priv->converter_config);
vaggpad->priv->converter_config = NULL;
if (vaggpad->priv->task_pool)
gst_task_pool_cleanup (vaggpad->priv->task_pool);
gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL);
G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o);
}
@ -454,15 +473,6 @@ static void
GST_OBJECT_UNLOCK (pad);
}
static guint
get_opt_uint (const GstStructure * config, const gchar * opt, guint def)
{
guint res;
if (!gst_structure_get_uint (config, opt, &res))
res = def;
return res;
}
static gboolean
gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
GstVideoAggregator * vagg, GstBuffer * buffer,
@ -480,8 +490,10 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
gst_video_info_init (&conversion_info);
klass->create_conversion_info (pad, vagg, &conversion_info);
if (conversion_info.finfo == NULL)
if (conversion_info.finfo == NULL) {
GST_OBJECT_UNLOCK (pad);
return FALSE;
}
pad->priv->converter_config_changed = FALSE;
pad->priv->conversion_info = conversion_info;
@ -491,24 +503,14 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
pad->priv->convert = NULL;
if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) {
if (pad->priv->converter_config) {
guint n_threads = get_opt_uint (pad->priv->converter_config,
GST_VIDEO_CONVERTER_OPT_THREADS, 1);
if (n_threads == 0 || n_threads > g_get_num_processors ())
n_threads = g_get_num_processors ();
gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv->
task_pool), n_threads);
}
pad->priv->convert =
gst_video_converter_new_with_pool (&vpad->info,
&pad->priv->conversion_info,
pad->priv->converter_config ? gst_structure_copy (pad->
priv->converter_config) : NULL, pad->priv->task_pool);
priv->converter_config) : NULL, vagg->priv->task_pool);
if (!pad->priv->convert) {
GST_WARNING_OBJECT (pad, "No path found for conversion");
GST_OBJECT_UNLOCK (pad);
return FALSE;
}
@ -719,9 +721,6 @@ gst_video_aggregator_convert_pad_init (GstVideoAggregatorConvertPad * vaggpad)
vaggpad->priv->convert = NULL;
vaggpad->priv->converter_config = NULL;
vaggpad->priv->converter_config_changed = FALSE;
vaggpad->priv->task_pool = gst_shared_task_pool_new ();
gst_task_pool_prepare (vaggpad->priv->task_pool, NULL);
}
/**
@ -742,6 +741,197 @@ void gst_video_aggregator_convert_pad_update_conversion_info
GST_OBJECT_UNLOCK (pad);
}
struct _GstVideoAggregatorParallelConvertPadPrivate
{
GstVideoFrame src_frame;
gboolean is_converting;
};
typedef struct _GstVideoAggregatorParallelConvertPadPrivate
GstVideoAggregatorParallelConvertPadPrivate;
G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorParallelConvertPad,
gst_video_aggregator_parallel_convert_pad,
GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD);
#define PARALLEL_CONVERT_PAD_GET_PRIVATE(o) \
gst_video_aggregator_parallel_convert_pad_get_instance_private (o)
static void
gst_video_aggregator_parallel_convert_pad_prepare_frame_start
(GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg,
GstBuffer * buffer, GstVideoFrame * prepared_frame)
{
GstVideoAggregatorParallelConvertPad *ppad =
GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (vpad);
GstVideoAggregatorParallelConvertPadPrivate *pcp_priv =
PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad);
GstVideoAggregatorConvertPad *pad = GST_VIDEO_AGGREGATOR_CONVERT_PAD (vpad);
memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame));
pcp_priv->is_converting = FALSE;
/* Update/create converter as needed */
GST_OBJECT_LOCK (pad);
if (pad->priv->converter_config_changed) {
GstVideoAggregatorConvertPadClass *klass =
GST_VIDEO_AGGREGATOR_CONVERT_PAD_GET_CLASS (pad);
GstVideoInfo conversion_info;
gst_video_info_init (&conversion_info);
klass->create_conversion_info (pad, vagg, &conversion_info);
if (conversion_info.finfo == NULL) {
GST_OBJECT_UNLOCK (pad);
return;
}
pad->priv->converter_config_changed = FALSE;
pad->priv->conversion_info = conversion_info;
if (pad->priv->convert)
gst_video_converter_free (pad->priv->convert);
pad->priv->convert = NULL;
if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) {
GstStructure *conv_config;
if (pad->priv->converter_config) {
conv_config = gst_structure_copy (pad->priv->converter_config);
} else {
conv_config = gst_structure_new_empty ("GstVideoConverterConfig");
}
gst_structure_set (conv_config, GST_VIDEO_CONVERTER_OPT_ASYNC_TASKS,
G_TYPE_BOOLEAN, TRUE, NULL);
pad->priv->convert =
gst_video_converter_new_with_pool (&vpad->info,
&pad->priv->conversion_info, conv_config, vagg->priv->task_pool);
if (!pad->priv->convert) {
GST_WARNING_OBJECT (pad, "No path found for conversion");
GST_OBJECT_UNLOCK (pad);
return;
}
GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s",
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)),
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv->
conversion_info)));
} else {
GST_DEBUG_OBJECT (pad, "This pad will not need conversion");
}
}
GST_OBJECT_UNLOCK (pad);
if (!gst_video_frame_map (&pcp_priv->src_frame, &vpad->info, buffer,
GST_MAP_READ)) {
GST_WARNING_OBJECT (vagg, "Could not map input buffer");
return;
}
if (pad->priv->convert) {
GstBuffer *converted_buf = NULL;
static GstAllocationParams params = { 0, 15, 0, 0, };
gint converted_size;
guint outsize;
/* We wait until here to set the conversion infos, in case vagg->info changed */
converted_size = pad->priv->conversion_info.size;
outsize = GST_VIDEO_INFO_SIZE (&vagg->info);
converted_size = converted_size > outsize ? converted_size : outsize;
converted_buf = gst_buffer_new_allocate (NULL, converted_size, &params);
if (!gst_video_frame_map (prepared_frame, &(pad->priv->conversion_info),
converted_buf, GST_MAP_READWRITE)) {
GST_WARNING_OBJECT (vagg, "Could not map converted frame");
gst_clear_buffer (&converted_buf);
gst_video_frame_unmap (&pcp_priv->src_frame);
memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame));
return;
}
gst_video_converter_frame (pad->priv->convert, &pcp_priv->src_frame,
prepared_frame);
pad->priv->converted_buffer = converted_buf;
pcp_priv->is_converting = TRUE;
} else {
*prepared_frame = pcp_priv->src_frame;
memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame));
}
}
static void
gst_video_aggregator_parallel_convert_pad_prepare_frame_finish
(GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg,
GstVideoFrame * prepared_frame)
{
GstVideoAggregatorParallelConvertPad *ppad =
GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (vpad);
GstVideoAggregatorParallelConvertPadPrivate *pcp_priv =
PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad);
GstVideoAggregatorConvertPad *cpad = GST_VIDEO_AGGREGATOR_CONVERT_PAD (vpad);
if (cpad->priv->convert && pcp_priv->is_converting) {
pcp_priv->is_converting = FALSE;
gst_video_converter_frame_finish (cpad->priv->convert);
if (pcp_priv->src_frame.buffer) {
gst_video_frame_unmap (&pcp_priv->src_frame);
memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame));
}
}
}
static void
gst_video_aggregator_parallel_convert_pad_finalize (GObject * object)
{
GstVideoAggregatorParallelConvertPad *ppad =
GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (object);
GstVideoAggregatorParallelConvertPadPrivate *pcp_priv =
PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad);
GstVideoAggregatorConvertPad *cpad =
GST_VIDEO_AGGREGATOR_CONVERT_PAD (object);
if (cpad->priv->convert && pcp_priv->is_converting) {
pcp_priv->is_converting = FALSE;
gst_video_converter_frame_finish (cpad->priv->convert);
if (pcp_priv->src_frame.buffer) {
gst_video_frame_unmap (&pcp_priv->src_frame);
memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame));
}
}
return
G_OBJECT_CLASS
(gst_video_aggregator_parallel_convert_pad_parent_class)->finalize
(object);
}
static void
gst_video_aggregator_parallel_convert_pad_class_init
(GstVideoAggregatorParallelConvertPadClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstVideoAggregatorPadClass *vaggpadclass =
(GstVideoAggregatorPadClass *) klass;
gobject_class->finalize =
GST_DEBUG_FUNCPTR (gst_video_aggregator_parallel_convert_pad_finalize);
vaggpadclass->prepare_frame = NULL;
vaggpadclass->prepare_frame_start =
GST_DEBUG_FUNCPTR
(gst_video_aggregator_parallel_convert_pad_prepare_frame_start);
vaggpadclass->prepare_frame_finish =
GST_DEBUG_FUNCPTR
(gst_video_aggregator_parallel_convert_pad_prepare_frame_finish);
}
static void
gst_video_aggregator_parallel_convert_pad_init
(GstVideoAggregatorParallelConvertPad * vaggpad)
{
}
/**************************************
* GstVideoAggregator implementation *
**************************************/
@ -765,29 +955,6 @@ void gst_video_aggregator_convert_pad_update_conversion_info
} G_STMT_END
struct _GstVideoAggregatorPrivate
{
/* Lock to prevent the state to change while aggregating */
GMutex lock;
/* Current downstream segment */
GstClockTime ts_offset;
guint64 nframes;
/* QoS stuff */
gdouble proportion;
GstClockTime earliest_time;
guint64 qos_processed, qos_dropped;
/* current caps */
GstCaps *current_caps;
gboolean live;
/* The (ordered) list of #GstVideoFormatInfo supported by the aggregation
method (from the srcpad template caps). */
GPtrArray *supported_formats;
};
/* Can't use the G_DEFINE_TYPE macros because we need the
* videoaggregator class in the _init to be able to set
@ -1828,7 +1995,7 @@ sync_pad_values (GstElement * vagg, GstPad * pad, gpointer user_data)
}
static gboolean
prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data)
prepare_frames_start (GstElement * agg, GstPad * pad, gpointer user_data)
{
GstVideoAggregatorPad *vpad = GST_VIDEO_AGGREGATOR_PAD_CAST (pad);
GstVideoAggregatorPadClass *vaggpad_class =
@ -1836,7 +2003,27 @@ prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data)
memset (&vpad->priv->prepared_frame, 0, sizeof (GstVideoFrame));
if (vpad->priv->buffer == NULL || !vaggpad_class->prepare_frame)
if (vpad->priv->buffer == NULL || !vaggpad_class->prepare_frame_start)
return TRUE;
g_return_val_if_fail (vaggpad_class->prepare_frame_start
&& vaggpad_class->prepare_frame_finish, TRUE);
vaggpad_class->prepare_frame_start (vpad, GST_VIDEO_AGGREGATOR_CAST (agg),
vpad->priv->buffer, &vpad->priv->prepared_frame);
return TRUE;
}
static gboolean
prepare_frames_finish (GstElement * agg, GstPad * pad, gpointer user_data)
{
GstVideoAggregatorPad *vpad = GST_VIDEO_AGGREGATOR_PAD_CAST (pad);
GstVideoAggregatorPadClass *vaggpad_class =
GST_VIDEO_AGGREGATOR_PAD_GET_CLASS (pad);
if (vpad->priv->buffer == NULL || (!vaggpad_class->prepare_frame
&& !vaggpad_class->prepare_frame_start))
return TRUE;
/* GAP event, nothing to do */
@ -1846,8 +2033,14 @@ prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data)
return TRUE;
}
return vaggpad_class->prepare_frame (vpad, GST_VIDEO_AGGREGATOR_CAST (agg),
vpad->priv->buffer, &vpad->priv->prepared_frame);
if (vaggpad_class->prepare_frame_start && vaggpad_class->prepare_frame_finish) {
vaggpad_class->prepare_frame_finish (vpad, GST_VIDEO_AGGREGATOR_CAST (agg),
&vpad->priv->prepared_frame);
return TRUE;
} else {
return vaggpad_class->prepare_frame (vpad, GST_VIDEO_AGGREGATOR_CAST (agg),
vpad->priv->buffer, &vpad->priv->prepared_frame);
}
}
static gboolean
@ -1908,7 +2101,10 @@ gst_video_aggregator_do_aggregate (GstVideoAggregator * vagg,
GST_BUFFER_DTS (*outbuf), GST_BUFFER_DURATION (*outbuf), NULL);
/* Convert all the frames the subclass has before aggregating */
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames, NULL);
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames_start,
NULL);
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames_finish,
NULL);
ret = vagg_klass->aggregate_frames (vagg, *outbuf);
@ -2695,6 +2891,29 @@ gst_video_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * bpad,
return ret;
}
/**
* gst_video_aggregator_get_execution_task_pool:
* @vagg: the #GstVideoAggregator
*
* The returned #GstTaskPool is used internally for performing parallel
* video format conversions/scaling/etc during the
* #GstVideoAggregatorPadClass::prepare_frame_start() process.
* Subclasses can add their own operation to perform using the returned
* #GstTaskPool during #GstVideoAggregatorClass::aggregate_frames().
*
* Returns: (transfer full): the #GstTaskPool that can be used by subclasses
* for performing concurrent operations
*
* Since: 1.20
*/
GstTaskPool *
gst_video_aggregator_get_execution_task_pool (GstVideoAggregator * vagg)
{
g_return_val_if_fail (GST_IS_VIDEO_AGGREGATOR (vagg), NULL);
return gst_object_ref (vagg->priv->task_pool);
}
/* GObject vmethods */
static void
gst_video_aggregator_finalize (GObject * o)
@ -2704,6 +2923,10 @@ gst_video_aggregator_finalize (GObject * o)
g_mutex_clear (&vagg->priv->lock);
g_ptr_array_unref (vagg->priv->supported_formats);
if (vagg->priv->task_pool)
gst_task_pool_cleanup (vagg->priv->task_pool);
gst_clear_object (&vagg->priv->task_pool);
G_OBJECT_CLASS (gst_video_aggregator_parent_class)->finalize (o);
}
@ -2841,4 +3064,9 @@ gst_video_aggregator_init (GstVideoAggregator * vagg,
}
gst_caps_unref (src_template);
vagg->priv->task_pool = gst_shared_task_pool_new ();
gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (vagg->
priv->task_pool), g_get_num_processors ());
gst_task_pool_prepare (vagg->priv->task_pool, NULL);
}

View file

@ -75,11 +75,38 @@ struct _GstVideoAggregatorPad
* GstVideoAggregatorPadClass:
* @update_conversion_info: Called when either the input or output formats
* have changed.
* @prepare_frame: Prepare the frame from the pad buffer and sets it to prepared_frame
* @prepare_frame: Prepare the frame from the pad buffer and sets it to prepared_frame.
* Implementations should always return TRUE. Returning FALSE will cease
* iteration over subsequent pads.
* @clean_frame: clean the frame previously prepared in prepare_frame
*
* Since: 1.16
*/
/**
* GstVideoAggregatorPadClass::prepare_frame_start:
* @pad: the #GstVideoAggregatorPad
* @videoaggregator: the parent #GstVideoAggregator
* @buffer: the input #GstBuffer to prepare
* @prepared_frame: the #GstVideoFrame to prepare into
*
* Begin preparing the frame from the pad buffer and sets it to prepared_frame.
*
* If overriden, `prepare_frame_finish` must also be overriden.
*
* Since: 1.20
*/
/**
* GstVideoAggregatorPadClass::prepare_frame_finish:
* @pad: the #GstVideoAggregatorPad
* @videoaggregator: the parent #GstVideoAggregator
* @prepared_frame: the #GstVideoFrame to prepare into
*
* Finish preparing @prepared_frame.
*
* If overriden, `prepare_frame_start` must also be overriden.
*
* Since: 1.20
*/
struct _GstVideoAggregatorPadClass
{
GstAggregatorPadClass parent_class;
@ -94,7 +121,16 @@ struct _GstVideoAggregatorPadClass
GstVideoAggregator * videoaggregator,
GstVideoFrame * prepared_frame);
gpointer _gst_reserved[GST_PADDING_LARGE];
void (*prepare_frame_start) (GstVideoAggregatorPad * pad,
GstVideoAggregator * videoaggregator,
GstBuffer * buffer,
GstVideoFrame * prepared_frame);
void (*prepare_frame_finish) (GstVideoAggregatorPad * pad,
GstVideoAggregator * videoaggregator,
GstVideoFrame * prepared_frame);
gpointer _gst_reserved[GST_PADDING_LARGE-2];
};
GST_VIDEO_API
@ -167,6 +203,45 @@ GType gst_video_aggregator_convert_pad_get_type (void);
GST_VIDEO_API
void gst_video_aggregator_convert_pad_update_conversion_info (GstVideoAggregatorConvertPad * pad);
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorConvertPad, gst_object_unref);
/****************************************
* GstVideoAggregatorParallelConvertPad *
****************************************/
#define GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (gst_video_aggregator_parallel_convert_pad_get_type())
GST_VIDEO_API
G_DECLARE_DERIVABLE_TYPE (GstVideoAggregatorParallelConvertPad, gst_video_aggregator_parallel_convert_pad, GST, VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPad);
#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorParallelConvertPad))
#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPadClass))
#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPadClass))
#define GST_IS_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD))
#define GST_IS_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD))
/**
* GstVideoAggregatorParallelConvertPad:
*
* An implementation of GstPad that can be used with #GstVideoAggregator.
*
* See #GstVideoAggregator for more details.
*
* Since: 1.20
*/
/**
* GstVideoAggregatorParallelConvertPadClass:
*
* Since: 1.20
*/
struct _GstVideoAggregatorParallelConvertPadClass
{
GstVideoAggregatorConvertPadClass parent_class;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
/**********************
* GstVideoAggregator *
*********************/
@ -247,9 +322,11 @@ struct _GstVideoAggregatorClass
GST_VIDEO_API
GType gst_video_aggregator_get_type (void);
GST_VIDEO_API
GstTaskPool * gst_video_aggregator_get_execution_task_pool (GstVideoAggregator * vagg);
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregator, gst_object_unref)
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorPad, gst_object_unref)
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorConvertPad, gst_object_unref)
G_END_DECLS
#endif /* __GST_VIDEO_AGGREGATOR_H__ */