From bbde713640a3e475a6c327f111fd3e15f729b169 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Sat, 27 Dec 2014 04:19:52 +0530 Subject: [PATCH] aggregator: Take the stream lock when iterating sink pads When iterating sink pads to collect some data, we should take the stream lock so we don't get stale data and possibly deadlock because of that. This fixes a definitive deadlock in _wait_and_check() that manifests with high max latencies in a live pipeline, and fixes other possible race conditions. --- libs/gst/base/gstaggregator.c | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 47f60de1d0..6a3796f004 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -166,13 +166,17 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); g_thread_self()); \ } G_STMT_END -#define SRC_STREAM_BROADCAST(self) G_STMT_START { \ - SRC_STREAM_LOCK (self); \ +#define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START { \ GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ g_thread_self()); \ if (self->priv->aggregate_id) \ gst_clock_id_unschedule (self->priv->aggregate_id); \ g_cond_broadcast(&(self->priv->src_cond)); \ + } G_STMT_END + +#define SRC_STREAM_BROADCAST(self) G_STMT_START { \ + SRC_STREAM_LOCK (self); \ + SRC_STREAM_BROADCAST_UNLOCKED (self); \ SRC_STREAM_UNLOCK (self); \ } G_STMT_END @@ -485,21 +489,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) { GstClockTime latency_max, latency_min; GstClockTime start; - gboolean live; + gboolean live, res; *timeout = FALSE; + SRC_STREAM_LOCK (self); + gst_aggregator_get_latency (self, &live, &latency_min, &latency_max); if (gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) { GST_DEBUG_OBJECT (self, "all pads have data"); + SRC_STREAM_UNLOCK (self); + return TRUE; } - SRC_STREAM_LOCK (self); - /* Before waiting, check if we're actually still running */ if (!self->priv->running || !self->priv->send_eos) { SRC_STREAM_UNLOCK (self); @@ -571,10 +577,12 @@ _wait_and_check (GstAggregator * self, gboolean * timeout) return TRUE; } } + + res = gst_aggregator_iterate_sinkpads (self, + (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL); SRC_STREAM_UNLOCK (self); - return gst_aggregator_iterate_sinkpads (self, - (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL); + return res; } static void @@ -1074,8 +1082,10 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) data.live = FALSE; /* query upstream's latency */ + SRC_STREAM_LOCK (self); gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _latency_query, &data); + SRC_STREAM_UNLOCK (self); if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) && self->latency > data.max) { @@ -1678,9 +1688,11 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) PAD_UNLOCK_EVENT (aggpad); PAD_STREAM_UNLOCK (aggpad); + SRC_STREAM_LOCK (self); if (gst_aggregator_iterate_sinkpads (self, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) - SRC_STREAM_BROADCAST (self); + SRC_STREAM_BROADCAST_UNLOCKED (self); + SRC_STREAM_UNLOCK (self); GST_DEBUG_OBJECT (aggpad, "Done chaining");