From 341e5291c3c1a40a3e209c78a47078fc4410b600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 22 Dec 2014 15:00:36 +0100 Subject: [PATCH] aggregator: Don't count the number of times we need to wake up but instead check all conditions for waiting again This simplifies the code and also makes sure that we don't forget to check all conditions for waiting. Also fix a potential deadlock caused by not checking if we're actually still running before starting to wait. --- gst-libs/gst/base/gstaggregator.c | 44 +++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index 7e5b70e36c..9c2e702dfc 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -167,19 +167,12 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); } G_STMT_END #define SRC_STREAM_BROADCAST(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ - g_thread_self()); \ - g_cond_broadcast(&(self->priv->src_cond)); \ - } G_STMT_END - -#define KICK_SRC_THREAD(self) G_STMT_START { \ SRC_STREAM_LOCK (self); \ - GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \ - g_thread_self ()); \ + GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ + g_thread_self()); \ if (self->priv->aggregate_id) \ gst_clock_id_unschedule (self->priv->aggregate_id); \ - self->priv->n_kicks++; \ - SRC_STREAM_BROADCAST (self); \ + g_cond_broadcast(&(self->priv->src_cond)); \ SRC_STREAM_UNLOCK (self); \ } G_STMT_END @@ -248,7 +241,6 @@ struct _GstAggregatorPrivate /* aggregate */ GstClockID aggregate_id; - gint n_kicks; GMutex src_lock; GCond src_cond; }; @@ -507,13 +499,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) } SRC_STREAM_LOCK (self); + + /* Before waiting, check if we're actually still running */ + if (!self->priv->running || !self->priv->send_eos) { + SRC_STREAM_UNLOCK (self); + + return FALSE; + } + start = gst_aggregator_get_next_time (self); if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || !GST_CLOCK_TIME_IS_VALID (start)) { - while (self->priv->n_kicks <= 0) - SRC_STREAM_WAIT (self); - self->priv->n_kicks--; + /* We wake up here when something happened, and below + * then check if we're ready now. If we return FALSE, + * we will be directly called again. + */ + SRC_STREAM_WAIT (self); } else { GstClockTime base_time, time; GstClock *clock; @@ -559,7 +561,6 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) gst_clock_id_unref (self->priv->aggregate_id); self->priv->aggregate_id = NULL; } - self->priv->n_kicks--; GST_DEBUG_OBJECT (self, "clock returned %d", status); @@ -641,14 +642,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) flush_start ? "Pausing" : "Stopping"); self->priv->running = FALSE; - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); } gst_pad_stop_task (self->srcpad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); return res; } @@ -659,7 +660,6 @@ _start_srcpad_task (GstAggregator * self) GST_INFO_OBJECT (self, "Starting srcpad task"); self->priv->running = TRUE; - self->priv->n_kicks = 0; gst_pad_start_task (GST_PAD (self->srcpad), (GstTaskFunction) aggregate_func, self, NULL); } @@ -776,7 +776,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gst_pad_push_event (self->srcpad, event); priv->send_eos = TRUE; event = NULL; - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK (self->srcpad); @@ -804,7 +804,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) } PAD_UNLOCK_EVENT (aggpad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); goto eat; } case GST_EVENT_SEGMENT: @@ -928,7 +928,7 @@ _release_pad (GstElement * element, GstPad * pad) gst_buffer_replace (&tmpbuf, NULL); gst_element_remove_pad (element, pad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); } static GstPad * @@ -1655,7 +1655,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); GST_DEBUG_OBJECT (aggpad, "Done chaining");