From e8c34afd5ff6d224f5198ad53c16f116e36aa6df Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 17 Sep 2014 16:48:02 +0200 Subject: [PATCH] aggregator: Replace GMainContext with GAsyncQueue (v2) The previous implementation kept accumulating GSources, slowing down the iteration and leaking memory. Instead of trying to fix the main context flushing, replace it with a GAsyncQueue which is simple to flush and has less overhead. https://bugzilla.gnome.org/show_bug.cgi?id=736782 --- libs/gst/base/gstaggregator.c | 117 ++++++++++++---------------------- 1 file changed, 42 insertions(+), 75 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 55ac8689f7..dce7180737 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -170,34 +170,41 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) *************************************/ static GstElementClass *aggregator_parent_class = NULL; -#define MAIN_CONTEXT_LOCK(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p", \ +#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue) + +#define QUEUE_PUSH(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \ + g_thread_self()); \ + g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \ +} G_STMT_END + +#define QUEUE_POP(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \ g_thread_self()); \ - g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock); \ - GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p", \ + g_async_queue_pop (AGGREGATOR_QUEUE (self)); \ + GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \ g_thread_self()); \ } G_STMT_END -#define MAIN_CONTEXT_UNLOCK(self) G_STMT_START { \ - g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock); \ - GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \ - g_thread_self()); \ +#define QUEUE_FLUSH(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \ + g_thread_self()); \ + g_async_queue_lock (AGGREGATOR_QUEUE (self)); \ + while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \ + g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \ + GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \ + g_thread_self()); \ } G_STMT_END struct _GstAggregatorPrivate { gint padcount; - GMainContext *mcontext; + GAsyncQueue *queue; /* Our state is >= PAUSED */ gboolean running; - /* Ensure that when we remove all sources from the maincontext - * we can not add any source, avoiding: - * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */ - GMutex mcontext_lock; - GList *gsources; gint seqnum; gboolean send_stream_start; @@ -430,31 +437,20 @@ _push_eos (GstAggregator * self) gst_pad_push_event (self->srcpad, event); } - static void -_destroy_gsource (GSource * source) -{ - g_source_destroy (source); - g_source_unref (source); -} - -static void -_remove_all_sources (GstAggregator * self) -{ - GstAggregatorPrivate *priv = self->priv; - - MAIN_CONTEXT_LOCK (self); - g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource); - priv->gsources = NULL; - MAIN_CONTEXT_UNLOCK (self); -} - -static gboolean aggregate_func (GstAggregator * self) { GstAggregatorPrivate *priv = self->priv; GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + if (self->priv->running == FALSE) { + GST_DEBUG_OBJECT (self, "Not running anymore"); + + return; + } + + QUEUE_POP (self); + GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, @@ -464,8 +460,7 @@ aggregate_func (GstAggregator * self) priv->flow_return = klass->aggregate (self); if (priv->flow_return == GST_FLOW_EOS) { - g_main_context_wakeup (self->priv->mcontext); - _remove_all_sources (self); + QUEUE_FLUSH (self); _push_eos (self); } @@ -480,19 +475,6 @@ aggregate_func (GstAggregator * self) break; } - return G_SOURCE_REMOVE; -} - -static void -iterate_main_context_func (GstAggregator * self) -{ - if (self->priv->running == FALSE) { - GST_DEBUG_OBJECT (self, "Not running anymore"); - - return; - } - - g_main_context_iteration (self->priv->mcontext, TRUE); } static gboolean @@ -523,15 +505,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) flush_start ? "Pausing" : "Stopping"); self->priv->running = FALSE; + QUEUE_PUSH (self); - /* Clean the stack of GSource set on the MainContext */ - g_main_context_wakeup (self->priv->mcontext); - _remove_all_sources (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); } gst_pad_stop_task (self->srcpad); + QUEUE_FLUSH (self); return res; } @@ -543,21 +524,7 @@ _start_srcpad_task (GstAggregator * self) self->priv->running = TRUE; gst_pad_start_task (GST_PAD (self->srcpad), - (GstTaskFunction) iterate_main_context_func, self, NULL); -} - -static inline void -_add_aggregate_gsource (GstAggregator * self) -{ - GSource *source; - GstAggregatorPrivate *priv = self->priv; - - MAIN_CONTEXT_LOCK (self); - source = g_idle_source_new (); - g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL); - priv->gsources = g_list_prepend (priv->gsources, source); - g_source_attach (source, priv->mcontext); - MAIN_CONTEXT_UNLOCK (self); + (GstTaskFunction) aggregate_func, self, NULL); } static GstFlowReturn @@ -672,7 +639,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gst_pad_push_event (self->srcpad, event); priv->send_eos = TRUE; event = NULL; - _add_aggregate_gsource (self); + QUEUE_PUSH (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK (self->srcpad); @@ -700,7 +667,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) } PAD_UNLOCK_EVENT (aggpad); - _add_aggregate_gsource (self); + QUEUE_PUSH (self); goto eat; } case GST_EVENT_SEGMENT: @@ -825,7 +792,7 @@ _release_pad (GstElement * element, GstPad * pad) gst_element_remove_pad (element, pad); /* Something changed make sure we try to aggregate */ - _add_aggregate_gsource (self); + QUEUE_PUSH (self); } static GstPad * @@ -1160,7 +1127,6 @@ gst_aggregator_finalize (GObject * object) { GstAggregator *self = (GstAggregator *) object; - g_mutex_clear (&self->priv->mcontext_lock); g_mutex_clear (&self->priv->setcaps_lock); G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); @@ -1173,8 +1139,10 @@ gst_aggregator_dispose (GObject * object) G_OBJECT_CLASS (aggregator_parent_class)->dispose (object); - g_main_context_unref (self->priv->mcontext); - _remove_all_sources (self); + if (AGGREGATOR_QUEUE (self)) { + g_async_queue_unref (AGGREGATOR_QUEUE (self)); + AGGREGATOR_QUEUE (self) = NULL; + } } /* GObject vmethods implementations */ @@ -1231,7 +1199,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) priv->tags_changed = FALSE; _reset_flow_values (self); - priv->mcontext = g_main_context_new (); + AGGREGATOR_QUEUE (self) = g_async_queue_new (); self->srcpad = gst_pad_new_from_template (pad_template, "src"); gst_pad_set_event_function (self->srcpad, @@ -1243,7 +1211,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); - g_mutex_init (&self->priv->mcontext_lock); g_mutex_init (&self->priv->setcaps_lock); } @@ -1314,7 +1281,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) PAD_UNLOCK_EVENT (aggpad); PAD_STREAM_UNLOCK (aggpad); - _add_aggregate_gsource (self); + QUEUE_PUSH (self); GST_DEBUG_OBJECT (aggpad, "Done chaining");