diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 7e5b70e36c..9c2e702dfc 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -167,19 +167,12 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); } G_STMT_END #define SRC_STREAM_BROADCAST(self) G_STMT_START { \ - GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ - g_thread_self()); \ - g_cond_broadcast(&(self->priv->src_cond)); \ - } G_STMT_END - -#define KICK_SRC_THREAD(self) G_STMT_START { \ SRC_STREAM_LOCK (self); \ - GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \ - g_thread_self ()); \ + GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ + g_thread_self()); \ if (self->priv->aggregate_id) \ gst_clock_id_unschedule (self->priv->aggregate_id); \ - self->priv->n_kicks++; \ - SRC_STREAM_BROADCAST (self); \ + g_cond_broadcast(&(self->priv->src_cond)); \ SRC_STREAM_UNLOCK (self); \ } G_STMT_END @@ -248,7 +241,6 @@ struct _GstAggregatorPrivate /* aggregate */ GstClockID aggregate_id; - gint n_kicks; GMutex src_lock; GCond src_cond; }; @@ -507,13 +499,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) } SRC_STREAM_LOCK (self); + + /* Before waiting, check if we're actually still running */ + if (!self->priv->running || !self->priv->send_eos) { + SRC_STREAM_UNLOCK (self); + + return FALSE; + } + start = gst_aggregator_get_next_time (self); if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || !GST_CLOCK_TIME_IS_VALID (start)) { - while (self->priv->n_kicks <= 0) - SRC_STREAM_WAIT (self); - self->priv->n_kicks--; + /* We wake up here when something happened, and below + * then check if we're ready now. If we return FALSE, + * we will be directly called again. + */ + SRC_STREAM_WAIT (self); } else { GstClockTime base_time, time; GstClock *clock; @@ -559,7 +561,6 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) gst_clock_id_unref (self->priv->aggregate_id); self->priv->aggregate_id = NULL; } - self->priv->n_kicks--; GST_DEBUG_OBJECT (self, "clock returned %d", status); @@ -641,14 +642,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) flush_start ? "Pausing" : "Stopping"); self->priv->running = FALSE; - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); if (flush_start) { res = gst_pad_push_event (self->srcpad, flush_start); } gst_pad_stop_task (self->srcpad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); return res; } @@ -659,7 +660,6 @@ _start_srcpad_task (GstAggregator * self) GST_INFO_OBJECT (self, "Starting srcpad task"); self->priv->running = TRUE; - self->priv->n_kicks = 0; gst_pad_start_task (GST_PAD (self->srcpad), (GstTaskFunction) aggregate_func, self, NULL); } @@ -776,7 +776,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gst_pad_push_event (self->srcpad, event); priv->send_eos = TRUE; event = NULL; - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_PAD_STREAM_UNLOCK (self->srcpad); @@ -804,7 +804,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) } PAD_UNLOCK_EVENT (aggpad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); goto eat; } case GST_EVENT_SEGMENT: @@ -928,7 +928,7 @@ _release_pad (GstElement * element, GstPad * pad) gst_buffer_replace (&tmpbuf, NULL); gst_element_remove_pad (element, pad); - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); } static GstPad * @@ -1655,7 +1655,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) - KICK_SRC_THREAD (self); + SRC_STREAM_BROADCAST (self); GST_DEBUG_OBJECT (aggpad, "Done chaining");