mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-04 14:38:48 +00:00
aggregator: Replace GMainContext with GAsyncQueue (v2)
The previous implementation kept accumulating GSources, slowing down the iteration and leaking memory. Instead of trying to fix the main context flushing, replace it with a GAsyncQueue which is simple to flush and has less overhead. https://bugzilla.gnome.org/show_bug.cgi?id=736782
This commit is contained in:
parent
33fbf7f16d
commit
dce92c75b1
1 changed files with 42 additions and 75 deletions
|
@ -170,34 +170,41 @@ _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
|
||||||
*************************************/
|
*************************************/
|
||||||
static GstElementClass *aggregator_parent_class = NULL;
|
static GstElementClass *aggregator_parent_class = NULL;
|
||||||
|
|
||||||
#define MAIN_CONTEXT_LOCK(self) G_STMT_START { \
|
#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
|
||||||
GST_LOG_OBJECT (self, "Getting MAIN_CONTEXT_LOCK in thread %p", \
|
|
||||||
|
#define QUEUE_PUSH(self) G_STMT_START { \
|
||||||
|
GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \
|
||||||
|
g_thread_self()); \
|
||||||
|
g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
|
||||||
|
} G_STMT_END
|
||||||
|
|
||||||
|
#define QUEUE_POP(self) G_STMT_START { \
|
||||||
|
GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
g_mutex_lock(&((GstAggregator*)self)->priv->mcontext_lock); \
|
g_async_queue_pop (AGGREGATOR_QUEUE (self)); \
|
||||||
GST_LOG_OBJECT (self, "Got MAIN_CONTEXT_LOCK in thread %p", \
|
GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define MAIN_CONTEXT_UNLOCK(self) G_STMT_START { \
|
#define QUEUE_FLUSH(self) G_STMT_START { \
|
||||||
g_mutex_unlock(&((GstAggregator*)self)->priv->mcontext_lock); \
|
GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \
|
||||||
GST_LOG_OBJECT (self, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \
|
g_thread_self()); \
|
||||||
g_thread_self()); \
|
g_async_queue_lock (AGGREGATOR_QUEUE (self)); \
|
||||||
|
while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \
|
||||||
|
g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \
|
||||||
|
GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \
|
||||||
|
g_thread_self()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
struct _GstAggregatorPrivate
|
struct _GstAggregatorPrivate
|
||||||
{
|
{
|
||||||
gint padcount;
|
gint padcount;
|
||||||
|
|
||||||
GMainContext *mcontext;
|
GAsyncQueue *queue;
|
||||||
|
|
||||||
/* Our state is >= PAUSED */
|
/* Our state is >= PAUSED */
|
||||||
gboolean running;
|
gboolean running;
|
||||||
|
|
||||||
/* Ensure that when we remove all sources from the maincontext
|
|
||||||
* we can not add any source, avoiding:
|
|
||||||
* "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
|
|
||||||
GMutex mcontext_lock;
|
|
||||||
GList *gsources;
|
|
||||||
|
|
||||||
gint seqnum;
|
gint seqnum;
|
||||||
gboolean send_stream_start;
|
gboolean send_stream_start;
|
||||||
|
@ -430,31 +437,20 @@ _push_eos (GstAggregator * self)
|
||||||
gst_pad_push_event (self->srcpad, event);
|
gst_pad_push_event (self->srcpad, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_destroy_gsource (GSource * source)
|
|
||||||
{
|
|
||||||
g_source_destroy (source);
|
|
||||||
g_source_unref (source);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
_remove_all_sources (GstAggregator * self)
|
|
||||||
{
|
|
||||||
GstAggregatorPrivate *priv = self->priv;
|
|
||||||
|
|
||||||
MAIN_CONTEXT_LOCK (self);
|
|
||||||
g_list_free_full (priv->gsources, (GDestroyNotify) _destroy_gsource);
|
|
||||||
priv->gsources = NULL;
|
|
||||||
MAIN_CONTEXT_UNLOCK (self);
|
|
||||||
}
|
|
||||||
|
|
||||||
static gboolean
|
|
||||||
aggregate_func (GstAggregator * self)
|
aggregate_func (GstAggregator * self)
|
||||||
{
|
{
|
||||||
GstAggregatorPrivate *priv = self->priv;
|
GstAggregatorPrivate *priv = self->priv;
|
||||||
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
|
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
|
||||||
|
|
||||||
|
if (self->priv->running == FALSE) {
|
||||||
|
GST_DEBUG_OBJECT (self, "Not running anymore");
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
QUEUE_POP (self);
|
||||||
|
|
||||||
GST_LOG_OBJECT (self, "Checking aggregate");
|
GST_LOG_OBJECT (self, "Checking aggregate");
|
||||||
while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
|
while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
|
||||||
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
|
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
|
||||||
|
@ -464,8 +460,7 @@ aggregate_func (GstAggregator * self)
|
||||||
priv->flow_return = klass->aggregate (self);
|
priv->flow_return = klass->aggregate (self);
|
||||||
|
|
||||||
if (priv->flow_return == GST_FLOW_EOS) {
|
if (priv->flow_return == GST_FLOW_EOS) {
|
||||||
g_main_context_wakeup (self->priv->mcontext);
|
QUEUE_FLUSH (self);
|
||||||
_remove_all_sources (self);
|
|
||||||
_push_eos (self);
|
_push_eos (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,19 +475,6 @@ aggregate_func (GstAggregator * self)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return G_SOURCE_REMOVE;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
iterate_main_context_func (GstAggregator * self)
|
|
||||||
{
|
|
||||||
if (self->priv->running == FALSE) {
|
|
||||||
GST_DEBUG_OBJECT (self, "Not running anymore");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
g_main_context_iteration (self->priv->mcontext, TRUE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
@ -523,15 +505,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
|
||||||
flush_start ? "Pausing" : "Stopping");
|
flush_start ? "Pausing" : "Stopping");
|
||||||
|
|
||||||
self->priv->running = FALSE;
|
self->priv->running = FALSE;
|
||||||
|
QUEUE_PUSH (self);
|
||||||
|
|
||||||
/* Clean the stack of GSource set on the MainContext */
|
|
||||||
g_main_context_wakeup (self->priv->mcontext);
|
|
||||||
_remove_all_sources (self);
|
|
||||||
if (flush_start) {
|
if (flush_start) {
|
||||||
res = gst_pad_push_event (self->srcpad, flush_start);
|
res = gst_pad_push_event (self->srcpad, flush_start);
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_pad_stop_task (self->srcpad);
|
gst_pad_stop_task (self->srcpad);
|
||||||
|
QUEUE_FLUSH (self);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -543,21 +524,7 @@ _start_srcpad_task (GstAggregator * self)
|
||||||
|
|
||||||
self->priv->running = TRUE;
|
self->priv->running = TRUE;
|
||||||
gst_pad_start_task (GST_PAD (self->srcpad),
|
gst_pad_start_task (GST_PAD (self->srcpad),
|
||||||
(GstTaskFunction) iterate_main_context_func, self, NULL);
|
(GstTaskFunction) aggregate_func, self, NULL);
|
||||||
}
|
|
||||||
|
|
||||||
static inline void
|
|
||||||
_add_aggregate_gsource (GstAggregator * self)
|
|
||||||
{
|
|
||||||
GSource *source;
|
|
||||||
GstAggregatorPrivate *priv = self->priv;
|
|
||||||
|
|
||||||
MAIN_CONTEXT_LOCK (self);
|
|
||||||
source = g_idle_source_new ();
|
|
||||||
g_source_set_callback (source, (GSourceFunc) aggregate_func, self, NULL);
|
|
||||||
priv->gsources = g_list_prepend (priv->gsources, source);
|
|
||||||
g_source_attach (source, priv->mcontext);
|
|
||||||
MAIN_CONTEXT_UNLOCK (self);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
|
@ -672,7 +639,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
|
||||||
gst_pad_push_event (self->srcpad, event);
|
gst_pad_push_event (self->srcpad, event);
|
||||||
priv->send_eos = TRUE;
|
priv->send_eos = TRUE;
|
||||||
event = NULL;
|
event = NULL;
|
||||||
_add_aggregate_gsource (self);
|
QUEUE_PUSH (self);
|
||||||
|
|
||||||
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
|
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
|
||||||
GST_PAD_STREAM_UNLOCK (self->srcpad);
|
GST_PAD_STREAM_UNLOCK (self->srcpad);
|
||||||
|
@ -700,7 +667,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
|
||||||
}
|
}
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK_EVENT (aggpad);
|
||||||
|
|
||||||
_add_aggregate_gsource (self);
|
QUEUE_PUSH (self);
|
||||||
goto eat;
|
goto eat;
|
||||||
}
|
}
|
||||||
case GST_EVENT_SEGMENT:
|
case GST_EVENT_SEGMENT:
|
||||||
|
@ -825,7 +792,7 @@ _release_pad (GstElement * element, GstPad * pad)
|
||||||
gst_element_remove_pad (element, pad);
|
gst_element_remove_pad (element, pad);
|
||||||
|
|
||||||
/* Something changed make sure we try to aggregate */
|
/* Something changed make sure we try to aggregate */
|
||||||
_add_aggregate_gsource (self);
|
QUEUE_PUSH (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstPad *
|
static GstPad *
|
||||||
|
@ -1160,7 +1127,6 @@ gst_aggregator_finalize (GObject * object)
|
||||||
{
|
{
|
||||||
GstAggregator *self = (GstAggregator *) object;
|
GstAggregator *self = (GstAggregator *) object;
|
||||||
|
|
||||||
g_mutex_clear (&self->priv->mcontext_lock);
|
|
||||||
g_mutex_clear (&self->priv->setcaps_lock);
|
g_mutex_clear (&self->priv->setcaps_lock);
|
||||||
|
|
||||||
G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
|
G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
|
||||||
|
@ -1173,8 +1139,10 @@ gst_aggregator_dispose (GObject * object)
|
||||||
|
|
||||||
G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
|
G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
|
||||||
|
|
||||||
g_main_context_unref (self->priv->mcontext);
|
if (AGGREGATOR_QUEUE (self)) {
|
||||||
_remove_all_sources (self);
|
g_async_queue_unref (AGGREGATOR_QUEUE (self));
|
||||||
|
AGGREGATOR_QUEUE (self) = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* GObject vmethods implementations */
|
/* GObject vmethods implementations */
|
||||||
|
@ -1231,7 +1199,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
|
||||||
priv->tags_changed = FALSE;
|
priv->tags_changed = FALSE;
|
||||||
_reset_flow_values (self);
|
_reset_flow_values (self);
|
||||||
|
|
||||||
priv->mcontext = g_main_context_new ();
|
AGGREGATOR_QUEUE (self) = g_async_queue_new ();
|
||||||
self->srcpad = gst_pad_new_from_template (pad_template, "src");
|
self->srcpad = gst_pad_new_from_template (pad_template, "src");
|
||||||
|
|
||||||
gst_pad_set_event_function (self->srcpad,
|
gst_pad_set_event_function (self->srcpad,
|
||||||
|
@ -1243,7 +1211,6 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
|
||||||
|
|
||||||
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
|
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
|
||||||
|
|
||||||
g_mutex_init (&self->priv->mcontext_lock);
|
|
||||||
g_mutex_init (&self->priv->setcaps_lock);
|
g_mutex_init (&self->priv->setcaps_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1314,7 +1281,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK_EVENT (aggpad);
|
||||||
PAD_STREAM_UNLOCK (aggpad);
|
PAD_STREAM_UNLOCK (aggpad);
|
||||||
|
|
||||||
_add_aggregate_gsource (self);
|
QUEUE_PUSH (self);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (aggpad, "Done chaining");
|
GST_DEBUG_OBJECT (aggpad, "Done chaining");
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue