aggregator: Take the stream lock when iterating sink pads

When iterating sink pads to collect some data, we should take the stream lock so
we don't get stale data and possibly deadlock because of that. This fixes
a definitive deadlock in _wait_and_check() that manifests with high max
latencies in a live pipeline, and fixes other possible race conditions.
This commit is contained in:
Nirbheek Chauhan 2014-12-27 04:19:52 +05:30 committed by Sebastian Dröge
parent 882018e6dd
commit f3efb9e7d6

View file

@ -166,13 +166,17 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
g_thread_self()); \ g_thread_self()); \
} G_STMT_END } G_STMT_END
#define SRC_STREAM_BROADCAST(self) G_STMT_START { \ #define SRC_STREAM_BROADCAST_UNLOCKED(self) G_STMT_START { \
SRC_STREAM_LOCK (self); \
GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \ GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
g_thread_self()); \ g_thread_self()); \
if (self->priv->aggregate_id) \ if (self->priv->aggregate_id) \
gst_clock_id_unschedule (self->priv->aggregate_id); \ gst_clock_id_unschedule (self->priv->aggregate_id); \
g_cond_broadcast(&(self->priv->src_cond)); \ g_cond_broadcast(&(self->priv->src_cond)); \
} G_STMT_END
#define SRC_STREAM_BROADCAST(self) G_STMT_START { \
SRC_STREAM_LOCK (self); \
SRC_STREAM_BROADCAST_UNLOCKED (self); \
SRC_STREAM_UNLOCK (self); \ SRC_STREAM_UNLOCK (self); \
} G_STMT_END } G_STMT_END
@ -485,21 +489,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
{ {
GstClockTime latency_max, latency_min; GstClockTime latency_max, latency_min;
GstClockTime start; GstClockTime start;
gboolean live; gboolean live, res;
*timeout = FALSE; *timeout = FALSE;
SRC_STREAM_LOCK (self);
gst_aggregator_get_latency (self, &live, &latency_min, &latency_max); gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
if (gst_aggregator_iterate_sinkpads (self, if (gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
NULL)) { NULL)) {
GST_DEBUG_OBJECT (self, "all pads have data"); GST_DEBUG_OBJECT (self, "all pads have data");
SRC_STREAM_UNLOCK (self);
return TRUE; return TRUE;
} }
SRC_STREAM_LOCK (self);
/* Before waiting, check if we're actually still running */ /* Before waiting, check if we're actually still running */
if (!self->priv->running || !self->priv->send_eos) { if (!self->priv->running || !self->priv->send_eos) {
SRC_STREAM_UNLOCK (self); SRC_STREAM_UNLOCK (self);
@ -571,10 +577,12 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
return TRUE; return TRUE;
} }
} }
res = gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
SRC_STREAM_UNLOCK (self); SRC_STREAM_UNLOCK (self);
return gst_aggregator_iterate_sinkpads (self, return res;
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
} }
static void static void
@ -1074,8 +1082,10 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
data.live = FALSE; data.live = FALSE;
/* query upstream's latency */ /* query upstream's latency */
SRC_STREAM_LOCK (self);
gst_aggregator_iterate_sinkpads (self, gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _latency_query, &data); (GstAggregatorPadForeachFunc) _latency_query, &data);
SRC_STREAM_UNLOCK (self);
if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) && if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
self->latency > data.max) { self->latency > data.max) {
@ -1678,9 +1688,11 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
PAD_UNLOCK_EVENT (aggpad); PAD_UNLOCK_EVENT (aggpad);
PAD_STREAM_UNLOCK (aggpad); PAD_STREAM_UNLOCK (aggpad);
SRC_STREAM_LOCK (self);
if (gst_aggregator_iterate_sinkpads (self, if (gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
SRC_STREAM_BROADCAST (self); SRC_STREAM_BROADCAST_UNLOCKED (self);
SRC_STREAM_UNLOCK (self);
GST_DEBUG_OBJECT (aggpad, "Done chaining"); GST_DEBUG_OBJECT (aggpad, "Done chaining");