aggregator: Don't count the number of times we need to wake up but instead check all conditions for waiting again

This simplifies the code and also makes sure that we don't forget to check all
conditions for waiting.

Also fix a potential deadlock caused by not checking if we're actually still
running before starting to wait.
This commit is contained in:
Sebastian Dröge 2014-12-22 15:00:36 +01:00 committed by Tim-Philipp Müller
parent b1dcc7efb0
commit a20863a7ec

View file

@ -167,19 +167,12 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
} G_STMT_END } G_STMT_END
#define SRC_STREAM_BROADCAST(self) G_STMT_START { \ #define SRC_STREAM_BROADCAST(self) G_STMT_START { \
GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
g_thread_self()); \
g_cond_broadcast(&(self->priv->src_cond)); \
} G_STMT_END
#define KICK_SRC_THREAD(self) G_STMT_START { \
SRC_STREAM_LOCK (self); \ SRC_STREAM_LOCK (self); \
GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \ GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
g_thread_self ()); \ g_thread_self()); \
if (self->priv->aggregate_id) \ if (self->priv->aggregate_id) \
gst_clock_id_unschedule (self->priv->aggregate_id); \ gst_clock_id_unschedule (self->priv->aggregate_id); \
self->priv->n_kicks++; \ g_cond_broadcast(&(self->priv->src_cond)); \
SRC_STREAM_BROADCAST (self); \
SRC_STREAM_UNLOCK (self); \ SRC_STREAM_UNLOCK (self); \
} G_STMT_END } G_STMT_END
@ -248,7 +241,6 @@ struct _GstAggregatorPrivate
/* aggregate */ /* aggregate */
GstClockID aggregate_id; GstClockID aggregate_id;
gint n_kicks;
GMutex src_lock; GMutex src_lock;
GCond src_cond; GCond src_cond;
}; };
@ -507,13 +499,23 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
} }
SRC_STREAM_LOCK (self); SRC_STREAM_LOCK (self);
/* Before waiting, check if we're actually still running */
if (!self->priv->running || !self->priv->send_eos) {
SRC_STREAM_UNLOCK (self);
return FALSE;
}
start = gst_aggregator_get_next_time (self); start = gst_aggregator_get_next_time (self);
if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
|| !GST_CLOCK_TIME_IS_VALID (start)) { || !GST_CLOCK_TIME_IS_VALID (start)) {
while (self->priv->n_kicks <= 0) /* We wake up here when something happened, and below
SRC_STREAM_WAIT (self); * then check if we're ready now. If we return FALSE,
self->priv->n_kicks--; * we will be directly called again.
*/
SRC_STREAM_WAIT (self);
} else { } else {
GstClockTime base_time, time; GstClockTime base_time, time;
GstClock *clock; GstClock *clock;
@ -559,7 +561,6 @@ _wait_and_check (GstAggregator * self, gboolean * timeout)
gst_clock_id_unref (self->priv->aggregate_id); gst_clock_id_unref (self->priv->aggregate_id);
self->priv->aggregate_id = NULL; self->priv->aggregate_id = NULL;
} }
self->priv->n_kicks--;
GST_DEBUG_OBJECT (self, "clock returned %d", status); GST_DEBUG_OBJECT (self, "clock returned %d", status);
@ -641,14 +642,14 @@ _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
flush_start ? "Pausing" : "Stopping"); flush_start ? "Pausing" : "Stopping");
self->priv->running = FALSE; self->priv->running = FALSE;
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
if (flush_start) { if (flush_start) {
res = gst_pad_push_event (self->srcpad, flush_start); res = gst_pad_push_event (self->srcpad, flush_start);
} }
gst_pad_stop_task (self->srcpad); gst_pad_stop_task (self->srcpad);
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
return res; return res;
} }
@ -659,7 +660,6 @@ _start_srcpad_task (GstAggregator * self)
GST_INFO_OBJECT (self, "Starting srcpad task"); GST_INFO_OBJECT (self, "Starting srcpad task");
self->priv->running = TRUE; self->priv->running = TRUE;
self->priv->n_kicks = 0;
gst_pad_start_task (GST_PAD (self->srcpad), gst_pad_start_task (GST_PAD (self->srcpad),
(GstTaskFunction) aggregate_func, self, NULL); (GstTaskFunction) aggregate_func, self, NULL);
} }
@ -776,7 +776,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
gst_pad_push_event (self->srcpad, event); gst_pad_push_event (self->srcpad, event);
priv->send_eos = TRUE; priv->send_eos = TRUE;
event = NULL; event = NULL;
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad); GST_PAD_STREAM_UNLOCK (self->srcpad);
@ -804,7 +804,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
} }
PAD_UNLOCK_EVENT (aggpad); PAD_UNLOCK_EVENT (aggpad);
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
goto eat; goto eat;
} }
case GST_EVENT_SEGMENT: case GST_EVENT_SEGMENT:
@ -928,7 +928,7 @@ _release_pad (GstElement * element, GstPad * pad)
gst_buffer_replace (&tmpbuf, NULL); gst_buffer_replace (&tmpbuf, NULL);
gst_element_remove_pad (element, pad); gst_element_remove_pad (element, pad);
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
} }
static GstPad * static GstPad *
@ -1655,7 +1655,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (gst_aggregator_iterate_sinkpads (self, if (gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL)) (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
KICK_SRC_THREAD (self); SRC_STREAM_BROADCAST (self);
GST_DEBUG_OBJECT (aggpad, "Done chaining"); GST_DEBUG_OBJECT (aggpad, "Done chaining");