diff --git a/gst-libs/gst/video/gstvideoaggregator.c b/gst-libs/gst/video/gstvideoaggregator.c index b1d5577537..7cd9ee1aa1 100644 --- a/gst-libs/gst/video/gstvideoaggregator.c +++ b/gst-libs/gst/video/gstvideoaggregator.c @@ -49,6 +49,32 @@ GST_DEBUG_CATEGORY_STATIC (gst_video_aggregator_debug); /* Needed prototypes */ static void gst_video_aggregator_reset_qos (GstVideoAggregator * vagg); +struct _GstVideoAggregatorPrivate +{ + /* Lock to prevent the state to change while aggregating */ + GMutex lock; + + /* Current downstream segment */ + GstClockTime ts_offset; + guint64 nframes; + + /* QoS stuff */ + gdouble proportion; + GstClockTime earliest_time; + guint64 qos_processed, qos_dropped; + + /* current caps */ + GstCaps *current_caps; + + gboolean live; + + /* The (ordered) list of #GstVideoFormatInfo supported by the aggregation + method (from the srcpad template caps). */ + GPtrArray *supported_formats; + + GstTaskPool *task_pool; +}; + /**************************************** * GstVideoAggregatorPad implementation * ****************************************/ @@ -415,8 +441,6 @@ struct _GstVideoAggregatorConvertPadPrivate * and as such are protected with the object lock */ GstStructure *converter_config; gboolean converter_config_changed; - - GstTaskPool *task_pool; }; G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad, @@ -435,11 +459,6 @@ gst_video_aggregator_convert_pad_finalize (GObject * o) gst_structure_free (vaggpad->priv->converter_config); vaggpad->priv->converter_config = NULL; - if (vaggpad->priv->task_pool) - gst_task_pool_cleanup (vaggpad->priv->task_pool); - - gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL); - G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o); } @@ -454,15 +473,6 @@ static void GST_OBJECT_UNLOCK (pad); } -static guint -get_opt_uint (const GstStructure * config, const gchar * opt, guint def) -{ - guint res; - if (!gst_structure_get_uint (config, opt, &res)) - res = def; - return res; -} - static gboolean gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg, GstBuffer * buffer, @@ -480,8 +490,10 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, gst_video_info_init (&conversion_info); klass->create_conversion_info (pad, vagg, &conversion_info); - if (conversion_info.finfo == NULL) + if (conversion_info.finfo == NULL) { + GST_OBJECT_UNLOCK (pad); return FALSE; + } pad->priv->converter_config_changed = FALSE; pad->priv->conversion_info = conversion_info; @@ -491,24 +503,14 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, pad->priv->convert = NULL; if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) { - if (pad->priv->converter_config) { - guint n_threads = get_opt_uint (pad->priv->converter_config, - GST_VIDEO_CONVERTER_OPT_THREADS, 1); - - if (n_threads == 0 || n_threads > g_get_num_processors ()) - n_threads = g_get_num_processors (); - - gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv-> - task_pool), n_threads); - } - pad->priv->convert = gst_video_converter_new_with_pool (&vpad->info, &pad->priv->conversion_info, pad->priv->converter_config ? gst_structure_copy (pad-> - priv->converter_config) : NULL, pad->priv->task_pool); + priv->converter_config) : NULL, vagg->priv->task_pool); if (!pad->priv->convert) { GST_WARNING_OBJECT (pad, "No path found for conversion"); + GST_OBJECT_UNLOCK (pad); return FALSE; } @@ -719,9 +721,6 @@ gst_video_aggregator_convert_pad_init (GstVideoAggregatorConvertPad * vaggpad) vaggpad->priv->convert = NULL; vaggpad->priv->converter_config = NULL; vaggpad->priv->converter_config_changed = FALSE; - vaggpad->priv->task_pool = gst_shared_task_pool_new (); - - gst_task_pool_prepare (vaggpad->priv->task_pool, NULL); } /** @@ -742,6 +741,197 @@ void gst_video_aggregator_convert_pad_update_conversion_info GST_OBJECT_UNLOCK (pad); } +struct _GstVideoAggregatorParallelConvertPadPrivate +{ + GstVideoFrame src_frame; + gboolean is_converting; +}; + +typedef struct _GstVideoAggregatorParallelConvertPadPrivate + GstVideoAggregatorParallelConvertPadPrivate; + +G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorParallelConvertPad, + gst_video_aggregator_parallel_convert_pad, + GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD); +#define PARALLEL_CONVERT_PAD_GET_PRIVATE(o) \ + gst_video_aggregator_parallel_convert_pad_get_instance_private (o) + +static void + gst_video_aggregator_parallel_convert_pad_prepare_frame_start + (GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg, + GstBuffer * buffer, GstVideoFrame * prepared_frame) +{ + GstVideoAggregatorParallelConvertPad *ppad = + GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (vpad); + GstVideoAggregatorParallelConvertPadPrivate *pcp_priv = + PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad); + GstVideoAggregatorConvertPad *pad = GST_VIDEO_AGGREGATOR_CONVERT_PAD (vpad); + + memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame)); + + pcp_priv->is_converting = FALSE; + + /* Update/create converter as needed */ + GST_OBJECT_LOCK (pad); + if (pad->priv->converter_config_changed) { + GstVideoAggregatorConvertPadClass *klass = + GST_VIDEO_AGGREGATOR_CONVERT_PAD_GET_CLASS (pad); + GstVideoInfo conversion_info; + + gst_video_info_init (&conversion_info); + klass->create_conversion_info (pad, vagg, &conversion_info); + if (conversion_info.finfo == NULL) { + GST_OBJECT_UNLOCK (pad); + return; + } + pad->priv->converter_config_changed = FALSE; + + pad->priv->conversion_info = conversion_info; + + if (pad->priv->convert) + gst_video_converter_free (pad->priv->convert); + pad->priv->convert = NULL; + + if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) { + GstStructure *conv_config; + + if (pad->priv->converter_config) { + conv_config = gst_structure_copy (pad->priv->converter_config); + } else { + conv_config = gst_structure_new_empty ("GstVideoConverterConfig"); + } + gst_structure_set (conv_config, GST_VIDEO_CONVERTER_OPT_ASYNC_TASKS, + G_TYPE_BOOLEAN, TRUE, NULL); + + pad->priv->convert = + gst_video_converter_new_with_pool (&vpad->info, + &pad->priv->conversion_info, conv_config, vagg->priv->task_pool); + if (!pad->priv->convert) { + GST_WARNING_OBJECT (pad, "No path found for conversion"); + GST_OBJECT_UNLOCK (pad); + return; + } + + GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s", + gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)), + gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv-> + conversion_info))); + } else { + GST_DEBUG_OBJECT (pad, "This pad will not need conversion"); + } + } + GST_OBJECT_UNLOCK (pad); + + if (!gst_video_frame_map (&pcp_priv->src_frame, &vpad->info, buffer, + GST_MAP_READ)) { + GST_WARNING_OBJECT (vagg, "Could not map input buffer"); + return; + } + + if (pad->priv->convert) { + GstBuffer *converted_buf = NULL; + static GstAllocationParams params = { 0, 15, 0, 0, }; + gint converted_size; + guint outsize; + + /* We wait until here to set the conversion infos, in case vagg->info changed */ + converted_size = pad->priv->conversion_info.size; + outsize = GST_VIDEO_INFO_SIZE (&vagg->info); + converted_size = converted_size > outsize ? converted_size : outsize; + converted_buf = gst_buffer_new_allocate (NULL, converted_size, ¶ms); + + if (!gst_video_frame_map (prepared_frame, &(pad->priv->conversion_info), + converted_buf, GST_MAP_READWRITE)) { + GST_WARNING_OBJECT (vagg, "Could not map converted frame"); + + gst_clear_buffer (&converted_buf); + gst_video_frame_unmap (&pcp_priv->src_frame); + memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame)); + return; + } + + gst_video_converter_frame (pad->priv->convert, &pcp_priv->src_frame, + prepared_frame); + pad->priv->converted_buffer = converted_buf; + pcp_priv->is_converting = TRUE; + } else { + *prepared_frame = pcp_priv->src_frame; + memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame)); + } +} + +static void + gst_video_aggregator_parallel_convert_pad_prepare_frame_finish + (GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg, + GstVideoFrame * prepared_frame) +{ + GstVideoAggregatorParallelConvertPad *ppad = + GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (vpad); + GstVideoAggregatorParallelConvertPadPrivate *pcp_priv = + PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad); + GstVideoAggregatorConvertPad *cpad = GST_VIDEO_AGGREGATOR_CONVERT_PAD (vpad); + + if (cpad->priv->convert && pcp_priv->is_converting) { + pcp_priv->is_converting = FALSE; + gst_video_converter_frame_finish (cpad->priv->convert); + if (pcp_priv->src_frame.buffer) { + gst_video_frame_unmap (&pcp_priv->src_frame); + memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame)); + } + } +} + +static void +gst_video_aggregator_parallel_convert_pad_finalize (GObject * object) +{ + GstVideoAggregatorParallelConvertPad *ppad = + GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (object); + GstVideoAggregatorParallelConvertPadPrivate *pcp_priv = + PARALLEL_CONVERT_PAD_GET_PRIVATE (ppad); + GstVideoAggregatorConvertPad *cpad = + GST_VIDEO_AGGREGATOR_CONVERT_PAD (object); + + if (cpad->priv->convert && pcp_priv->is_converting) { + pcp_priv->is_converting = FALSE; + gst_video_converter_frame_finish (cpad->priv->convert); + if (pcp_priv->src_frame.buffer) { + gst_video_frame_unmap (&pcp_priv->src_frame); + memset (&pcp_priv->src_frame, 0, sizeof (pcp_priv->src_frame)); + } + } + + return + G_OBJECT_CLASS + (gst_video_aggregator_parallel_convert_pad_parent_class)->finalize + (object); +} + +static void + gst_video_aggregator_parallel_convert_pad_class_init + (GstVideoAggregatorParallelConvertPadClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstVideoAggregatorPadClass *vaggpadclass = + (GstVideoAggregatorPadClass *) klass; + + gobject_class->finalize = + GST_DEBUG_FUNCPTR (gst_video_aggregator_parallel_convert_pad_finalize); + + vaggpadclass->prepare_frame = NULL; + vaggpadclass->prepare_frame_start = + GST_DEBUG_FUNCPTR + (gst_video_aggregator_parallel_convert_pad_prepare_frame_start); + vaggpadclass->prepare_frame_finish = + GST_DEBUG_FUNCPTR + (gst_video_aggregator_parallel_convert_pad_prepare_frame_finish); +} + +static void + gst_video_aggregator_parallel_convert_pad_init + (GstVideoAggregatorParallelConvertPad * vaggpad) +{ +} + /************************************** * GstVideoAggregator implementation * **************************************/ @@ -765,29 +955,6 @@ void gst_video_aggregator_convert_pad_update_conversion_info } G_STMT_END -struct _GstVideoAggregatorPrivate -{ - /* Lock to prevent the state to change while aggregating */ - GMutex lock; - - /* Current downstream segment */ - GstClockTime ts_offset; - guint64 nframes; - - /* QoS stuff */ - gdouble proportion; - GstClockTime earliest_time; - guint64 qos_processed, qos_dropped; - - /* current caps */ - GstCaps *current_caps; - - gboolean live; - - /* The (ordered) list of #GstVideoFormatInfo supported by the aggregation - method (from the srcpad template caps). */ - GPtrArray *supported_formats; -}; /* Can't use the G_DEFINE_TYPE macros because we need the * videoaggregator class in the _init to be able to set @@ -1828,7 +1995,7 @@ sync_pad_values (GstElement * vagg, GstPad * pad, gpointer user_data) } static gboolean -prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data) +prepare_frames_start (GstElement * agg, GstPad * pad, gpointer user_data) { GstVideoAggregatorPad *vpad = GST_VIDEO_AGGREGATOR_PAD_CAST (pad); GstVideoAggregatorPadClass *vaggpad_class = @@ -1836,7 +2003,27 @@ prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data) memset (&vpad->priv->prepared_frame, 0, sizeof (GstVideoFrame)); - if (vpad->priv->buffer == NULL || !vaggpad_class->prepare_frame) + if (vpad->priv->buffer == NULL || !vaggpad_class->prepare_frame_start) + return TRUE; + + g_return_val_if_fail (vaggpad_class->prepare_frame_start + && vaggpad_class->prepare_frame_finish, TRUE); + + vaggpad_class->prepare_frame_start (vpad, GST_VIDEO_AGGREGATOR_CAST (agg), + vpad->priv->buffer, &vpad->priv->prepared_frame); + + return TRUE; +} + +static gboolean +prepare_frames_finish (GstElement * agg, GstPad * pad, gpointer user_data) +{ + GstVideoAggregatorPad *vpad = GST_VIDEO_AGGREGATOR_PAD_CAST (pad); + GstVideoAggregatorPadClass *vaggpad_class = + GST_VIDEO_AGGREGATOR_PAD_GET_CLASS (pad); + + if (vpad->priv->buffer == NULL || (!vaggpad_class->prepare_frame + && !vaggpad_class->prepare_frame_start)) return TRUE; /* GAP event, nothing to do */ @@ -1846,8 +2033,14 @@ prepare_frames (GstElement * agg, GstPad * pad, gpointer user_data) return TRUE; } - return vaggpad_class->prepare_frame (vpad, GST_VIDEO_AGGREGATOR_CAST (agg), - vpad->priv->buffer, &vpad->priv->prepared_frame); + if (vaggpad_class->prepare_frame_start && vaggpad_class->prepare_frame_finish) { + vaggpad_class->prepare_frame_finish (vpad, GST_VIDEO_AGGREGATOR_CAST (agg), + &vpad->priv->prepared_frame); + return TRUE; + } else { + return vaggpad_class->prepare_frame (vpad, GST_VIDEO_AGGREGATOR_CAST (agg), + vpad->priv->buffer, &vpad->priv->prepared_frame); + } } static gboolean @@ -1908,7 +2101,10 @@ gst_video_aggregator_do_aggregate (GstVideoAggregator * vagg, GST_BUFFER_DTS (*outbuf), GST_BUFFER_DURATION (*outbuf), NULL); /* Convert all the frames the subclass has before aggregating */ - gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames, NULL); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames_start, + NULL); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (vagg), prepare_frames_finish, + NULL); ret = vagg_klass->aggregate_frames (vagg, *outbuf); @@ -2695,6 +2891,29 @@ gst_video_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * bpad, return ret; } +/** + * gst_video_aggregator_get_execution_task_pool: + * @vagg: the #GstVideoAggregator + * + * The returned #GstTaskPool is used internally for performing parallel + * video format conversions/scaling/etc during the + * #GstVideoAggregatorPadClass::prepare_frame_start() process. + * Subclasses can add their own operation to perform using the returned + * #GstTaskPool during #GstVideoAggregatorClass::aggregate_frames(). + * + * Returns: (transfer full): the #GstTaskPool that can be used by subclasses + * for performing concurrent operations + * + * Since: 1.20 + */ +GstTaskPool * +gst_video_aggregator_get_execution_task_pool (GstVideoAggregator * vagg) +{ + g_return_val_if_fail (GST_IS_VIDEO_AGGREGATOR (vagg), NULL); + + return gst_object_ref (vagg->priv->task_pool); +} + /* GObject vmethods */ static void gst_video_aggregator_finalize (GObject * o) @@ -2704,6 +2923,10 @@ gst_video_aggregator_finalize (GObject * o) g_mutex_clear (&vagg->priv->lock); g_ptr_array_unref (vagg->priv->supported_formats); + if (vagg->priv->task_pool) + gst_task_pool_cleanup (vagg->priv->task_pool); + gst_clear_object (&vagg->priv->task_pool); + G_OBJECT_CLASS (gst_video_aggregator_parent_class)->finalize (o); } @@ -2841,4 +3064,9 @@ gst_video_aggregator_init (GstVideoAggregator * vagg, } gst_caps_unref (src_template); + + vagg->priv->task_pool = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (vagg-> + priv->task_pool), g_get_num_processors ()); + gst_task_pool_prepare (vagg->priv->task_pool, NULL); } diff --git a/gst-libs/gst/video/gstvideoaggregator.h b/gst-libs/gst/video/gstvideoaggregator.h index 3c7345fe34..52afd00e53 100644 --- a/gst-libs/gst/video/gstvideoaggregator.h +++ b/gst-libs/gst/video/gstvideoaggregator.h @@ -75,11 +75,38 @@ struct _GstVideoAggregatorPad * GstVideoAggregatorPadClass: * @update_conversion_info: Called when either the input or output formats * have changed. - * @prepare_frame: Prepare the frame from the pad buffer and sets it to prepared_frame + * @prepare_frame: Prepare the frame from the pad buffer and sets it to prepared_frame. + * Implementations should always return TRUE. Returning FALSE will cease + * iteration over subsequent pads. * @clean_frame: clean the frame previously prepared in prepare_frame * * Since: 1.16 */ +/** + * GstVideoAggregatorPadClass::prepare_frame_start: + * @pad: the #GstVideoAggregatorPad + * @videoaggregator: the parent #GstVideoAggregator + * @buffer: the input #GstBuffer to prepare + * @prepared_frame: the #GstVideoFrame to prepare into + * + * Begin preparing the frame from the pad buffer and sets it to prepared_frame. + * + * If overriden, `prepare_frame_finish` must also be overriden. + * + * Since: 1.20 + */ +/** + * GstVideoAggregatorPadClass::prepare_frame_finish: + * @pad: the #GstVideoAggregatorPad + * @videoaggregator: the parent #GstVideoAggregator + * @prepared_frame: the #GstVideoFrame to prepare into + * + * Finish preparing @prepared_frame. + * + * If overriden, `prepare_frame_start` must also be overriden. + * + * Since: 1.20 + */ struct _GstVideoAggregatorPadClass { GstAggregatorPadClass parent_class; @@ -94,7 +121,16 @@ struct _GstVideoAggregatorPadClass GstVideoAggregator * videoaggregator, GstVideoFrame * prepared_frame); - gpointer _gst_reserved[GST_PADDING_LARGE]; + void (*prepare_frame_start) (GstVideoAggregatorPad * pad, + GstVideoAggregator * videoaggregator, + GstBuffer * buffer, + GstVideoFrame * prepared_frame); + + void (*prepare_frame_finish) (GstVideoAggregatorPad * pad, + GstVideoAggregator * videoaggregator, + GstVideoFrame * prepared_frame); + + gpointer _gst_reserved[GST_PADDING_LARGE-2]; }; GST_VIDEO_API @@ -167,6 +203,45 @@ GType gst_video_aggregator_convert_pad_get_type (void); GST_VIDEO_API void gst_video_aggregator_convert_pad_update_conversion_info (GstVideoAggregatorConvertPad * pad); +G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorConvertPad, gst_object_unref); + +/**************************************** + * GstVideoAggregatorParallelConvertPad * + ****************************************/ + +#define GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD (gst_video_aggregator_parallel_convert_pad_get_type()) +GST_VIDEO_API +G_DECLARE_DERIVABLE_TYPE (GstVideoAggregatorParallelConvertPad, gst_video_aggregator_parallel_convert_pad, GST, VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPad); + +#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorParallelConvertPad)) +#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPadClass)) +#define GST_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD, GstVideoAggregatorConvertPadClass)) +#define GST_IS_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD)) +#define GST_IS_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD)) + +/** + * GstVideoAggregatorParallelConvertPad: + * + * An implementation of GstPad that can be used with #GstVideoAggregator. + * + * See #GstVideoAggregator for more details. + * + * Since: 1.20 + */ + +/** + * GstVideoAggregatorParallelConvertPadClass: + * + * Since: 1.20 + */ +struct _GstVideoAggregatorParallelConvertPadClass +{ + GstVideoAggregatorConvertPadClass parent_class; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + /********************** * GstVideoAggregator * *********************/ @@ -247,9 +322,11 @@ struct _GstVideoAggregatorClass GST_VIDEO_API GType gst_video_aggregator_get_type (void); +GST_VIDEO_API +GstTaskPool * gst_video_aggregator_get_execution_task_pool (GstVideoAggregator * vagg); + G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregator, gst_object_unref) G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorPad, gst_object_unref) -G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstVideoAggregatorConvertPad, gst_object_unref) G_END_DECLS #endif /* __GST_VIDEO_AGGREGATOR_H__ */