diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json
index eef719f711..10205dc248 100644
--- a/docs/plugins/gst_plugins_cache.json
+++ b/docs/plugins/gst_plugins_cache.json
@@ -1926,6 +1926,7 @@
         "GstCompositorPad": {
             "hierarchy": [
                 "GstCompositorPad",
+                "GstVideoAggregatorParallelConvertPad",
                 "GstVideoAggregatorConvertPad",
                 "GstVideoAggregatorPad",
                 "GstAggregatorPad",
diff --git a/gst/compositor/compositor.c b/gst/compositor/compositor.c
index 38eba7616d..8a891f1fc1 100644
--- a/gst/compositor/compositor.c
+++ b/gst/compositor/compositor.c
@@ -183,7 +183,7 @@ enum
 };
 
 G_DEFINE_TYPE (GstCompositorPad, gst_compositor_pad,
-    GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD);
+    GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD);
 
 static void
 gst_compositor_pad_get_property (GObject * object, guint prop_id,
@@ -386,8 +386,8 @@ _pad_obscures_rectangle (GstVideoAggregator * vagg, GstVideoAggregatorPad * pad,
   return TRUE;
 }
 
-static gboolean
-gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
+static void
+gst_compositor_pad_prepare_frame_start (GstVideoAggregatorPad * pad,
     GstVideoAggregator * vagg, GstBuffer * buffer,
     GstVideoFrame * prepared_frame)
 {
@@ -417,7 +417,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
 
   if (cpad->alpha == 0.0) {
     GST_DEBUG_OBJECT (pad, "Pad has alpha 0.0, not converting frame");
-    goto done;
+    return;
   }
 
   frame_rect = clamp_rectangle (cpad->xpos, cpad->ypos, width, height,
@@ -426,7 +426,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   if (frame_rect.w == 0 || frame_rect.h == 0) {
     GST_DEBUG_OBJECT (pad, "Resulting frame is zero-width or zero-height "
         "(w: %i, h: %i), skipping", frame_rect.w, frame_rect.h);
-    goto done;
+    return;
   }
 
   GST_OBJECT_LOCK (vagg);
@@ -446,16 +446,11 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   GST_OBJECT_UNLOCK (vagg);
 
   if (frame_obscured)
-    goto done;
+    return;
 
-  return
-      GST_VIDEO_AGGREGATOR_PAD_CLASS
-      (gst_compositor_pad_parent_class)->prepare_frame (pad, vagg, buffer,
+  GST_VIDEO_AGGREGATOR_PAD_CLASS
+      (gst_compositor_pad_parent_class)->prepare_frame_start (pad, vagg, buffer,
       prepared_frame);
-
-done:
-
-  return TRUE;
 }
 
 static void
@@ -539,8 +534,8 @@ gst_compositor_pad_class_init (GstCompositorPadClass * klass)
           GST_TYPE_COMPOSITOR_OPERATOR, DEFAULT_PAD_OPERATOR,
           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
 
-  vaggpadclass->prepare_frame =
-      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame);
+  vaggpadclass->prepare_frame_start =
+      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame_start);
 
   vaggcpadclass->create_conversion_info =
       GST_DEBUG_FUNCPTR (gst_compositor_pad_create_conversion_info);
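
The pad-level change above is the core of the port: the gboolean prepare_frame vfunc, which did all conversion work inline, becomes the void prepare_frame_start vfunc, so that GstVideoAggregatorParallelConvertPad can kick off the colorspace conversion on the aggregator's task pool and collect the result later via the matching prepare_frame_finish vfunc. A minimal sketch of the new vfunc shape, using a hypothetical GstMyPad subclass (GstMyPad, gst_my_pad_* and the skip condition are illustrative, not part of this patch):

    /* Hypothetical pad subclass demonstrating the prepare_frame_start
     * override pattern used by GstCompositorPad above. */
    typedef struct
    {
      GstVideoAggregatorParallelConvertPad parent;
    } GstMyPad;

    typedef struct
    {
      GstVideoAggregatorParallelConvertPadClass parent_class;
    } GstMyPadClass;

    G_DEFINE_TYPE (GstMyPad, gst_my_pad,
        GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD);

    static void
    gst_my_pad_prepare_frame_start (GstVideoAggregatorPad * pad,
        GstVideoAggregator * vagg, GstBuffer * buffer,
        GstVideoFrame * prepared_frame)
    {
      /* Returning without chaining up simply leaves prepared_frame
       * untouched; this replaces the old "goto done; return TRUE;"
       * pattern, since the vfunc no longer returns a value. */
      if (buffer == NULL)
        return;

      /* Chain up so the parallel convert pad can schedule the
       * conversion asynchronously; prepare_frame_finish later waits
       * for it and fills in prepared_frame. */
      GST_VIDEO_AGGREGATOR_PAD_CLASS (gst_my_pad_parent_class)->
          prepare_frame_start (pad, vagg, buffer, prepared_frame);
    }

    static void
    gst_my_pad_class_init (GstMyPadClass * klass)
    {
      GstVideoAggregatorPadClass *vaggpad_class =
          GST_VIDEO_AGGREGATOR_PAD_CLASS (klass);

      vaggpad_class->prepare_frame_start =
          GST_DEBUG_FUNCPTR (gst_my_pad_prepare_frame_start);
    }

    static void
    gst_my_pad_init (GstMyPad * pad)
    {
    }
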
@@ -861,123 +856,105 @@ _fixate_caps (GstAggregator * agg, GstCaps * caps)
   return ret;
 }
 
-static gpointer
+static void
 gst_parallelized_task_thread_func (gpointer data)
 {
-  GstParallelizedTaskThread *self = data;
+  GstParallelizedTaskRunner *runner = data;
+  gint idx;
 
-  g_mutex_lock (&self->runner->lock);
-  self->runner->n_done++;
-  if (self->runner->n_done == self->runner->n_threads - 1)
-    g_cond_signal (&self->runner->cond_done);
+  g_mutex_lock (&runner->lock);
+  idx = runner->n_todo--;
+  g_assert (runner->n_todo >= -1);
+  g_mutex_unlock (&runner->lock);
 
-  do {
-    gint idx;
+  g_assert (runner->func != NULL);
 
-    while (self->runner->n_todo == -1 && !self->runner->quit)
-      g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
+  runner->func (runner->task_data[idx]);
+}
 
-    if (self->runner->quit)
-      break;
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+  gboolean joined = FALSE;
 
-    idx = self->runner->n_todo--;
-    g_assert (self->runner->n_todo >= -1);
-    g_mutex_unlock (&self->runner->lock);
-
-    g_assert (self->runner->func != NULL);
-
-    self->runner->func (self->runner->task_data[idx]);
-
-    g_mutex_lock (&self->runner->lock);
-    self->runner->n_done++;
-    if (self->runner->n_done == self->runner->n_threads - 1)
-      g_cond_signal (&self->runner->cond_done);
-  } while (TRUE);
-
-  g_mutex_unlock (&self->runner->lock);
-
-  return NULL;
+  while (!joined) {
+    g_mutex_lock (&self->lock);
+    if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+      gpointer task = gst_queue_array_pop_head (self->tasks);
+      g_mutex_unlock (&self->lock);
+      gst_task_pool_join (self->pool, task);
+    } else {
+      g_mutex_unlock (&self->lock);
+    }
+  }
 }
 
 static void
 gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
 {
-  guint i;
-
-  g_mutex_lock (&self->lock);
-  self->quit = TRUE;
-  g_cond_broadcast (&self->cond_todo);
-  g_mutex_unlock (&self->lock);
-
-  for (i = 1; i < self->n_threads; i++) {
-    if (!self->threads[i].thread)
-      continue;
-
-    g_thread_join (self->threads[i].thread);
-  }
+  gst_parallelized_task_runner_join (self);
 
+  gst_queue_array_free (self->tasks);
+  if (self->own_pool)
+    gst_task_pool_cleanup (self->pool);
+  gst_object_unref (self->pool);
   g_mutex_clear (&self->lock);
-  g_cond_clear (&self->cond_todo);
-  g_cond_clear (&self->cond_done);
-  g_free (self->threads);
   g_free (self);
 }
 
 static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool,
+    gboolean async_tasks)
 {
   GstParallelizedTaskRunner *self;
-  guint i;
-  GError *err = NULL;
 
   if (n_threads == 0)
     n_threads = g_get_num_processors ();
 
   self = g_new0 (GstParallelizedTaskRunner, 1);
-  self->n_threads = n_threads;
-  self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
 
-  self->quit = FALSE;
+  if (pool) {
+    self->pool = g_object_ref (pool);
+    self->own_pool = FALSE;
+
+    /* No reason to split up the work between more threads than the
+     * pool can spawn */
+    if (GST_IS_SHARED_TASK_POOL (pool))
+      n_threads =
+          MIN (n_threads,
+          gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+  } else {
+    self->pool = gst_shared_task_pool_new ();
+    self->own_pool = TRUE;
+    gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+        n_threads);
+    gst_task_pool_prepare (self->pool, NULL);
+  }
+
+  self->tasks = gst_queue_array_new (n_threads);
+
+  self->n_threads = n_threads;
+
   self->n_todo = -1;
-  self->n_done = 0;
   g_mutex_init (&self->lock);
-  g_cond_init (&self->cond_todo);
-  g_cond_init (&self->cond_done);
 
   /* Set when scheduling a job */
   self->func = NULL;
   self->task_data = NULL;
-
-  for (i = 0; i < n_threads; i++) {
-    self->threads[i].runner = self;
-    self->threads[i].idx = i;
-
-    /* First thread is the one calling run() */
-    if (i > 0) {
-      self->threads[i].thread =
-          g_thread_try_new ("compositor-blend",
-          gst_parallelized_task_thread_func, &self->threads[i], &err);
-      if (!self->threads[i].thread)
-        goto error;
-    }
-  }
-
-  g_mutex_lock (&self->lock);
-  while (self->n_done < self->n_threads - 1)
-    g_cond_wait (&self->cond_done, &self->lock);
-  self->n_done = 0;
-  g_mutex_unlock (&self->lock);
+  self->async_tasks = async_tasks;
 
   return self;
+}
 
-error:
-  {
-    GST_ERROR ("Failed to start thread %u: %s", i, err->message);
-    g_clear_error (&err);
+static void
+gst_parallelized_task_runner_finish (GstParallelizedTaskRunner * self)
+{
+  g_return_if_fail (self->func != NULL);
 
-    gst_parallelized_task_runner_free (self);
-    return NULL;
-  }
+  gst_parallelized_task_runner_join (self);
+
+  self->func = NULL;
+  self->task_data = NULL;
 }
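
With this rewrite the runner no longer keeps a standing set of worker threads parked on condition variables: each run() pushes one-shot tasks into a GstTaskPool, and join() drains the queued task ids. From the caller's side the API introduced above is used like this; a sketch assuming a 4-way split of the blend work (Slice, blend_slice and the row numbers are invented for illustration, the real task_data comes from compositor's blend code):

    /* Illustrative per-task payload; compositor's real task_data
     * carries its own blend arguments. */
    typedef struct
    {
      guint first_row, last_row;
    } Slice;

    static void
    blend_slice (gpointer data)
    {
      Slice *s = data;
      /* ... blend rows s->first_row .. s->last_row ... */
    }

    static void
    run_blend (void)
    {
      GstParallelizedTaskRunner *runner;
      Slice slices[4] = { {0, 67}, {68, 135}, {136, 203}, {204, 271} };
      gpointer task_data[4] =
          { &slices[0], &slices[1], &slices[2], &slices[3] };

      /* NULL pool: the runner creates its own GstSharedTaskPool capped
       * at 4 threads. FALSE: synchronous mode, so run() executes one
       * task inline and only returns once all tasks have finished. */
      runner = gst_parallelized_task_runner_new (4, NULL, FALSE);
      gst_parallelized_task_runner_run (runner, blend_slice, task_data);
      gst_parallelized_task_runner_free (runner);
    }
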
 
 static void
@@ -989,32 +966,43 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self,
   self->func = func;
   self->task_data = task_data;
 
-  if (n_threads > 1) {
+  if (n_threads > 1 || self->async_tasks) {
+    guint i = 0;
     g_mutex_lock (&self->lock);
-    self->n_todo = self->n_threads - 2;
-    self->n_done = 0;
-    g_cond_broadcast (&self->cond_todo);
+    self->n_todo = self->n_threads - 1;
+    if (!self->async_tasks) {
+      /* if not async, perform one of the functions in the current thread */
+      self->n_todo--;
+      i = 1;
+    }
+    for (; i < n_threads; i++) {
+      gpointer task =
+          gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+          self, NULL);
+
+      /* The return value of push() is nullable but NULL is only returned
+       * with the shared task pool when gst_task_pool_prepare() has not been
+       * called and would thus be a programming error that we should hard-fail
+       * on.
+       */
+      g_assert (task != NULL);
+      gst_queue_array_push_tail (self->tasks, task);
+    }
     g_mutex_unlock (&self->lock);
   }
 
-  self->func (self->task_data[self->n_threads - 1]);
+  if (!self->async_tasks) {
+    self->func (self->task_data[self->n_threads - 1]);
 
-  if (n_threads > 1) {
-    g_mutex_lock (&self->lock);
-    while (self->n_done < self->n_threads - 1)
-      g_cond_wait (&self->cond_done, &self->lock);
-    self->n_done = 0;
-    g_mutex_unlock (&self->lock);
+    gst_parallelized_task_runner_finish (self);
   }
-
-  self->func = NULL;
-  self->task_data = NULL;
 }
 
 static gboolean
 _negotiated_caps (GstAggregator * agg, GstCaps * caps)
 {
   GstCompositor *compositor = GST_COMPOSITOR (agg);
+  GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg);
   GstVideoInfo v_info;
   guint n_threads;
 
@@ -1041,8 +1029,12 @@ _negotiated_caps (GstAggregator * agg, GstCaps * caps)
     gst_parallelized_task_runner_free (compositor->blend_runner);
     compositor->blend_runner = NULL;
   }
-  if (!compositor->blend_runner)
-    compositor->blend_runner = gst_parallelized_task_runner_new (n_threads);
+  if (!compositor->blend_runner) {
+    GstTaskPool *pool = gst_video_aggregator_get_execution_task_pool (vagg);
+    compositor->blend_runner =
+        gst_parallelized_task_runner_new (n_threads, pool, FALSE);
+    gst_clear_object (&pool);
+  }
 
   return GST_AGGREGATOR_CLASS (parent_class)->negotiated_src_caps (agg, caps);
 }
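
_negotiated_caps now borrows the pool that videoaggregator exposes via gst_video_aggregator_get_execution_task_pool() (new in GStreamer 1.20, as is GstSharedTaskPool) instead of letting the runner always spin up its own; gst_clear_object() drops the local reference since the runner took one of its own. The GstSharedTaskPool calls the runner builds on can also be exercised standalone; a self-contained sketch, with do_work purely illustrative:

    #include <gst/gst.h>

    static void
    do_work (gpointer data)
    {
      GST_INFO ("working on item %u", GPOINTER_TO_UINT (data));
    }

    int
    main (int argc, char **argv)
    {
      GstTaskPool *pool;
      gpointer id;

      gst_init (&argc, &argv);

      pool = gst_shared_task_pool_new ();
      gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 2);
      gst_task_pool_prepare (pool, NULL);

      /* Unlike the default GstTaskPool, the shared pool's push() hands
       * back an opaque id that join() can block on; these ids are
       * exactly what the runner queues in self->tasks. */
      id = gst_task_pool_push (pool, do_work, GUINT_TO_POINTER (1), NULL);
      gst_task_pool_join (pool, id);

      gst_task_pool_cleanup (pool);
      gst_object_unref (pool);
      return 0;
    }
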
diff --git a/gst/compositor/compositor.h b/gst/compositor/compositor.h
index 4067fb1db2..38e069315d 100644
--- a/gst/compositor/compositor.h
+++ b/gst/compositor/compositor.h
@@ -24,6 +24,7 @@
 #include <gst/gst.h>
 #include <gst/video/video.h>
 #include <gst/video/gstvideoaggregator.h>
+#include <gst/base/gstqueuearray.h>
 
 #include "blend.h"
 
@@ -35,7 +36,7 @@ G_DECLARE_FINAL_TYPE (GstCompositor, gst_compositor, GST, COMPOSITOR,
 
 #define GST_TYPE_COMPOSITOR_PAD (gst_compositor_pad_get_type())
 G_DECLARE_FINAL_TYPE (GstCompositorPad, gst_compositor_pad, GST, COMPOSITOR_PAD,
-    GstVideoAggregatorConvertPad)
+    GstVideoAggregatorParallelConvertPad)
 
 /**
  * GstCompositorBackground:
@@ -80,28 +81,22 @@ typedef enum
 typedef void (*GstParallelizedTaskFunc) (gpointer user_data);
 
 typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
-typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
-
-struct _GstParallelizedTaskThread
-{
-  GstParallelizedTaskRunner *runner;
-  guint idx;
-  GThread *thread;
-};
 
 struct _GstParallelizedTaskRunner
 {
+  GstTaskPool *pool;
+  gboolean own_pool;
   guint n_threads;
-  GstParallelizedTaskThread *threads;
+  GstQueueArray *tasks;
 
   GstParallelizedTaskFunc func;
   gpointer *task_data;
 
   GMutex lock;
-  GCond cond_todo, cond_done;
-  gint n_todo, n_done;
-  gboolean quit;
+  gint n_todo;
+
+  gboolean async_tasks;
 };
 
 /**
@@ -139,7 +134,7 @@ struct _GstCompositor
  */
 struct _GstCompositorPad
 {
-  GstVideoAggregatorConvertPad parent;
+  GstVideoAggregatorParallelConvertPad parent;
 
   /* properties */
   gint xpos, ypos;
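
For reference, the n_todo bookkeeping that replaces the old cond_todo/cond_done handshake works out as follows in the synchronous path; a worked trace (not code from the patch) for n_threads = 4:

    /* In run(): n_todo is set to n_threads - 1 = 3, then decremented
     * once more to 2 because the calling thread reserves task_data[3]
     * for itself; i starts at 1, so only three one-shot tasks are
     * pushed to the pool.
     *
     * Each pooled task runs gst_parallelized_task_thread_func(), which
     * claims an index under the lock with idx = n_todo--:
     *
     *   task A: idx = 2, n_todo -> 1
     *   task B: idx = 1, n_todo -> 0
     *   task C: idx = 0, n_todo -> -1   (the asserted lower bound)
     *
     * So task_data[0..2] run on the pool, task_data[3] runs inline,
     * and gst_parallelized_task_runner_finish() joins the queued task
     * ids before run() returns. In async mode (async_tasks = TRUE) no
     * task runs inline and the caller is responsible for finishing
     * the runner later. */
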