compositor: perform conversions in parallel
Improves throughput of the total convert-and-blend process and allows for higher performance across slightly more threads.

Also make use of the video aggregator's task pool for blending, in order to reduce the number of threads.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1129>
This commit is contained in:
parent
8a5e5ddeeb
commit
f0205645b3
3 changed files with 114 additions and 126 deletions
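The commit message above refers to the video aggregator's task pool. For orientation, here is a stand-alone sketch of the GstSharedTaskPool push/join pattern (available since GStreamer 1.20) that the rewritten blend runner builds on. It is illustrative only and not part of the patch; blend_one_slice is a hypothetical work function.

/* Illustrative sketch only: fan work items out to a GstSharedTaskPool and
 * wait for all of them to finish. */
#include <gst/gst.h>

static void
blend_one_slice (gpointer user_data)
{
  guint slice = GPOINTER_TO_UINT (user_data);

  g_print ("blending slice %u in thread %p\n", slice, g_thread_self ());
}

int
main (int argc, char **argv)
{
  GstTaskPool *pool;
  gpointer ids[4];
  guint i, n = G_N_ELEMENTS (ids);

  gst_init (&argc, &argv);

  pool = gst_shared_task_pool_new ();
  gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), n);
  gst_task_pool_prepare (pool, NULL);

  /* Push one job per slice; each returned handle can later be joined. */
  for (i = 0; i < n; i++)
    ids[i] = gst_task_pool_push (pool, blend_one_slice,
        GUINT_TO_POINTER (i), NULL);

  /* Wait for every pushed job to complete. */
  for (i = 0; i < n; i++)
    gst_task_pool_join (pool, ids[i]);

  gst_task_pool_cleanup (pool);
  gst_object_unref (pool);
  return 0;
}

Unlike the dedicated "compositor-blend" threads the old code spawned per element, a shared pool lets the aggregator and its pads reuse the same fixed set of worker threads, which is how the patch reduces the total thread count.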
@@ -1926,6 +1926,7 @@
         "GstCompositorPad": {
             "hierarchy": [
                 "GstCompositorPad",
+                "GstVideoAggregatorParallelConvertPad",
                 "GstVideoAggregatorConvertPad",
                 "GstVideoAggregatorPad",
                 "GstAggregatorPad",
@@ -183,7 +183,7 @@ enum
 };
 
 G_DEFINE_TYPE (GstCompositorPad, gst_compositor_pad,
-    GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD);
+    GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD);
 
 static void
 gst_compositor_pad_get_property (GObject * object, guint prop_id,
@@ -386,8 +386,8 @@ _pad_obscures_rectangle (GstVideoAggregator * vagg, GstVideoAggregatorPad * pad,
   return TRUE;
 }
 
-static gboolean
-gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
+static void
+gst_compositor_pad_prepare_frame_start (GstVideoAggregatorPad * pad,
     GstVideoAggregator * vagg, GstBuffer * buffer,
     GstVideoFrame * prepared_frame)
 {
@@ -417,7 +417,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
 
   if (cpad->alpha == 0.0) {
     GST_DEBUG_OBJECT (pad, "Pad has alpha 0.0, not converting frame");
-    goto done;
+    return;
   }
 
   frame_rect = clamp_rectangle (cpad->xpos, cpad->ypos, width, height,
@@ -426,7 +426,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   if (frame_rect.w == 0 || frame_rect.h == 0) {
     GST_DEBUG_OBJECT (pad, "Resulting frame is zero-width or zero-height "
         "(w: %i, h: %i), skipping", frame_rect.w, frame_rect.h);
-    goto done;
+    return;
   }
 
   GST_OBJECT_LOCK (vagg);
@@ -446,16 +446,11 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   GST_OBJECT_UNLOCK (vagg);
 
   if (frame_obscured)
-    goto done;
+    return;
 
-  return
-      GST_VIDEO_AGGREGATOR_PAD_CLASS
-      (gst_compositor_pad_parent_class)->prepare_frame (pad, vagg, buffer,
+  GST_VIDEO_AGGREGATOR_PAD_CLASS
+      (gst_compositor_pad_parent_class)->prepare_frame_start (pad, vagg, buffer,
       prepared_frame);
-
-done:
-
-  return TRUE;
 }
 
 static void
@@ -539,8 +534,8 @@ gst_compositor_pad_class_init (GstCompositorPadClass * klass)
       GST_TYPE_COMPOSITOR_OPERATOR, DEFAULT_PAD_OPERATOR,
       G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
 
-  vaggpadclass->prepare_frame =
-      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame);
+  vaggpadclass->prepare_frame_start =
+      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame_start);
 
   vaggcpadclass->create_conversion_info =
       GST_DEBUG_FUNCPTR (gst_compositor_pad_create_conversion_info);
@@ -861,123 +856,105 @@ _fixate_caps (GstAggregator * agg, GstCaps * caps)
   return ret;
 }
 
-static gpointer
+static void
 gst_parallelized_task_thread_func (gpointer data)
 {
-  GstParallelizedTaskThread *self = data;
+  GstParallelizedTaskRunner *runner = data;
+  gint idx;
 
-  g_mutex_lock (&self->runner->lock);
-  self->runner->n_done++;
-  if (self->runner->n_done == self->runner->n_threads - 1)
-    g_cond_signal (&self->runner->cond_done);
-
-  do {
-    gint idx;
-
-    while (self->runner->n_todo == -1 && !self->runner->quit)
-      g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
-
-    if (self->runner->quit)
-      break;
-
-    idx = self->runner->n_todo--;
-    g_assert (self->runner->n_todo >= -1);
-    g_mutex_unlock (&self->runner->lock);
-
-    g_assert (self->runner->func != NULL);
-
-    self->runner->func (self->runner->task_data[idx]);
-
-    g_mutex_lock (&self->runner->lock);
-    self->runner->n_done++;
-    if (self->runner->n_done == self->runner->n_threads - 1)
-      g_cond_signal (&self->runner->cond_done);
-  } while (TRUE);
-
-  g_mutex_unlock (&self->runner->lock);
-
-  return NULL;
+  g_mutex_lock (&runner->lock);
+  idx = runner->n_todo--;
+  g_assert (runner->n_todo >= -1);
+  g_mutex_unlock (&runner->lock);
+
+  g_assert (runner->func != NULL);
+
+  runner->func (runner->task_data[idx]);
+}
+
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+  gboolean joined = FALSE;
+
+  while (!joined) {
+    g_mutex_lock (&self->lock);
+    if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+      gpointer task = gst_queue_array_pop_head (self->tasks);
+      g_mutex_unlock (&self->lock);
+      gst_task_pool_join (self->pool, task);
+    } else {
+      g_mutex_unlock (&self->lock);
+    }
+  }
 }
 
 static void
 gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
 {
-  guint i;
-
-  g_mutex_lock (&self->lock);
-  self->quit = TRUE;
-  g_cond_broadcast (&self->cond_todo);
-  g_mutex_unlock (&self->lock);
-
-  for (i = 1; i < self->n_threads; i++) {
-    if (!self->threads[i].thread)
-      continue;
-
-    g_thread_join (self->threads[i].thread);
-  }
-
+  gst_parallelized_task_runner_join (self);
+
+  gst_queue_array_free (self->tasks);
+  if (self->own_pool)
+    gst_task_pool_cleanup (self->pool);
+  gst_object_unref (self->pool);
   g_mutex_clear (&self->lock);
-  g_cond_clear (&self->cond_todo);
-  g_cond_clear (&self->cond_done);
-  g_free (self->threads);
   g_free (self);
 }
 
 static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool,
+    gboolean async_tasks)
 {
   GstParallelizedTaskRunner *self;
-  guint i;
-  GError *err = NULL;
 
   if (n_threads == 0)
     n_threads = g_get_num_processors ();
 
   self = g_new0 (GstParallelizedTaskRunner, 1);
-  self->n_threads = n_threads;
-  self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
 
-  self->quit = FALSE;
+  if (pool) {
+    self->pool = g_object_ref (pool);
+    self->own_pool = FALSE;
+
+    /* No reason to split up the work between more threads than the
+     * pool can spawn */
+    if (GST_IS_SHARED_TASK_POOL (pool))
+      n_threads =
+          MIN (n_threads,
+          gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+  } else {
+    self->pool = gst_shared_task_pool_new ();
+    self->own_pool = TRUE;
+    gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+        n_threads);
+    gst_task_pool_prepare (self->pool, NULL);
+  }
+
+  self->tasks = gst_queue_array_new (n_threads);
+
+  self->n_threads = n_threads;
+
   self->n_todo = -1;
-  self->n_done = 0;
   g_mutex_init (&self->lock);
-  g_cond_init (&self->cond_todo);
-  g_cond_init (&self->cond_done);
 
   /* Set when scheduling a job */
   self->func = NULL;
   self->task_data = NULL;
-
-  for (i = 0; i < n_threads; i++) {
-    self->threads[i].runner = self;
-    self->threads[i].idx = i;
-
-    /* First thread is the one calling run() */
-    if (i > 0) {
-      self->threads[i].thread =
-          g_thread_try_new ("compositor-blend",
-          gst_parallelized_task_thread_func, &self->threads[i], &err);
-      if (!self->threads[i].thread)
-        goto error;
-    }
-  }
-
-  g_mutex_lock (&self->lock);
-  while (self->n_done < self->n_threads - 1)
-    g_cond_wait (&self->cond_done, &self->lock);
-  self->n_done = 0;
-  g_mutex_unlock (&self->lock);
+  self->async_tasks = async_tasks;
 
   return self;
+}
 
-error:
-  {
-    GST_ERROR ("Failed to start thread %u: %s", i, err->message);
-    g_clear_error (&err);
-
-    gst_parallelized_task_runner_free (self);
-    return NULL;
-  }
+static void
+gst_parallelized_task_runner_finish (GstParallelizedTaskRunner * self)
+{
+  g_return_if_fail (self->func != NULL);
+
+  gst_parallelized_task_runner_join (self);
+
+  self->func = NULL;
+  self->task_data = NULL;
 }
 
 static void
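The rewritten runner keeps the handles returned by gst_task_pool_push() in a GstQueueArray from libgstbase, which is why the header later gains the <gst/base/base.h> include. As a stand-alone illustration of that container's FIFO API (this small program is not part of the patch):

#include <gst/base/base.h>

int
main (int argc, char **argv)
{
  GstQueueArray *tasks;
  guint i;

  gst_init (&argc, &argv);

  /* The argument is only the initial size; the array grows as needed. */
  tasks = gst_queue_array_new (4);

  for (i = 0; i < 8; i++)
    gst_queue_array_push_tail (tasks, GUINT_TO_POINTER (i + 1));

  /* Drain in FIFO order, as the runner does when joining pushed tasks. */
  while (!gst_queue_array_is_empty (tasks))
    g_print ("%u\n", GPOINTER_TO_UINT (gst_queue_array_pop_head (tasks)));

  gst_queue_array_free (tasks);
  return 0;
}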
@@ -989,32 +966,43 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self,
   self->func = func;
   self->task_data = task_data;
 
-  if (n_threads > 1) {
+  if (n_threads > 1 || self->async_tasks) {
+    guint i = 0;
     g_mutex_lock (&self->lock);
-    self->n_todo = self->n_threads - 2;
-    self->n_done = 0;
-    g_cond_broadcast (&self->cond_todo);
+    self->n_todo = self->n_threads - 1;
+    if (!self->async_tasks) {
+      /* if not async, perform one of the functions in the current thread */
+      self->n_todo--;
+      i = 1;
+    }
+    for (; i < n_threads; i++) {
+      gpointer task =
+          gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+          self, NULL);
+
+      /* The return value of push() is nullable but NULL is only returned
+       * with the shared task pool when gst_task_pool_prepare() has not been
+       * called and would thus be a programming error that we should hard-fail
+       * on.
+       */
+      g_assert (task != NULL);
+      gst_queue_array_push_tail (self->tasks, task);
+    }
     g_mutex_unlock (&self->lock);
   }
 
-  self->func (self->task_data[self->n_threads - 1]);
+  if (!self->async_tasks) {
+    self->func (self->task_data[self->n_threads - 1]);
 
-  if (n_threads > 1) {
-    g_mutex_lock (&self->lock);
-    while (self->n_done < self->n_threads - 1)
-      g_cond_wait (&self->cond_done, &self->lock);
-    self->n_done = 0;
-    g_mutex_unlock (&self->lock);
+    gst_parallelized_task_runner_finish (self);
   }
-
-  self->func = NULL;
-  self->task_data = NULL;
 }
 
 static gboolean
 _negotiated_caps (GstAggregator * agg, GstCaps * caps)
 {
   GstCompositor *compositor = GST_COMPOSITOR (agg);
+  GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg);
   GstVideoInfo v_info;
   guint n_threads;
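To see how the pieces fit together, here is a hedged sketch of how a caller inside compositor.c could drive the runner after this change, assuming run() keeps its pre-existing shape (runner, work function, task_data array) and that this code sits next to the runner definitions. BlendJob, blend_rows and blend_in_parallel are hypothetical names, not code from the patch.

/* Hypothetical per-thread work descriptor; not part of the patch. */
typedef struct
{
  guint first_row;
  guint last_row;
} BlendJob;

static void
blend_rows (gpointer user_data)
{
  BlendJob *job = user_data;

  /* Blend output rows [job->first_row, job->last_row) here. */
  (void) job;
}

static void
blend_in_parallel (GstParallelizedTaskRunner * runner, guint height)
{
  guint n = runner->n_threads;
  guint rows_per_thread = (height + n - 1) / n;
  BlendJob *jobs = g_new0 (BlendJob, n);
  gpointer *task_data = g_new0 (gpointer, n);
  guint i;

  for (i = 0; i < n; i++) {
    jobs[i].first_row = MIN (i * rows_per_thread, height);
    jobs[i].last_row = MIN ((i + 1) * rows_per_thread, height);
    task_data[i] = &jobs[i];
  }

  /* With async_tasks == FALSE this returns only after every job has run:
   * n - 1 jobs go to the task pool and the last one runs in this thread
   * before the runner joins the rest. */
  gst_parallelized_task_runner_run (runner, blend_rows, task_data);

  g_free (task_data);
  g_free (jobs);
}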
@@ -1041,8 +1029,12 @@ _negotiated_caps (GstAggregator * agg, GstCaps * caps)
     gst_parallelized_task_runner_free (compositor->blend_runner);
     compositor->blend_runner = NULL;
   }
-  if (!compositor->blend_runner)
-    compositor->blend_runner = gst_parallelized_task_runner_new (n_threads);
+  if (!compositor->blend_runner) {
+    GstTaskPool *pool = gst_video_aggregator_get_execution_task_pool (vagg);
+    compositor->blend_runner =
+        gst_parallelized_task_runner_new (n_threads, pool, FALSE);
+    gst_clear_object (&pool);
+  }
 
   return GST_AGGREGATOR_CLASS (parent_class)->negotiated_src_caps (agg, caps);
 }
@@ -24,6 +24,7 @@
 #include <gst/gst.h>
 #include <gst/video/video.h>
 #include <gst/video/gstvideoaggregator.h>
+#include <gst/base/base.h>
 
 #include "blend.h"
 
@@ -35,7 +36,7 @@ G_DECLARE_FINAL_TYPE (GstCompositor, gst_compositor, GST, COMPOSITOR,
 
 #define GST_TYPE_COMPOSITOR_PAD (gst_compositor_pad_get_type())
 G_DECLARE_FINAL_TYPE (GstCompositorPad, gst_compositor_pad, GST, COMPOSITOR_PAD,
-    GstVideoAggregatorConvertPad)
+    GstVideoAggregatorParallelConvertPad)
 
 /**
  * GstCompositorBackground:
@@ -80,28 +81,22 @@ typedef enum
 typedef void (*GstParallelizedTaskFunc) (gpointer user_data);
 
 typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
-typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
-
-struct _GstParallelizedTaskThread
-{
-  GstParallelizedTaskRunner *runner;
-  guint idx;
-  GThread *thread;
-};
 
 struct _GstParallelizedTaskRunner
 {
+  GstTaskPool *pool;
+  gboolean own_pool;
   guint n_threads;
 
-  GstParallelizedTaskThread *threads;
+  GstQueueArray *tasks;
 
   GstParallelizedTaskFunc func;
   gpointer *task_data;
 
   GMutex lock;
-  GCond cond_todo, cond_done;
-  gint n_todo, n_done;
-  gboolean quit;
+  gint n_todo;
+
+  gboolean async_tasks;
 };
 
 /**
@@ -139,7 +134,7 @@ struct _GstCompositor
  */
 struct _GstCompositorPad
 {
-  GstVideoAggregatorConvertPad parent;
+  GstVideoAggregatorParallelConvertPad parent;
 
   /* properties */
   gint xpos, ypos;