aggregator: Add a streaming lock so to secure flush start action

Without a lock that is taken in FLUSH_START we had a rare race where we
end up aggregating a buffer that was before the whole FLUSH_START/STOP
dance. That could lead to very wrong behaviour in subclasses.
This commit is contained in:
Thibault Saunier 2014-08-02 18:25:01 +02:00 committed by Tim-Philipp Müller
parent d5a3056ef2
commit 1efd6b2edd

View file

@ -122,6 +122,22 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
g_thread_self()); \
} G_STMT_END
#define PAD_STREAM_LOCK(pad) G_STMT_START { \
GST_LOG_OBJECT (pad, "Taking lock from thread %p", \
g_thread_self()); \
g_mutex_lock(&pad->priv->stream_lock); \
GST_LOG_OBJECT (pad, "Took lock from thread %p", \
g_thread_self()); \
} G_STMT_END
#define PAD_STREAM_UNLOCK(pad) G_STMT_START { \
GST_LOG_OBJECT (pad, "Releasing lock from thread %p", \
g_thread_self()); \
g_mutex_unlock(&pad->priv->stream_lock); \
GST_LOG_OBJECT (pad, "Release lock from thread %p", \
g_thread_self()); \
} G_STMT_END
struct _GstAggregatorPadPrivate
{
gboolean pending_flush_start;
@ -131,6 +147,8 @@ struct _GstAggregatorPadPrivate
GMutex event_lock;
GCond event_cond;
GMutex stream_lock;
};
static gboolean
@ -581,6 +599,47 @@ _all_flush_stop_received (GstAggregator * self)
return TRUE;
}
static void
_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
{
GstBuffer *tmpbuf;
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
g_atomic_int_set (&aggpad->priv->flushing, TRUE);
/* Remove pad buffer and wake up the streaming thread */
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL);
PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
TRUE, FALSE) == TRUE) {
GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
}
if (g_atomic_int_get (&priv->flush_seeking)) {
/* If flush_seeking we forward the first FLUSH_START */
if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
TRUE, FALSE) == TRUE) {
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
_stop_srcpad_task (self, event);
priv->flow_return = GST_FLOW_OK;
GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
GST_PAD_STREAM_LOCK (self->srcpad);
GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
event = NULL;
}
} else {
gst_event_unref (event);
}
PAD_STREAM_UNLOCK (aggpad);
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL);
}
/* GstAggregator vmethods default implementations */
static gboolean
_sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
@ -588,41 +647,13 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
gboolean res = TRUE;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
GstBuffer *tmpbuf;
g_atomic_int_set (&aggpad->priv->flushing, TRUE);
/* Remove pad buffer and wake up the streaming thread */
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL);
if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
TRUE, FALSE) == TRUE) {
GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
}
if (g_atomic_int_get (&priv->flush_seeking)) {
/* If flush_seeking we forward the first FLUSH_START */
if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
TRUE, FALSE) == TRUE) {
GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task");
_stop_srcpad_task (self, event);
priv->flow_return = GST_FLOW_OK;
GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
GST_PAD_STREAM_LOCK (self->srcpad);
GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
event = NULL;
goto eat;
}
}
_flush_start (self, aggpad, event);
/* We forward only in one case: right after flush_seeking */
event = NULL;
goto eat;
}
case GST_EVENT_FLUSH_STOP:
@ -1258,6 +1289,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
@ -1274,7 +1306,6 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
if (aggclass->clip) {
aggclass->clip (self, aggpad, buffer, &actual_buf);
}
@ -1284,6 +1315,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
gst_buffer_unref (aggpad->buffer);
aggpad->buffer = actual_buf;
PAD_UNLOCK_EVENT (aggpad);
PAD_STREAM_UNLOCK (aggpad);
_add_aggregate_gsource (self);
@ -1292,6 +1324,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
return priv->flow_return;
flushing:
PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "We are flushing");
@ -1299,6 +1332,7 @@ flushing:
return GST_FLOW_FLUSHING;
eos:
PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "We are EOS already...");
@ -1374,6 +1408,7 @@ gst_aggregator_pad_finalize (GObject * object)
g_mutex_clear (&pad->priv->event_lock);
g_cond_clear (&pad->priv->event_cond);
g_mutex_clear (&pad->priv->stream_lock);
G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
}
@ -1415,6 +1450,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
g_mutex_init (&pad->priv->event_lock);
g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->stream_lock);
}
/**