aggregator: Add a streaming lock so to secure flush start action

Without a lock that is taken in FLUSH_START we had a rare race where we
end up aggregating a buffer that was before the whole FLUSH_START/STOP
dance. That could lead to very wrong behaviour in subclasses.
This commit is contained in:
Thibault Saunier 2014-08-02 18:25:01 +02:00
parent 65d20d1028
commit 982b9dc3ea

View file

@ -122,6 +122,22 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
g_thread_self()); \ g_thread_self()); \
} G_STMT_END } G_STMT_END
#define PAD_STREAM_LOCK(pad) G_STMT_START { \
GST_LOG_OBJECT (pad, "Taking lock from thread %p", \
g_thread_self()); \
g_mutex_lock(&pad->priv->stream_lock); \
GST_LOG_OBJECT (pad, "Took lock from thread %p", \
g_thread_self()); \
} G_STMT_END
#define PAD_STREAM_UNLOCK(pad) G_STMT_START { \
GST_LOG_OBJECT (pad, "Releasing lock from thread %p", \
g_thread_self()); \
g_mutex_unlock(&pad->priv->stream_lock); \
GST_LOG_OBJECT (pad, "Release lock from thread %p", \
g_thread_self()); \
} G_STMT_END
struct _GstAggregatorPadPrivate struct _GstAggregatorPadPrivate
{ {
gboolean pending_flush_start; gboolean pending_flush_start;
@ -131,6 +147,8 @@ struct _GstAggregatorPadPrivate
GMutex event_lock; GMutex event_lock;
GCond event_cond; GCond event_cond;
GMutex stream_lock;
}; };
static gboolean static gboolean
@ -581,24 +599,18 @@ _all_flush_stop_received (GstAggregator * self)
return TRUE; return TRUE;
} }
/* GstAggregator vmethods default implementations */ static void
static gboolean _flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
_sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
{
gboolean res = TRUE;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{ {
GstBuffer *tmpbuf; GstBuffer *tmpbuf;
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPadPrivate *padpriv = aggpad->priv;
g_atomic_int_set (&aggpad->priv->flushing, TRUE); g_atomic_int_set (&aggpad->priv->flushing, TRUE);
/* Remove pad buffer and wake up the streaming thread */ /* Remove pad buffer and wake up the streaming thread */
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL); gst_buffer_replace (&tmpbuf, NULL);
PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
TRUE, FALSE) == TRUE) { TRUE, FALSE) == TRUE) {
GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
@ -610,7 +622,7 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start, if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
TRUE, FALSE) == TRUE) { TRUE, FALSE) == TRUE) {
GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task"); GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
_stop_srcpad_task (self, event); _stop_srcpad_task (self, event);
priv->flow_return = GST_FLOW_OK; priv->flow_return = GST_FLOW_OK;
@ -618,11 +630,30 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
GST_PAD_STREAM_LOCK (self->srcpad); GST_PAD_STREAM_LOCK (self->srcpad);
GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
event = NULL; event = NULL;
goto eat;
} }
} else {
gst_event_unref (event);
}
PAD_STREAM_UNLOCK (aggpad);
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL);
} }
/* GstAggregator vmethods default implementations */
static gboolean
_sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
{
gboolean res = TRUE;
GstPad *pad = GST_PAD (aggpad);
GstAggregatorPrivate *priv = self->priv;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
_flush_start (self, aggpad, event);
/* We forward only in one case: right after flush_seeking */ /* We forward only in one case: right after flush_seeking */
event = NULL;
goto eat; goto eat;
} }
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
@ -1258,6 +1289,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing; goto flushing;
@ -1274,7 +1306,6 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing; goto flushing;
if (aggclass->clip) { if (aggclass->clip) {
aggclass->clip (self, aggpad, buffer, &actual_buf); aggclass->clip (self, aggpad, buffer, &actual_buf);
} }
@ -1284,6 +1315,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
gst_buffer_unref (aggpad->buffer); gst_buffer_unref (aggpad->buffer);
aggpad->buffer = actual_buf; aggpad->buffer = actual_buf;
PAD_UNLOCK_EVENT (aggpad); PAD_UNLOCK_EVENT (aggpad);
PAD_STREAM_UNLOCK (aggpad);
_add_aggregate_gsource (self); _add_aggregate_gsource (self);
@ -1292,6 +1324,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
return priv->flow_return; return priv->flow_return;
flushing: flushing:
PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (aggpad, "We are flushing"); GST_DEBUG_OBJECT (aggpad, "We are flushing");
@ -1299,6 +1332,7 @@ flushing:
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
eos: eos:
PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "We are EOS already..."); GST_DEBUG_OBJECT (pad, "We are EOS already...");
@ -1374,6 +1408,7 @@ gst_aggregator_pad_finalize (GObject * object)
g_mutex_clear (&pad->priv->event_lock); g_mutex_clear (&pad->priv->event_lock);
g_cond_clear (&pad->priv->event_cond); g_cond_clear (&pad->priv->event_cond);
g_mutex_clear (&pad->priv->stream_lock);
G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object); G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
} }
@ -1415,6 +1450,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
g_mutex_init (&pad->priv->event_lock); g_mutex_init (&pad->priv->event_lock);
g_cond_init (&pad->priv->event_cond); g_cond_init (&pad->priv->event_cond);
g_mutex_init (&pad->priv->stream_lock);
} }
/** /**